Compare commits

..

34 commits

Author SHA1 Message Date
Thomas Kolb 04399c0022 Fix byte order in TUN packet header
All checks were successful
/ build-hamnet70 (push) Successful in 39s
/ build-doc (push) Successful in 31s
/ deploy-doc (push) Has been skipped
2024-12-18 21:01:14 +01:00
Thomas Kolb b2df27bb92 digipeater: avoid writing to full packet queues 2024-12-18 21:00:46 +01:00
Thomas Kolb d77f4d4498 Multiple fixes in packet handling
All checks were successful
/ build-hamnet70 (push) Successful in 33s
/ build-doc (push) Successful in 19s
/ deploy-doc (push) Has been skipped
- add and handle layer 2 packet type correctly in data packets
- don't produce garbage packets if a packet could not be decoded or was not a
  data packet
- handle beacon/connection request/connection parameters handshake
- digipeater cycle timeout does not reset beacon timer anymore. This prevented
  any beacon transmission.
- Reset the connection timeout when empty packets are received
2024-12-15 22:29:47 +01:00
Thomas Kolb f85bb7f83e Update config.h.template to fix CI builds
All checks were successful
/ build-hamnet70 (push) Successful in 32s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
2024-12-12 19:56:48 +01:00
Thomas Kolb 559283d87f connection: process beacons and send connection requests
Some checks failed
/ build-hamnet70 (push) Failing after 17s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped
2024-12-12 19:44:07 +01:00
Thomas Kolb fefacc69c3 Fixed all warnings 2024-12-12 19:44:07 +01:00
Thomas Kolb a0ccf9699a layer2: Remove event callbacks
Instead, connection and digipeater now update a event code passed by reference
to the maintain() function. This was the only place where the callback was
called before.

Minor side effect: maintain must be called multiple times if multiple events
trigger at the same time.
2024-12-12 19:44:07 +01:00
Thomas Kolb 49dacffde1 digipeater: remove empty packet from current connection after burst 2024-12-12 19:44:07 +01:00
Thomas Kolb f376bd7db3 Remove data callbacks in connection and digipeater modules 2024-12-12 19:44:07 +01:00
Thomas Kolb f895adf877 digipeater: rename interval to cycle
Interval suggests a regular timing structure which is not intended in this
construct and therefore is misleading.
2024-12-12 19:44:07 +01:00
Thomas Kolb 1c8e46f54a digipeater: improve beacon handling 2024-12-12 19:44:07 +01:00
Thomas Kolb 580e4005ed digipeater: enqueue packets in the correct connection queue
So far, only IPv6 is supported.
2024-12-12 19:44:07 +01:00
Thomas Kolb e4961bf519 WIP: handle packets from the TUN device in digipeater
This is only a backup commit.
2024-12-12 19:44:07 +01:00
Thomas Kolb 816d753cfb tundev: remove IFF_NO_PI
Without this flag the kernel adds packet information to each packet. The
interesting part of that information is the EtherType of the packet, which
simplifies handling.
2024-12-12 19:44:07 +01:00
Thomas Kolb 774f5c0420 l2udptest_client: fix compilation 2024-12-12 19:44:07 +01:00
Thomas Kolb f51b1da9d5 connection: calculate IPv6 address for peer 2024-12-12 19:44:07 +01:00
Thomas Kolb 339dc5490c Add missing call to digipeater_end_interval() 2024-12-12 19:44:07 +01:00
Thomas Kolb f3b6ab1a29 digipeater: add one-shot packet queue and interval handling
The oneshot queue is for connection management frames that are only sent once,
at the beginning of the next burst. An example is the Connection Reset packet.

Intervals define the boundary between the handling of different connections.
The interval can be either ended by a packet with TX Request set, or by a
timeout. In either case, forwarding of packets to the current connection stops
and the connection is re-scheduled to a later point in time.
2024-12-12 19:44:07 +01:00
Thomas Kolb 01edbb1db1 WIP: managing multiple connections
Working towards handling multiple connections. A lot is still missing.
2024-12-12 19:44:07 +01:00
Thomas Kolb 6e303e8aed WIP: digipeater module managing multiple connections
Backup commit. This is a basic setup with lots of TODOs and errors. Will not
compile yet.
2024-12-12 19:44:07 +01:00
Thomas Kolb 8572018c4e l2/connection: refactoring
- split handle_packet() in two functions: the first does basic checks and calls
  the second one. This allows to do the basic checks externally (in the
  multi-connection management module) without duplicating them.
- use the new layer2_encode_packet() function.
2024-12-12 19:44:07 +01:00
Thomas Kolb 16f7ae5242 l2/packet_structs: add function to encode a complete packet 2024-12-12 19:44:07 +01:00
Thomas Kolb 97a23701d0 Handle retransmits inside the connection module 2024-12-12 19:44:07 +01:00
Thomas Kolb ec3244a61f Add a module to manage a list of connections 2024-12-12 19:44:07 +01:00
Thomas Kolb a0623668a7 Do all time calculations in uint64_t
This prevents loss of precision that occurs with double-precision floats if
timestamps become very large. Timestamps are already large if they contain a
UNIX time value (requires 60 bits; double has 53 bit resolution).
2024-12-12 19:44:07 +01:00
Thomas Kolb ec9e893c73 l2udptest: replace by two programs: client and digipeater
Both are identical so far, this is just an infrastructure commit.
2024-12-12 19:44:07 +01:00
Thomas Kolb 227837623c Remove empty packet from queue after burst was transmitted 2024-12-12 19:44:07 +01:00
Thomas Kolb 02a2afbc5b connection: do not send ACKs for empty packets 2024-12-12 19:44:07 +01:00
Thomas Kolb 307fdc657d connection: handle ACKs from empty packets correctly 2024-12-12 19:44:07 +01:00
Thomas Kolb 547a39508b Handle received packets 2024-12-12 19:44:07 +01:00
Thomas Kolb be607acbc2 connection: use correct addresses for empty packets 2024-12-12 19:44:07 +01:00
Thomas Kolb 729d61feb7 l2udp: Use the new layer2 connection module 2024-12-12 19:44:07 +01:00
Thomas Kolb 3189aac2ab Pass received packets to layer 2 processing 2024-12-12 19:44:07 +01:00
Thomas Kolb 3c7caeda1b Basic infrastructure for layer2-over-udp test 2024-12-12 19:44:07 +01:00
20 changed files with 2365 additions and 130 deletions

View file

@ -13,7 +13,7 @@ HOSTID="$1"
DEV=hamnet70
sudo ip tuntap add dev $DEV mode tun user $(whoami)
sudo ip tuntap add dev $DEV mode tun pi user $(whoami)
sudo ip link set dev $DEV txqueuelen 16 # default is 500
sudo ip link set dev $DEV up
sudo ip address add dev $DEV fd73::$HOSTID/64

View file

@ -12,6 +12,16 @@
/*** LAYER 2 CONFIG ***/
#define MY_CALL undefined // define MY_CALL to your call sign as a C string, e.g. "DL5TKL"
#define PEER_CALL "TESTPEER" // define PEER_CALL to the remote stations call sign as a C string, e.g. "DL5TKL-1"
#define RETRANSMIT_TIMEOUT_MS 1000
#define CONNECTION_TIMEOUT_MS 10000
#define BEACON_INTERVAL_MS 5000
#define MIN_INTERVAL_TIME_MS 10
#define IPV6_NET "fd70::" // the IPv6 "base address". The HAM64 address will be encoded into the lower 64 bits.
/*** TIMING CONFIG ***/

View file

