Compare commits

...

22 commits

Author SHA1 Message Date
Thomas Kolb c1af786a70 WIP: handle packets from the TUN device in digipeater
Some checks failed
/ build-hamnet70 (push) Failing after 18s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
This is only a backup commit.
2024-11-10 23:13:54 +01:00
Thomas Kolb 61814584a2 tundev: remove IFF_NO_PI
Some checks failed
/ build-hamnet70 (push) Failing after 18s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
Without this flag the kernel adds packet information to each packet. The
interesting part of that information is the EtherType of the packet, which
simplifies handling.
2024-11-10 23:10:32 +01:00
Thomas Kolb 49fe1fff29 l2udptest_client: fix compilation
Some checks failed
/ build-hamnet70 (push) Failing after 17s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
2024-11-10 21:40:07 +01:00
Thomas Kolb 4862c09bdd connection: calculate IPv6 address for peer 2024-11-10 21:39:15 +01:00
Thomas Kolb e44cfb722d Add missing call to digipeater_end_interval()
Some checks failed
/ build-hamnet70 (push) Failing after 17s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
2024-11-10 21:01:13 +01:00
Thomas Kolb 3a8f469e8e digipeater: add one-shot packet queue and interval handling
Some checks failed
/ build-hamnet70 (push) Failing after 18s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
The oneshot queue is for connection management frames that are only sent once,
at the beginning of the next burst. An example is the Connection Reset packet.

Intervals define the boundary between the handling of different connections.
The interval can be either ended by a packet with TX Request set, or by a
timeout. In either case, forwarding of packets to the current connection stops
and the connection is re-scheduled to a later point in time.
2024-11-10 20:18:10 +01:00
Thomas Kolb 93d74be000 WIP: managing multiple connections
Some checks failed
/ build-hamnet70 (push) Failing after 18s
/ build-doc (push) Successful in 20s
/ deploy-doc (push) Has been skipped
Working towards handling multiple connections. A lot is still missing.
2024-11-10 17:02:14 +01:00
Thomas Kolb 31a627d434 WIP: digipeater module managing multiple connections
Some checks failed
/ build-hamnet70 (push) Failing after 18s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
Backup commit. This is a basic setup with lots of TODOs and errors. Will not
compile yet.
2024-11-03 14:40:11 +01:00
Thomas Kolb bfdb174324 l2/connection: refactoring
Some checks failed
/ build-hamnet70 (push) Failing after 18s
/ build-doc (push) Successful in 18s
/ deploy-doc (push) Has been skipped
- split handle_packet() in two functions: the first does basic checks and calls
  the second one. This allows to do the basic checks externally (in the
  multi-connection management module) without duplicating them.
- use the new layer2_encode_packet() function.
2024-11-03 14:35:01 +01:00
Thomas Kolb 0345dc589b l2/packet_structs: add function to encode a complete packet 2024-11-03 14:34:03 +01:00
Thomas Kolb 8c79185df6 Handle retransmits inside the connection module 2024-11-02 20:20:39 +01:00
Thomas Kolb c4e6028d42 Add a module to manage a list of connections 2024-11-02 18:30:25 +01:00
Thomas Kolb 03912a6185 Do all time calculations in uint64_t
This prevents loss of precision that occurs with double-precision floats if
timestamps become very large. Timestamps are already large if they contain a
UNIX time value (requires 60 bits; double has 53 bit resolution).
2024-11-02 16:17:52 +01:00
Thomas Kolb 4281ae4be7 l2udptest: replace by two programs: client and digipeater
Both are identical so far, this is just an infrastructure commit.
2024-11-02 16:07:25 +01:00
Thomas Kolb 04397c2c95 Remove empty packet from queue after burst was transmitted
Some checks failed
/ build-hamnet70 (push) Failing after 28s
/ build-doc (push) Successful in 15s
/ deploy-doc (push) Has been skipped
2024-09-22 18:46:32 +02:00
Thomas Kolb 8c3eca658c connection: do not send ACKs for empty packets
Some checks failed
/ build-hamnet70 (push) Failing after 27s
/ build-doc (push) Successful in 15s
/ deploy-doc (push) Has been skipped
2024-09-22 15:56:43 +02:00
Thomas Kolb dedad8b81f connection: handle ACKs from empty packets correctly
Some checks failed
/ build-hamnet70 (push) Failing after 29s
/ build-doc (push) Successful in 16s
/ deploy-doc (push) Has been skipped
2024-09-22 15:52:44 +02:00
Thomas Kolb f0770baf31 Handle received packets
Some checks failed
/ build-hamnet70 (push) Failing after 29s
/ build-doc (push) Successful in 16s
/ deploy-doc (push) Has been skipped
2024-09-22 15:34:50 +02:00
Thomas Kolb a07ffa265e connection: use correct addresses for empty packets
Some checks failed
/ build-hamnet70 (push) Failing after 28s
/ build-doc (push) Successful in 15s
/ deploy-doc (push) Has been skipped
2024-09-22 15:28:24 +02:00
Thomas Kolb 0a77109ab5 l2udp: Use the new layer2 connection module
Some checks failed
/ build-hamnet70 (push) Failing after 29s
/ build-doc (push) Successful in 16s
/ deploy-doc (push) Has been skipped
2024-09-22 15:24:20 +02:00
Thomas Kolb 85ec105d0f Pass received packets to layer 2 processing
All checks were successful
/ build-hamnet70 (push) Successful in 29s
/ build-doc (push) Successful in 16s
/ deploy-doc (push) Has been skipped
2024-09-22 14:45:24 +02:00
Thomas Kolb ec6dd92444 Basic infrastructure for layer2-over-udp test
All checks were successful
/ build-hamnet70 (push) Successful in 29s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
2024-09-22 14:32:49 +02:00
18 changed files with 2006 additions and 94 deletions

View file

@ -13,7 +13,7 @@ HOSTID="$1"
DEV=hamnet70
sudo ip tuntap add dev $DEV mode tun user $(whoami)
sudo ip tuntap add dev $DEV mode tun pi user $(whoami)
sudo ip link set dev $DEV txqueuelen 16 # default is 500
sudo ip link set dev $DEV up
sudo ip address add dev $DEV fd73::$HOSTID/64

View file

