Remove data callbacks in connection and digipeater modules

This commit is contained in:
Thomas Kolb 2024-11-17 18:26:18 +01:00
parent f895adf877
commit f376bd7db3
9 changed files with 249 additions and 249 deletions

View file

@ -1,3 +1,4 @@
#include "layer2/packet_structs.h"
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
@ -18,7 +19,6 @@ result_t connection_init(
connection_ctx_t *ctx, connection_ctx_t *ctx,
const ham64_t *my_addr, const ham64_t *my_addr,
const ham64_t *peer_addr, const ham64_t *peer_addr,
connection_data_callback_t data_cb,
connection_event_callback_t event_cb, connection_event_callback_t event_cb,
void *user_ctx) void *user_ctx)
{ {
@ -32,7 +32,6 @@ result_t connection_init(
ctx->my_addr = *my_addr; ctx->my_addr = *my_addr;
ctx->peer_addr = *peer_addr; ctx->peer_addr = *peer_addr;
ctx->data_cb = data_cb;
ctx->event_cb = event_cb; ctx->event_cb = event_cb;
uint64_t now = get_hires_time(); uint64_t now = get_hires_time();
@ -79,7 +78,7 @@ void connection_destroy(connection_ctx_t *ctx)
} }
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len) result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len, layer2_data_packet_t *data_packet)
{ {
// check the connection state // check the connection state
switch(ctx->conn_state) { switch(ctx->conn_state) {
@ -135,14 +134,18 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
const uint8_t *payload = buf + header_size; const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size; size_t payload_len = packet_size - header_size;
return connection_handle_packet_prechecked(ctx, &header, payload, payload_len); return connection_handle_packet_prechecked(ctx, &header, payload, payload_len, data_packet);
} }
result_t connection_handle_packet_prechecked( result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx, connection_ctx_t *ctx,
const layer2_packet_header_t *header, const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len) const uint8_t *payload, size_t payload_len,
layer2_data_packet_t *data_packet)
{ {
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the connection state // check the connection state
switch(ctx->conn_state) { switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED: case CONN_STATE_UNINITIALIZED:
@ -182,7 +185,7 @@ result_t connection_handle_packet_prechecked(
LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq); LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq);
// handle the acknowledgement internally // handle the acknowledgement internally
connection_handle_ack(ctx, header->rx_seq_nr, false); connection_handle_ack(ctx, header->rx_seq_nr, false);
return OK; // do not ACK and call back return OK; // do not ACK
case L2_MSG_TYPE_CONN_MGMT: case L2_MSG_TYPE_CONN_MGMT:
case L2_MSG_TYPE_CONNECTIONLESS: case L2_MSG_TYPE_CONNECTIONLESS:
@ -212,8 +215,10 @@ result_t connection_handle_packet_prechecked(
// handle the acknowledgement internally // handle the acknowledgement internally
connection_handle_ack(ctx, header->rx_seq_nr, true); connection_handle_ack(ctx, header->rx_seq_nr, true);
// extract the payload and forward it to the tun device // pass the decoded data back to the user
ctx->data_cb(ctx, payload, payload_len, ctx->user_context); data_packet->payload_type = payload[0];
data_packet->payload = payload + 1;
data_packet->payload_len = payload_len - 1;
return OK; return OK;
} }

View file

@ -26,9 +26,6 @@ typedef enum {
CONN_STATE_CLOSED //!< Connection has been closed (gracefully or by timeout) CONN_STATE_CLOSED //!< Connection has been closed (gracefully or by timeout)
} connection_state_t; } connection_state_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx);
typedef enum { typedef enum {
CONN_EVT_TIMEOUT, //!< The connection timed out because no packets were received CONN_EVT_TIMEOUT, //!< The connection timed out because no packets were received
CONN_EVT_RETRANSMIT, //!< Packet queue transmission is restarted CONN_EVT_RETRANSMIT, //!< Packet queue transmission is restarted
@ -40,7 +37,6 @@ typedef void (*connection_event_callback_t)(struct connection_ctx_s *conn, conne
typedef struct connection_ctx_s { typedef struct connection_ctx_s {
connection_state_t conn_state; //!< State of the connection. connection_state_t conn_state; //!< State of the connection.
connection_data_callback_t data_cb; //!< Callback function for received data packets.
connection_event_callback_t event_cb; //!< Callback function for event signalling. connection_event_callback_t event_cb; //!< Callback function for event signalling.
ham64_t my_addr; //!< The local link layer address. ham64_t my_addr; //!< The local link layer address.
@ -68,7 +64,6 @@ typedef struct connection_ctx_s {
* \param ctx The connection context to initialize. * \param ctx The connection context to initialize.
* \param my_addr The local link layer address. * \param my_addr The local link layer address.
* \param peer_addr The remote link layer address. * \param peer_addr The remote link layer address.
* \param data_cb Callback for handling received payload data.
* \param event_cb Callback for connection events. * \param event_cb Callback for connection events.
* \param user_ctx User context pointer (for arbitrary data). * \param user_ctx User context pointer (for arbitrary data).
* \returns OK if everything worked or a fitting error code. * \returns OK if everything worked or a fitting error code.
@ -77,7 +72,6 @@ result_t connection_init(
connection_ctx_t *ctx, connection_ctx_t *ctx,
const ham64_t *my_addr, const ham64_t *my_addr,
const ham64_t *peer_addr, const ham64_t *peer_addr,
connection_data_callback_t data_cb,
connection_event_callback_t event_cb, connection_event_callback_t event_cb,
void *user_ctx); void *user_ctx);
@ -87,12 +81,17 @@ void connection_destroy(connection_ctx_t *ctx);
/*!\brief Handle a received packet. /*!\brief Handle a received packet.
* *
* \param ctx The connection context. * \param[inout] ctx The connection context.
* \param buf Pointer to the packet data. * \param[in] buf Pointer to the packet data.
* \param buf_len Length of the packet. * \param[in] buf_len Length of the packet.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure. * \returns A result code from the packet handling procedure.
*/ */
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len); result_t connection_handle_packet(
connection_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet);
/*!\brief Handle a received packet where the header has already been decoded. /*!\brief Handle a received packet where the header has already been decoded.
* *
@ -101,16 +100,18 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
* - Header can be decoded * - Header can be decoded
* - Destination address is the local address * - Destination address is the local address
* *
* \param ctx The connection context. * \param[inout] ctx The connection context.
* \param header Pointer to the decoded header structure. * \param[in] header Pointer to the decoded header structure.
* \param payload Pointer to the payload data. * \param[in] payload Pointer to the payload data.
* \param payload_len Length of the payload data. * \param[in] payload_len Length of the payload data.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure. * \returns A result code from the packet handling procedure.
*/ */
result_t connection_handle_packet_prechecked( result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx, connection_ctx_t *ctx,
const layer2_packet_header_t *header, const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len); const uint8_t *payload, size_t payload_len,
layer2_data_packet_t *data_packet);
/*!\brief Return the sequence number expected next by our side. /*!\brief Return the sequence number expected next by our side.
*/ */

View file

@ -8,7 +8,7 @@
#define SEQ_NR_MASK 0xF #define SEQ_NR_MASK 0xF
static destroy_entry(connection_list_entry_t *entry) static void destroy_entry(connection_list_entry_t *entry)
{ {
connection_destroy(&entry->connection); connection_destroy(&entry->connection);
free(entry); free(entry);

View file

@ -17,14 +17,6 @@
#include "results.h" #include "results.h"
#include "utils.h" #include "utils.h"
void conn_data_cb(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx)
{
(void)conn;
digipeater_ctx_t *digi_ctx = user_ctx;
digi_ctx->data_cb(digi_ctx, data, len);
}
void conn_event_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx) void conn_event_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{ {
(void)conn; (void)conn;
@ -68,7 +60,7 @@ static result_t digipeater_handle_beacon_responses(digipeater_ctx_t *ctx, const
// and add it at the beginning of the connection list. // and add it at the beginning of the connection list.
connection_ctx_t new_conn; connection_ctx_t new_conn;
ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr, conn_data_cb, conn_event_cb, ctx)); ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr, conn_event_cb, ctx));
ERR_CHECK(connection_send_parameters(&new_conn)); ERR_CHECK(connection_send_parameters(&new_conn));
@ -98,10 +90,9 @@ static size_t encode_beacon_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t b
} }
result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr, digipeater_data_callback_t data_cb) result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr)
{ {
ctx->my_addr = *my_addr; ctx->my_addr = *my_addr;
ctx->data_cb = data_cb;
ctx->state = DIGIPEATER_STATE_CONN; ctx->state = DIGIPEATER_STATE_CONN;
@ -122,8 +113,15 @@ void digipeater_destroy(digipeater_ctx_t *ctx)
} }
result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, size_t buf_len) result_t digipeater_handle_packet(
digipeater_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet)
{ {
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the CRC // check the CRC
size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME); size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME);
@ -177,12 +175,14 @@ result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, siz
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list); connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) { if(head) {
connection_ctx_t *current_conn = &head->connection; connection_ctx_t *current_conn = &head->connection;
result = connection_handle_packet_prechecked(current_conn, &header, payload, payload_len); result = connection_handle_packet_prechecked(
current_conn, &header, payload, payload_len, data_packet);
} else { } else {
LOG(LVL_WARN, "Digipeater in CONN state, but there is no active connection! Packet dropped."); LOG(LVL_WARN, "Digipeater in CONN state, but there is no active connection! Packet dropped.");
result = OK; result = OK;
} }
} }
break;
case DIGIPEATER_STATE_BEACON: case DIGIPEATER_STATE_BEACON:
result = digipeater_handle_beacon_responses(ctx, &header, payload, payload_len); result = digipeater_handle_beacon_responses(ctx, &header, payload, payload_len);
@ -257,8 +257,6 @@ result_t digipeater_fill_packet_queues_from_tundev(digipeater_ctx_t *ctx, int tu
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst) size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{ {
uint64_t now = get_hires_time();
size_t packet_size = 0; size_t packet_size = 0;
*end_burst = false; *end_burst = false;

View file

@ -26,16 +26,12 @@ typedef enum {
DIGIPEATER_EVT_INTERVAL_END, //!< The current cycle has ended and new packets should be transmitted. DIGIPEATER_EVT_INTERVAL_END, //!< The current cycle has ended and new packets should be transmitted.
} digipeater_evt_t; } digipeater_evt_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*digipeater_data_callback_t)(struct digipeater_ctx_s *digi, const uint8_t *data, size_t len);
/*!\brief Type for a callback function that is called when certain events occur. */ /*!\brief Type for a callback function that is called when certain events occur. */
typedef void (*digipeater_evt_callback_t)(struct digipeater_ctx_s *digi, digipeater_evt_t evt); typedef void (*digipeater_evt_callback_t)(struct digipeater_ctx_s *digi, digipeater_evt_t evt);
typedef struct digipeater_ctx_s { typedef struct digipeater_ctx_s {
digipeater_state_t state; //!< Current operating state digipeater_state_t state; //!< Current operating state
digipeater_data_callback_t data_cb; //!< Callback function for received data packets.
digipeater_evt_callback_t event_cb; //!< Callback function for events. digipeater_evt_callback_t event_cb; //!< Callback function for events.
ham64_t my_addr; //!< The local link layer address. ham64_t my_addr; //!< The local link layer address.
@ -53,13 +49,11 @@ typedef struct digipeater_ctx_s {
* *
* \param ctx The digipeater context to initialize. * \param ctx The digipeater context to initialize.
* \param my_addr The local link layer address. * \param my_addr The local link layer address.
* \param data_cb Callback function that handles received (decoded) data packets.
* \returns OK if everything worked or a fitting error code. * \returns OK if everything worked or a fitting error code.
*/ */
result_t digipeater_init( result_t digipeater_init(
digipeater_ctx_t *ctx, digipeater_ctx_t *ctx,
const ham64_t *my_addr, const ham64_t *my_addr);
digipeater_data_callback_t data_cb);
/*!\brief Destroy the given digipeater context. /*!\brief Destroy the given digipeater context.
*/ */
@ -67,12 +61,17 @@ void digipeater_destroy(digipeater_ctx_t *ctx);
/*!\brief Handle a received packet. /*!\brief Handle a received packet.
* *
* \param ctx The digipeater context. * \param[inout] ctx The digipeater context.
* \param buf Pointer to the packet data. * \param[in] buf Pointer to the packet data.
* \param buf_len Length of the packet. * \param[in] buf_len Length of the packet.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure. * \returns A result code from the packet handling procedure.
*/ */
result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, size_t buf_len); result_t digipeater_handle_packet(
digipeater_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet);
/*!\brief Enqueue a packet for transmission. /*!\brief Enqueue a packet for transmission.
* \param ctx The digipeater context. * \param ctx The digipeater context.
@ -113,7 +112,8 @@ void digipeater_extend_cycle(digipeater_ctx_t *ctx, uint64_t ns);
/*!\brief End the current cycle. /*!\brief End the current cycle.
* *
* End the cycle without waiting for the timeout. This switches to the next * End the cycle without waiting for the timeout. This switches to the next
* connection and stop forwarding received packets to the current one. * connection or transmits a beacon. In any case, it stops forwarding received
* packets to the current connection.
*/ */
result_t digipeater_end_cycle(digipeater_ctx_t *ctx); result_t digipeater_end_cycle(digipeater_ctx_t *ctx);

View file

@ -96,12 +96,17 @@ const char* layer2_msg_type_to_string(layer2_message_type_t type);
typedef enum { typedef enum {
L2_PAYLOAD_TYPE_IPV4 = 0x00, L2_PAYLOAD_TYPE_IPV4 = 0x00,
L2_PAYLOAD_TYPE_IPV6 = 0x01 L2_PAYLOAD_TYPE_IPV6 = 0x01,
L2_PAYLOAD_TYPE_INVALID = 0x7FFFFFFF
} layer2_payload_type_t; } layer2_payload_type_t;
typedef struct layer2_data_header_s { typedef struct layer2_data_packet_s {
layer2_payload_type_t payload_type; //!< Type of the contained layer 3 packet layer2_payload_type_t payload_type; //!< Type of the contained layer 3 packet
} layer2_data_header_t;
const uint8_t *payload; //!< Pointer to the payload data
size_t payload_len; //!< Length of the payload data
} layer2_data_packet_t;
/* Connection Management Structs */ /* Connection Management Structs */

View file

@ -56,6 +56,10 @@ add_executable(
../../src/layer2/packet_queue.h ../../src/layer2/packet_queue.h
../../src/layer2/connection.c ../../src/layer2/connection.c
../../src/layer2/connection.h ../../src/layer2/connection.h
../../src/layer2/connection_list.c
../../src/layer2/connection_list.h
../../src/layer2/digipeater.c
../../src/layer2/digipeater.h
../../src/layer2/tundev.c ../../src/layer2/tundev.c
../../src/layer2/tundev.h ../../src/layer2/tundev.h
l2udptest_digipeater.c l2udptest_digipeater.c

View file

@ -25,7 +25,6 @@
#define LOGGER_MODULE_NAME "main" #define LOGGER_MODULE_NAME "main"
#include "logger.h" #include "logger.h"
#include "options.h"
#include "jsonlogger.h" #include "jsonlogger.h"
#include "debug_structs.h" #include "debug_structs.h"
@ -75,27 +74,6 @@ static void block_tx_for(unsigned offset_ms)
void handle_received_packet(uint8_t *packet_data, size_t packet_len) void handle_received_packet(uint8_t *packet_data, size_t packet_len)
{ {
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
result_t result = connection_handle_packet(&l2conn, packet_data, packet_len);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
} }
@ -120,17 +98,35 @@ static result_t transmit(const uint8_t *data, size_t len)
} }
void rx_data_to_tun(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx) void rx_data_to_tun(const layer2_data_packet_t *data_packet)
{ {
(void)conn; uint8_t tun_packet[4 + data_packet->payload_len];
(void)user_ctx;
int ret = write(m_tunfd, data, len); // flags
tun_packet[0] = 0;
tun_packet[1] = 0;
switch(data_packet->payload_type) {
case L2_PAYLOAD_TYPE_IPV6:
*(uint16_t*)(tun_packet+2) = 0x86dd;
break;
case L2_PAYLOAD_TYPE_IPV4:
*(uint16_t*)(tun_packet+2) = 0x0800;
break;
default:
LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type);
return;
}
memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len);
int ret = write(m_tunfd, tun_packet, sizeof(tun_packet));
if(ret < 0) { if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno)); LOG(LVL_ERR, "write(tun): %s", strerror(errno));
} }
} }
void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx) void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{ {
(void)conn; (void)conn;
@ -165,7 +161,7 @@ int main(int argc, char **argv)
ham64_t my_address, peer_address; ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address); ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address); ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, rx_data_to_tun, conn_evt_cb, NULL)); RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, conn_evt_cb, NULL));
// force connection into the established state // force connection into the established state
l2conn.conn_state = CONN_STATE_ESTABLISHED; l2conn.conn_state = CONN_STATE_ESTABLISHED;
@ -267,64 +263,55 @@ int main(int argc, char **argv)
} }
} }
if((now > next_tx_switch_time)) { // transmit one burst
if(connection_can_transmit(&l2conn)) { if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted. // there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst."); LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0; size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer // add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) { while(true) {
uint8_t packet_buf[2048]; uint8_t packet_buf[2048];
size_t packet_size; size_t packet_size;
bool end_burst; bool end_burst;
packet_size = connection_encode_next_packet(&l2conn, packet_size = connection_encode_next_packet(&l2conn,
packet_buf, sizeof(packet_buf), &end_burst); packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) { if(packet_size == 0) {
// no more packets available // no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue."); LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break; break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
} }
connection_tx_clean_empty_packet(&l2conn); LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len); burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(!on_air) { if(end_burst) {
LOG(LVL_INFO, "RX -> TX"); LOG(LVL_DEBUG, "Ending burst on request.");
break;
} }
on_air = true;
} else if(on_air) { // TX on, but no more bursts to send
LOG(LVL_INFO, "TX -> RX");
on_air = false;
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
} }
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
} }
if(!on_air) { // make sure the receiver runs for a minimum amount of time
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
while(get_hires_time() < next_tx_switch_time) {
// ** Receive signal ** // ** Receive signal **
int ret = poll(&pfd_bcast, 1, 10); int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) { if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno)); LOG(LVL_ERR, "poll: %s", strerror(errno));
break; return EXIT_FAILURE;
} }
if(ret == 0) { if(ret == 0) {
@ -335,13 +322,39 @@ int main(int argc, char **argv)
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0); ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) { if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno)); LOG(LVL_ERR, "recv: %s", strerror(errno));
return EXIT_FAILURE;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return EXIT_FAILURE;
} }
if(ret <= 0) { block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
break;
}
handle_received_packet(packetbuf, ret); layer2_data_packet_t data_packet;
result_t result = connection_handle_packet(&l2conn, packetbuf, ret, &data_packet);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
if(data_packet.payload_len != 0) {
rx_data_to_tun(&data_packet);
}
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
total_bytes += ret; total_bytes += ret;