@ -1,15 +1,24 @@
#include "layer2/packet_structs.h"
#include <string.h>
#include <assert.h>
#define LOGGER_MODULE_NAME "conn"
#include "logger.h"
#include "connection.h"
#include "config.h"
#include "layer2/ham64.h"
#include "layer2/packet_queue.h"
#include "results.h"
#include "utils.h"
#define SEQ_NR_MASK 0xF
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr)
result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr)
{
ctx->last_acked_seq = 0;
ctx->next_expected_seq = 0;
@ -21,6 +30,31 @@ result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ha
ctx->my_addr = *my_addr;
ctx->peer_addr = *peer_addr;
uint64_t now = get_hires_time();
ctx->last_rx_time = now;
ctx->retransmit_time = 0;
// calculate IPv6 address
struct in6_addr net_addr;
inet_pton(AF_INET6, IPV6_NET, &net_addr);
memset(ctx->peer_ipv6_addr.s6_addr, 0, sizeof(ctx->peer_ipv6_addr));
memcpy(ctx->peer_ipv6_addr.s6_addr, net_addr.s6_addr, 8); // copy the network part
// fill the host part from the peers ham64 address. The bytes are filled in
// reverse order to make the „readable“ IPv6 address as short as possible.
for(uint8_t i = 0; i < peer_addr->length; i++) {
ctx->peer_ipv6_addr.s6_addr[15 - 2*i] = (peer_addr->addr[i] >> 8) & 0xFF;
ctx->peer_ipv6_addr.s6_addr[14 - 2*i] = (peer_addr->addr[i] >> 0) & 0xFF;
}
// print the address for debugging
char ipv6_str[INET6_ADDRSTRLEN];
char ham64_str[HAM64_FMT_MAX_LEN];
ham64_format(peer_addr, ham64_str);
inet_ntop(AF_INET6, &ctx->peer_ipv6_addr, ipv6_str, sizeof(ipv6_str));
LOG(LVL_DEBUG, "IPv6 address assigned to %s is %s.", ham64_str, ipv6_str);
ctx->conn_state = CONN_STATE_INITIALIZED;
return OK;
@ -38,7 +72,19 @@ void connection_destroy(connection_ctx_t *ctx)
}
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len)
void connection_setup_outgoing(connection_ctx_t *ctx)
{
ctx->conn_state = CONN_STATE_CONNECTING;
}
void connection_setup_incoming(connection_ctx_t *ctx)
{
ctx->conn_state = CONN_STATE_ESTABLISHED;
}
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len, layer2_data_packet_t *data_packet)
{
// check the connection state
switch(ctx->conn_state) {
@ -54,6 +100,9 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
break;
}
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the CRC
size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME);
@ -71,20 +120,13 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
}
// check if the packet really should be handled by us
if(!ham64_is_equal(&header.src_addr, &ctx->peer_addr)) {
char fmt_src_addr[HAM64_FMT_MAX_LEN];
char fmt_peer_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header.src_addr, fmt_src_addr);
ham64_format(&ctx->peer_addr, fmt_peer_addr);
LOG(LVL_ERR, "Packet has the wrong source address: got %s, expected %s",
fmt_src_addr, fmt_peer_addr);
return ERR_INVALID_ADDRESS;
if(ham64_is_equal(&ctx->my_addr, &header.src_addr)) {
LOG(LVL_DEBUG, "Packet is from ourselves. Ignored.");
return OK;
}
if(!ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
if(ham64_get_addr_type(&header.dst_addr) != HAM64_ADDR_TYPE_BROADCAST
&& !ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
char fmt_dst_addr[HAM64_FMT_MAX_LEN];
char fmt_my_addr[HAM64_FMT_MAX_LEN];
@ -97,50 +139,156 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
return ERR_INVALID_ADDRESS;
}
size_t header_size = layer2_get_encoded_header_size(&header);
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
return connection_handle_packet_prechecked(ctx, &header, payload, payload_len, data_packet);
}
static result_t handle_conn_mgmt(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len)
{
(void)header;
if(payload_len < 1) {
LOG(LVL_ERR, "Connection management packet without any payload is invalid.");
return ERR_INVALID_PARAM;
}
uint8_t packet_type = payload[0];
switch(packet_type) {
case CONN_MGMT_TYPE_BEACON:
if(ctx->conn_state == CONN_STATE_CONNECTING) {
LOG(LVL_INFO, "Received beacon; queueing connection request.");
// enqueue a connection request packet
layer2_packet_header_t conn_request_header;
conn_request_header.tx_request = true;
conn_request_header.dst_addr = ctx->peer_addr;
conn_request_header.src_addr = ctx->my_addr;
conn_request_header.msg_type = L2_MSG_TYPE_CONN_MGMT;
conn_request_header.rx_seq_nr = 0;
conn_request_header.tx_seq_nr = 0;
// create a persistent copy of the packet data.
uint8_t packetbuf[1];
packetbuf[0] = CONN_MGMT_TYPE_CONNECTION_REQUEST;
connection_enqueue_packet(ctx, &conn_request_header, packetbuf, 1);
} else {
LOG(LVL_WARN, "Beacons are ignored in states other than CONNECTING.");
return ERR_INVALID_STATE;
}
break;
case CONN_MGMT_TYPE_CONNECTION_PARAMETERS:
LOG(LVL_INFO, "Connection parameters received! -> connection established");
ctx->next_expected_seq = 1; // connection parameters are packet 0
ctx->conn_state = CONN_STATE_ESTABLISHED;
break;
default:
LOG(LVL_WARN, "Ignored connection management type %d", packet_type);
break;
}
return OK;
}
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len,
layer2_data_packet_t *data_packet)
{
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the connection state
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
LOG(LVL_ERR, "Trying to pass packet to connection in state %u", ctx->conn_state);
return ERR_INVALID_STATE;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
// in these states, packets can be handled
break;
}
// check if this packet is from our designated peer
if(!ham64_is_equal(&header->src_addr, &ctx->peer_addr)) {
char fmt_src_addr[HAM64_FMT_MAX_LEN];
char fmt_peer_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header->src_addr, fmt_src_addr);
ham64_format(&ctx->peer_addr, fmt_peer_addr);
LOG(LVL_ERR, "Packet has the wrong source address: got %s, expected %s",
fmt_src_addr, fmt_peer_addr);
return ERR_INVALID_ADDRESS;
}
LOG(LVL_DEBUG, "Handling %s packet with rx_seq_nr %u, tx_seq_nr %u.",
layer2_msg_type_to_string(header.msg_type), header.rx_seq_nr, header.tx_seq_nr);
layer2_msg_type_to_string(header->msg_type), header->rx_seq_nr, header->tx_seq_nr);
ctx->last_acked_seq = header.rx_seq_nr;
ctx->last_acked_seq = header->rx_seq_nr;
switch(header.msg_type) {
switch(header->msg_type) {
case L2_MSG_TYPE_EMPTY:
LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq);
return OK; // do not ACK and call back
// empty packets also reset the timeout timer
ctx->last_rx_time = get_hires_time();
// handle the acknowledgement internally
connection_handle_ack(ctx, header->rx_seq_nr, false);
return OK; // do not ACK
case L2_MSG_TYPE_CONN_MGMT:
return handle_conn_mgmt(ctx, header, payload, payload_len);
case L2_MSG_TYPE_CONNECTIONLESS:
LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header.msg_type));
LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header->msg_type));
return OK;
case L2_MSG_TYPE_DATA:
break;
default:
LOG(LVL_ERR, "Invalid message type %d.", header.msg_type);
LOG(LVL_ERR, "Invalid message type %d.", header->msg_type);
return ERR_INVALID_STATE;
}
if(ctx->next_expected_seq != header.tx_seq_nr) {
LOG(LVL_ERR, "Expected sequence number %u, received %u.", ctx->next_expected_seq, header.tx_seq_nr);
if(ctx->next_expected_seq != header->tx_seq_nr) {
LOG(LVL_ERR, "Expected sequence number %u, received %u.", ctx->next_expected_seq, header->tx_seq_nr);
return ERR_SEQUENCE;
}
ctx->next_expected_seq++;
ctx->next_expected_seq &= 0xF;
LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
ctx->last_rx_time = get_hires_time();
LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header->rx_seq_nr, header->tx_seq_nr);
// handle the acknowledgement internally
connection_handle_ack(ctx, header.rx_seq_nr);
connection_handle_ack(ctx, header->rx_seq_nr, true);
size_t header_size = layer2_get_encoded_header_size(&header);
// extract the payload and forward it to the tun device
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
ctx->data_cb(ctx, payload, payload_len);
// pass the decoded data back to the user
data_packet->payload_type = payload[0];
data_packet->payload = payload + 1;
data_packet->payload_len = payload_len - 1;
return OK;
}
@ -158,7 +306,43 @@ uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx)
}
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len)
result_t connection_enqueue_packet(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len)
{
if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
return ERR_NO_MEM;
}
uint8_t *packetbuf = NULL;
if(payload) {
// create a persistent copy of the packet data.
// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
packetbuf = malloc(payload_len);
if(!packetbuf) {
LOG(LVL_ERR, "malloc failed.");
return ERR_NO_MEM;
}
}
memcpy(packetbuf, payload, payload_len);
packet_queue_add(&ctx->packet_queue, header, packetbuf, payload_len);
LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
header->tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
return OK;
}
result_t connection_enqueue_data_packet(
connection_ctx_t *ctx,
layer2_payload_type_t payload_type,
uint8_t *buf,
size_t buf_len)
{
// check the connection state
switch(ctx->conn_state) {
@ -176,10 +360,6 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
layer2_packet_header_t header;
if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
return ERR_NO_MEM;
}
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_DATA;
@ -187,20 +367,12 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
header.tx_request = 0;
header.tx_seq_nr = ctx->next_seq_nr;
// create a persistent copy of the packet data.
// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
uint8_t *packetbuf = malloc(buf_len);
if(!packetbuf) {
LOG(LVL_ERR, "malloc failed.");
return ERR_NO_MEM;
}
uint8_t packet_with_type[buf_len + 1];
memcpy(packetbuf, buf, buf_len);
packet_with_type[0] = (uint8_t)payload_type;
memcpy(packet_with_type+1, buf, buf_len);
packet_queue_add(&ctx->packet_queue, &header, packetbuf, buf_len);
LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
header.tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
ERR_CHECK(connection_enqueue_packet(ctx, &header, packet_with_type, sizeof(packet_with_type)));
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;
@ -209,6 +381,12 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
}
bool connection_can_enqueue_packet(const connection_ctx_t *ctx)
{
return packet_queue_get_free_space(&ctx->packet_queue) > 0;
}
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
{
// check the connection state
@ -227,10 +405,8 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
layer2_packet_header_t header;
header.dst_addr.addr[0] = 0xFFFF;
header.dst_addr.length = 1;
header.src_addr.addr[0] = 0x0001;
header.src_addr.length = 1;
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_EMPTY;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_seq_nr = 0; // not used in empty packets
@ -244,7 +420,51 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len)
result_t connection_send_parameters(connection_ctx_t *ctx)
{
// check the connection state
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_CLOSED:
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
LOG(LVL_ERR, "Trying to send connection parameters in state %u", ctx->conn_state);
return ERR_INVALID_STATE;
case CONN_STATE_INITIALIZED:
// in these states, packets can be handled
break;
}
layer2_packet_header_t header;
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_CONN_MGMT;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_seq_nr = ctx->next_seq_nr;
header.tx_request = 1;
size_t payload_len = 1;
uint8_t payload[payload_len];
payload[0] = CONN_MGMT_TYPE_CONNECTION_PARAMETERS;
// TODO: calculate IP addresses from clients HAM64 address
ERR_CHECK(connection_enqueue_packet(ctx, &header, payload, payload_len));
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;
// connection is considered established after the connection parameters are sent.
ctx->conn_state = CONN_STATE_ESTABLISHED;
return OK;
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
// check the connection state
switch(ctx->conn_state) {
@ -252,7 +472,7 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr,
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
LOG(LVL_ERR, "Trying to encode packet in inactive state %u", ctx->conn_state);
return ERR_INVALID_STATE;
return 0;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
@ -267,32 +487,21 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr,
return 0;
}
unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME);
assert(buf_len >= LAYER2_PACKET_HEADER_ENCODED_SIZE_MAX + crc_size + entry->data_len);
layer2_packet_header_t header = entry->header;
header.rx_seq_nr = ack_seq_nr;
header.rx_seq_nr = ctx->next_expected_seq;
// encode the header
LOG(LVL_DEBUG, "Encoding packet with rx_seq_nr %u, tx_seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
size_t packet_size = layer2_encode_packet_header(&header, buf);
// add the payload data
if(entry->data) {
memcpy(buf + packet_size, entry->data, entry->data_len);
size_t packet_size = layer2_encode_packet(&header, entry->data, entry->data_len, buf, buf_len);
if(packet_size == 0) {
LOG(LVL_ERR, "Buffer too small for encoded packet!");
return 0;
}
packet_size += entry->data_len;
// calculate CRC of everything and append it to the packet
crc_append_key(PAYLOAD_CRC_SCHEME, buf, packet_size);
packet_size += crc_size;
ctx->next_packet_index++;
*end_burst = header.tx_request;
return packet_size;
}
@ -318,7 +527,7 @@ void connection_tx_clean_empty_packet(connection_ctx_t *ctx)
}
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq, bool do_ack)
{
// check the connection state
switch(ctx->conn_state) {
@ -334,8 +543,6 @@ void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
break;
}
ctx->next_packet_index = 0;
size_t packets_to_remove = 0;
size_t packets_available = packet_queue_get_used_space(&ctx->packet_queue);
@ -349,13 +556,26 @@ void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
packets_to_remove++;
}
packet_queue_delete(&ctx->packet_queue, packets_to_remove);
if(packets_to_remove != 0) {
packet_queue_delete(&ctx->packet_queue, packets_to_remove);
packets_available = packet_queue_get_used_space(&ctx->packet_queue);
// send the next requested packet (all previous ones were deleted above).
if(ctx->next_packet_index >= packets_to_remove) {
ctx->next_packet_index -= packets_to_remove;
} else {
ctx->next_packet_index = 0;
}
LOG(LVL_DEBUG, "handling ack for seq_nr %u, removing %zu packets, %zu packets remaining.", acked_seq, packets_to_remove, packets_available);
packets_available = packet_queue_get_used_space(&ctx->packet_queue);
if(packets_available == 0) {
LOG(LVL_DEBUG, "handling ack for seq_nr %u, removed %zu packets, %zu packets remaining.", acked_seq, packets_to_remove, packets_available);
ctx->retransmit_time = get_hires_time() + HRTIME_MS(RETRANSMIT_TIMEOUT_MS);
} else {
LOG(LVL_DEBUG, "duplicate ACK for seq_nr %u", acked_seq);
}
if(do_ack && packets_available == 0) {
// no packets left in queue, but an acknowledgement must be
// transmitted. Add an empty packet to do that.
result_t res = connection_add_empty_packet(ctx, false);
@ -373,3 +593,55 @@ bool connection_can_transmit(const connection_ctx_t *ctx)
return (packet_queue_get_used_space(&ctx->packet_queue) != 0)
&& (packet_queue_get(&ctx->packet_queue, ctx->next_packet_index) != NULL);
}
bool connection_is_closed(const connection_ctx_t *ctx)
{
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
return true;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
return false;
}
return true;
}
result_t connection_maintain(connection_ctx_t *ctx, connection_evt_t *evt)
{
uint64_t now = get_hires_time();
if(now > ctx->last_rx_time + HRTIME_MS(CONNECTION_TIMEOUT_MS)) {
LOG(LVL_INFO, "Connection timed out.");
ctx->conn_state = CONN_STATE_CLOSED;
*evt = CONN_EVT_TIMEOUT;
return OK;
}
if(ctx->retransmit_time != 0 && now >= ctx->retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
ctx->retransmit_time = 0;
connection_restart_tx(ctx);
*evt = CONN_EVT_RETRANSMIT;
return OK;
}
return OK;
}
bool connection_has_ipv6_peer_address(connection_ctx_t *ctx, const uint8_t *address_to_check)
{
for(size_t i = 0; i < 16; i++) {
if(address_to_check[i] != ctx->peer_ipv6_addr.s6_addr[i]) {
return false;
}
}
return true;
}

