Remove data callbacks in connection and digipeater modules
Some checks failed
/ build-hamnet70 (push) Failing after 31s
/ build-doc (push) Successful in 49s
/ deploy-doc (push) Has been skipped

This commit is contained in:
Thomas Kolb 2024-11-17 18:26:18 +01:00
parent e3a06685e4
commit 627eed7426
9 changed files with 249 additions and 249 deletions

View file

@ -1,3 +1,4 @@
#include "layer2/packet_structs.h"
#include <string.h>
#include <assert.h>
@ -18,7 +19,6 @@ result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr,
connection_data_callback_t data_cb,
connection_event_callback_t event_cb,
void *user_ctx)
{
@ -32,7 +32,6 @@ result_t connection_init(
ctx->my_addr = *my_addr;
ctx->peer_addr = *peer_addr;
ctx->data_cb = data_cb;
ctx->event_cb = event_cb;
uint64_t now = get_hires_time();
@ -79,7 +78,7 @@ void connection_destroy(connection_ctx_t *ctx)
}
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len)
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len, layer2_data_packet_t *data_packet)
{
// check the connection state
switch(ctx->conn_state) {
@ -135,14 +134,18 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
return connection_handle_packet_prechecked(ctx, &header, payload, payload_len);
return connection_handle_packet_prechecked(ctx, &header, payload, payload_len, data_packet);
}
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len)
const uint8_t *payload, size_t payload_len,
layer2_data_packet_t *data_packet)
{
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the connection state
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
@ -182,7 +185,7 @@ result_t connection_handle_packet_prechecked(
LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq);
// handle the acknowledgement internally
connection_handle_ack(ctx, header->rx_seq_nr, false);
return OK; // do not ACK and call back
return OK; // do not ACK
case L2_MSG_TYPE_CONN_MGMT:
case L2_MSG_TYPE_CONNECTIONLESS:
@ -212,8 +215,10 @@ result_t connection_handle_packet_prechecked(
// handle the acknowledgement internally
connection_handle_ack(ctx, header->rx_seq_nr, true);
// extract the payload and forward it to the tun device
ctx->data_cb(ctx, payload, payload_len, ctx->user_context);
// pass the decoded data back to the user
data_packet->payload_type = payload[0];
data_packet->payload = payload + 1;
data_packet->payload_len = payload_len - 1;
return OK;
}

View file

@ -26,9 +26,6 @@ typedef enum {
CONN_STATE_CLOSED //!< Connection has been closed (gracefully or by timeout)
} connection_state_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx);
typedef enum {
CONN_EVT_TIMEOUT, //!< The connection timed out because no packets were received
CONN_EVT_RETRANSMIT, //!< Packet queue transmission is restarted
@ -40,7 +37,6 @@ typedef void (*connection_event_callback_t)(struct connection_ctx_s *conn, conne
typedef struct connection_ctx_s {
connection_state_t conn_state; //!< State of the connection.
connection_data_callback_t data_cb; //!< Callback function for received data packets.
connection_event_callback_t event_cb; //!< Callback function for event signalling.
ham64_t my_addr; //!< The local link layer address.
@ -68,7 +64,6 @@ typedef struct connection_ctx_s {
* \param ctx The connection context to initialize.
* \param my_addr The local link layer address.
* \param peer_addr The remote link layer address.
* \param data_cb Callback for handling received payload data.
* \param event_cb Callback for connection events.
* \param user_ctx User context pointer (for arbitrary data).
* \returns OK if everything worked or a fitting error code.
@ -77,7 +72,6 @@ result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr,
connection_data_callback_t data_cb,
connection_event_callback_t event_cb,
void *user_ctx);
@ -87,12 +81,17 @@ void connection_destroy(connection_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param ctx The connection context.
* \param buf Pointer to the packet data.
* \param buf_len Length of the packet.
* \param[inout] ctx The connection context.
* \param[in] buf Pointer to the packet data.
* \param[in] buf_len Length of the packet.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure.
*/
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len);
result_t connection_handle_packet(
connection_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet);
/*!\brief Handle a received packet where the header has already been decoded.
*
@ -101,16 +100,18 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
* - Header can be decoded
* - Destination address is the local address
*
* \param ctx The connection context.
* \param header Pointer to the decoded header structure.
* \param payload Pointer to the payload data.
* \param payload_len Length of the payload data.
* \param[inout] ctx The connection context.
* \param[in] header Pointer to the decoded header structure.
* \param[in] payload Pointer to the payload data.
* \param[in] payload_len Length of the payload data.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure.
*/
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len);
const uint8_t *payload, size_t payload_len,
layer2_data_packet_t *data_packet);
/*!\brief Return the sequence number expected next by our side.
*/

View file

@ -8,7 +8,7 @@
#define SEQ_NR_MASK 0xF
static destroy_entry(connection_list_entry_t *entry)
static void destroy_entry(connection_list_entry_t *entry)
{
connection_destroy(&entry->connection);
free(entry);

View file

@ -17,14 +17,6 @@
#include "results.h"
#include "utils.h"
void conn_data_cb(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx)
{
(void)conn;
digipeater_ctx_t *digi_ctx = user_ctx;
digi_ctx->data_cb(digi_ctx, data, len);
}
void conn_event_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{
(void)conn;
@ -68,7 +60,7 @@ static result_t digipeater_handle_beacon_responses(digipeater_ctx_t *ctx, const
// and add it at the beginning of the connection list.
connection_ctx_t new_conn;
ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr, conn_data_cb, conn_event_cb, ctx));
ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr, conn_event_cb, ctx));
ERR_CHECK(connection_send_parameters(&new_conn));
@ -98,10 +90,9 @@ static size_t encode_beacon_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t b
}
result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr, digipeater_data_callback_t data_cb)
result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr)
{
ctx->my_addr = *my_addr;
ctx->data_cb = data_cb;
ctx->state = DIGIPEATER_STATE_CONN;
@ -122,8 +113,15 @@ void digipeater_destroy(digipeater_ctx_t *ctx)
}
result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, size_t buf_len)
result_t digipeater_handle_packet(
digipeater_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet)
{
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the CRC
size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME);
@ -177,12 +175,14 @@ result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, siz
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
connection_ctx_t *current_conn = &head->connection;
result = connection_handle_packet_prechecked(current_conn, &header, payload, payload_len);
result = connection_handle_packet_prechecked(
current_conn, &header, payload, payload_len, data_packet);
} else {
LOG(LVL_WARN, "Digipeater in CONN state, but there is no active connection! Packet dropped.");
result = OK;
}
}
break;
case DIGIPEATER_STATE_BEACON:
result = digipeater_handle_beacon_responses(ctx, &header, payload, payload_len);
@ -257,8 +257,6 @@ result_t digipeater_fill_packet_queues_from_tundev(digipeater_ctx_t *ctx, int tu
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
uint64_t now = get_hires_time();
size_t packet_size = 0;
*end_burst = false;

View file

@ -26,16 +26,12 @@ typedef enum {
DIGIPEATER_EVT_INTERVAL_END, //!< The current cycle has ended and new packets should be transmitted.
} digipeater_evt_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*digipeater_data_callback_t)(struct digipeater_ctx_s *digi, const uint8_t *data, size_t len);
/*!\brief Type for a callback function that is called when certain events occur. */
typedef void (*digipeater_evt_callback_t)(struct digipeater_ctx_s *digi, digipeater_evt_t evt);
typedef struct digipeater_ctx_s {
digipeater_state_t state; //!< Current operating state
digipeater_data_callback_t data_cb; //!< Callback function for received data packets.
digipeater_evt_callback_t event_cb; //!< Callback function for events.
ham64_t my_addr; //!< The local link layer address.
@ -53,13 +49,11 @@ typedef struct digipeater_ctx_s {
*
* \param ctx The digipeater context to initialize.
* \param my_addr The local link layer address.
* \param data_cb Callback function that handles received (decoded) data packets.
* \returns OK if everything worked or a fitting error code.
*/
result_t digipeater_init(
digipeater_ctx_t *ctx,
const ham64_t *my_addr,
digipeater_data_callback_t data_cb);
const ham64_t *my_addr);
/*!\brief Destroy the given digipeater context.
*/
@ -67,12 +61,17 @@ void digipeater_destroy(digipeater_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param ctx The digipeater context.
* \param buf Pointer to the packet data.
* \param buf_len Length of the packet.
* \param[inout] ctx The digipeater context.
* \param[in] buf Pointer to the packet data.
* \param[in] buf_len Length of the packet.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure.
*/
result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, size_t buf_len);
result_t digipeater_handle_packet(
digipeater_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet);
/*!\brief Enqueue a packet for transmission.
* \param ctx The digipeater context.
@ -113,7 +112,8 @@ void digipeater_extend_cycle(digipeater_ctx_t *ctx, uint64_t ns);
/*!\brief End the current cycle.
*
* End the cycle without waiting for the timeout. This switches to the next
* connection and stop forwarding received packets to the current one.
* connection or transmits a beacon. In any case, it stops forwarding received
* packets to the current connection.
*/
result_t digipeater_end_cycle(digipeater_ctx_t *ctx);

View file

@ -96,12 +96,17 @@ const char* layer2_msg_type_to_string(layer2_message_type_t type);
typedef enum {
L2_PAYLOAD_TYPE_IPV4 = 0x00,
L2_PAYLOAD_TYPE_IPV6 = 0x01
L2_PAYLOAD_TYPE_IPV6 = 0x01,
L2_PAYLOAD_TYPE_INVALID = 0x7FFFFFFF
} layer2_payload_type_t;
typedef struct layer2_data_header_s {
typedef struct layer2_data_packet_s {
layer2_payload_type_t payload_type; //!< Type of the contained layer 3 packet
} layer2_data_header_t;
const uint8_t *payload; //!< Pointer to the payload data
size_t payload_len; //!< Length of the payload data
} layer2_data_packet_t;
/* Connection Management Structs */

View file

@ -56,6 +56,10 @@ add_executable(
../../src/layer2/packet_queue.h
../../src/layer2/connection.c
../../src/layer2/connection.h
../../src/layer2/connection_list.c
../../src/layer2/connection_list.h
../../src/layer2/digipeater.c
../../src/layer2/digipeater.h
../../src/layer2/tundev.c
../../src/layer2/tundev.h
l2udptest_digipeater.c

View file

@ -25,7 +25,6 @@
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "options.h"
#include "jsonlogger.h"
#include "debug_structs.h"
@ -75,27 +74,6 @@ static void block_tx_for(unsigned offset_ms)
void handle_received_packet(uint8_t *packet_data, size_t packet_len)
{
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
result_t result = connection_handle_packet(&l2conn, packet_data, packet_len);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
}
@ -120,17 +98,35 @@ static result_t transmit(const uint8_t *data, size_t len)
}
void rx_data_to_tun(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx)
void rx_data_to_tun(const layer2_data_packet_t *data_packet)
{
(void)conn;
(void)user_ctx;
uint8_t tun_packet[4 + data_packet->payload_len];
int ret = write(m_tunfd, data, len);
// flags
tun_packet[0] = 0;
tun_packet[1] = 0;
switch(data_packet->payload_type) {
case L2_PAYLOAD_TYPE_IPV6:
*(uint16_t*)(tun_packet+2) = 0x86dd;
break;
case L2_PAYLOAD_TYPE_IPV4:
*(uint16_t*)(tun_packet+2) = 0x0800;
break;
default:
LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type);
return;
}
memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len);
int ret = write(m_tunfd, tun_packet, sizeof(tun_packet));
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
}
void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{
(void)conn;
@ -165,7 +161,7 @@ int main(int argc, char **argv)
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, rx_data_to_tun, conn_evt_cb, NULL));
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, conn_evt_cb, NULL));
// force connection into the established state
l2conn.conn_state = CONN_STATE_ESTABLISHED;
@ -267,64 +263,55 @@ int main(int argc, char **argv)
}
}
if((now > next_tx_switch_time)) {
if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted.
// transmit one burst
if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = connection_encode_next_packet(&l2conn,
packet_buf, sizeof(packet_buf), &end_burst);
packet_size = connection_encode_next_packet(&l2conn,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(!on_air) {
LOG(LVL_INFO, "RX -> TX");
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
on_air = true;
} else if(on_air) { // TX on, but no more bursts to send
LOG(LVL_INFO, "TX -> RX");
on_air = false;
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
}
if(!on_air) {
// make sure the receiver runs for a minimum amount of time
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
while(get_hires_time() < next_tx_switch_time) {
// ** Receive signal **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
return EXIT_FAILURE;
}
if(ret == 0) {
@ -335,13 +322,39 @@ int main(int argc, char **argv)
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return EXIT_FAILURE;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return EXIT_FAILURE;
}
if(ret <= 0) {
break;
}
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
handle_received_packet(packetbuf, ret);
layer2_data_packet_t data_packet;
result_t result = connection_handle_packet(&l2conn, packetbuf, ret, &data_packet);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
if(data_packet.payload_len != 0) {
rx_data_to_tun(&data_packet);
}
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
total_bytes += ret;

View file

@ -25,12 +25,10 @@
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "options.h"
#include "jsonlogger.h"
#include "debug_structs.h"
#include "layer2/connection.h"
#include "layer2/connection_list.h"
#include "layer2/digipeater.h"
#include "layer2/tundev.h"
@ -55,8 +53,6 @@ static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
static connection_ctx_t l2conn;
static void signal_handler(int signal, siginfo_t *info, void *ctx)
{
(void)signal;
@ -74,32 +70,6 @@ static void block_tx_for(unsigned offset_ms)
}
void handle_received_packet(uint8_t *packet_data, size_t packet_len)
{
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
result_t result = connection_handle_packet(&l2conn, packet_data, packet_len);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
}
static result_t transmit(const uint8_t *data, size_t len)
{
result_t result = OK;
@ -121,11 +91,29 @@ static result_t transmit(const uint8_t *data, size_t len)
}
void rx_data_to_tun(struct connection_ctx_s *conn, const uint8_t *data, size_t len)
void rx_data_to_tun(const layer2_data_packet_t *data_packet)
{
(void)conn;
uint8_t tun_packet[4 + data_packet->payload_len];
int ret = write(m_tunfd, data, len);
// flags
tun_packet[0] = 0;
tun_packet[1] = 0;
switch(data_packet->payload_type) {
case L2_PAYLOAD_TYPE_IPV6:
*(uint16_t*)(tun_packet+2) = 0x86dd;
break;
case L2_PAYLOAD_TYPE_IPV4:
*(uint16_t*)(tun_packet+2) = 0x0800;
break;
default:
LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type);
return;
}
memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len);
int ret = write(m_tunfd, tun_packet, sizeof(tun_packet));
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
@ -142,8 +130,6 @@ int main(int argc, char **argv)
return EXIT_FAILURE;
}
bool on_air = true;
srand(get_hires_time());
// ** Initialize **
@ -155,16 +141,12 @@ int main(int argc, char **argv)
return 1;
}
connection_list_t conn_list;
RESULT_CHECK(connection_list_init(&conn_list));
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, rx_data_to_tun));
// force connection into the established state
l2conn.conn_state = CONN_STATE_ESTABLISHED;
digipeater_ctx_t digipeater;
RESULT_CHECK(digipeater_init(&digipeater, &my_address));
// ** Set up signal handling
@ -226,97 +208,63 @@ int main(int argc, char **argv)
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
uint64_t next_beacon_time = old + HRTIME_MS(5000);
while(m_running) {
uint64_t now = get_hires_time();
RESULT_CHECK(digipeater_maintain(&digipeater));
if(now >= next_beacon_time) {
// TODO: encode and transmit beacon
next_beacon_time += HRTIME_MS(5000);
}
RESULT_CHECK(digipeater_fill_packet_queues_from_tundev(&digipeater, m_tunfd));
// FIXME: fill the TX queues from the TUN device
while(connection_can_enqueue_packet(&l2conn)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> move it to the queue
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(m_tunfd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
// transmit anything available
if(digipeater_can_transmit(&digipeater)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = digipeater_encode_next_packet(&digipeater,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret));
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
// FIXME: where to put this in the digipeater?
//connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
}
if((now > next_tx_switch_time)) {
if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted.
// make sure the receiver runs for a minimum amount of time
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
packet_size = connection_encode_next_packet(&l2conn,
connection_get_next_expected_seq(&l2conn),
packet_buf, sizeof(packet_buf));
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
if(!on_air) {
LOG(LVL_INFO, "RX -> TX");
}
on_air = true;
} else if(on_air) { // TX on, but no more bursts to send
LOG(LVL_INFO, "TX -> RX");
on_air = false;
retransmit_time = get_hires_time() + HRTIME_SEC(1) + HRTIME_SEC(1.0 * rand() / RAND_MAX);
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
}
}
if(!on_air) {
// ** Receive signal **
// receive response
while(get_hires_time() < next_tx_switch_time) {
// ** Receive packets from the broadcast socket **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
return EXIT_FAILURE;
}
if(ret == 0) {
@ -327,13 +275,39 @@ int main(int argc, char **argv)
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return EXIT_FAILURE;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return EXIT_FAILURE;
}
if(ret <= 0) {
break;
}
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
handle_received_packet(packetbuf, ret);
layer2_data_packet_t data_packet;
result_t result = digipeater_handle_packet(&digipeater, packetbuf, ret, &data_packet);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
if(data_packet.payload_len != 0) {
rx_data_to_tun(&data_packet);
}
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
total_bytes += ret;
@ -363,7 +337,7 @@ int main(int argc, char **argv)
close(m_bcast_sock);
connection_destroy(&l2conn);
digipeater_destroy(&digipeater);
jsonlogger_shutdown();