@ -1,15 +1,26 @@
#include <string.h>
#include <assert.h>
#define LOGGER_MODULE_NAME "conn"
#include "logger.h"
#include "connection.h"
#include "config.h"
#include "layer2/ham64.h"
#include "layer2/packet_queue.h"
#include "results.h"
#include "utils.h"
#define SEQ_NR_MASK 0xF
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr)
result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr,
connection_data_callback_t data_cb,
connection_event_callback_t event_cb,
void *user_ctx)
{
ctx->last_acked_seq = 0;
ctx->next_expected_seq = 0;
@ -21,6 +32,36 @@ result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ha
ctx->my_addr = *my_addr;
ctx->peer_addr = *peer_addr;
ctx->data_cb = data_cb;
ctx->event_cb = event_cb;
uint64_t now = get_hires_time();
ctx->last_rx_time = now;
ctx->retransmit_time = 0;
ctx->user_context = user_ctx;
// calculate IPv6 address
struct in6_addr net_addr;
inet_pton(AF_INET6, IPV6_NET, &net_addr);
memset(ctx->peer_ipv6_addr.s6_addr, 0, sizeof(ctx->peer_ipv6_addr));
memcpy(ctx->peer_ipv6_addr.s6_addr, net_addr.s6_addr, 8); // copy the network part
// fill the host part from the peers ham64 address. The bytes are filled in
// reverse order to make the „readable“ IPv6 address as short as possible.
for(uint8_t i = 0; i < peer_addr->length; i++) {
ctx->peer_ipv6_addr.s6_addr[15 - 2*i] = (peer_addr->addr[i] >> 8) & 0xFF;
ctx->peer_ipv6_addr.s6_addr[14 - 2*i] = (peer_addr->addr[i] >> 0) & 0xFF;
}
// print the address for debugging
char ipv6_str[INET6_ADDRSTRLEN];
char ham64_str[HAM64_FMT_MAX_LEN];
ham64_format(peer_addr, ham64_str);
inet_ntop(AF_INET6, &ctx->peer_ipv6_addr, ipv6_str, sizeof(ipv6_str));
LOG(LVL_DEBUG, "IPv6 address assigned to %s is %s.", ham64_str, ipv6_str);
ctx->conn_state = CONN_STATE_INITIALIZED;
return OK;
@ -71,17 +112,9 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
}
// check if the packet really should be handled by us
if(!ham64_is_equal(&header.src_addr, &ctx->peer_addr)) {
char fmt_src_addr[HAM64_FMT_MAX_LEN];
char fmt_peer_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header.src_addr, fmt_src_addr);
ham64_format(&ctx->peer_addr, fmt_peer_addr);
LOG(LVL_ERR, "Packet has the wrong source address: got %s, expected %s",
fmt_src_addr, fmt_peer_addr);
return ERR_INVALID_ADDRESS;
if(ham64_is_equal(&ctx->my_addr, &header.src_addr)) {
LOG(LVL_DEBUG, "Packet is from ourselves. Ignored.");
return OK;
}
if(!ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
@ -97,50 +130,90 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
return ERR_INVALID_ADDRESS;
}
size_t header_size = layer2_get_encoded_header_size(&header);
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
return connection_handle_packet_prechecked(ctx, &header, payload, payload_len);
}
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len)
{
// check the connection state
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
LOG(LVL_ERR, "Trying to pass packet to connection in state %u", ctx->conn_state);
return ERR_INVALID_STATE;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
// in these states, packets can be handled
break;
}
// check if this packet is from our designated peer
if(!ham64_is_equal(&header->src_addr, &ctx->peer_addr)) {
char fmt_src_addr[HAM64_FMT_MAX_LEN];
char fmt_peer_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header->src_addr, fmt_src_addr);
ham64_format(&ctx->peer_addr, fmt_peer_addr);
LOG(LVL_ERR, "Packet has the wrong source address: got %s, expected %s",
fmt_src_addr, fmt_peer_addr);
return ERR_INVALID_ADDRESS;
}
LOG(LVL_DEBUG, "Handling %s packet with rx_seq_nr %u, tx_seq_nr %u.",
layer2_msg_type_to_string(header.msg_type), header.rx_seq_nr, header.tx_seq_nr);
layer2_msg_type_to_string(header->msg_type), header->rx_seq_nr, header->tx_seq_nr);
ctx->last_acked_seq = header.rx_seq_nr;
ctx->last_acked_seq = header->rx_seq_nr;
switch(header.msg_type) {
switch(header->msg_type) {
case L2_MSG_TYPE_EMPTY:
LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq);
// handle the acknowledgement internally
connection_handle_ack(ctx, header->rx_seq_nr, false);
return OK; // do not ACK and call back
case L2_MSG_TYPE_CONN_MGMT:
case L2_MSG_TYPE_CONNECTIONLESS:
LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header.msg_type));
LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header->msg_type));
return OK;
case L2_MSG_TYPE_DATA:
break;
default:
LOG(LVL_ERR, "Invalid message type %d.", header.msg_type);
LOG(LVL_ERR, "Invalid message type %d.", header->msg_type);
return ERR_INVALID_STATE;
}
if(ctx->next_expected_seq != header.tx_seq_nr) {
LOG(LVL_ERR, "Expected sequence number %u, received %u.", ctx->next_expected_seq, header.tx_seq_nr);
if(ctx->next_expected_seq != header->tx_seq_nr) {
LOG(LVL_ERR, "Expected sequence number %u, received %u.", ctx->next_expected_seq, header->tx_seq_nr);
return ERR_SEQUENCE;
}
ctx->next_expected_seq++;
ctx->next_expected_seq &= 0xF;
LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
ctx->last_rx_time = get_hires_time();
LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header->rx_seq_nr, header->tx_seq_nr);
// handle the acknowledgement internally
connection_handle_ack(ctx, header.rx_seq_nr);
size_t header_size = layer2_get_encoded_header_size(&header);
connection_handle_ack(ctx, header->rx_seq_nr, true);
// extract the payload and forward it to the tun device
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
ctx->data_cb(ctx, payload, payload_len);
ctx->data_cb(ctx, payload, payload_len, ctx->user_context);
return OK;
}
@ -209,6 +282,12 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
}
bool connection_can_enqueue_packet(const connection_ctx_t *ctx)
{
return packet_queue_get_free_space(&ctx->packet_queue) > 0;
}
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
{
// check the connection state
@ -227,10 +306,8 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
layer2_packet_header_t header;
header.dst_addr.addr[0] = 0xFFFF;
header.dst_addr.length = 1;
header.src_addr.addr[0] = 0x0001;
header.src_addr.length = 1;
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_EMPTY;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_seq_nr = 0; // not used in empty packets
@ -244,7 +321,56 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len)
result_t connection_send_parameters(connection_ctx_t *ctx)
{
// check the connection state
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_CLOSED:
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
LOG(LVL_ERR, "Trying to send connection parameters in state %u", ctx->conn_state);
return ERR_INVALID_STATE;
case CONN_STATE_INITIALIZED:
// in these states, packets can be handled
break;
}
layer2_packet_header_t header;
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_CONN_MGMT;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_seq_nr = ctx->next_seq_nr;
header.tx_request = 1;
size_t payload_len = 1;
uint8_t *payload = malloc(payload_len);
if(!payload) {
return ERR_NO_MEM;
}
payload[0] = CONN_MGMT_TYPE_CONNECTION_PARAMETERS;
// TODO: calculate IP addresses from clients HAM64 address
if (!packet_queue_add(&ctx->packet_queue, &header, payload, payload_len)) {
return ERR_NO_MEM;
}
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;
// connection is considered established after the connection parameters are sent.
ctx->conn_state = CONN_STATE_ESTABLISHED;
return OK;
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
// check the connection state
switch(ctx->conn_state) {
@ -252,7 +378,7 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr,
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
LOG(LVL_ERR, "Trying to encode packet in inactive state %u", ctx->conn_state);
return ERR_INVALID_STATE;
return 0;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
@ -267,32 +393,21 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr,
return 0;
}
unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME);
assert(buf_len >= LAYER2_PACKET_HEADER_ENCODED_SIZE_MAX + crc_size + entry->data_len);
layer2_packet_header_t header = entry->header;
header.rx_seq_nr = ack_seq_nr;
header.rx_seq_nr = ctx->next_expected_seq;
// encode the header
LOG(LVL_DEBUG, "Encoding packet with rx_seq_nr %u, tx_seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
size_t packet_size = layer2_encode_packet_header(&header, buf);
// add the payload data
if(entry->data) {
memcpy(buf + packet_size, entry->data, entry->data_len);
size_t packet_size = layer2_encode_packet(&header, entry->data, entry->data_len, buf, buf_len);
if(packet_size == 0) {
LOG(LVL_ERR, "Buffer too small for encoded packet!");
return 0;
}
packet_size += entry->data_len;
// calculate CRC of everything and append it to the packet
crc_append_key(PAYLOAD_CRC_SCHEME, buf, packet_size);
packet_size += crc_size;
ctx->next_packet_index++;
*end_burst = header.tx_request;
return packet_size;
}
@ -318,7 +433,7 @@ void connection_tx_clean_empty_packet(connection_ctx_t *ctx)
}
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq, bool do_ack)
{
// check the connection state
switch(ctx->conn_state) {
@ -353,9 +468,11 @@ void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
packets_available = packet_queue_get_used_space(&ctx->packet_queue);
LOG(LVL_DEBUG, "handling ack for seq_nr %u, removing %zu packets, %zu packets remaining.", acked_seq, packets_to_remove, packets_available);
LOG(LVL_DEBUG, "handling ack for seq_nr %u, removed %zu packets, %zu packets remaining.", acked_seq, packets_to_remove, packets_available);
if(packets_available == 0) {
ctx->retransmit_time = get_hires_time() + HRTIME_MS(RETRANSMIT_TIMEOUT_MS);
if(do_ack && packets_available == 0) {
// no packets left in queue, but an acknowledgement must be
// transmitted. Add an empty packet to do that.
result_t res = connection_add_empty_packet(ctx, false);
@ -373,3 +490,40 @@ bool connection_can_transmit(const connection_ctx_t *ctx)
return (packet_queue_get_used_space(&ctx->packet_queue) != 0)
&& (packet_queue_get(&ctx->packet_queue, ctx->next_packet_index) != NULL);
}
bool connection_is_closed(const connection_ctx_t *ctx)
{
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
return true;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
return false;
}
}
result_t connection_maintain(connection_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now > ctx->last_rx_time + HRTIME_MS(CONNECTION_TIMEOUT_MS)) {
LOG(LVL_INFO, "Connection timed out.");
ctx->conn_state = CONN_STATE_CLOSED;
ctx->event_cb(ctx, CONN_EVT_TIMEOUT, ctx->user_context);
return OK;
}
if(ctx->retransmit_time != 0 && now >= ctx->retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
ctx->retransmit_time = 0;
connection_restart_tx(ctx);
ctx->event_cb(ctx, CONN_EVT_RETRANSMIT, ctx->user_context);
}
return OK;
}

View file

@ -14,6 +14,8 @@
#include "packet_queue.h"
#include <arpa/inet.h>
struct connection_ctx_s;
typedef enum {
@ -25,16 +27,27 @@ typedef enum {
} connection_state_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len);
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx);
typedef enum {
CONN_EVT_TIMEOUT, //!< The connection timed out because no packets were received
CONN_EVT_RETRANSMIT, //!< Packet queue transmission is restarted
} connection_evt_t;
/*!\brief Type for a callback function that is called on various connection events. */
typedef void (*connection_event_callback_t)(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx);
typedef struct connection_ctx_s {
connection_state_t conn_state; //!< State of the connection.
connection_data_callback_t data_cb; //!< Callback function for received data packets.
connection_event_callback_t event_cb; //!< Callback function for event signalling.
ham64_t my_addr; //!< The local link layer address.
ham64_t peer_addr; //!< The link layer address of the peer.
struct in6_addr peer_ipv6_addr; //!< The peers IPv6 address (generated from LL address)
uint8_t last_acked_seq; //!< Next sequence number expected by the peer (from last Ack).
uint8_t next_expected_seq; //!< Next sequence number expected by us.
@ -42,6 +55,11 @@ typedef struct connection_ctx_s {
size_t next_packet_index; //!< Index in the packet queue of the next packet to transmit.
uint8_t next_seq_nr; //!< Sequence number to tag the next transmitted packet with.
uint64_t retransmit_time; //!< Time when a retransmit shall be triggered.
uint64_t last_rx_time; //!< Time when a packet was last received and decoded.
void *user_context; //!< An arbitrary pointer that can set by the user.
} connection_ctx_t;
@ -50,9 +68,18 @@ typedef struct connection_ctx_s {
* \param ctx The connection context to initialize.
* \param my_addr The local link layer address.
* \param peer_addr The remote link layer address.
* \param data_cb Callback for handling received payload data.
* \param event_cb Callback for connection events.
* \param user_ctx User context pointer (for arbitrary data).
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr);
result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr,
connection_data_callback_t data_cb,
connection_event_callback_t event_cb,
void *user_ctx);
/*!\brief Destroy the given layer 2 connection context.
*/
@ -67,6 +94,24 @@ void connection_destroy(connection_ctx_t *ctx);
*/
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len);
/*!\brief Handle a received packet where the header has already been decoded.
*
* This function assumes that the following basic checks were already done:
* - CRC is correct
* - Header can be decoded
* - Destination address is the local address
*
* \param ctx The connection context.
* \param header Pointer to the decoded header structure.
* \param payload Pointer to the payload data.
* \param payload_len Length of the payload data.
* \returns A result code from the packet handling procedure.
*/
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len);
/*!\brief Return the sequence number expected next by our side.
*/
uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx);
@ -80,12 +125,25 @@ uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx);
*/
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
/*!\brief Check if there is free space in the TX packet queue.
* \param ctx The connection context.
*/
bool connection_can_enqueue_packet(const connection_ctx_t *ctx);
/*!\brief Add an empty packet to ensure an acknowledgement is sent.
* \param ctx The connection context.
* \param tx_request Value of the TX Request field in the packet.
*/
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
/*!\brief Send connection parameters.
*
* This packet accepts an incoming connection request.
*
* \param ctx The connection context.
*/
result_t connection_send_parameters(connection_ctx_t *ctx);
/*!\brief Encode the next packet for transmission.
*
* \note
@ -94,12 +152,12 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
* called to handle retransmits correctly.
*
* \param ctx The connection context.
* \param ack_seq_nr The received sequence number to send as an acknowledgement.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \param end_burst Output parameter that is set to true if this is the last packet in a burst.
* \returns The number of bytes written to buf or zero if no packet was available.
*/
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len);
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst);
/*!\brief Restart the transmission from the beginning of the packet queue.
*/
@ -117,10 +175,20 @@ void connection_tx_clean_empty_packet(connection_ctx_t *ctx);
* \param acked_seq The acknowledged (= next expected) sequence number.
* \param do_ack Whether an empty packet shall be generated if the queue is empty.
*/
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq);
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq, bool do_ack);
/*!\brief Check if there are packets queued for transmission.
*/
bool connection_can_transmit(const connection_ctx_t *ctx);
/*!\brief Check if this connection is closed.
*/
bool connection_is_closed(const connection_ctx_t *ctx);
/*!\brief Handle internal maintenance tasks.
*
* This should be called periodically to handle timeouts and retransmissions.
*/
result_t connection_maintain(connection_ctx_t *ctx);
#endif // CONNECTION_H