View file

@ -14,6 +14,8 @@
#include "packet_queue.h"
#include <arpa/inet.h>
struct connection_ctx_s;
typedef enum {
@ -24,17 +26,20 @@ typedef enum {
CONN_STATE_CLOSED //!< Connection has been closed (gracefully or by timeout)
} connection_state_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len);
typedef enum {
CONN_EVT_NONE, //!< No event has occurred
CONN_EVT_TIMEOUT, //!< The connection timed out because no packets were received
CONN_EVT_RETRANSMIT, //!< Packet queue transmission is restarted
} connection_evt_t;
typedef struct connection_ctx_s {
connection_state_t conn_state; //!< State of the connection.
connection_data_callback_t data_cb; //!< Callback function for received data packets.
ham64_t my_addr; //!< The local link layer address.
ham64_t peer_addr; //!< The link layer address of the peer.
struct in6_addr peer_ipv6_addr; //!< The peers IPv6 address (generated from LL address)
uint8_t last_acked_seq; //!< Next sequence number expected by the peer (from last Ack).
uint8_t next_expected_seq; //!< Next sequence number expected by us.
@ -42,6 +47,9 @@ typedef struct connection_ctx_s {
size_t next_packet_index; //!< Index in the packet queue of the next packet to transmit.
uint8_t next_seq_nr; //!< Sequence number to tag the next transmitted packet with.
uint64_t retransmit_time; //!< Time when a retransmit shall be triggered.
uint64_t last_rx_time; //!< Time when a packet was last received and decoded.
} connection_ctx_t;
@ -52,20 +60,62 @@ typedef struct connection_ctx_s {
* \param peer_addr The remote link layer address.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr);
result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr);
/*!\brief Destroy the given layer 2 connection context.
*/
void connection_destroy(connection_ctx_t *ctx);
/*!\brief Set up a outgoing connection.
*
* This puts the connection into CONN_STATE_CONNECTING state, which causes
* beacons to be handled.
*/
void connection_setup_outgoing(connection_ctx_t *ctx);
/*!\brief Set up an incoming connection.
*
* As this function is intended to be called after a connection request was
* handled, it puts the connection directly into CONN_STATE_ESTABLISHED state.
*/
void connection_setup_incoming(connection_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param ctx The connection context.
* \param buf Pointer to the packet data.
* \param buf_len Length of the packet.
* \param[inout] ctx The connection context.
* \param[in] buf Pointer to the packet data.
* \param[in] buf_len Length of the packet.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure.
*/
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len);
result_t connection_handle_packet(
connection_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet);
/*!\brief Handle a received packet where the header has already been decoded.
*
* This function assumes that the following basic checks were already done:
* - CRC is correct
* - Header can be decoded
* - Destination address is the local address
*
* \param[inout] ctx The connection context.
* \param[in] header Pointer to the decoded header structure.
* \param[in] payload Pointer to the payload data.
* \param[in] payload_len Length of the payload data.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure.
*/
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len,
layer2_data_packet_t *data_packet);
/*!\brief Return the sequence number expected next by our side.
*/
@ -76,9 +126,32 @@ uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx);
uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx);
/*!\brief Enqueue a packet for transmission.
* \param ctx The connection context.
* \param header Pointer to the packet header.
* \param payload Pointer to the payload.
* \param payload_len Length of the payload.
*/
result_t connection_enqueue_packet(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len);
/*!\brief Enqueue a data packet for transmission.
* \param ctx The connection context.
* \param payload_type Type of the payload.
* \param buf Pointer to the data buffer.
* \param buf_len Length of the data.
*/
result_t connection_enqueue_data_packet(
connection_ctx_t *ctx,
layer2_payload_type_t payload_type,
uint8_t *buf,
size_t buf_len);
/*!\brief Check if there is free space in the TX packet queue.
* \param ctx The connection context.
*/
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
bool connection_can_enqueue_packet(const connection_ctx_t *ctx);
/*!\brief Add an empty packet to ensure an acknowledgement is sent.
* \param ctx The connection context.
@ -86,6 +159,14 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
*/
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
/*!\brief Send connection parameters.
*
* This packet accepts an incoming connection request.
*
* \param ctx The connection context.
*/
result_t connection_send_parameters(connection_ctx_t *ctx);
/*!\brief Encode the next packet for transmission.
*
* \note
@ -94,12 +175,12 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
* called to handle retransmits correctly.
*
* \param ctx The connection context.
* \param ack_seq_nr The received sequence number to send as an acknowledgement.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \param end_burst Output parameter that is set to true if this is the last packet in a burst.
* \returns The number of bytes written to buf or zero if no packet was available.
*/
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len);
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst);
/*!\brief Restart the transmission from the beginning of the packet queue.
*/
@ -117,10 +198,30 @@ void connection_tx_clean_empty_packet(connection_ctx_t *ctx);
* \param acked_seq The acknowledged (= next expected) sequence number.
* \param do_ack Whether an empty packet shall be generated if the queue is empty.
*/
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq);
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq, bool do_ack);
/*!\brief Check if there are packets queued for transmission.
*/
bool connection_can_transmit(const connection_ctx_t *ctx);
/*!\brief Check if this connection is closed.
*/
bool connection_is_closed(const connection_ctx_t *ctx);
/*!\brief Handle internal maintenance tasks.
*
* This should be called periodically to handle timeouts and retransmissions.
*
* \param ctx[in] The connection context.
* \param evt[out] Set to an event that occurred, or CONN_EVT_NONE.
*/
result_t connection_maintain(connection_ctx_t *ctx, connection_evt_t *evt);
/*!\brief Check if the given IPv6 peer address belongs to this connection.
*
* \param address_to_check Pointer to the IPv6 address to check. Must point to 16 bytes of data.
* \returns True if this address is handled by this connection, false otherwise.
*/
bool connection_has_ipv6_peer_address(connection_ctx_t *ctx, const uint8_t *address_to_check);
#endif // CONNECTION_H