View file

@ -25,12 +25,10 @@
#define LOGGER_MODULE_NAME "main" #define LOGGER_MODULE_NAME "main"
#include "logger.h" #include "logger.h"
#include "options.h"
#include "jsonlogger.h" #include "jsonlogger.h"
#include "debug_structs.h" #include "debug_structs.h"
#include "layer2/connection.h" #include "layer2/digipeater.h"
#include "layer2/connection_list.h"
#include "layer2/tundev.h" #include "layer2/tundev.h"
@ -55,8 +53,6 @@ static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats; static rx_stats_t m_rx_stats;
static connection_ctx_t l2conn;
static void signal_handler(int signal, siginfo_t *info, void *ctx) static void signal_handler(int signal, siginfo_t *info, void *ctx)
{ {
(void)signal; (void)signal;
@ -74,32 +70,6 @@ static void block_tx_for(unsigned offset_ms)
} }
void handle_received_packet(uint8_t *packet_data, size_t packet_len)
{
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
result_t result = connection_handle_packet(&l2conn, packet_data, packet_len);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
}
static result_t transmit(const uint8_t *data, size_t len) static result_t transmit(const uint8_t *data, size_t len)
{ {
result_t result = OK; result_t result = OK;
@ -121,11 +91,29 @@ static result_t transmit(const uint8_t *data, size_t len)
} }
void rx_data_to_tun(struct connection_ctx_s *conn, const uint8_t *data, size_t len) void rx_data_to_tun(const layer2_data_packet_t *data_packet)
{ {
(void)conn; uint8_t tun_packet[4 + data_packet->payload_len];
int ret = write(m_tunfd, data, len); // flags
tun_packet[0] = 0;
tun_packet[1] = 0;
switch(data_packet->payload_type) {
case L2_PAYLOAD_TYPE_IPV6:
*(uint16_t*)(tun_packet+2) = 0x86dd;
break;
case L2_PAYLOAD_TYPE_IPV4:
*(uint16_t*)(tun_packet+2) = 0x0800;
break;
default:
LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type);
return;
}
memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len);
int ret = write(m_tunfd, tun_packet, sizeof(tun_packet));
if(ret < 0) { if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno)); LOG(LVL_ERR, "write(tun): %s", strerror(errno));
} }
@ -142,8 +130,6 @@ int main(int argc, char **argv)
return EXIT_FAILURE; return EXIT_FAILURE;
} }
bool on_air = true;
srand(get_hires_time()); srand(get_hires_time());
// ** Initialize ** // ** Initialize **
@ -155,16 +141,12 @@ int main(int argc, char **argv)
return 1; return 1;
} }
connection_list_t conn_list;
RESULT_CHECK(connection_list_init(&conn_list));
ham64_t my_address, peer_address; ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address); ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address); ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, rx_data_to_tun));
// force connection into the established state digipeater_ctx_t digipeater;
l2conn.conn_state = CONN_STATE_ESTABLISHED; RESULT_CHECK(digipeater_init(&digipeater, &my_address));
// ** Set up signal handling // ** Set up signal handling
@ -226,97 +208,63 @@ int main(int argc, char **argv)
size_t total_bytes = 0; size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500); uint64_t next_stats_print_time = old + HRTIME_MS(500);
uint64_t next_beacon_time = old + HRTIME_MS(5000);
while(m_running) { while(m_running) {
uint64_t now = get_hires_time(); RESULT_CHECK(digipeater_maintain(&digipeater));
if(now >= next_beacon_time) { RESULT_CHECK(digipeater_fill_packet_queues_from_tundev(&digipeater, m_tunfd));
// TODO: encode and transmit beacon
next_beacon_time += HRTIME_MS(5000);
}
// FIXME: fill the TX queues from the TUN device // transmit anything available
while(connection_can_enqueue_packet(&l2conn)) { if(digipeater_can_transmit(&digipeater)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */); // there is a packet to be (re)transmitted.
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno)); LOG(LVL_DEBUG, "Starting new burst.");
break;
} else if(ret == 0) { size_t burst_len = 0;
// no more packets
break; // add packets to the burst until only 50000 samples remain free in the SDR buffer
} else { while(true) {
// a packet is available -> move it to the queue uint8_t packet_buf[2048];
static const size_t packetbuf_size = 2048; size_t packet_size;
uint8_t packetbuf[packetbuf_size];
ret = read(m_tunfd, packetbuf, packetbuf_size); bool end_burst;
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno)); packet_size = digipeater_encode_next_packet(&digipeater,
return ERR_SYSCALL; packet_buf, sizeof(packet_buf), &end_burst);
} else if(ret == 0) {
// no more data, should not happen if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break; break;
} }
RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret)); LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
} }
// FIXME: where to put this in the digipeater?
//connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
} }
if((now > next_tx_switch_time)) { // make sure the receiver runs for a minimum amount of time
if(connection_can_transmit(&l2conn)) { block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst."); // receive response
while(get_hires_time() < next_tx_switch_time) {
size_t burst_len = 0; // ** Receive packets from the broadcast socket **
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
packet_size = connection_encode_next_packet(&l2conn,
connection_get_next_expected_seq(&l2conn),
packet_buf, sizeof(packet_buf));
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
if(!on_air) {
LOG(LVL_INFO, "RX -> TX");
}
on_air = true;
} else if(on_air) { // TX on, but no more bursts to send
LOG(LVL_INFO, "TX -> RX");
on_air = false;
retransmit_time = get_hires_time() + HRTIME_SEC(1) + HRTIME_SEC(1.0 * rand() / RAND_MAX);
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
}
}
if(!on_air) {
// ** Receive signal **
int ret = poll(&pfd_bcast, 1, 10); int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) { if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno)); LOG(LVL_ERR, "poll: %s", strerror(errno));
break; return EXIT_FAILURE;
} }
if(ret == 0) { if(ret == 0) {
@ -327,13 +275,39 @@ int main(int argc, char **argv)
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0); ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) { if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno)); LOG(LVL_ERR, "recv: %s", strerror(errno));
return EXIT_FAILURE;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return EXIT_FAILURE;
} }
if(ret <= 0) { block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
break;
}
handle_received_packet(packetbuf, ret); layer2_data_packet_t data_packet;
result_t result = digipeater_handle_packet(&digipeater, packetbuf, ret, &data_packet);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
if(data_packet.payload_len != 0) {
rx_data_to_tun(&data_packet);
}
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
total_bytes += ret; total_bytes += ret;
@ -363,7 +337,7 @@ int main(int argc, char **argv)
close(m_bcast_sock); close(m_bcast_sock);
connection_destroy(&l2conn); digipeater_destroy(&digipeater);
jsonlogger_shutdown(); jsonlogger_shutdown();