View file

@ -0,0 +1,200 @@
#include <string.h>
#include <assert.h>
#include "connection_list.h"
#include "layer2/connection.h"
#include "results.h"
#define SEQ_NR_MASK 0xF
static destroy_entry(connection_list_entry_t *entry)
{
connection_destroy(&entry->connection);
free(entry);
}
// find the location where the timestamp should be inserted to maintain
// ascending timestamp order and return the then-previous entry.
//
// If NULL is returned, the list is either empty or the entry should be
// inserted before the head. This must be checked externally.
static connection_list_entry_t* find_prev_for_timestamp(connection_list_t *list, uint64_t timestamp)
{
if(!list->head) {
return NULL;
}
if(timestamp < list->head->next_access_time) {
return NULL;
}
connection_list_entry_t *prev = list->head;
while(prev->next) {
if(prev->next_access_time < timestamp
&& prev->next->next_access_time >= timestamp) {
// location found!
break;
}
prev = prev->next;
}
// either the correct location was found or prev now points to the last entry
// in the list.
return prev;
}
result_t connection_list_init(connection_list_t *list)
{
list->head = NULL;
return OK;
}
void connection_list_destroy(connection_list_t *list)
{
// delete all list entries
while(list->head) {
connection_list_entry_t *next = list->head->next;
destroy_entry(list->head);
list->head = next;
}
}
result_t connection_list_insert(connection_list_t *list, const connection_ctx_t *conn, uint64_t next_access_time)
{
connection_list_entry_t *new_entry = malloc(sizeof(connection_list_entry_t));
if(!new_entry) {
return ERR_NO_MEM;
}
new_entry->connection = *conn;
new_entry->next_access_time = next_access_time;
connection_list_entry_t *prev = find_prev_for_timestamp(list, next_access_time);
if(!prev) {
// the new entry should become the lists head (also works if the list is empty!)
new_entry->next = list->head;
list->head = new_entry;
} else {
// insert after prev
new_entry->next = prev->next;
prev->next = new_entry;
}
return OK;
}
connection_list_entry_t* connection_list_get_head(connection_list_t *list)
{
return list->head;
}
result_t connection_list_reschedule_head(connection_list_t *list, uint64_t next_access_timestamp)
{
if(!list->head) {
return ERR_INVALID_STATE;
}
if(!list->head->next) {
// nothing to do because there is only one entry.
return OK;
}
if(next_access_timestamp < list->head->next->next_access_time) {
// the head does not need to be moved because the new timestamp is still
// smaller than that of the second entry => only update the timestamp.
list->head->next_access_time = next_access_timestamp;
return OK;
}
// detach the entry from the list
connection_list_entry_t *reloc_entry = list->head;
list->head = list->head->next;
// find the location where the entry should be reinserted
connection_list_entry_t *prev = find_prev_for_timestamp(list, next_access_timestamp);
if(!prev) {
// the relocated entry should become the lists head (also works if the list is empty!)
reloc_entry->next = list->head;
list->head = reloc_entry;
} else {
// insert after prev
reloc_entry->next = prev->next;
prev->next = reloc_entry;
}
return OK;
}
result_t connection_list_delete_head(connection_list_t *list)
{
if(!list->head) {
return ERR_INVALID_STATE;
}
connection_list_entry_t *new_head = list->head->next;
destroy_entry(list->head);
list->head = new_head;
return OK;
}
result_t connection_list_delete_closed(connection_list_t *list)
{
if(!list->head) {
return OK;
}
connection_list_entry_t *prev = NULL;
connection_list_entry_t *cur = list->head;
while(cur) {
if(connection_is_closed(&cur->connection)) {
connection_list_entry_t *to_delete = cur;
cur = cur->next;
if(prev) {
prev->next = to_delete->next;
} else {
list->head = to_delete->next;
}
destroy_entry(to_delete);
} else {
prev = cur;
cur = cur->next;
}
}
return OK;
}
bool connection_list_can_enqueue_packet(connection_list_t *list)
{
if(!list->head) {
return false; // no entries -> no free space
}
connection_list_entry_t *entry = list->head;
while(entry) {
if(!connection_can_enqueue_packet(&entry->connection)) {
return false;
}
entry = entry->next;
}
return true;
}