View file

@ -0,0 +1,241 @@
#include <string.h>
#include <assert.h>
#define LOGGER_MODULE_NAME "clist"
#include "logger.h"
#include "connection_list.h"
#include "layer2/connection.h"
#include "results.h"
#include "utils.h"
#define SEQ_NR_MASK 0xF
static void destroy_entry(connection_list_entry_t *entry)
{
connection_destroy(&entry->connection);
free(entry);
}
// find the location where the timestamp should be inserted to maintain
// ascending timestamp order and return the then-previous entry.
//
// If NULL is returned, the list is either empty or the entry should be
// inserted before the head. This must be checked externally.
static connection_list_entry_t* find_prev_for_timestamp(connection_list_t *list, uint64_t timestamp)
{
if(!list->head) {
return NULL;
}
if(timestamp < list->head->next_access_time) {
return NULL;
}
connection_list_entry_t *prev = list->head;
while(prev->next) {
if(prev->next_access_time < timestamp
&& prev->next->next_access_time >= timestamp) {
// location found!
break;
}
prev = prev->next;
}
// either the correct location was found or prev now points to the last entry
// in the list.
return prev;
}
result_t connection_list_init(connection_list_t *list)
{
list->head = NULL;
return OK;
}
void connection_list_destroy(connection_list_t *list)
{
// delete all list entries
while(list->head) {
connection_list_entry_t *next = list->head->next;
destroy_entry(list->head);
list->head = next;
}
}
result_t connection_list_insert(connection_list_t *list, const connection_ctx_t *conn, uint64_t next_access_time)
{
connection_list_entry_t *new_entry = malloc(sizeof(connection_list_entry_t));
if(!new_entry) {
return ERR_NO_MEM;
}
new_entry->connection = *conn;
new_entry->next_access_time = next_access_time;
connection_list_entry_t *prev = find_prev_for_timestamp(list, next_access_time);
if(!prev) {
// the new entry should become the lists head (also works if the list is empty!)
new_entry->next = list->head;
list->head = new_entry;
} else {
// insert after prev
new_entry->next = prev->next;
prev->next = new_entry;
}
return OK;
}
connection_list_entry_t* connection_list_get_head(connection_list_t *list)
{
return list->head;
}
result_t connection_list_reschedule_head(connection_list_t *list, uint64_t next_access_timestamp)
{
if(!list->head) {
return ERR_INVALID_STATE;
}
if(!list->head->next) {
// nothing to do because there is only one entry.
return OK;
}
if(next_access_timestamp < list->head->next->next_access_time) {
// the head does not need to be moved because the new timestamp is still
// smaller than that of the second entry => only update the timestamp.
list->head->next_access_time = next_access_timestamp;
return OK;
}
// detach the entry from the list
connection_list_entry_t *reloc_entry = list->head;
list->head = list->head->next;
// find the location where the entry should be reinserted
connection_list_entry_t *prev = find_prev_for_timestamp(list, next_access_timestamp);
if(!prev) {
// the relocated entry should become the lists head (also works if the list is empty!)
reloc_entry->next = list->head;
list->head = reloc_entry;
} else {
// insert after prev
reloc_entry->next = prev->next;
prev->next = reloc_entry;
}
return OK;
}
result_t connection_list_delete_head(connection_list_t *list)
{
if(!list->head) {
return ERR_INVALID_STATE;
}
connection_list_entry_t *new_head = list->head->next;
destroy_entry(list->head);
list->head = new_head;
return OK;
}
result_t connection_list_delete_closed(connection_list_t *list)
{
if(!list->head) {
return OK;
}
connection_list_entry_t *prev = NULL;
connection_list_entry_t *cur = list->head;
while(cur) {
if(connection_is_closed(&cur->connection)) {
connection_list_entry_t *to_delete = cur;
cur = cur->next;
if(prev) {
prev->next = to_delete->next;
} else {
list->head = to_delete->next;
}
destroy_entry(to_delete);
} else {
prev = cur;
cur = cur->next;
}
}
return OK;
}
result_t connection_list_enqueue_packet(connection_list_t *list, uint8_t *data, size_t data_len)
{
if(data_len < 40) {
// packet not large enough for an IPv6 header
LOG(LVL_DEBUG, "Packet size too small: %zu bytes given, 40 bytes needed.", data_len);
return ERR_INVALID_PARAM;
}
uint8_t version = data[0] >> 4;
if(version != 6) {
LOG(LVL_DEBUG, "IP version (%i) is not 6.", version);
return ERR_INVALID_PARAM;
}
uint8_t *dest_addr = data + 24;
// search the list for the destination address
connection_list_entry_t *ptr = list->head;
while(ptr) {
if(connection_has_ipv6_peer_address(&ptr->connection, dest_addr)) {
// found it!
break;
}
ptr = ptr->next;
}
if(!ptr) {
// address not found in any connection
return ERR_INVALID_ADDRESS;
}
return connection_enqueue_data_packet(&ptr->connection, L2_PAYLOAD_TYPE_IPV6, data, data_len);
}
bool connection_list_can_enqueue_packet(connection_list_t *list)
{
if(!list->head) {
return false; // no entries -> no free space
}
connection_list_entry_t *entry = list->head;
while(entry) {
if(!connection_can_enqueue_packet(&entry->connection)) {
return false;
}
entry = entry->next;
}
return true;
}

View file

@ -0,0 +1,109 @@
/*
* This file contains functions to manage a list of layer 2 connections for
* scheduling purposes.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#ifndef CONNECTION_LIST_H
#define CONNECTION_LIST_H
#include <results.h>
#include <stdbool.h>
#include "connection.h"
typedef struct connection_list_entry_s {
uint64_t next_access_time; //!< When to next activate this connection
connection_ctx_t connection; //!< The actual connection entry
struct connection_list_entry_s *next; //!< pointer to the next list element
} connection_list_entry_t;
typedef struct connection_list_s {
connection_list_entry_t *head; //!< pointer to the first list element
} connection_list_t;
/*!\brief Initialize a connection list.
*
* \param list The list to initialize.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_init(connection_list_t *list);
/*!\brief Destroy the given layer 2 connection list, deleting all internal data.
*/
void connection_list_destroy(connection_list_t *list);
/*!\brief Insert a connection context into the list.
*
* The connection context will be copied internally. The original copy should
* no longer be used after the context was inserted into the list.
*
* \param list Pointer to the list where the entry shall be inserted.
* \param conn Pointer to the connection context.
* \param next_access_time Timestamp when this connection should be activated.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_insert(connection_list_t *list, const connection_ctx_t *conn, uint64_t next_access_time);
/*!\brief Get the head (first entry) of the list.
*
* \param list Pointer to the list to access.
*
* \returns A pointer to the head entry or NULL if the list is empty.
*/
connection_list_entry_t* connection_list_get_head(connection_list_t *list);
/*!\brief Re-schedule the current head entry.
*
* \param list Pointer to the list to access.
* \param next_access_time Timestamp when this connection should be activated again.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_reschedule_head(connection_list_t *list, uint64_t next_access_timestamp);
/*!\brief Delete the current head entry.
*
* \param list Pointer to the list to access.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_delete_head(connection_list_t *list);
/*!\brief Delete all connections that are in a closed state.
*
* \param list Pointer to the list to clean.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_delete_closed(connection_list_t *list);
/*!\brief Insert a packet in the appropriate connections queue.
*
* The appropriate connection is selected by extracting the destination IP
* address from the packet and comparing it to the peer address of the
* connection.
*
* \param list Pointer to the connection list to operate on.
* \param packet Pointer to the raw packet data (no TUNTAP header!).
* \param packet_len Length of the packet data.
*
* \retval OK if the packet was inserted.
* \retval ERR_NO_MEM if the target queue was full.
* \retval ERR_INVALID_ADDRESS if the destination address does not match any connection.
* \retval ERR_INVALID_PARAM if the packet format is not supported.
*/
result_t connection_list_enqueue_packet(connection_list_t *list, uint8_t *data, size_t data_len);
/*!\brief Determine if a packet can be enqueued in all queues.
*
* \returns False if any queue is full, true otherwise.
*/
bool connection_list_can_enqueue_packet(connection_list_t *list);
#endif // CONNECTION_LIST_H

View file

@ -0,0 +1,385 @@
#include <poll.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#define LOGGER_MODULE_NAME "digi"
#include "logger.h"
#include "digipeater.h"
#include "config.h"
#include "layer2/connection.h"
#include "layer2/connection_list.h"
#include "layer2/ham64.h"
#include "layer2/packet_structs.h"
#include "layer2/packet_queue.h"
#include "results.h"
#include "utils.h"
static result_t digipeater_handle_beacon_responses(digipeater_ctx_t *ctx, const layer2_packet_header_t *header, const uint8_t *buf, size_t buf_len)
{
LOG(LVL_DEBUG, "Handling beacon response packet.");
layer2_dump_packet_header(LVL_DUMP, header);
if(header->msg_type != L2_MSG_TYPE_CONN_MGMT) {
LOG(LVL_ERR, "Beacon response with invalid message type %i", header->msg_type);
return ERR_INVALID_PARAM;
}
if(buf_len == 0) {
LOG(LVL_ERR, "Missing payload in beacon response");
return ERR_INVALID_PARAM;
}
uint8_t conn_mgmt_type = buf[0];
if(conn_mgmt_type != CONN_MGMT_TYPE_CONNECTION_REQUEST) {
LOG(LVL_ERR, "Unexpected connection management type in beacon response: 0x%02x", conn_mgmt_type);
return ERR_INVALID_PARAM;
}
// packet is valid -> create a new connection, enqueue the parameters message
// and add it at the beginning of the connection list.
connection_ctx_t new_conn;
ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr));
ERR_CHECK(connection_send_parameters(&new_conn));
uint64_t now = get_hires_time();
ERR_CHECK(connection_list_insert(&ctx->conn_list, &new_conn, now));
return OK;
}
static size_t encode_beacon_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len)
{
// build a beacon packet
layer2_packet_header_t header;
ham64_t broadcast = {{0xFFFF, 0, 0, 0}, 1};
header.dst_addr = broadcast;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_CONN_MGMT;
header.rx_seq_nr = 0; // unused
header.tx_seq_nr = 0; // unused
header.tx_request = 1;
uint8_t payload[1] = {CONN_MGMT_TYPE_BEACON};
return layer2_encode_packet(&header, payload, 1, buf, buf_len);
}
result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr)
{
ctx->my_addr = *my_addr;
ctx->state = DIGIPEATER_STATE_CONN;
packet_queue_init(&ctx->oneshot_queue);
uint64_t now = get_hires_time();
ctx->next_beacon_time = now + HRTIME_MS(BEACON_INTERVAL_MS);
ctx->cycle_end_time = now;
return connection_list_init(&ctx->conn_list);
}
void digipeater_destroy(digipeater_ctx_t *ctx)
{
connection_list_destroy(&ctx->conn_list);
}
result_t digipeater_handle_packet(
digipeater_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet)
{
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the CRC
size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME);
if(!crc_check_key(PAYLOAD_CRC_SCHEME, (unsigned char*)buf, packet_size)) {
LOG(LVL_ERR, "CRC check failed!");
return ERR_INTEGRITY;
}
// decode the header
layer2_packet_header_t header;
if(!layer2_decode_packet_header(buf, buf_len, &header)) {
LOG(LVL_ERR, "Header could not be decoded!");
return ERR_INTEGRITY;
}
layer2_dump_packet_header(LVL_DUMP, &header);
// check if the packet really should be handled by us
if(ham64_is_equal(&ctx->my_addr, &header.src_addr)) {
LOG(LVL_DEBUG, "Packet is from ourselves. Ignored.");
return OK;
}
if(!ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
char fmt_dst_addr[HAM64_FMT_MAX_LEN];
char fmt_my_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header.dst_addr, fmt_dst_addr);
ham64_format(&ctx->my_addr, fmt_my_addr);
LOG(LVL_ERR, "Packet has the wrong destination address: got %s, expected %s",
fmt_dst_addr, fmt_my_addr);
return ERR_INVALID_ADDRESS;
}
// FIXME: handle connection management packets here (or somewhere else):
// - Disconnect Request
size_t header_size = layer2_get_encoded_header_size(&header);
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
result_t result = OK;
switch(ctx->state) {
case DIGIPEATER_STATE_CONN:
{
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
connection_ctx_t *current_conn = &head->connection;
result = connection_handle_packet_prechecked(
current_conn, &header, payload, payload_len, data_packet);
} else {
LOG(LVL_WARN, "Digipeater in CONN state, but there is no active connection! Packet dropped.");
result = OK;
}
}
break;
case DIGIPEATER_STATE_BEACON:
result = digipeater_handle_beacon_responses(ctx, &header, payload, payload_len);
}
// end the current cycle if tx_request is set in an incoming packet
if(header.tx_request) {
LOG(LVL_INFO, "TX Request was received. Ending current cycle.");
digipeater_end_cycle(ctx);
}
return result;
}
result_t digipeater_fill_packet_queues_from_tundev(digipeater_ctx_t *ctx, int tun_fd)
{
// first check if any queue is already full, so we don't have to drop packets
// from the TUN device queue.
if(!connection_list_can_enqueue_packet(&ctx->conn_list)) {
LOG(LVL_DEBUG, "No connection or no free space in queues.");
return OK; // do nothing
}
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = tun_fd;
pfd_tun.events = POLLIN;
while(connection_list_can_enqueue_packet(&ctx->conn_list)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> read it
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(tun_fd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
} else if(ret < 4) {
LOG(LVL_ERR, "Not enough data from TUN read() to check packet type!");
return ERR_SYSCALL;
}
uint16_t flags = *(uint16_t*)packetbuf;
uint16_t proto = *((uint16_t*)packetbuf + 1);
LOG(LVL_DUMP, "TUN Flags: 0x%04x", flags);
LOG(LVL_DUMP, "TUN Proto: 0x%04x", proto);
uint8_t *packet_data = packetbuf + 4;
size_t packet_length = ret - 4;
// note: octets are swapped in case statements
switch(proto) {
case 0xdd86: // IPv6
WARN_ON_ERR(connection_list_enqueue_packet(&ctx->conn_list, packet_data, packet_length));
break;
default:
LOG(LVL_WARN, "Unsupported Protocol 0x%04x. Packet dropped.", proto);
continue;
}
}
}
return OK;
}
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
size_t packet_size = 0;
*end_burst = false;
// send packets from the one-shot queue
const packet_queue_entry_t *queue_entry = packet_queue_get(&ctx->oneshot_queue, 0);
if(queue_entry) {
*end_burst = queue_entry->header.tx_request == 1;
packet_size = layer2_encode_packet(&queue_entry->header, queue_entry->data, queue_entry->data_len, buf, buf_len);
if(packet_size) {
packet_queue_delete(&ctx->oneshot_queue, 1);
return packet_size;
}
}
switch(ctx->state) {
case DIGIPEATER_STATE_BEACON:
packet_size = encode_beacon_packet(ctx, buf, buf_len);
ctx->next_beacon_time += HRTIME_MS(BEACON_INTERVAL_MS);
*end_burst = true;
break;
case DIGIPEATER_STATE_CONN: {
// pull packets from the current connection
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(!head) {
return 0;
}
connection_ctx_t *conn = &head->connection;
if(connection_can_transmit(conn)) {
packet_size = connection_encode_next_packet(conn, buf, buf_len, end_burst);
if(*end_burst) {
connection_tx_clean_empty_packet(conn);
}
ctx->state = DIGIPEATER_STATE_CONN;
}
}
break;
}
return packet_size;
}
bool digipeater_can_transmit(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now > ctx->next_beacon_time) {
return true;
}
if(packet_queue_get_used_space(&ctx->oneshot_queue) != 0) {
return true;
}
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
return connection_can_transmit(&head->connection);
}
return false;
}
void digipeater_extend_cycle(digipeater_ctx_t *ctx, uint64_t ns)
{
ctx->cycle_end_time += ns;
}
result_t digipeater_end_cycle(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now >= ctx->next_beacon_time) {
ctx->state = DIGIPEATER_STATE_BEACON;
} else {
// TODO: adjust the time based on connection activity; right now this results
// in round-robin scheduling
connection_list_reschedule_head(&ctx->conn_list, now + HRTIME_MS(MIN_INTERVAL_TIME_MS));
ctx->state = DIGIPEATER_STATE_CONN;
ctx->cycle_end_time = now + HRTIME_MS(MIN_INTERVAL_TIME_MS);
}
return OK;
}
result_t digipeater_maintain(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now > ctx->cycle_end_time) {
// at the end of the cycle, the next connection is activated and the
// current one is re-scheduled.
LOG(LVL_DEBUG, "Interval ended by timeout at %llu ns.", now);
digipeater_end_cycle(ctx);
}
switch(ctx->state) {
case DIGIPEATER_STATE_CONN:
{
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(!head) {
LOG(LVL_INFO, "No active connection -> force beacon state for packet handling.");
ctx->state = DIGIPEATER_STATE_BEACON;
} else {
connection_evt_t evt;
ERR_CHECK(connection_maintain(&head->connection, &evt));
switch(evt) {
case CONN_EVT_TIMEOUT:
// connection has been closed by timeout -> clean up the list
connection_list_delete_closed(&ctx->conn_list);
break;
default:
// do nothing
break;
}
}
break;
}
case DIGIPEATER_STATE_BEACON:
// nothing to do here
break;
}
return OK;
}