View file

@ -0,0 +1,92 @@
/*
* This file contains functions to manage a list of layer 2 connections for
* scheduling purposes.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#ifndef CONNECTION_LIST_H
#define CONNECTION_LIST_H
#include <results.h>
#include <stdbool.h>
#include "connection.h"
typedef struct connection_list_entry_s {
uint64_t next_access_time; //!< When to next activate this connection
connection_ctx_t connection; //!< The actual connection entry
struct connection_list_entry_s *next; //!< pointer to the next list element
} connection_list_entry_t;
typedef struct connection_list_s {
connection_list_entry_t *head; //!< pointer to the first list element
} connection_list_t;
/*!\brief Initialize a connection list.
*
* \param list The list to initialize.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_init(connection_list_t *list);
/*!\brief Destroy the given layer 2 connection list, deleting all internal data.
*/
void connection_list_destroy(connection_list_t *list);
/*!\brief Insert a connection context into the list.
*
* The connection context will be copied internally. The original copy should
* no longer be used after the context was inserted into the list.
*
* \param list Pointer to the list where the entry shall be inserted.
* \param conn Pointer to the connection context.
* \param next_access_time Timestamp when this connection should be activated.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_insert(connection_list_t *list, const connection_ctx_t *conn, uint64_t next_access_time);
/*!\brief Get the head (first entry) of the list.
*
* \param list Pointer to the list to access.
*
* \returns A pointer to the head entry or NULL if the list is empty.
*/
connection_list_entry_t* connection_list_get_head(connection_list_t *list);
/*!\brief Re-schedule the current head entry.
*
* \param list Pointer to the list to access.
* \param next_access_time Timestamp when this connection should be activated again.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_reschedule_head(connection_list_t *list, uint64_t next_access_timestamp);
/*!\brief Delete the current head entry.
*
* \param list Pointer to the list to access.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_delete_head(connection_list_t *list);
/*!\brief Delete all connections that are in a closed state.
*
* \param list Pointer to the list to clean.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_delete_closed(connection_list_t *list);
/*!\brief Determine if a packet can be enqueued in all queues.
*
* \returns False if any queue is full, true otherwise.
*/
bool connection_list_can_enqueue_packet(connection_list_t *list);
#endif // CONNECTION_LIST_H

View file

@ -0,0 +1,355 @@
#include <poll.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#define LOGGER_MODULE_NAME "digi"
#include "logger.h"
#include "digipeater.h"
#include "config.h"
#include "layer2/connection.h"
#include "layer2/connection_list.h"
#include "layer2/ham64.h"
#include "layer2/packet_structs.h"
#include "layer2/packet_queue.h"
#include "results.h"
#include "utils.h"
void conn_data_cb(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx)
{
(void)conn;
digipeater_ctx_t *digi_ctx = user_ctx;
digi_ctx->data_cb(digi_ctx, data, len);
}
void conn_event_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{
(void)conn;
digipeater_ctx_t *digi_ctx = user_ctx;
switch(evt) {
case CONN_EVT_TIMEOUT:
// connection has been closed by timeout -> clean up the list
connection_list_delete_closed(&digi_ctx->conn_list);
break;
default:
// do nothing
break;
}
}
static result_t digipeater_handle_beacon_responses(digipeater_ctx_t *ctx, const layer2_packet_header_t *header, const uint8_t *buf, size_t buf_len)
{
LOG(LVL_DEBUG, "Handling beacon response packet.");
layer2_dump_packet_header(LVL_DUMP, header);
if(header->msg_type != L2_MSG_TYPE_CONN_MGMT) {
LOG(LVL_ERR, "Beacon response with invalid message type %i", header->msg_type);
return ERR_INVALID_PARAM;
}
if(buf_len == 0) {
LOG(LVL_ERR, "Missing payload in beacon response");
return ERR_INVALID_PARAM;
}
uint8_t conn_mgmt_type = buf[0];
if(conn_mgmt_type != CONN_MGMT_TYPE_CONNECTION_REQUEST) {
LOG(LVL_ERR, "Unexpected connection management type in beacon response: 0x%02x", conn_mgmt_type);
return ERR_INVALID_PARAM;
}
// packet is valid -> create a new connection, enqueue the parameters message
// and add it at the beginning of the connection list.
connection_ctx_t new_conn;
ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr, conn_data_cb, conn_event_cb, ctx));
ERR_CHECK(connection_send_parameters(&new_conn));
uint64_t now = get_hires_time();
ERR_CHECK(connection_list_insert(&ctx->conn_list, &new_conn, now));
return OK;
}
result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr, digipeater_data_callback_t data_cb)
{
ctx->my_addr = *my_addr;
ctx->data_cb = data_cb;
ctx->state = DIGIPEATER_STATE_CONN;
packet_queue_init(&ctx->oneshot_queue);
uint64_t now = get_hires_time();
ctx->next_beacon_time = now + HRTIME_MS(BEACON_INTERVAL_MS);
ctx->interval_end_time = now;
return connection_list_init(&ctx->conn_list);
}
void digipeater_destroy(digipeater_ctx_t *ctx)
{
connection_list_destroy(&ctx->conn_list);
}
result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, size_t buf_len)
{
// check the CRC
size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME);
if(!crc_check_key(PAYLOAD_CRC_SCHEME, (unsigned char*)buf, packet_size)) {
LOG(LVL_ERR, "CRC check failed!");
return ERR_INTEGRITY;
}
// decode the header
layer2_packet_header_t header;
if(!layer2_decode_packet_header(buf, buf_len, &header)) {
LOG(LVL_ERR, "Header could not be decoded!");
return ERR_INTEGRITY;
}
layer2_dump_packet_header(LVL_DUMP, &header);
// check if the packet really should be handled by us
if(ham64_is_equal(&ctx->my_addr, &header.src_addr)) {
LOG(LVL_DEBUG, "Packet is from ourselves. Ignored.");
return OK;
}
if(!ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
char fmt_dst_addr[HAM64_FMT_MAX_LEN];
char fmt_my_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header.dst_addr, fmt_dst_addr);
ham64_format(&ctx->my_addr, fmt_my_addr);
LOG(LVL_ERR, "Packet has the wrong destination address: got %s, expected %s",
fmt_dst_addr, fmt_my_addr);
return ERR_INVALID_ADDRESS;
}
// FIXME: handle connection management packets here (or somewhere else):
// - Disconnect Request
size_t header_size = layer2_get_encoded_header_size(&header);
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
result_t result = OK;
switch(ctx->state) {
case DIGIPEATER_STATE_CONN:
{
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
connection_ctx_t *current_conn = &head->connection;
result = connection_handle_packet_prechecked(current_conn, &header, payload, payload_len);
} else {
LOG(LVL_WARN, "Digipeater in CONN state, but there is no active connection! Packet dropped.");
result = OK;
}
}
case DIGIPEATER_STATE_BEACON:
result = digipeater_handle_beacon_responses(ctx, &header, payload, payload_len);
}
// end the current interval if tx_request is set in an incoming packet
if(header.tx_request) {
LOG(LVL_INFO, "TX Request was received. Ending current interval.");
digipeater_end_interval(ctx);
}
return result;
}
result_t digipeater_fill_packet_queues_from_tundev(digipeater_ctx_t *ctx, int tun_fd)
{
// first check if any queue is already full, so we don't have to drop packets
// from the TUN device queue.
if(!connection_list_can_enqueue_packet(&ctx->conn_list)) {
LOG(LVL_DEBUG, "No free space in queues.");
return OK; // do nothing
}
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = tun_fd;
pfd_tun.events = POLLIN;
while(true) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> read it
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(tun_fd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
}
LOG(LVL_DUMP, "TUN Flags: 0x%04x", *(uint16_t*)packetbuf);
LOG(LVL_DUMP, "TUN Proto: 0x%04x", *((uint16_t*)packetbuf + 1));
// FIXME:
//ERR_CHECK(connection_list_enqueue_packet(entry->connection, packetbuf, ret));
}
}
return OK;
}
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
uint64_t now = get_hires_time();
size_t packet_size = 0;
*end_burst = false;
if(now > ctx->next_beacon_time) {
// build a beacon packet
layer2_packet_header_t header;
ham64_t broadcast = {{0xFFFF, 0, 0, 0}, 1};
header.dst_addr = broadcast;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_CONN_MGMT;
header.rx_seq_nr = 0; // unused
header.tx_seq_nr = 0; // unused
header.tx_request = 1;
uint8_t payload[1] = {CONN_MGMT_TYPE_BEACON};
packet_size = layer2_encode_packet(&header, payload, 1, buf, buf_len);
ctx->state = DIGIPEATER_STATE_BEACON;
*end_burst = true;
return packet_size;
}
// send packets from the one-shot queue
const packet_queue_entry_t *queue_entry = packet_queue_get(&ctx->oneshot_queue, 0);
if(queue_entry) {
*end_burst = queue_entry->header.tx_request == 1;
packet_size = layer2_encode_packet(&queue_entry->header, queue_entry->data, queue_entry->data_len, buf, buf_len);
if(packet_size) {
packet_queue_delete(&ctx->oneshot_queue, 1);
return packet_size;
}
}
// pull packets from the current connection
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(!head) {
return 0;
}
connection_ctx_t *conn = &head->connection;
if(connection_can_transmit(conn)) {
packet_size = connection_encode_next_packet(conn, buf, buf_len, end_burst);
ctx->state = DIGIPEATER_STATE_CONN;
}
return packet_size;
}
bool digipeater_can_transmit(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now > ctx->next_beacon_time) {
return true;
}
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
return connection_can_transmit(&head->connection);
}
return false;
}
void digipeater_extend_interval(digipeater_ctx_t *ctx, uint64_t ns)
{
ctx->interval_end_time += ns;
}
result_t digipeater_end_interval(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
// TODO: adjust the time based on connection activity; right now this results
// in round-robin scheduling
connection_list_reschedule_head(&ctx->conn_list, now + HRTIME_MS(MIN_INTERVAL_TIME_MS));
ctx->state = DIGIPEATER_STATE_CONN;
ctx->interval_end_time = now + HRTIME_MS(MIN_INTERVAL_TIME_MS);
return OK;
}
result_t digipeater_maintain(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now > ctx->interval_end_time) {
// at the end of the interval, the next connection is activated and the
// current one is re-scheduled.
LOG(LVL_DEBUG, "Interval ended by timeout at %llu ns.", now);
digipeater_end_interval(ctx);
}
switch(ctx->state) {
case DIGIPEATER_STATE_CONN:
{
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(!head) {
LOG(LVL_INFO, "No active connection -> force beacon state for packet handling.");
ctx->state = DIGIPEATER_STATE_BEACON;
} else {
ERR_CHECK(connection_maintain(&head->connection));
}
break;
}
case DIGIPEATER_STATE_BEACON:
// nothing to do here
break;
}
return OK;
}

View file