View file

@ -0,0 +1,121 @@
/*
* This file contains functions to handle a single layer 2 digipeater.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#ifndef DIGIPEATER_H
#define DIGIPEATER_H
#include <results.h>
#include <stdbool.h>
#include "connection_list.h"
#include "layer2/packet_queue.h"
struct digipeater_ctx_s;
typedef enum {
DIGIPEATER_STATE_BEACON, //!< Beacon sent, waiting for connection requests
DIGIPEATER_STATE_CONN, //!< Handling client connections
} digipeater_state_t;
typedef enum {
DIGIPEATER_EVT_INTERVAL_END, //!< The current cycle has ended and new packets should be transmitted.
} digipeater_evt_t;
typedef struct digipeater_ctx_s {
digipeater_state_t state; //!< Current operating state
ham64_t my_addr; //!< The local link layer address.
packet_queue_t oneshot_queue; //!< Queue for packets that are sent once and connection-independent
uint64_t next_beacon_time; //!< Absolute timestamp of the next beacon transmission.
uint64_t cycle_end_time; //!< Absolute timestamp of the end of the current cycle.
connection_list_t conn_list; //!< List of connections.
} digipeater_ctx_t;
/*!\brief Initialize the digipeater context.
*
* \param ctx The digipeater context to initialize.
* \param my_addr The local link layer address.
* \returns OK if everything worked or a fitting error code.
*/
result_t digipeater_init(
digipeater_ctx_t *ctx,
const ham64_t *my_addr);
/*!\brief Destroy the given digipeater context.
*/
void digipeater_destroy(digipeater_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param[inout] ctx The digipeater context.
* \param[in] buf Pointer to the packet data.
* \param[in] buf_len Length of the packet.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure.
*/
result_t digipeater_handle_packet(
digipeater_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet);
/*!\brief Enqueue a packet for transmission.
* \param ctx The digipeater context.
* \param tun_fd File descriptor for an open TUN device.
*/
result_t digipeater_fill_packet_queues_from_tundev(digipeater_ctx_t *ctx, int tun_fd);
/*!\brief Encode the next packet for transmission.
*
* \note
* If no packets are currently available for transmission, this function returns zero.
*
* \param ctx The digipeater context.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \param end_burst Set to true if this packet should end the current burst and start the transmission.
* \returns The number of bytes written to buf or zero if no packet was available.
*/
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst);
/*!\brief Check if there are packets queued for transmission.
*/
bool digipeater_can_transmit(digipeater_ctx_t *ctx);
/*!\brief Extend the current cycle.
*
* By default, the cycle duration is set to a minimum length. It must be
* extended by the time needed to transmit and receive packets. As the time
* necessary for packet transfer is unknown to the Layer 2, it must be
* calculated externally.
*
* This function should be called whenever
* - A packet is transmitted from digipeater_encode_next_packet(), or
* - A packet is being received
*/
void digipeater_extend_cycle(digipeater_ctx_t *ctx, uint64_t ns);
/*!\brief End the current cycle.
*
* End the cycle without waiting for the timeout. This switches to the next
* connection or transmits a beacon. In any case, it stops forwarding received
* packets to the current connection.
*/
result_t digipeater_end_cycle(digipeater_ctx_t *ctx);
/*!\brief Handle internal maintenance tasks.
*
* This should be called periodically to handle timeouts and retransmissions.
*/
result_t digipeater_maintain(digipeater_ctx_t *ctx);
#endif // DIGIPEATER_H

View file

@ -8,9 +8,13 @@
#include <assert.h>
#include <string.h>
#include <liquid/liquid.h>
#define LOGGER_MODULE_NAME "l2ps"
#include "logger.h"
#include "config.h"
#include "packet_structs.h"
@ -129,3 +133,36 @@ void layer2_dump_packet_header(int level, const layer2_packet_header_t *header)
}
}
size_t layer2_encode_packet(
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len,
uint8_t *data_out, size_t data_out_len)
{
unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME);
if(data_out_len < LAYER2_PACKET_HEADER_ENCODED_SIZE_MAX) {
return 0;
}
size_t packet_size = layer2_encode_packet_header(header, data_out);
if(data_out_len < packet_size + crc_size + payload_len) {
return 0;
}
// add the payload data
if(payload) {
memcpy(data_out + packet_size, payload, payload_len);
}
packet_size += payload_len;
// calculate CRC of everything and append it to the packet
crc_append_key(PAYLOAD_CRC_SCHEME, data_out, packet_size);
packet_size += crc_size;
return packet_size;
}

View file