@ -0,0 +1,126 @@
/*
* This file contains functions to handle a single layer 2 digipeater.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#ifndef DIGIPEATER_H
#define DIGIPEATER_H
#include <results.h>
#include <stdbool.h>
#include "connection_list.h"
#include "layer2/packet_queue.h"
struct digipeater_ctx_s;
typedef enum {
DIGIPEATER_STATE_BEACON, //!< Beacon sent, waiting for connection requests
DIGIPEATER_STATE_CONN, //!< Handling client connections
} digipeater_state_t;
typedef enum {
DIGIPEATER_EVT_INTERVAL_END, //!< The current interval has ended and new packets should be transmitted.
} digipeater_evt_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*digipeater_data_callback_t)(struct digipeater_ctx_s *digi, const uint8_t *data, size_t len);
/*!\brief Type for a callback function that is called when certain events occur. */
typedef void (*digipeater_evt_callback_t)(struct digipeater_ctx_s *digi, digipeater_evt_t evt);
typedef struct digipeater_ctx_s {
digipeater_state_t state; //!< Current operating state
digipeater_data_callback_t data_cb; //!< Callback function for received data packets.
digipeater_evt_callback_t event_cb; //!< Callback function for events.
ham64_t my_addr; //!< The local link layer address.
packet_queue_t oneshot_queue; //!< Queue for packets that are sent once and connection-independent
uint64_t next_beacon_time; //!< Absolute timestamp of the next beacon transmission.
uint64_t interval_end_time; //!< Absolute timestamp of the end of the current interval.
connection_list_t conn_list; //!< List of connections.
} digipeater_ctx_t;
/*!\brief Initialize the digipeater context.
*
* \param ctx The digipeater context to initialize.
* \param my_addr The local link layer address.
* \param data_cb Callback function that handles received (decoded) data packets.
* \returns OK if everything worked or a fitting error code.
*/
result_t digipeater_init(
digipeater_ctx_t *ctx,
const ham64_t *my_addr,
digipeater_data_callback_t data_cb);
/*!\brief Destroy the given digipeater context.
*/
void digipeater_destroy(digipeater_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param ctx The digipeater context.
* \param buf Pointer to the packet data.
* \param buf_len Length of the packet.
* \returns A result code from the packet handling procedure.
*/
result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, size_t buf_len);
/*!\brief Enqueue a packet for transmission.
* \param ctx The digipeater context.
* \param tun_fd File descriptor for an open TUN device.
*/
result_t digipeater_fill_packet_queues_from_tundev(digipeater_ctx_t *ctx, int tun_fd);
/*!\brief Encode the next packet for transmission.
*
* \note
* If no packets are currently available for transmission, this function returns zero.
*
* \param ctx The digipeater context.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \param end_burst Set to true if this packet should end the current burst and start the transmission.
* \returns The number of bytes written to buf or zero if no packet was available.
*/
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst);
/*!\brief Check if there are packets queued for transmission.
*/
bool digipeater_can_transmit(digipeater_ctx_t *ctx);
/*!\brief Extend the current interval.
*
* By default, the interval duration is set to a minimum length. It must be
* extended by the time needed to transmit and receive packets. As the time
* necessary for packet transfer is unknown to the Layer 2, it must be
* calculated externally.
*
* This function should be called whenever
* - A packet is transmitted from digipeater_encode_next_packet(), or
* - A packet is being received
*/
void digipeater_extend_interval(digipeater_ctx_t *ctx, uint64_t ns);
/*!\brief End the current interval.
*
* End the interval without waiting for the timeout. This switches to the next
* connection and stop forwarding received packets to the current one.
*/
result_t digipeater_end_interval(digipeater_ctx_t *ctx);
/*!\brief Handle internal maintenance tasks.
*
* This should be called periodically to handle timeouts and retransmissions.
*/
result_t digipeater_maintain(digipeater_ctx_t *ctx);
#endif // DIGIPEATER_H

View file

@ -8,9 +8,13 @@
#include <assert.h>
#include <string.h>
#include <liquid/liquid.h>
#define LOGGER_MODULE_NAME "l2ps"
#include "logger.h"
#include "config.h"
#include "packet_structs.h"
@ -129,3 +133,36 @@ void layer2_dump_packet_header(int level, const layer2_packet_header_t *header)
}
}
size_t layer2_encode_packet(
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len,
uint8_t *data_out, size_t data_out_len)
{
unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME);
if(data_out_len < LAYER2_PACKET_HEADER_ENCODED_SIZE_MAX) {
return 0;
}
size_t packet_size = layer2_encode_packet_header(header, data_out);
if(data_out_len < packet_size + crc_size + payload_len) {
return 0;
}
// add the payload data
if(payload) {
memcpy(data_out + packet_size, payload, payload_len);
}
packet_size += payload_len;
// calculate CRC of everything and append it to the packet
crc_append_key(PAYLOAD_CRC_SCHEME, data_out, packet_size);
packet_size += crc_size;
return packet_size;
}

View file