@ -33,6 +33,7 @@ typedef struct layer2_packet_header_s {
ham64_t dst_addr; //!< destination HAM-64 address
} layer2_packet_header_t;
// maximum header size
// - 1 byte packet info
// - 1 byte sequence numbers
@ -74,6 +75,19 @@ bool layer2_decode_packet_header(const uint8_t *encoded, size_t encoded_len, lay
*/
void layer2_dump_packet_header(int level, const layer2_packet_header_t *header);
/*!\brief Encode a complete packet (with header and CRC)
* \param header The header structure to encode.
* \param payload The payload data to encode.
* \param payload_len The length of the payload data.
* \param data_out The buffer for the encoded packet data.
* \param data_out_len The size of the output buffer.
* \returns The number of bytes written to the output buffer or 0 if not enough space was available.
*/
size_t layer2_encode_packet(
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len,
uint8_t *data_out, size_t data_out_len);
/*!\brief Get a string representation of the given message type.
*/
const char* layer2_msg_type_to_string(layer2_message_type_t type);
@ -82,15 +96,36 @@ const char* layer2_msg_type_to_string(layer2_message_type_t type);
typedef enum {
L2_PAYLOAD_TYPE_IPV4 = 0x00,
L2_PAYLOAD_TYPE_IPV6 = 0x01
L2_PAYLOAD_TYPE_IPV6 = 0x01,
L2_PAYLOAD_TYPE_INVALID = 0x7FFFFFFF
} layer2_payload_type_t;
typedef struct layer2_data_header_s {
typedef struct layer2_data_packet_s {
layer2_payload_type_t payload_type; //!< Type of the contained layer 3 packet
} layer2_data_header_t;
const uint8_t *payload; //!< Pointer to the payload data
size_t payload_len; //!< Length of the payload data
} layer2_data_packet_t;
/* Connection Management Structs */
// TODO
typedef enum {
CONN_MGMT_TYPE_BEACON = 0x00,
CONN_MGMT_TYPE_CONNECTION_REQUEST = 0x01,
CONN_MGMT_TYPE_CONNECTION_PARAMETERS = 0x02,
CONN_MGMT_TYPE_CONNECTION_RESET = 0x03,
CONN_MGMT_TYPE_DISCONNECT_REQUEST = 0x04,
CONN_MGMT_TYPE_DISCONNECT = 0x05,
} conn_mgmt_type_t;
typedef enum {
CONN_PARAM_TYPE_IPV6_ADDRESS = 0x00,
CONN_PARAM_TYPE_IPV6_GATEWAY = 0x01,
CONN_PARAM_TYPE_IPV6_DNS = 0x02,
CONN_PARAM_TYPE_IPV4_ADDRESS = 0x08,
CONN_PARAM_TYPE_IPV4_GATEWAY = 0x09,
CONN_PARAM_TYPE_IPV4_DNS = 0x0A,
} conn_param_type_t;
#endif // PACKET_STRUCTS_H

View file

@ -29,7 +29,7 @@ int tundev_open(char *dev)
struct ifreq ifr;
memset(&ifr, 0, sizeof(ifr));
ifr.ifr_flags = IFF_TUN | IFF_NO_PI;
ifr.ifr_flags = IFF_TUN;
strncpy(ifr.ifr_name, dev, IFNAMSIZ);

View file

@ -50,7 +50,7 @@
static int m_tunfd = -1;
static bool m_running = true;
static double next_tx_switch_time = 0.0;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
@ -71,7 +71,7 @@ static void signal_handler(int signal, siginfo_t *info, void *ctx)
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + (double)offset_ms * 0.001;
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
void print_complex_array(const char *varname, float complex const *array, size_t len)
@ -186,7 +186,7 @@ int main(int argc, char **argv)
bool on_air = true;
srand((int)(get_hires_time() * 1e6));
srand(get_hires_time());
// ** Initialize **
@ -233,18 +233,18 @@ int main(int argc, char **argv)
unsigned rx_retries = 0;
double old = get_hires_time();
uint64_t old = get_hires_time();
size_t total_samples = 0;
double next_stats_print_time = old + 0.5;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
double retransmit_time = 0.0;
uint64_t retransmit_time = 0;
while(m_running) {
double now = get_hires_time();
uint64_t now = get_hires_time();
if(retransmit_time != 0.0 && now >= retransmit_time) {
if(retransmit_time != 0 && now >= retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
retransmit_time = 0.0;
retransmit_time = 0;
layer2_tx_restart(&l2tx);
}
@ -325,7 +325,7 @@ int main(int argc, char **argv)
RESULT_CHECK(sdr_start_rx(&sdr));
on_air = false;
retransmit_time = get_hires_time() + 1.0 + 1.0 * rand() / RAND_MAX;
retransmit_time = get_hires_time() + HRTIME_SEC(1) + HRTIME_SEC(1.0 * rand() / RAND_MAX);
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
}
@ -360,9 +360,9 @@ int main(int argc, char **argv)
total_samples += n_rf_samples;
double new = get_hires_time();
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_samples / (new - old);
double rate = total_samples * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f MS/s", rate / 1e6);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);

View file

@ -49,4 +49,12 @@ typedef enum {
} \
} while(0);
#define WARN_ON_ERR(call) \
do { \
result_t err_check_result = call; \
if(err_check_result != OK) { \
LOG(LVL_WARN, "Error ignored at %s:%d: %d", __FILE__, __LINE__, err_check_result); \
} \
} while(0);
#endif // RESULTS_H

View file

@ -99,9 +99,9 @@ static int tx_callback(hackrf_transfer *transfer)
return HACKRF_ERROR_OTHER;
}
if(sdr_ctx->tx_start_time == 0.0) {
if(sdr_ctx->tx_start_time == 0) {
sdr_ctx->tx_start_time = get_hires_time();
sdr_ctx->tx_duration = 10e-3; // give a little headroom
sdr_ctx->tx_duration = HRTIME_MS(10); // give a little headroom
LOG(LVL_INFO, "TX time tracking reset: start = %.3f.", sdr_ctx->tx_start_time);
}
@ -112,7 +112,7 @@ static int tx_callback(hackrf_transfer *transfer)
if(samples_read != 0) {
// only add time if any actual samples were transmitted
sdr_ctx->tx_duration += (double)samples_requested / SDR_TX_SAMPLING_RATE;
sdr_ctx->tx_duration += HRTIME_SEC((double)samples_requested / SDR_TX_SAMPLING_RATE);
}
LOG(LVL_DEBUG, "copied %u samples to HackRF.", samples_read);
@ -377,8 +377,8 @@ result_t sdr_flush_tx_buffer(sdr_ctx_t *ctx)
return 0;
}
double now = get_hires_time();
double end = ctx->tx_start_time + ctx->tx_duration;
uint64_t now = get_hires_time();
uint64_t end = ctx->tx_start_time + ctx->tx_duration;
if(sem_post(&ctx->buf_sem) < 0) {
LOG(LVL_ERR, "sem_post: %s", strerror(errno));

View file

@ -76,11 +76,23 @@ err_close:
return false;
}
double get_hires_time(void)
uint64_t get_hires_time(void)
{
struct timespec clk;
clock_gettime(CLOCK_MONOTONIC, &clk);
return clk.tv_sec + 1e-9 * clk.tv_nsec;
return clk.tv_sec * 1000000000ULL + (uint64_t)clk.tv_nsec;
}
void sleep_until(uint64_t hires_time)
{
struct timespec tv;
int ret;
tv.tv_sec = hires_time / 1000000000ULL;
tv.tv_nsec = hires_time % 1000000000ULL;
do {
ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &tv, NULL);
} while(ret == EINTR);
}
void fsleep(double d)
@ -93,18 +105,6 @@ void fsleep(double d)
nanosleep(&ts, NULL);
}
void sleep_until(double hires_time)
{
struct timespec tv;
int ret;
tv.tv_sec = hires_time;
tv.tv_nsec = (uint64_t)(1e9 * hires_time) % 1000000000;
do {
ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &tv, NULL);
} while(ret == EINTR);
}
void hexdump(const uint8_t *data, size_t len)
{
static const char lut[16] = "0123456789ABCDEF";

View file

@ -11,6 +11,10 @@
#include <stdbool.h>
#include <liquid/liquid.h>
#define HRTIME_US(x) ((uint64_t)(1000ULL * (x)))
#define HRTIME_MS(x) ((uint64_t)(1000000ULL * (x)))
#define HRTIME_SEC(x) ((uint64_t)(1000000000ULL * (x)))
/*! Dump a array of complex numbers.
*
* \param data Pointer to the data to dump.
@ -30,9 +34,26 @@ bool dump_array_cf(const float complex *data, size_t n, float T, const char *fil
*/
bool dump_array_f(const float *data, size_t n, float T, const char *filename);
void sleep_until(double hires_time);
/*! Sleep until the given absolute timestamp in ns.
*
* The current timestamp can be retrieved using \ref get_hires_time().
*
* \param hires_time The resume timestamp in ns.
*/
void sleep_until(uint64_t hires_time);
/*! Returns the current high-resulution timestamp.
*
* This timestamp comes from the CLOCK_MONOTONIC source and has no defined relation to the wall-clock time. It can be used to calculate intervals, though.
*
* \returns A timestamp in nanosecond resolution.
*/
uint64_t get_hires_time(void);
void fsleep(double d);
double get_hires_time(void);
void hexdump(const uint8_t *data, size_t len);

View file

@ -222,3 +222,7 @@ target_link_libraries(
test_interleaver
m
)
#------------------------------------
add_subdirectory(layer2_over_udp)

View file

@ -0,0 +1,73 @@
add_executable(
l2udptest_client
../../src/utils.c
../../src/utils.h
../../src/logger.c
../../src/logger.h
../../src/options.c
../../src/options.h
../../src/var_array.c
../../src/var_array.h
../../src/config.h
../../src/jsonlogger.c
../../src/jsonlogger.h
../../src/debug_structs.h
../../src/layer2/packet_structs.c
../../src/layer2/packet_structs.h
../../src/layer2/ham64.c
../../src/layer2/ham64.h
../../src/layer2/packet_queue.c
../../src/layer2/packet_queue.h
../../src/layer2/connection.c
../../src/layer2/connection.h
../../src/layer2/tundev.c
../../src/layer2/tundev.h
l2udptest_client.c
)
target_link_libraries(
l2udptest_client
fec
m
liquid
)
#---------------------------
add_executable(
l2udptest_digipeater
../../src/utils.c
../../src/utils.h
../../src/logger.c
../../src/logger.h
../../src/options.c
../../src/options.h
../../src/var_array.c
../../src/var_array.h
../../src/config.h
../../src/jsonlogger.c
../../src/jsonlogger.h
../../src/debug_structs.h
../../src/layer2/packet_structs.c
../../src/layer2/packet_structs.h
../../src/layer2/ham64.c
../../src/layer2/ham64.h
../../src/layer2/packet_queue.c
../../src/layer2/packet_queue.h
../../src/layer2/connection.c
../../src/layer2/connection.h
../../src/layer2/connection_list.c
../../src/layer2/connection_list.h
../../src/layer2/digipeater.c
../../src/layer2/digipeater.h
../../src/layer2/tundev.c
../../src/layer2/tundev.h
l2udptest_digipeater.c
)
target_link_libraries(
l2udptest_digipeater
fec
m
liquid
)

View file

@ -0,0 +1,473 @@
/*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#include <linux/if.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <liquid/liquid.h>
#include <sys/poll.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <errno.h>
#include "layer2/ham64.h"
#include "results.h"
#include "utils.h"
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "jsonlogger.h"
#include "debug_structs.h"
#include "layer2/connection.h"
#include "layer2/tundev.h"
#include "config.h"
#define RESULT_CHECK(stmt) { \
result_t res = stmt; \
if(res != OK) { \
LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \
exit(1); \
} \
}
#define BROADCAST_PORT 3737
static int m_tunfd = -1;
static bool m_running = true;
static int m_bcast_sock = -1;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
static connection_ctx_t l2conn;
static void signal_handler(int signal, siginfo_t *info, void *ctx)
{
(void)signal;
(void)info;
(void)ctx;
LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal);
m_running = false;
}
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
static result_t transmit(const uint8_t *data, size_t len)
{
result_t result = OK;
struct sockaddr_in bcast_addr = {0};
bcast_addr.sin_family = AF_INET;
bcast_addr.sin_port = htons(BROADCAST_PORT);
bcast_addr.sin_addr.s_addr = INADDR_BROADCAST;
int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr));
if(ret < 0) {
LOG(LVL_ERR, "sendto: %s", strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "t");
return result;
}
void rx_data_to_tun(const layer2_data_packet_t *data_packet)
{
uint8_t tun_packet[4 + data_packet->payload_len];
// flags
tun_packet[0] = 0;
tun_packet[1] = 0;
switch(data_packet->payload_type) {
case L2_PAYLOAD_TYPE_IPV6:
*(uint16_t*)(tun_packet+2) = htons(0x86dd);
break;
case L2_PAYLOAD_TYPE_IPV4:
*(uint16_t*)(tun_packet+2) = htons(0x0800);
break;
default:
LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type);
return;
}
memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len);
int ret = write(m_tunfd, tun_packet, sizeof(tun_packet));
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
}
result_t connect_to_digipeater(connection_ctx_t *conn)
{
while(conn->conn_state != CONN_STATE_ESTABLISHED) {
uint8_t packetbuf[65536];
LOG(LVL_INFO, "Waiting for packets from digipeater (conn. state: %d)", conn->conn_state);
// note: recv() is blocking here
int ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return ERR_SYSCALL;
}
layer2_data_packet_t data_packet;
result_t err_code = connection_handle_packet(conn, packetbuf, ret, &data_packet);
switch(err_code) {
case OK:
break; // state machine advanced and there are probably packets to send
case ERR_INTEGRITY:
case ERR_INVALID_STATE:
continue; // retry with the next packet
default:
LOG(LVL_ERR, "connection_handle_packet() returned %d.", err_code);
return err_code;
}
LOG(LVL_INFO, "Packet processed successfully (new conn. state: %d); Transmitting response (if any).", conn->conn_state);
// send any packets in the queue
while(connection_can_transmit(conn)) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = connection_encode_next_packet(conn,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
}
return OK;
}
void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{
(void)conn;
(void)evt;
(void)user_ctx;
}
int main(void)
{
// initialize the console logger
logger_init();
if(!jsonlogger_init("jsonlog.fifo")) {
LOG(LVL_FATAL, "Could not initialize JSON logger.");
return EXIT_FAILURE;
}
srand(get_hires_time());
// ** Initialize **
char devname[IFNAMSIZ] = "hamnet70";
m_tunfd = tundev_open(devname);
if(m_tunfd < 0) {
return 1;
}
// ** Set up signal handling
struct sigaction term_action = {0};
term_action.sa_sigaction = signal_handler;
if(sigaction(SIGTERM, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if(sigaction(SIGINT, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Set up UDP socket
m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(m_bcast_sock < 0) {
LOG(LVL_ERR, "socket: %s", strerror(errno));
exit(EXIT_FAILURE);
}
int broadcastEnable=1;
int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable));
if(ret < 0) {
LOG(LVL_ERR, "setsockopt: %s", strerror(errno));
exit(EXIT_FAILURE);
}
struct sockaddr_in bind_addr = {0};
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = htons(BROADCAST_PORT);
bind_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr));
if(ret < 0) {
LOG(LVL_ERR, "bind: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Process packets **
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = m_tunfd;
pfd_tun.events = POLLIN;
struct pollfd pfd_bcast;
memset(&pfd_bcast, 0, sizeof(pfd_bcast));
pfd_bcast.fd = m_bcast_sock;
pfd_bcast.events = POLLIN;
uint64_t old = get_hires_time();
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address));
connection_setup_outgoing(&l2conn);
RESULT_CHECK(connect_to_digipeater(&l2conn));
while(m_running) {
connection_evt_t evt;
RESULT_CHECK(connection_maintain(&l2conn, &evt));
if(evt == CONN_EVT_TIMEOUT) {
LOG(LVL_ERR, "Connection timed out. Shutting down.");
break;
}
// fill the TX queue from the TUN device
while(connection_can_enqueue_packet(&l2conn)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> move it to the queue
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(m_tunfd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
} else if(ret < 4) {
LOG(LVL_ERR, "Not enough data from TUN read() to check packet type!");
return ERR_SYSCALL;
}
uint16_t flags = *(uint16_t*)packetbuf;
uint16_t proto = *((uint16_t*)packetbuf + 1);
LOG(LVL_DUMP, "TUN Flags: 0x%04x", flags);
LOG(LVL_DUMP, "TUN Proto: 0x%04x", proto);
uint8_t *packet_data = packetbuf + 4;
size_t packet_length = ret - 4;
if(proto != 0xdd86) {
LOG(LVL_WARN, "Non-IPv6 packet ignored. Proto: 0x%04x", proto);
continue;
}
RESULT_CHECK(connection_enqueue_data_packet(&l2conn, L2_PAYLOAD_TYPE_IPV6, packet_data, packet_length));
}
}
// transmit one burst
if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = connection_encode_next_packet(&l2conn,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
}
// make sure the receiver runs for a minimum amount of time
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
while(get_hires_time() < next_tx_switch_time) {
// ** Receive signal **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
return EXIT_FAILURE;
}
if(ret == 0) {
continue;
}
uint8_t packetbuf[65536];
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return EXIT_FAILURE;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return EXIT_FAILURE;
}
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
layer2_data_packet_t data_packet;
result_t result = connection_handle_packet(&l2conn, packetbuf, ret, &data_packet);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
if(data_packet.payload_len != 0) {
rx_data_to_tun(&data_packet);
}
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
case ERR_INVALID_STATE:
LOG(LVL_WARN, "Packet ignored due to invalid state.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
total_bytes += ret;
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_bytes * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);
LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)",
m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)",
m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)",
m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found);
next_stats_print_time += HRTIME_MS(500);
total_bytes = 0;
old = new;
}
fprintf(stderr, "r");
}
}
// ** Cleanup **
close(m_bcast_sock);
connection_destroy(&l2conn);
jsonlogger_shutdown();
LOG(LVL_INFO, "Done.");
logger_shutdown();
}

View file

@ -0,0 +1,345 @@
/*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#include <linux/if.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <liquid/liquid.h>
#include <sys/poll.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <errno.h>
#include "layer2/ham64.h"
#include "utils.h"
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "jsonlogger.h"
#include "debug_structs.h"
#include "layer2/digipeater.h"
#include "layer2/tundev.h"
#include "config.h"
#define RESULT_CHECK(stmt) { \
result_t res = stmt; \
if(res != OK) { \
LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \
exit(1); \
} \
}
#define BROADCAST_PORT 3737
static int m_tunfd = -1;
static bool m_running = true;
static int m_bcast_sock = -1;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
static void signal_handler(int signal, siginfo_t *info, void *ctx)
{
(void)signal;
(void)info;
(void)ctx;
LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal);
m_running = false;
}
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
static result_t transmit(const uint8_t *data, size_t len)
{
result_t result = OK;
struct sockaddr_in bcast_addr = {0};
bcast_addr.sin_family = AF_INET;
bcast_addr.sin_port = htons(BROADCAST_PORT);
bcast_addr.sin_addr.s_addr = INADDR_BROADCAST;
int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr));
if(ret < 0) {
LOG(LVL_ERR, "sendto: %s", strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "t");
return result;
}
void rx_data_to_tun(const layer2_data_packet_t *data_packet)
{
uint8_t tun_packet[4 + data_packet->payload_len];
// flags
tun_packet[0] = 0;
tun_packet[1] = 0;
switch(data_packet->payload_type) {
case L2_PAYLOAD_TYPE_IPV6:
*(uint16_t*)(tun_packet+2) = htons(0x86dd);
break;
case L2_PAYLOAD_TYPE_IPV4:
*(uint16_t*)(tun_packet+2) = htons(0x0800);
break;
default:
LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type);
return;
}
memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len);
int ret = write(m_tunfd, tun_packet, sizeof(tun_packet));
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
}
int main(void)
{
// initialize the console logger
logger_init();
if(!jsonlogger_init("jsonlog.fifo")) {
LOG(LVL_FATAL, "Could not initialize JSON logger.");
return EXIT_FAILURE;
}
srand(get_hires_time());
// ** Initialize **
char devname[IFNAMSIZ] = "hamnet70";
m_tunfd = tundev_open(devname);
if(m_tunfd < 0) {
return 1;
}
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
digipeater_ctx_t digipeater;
RESULT_CHECK(digipeater_init(&digipeater, &my_address));
// ** Set up signal handling
struct sigaction term_action = {0};
term_action.sa_sigaction = signal_handler;
if(sigaction(SIGTERM, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if(sigaction(SIGINT, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Set up UDP socket
m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(m_bcast_sock < 0) {
LOG(LVL_ERR, "socket: %s", strerror(errno));
exit(EXIT_FAILURE);
}
int broadcastEnable=1;
int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable));
if(ret < 0) {
LOG(LVL_ERR, "setsockopt: %s", strerror(errno));
exit(EXIT_FAILURE);
}
struct sockaddr_in bind_addr = {0};
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = htons(BROADCAST_PORT);
bind_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr));
if(ret < 0) {
LOG(LVL_ERR, "bind: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Process packets **
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = m_tunfd;
pfd_tun.events = POLLIN;
struct pollfd pfd_bcast;
memset(&pfd_bcast, 0, sizeof(pfd_bcast));
pfd_bcast.fd = m_bcast_sock;
pfd_bcast.events = POLLIN;
uint64_t old = get_hires_time();
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
while(m_running) {
RESULT_CHECK(digipeater_maintain(&digipeater));
RESULT_CHECK(digipeater_fill_packet_queues_from_tundev(&digipeater, m_tunfd));
// transmit anything available
if(digipeater_can_transmit(&digipeater)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = digipeater_encode_next_packet(&digipeater,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
}
// make sure the receiver runs for a minimum amount of time
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
// receive response
while(get_hires_time() < next_tx_switch_time) {
// ** Receive packets from the broadcast socket **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
return EXIT_FAILURE;
}
if(ret == 0) {
continue;
}
uint8_t packetbuf[65536];
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return EXIT_FAILURE;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return EXIT_FAILURE;
}
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
layer2_data_packet_t data_packet;
result_t result = digipeater_handle_packet(&digipeater, packetbuf, ret, &data_packet);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
if(data_packet.payload_len != 0) {
rx_data_to_tun(&data_packet);
}
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
total_bytes += ret;
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_bytes * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);
LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)",
m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)",
m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)",
m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found);
next_stats_print_time += HRTIME_MS(500);
total_bytes = 0;
old = new;
}
fprintf(stderr, "r");
}
}
// ** Cleanup **
close(m_bcast_sock);
digipeater_destroy(&digipeater);
jsonlogger_shutdown();
LOG(LVL_INFO, "Done.");
logger_shutdown();
}