@ -33,6 +33,7 @@ typedef struct layer2_packet_header_s {
ham64_t dst_addr; //!< destination HAM-64 address
} layer2_packet_header_t;
// maximum header size
// - 1 byte packet info
// - 1 byte sequence numbers
@ -74,6 +75,19 @@ bool layer2_decode_packet_header(const uint8_t *encoded, size_t encoded_len, lay
*/
void layer2_dump_packet_header(int level, const layer2_packet_header_t *header);
/*!\brief Encode a complete packet (with header and CRC)
* \param header The header structure to encode.
* \param payload The payload data to encode.
* \param payload_len The length of the payload data.
* \param data_out The buffer for the encoded packet data.
* \param data_out_len The size of the output buffer.
* \returns The number of bytes written to the output buffer or 0 if not enough space was available.
*/
size_t layer2_encode_packet(
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len,
uint8_t *data_out, size_t data_out_len);
/*!\brief Get a string representation of the given message type.
*/
const char* layer2_msg_type_to_string(layer2_message_type_t type);
@ -91,6 +105,22 @@ typedef struct layer2_data_header_s {
/* Connection Management Structs */
// TODO
typedef enum {
CONN_MGMT_TYPE_BEACON = 0x00,
CONN_MGMT_TYPE_CONNECTION_REQUEST = 0x01,
CONN_MGMT_TYPE_CONNECTION_PARAMETERS = 0x02,
CONN_MGMT_TYPE_CONNECTION_RESET = 0x03,
CONN_MGMT_TYPE_DISCONNECT_REQUEST = 0x04,
CONN_MGMT_TYPE_DISCONNECT = 0x05,
} conn_mgmt_type_t;
typedef enum {
CONN_PARAM_TYPE_IPV6_ADDRESS = 0x00,
CONN_PARAM_TYPE_IPV6_GATEWAY = 0x01,
CONN_PARAM_TYPE_IPV6_DNS = 0x02,
CONN_PARAM_TYPE_IPV4_ADDRESS = 0x08,
CONN_PARAM_TYPE_IPV4_GATEWAY = 0x09,
CONN_PARAM_TYPE_IPV4_DNS = 0x0A,
} conn_param_type_t;
#endif // PACKET_STRUCTS_H

View file

@ -29,7 +29,7 @@ int tundev_open(char *dev)
struct ifreq ifr;
memset(&ifr, 0, sizeof(ifr));
ifr.ifr_flags = IFF_TUN | IFF_NO_PI;
ifr.ifr_flags = IFF_TUN;
strncpy(ifr.ifr_name, dev, IFNAMSIZ);

View file

@ -50,7 +50,7 @@
static int m_tunfd = -1;
static bool m_running = true;
static double next_tx_switch_time = 0.0;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
@ -71,7 +71,7 @@ static void signal_handler(int signal, siginfo_t *info, void *ctx)
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + (double)offset_ms * 0.001;
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
void print_complex_array(const char *varname, float complex const *array, size_t len)
@ -186,7 +186,7 @@ int main(int argc, char **argv)
bool on_air = true;
srand((int)(get_hires_time() * 1e6));
srand(get_hires_time());
// ** Initialize **
@ -233,18 +233,18 @@ int main(int argc, char **argv)
unsigned rx_retries = 0;
double old = get_hires_time();
uint64_t old = get_hires_time();
size_t total_samples = 0;
double next_stats_print_time = old + 0.5;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
double retransmit_time = 0.0;
uint64_t retransmit_time = 0;
while(m_running) {
double now = get_hires_time();
uint64_t now = get_hires_time();
if(retransmit_time != 0.0 && now >= retransmit_time) {
if(retransmit_time != 0 && now >= retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
retransmit_time = 0.0;
retransmit_time = 0;
layer2_tx_restart(&l2tx);
}
@ -325,7 +325,7 @@ int main(int argc, char **argv)
RESULT_CHECK(sdr_start_rx(&sdr));
on_air = false;
retransmit_time = get_hires_time() + 1.0 + 1.0 * rand() / RAND_MAX;
retransmit_time = get_hires_time() + HRTIME_SEC(1) + HRTIME_SEC(1.0 * rand() / RAND_MAX);
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
}
@ -360,9 +360,9 @@ int main(int argc, char **argv)
total_samples += n_rf_samples;
double new = get_hires_time();
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_samples / (new - old);
double rate = total_samples * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f MS/s", rate / 1e6);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);

View file

@ -99,9 +99,9 @@ static int tx_callback(hackrf_transfer *transfer)
return HACKRF_ERROR_OTHER;
}
if(sdr_ctx->tx_start_time == 0.0) {
if(sdr_ctx->tx_start_time == 0) {
sdr_ctx->tx_start_time = get_hires_time();
sdr_ctx->tx_duration = 10e-3; // give a little headroom
sdr_ctx->tx_duration = HRTIME_MS(10); // give a little headroom
LOG(LVL_INFO, "TX time tracking reset: start = %.3f.", sdr_ctx->tx_start_time);
}
@ -112,7 +112,7 @@ static int tx_callback(hackrf_transfer *transfer)
if(samples_read != 0) {
// only add time if any actual samples were transmitted
sdr_ctx->tx_duration += (double)samples_requested / SDR_TX_SAMPLING_RATE;
sdr_ctx->tx_duration += HRTIME_SEC((double)samples_requested / SDR_TX_SAMPLING_RATE);
}
LOG(LVL_DEBUG, "copied %u samples to HackRF.", samples_read);
@ -377,8 +377,8 @@ result_t sdr_flush_tx_buffer(sdr_ctx_t *ctx)
return 0;
}
double now = get_hires_time();
double end = ctx->tx_start_time + ctx->tx_duration;
uint64_t now = get_hires_time();
uint64_t end = ctx->tx_start_time + ctx->tx_duration;
if(sem_post(&ctx->buf_sem) < 0) {
LOG(LVL_ERR, "sem_post: %s", strerror(errno));

View file

@ -76,11 +76,23 @@ err_close:
return false;
}
double get_hires_time(void)
uint64_t get_hires_time(void)
{
struct timespec clk;
clock_gettime(CLOCK_MONOTONIC, &clk);
return clk.tv_sec + 1e-9 * clk.tv_nsec;
return clk.tv_sec * 1000000000ULL + (uint64_t)clk.tv_nsec;
}
void sleep_until(uint64_t hires_time)
{
struct timespec tv;
int ret;
tv.tv_sec = hires_time / 1000000000ULL;
tv.tv_nsec = hires_time % 1000000000ULL;
do {
ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &tv, NULL);
} while(ret == EINTR);
}
void fsleep(double d)
@ -93,18 +105,6 @@ void fsleep(double d)
nanosleep(&ts, NULL);
}
void sleep_until(double hires_time)
{
struct timespec tv;
int ret;
tv.tv_sec = hires_time;
tv.tv_nsec = (uint64_t)(1e9 * hires_time) % 1000000000;
do {
ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &tv, NULL);
} while(ret == EINTR);
}
void hexdump(const uint8_t *data, size_t len)
{
static const char lut[16] = "0123456789ABCDEF";

View file

@ -11,6 +11,10 @@
#include <stdbool.h>
#include <liquid/liquid.h>
#define HRTIME_US(x) ((uint64_t)(1000ULL * (x)))
#define HRTIME_MS(x) ((uint64_t)(1000000ULL * (x)))
#define HRTIME_SEC(x) ((uint64_t)(1000000000ULL * (x)))
/*! Dump a array of complex numbers.
*
* \param data Pointer to the data to dump.
@ -30,9 +34,26 @@ bool dump_array_cf(const float complex *data, size_t n, float T, const char *fil
*/
bool dump_array_f(const float *data, size_t n, float T, const char *filename);
void sleep_until(double hires_time);
/*! Sleep until the given absolute timestamp in ns.
*
* The current timestamp can be retrieved using \ref get_hires_time().
*
* \param hires_time The resume timestamp in ns.
*/
void sleep_until(uint64_t hires_time);
/*! Returns the current high-resulution timestamp.
*
* This timestamp comes from the CLOCK_MONOTONIC source and has no defined relation to the wall-clock time. It can be used to calculate intervals, though.
*
* \returns A timestamp in nanosecond resolution.
*/
uint64_t get_hires_time(void);
void fsleep(double d);
double get_hires_time(void);
void hexdump(const uint8_t *data, size_t len);

View file

@ -222,3 +222,7 @@ target_link_libraries(
test_interleaver
m
)
#------------------------------------
add_subdirectory(layer2_over_udp)

View file

@ -0,0 +1,69 @@
add_executable(
l2udptest_client
../../src/utils.c
../../src/utils.h
../../src/logger.c
../../src/logger.h
../../src/options.c
../../src/options.h
../../src/var_array.c
../../src/var_array.h
../../src/config.h
../../src/jsonlogger.c
../../src/jsonlogger.h
../../src/debug_structs.h
../../src/layer2/packet_structs.c
../../src/layer2/packet_structs.h
../../src/layer2/ham64.c
../../src/layer2/ham64.h
../../src/layer2/packet_queue.c
../../src/layer2/packet_queue.h
../../src/layer2/connection.c
../../src/layer2/connection.h
../../src/layer2/tundev.c
../../src/layer2/tundev.h
l2udptest_client.c
)
target_link_libraries(
l2udptest_client
fec
m
liquid
)
#---------------------------
add_executable(
l2udptest_digipeater
../../src/utils.c
../../src/utils.h
../../src/logger.c
../../src/logger.h
../../src/options.c
../../src/options.h
../../src/var_array.c
../../src/var_array.h
../../src/config.h
../../src/jsonlogger.c
../../src/jsonlogger.h
../../src/debug_structs.h
../../src/layer2/packet_structs.c
../../src/layer2/packet_structs.h
../../src/layer2/ham64.c
../../src/layer2/ham64.h
../../src/layer2/packet_queue.c
../../src/layer2/packet_queue.h
../../src/layer2/connection.c
../../src/layer2/connection.h
../../src/layer2/tundev.c
../../src/layer2/tundev.h
l2udptest_digipeater.c
)
target_link_libraries(
l2udptest_digipeater
fec
m
liquid
)

View file

@ -0,0 +1,382 @@
/*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#include <linux/if.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <liquid/liquid.h>
#include <sys/poll.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <errno.h>
#include "layer2/ham64.h"
#include "utils.h"
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "options.h"
#include "jsonlogger.h"
#include "debug_structs.h"
#include "layer2/connection.h"
#include "layer2/tundev.h"
#include "config.h"
#define RESULT_CHECK(stmt) { \
result_t res = stmt; \
if(res != OK) { \
LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \
exit(1); \
} \
}
#define BROADCAST_PORT 3737
static int m_tunfd = -1;
static bool m_running = true;
static int m_bcast_sock = -1;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
static connection_ctx_t l2conn;
static void signal_handler(int signal, siginfo_t *info, void *ctx)
{
(void)signal;
(void)info;
(void)ctx;
LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal);
m_running = false;
}
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
void handle_received_packet(uint8_t *packet_data, size_t packet_len)
{
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
result_t result = connection_handle_packet(&l2conn, packet_data, packet_len);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
}
static result_t transmit(const uint8_t *data, size_t len)
{
result_t result = OK;
struct sockaddr_in bcast_addr = {0};
bcast_addr.sin_family = AF_INET;
bcast_addr.sin_port = htons(BROADCAST_PORT);
bcast_addr.sin_addr.s_addr = INADDR_BROADCAST;
int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr));
if(ret < 0) {
LOG(LVL_ERR, "sendto: %s", strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "t");
return result;
}
void rx_data_to_tun(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx)
{
(void)conn;
(void)user_ctx;
int ret = write(m_tunfd, data, len);
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
}
void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{
(void)conn;
(void)evt;
(void)user_ctx;
}
int main(int argc, char **argv)
{
// initialize the console logger
logger_init();
if(!jsonlogger_init("jsonlog.fifo")) {
LOG(LVL_FATAL, "Could not initialize JSON logger.");
return EXIT_FAILURE;
}
bool on_air = true;
srand(get_hires_time());
// ** Initialize **
char devname[IFNAMSIZ] = "hamnet70";
m_tunfd = tundev_open(devname);
if(m_tunfd < 0) {
return 1;
}
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, rx_data_to_tun, conn_evt_cb, NULL));
// force connection into the established state
l2conn.conn_state = CONN_STATE_ESTABLISHED;
// ** Set up signal handling
struct sigaction term_action = {0};
term_action.sa_sigaction = signal_handler;
if(sigaction(SIGTERM, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if(sigaction(SIGINT, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Set up UDP socket
m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(m_bcast_sock < 0) {
LOG(LVL_ERR, "socket: %s", strerror(errno));
exit(EXIT_FAILURE);
}
int broadcastEnable=1;
int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable));
if(ret < 0) {
LOG(LVL_ERR, "setsockopt: %s", strerror(errno));
exit(EXIT_FAILURE);
}
struct sockaddr_in bind_addr = {0};
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = htons(BROADCAST_PORT);
bind_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr));
if(ret < 0) {
LOG(LVL_ERR, "bind: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Process packets **
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = m_tunfd;
pfd_tun.events = POLLIN;
struct pollfd pfd_bcast;
memset(&pfd_bcast, 0, sizeof(pfd_bcast));
pfd_bcast.fd = m_bcast_sock;
pfd_bcast.events = POLLIN;
uint64_t old = get_hires_time();
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
// TODO: wait for beacon
// TODO: send connection request
while(m_running) {
uint64_t now = get_hires_time();
RESULT_CHECK(connection_maintain(&l2conn));
// fill the TX queue from the TUN device
while(connection_can_enqueue_packet(&l2conn)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> move it to the queue
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(m_tunfd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
}
LOG(LVL_DUMP, "TUN Flags: 0x%04x", *(uint16_t*)packetbuf);
LOG(LVL_DUMP, "TUN Proto: 0x%04x", *((uint16_t*)packetbuf + 1));
RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret));
}
}
if((now > next_tx_switch_time)) {
if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = connection_encode_next_packet(&l2conn,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
if(!on_air) {
LOG(LVL_INFO, "RX -> TX");
}
on_air = true;
} else if(on_air) { // TX on, but no more bursts to send
LOG(LVL_INFO, "TX -> RX");
on_air = false;
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
}
}
if(!on_air) {
// ** Receive signal **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
}
if(ret == 0) {
continue;
}
uint8_t packetbuf[65536];
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
}
if(ret <= 0) {
break;
}
handle_received_packet(packetbuf, ret);
total_bytes += ret;
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_bytes * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);
LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)",
m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)",
m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)",
m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found);
next_stats_print_time += HRTIME_MS(500);
total_bytes = 0;
old = new;
}
fprintf(stderr, "r");
}
}
// ** Cleanup **
close(m_bcast_sock);
connection_destroy(&l2conn);
jsonlogger_shutdown();
LOG(LVL_INFO, "Done.");
logger_shutdown();
}

View file

@ -0,0 +1,374 @@
/*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#include <linux/if.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <liquid/liquid.h>
#include <sys/poll.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <errno.h>
#include "layer2/ham64.h"
#include "utils.h"
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "options.h"
#include "jsonlogger.h"
#include "debug_structs.h"
#include "layer2/connection.h"
#include "layer2/connection_list.h"
#include "layer2/tundev.h"
#include "config.h"
#define RESULT_CHECK(stmt) { \
result_t res = stmt; \
if(res != OK) { \
LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \
exit(1); \
} \
}
#define BROADCAST_PORT 3737
static int m_tunfd = -1;
static bool m_running = true;
static int m_bcast_sock = -1;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
static connection_ctx_t l2conn;
static void signal_handler(int signal, siginfo_t *info, void *ctx)
{
(void)signal;
(void)info;
(void)ctx;
LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal);
m_running = false;
}
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
void handle_received_packet(uint8_t *packet_data, size_t packet_len)
{
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
result_t result = connection_handle_packet(&l2conn, packet_data, packet_len);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
}
static result_t transmit(const uint8_t *data, size_t len)
{
result_t result = OK;
struct sockaddr_in bcast_addr = {0};
bcast_addr.sin_family = AF_INET;
bcast_addr.sin_port = htons(BROADCAST_PORT);
bcast_addr.sin_addr.s_addr = INADDR_BROADCAST;
int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr));
if(ret < 0) {
LOG(LVL_ERR, "sendto: %s", strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "t");
return result;
}
void rx_data_to_tun(struct connection_ctx_s *conn, const uint8_t *data, size_t len)
{
(void)conn;
int ret = write(m_tunfd, data, len);
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
}
int main(int argc, char **argv)
{
// initialize the console logger
logger_init();
if(!jsonlogger_init("jsonlog.fifo")) {
LOG(LVL_FATAL, "Could not initialize JSON logger.");
return EXIT_FAILURE;
}
bool on_air = true;
srand(get_hires_time());
// ** Initialize **
char devname[IFNAMSIZ] = "hamnet70";
m_tunfd = tundev_open(devname);
if(m_tunfd < 0) {
return 1;
}
connection_list_t conn_list;
RESULT_CHECK(connection_list_init(&conn_list));
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, rx_data_to_tun));
// force connection into the established state
l2conn.conn_state = CONN_STATE_ESTABLISHED;
// ** Set up signal handling
struct sigaction term_action = {0};
term_action.sa_sigaction = signal_handler;
if(sigaction(SIGTERM, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if(sigaction(SIGINT, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Set up UDP socket
m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(m_bcast_sock < 0) {
LOG(LVL_ERR, "socket: %s", strerror(errno));
exit(EXIT_FAILURE);
}
int broadcastEnable=1;
int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable));
if(ret < 0) {
LOG(LVL_ERR, "setsockopt: %s", strerror(errno));
exit(EXIT_FAILURE);
}
struct sockaddr_in bind_addr = {0};
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = htons(BROADCAST_PORT);
bind_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr));
if(ret < 0) {
LOG(LVL_ERR, "bind: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Process packets **
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = m_tunfd;
pfd_tun.events = POLLIN;
struct pollfd pfd_bcast;
memset(&pfd_bcast, 0, sizeof(pfd_bcast));
pfd_bcast.fd = m_bcast_sock;
pfd_bcast.events = POLLIN;
uint64_t old = get_hires_time();
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
uint64_t next_beacon_time = old + HRTIME_MS(5000);
while(m_running) {
uint64_t now = get_hires_time();
if(now >= next_beacon_time) {
// TODO: encode and transmit beacon
next_beacon_time += HRTIME_MS(5000);
}
// FIXME: fill the TX queues from the TUN device
while(connection_can_enqueue_packet(&l2conn)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> move it to the queue
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(m_tunfd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
}
RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret));
}
}
if((now > next_tx_switch_time)) {
if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
packet_size = connection_encode_next_packet(&l2conn,
connection_get_next_expected_seq(&l2conn),
packet_buf, sizeof(packet_buf));
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
if(!on_air) {
LOG(LVL_INFO, "RX -> TX");
}
on_air = true;
} else if(on_air) { // TX on, but no more bursts to send
LOG(LVL_INFO, "TX -> RX");
on_air = false;
retransmit_time = get_hires_time() + HRTIME_SEC(1) + HRTIME_SEC(1.0 * rand() / RAND_MAX);
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
}
}
if(!on_air) {
// ** Receive signal **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
}
if(ret == 0) {
continue;
}
uint8_t packetbuf[65536];
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
}
if(ret <= 0) {
break;
}
handle_received_packet(packetbuf, ret);
total_bytes += ret;
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_bytes * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);
LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)",
m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)",
m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)",
m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found);
next_stats_print_time += HRTIME_MS(500);
total_bytes = 0;
old = new;
}
fprintf(stderr, "r");
}
}
// ** Cleanup **
close(m_bcast_sock);
connection_destroy(&l2conn);
jsonlogger_shutdown();
LOG(LVL_INFO, "Done.");
logger_shutdown();
}