WIP: Layer 2-Implementierung #6

Draft
thomas wants to merge 39 commits from layer2_dev into main
23 changed files with 2474 additions and 140 deletions

View file

@ -79,3 +79,4 @@ target_link_libraries(
)
add_subdirectory(test)
add_subdirectory(utils/ham64)

View file

@ -13,7 +13,7 @@ HOSTID="$1"
DEV=hamnet70
sudo ip tuntap add dev $DEV mode tun user $(whoami)
sudo ip tuntap add dev $DEV mode tun pi user $(whoami)
sudo ip link set dev $DEV txqueuelen 16 # default is 500
sudo ip link set dev $DEV up
sudo ip address add dev $DEV fd73::$HOSTID/64

View file

@ -12,12 +12,22 @@
/*** LAYER 2 CONFIG ***/
#define MY_CALL undefined // define MY_CALL to your call sign as a C string, e.g. "DL5TKL"
#define PEER_CALL "TESTPEER" // define PEER_CALL to the remote stations call sign as a C string, e.g. "DL5TKL-1"
#define RETRANSMIT_TIMEOUT_MS 1000
#define CONNECTION_TIMEOUT_MS 10000
#define BEACON_INTERVAL_MS 5000
#define MIN_INTERVAL_TIME_MS 10
#define IPV6_NET "fd70::" // the IPv6 "base address". The HAM64 address will be encoded into the lower 64 bits.
/*** TIMING CONFIG ***/
#define TX_SWITCH_BACKOFF_PREAMBLE_MS 42 // only relevant if packet cannot be decoded (maximum packet duration)
#define TX_SWITCH_BACKOFF_END_OF_PACKET_MS 5 // wait for 2*(preamble+rampup+rampdown duration)
#define TX_SWITCH_BACKOFF_AFTER_RX_ON 70 // time the transceiver must stay in RX mode (depends on the RX->TX switch time of the other station)
#define TX_SWITCH_BACKOFF_AFTER_RX_ON_MS 70 // time the transceiver must stay in RX mode (depends on the RX->TX switch time of the other station)
/*** LAYER 1 CONFIG ***/

View file

@ -1,15 +1,24 @@
#include "layer2/packet_structs.h"
#include <string.h>
#include <assert.h>
#define LOGGER_MODULE_NAME "conn"
#include "logger.h"
#include "connection.h"
#include "config.h"
#include "layer2/ham64.h"
#include "layer2/packet_queue.h"
#include "results.h"
#include "utils.h"
#define SEQ_NR_MASK 0xF
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr)
result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr)
{
ctx->last_acked_seq = 0;
ctx->next_expected_seq = 0;
@ -21,6 +30,31 @@ result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ha
ctx->my_addr = *my_addr;
ctx->peer_addr = *peer_addr;
uint64_t now = get_hires_time();
ctx->last_rx_time = now;
ctx->retransmit_time = 0;
// calculate IPv6 address
struct in6_addr net_addr;
inet_pton(AF_INET6, IPV6_NET, &net_addr);
memset(ctx->peer_ipv6_addr.s6_addr, 0, sizeof(ctx->peer_ipv6_addr));
memcpy(ctx->peer_ipv6_addr.s6_addr, net_addr.s6_addr, 8); // copy the network part
// fill the host part from the peers ham64 address. The bytes are filled in
// reverse order to make the „readable“ IPv6 address as short as possible.
for(uint8_t i = 0; i < peer_addr->length; i++) {
ctx->peer_ipv6_addr.s6_addr[15 - 2*i] = (peer_addr->addr[i] >> 8) & 0xFF;
ctx->peer_ipv6_addr.s6_addr[14 - 2*i] = (peer_addr->addr[i] >> 0) & 0xFF;
}
// print the address for debugging
char ipv6_str[INET6_ADDRSTRLEN];
char ham64_str[HAM64_FMT_MAX_LEN];
ham64_format(peer_addr, ham64_str);
inet_ntop(AF_INET6, &ctx->peer_ipv6_addr, ipv6_str, sizeof(ipv6_str));
LOG(LVL_DEBUG, "IPv6 address assigned to %s is %s.", ham64_str, ipv6_str);
ctx->conn_state = CONN_STATE_INITIALIZED;
return OK;
@ -38,7 +72,19 @@ void connection_destroy(connection_ctx_t *ctx)
}
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len)
void connection_setup_outgoing(connection_ctx_t *ctx)
{
ctx->conn_state = CONN_STATE_CONNECTING;
}
void connection_setup_incoming(connection_ctx_t *ctx)
{
ctx->conn_state = CONN_STATE_ESTABLISHED;
}
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len, layer2_data_packet_t *data_packet, bool *tx_req_rcvd)
{
// check the connection state
switch(ctx->conn_state) {
@ -54,6 +100,9 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
break;
}
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the CRC
size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME);
@ -71,20 +120,13 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
}
// check if the packet really should be handled by us
if(!ham64_is_equal(&header.src_addr, &ctx->peer_addr)) {
char fmt_src_addr[HAM64_FMT_MAX_LEN];
char fmt_peer_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header.src_addr, fmt_src_addr);
ham64_format(&ctx->peer_addr, fmt_peer_addr);
LOG(LVL_ERR, "Packet has the wrong source address: got %s, expected %s",
fmt_src_addr, fmt_peer_addr);
return ERR_INVALID_ADDRESS;
if(ham64_is_equal(&ctx->my_addr, &header.src_addr)) {
LOG(LVL_DEBUG, "Packet is from ourselves. Ignored.");
return OK;
}
if(!ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
if(ham64_get_addr_type(&header.dst_addr) != HAM64_ADDR_TYPE_BROADCAST
&& !ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
char fmt_dst_addr[HAM64_FMT_MAX_LEN];
char fmt_my_addr[HAM64_FMT_MAX_LEN];
@ -97,50 +139,161 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
return ERR_INVALID_ADDRESS;
}
LOG(LVL_DEBUG, "Handling %s packet with rx_seq_nr %u, tx_seq_nr %u.",
layer2_msg_type_to_string(header.msg_type), header.rx_seq_nr, header.tx_seq_nr);
size_t header_size = layer2_get_encoded_header_size(&header);
ctx->last_acked_seq = header.rx_seq_nr;
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
return connection_handle_packet_prechecked(ctx, &header, payload, payload_len, data_packet, tx_req_rcvd);
}
switch(header.msg_type) {
case L2_MSG_TYPE_EMPTY:
LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq);
return OK; // do not ACK and call back
static result_t handle_conn_mgmt(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len)
{
(void)header;
case L2_MSG_TYPE_CONN_MGMT:
case L2_MSG_TYPE_CONNECTIONLESS:
LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header.msg_type));
return OK;
if(payload_len < 1) {
LOG(LVL_ERR, "Connection management packet without any payload is invalid.");
return ERR_INVALID_PARAM;
}
case L2_MSG_TYPE_DATA:
uint8_t packet_type = payload[0];
switch(packet_type) {
case CONN_MGMT_TYPE_BEACON:
if(ctx->conn_state == CONN_STATE_CONNECTING) {
LOG(LVL_INFO, "Received beacon; queueing connection request.");
// enqueue a connection request packet
layer2_packet_header_t conn_request_header;
conn_request_header.tx_request = true;
conn_request_header.dst_addr = ctx->peer_addr;
conn_request_header.src_addr = ctx->my_addr;
conn_request_header.msg_type = L2_MSG_TYPE_CONN_MGMT;
conn_request_header.rx_seq_nr = 0;
conn_request_header.tx_seq_nr = 0;
// create a persistent copy of the packet data.
uint8_t packetbuf[1];
packetbuf[0] = CONN_MGMT_TYPE_CONNECTION_REQUEST;
connection_enqueue_packet(ctx, &conn_request_header, packetbuf, 1);
} else {
LOG(LVL_WARN, "Beacons are ignored in states other than CONNECTING.");
return ERR_INVALID_STATE;
}
break;
case CONN_MGMT_TYPE_CONNECTION_PARAMETERS:
LOG(LVL_INFO, "Connection parameters received! -> connection established");
ctx->next_expected_seq = 1; // connection parameters are packet 0
ctx->conn_state = CONN_STATE_ESTABLISHED;
break;
default:
LOG(LVL_ERR, "Invalid message type %d.", header.msg_type);
LOG(LVL_WARN, "Ignored connection management type %d", packet_type);
break;
}
return OK;
}
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len,
layer2_data_packet_t *data_packet,
bool *tx_req_rcvd)
{
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the connection state
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
LOG(LVL_ERR, "Trying to pass packet to connection in state %u", ctx->conn_state);
return ERR_INVALID_STATE;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
// in these states, packets can be handled
break;
}
// check if this packet is from our designated peer
if(!ham64_is_equal(&header->src_addr, &ctx->peer_addr)) {
char fmt_src_addr[HAM64_FMT_MAX_LEN];
char fmt_peer_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header->src_addr, fmt_src_addr);
ham64_format(&ctx->peer_addr, fmt_peer_addr);
LOG(LVL_ERR, "Packet has the wrong source address: got %s, expected %s",
fmt_src_addr, fmt_peer_addr);
return ERR_INVALID_ADDRESS;
}
LOG(LVL_DEBUG, "Handling %s packet with rx_seq_nr %u, tx_seq_nr %u.",
layer2_msg_type_to_string(header->msg_type), header->rx_seq_nr, header->tx_seq_nr);
ctx->last_acked_seq = header->rx_seq_nr;
*tx_req_rcvd = false;
switch(header->msg_type) {
case L2_MSG_TYPE_EMPTY:
LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq);
*tx_req_rcvd = header->tx_request;
// empty packets also reset the timeout timer
ctx->last_rx_time = get_hires_time();
// handle the acknowledgement internally
connection_handle_ack(ctx, header->rx_seq_nr, false);
return OK; // do not ACK
case L2_MSG_TYPE_CONN_MGMT:
return handle_conn_mgmt(ctx, header, payload, payload_len);
case L2_MSG_TYPE_CONNECTIONLESS:
LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header->msg_type));
return OK;
case L2_MSG_TYPE_DATA:
*tx_req_rcvd = header->tx_request;
break;
default:
LOG(LVL_ERR, "Invalid message type %d.", header->msg_type);
return ERR_INVALID_STATE;
}
if(ctx->next_expected_seq != header.tx_seq_nr) {
LOG(LVL_ERR, "Expected sequence number %u, received %u.", ctx->next_expected_seq, header.tx_seq_nr);
if(ctx->next_expected_seq != header->tx_seq_nr) {
LOG(LVL_ERR, "Expected sequence number %u, received %u.", ctx->next_expected_seq, header->tx_seq_nr);
return ERR_SEQUENCE;
}
ctx->next_expected_seq++;
ctx->next_expected_seq &= 0xF;
LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
ctx->last_rx_time = get_hires_time();
LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header->rx_seq_nr, header->tx_seq_nr);
// handle the acknowledgement internally
connection_handle_ack(ctx, header.rx_seq_nr);
connection_handle_ack(ctx, header->rx_seq_nr, true);
size_t header_size = layer2_get_encoded_header_size(&header);
// extract the payload and forward it to the tun device
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
ctx->data_cb(ctx, payload, payload_len);
// pass the decoded data back to the user
data_packet->payload_type = payload[0];
data_packet->payload = payload + 1;
data_packet->payload_len = payload_len - 1;
return OK;
}
@ -158,7 +311,43 @@ uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx)
}
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len)
result_t connection_enqueue_packet(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len)
{
if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
return ERR_NO_MEM;
}
uint8_t *packetbuf = NULL;
if(payload) {
// create a persistent copy of the packet data.
// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
packetbuf = malloc(payload_len);
if(!packetbuf) {
LOG(LVL_ERR, "malloc failed.");
return ERR_NO_MEM;
}
}
memcpy(packetbuf, payload, payload_len);
packet_queue_add(&ctx->packet_queue, header, packetbuf, payload_len);
LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
header->tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
return OK;
}
result_t connection_enqueue_data_packet(
connection_ctx_t *ctx,
layer2_payload_type_t payload_type,
uint8_t *buf,
size_t buf_len)
{
// check the connection state
switch(ctx->conn_state) {
@ -176,10 +365,6 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
layer2_packet_header_t header;
if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
return ERR_NO_MEM;
}
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_DATA;
@ -187,20 +372,12 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
header.tx_request = 0;
header.tx_seq_nr = ctx->next_seq_nr;
// create a persistent copy of the packet data.
// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
uint8_t *packetbuf = malloc(buf_len);
if(!packetbuf) {
LOG(LVL_ERR, "malloc failed.");
return ERR_NO_MEM;
}
uint8_t packet_with_type[buf_len + 1];
memcpy(packetbuf, buf, buf_len);
packet_with_type[0] = (uint8_t)payload_type;
memcpy(packet_with_type+1, buf, buf_len);
packet_queue_add(&ctx->packet_queue, &header, packetbuf, buf_len);
LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
header.tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
ERR_CHECK(connection_enqueue_packet(ctx, &header, packet_with_type, sizeof(packet_with_type)));
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;
@ -209,6 +386,12 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
}
bool connection_can_enqueue_packet(const connection_ctx_t *ctx)
{
return packet_queue_get_free_space(&ctx->packet_queue) > 0;
}
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
{
// check the connection state
@ -227,10 +410,8 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
layer2_packet_header_t header;
header.dst_addr.addr[0] = 0xFFFF;
header.dst_addr.length = 1;
header.src_addr.addr[0] = 0x0001;
header.src_addr.length = 1;
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_EMPTY;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_seq_nr = 0; // not used in empty packets
@ -244,7 +425,51 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len)
result_t connection_send_parameters(connection_ctx_t *ctx)
{
// check the connection state
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_CLOSED:
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
LOG(LVL_ERR, "Trying to send connection parameters in state %u", ctx->conn_state);
return ERR_INVALID_STATE;
case CONN_STATE_INITIALIZED:
// in these states, packets can be handled
break;
}
layer2_packet_header_t header;
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_CONN_MGMT;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_seq_nr = ctx->next_seq_nr;
header.tx_request = 1;
size_t payload_len = 1;
uint8_t payload[payload_len];
payload[0] = CONN_MGMT_TYPE_CONNECTION_PARAMETERS;
// TODO: calculate IP addresses from clients HAM64 address
ERR_CHECK(connection_enqueue_packet(ctx, &header, payload, payload_len));
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;
// connection is considered established after the connection parameters are sent.
ctx->conn_state = CONN_STATE_ESTABLISHED;
return OK;
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
// check the connection state
switch(ctx->conn_state) {
@ -252,7 +477,7 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr,
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
LOG(LVL_ERR, "Trying to encode packet in inactive state %u", ctx->conn_state);
return ERR_INVALID_STATE;
return 0;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
@ -267,32 +492,26 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr,
return 0;
}
unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME);
assert(buf_len >= LAYER2_PACKET_HEADER_ENCODED_SIZE_MAX + crc_size + entry->data_len);
layer2_packet_header_t header = entry->header;
header.rx_seq_nr = ack_seq_nr;
header.rx_seq_nr = ctx->next_expected_seq;
header.tx_request = (ctx->next_packet_index == (packet_queue_get_used_space(&ctx->packet_queue) - 1));
// encode the header
LOG(LVL_DEBUG, "Encoding packet with rx_seq_nr %u, tx_seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
LOG(LVL_DEBUG, "Encoding packet with rx_seq_nr %u, tx_seq_nr %u, tx_request = (%u == %u -> %d).", header.rx_seq_nr, header.tx_seq_nr,
ctx->next_packet_index,
packet_queue_get_used_space(&ctx->packet_queue) - 1,
header.tx_request);
size_t packet_size = layer2_encode_packet_header(&header, buf);
// add the payload data
if(entry->data) {
memcpy(buf + packet_size, entry->data, entry->data_len);
size_t packet_size = layer2_encode_packet(&header, entry->data, entry->data_len, buf, buf_len);
if(packet_size == 0) {
LOG(LVL_ERR, "Buffer too small for encoded packet!");
return 0;
}
packet_size += entry->data_len;
// calculate CRC of everything and append it to the packet
crc_append_key(PAYLOAD_CRC_SCHEME, buf, packet_size);
packet_size += crc_size;
ctx->next_packet_index++;
*end_burst = header.tx_request;
return packet_size;
}
@ -318,7 +537,7 @@ void connection_tx_clean_empty_packet(connection_ctx_t *ctx)
}
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq, bool do_ack)
{
// check the connection state
switch(ctx->conn_state) {
@ -334,8 +553,6 @@ void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
break;
}
ctx->next_packet_index = 0;
size_t packets_to_remove = 0;
size_t packets_available = packet_queue_get_used_space(&ctx->packet_queue);
@ -349,13 +566,26 @@ void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
packets_to_remove++;
}
packet_queue_delete(&ctx->packet_queue, packets_to_remove);
if(packets_to_remove != 0) {
packet_queue_delete(&ctx->packet_queue, packets_to_remove);
packets_available = packet_queue_get_used_space(&ctx->packet_queue);
// send the next requested packet (all previous ones were deleted above).
if(ctx->next_packet_index >= packets_to_remove) {
ctx->next_packet_index -= packets_to_remove;
} else {
ctx->next_packet_index = 0;
}
LOG(LVL_DEBUG, "handling ack for seq_nr %u, removing %zu packets, %zu packets remaining.", acked_seq, packets_to_remove, packets_available);
packets_available = packet_queue_get_used_space(&ctx->packet_queue);
if(packets_available == 0) {
LOG(LVL_DEBUG, "handling ack for seq_nr %u, removed %zu packets, %zu packets remaining.", acked_seq, packets_to_remove, packets_available);
ctx->retransmit_time = get_hires_time() + HRTIME_MS(RETRANSMIT_TIMEOUT_MS);
} else {
LOG(LVL_DEBUG, "duplicate ACK for seq_nr %u", acked_seq);
}
if(do_ack && packets_available == 0) {
// no packets left in queue, but an acknowledgement must be
// transmitted. Add an empty packet to do that.
result_t res = connection_add_empty_packet(ctx, false);
@ -373,3 +603,55 @@ bool connection_can_transmit(const connection_ctx_t *ctx)
return (packet_queue_get_used_space(&ctx->packet_queue) != 0)
&& (packet_queue_get(&ctx->packet_queue, ctx->next_packet_index) != NULL);
}
bool connection_is_closed(const connection_ctx_t *ctx)
{
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
return true;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
return false;
}
return true;
}
result_t connection_maintain(connection_ctx_t *ctx, connection_evt_t *evt)
{
uint64_t now = get_hires_time();
if(now > ctx->last_rx_time + HRTIME_MS(CONNECTION_TIMEOUT_MS)) {
LOG(LVL_INFO, "Connection timed out.");
ctx->conn_state = CONN_STATE_CLOSED;
*evt = CONN_EVT_TIMEOUT;
return OK;
}
if(ctx->retransmit_time != 0 && now >= ctx->retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
ctx->retransmit_time = 0;
connection_restart_tx(ctx);
*evt = CONN_EVT_RETRANSMIT;
return OK;
}
return OK;
}
bool connection_has_ipv6_peer_address(connection_ctx_t *ctx, const uint8_t *address_to_check)
{
for(size_t i = 0; i < 16; i++) {
if(address_to_check[i] != ctx->peer_ipv6_addr.s6_addr[i]) {
return false;
}
}
return true;
}

View file

@ -14,6 +14,8 @@
#include "packet_queue.h"
#include <arpa/inet.h>
struct connection_ctx_s;
typedef enum {
@ -24,17 +26,20 @@ typedef enum {
CONN_STATE_CLOSED //!< Connection has been closed (gracefully or by timeout)
} connection_state_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len);
typedef enum {
CONN_EVT_NONE, //!< No event has occurred
CONN_EVT_TIMEOUT, //!< The connection timed out because no packets were received
CONN_EVT_RETRANSMIT, //!< Packet queue transmission is restarted
} connection_evt_t;
typedef struct connection_ctx_s {
connection_state_t conn_state; //!< State of the connection.
connection_data_callback_t data_cb; //!< Callback function for received data packets.
ham64_t my_addr; //!< The local link layer address.
ham64_t peer_addr; //!< The link layer address of the peer.
struct in6_addr peer_ipv6_addr; //!< The peers IPv6 address (generated from LL address)
uint8_t last_acked_seq; //!< Next sequence number expected by the peer (from last Ack).
uint8_t next_expected_seq; //!< Next sequence number expected by us.
@ -42,6 +47,9 @@ typedef struct connection_ctx_s {
size_t next_packet_index; //!< Index in the packet queue of the next packet to transmit.
uint8_t next_seq_nr; //!< Sequence number to tag the next transmitted packet with.
uint64_t retransmit_time; //!< Time when a retransmit shall be triggered.
uint64_t last_rx_time; //!< Time when a packet was last received and decoded.
} connection_ctx_t;
@ -52,20 +60,66 @@ typedef struct connection_ctx_s {
* \param peer_addr The remote link layer address.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr);
result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr);
/*!\brief Destroy the given layer 2 connection context.
*/
void connection_destroy(connection_ctx_t *ctx);
/*!\brief Set up a outgoing connection.
*
* This puts the connection into CONN_STATE_CONNECTING state, which causes
* beacons to be handled.
*/
void connection_setup_outgoing(connection_ctx_t *ctx);
/*!\brief Set up an incoming connection.
*
* As this function is intended to be called after a connection request was
* handled, it puts the connection directly into CONN_STATE_ESTABLISHED state.
*/
void connection_setup_incoming(connection_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param ctx The connection context.
* \param buf Pointer to the packet data.
* \param buf_len Length of the packet.
* \param[inout] ctx The connection context.
* \param[in] buf Pointer to the packet data.
* \param[in] buf_len Length of the packet.
* \param[out] data_packet Structure will be filled with a received data packet.
* \param[out] tx_req_rcvd Was the tx_request flag set in the decoded packet.
* \returns A result code from the packet handling procedure.
*/
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len);
result_t connection_handle_packet(
connection_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet,
bool *tx_req_rcvd);
/*!\brief Handle a received packet where the header has already been decoded.
*
* This function assumes that the following basic checks were already done:
* - CRC is correct
* - Header can be decoded
* - Destination address is the local address
*
* \param[inout] ctx The connection context.
* \param[in] header Pointer to the decoded header structure.
* \param[in] payload Pointer to the payload data.
* \param[in] payload_len Length of the payload data.
* \param[out] data_packet Structure will be filled with a received data packet.
* \param[out] tx_req_rcvd Was the tx_request flag set in the decoded packet.
* \returns A result code from the packet handling procedure.
*/
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len,
layer2_data_packet_t *data_packet,
bool *tx_req_rcvd);
/*!\brief Return the sequence number expected next by our side.
*/
@ -76,9 +130,32 @@ uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx);
uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx);
/*!\brief Enqueue a packet for transmission.
* \param ctx The connection context.
* \param header Pointer to the packet header.
* \param payload Pointer to the payload.
* \param payload_len Length of the payload.
*/
result_t connection_enqueue_packet(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len);
/*!\brief Enqueue a data packet for transmission.
* \param ctx The connection context.
* \param payload_type Type of the payload.
* \param buf Pointer to the data buffer.
* \param buf_len Length of the data.
*/
result_t connection_enqueue_data_packet(
connection_ctx_t *ctx,
layer2_payload_type_t payload_type,
uint8_t *buf,
size_t buf_len);
/*!\brief Check if there is free space in the TX packet queue.
* \param ctx The connection context.
*/
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
bool connection_can_enqueue_packet(const connection_ctx_t *ctx);
/*!\brief Add an empty packet to ensure an acknowledgement is sent.
* \param ctx The connection context.
@ -86,6 +163,14 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
*/
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
/*!\brief Send connection parameters.
*
* This packet accepts an incoming connection request.
*
* \param ctx The connection context.
*/
result_t connection_send_parameters(connection_ctx_t *ctx);
/*!\brief Encode the next packet for transmission.
*
* \note
@ -94,12 +179,12 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
* called to handle retransmits correctly.
*
* \param ctx The connection context.
* \param ack_seq_nr The received sequence number to send as an acknowledgement.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \param end_burst Output parameter that is set to true if this is the last packet in a burst.
* \returns The number of bytes written to buf or zero if no packet was available.
*/
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len);
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst);
/*!\brief Restart the transmission from the beginning of the packet queue.
*/
@ -117,10 +202,30 @@ void connection_tx_clean_empty_packet(connection_ctx_t *ctx);
* \param acked_seq The acknowledged (= next expected) sequence number.
* \param do_ack Whether an empty packet shall be generated if the queue is empty.
*/
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq);
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq, bool do_ack);
/*!\brief Check if there are packets queued for transmission.
*/
bool connection_can_transmit(const connection_ctx_t *ctx);
/*!\brief Check if this connection is closed.
*/
bool connection_is_closed(const connection_ctx_t *ctx);
/*!\brief Handle internal maintenance tasks.
*
* This should be called periodically to handle timeouts and retransmissions.
*
* \param ctx[in] The connection context.
* \param evt[out] Set to an event that occurred, or CONN_EVT_NONE.
*/
result_t connection_maintain(connection_ctx_t *ctx, connection_evt_t *evt);
/*!\brief Check if the given IPv6 peer address belongs to this connection.
*
* \param address_to_check Pointer to the IPv6 address to check. Must point to 16 bytes of data.
* \returns True if this address is handled by this connection, false otherwise.
*/
bool connection_has_ipv6_peer_address(connection_ctx_t *ctx, const uint8_t *address_to_check);
#endif // CONNECTION_H

View file

@ -0,0 +1,241 @@
#include <string.h>
#include <assert.h>
#define LOGGER_MODULE_NAME "clist"
#include "logger.h"
#include "connection_list.h"
#include "layer2/connection.h"
#include "results.h"
#include "utils.h"
#define SEQ_NR_MASK 0xF
static void destroy_entry(connection_list_entry_t *entry)
{
connection_destroy(&entry->connection);
free(entry);
}
// find the location where the timestamp should be inserted to maintain
// ascending timestamp order and return the then-previous entry.
//
// If NULL is returned, the list is either empty or the entry should be
// inserted before the head. This must be checked externally.
static connection_list_entry_t* find_prev_for_timestamp(connection_list_t *list, uint64_t timestamp)
{
if(!list->head) {
return NULL;
}
if(timestamp < list->head->next_access_time) {
return NULL;
}
connection_list_entry_t *prev = list->head;
while(prev->next) {
if(prev->next_access_time < timestamp
&& prev->next->next_access_time >= timestamp) {
// location found!
break;
}
prev = prev->next;
}
// either the correct location was found or prev now points to the last entry
// in the list.
return prev;
}
result_t connection_list_init(connection_list_t *list)
{
list->head = NULL;
return OK;
}
void connection_list_destroy(connection_list_t *list)
{
// delete all list entries
while(list->head) {
connection_list_entry_t *next = list->head->next;
destroy_entry(list->head);
list->head = next;
}
}
result_t connection_list_insert(connection_list_t *list, const connection_ctx_t *conn, uint64_t next_access_time)
{
connection_list_entry_t *new_entry = malloc(sizeof(connection_list_entry_t));
if(!new_entry) {
return ERR_NO_MEM;
}
new_entry->connection = *conn;
new_entry->next_access_time = next_access_time;
connection_list_entry_t *prev = find_prev_for_timestamp(list, next_access_time);
if(!prev) {
// the new entry should become the lists head (also works if the list is empty!)
new_entry->next = list->head;
list->head = new_entry;
} else {
// insert after prev
new_entry->next = prev->next;
prev->next = new_entry;
}
return OK;
}
connection_list_entry_t* connection_list_get_head(connection_list_t *list)
{
return list->head;
}
result_t connection_list_reschedule_head(connection_list_t *list, uint64_t next_access_timestamp)
{
if(!list->head) {
return ERR_INVALID_STATE;
}
if(!list->head->next) {
// nothing to do because there is only one entry.
return OK;
}
if(next_access_timestamp < list->head->next->next_access_time) {
// the head does not need to be moved because the new timestamp is still
// smaller than that of the second entry => only update the timestamp.
list->head->next_access_time = next_access_timestamp;
return OK;
}
// detach the entry from the list
connection_list_entry_t *reloc_entry = list->head;
list->head = list->head->next;
// find the location where the entry should be reinserted
connection_list_entry_t *prev = find_prev_for_timestamp(list, next_access_timestamp);
if(!prev) {
// the relocated entry should become the lists head (also works if the list is empty!)
reloc_entry->next = list->head;
list->head = reloc_entry;
} else {
// insert after prev
reloc_entry->next = prev->next;
prev->next = reloc_entry;
}
return OK;
}
result_t connection_list_delete_head(connection_list_t *list)
{
if(!list->head) {
return ERR_INVALID_STATE;
}
connection_list_entry_t *new_head = list->head->next;
destroy_entry(list->head);
list->head = new_head;
return OK;
}
result_t connection_list_delete_closed(connection_list_t *list)
{
if(!list->head) {
return OK;
}
connection_list_entry_t *prev = NULL;
connection_list_entry_t *cur = list->head;
while(cur) {
if(connection_is_closed(&cur->connection)) {
connection_list_entry_t *to_delete = cur;
cur = cur->next;
if(prev) {
prev->next = to_delete->next;
} else {
list->head = to_delete->next;
}
destroy_entry(to_delete);
} else {
prev = cur;
cur = cur->next;
}
}
return OK;
}
result_t connection_list_enqueue_packet(connection_list_t *list, uint8_t *data, size_t data_len)
{
if(data_len < 40) {
// packet not large enough for an IPv6 header
LOG(LVL_DEBUG, "Packet size too small: %zu bytes given, 40 bytes needed.", data_len);
return ERR_INVALID_PARAM;
}
uint8_t version = data[0] >> 4;
if(version != 6) {
LOG(LVL_DEBUG, "IP version (%i) is not 6.", version);
return ERR_INVALID_PARAM;
}
uint8_t *dest_addr = data + 24;
// search the list for the destination address
connection_list_entry_t *ptr = list->head;
while(ptr) {
if(connection_has_ipv6_peer_address(&ptr->connection, dest_addr)) {
// found it!
break;
}
ptr = ptr->next;
}
if(!ptr) {
// address not found in any connection
return ERR_INVALID_ADDRESS;
}
return connection_enqueue_data_packet(&ptr->connection, L2_PAYLOAD_TYPE_IPV6, data, data_len);
}
bool connection_list_can_enqueue_packet(connection_list_t *list)
{
if(!list->head) {
return false; // no entries -> no free space
}
connection_list_entry_t *entry = list->head;
while(entry) {
if(!connection_can_enqueue_packet(&entry->connection)) {
return false;
}
entry = entry->next;
}
return true;
}

View file

@ -0,0 +1,109 @@
/*
* This file contains functions to manage a list of layer 2 connections for
* scheduling purposes.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#ifndef CONNECTION_LIST_H
#define CONNECTION_LIST_H
#include <results.h>
#include <stdbool.h>
#include "connection.h"
typedef struct connection_list_entry_s {
uint64_t next_access_time; //!< When to next activate this connection
connection_ctx_t connection; //!< The actual connection entry
struct connection_list_entry_s *next; //!< pointer to the next list element
} connection_list_entry_t;
typedef struct connection_list_s {
connection_list_entry_t *head; //!< pointer to the first list element
} connection_list_t;
/*!\brief Initialize a connection list.
*
* \param list The list to initialize.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_init(connection_list_t *list);
/*!\brief Destroy the given layer 2 connection list, deleting all internal data.
*/
void connection_list_destroy(connection_list_t *list);
/*!\brief Insert a connection context into the list.
*
* The connection context will be copied internally. The original copy should
* no longer be used after the context was inserted into the list.
*
* \param list Pointer to the list where the entry shall be inserted.
* \param conn Pointer to the connection context.
* \param next_access_time Timestamp when this connection should be activated.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_insert(connection_list_t *list, const connection_ctx_t *conn, uint64_t next_access_time);
/*!\brief Get the head (first entry) of the list.
*
* \param list Pointer to the list to access.
*
* \returns A pointer to the head entry or NULL if the list is empty.
*/
connection_list_entry_t* connection_list_get_head(connection_list_t *list);
/*!\brief Re-schedule the current head entry.
*
* \param list Pointer to the list to access.
* \param next_access_time Timestamp when this connection should be activated again.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_reschedule_head(connection_list_t *list, uint64_t next_access_timestamp);
/*!\brief Delete the current head entry.
*
* \param list Pointer to the list to access.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_delete_head(connection_list_t *list);
/*!\brief Delete all connections that are in a closed state.
*
* \param list Pointer to the list to clean.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_delete_closed(connection_list_t *list);
/*!\brief Insert a packet in the appropriate connections queue.
*
* The appropriate connection is selected by extracting the destination IP
* address from the packet and comparing it to the peer address of the
* connection.
*
* \param list Pointer to the connection list to operate on.
* \param packet Pointer to the raw packet data (no TUNTAP header!).
* \param packet_len Length of the packet data.
*
* \retval OK if the packet was inserted.
* \retval ERR_NO_MEM if the target queue was full.
* \retval ERR_INVALID_ADDRESS if the destination address does not match any connection.
* \retval ERR_INVALID_PARAM if the packet format is not supported.
*/
result_t connection_list_enqueue_packet(connection_list_t *list, uint8_t *data, size_t data_len);
/*!\brief Determine if a packet can be enqueued in all queues.
*
* \returns False if any queue is full, true otherwise.
*/
bool connection_list_can_enqueue_packet(connection_list_t *list);
#endif // CONNECTION_LIST_H

View file

@ -0,0 +1,386 @@
#include <poll.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#define LOGGER_MODULE_NAME "digi"
#include "logger.h"
#include "digipeater.h"
#include "config.h"
#include "layer2/connection.h"
#include "layer2/connection_list.h"
#include "layer2/ham64.h"
#include "layer2/packet_structs.h"
#include "layer2/packet_queue.h"
#include "results.h"
#include "utils.h"
static result_t digipeater_handle_beacon_responses(digipeater_ctx_t *ctx, const layer2_packet_header_t *header, const uint8_t *buf, size_t buf_len)
{
LOG(LVL_DEBUG, "Handling beacon response packet.");
layer2_dump_packet_header(LVL_DUMP, header);
if(header->msg_type != L2_MSG_TYPE_CONN_MGMT) {
LOG(LVL_ERR, "Beacon response with invalid message type %i", header->msg_type);
return ERR_INVALID_PARAM;
}
if(buf_len == 0) {
LOG(LVL_ERR, "Missing payload in beacon response");
return ERR_INVALID_PARAM;
}
uint8_t conn_mgmt_type = buf[0];
if(conn_mgmt_type != CONN_MGMT_TYPE_CONNECTION_REQUEST) {
LOG(LVL_ERR, "Unexpected connection management type in beacon response: 0x%02x", conn_mgmt_type);
return ERR_INVALID_PARAM;
}
// packet is valid -> create a new connection, enqueue the parameters message
// and add it at the beginning of the connection list.
connection_ctx_t new_conn;
ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr));
ERR_CHECK(connection_send_parameters(&new_conn));
uint64_t now = get_hires_time();
ERR_CHECK(connection_list_insert(&ctx->conn_list, &new_conn, now));
return OK;
}
static size_t encode_beacon_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len)
{
// build a beacon packet
layer2_packet_header_t header;
ham64_t broadcast = {{0xFFFF, 0, 0, 0}, 1};
header.dst_addr = broadcast;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_CONN_MGMT;
header.rx_seq_nr = 0; // unused
header.tx_seq_nr = 0; // unused
header.tx_request = 1;
uint8_t payload[1] = {CONN_MGMT_TYPE_BEACON};
return layer2_encode_packet(&header, payload, 1, buf, buf_len);
}
result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr)
{
ctx->my_addr = *my_addr;
ctx->state = DIGIPEATER_STATE_CONN;
packet_queue_init(&ctx->oneshot_queue);
uint64_t now = get_hires_time();
ctx->next_beacon_time = now + HRTIME_MS(BEACON_INTERVAL_MS);
ctx->cycle_end_time = now;
return connection_list_init(&ctx->conn_list);
}
void digipeater_destroy(digipeater_ctx_t *ctx)
{
connection_list_destroy(&ctx->conn_list);
}
result_t digipeater_handle_packet(
digipeater_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet)
{
data_packet->payload_type = L2_PAYLOAD_TYPE_INVALID;
data_packet->payload_len = 0;
// check the CRC
size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME);
if(!crc_check_key(PAYLOAD_CRC_SCHEME, (unsigned char*)buf, packet_size)) {
LOG(LVL_ERR, "CRC check failed!");
return ERR_INTEGRITY;
}
// decode the header
layer2_packet_header_t header;
if(!layer2_decode_packet_header(buf, buf_len, &header)) {
LOG(LVL_ERR, "Header could not be decoded!");
return ERR_INTEGRITY;
}
layer2_dump_packet_header(LVL_DUMP, &header);
// check if the packet really should be handled by us
if(ham64_is_equal(&ctx->my_addr, &header.src_addr)) {
LOG(LVL_DEBUG, "Packet is from ourselves. Ignored.");
return OK;
}
if(!ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
char fmt_dst_addr[HAM64_FMT_MAX_LEN];
char fmt_my_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header.dst_addr, fmt_dst_addr);
ham64_format(&ctx->my_addr, fmt_my_addr);
LOG(LVL_ERR, "Packet has the wrong destination address: got %s, expected %s",
fmt_dst_addr, fmt_my_addr);
return ERR_INVALID_ADDRESS;
}
// FIXME: handle connection management packets here (or somewhere else):
// - Disconnect Request
size_t header_size = layer2_get_encoded_header_size(&header);
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
result_t result = OK;
switch(ctx->state) {
case DIGIPEATER_STATE_CONN:
{
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
connection_ctx_t *current_conn = &head->connection;
bool tx_request_received = false;
result = connection_handle_packet_prechecked(
current_conn, &header, payload, payload_len, data_packet, &tx_request_received);
} else {
LOG(LVL_WARN, "Digipeater in CONN state, but there is no active connection! Packet dropped.");
result = OK;
}
}
break;
case DIGIPEATER_STATE_BEACON:
result = digipeater_handle_beacon_responses(ctx, &header, payload, payload_len);
}
// end the current cycle if tx_request is set in an incoming packet
if(header.tx_request) {
LOG(LVL_INFO, "TX Request was received. Ending current cycle.");
digipeater_end_cycle(ctx);
}
return result;
}
result_t digipeater_fill_packet_queues_from_tundev(digipeater_ctx_t *ctx, int tun_fd)
{
// first check if any queue is already full, so we don't have to drop packets
// from the TUN device queue.
if(!connection_list_can_enqueue_packet(&ctx->conn_list)) {
LOG(LVL_DEBUG, "No connection or no free space in queues.");
return OK; // do nothing
}
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = tun_fd;
pfd_tun.events = POLLIN;
while(connection_list_can_enqueue_packet(&ctx->conn_list)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> read it
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(tun_fd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
} else if(ret < 4) {
LOG(LVL_ERR, "Not enough data from TUN read() to check packet type!");
return ERR_SYSCALL;
}
uint16_t flags = *(uint16_t*)packetbuf;
uint16_t proto = *((uint16_t*)packetbuf + 1);
LOG(LVL_DUMP, "TUN Flags: 0x%04x", flags);
LOG(LVL_DUMP, "TUN Proto: 0x%04x", proto);
uint8_t *packet_data = packetbuf + 4;
size_t packet_length = ret - 4;
// note: octets are swapped in case statements
switch(proto) {
case 0xdd86: // IPv6
WARN_ON_ERR(connection_list_enqueue_packet(&ctx->conn_list, packet_data, packet_length));
break;
default:
LOG(LVL_WARN, "Unsupported Protocol 0x%04x. Packet dropped.", proto);
continue;
}
}
}
return OK;
}
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
size_t packet_size = 0;
*end_burst = false;
// send packets from the one-shot queue
const packet_queue_entry_t *queue_entry = packet_queue_get(&ctx->oneshot_queue, 0);
if(queue_entry) {
*end_burst = queue_entry->header.tx_request == 1;
packet_size = layer2_encode_packet(&queue_entry->header, queue_entry->data, queue_entry->data_len, buf, buf_len);
if(packet_size) {
packet_queue_delete(&ctx->oneshot_queue, 1);
return packet_size;
}
}
switch(ctx->state) {
case DIGIPEATER_STATE_BEACON:
packet_size = encode_beacon_packet(ctx, buf, buf_len);
ctx->next_beacon_time += HRTIME_MS(BEACON_INTERVAL_MS);
*end_burst = true;
break;
case DIGIPEATER_STATE_CONN: {
// pull packets from the current connection
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(!head) {
return 0;
}
connection_ctx_t *conn = &head->connection;
if(connection_can_transmit(conn)) {
packet_size = connection_encode_next_packet(conn, buf, buf_len, end_burst);
if(*end_burst) {
connection_tx_clean_empty_packet(conn);
}
ctx->state = DIGIPEATER_STATE_CONN;
}
}
break;
}
return packet_size;
}
bool digipeater_can_transmit(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now > ctx->next_beacon_time) {
return true;
}
if(packet_queue_get_used_space(&ctx->oneshot_queue) != 0) {
return true;
}
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
return connection_can_transmit(&head->connection);
}
return false;
}
void digipeater_extend_cycle(digipeater_ctx_t *ctx, uint64_t ns)
{
ctx->cycle_end_time += ns;
}
result_t digipeater_end_cycle(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now >= ctx->next_beacon_time) {
ctx->state = DIGIPEATER_STATE_BEACON;
} else {
// TODO: adjust the time based on connection activity; right now this results
// in round-robin scheduling
connection_list_reschedule_head(&ctx->conn_list, now + HRTIME_MS(MIN_INTERVAL_TIME_MS));
ctx->state = DIGIPEATER_STATE_CONN;
ctx->cycle_end_time = now + HRTIME_MS(MIN_INTERVAL_TIME_MS);
}
return OK;
}
result_t digipeater_maintain(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now > ctx->cycle_end_time) {
// at the end of the cycle, the next connection is activated and the
// current one is re-scheduled.
LOG(LVL_DEBUG, "Interval ended by timeout at %llu ns.", now);
digipeater_end_cycle(ctx);
}
switch(ctx->state) {
case DIGIPEATER_STATE_CONN:
{
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(!head) {
LOG(LVL_INFO, "No active connection -> force beacon state for packet handling.");
ctx->state = DIGIPEATER_STATE_BEACON;
} else {
connection_evt_t evt;
ERR_CHECK(connection_maintain(&head->connection, &evt));
switch(evt) {
case CONN_EVT_TIMEOUT:
// connection has been closed by timeout -> clean up the list
connection_list_delete_closed(&ctx->conn_list);
break;
default:
// do nothing
break;
}
}
break;
}
case DIGIPEATER_STATE_BEACON:
// nothing to do here
break;
}
return OK;
}

View file

@ -0,0 +1,121 @@
/*
* This file contains functions to handle a single layer 2 digipeater.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#ifndef DIGIPEATER_H
#define DIGIPEATER_H
#include <results.h>
#include <stdbool.h>
#include "connection_list.h"
#include "layer2/packet_queue.h"
struct digipeater_ctx_s;
typedef enum {
DIGIPEATER_STATE_BEACON, //!< Beacon sent, waiting for connection requests
DIGIPEATER_STATE_CONN, //!< Handling client connections
} digipeater_state_t;
typedef enum {
DIGIPEATER_EVT_INTERVAL_END, //!< The current cycle has ended and new packets should be transmitted.
} digipeater_evt_t;
typedef struct digipeater_ctx_s {
digipeater_state_t state; //!< Current operating state
ham64_t my_addr; //!< The local link layer address.
packet_queue_t oneshot_queue; //!< Queue for packets that are sent once and connection-independent
uint64_t next_beacon_time; //!< Absolute timestamp of the next beacon transmission.
uint64_t cycle_end_time; //!< Absolute timestamp of the end of the current cycle.
connection_list_t conn_list; //!< List of connections.
} digipeater_ctx_t;
/*!\brief Initialize the digipeater context.
*
* \param ctx The digipeater context to initialize.
* \param my_addr The local link layer address.
* \returns OK if everything worked or a fitting error code.
*/
result_t digipeater_init(
digipeater_ctx_t *ctx,
const ham64_t *my_addr);
/*!\brief Destroy the given digipeater context.
*/
void digipeater_destroy(digipeater_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param[inout] ctx The digipeater context.
* \param[in] buf Pointer to the packet data.
* \param[in] buf_len Length of the packet.
* \param[out] data_packet Structure will be filled with a received data packet.
* \returns A result code from the packet handling procedure.
*/
result_t digipeater_handle_packet(
digipeater_ctx_t *ctx,
const uint8_t *buf,
size_t buf_len,
layer2_data_packet_t *data_packet);
/*!\brief Enqueue a packet for transmission.
* \param ctx The digipeater context.
* \param tun_fd File descriptor for an open TUN device.
*/
result_t digipeater_fill_packet_queues_from_tundev(digipeater_ctx_t *ctx, int tun_fd);
/*!\brief Encode the next packet for transmission.
*
* \note
* If no packets are currently available for transmission, this function returns zero.
*
* \param ctx The digipeater context.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \param end_burst Set to true if this packet should end the current burst and start the transmission.
* \returns The number of bytes written to buf or zero if no packet was available.
*/
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst);
/*!\brief Check if there are packets queued for transmission.
*/
bool digipeater_can_transmit(digipeater_ctx_t *ctx);
/*!\brief Extend the current cycle.
*
* By default, the cycle duration is set to a minimum length. It must be
* extended by the time needed to transmit and receive packets. As the time
* necessary for packet transfer is unknown to the Layer 2, it must be
* calculated externally.
*
* This function should be called whenever
* - A packet is transmitted from digipeater_encode_next_packet(), or
* - A packet is being received
*/
void digipeater_extend_cycle(digipeater_ctx_t *ctx, uint64_t ns);
/*!\brief End the current cycle.
*
* End the cycle without waiting for the timeout. This switches to the next
* connection or transmits a beacon. In any case, it stops forwarding received
* packets to the current connection.
*/
result_t digipeater_end_cycle(digipeater_ctx_t *ctx);
/*!\brief Handle internal maintenance tasks.
*
* This should be called periodically to handle timeouts and retransmissions.
*/
result_t digipeater_maintain(digipeater_ctx_t *ctx);
#endif // DIGIPEATER_H

View file

@ -8,9 +8,13 @@
#include <assert.h>
#include <string.h>
#include <liquid/liquid.h>
#define LOGGER_MODULE_NAME "l2ps"
#include "logger.h"
#include "config.h"
#include "packet_structs.h"
@ -129,3 +133,36 @@ void layer2_dump_packet_header(int level, const layer2_packet_header_t *header)
}
}
size_t layer2_encode_packet(
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len,
uint8_t *data_out, size_t data_out_len)
{
unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME);
if(data_out_len < LAYER2_PACKET_HEADER_ENCODED_SIZE_MAX) {
return 0;
}
size_t packet_size = layer2_encode_packet_header(header, data_out);
if(data_out_len < packet_size + crc_size + payload_len) {
return 0;
}
// add the payload data
if(payload) {
memcpy(data_out + packet_size, payload, payload_len);
}
packet_size += payload_len;
// calculate CRC of everything and append it to the packet
crc_append_key(PAYLOAD_CRC_SCHEME, data_out, packet_size);
packet_size += crc_size;
return packet_size;
}

View file

@ -33,6 +33,7 @@ typedef struct layer2_packet_header_s {
ham64_t dst_addr; //!< destination HAM-64 address
} layer2_packet_header_t;
// maximum header size
// - 1 byte packet info
// - 1 byte sequence numbers
@ -74,6 +75,19 @@ bool layer2_decode_packet_header(const uint8_t *encoded, size_t encoded_len, lay
*/
void layer2_dump_packet_header(int level, const layer2_packet_header_t *header);
/*!\brief Encode a complete packet (with header and CRC)
* \param header The header structure to encode.
* \param payload The payload data to encode.
* \param payload_len The length of the payload data.
* \param data_out The buffer for the encoded packet data.
* \param data_out_len The size of the output buffer.
* \returns The number of bytes written to the output buffer or 0 if not enough space was available.
*/
size_t layer2_encode_packet(
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len,
uint8_t *data_out, size_t data_out_len);
/*!\brief Get a string representation of the given message type.
*/
const char* layer2_msg_type_to_string(layer2_message_type_t type);
@ -82,15 +96,36 @@ const char* layer2_msg_type_to_string(layer2_message_type_t type);
typedef enum {
L2_PAYLOAD_TYPE_IPV4 = 0x00,
L2_PAYLOAD_TYPE_IPV6 = 0x01
L2_PAYLOAD_TYPE_IPV6 = 0x01,
L2_PAYLOAD_TYPE_INVALID = 0x7FFFFFFF
} layer2_payload_type_t;
typedef struct layer2_data_header_s {
typedef struct layer2_data_packet_s {
layer2_payload_type_t payload_type; //!< Type of the contained layer 3 packet
} layer2_data_header_t;
const uint8_t *payload; //!< Pointer to the payload data
size_t payload_len; //!< Length of the payload data
} layer2_data_packet_t;
/* Connection Management Structs */
// TODO
typedef enum {
CONN_MGMT_TYPE_BEACON = 0x00,
CONN_MGMT_TYPE_CONNECTION_REQUEST = 0x01,
CONN_MGMT_TYPE_CONNECTION_PARAMETERS = 0x02,
CONN_MGMT_TYPE_CONNECTION_RESET = 0x03,
CONN_MGMT_TYPE_DISCONNECT_REQUEST = 0x04,
CONN_MGMT_TYPE_DISCONNECT = 0x05,
} conn_mgmt_type_t;
typedef enum {
CONN_PARAM_TYPE_IPV6_ADDRESS = 0x00,
CONN_PARAM_TYPE_IPV6_GATEWAY = 0x01,
CONN_PARAM_TYPE_IPV6_DNS = 0x02,
CONN_PARAM_TYPE_IPV4_ADDRESS = 0x08,
CONN_PARAM_TYPE_IPV4_GATEWAY = 0x09,
CONN_PARAM_TYPE_IPV4_DNS = 0x0A,
} conn_param_type_t;
#endif // PACKET_STRUCTS_H

View file

@ -29,7 +29,7 @@ int tundev_open(char *dev)
struct ifreq ifr;
memset(&ifr, 0, sizeof(ifr));
ifr.ifr_flags = IFF_TUN | IFF_NO_PI;
ifr.ifr_flags = IFF_TUN;
strncpy(ifr.ifr_name, dev, IFNAMSIZ);

View file

@ -50,7 +50,7 @@
static int m_tunfd = -1;
static bool m_running = true;
static double next_tx_switch_time = 0.0;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
@ -71,7 +71,7 @@ static void signal_handler(int signal, siginfo_t *info, void *ctx)
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + (double)offset_ms * 0.001;
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
void print_complex_array(const char *varname, float complex const *array, size_t len)
@ -186,7 +186,7 @@ int main(int argc, char **argv)
bool on_air = true;
srand((int)(get_hires_time() * 1e6));
srand(get_hires_time());
// ** Initialize **
@ -233,18 +233,18 @@ int main(int argc, char **argv)
unsigned rx_retries = 0;
double old = get_hires_time();
uint64_t old = get_hires_time();
size_t total_samples = 0;
double next_stats_print_time = old + 0.5;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
double retransmit_time = 0.0;
uint64_t retransmit_time = 0;
while(m_running) {
double now = get_hires_time();
uint64_t now = get_hires_time();
if(retransmit_time != 0.0 && now >= retransmit_time) {
if(retransmit_time != 0 && now >= retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
retransmit_time = 0.0;
retransmit_time = 0;
layer2_tx_restart(&l2tx);
}
@ -325,9 +325,9 @@ int main(int argc, char **argv)
RESULT_CHECK(sdr_start_rx(&sdr));
on_air = false;
retransmit_time = get_hires_time() + 1.0 + 1.0 * rand() / RAND_MAX;
retransmit_time = get_hires_time() + HRTIME_SEC(1) + HRTIME_SEC(1.0 * rand() / RAND_MAX);
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON_MS);
}
}
@ -360,9 +360,9 @@ int main(int argc, char **argv)
total_samples += n_rf_samples;
double new = get_hires_time();
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_samples / (new - old);
double rate = total_samples * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f MS/s", rate / 1e6);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);

View file

@ -49,4 +49,12 @@ typedef enum {
} \
} while(0);
#define WARN_ON_ERR(call) \
do { \
result_t err_check_result = call; \
if(err_check_result != OK) { \
LOG(LVL_WARN, "Error ignored at %s:%d: %d", __FILE__, __LINE__, err_check_result); \
} \
} while(0);
#endif // RESULTS_H

View file

@ -99,9 +99,9 @@ static int tx_callback(hackrf_transfer *transfer)
return HACKRF_ERROR_OTHER;
}
if(sdr_ctx->tx_start_time == 0.0) {
if(sdr_ctx->tx_start_time == 0) {
sdr_ctx->tx_start_time = get_hires_time();
sdr_ctx->tx_duration = 10e-3; // give a little headroom
sdr_ctx->tx_duration = HRTIME_MS(10); // give a little headroom
LOG(LVL_INFO, "TX time tracking reset: start = %.3f.", sdr_ctx->tx_start_time);
}
@ -112,7 +112,7 @@ static int tx_callback(hackrf_transfer *transfer)
if(samples_read != 0) {
// only add time if any actual samples were transmitted
sdr_ctx->tx_duration += (double)samples_requested / SDR_TX_SAMPLING_RATE;
sdr_ctx->tx_duration += HRTIME_SEC((double)samples_requested / SDR_TX_SAMPLING_RATE);
}
LOG(LVL_DEBUG, "copied %u samples to HackRF.", samples_read);
@ -377,8 +377,8 @@ result_t sdr_flush_tx_buffer(sdr_ctx_t *ctx)
return 0;
}
double now = get_hires_time();
double end = ctx->tx_start_time + ctx->tx_duration;
uint64_t now = get_hires_time();
uint64_t end = ctx->tx_start_time + ctx->tx_duration;
if(sem_post(&ctx->buf_sem) < 0) {
LOG(LVL_ERR, "sem_post: %s", strerror(errno));

View file

@ -76,11 +76,23 @@ err_close:
return false;
}
double get_hires_time(void)
uint64_t get_hires_time(void)
{
struct timespec clk;
clock_gettime(CLOCK_MONOTONIC, &clk);
return clk.tv_sec + 1e-9 * clk.tv_nsec;
return clk.tv_sec * 1000000000ULL + (uint64_t)clk.tv_nsec;
}
void sleep_until(uint64_t hires_time)
{
struct timespec tv;
int ret;
tv.tv_sec = hires_time / 1000000000ULL;
tv.tv_nsec = hires_time % 1000000000ULL;
do {
ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &tv, NULL);
} while(ret == EINTR);
}
void fsleep(double d)
@ -93,18 +105,6 @@ void fsleep(double d)
nanosleep(&ts, NULL);
}
void sleep_until(double hires_time)
{
struct timespec tv;
int ret;
tv.tv_sec = hires_time;
tv.tv_nsec = (uint64_t)(1e9 * hires_time) % 1000000000;
do {
ret = clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &tv, NULL);
} while(ret == EINTR);
}
void hexdump(const uint8_t *data, size_t len)
{
static const char lut[16] = "0123456789ABCDEF";

View file

@ -11,6 +11,10 @@
#include <stdbool.h>
#include <liquid/liquid.h>
#define HRTIME_US(x) ((uint64_t)(1000ULL * (x)))
#define HRTIME_MS(x) ((uint64_t)(1000000ULL * (x)))
#define HRTIME_SEC(x) ((uint64_t)(1000000000ULL * (x)))
/*! Dump a array of complex numbers.
*
* \param data Pointer to the data to dump.
@ -30,9 +34,26 @@ bool dump_array_cf(const float complex *data, size_t n, float T, const char *fil
*/
bool dump_array_f(const float *data, size_t n, float T, const char *filename);
void sleep_until(double hires_time);
/*! Sleep until the given absolute timestamp in ns.
*
* The current timestamp can be retrieved using \ref get_hires_time().
*
* \param hires_time The resume timestamp in ns.
*/
void sleep_until(uint64_t hires_time);
/*! Returns the current high-resulution timestamp.
*
* This timestamp comes from the CLOCK_MONOTONIC source and has no defined relation to the wall-clock time. It can be used to calculate intervals, though.
*
* \returns A timestamp in nanosecond resolution.
*/
uint64_t get_hires_time(void);
void fsleep(double d);
double get_hires_time(void);
void hexdump(const uint8_t *data, size_t len);

View file

@ -222,3 +222,7 @@ target_link_libraries(
test_interleaver
m
)
#------------------------------------
add_subdirectory(layer2_over_udp)

View file

@ -0,0 +1,73 @@
add_executable(
l2udptest_client
../../src/utils.c
../../src/utils.h
../../src/logger.c
../../src/logger.h
../../src/options.c
../../src/options.h
../../src/var_array.c
../../src/var_array.h
../../src/config.h
../../src/jsonlogger.c
../../src/jsonlogger.h
../../src/debug_structs.h
../../src/layer2/packet_structs.c
../../src/layer2/packet_structs.h
../../src/layer2/ham64.c
../../src/layer2/ham64.h
../../src/layer2/packet_queue.c
../../src/layer2/packet_queue.h
../../src/layer2/connection.c
../../src/layer2/connection.h
../../src/layer2/tundev.c
../../src/layer2/tundev.h
l2udptest_client.c
)
target_link_libraries(
l2udptest_client
fec
m
liquid
)
#---------------------------
add_executable(
l2udptest_digipeater
../../src/utils.c
../../src/utils.h
../../src/logger.c
../../src/logger.h
../../src/options.c
../../src/options.h
../../src/var_array.c
../../src/var_array.h
../../src/config.h
../../src/jsonlogger.c
../../src/jsonlogger.h
../../src/debug_structs.h
../../src/layer2/packet_structs.c
../../src/layer2/packet_structs.h
../../src/layer2/ham64.c
../../src/layer2/ham64.h
../../src/layer2/packet_queue.c
../../src/layer2/packet_queue.h
../../src/layer2/connection.c
../../src/layer2/connection.h
../../src/layer2/connection_list.c
../../src/layer2/connection_list.h
../../src/layer2/digipeater.c
../../src/layer2/digipeater.h
../../src/layer2/tundev.c
../../src/layer2/tundev.h
l2udptest_digipeater.c
)
target_link_libraries(
l2udptest_digipeater
fec
m
liquid
)

View file

@ -0,0 +1,484 @@
/*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#include <linux/if.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <liquid/liquid.h>
#include <sys/poll.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <errno.h>
#include "layer2/ham64.h"
#include "results.h"
#include "utils.h"
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "jsonlogger.h"
#include "debug_structs.h"
#include "layer2/connection.h"
#include "layer2/tundev.h"
#include "config.h"
#define RESULT_CHECK(stmt) { \
result_t res = stmt; \
if(res != OK) { \
LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \
exit(1); \
} \
}
#define BROADCAST_PORT 3737
static int m_tunfd = -1;
static bool m_running = true;
static int m_bcast_sock = -1;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
static connection_ctx_t l2conn;
static void signal_handler(int signal, siginfo_t *info, void *ctx)
{
(void)signal;
(void)info;
(void)ctx;
LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal);
m_running = false;
}
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
static result_t transmit(const uint8_t *data, size_t len)
{
result_t result = OK;
struct sockaddr_in bcast_addr = {0};
bcast_addr.sin_family = AF_INET;
bcast_addr.sin_port = htons(BROADCAST_PORT);
bcast_addr.sin_addr.s_addr = INADDR_BROADCAST;
int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr));
if(ret < 0) {
LOG(LVL_ERR, "sendto: %s", strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "t");
return result;
}
void rx_data_to_tun(const layer2_data_packet_t *data_packet)
{
uint8_t tun_packet[4 + data_packet->payload_len];
// flags
tun_packet[0] = 0;
tun_packet[1] = 0;
switch(data_packet->payload_type) {
case L2_PAYLOAD_TYPE_IPV6:
*(uint16_t*)(tun_packet+2) = htons(0x86dd);
break;
case L2_PAYLOAD_TYPE_IPV4:
*(uint16_t*)(tun_packet+2) = htons(0x0800);
break;
default:
LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type);
return;
}
memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len);
int ret = write(m_tunfd, tun_packet, sizeof(tun_packet));
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
}
result_t connect_to_digipeater(connection_ctx_t *conn)
{
while(conn->conn_state != CONN_STATE_ESTABLISHED) {
uint8_t packetbuf[65536];
LOG(LVL_INFO, "Waiting for packets from digipeater (conn. state: %d)", conn->conn_state);
// note: recv() is blocking here
int ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return ERR_SYSCALL;
}
layer2_data_packet_t data_packet;
bool tx_request_received = false;
result_t err_code = connection_handle_packet(conn, packetbuf, ret, &data_packet, &tx_request_received);
switch(err_code) {
case OK:
break; // state machine advanced and there are probably packets to send
case ERR_INTEGRITY:
case ERR_INVALID_STATE:
continue; // retry with the next packet
default:
LOG(LVL_ERR, "connection_handle_packet() returned %d.", err_code);
return err_code;
}
LOG(LVL_INFO, "Packet processed successfully (new conn. state: %d); Transmitting response (if any).", conn->conn_state);
// send any packets in the queue
while(connection_can_transmit(conn)) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = connection_encode_next_packet(conn,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
}
return OK;
}
void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{
(void)conn;
(void)evt;
(void)user_ctx;
}
int main(void)
{
// initialize the console logger
logger_init();
if(!jsonlogger_init("jsonlog.fifo")) {
LOG(LVL_FATAL, "Could not initialize JSON logger.");
return EXIT_FAILURE;
}
srand(get_hires_time());
// ** Initialize **
char devname[IFNAMSIZ] = "hamnet70";
m_tunfd = tundev_open(devname);
if(m_tunfd < 0) {
return 1;
}
// ** Set up signal handling
struct sigaction term_action = {0};
term_action.sa_sigaction = signal_handler;
if(sigaction(SIGTERM, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if(sigaction(SIGINT, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Set up UDP socket
m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(m_bcast_sock < 0) {
LOG(LVL_ERR, "socket: %s", strerror(errno));
exit(EXIT_FAILURE);
}
int broadcastEnable=1;
int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable));
if(ret < 0) {
LOG(LVL_ERR, "setsockopt: %s", strerror(errno));
exit(EXIT_FAILURE);
}
struct sockaddr_in bind_addr = {0};
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = htons(BROADCAST_PORT);
bind_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr));
if(ret < 0) {
LOG(LVL_ERR, "bind: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Process packets **
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = m_tunfd;
pfd_tun.events = POLLIN;
struct pollfd pfd_bcast;
memset(&pfd_bcast, 0, sizeof(pfd_bcast));
pfd_bcast.fd = m_bcast_sock;
pfd_bcast.events = POLLIN;
uint64_t old = get_hires_time();
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address));
connection_setup_outgoing(&l2conn);
RESULT_CHECK(connect_to_digipeater(&l2conn));
while(m_running) {
// fill the TX queue from the TUN device
while(connection_can_enqueue_packet(&l2conn)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> move it to the queue
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(m_tunfd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
} else if(ret < 4) {
LOG(LVL_ERR, "Not enough data from TUN read() to check packet type!");
return ERR_SYSCALL;
}
uint16_t flags = *(uint16_t*)packetbuf;
uint16_t proto = *((uint16_t*)packetbuf + 1);
LOG(LVL_DUMP, "TUN Flags: 0x%04x", flags);
LOG(LVL_DUMP, "TUN Proto: 0x%04x", proto);
uint8_t *packet_data = packetbuf + 4;
size_t packet_length = ret - 4;
if(proto != 0xdd86) {
LOG(LVL_WARN, "Non-IPv6 packet ignored. Proto: 0x%04x", proto);
continue;
}
RESULT_CHECK(connection_enqueue_data_packet(&l2conn, L2_PAYLOAD_TYPE_IPV6, packet_data, packet_length));
}
}
// transmit one burst
if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = connection_encode_next_packet(&l2conn,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
}
bool may_tx = false;
while(!may_tx) {
connection_evt_t evt;
RESULT_CHECK(connection_maintain(&l2conn, &evt));
if(evt == CONN_EVT_TIMEOUT) {
LOG(LVL_ERR, "Connection timed out. Shutting down.");
m_running = false;
break;
}
// ** Receive signal **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
return EXIT_FAILURE;
}
if(ret == 0) {
continue;
}
uint8_t packetbuf[65536];
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return EXIT_FAILURE;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return EXIT_FAILURE;
}
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
layer2_data_packet_t data_packet;
bool tx_request_received = false;
result_t result = connection_handle_packet(&l2conn, packetbuf, ret, &data_packet, &tx_request_received);
// Switch to TX when a packet with tx_request=1 was decoded successfully.
// Note that this even triggers if the packet is out of sequence (but not for corrupted packets).
if(tx_request_received) {
may_tx = true;
}
switch(result) {
case OK:
// update statistics
m_rx_stats.successful_decodes++;
total_bytes += ret;
if(data_packet.payload_len != 0) {
rx_data_to_tun(&data_packet);
}
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
case ERR_INVALID_STATE:
LOG(LVL_WARN, "Packet ignored due to invalid state.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_bytes * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);
LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)",
m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)",
m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)",
m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found);
next_stats_print_time += HRTIME_MS(500);
total_bytes = 0;
old = new;
}
fprintf(stderr, "r");
}
// give the other side some time to switch to rx
sleep_until(next_tx_switch_time);
}
// ** Cleanup **
close(m_bcast_sock);
connection_destroy(&l2conn);
jsonlogger_shutdown();
LOG(LVL_INFO, "Done.");
logger_shutdown();
}

View file

@ -0,0 +1,345 @@
/*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#include <linux/if.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <liquid/liquid.h>
#include <sys/poll.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <errno.h>
#include "layer2/ham64.h"
#include "utils.h"
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "jsonlogger.h"
#include "debug_structs.h"
#include "layer2/digipeater.h"
#include "layer2/tundev.h"
#include "config.h"
#define RESULT_CHECK(stmt) { \
result_t res = stmt; \
if(res != OK) { \
LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \
exit(1); \
} \
}
#define BROADCAST_PORT 3737
static int m_tunfd = -1;
static bool m_running = true;
static int m_bcast_sock = -1;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
static void signal_handler(int signal, siginfo_t *info, void *ctx)
{
(void)signal;
(void)info;
(void)ctx;
LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal);
m_running = false;
}
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
static result_t transmit(const uint8_t *data, size_t len)
{
result_t result = OK;
struct sockaddr_in bcast_addr = {0};
bcast_addr.sin_family = AF_INET;
bcast_addr.sin_port = htons(BROADCAST_PORT);
bcast_addr.sin_addr.s_addr = INADDR_BROADCAST;
int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr));
if(ret < 0) {
LOG(LVL_ERR, "sendto: %s", strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "t");
return result;
}
void rx_data_to_tun(const layer2_data_packet_t *data_packet)
{
uint8_t tun_packet[4 + data_packet->payload_len];
// flags
tun_packet[0] = 0;
tun_packet[1] = 0;
switch(data_packet->payload_type) {
case L2_PAYLOAD_TYPE_IPV6:
*(uint16_t*)(tun_packet+2) = htons(0x86dd);
break;
case L2_PAYLOAD_TYPE_IPV4:
*(uint16_t*)(tun_packet+2) = htons(0x0800);
break;
default:
LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type);
return;
}
memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len);
int ret = write(m_tunfd, tun_packet, sizeof(tun_packet));
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
}
int main(void)
{
// initialize the console logger
logger_init();
if(!jsonlogger_init("jsonlog.fifo")) {
LOG(LVL_FATAL, "Could not initialize JSON logger.");
return EXIT_FAILURE;
}
srand(get_hires_time());
// ** Initialize **
char devname[IFNAMSIZ] = "hamnet70";
m_tunfd = tundev_open(devname);
if(m_tunfd < 0) {
return 1;
}
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
digipeater_ctx_t digipeater;
RESULT_CHECK(digipeater_init(&digipeater, &my_address));
// ** Set up signal handling
struct sigaction term_action = {0};
term_action.sa_sigaction = signal_handler;
if(sigaction(SIGTERM, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if(sigaction(SIGINT, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Set up UDP socket
m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(m_bcast_sock < 0) {
LOG(LVL_ERR, "socket: %s", strerror(errno));
exit(EXIT_FAILURE);
}
int broadcastEnable=1;
int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable));
if(ret < 0) {
LOG(LVL_ERR, "setsockopt: %s", strerror(errno));
exit(EXIT_FAILURE);
}
struct sockaddr_in bind_addr = {0};
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = htons(BROADCAST_PORT);
bind_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr));
if(ret < 0) {
LOG(LVL_ERR, "bind: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Process packets **
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = m_tunfd;
pfd_tun.events = POLLIN;
struct pollfd pfd_bcast;
memset(&pfd_bcast, 0, sizeof(pfd_bcast));
pfd_bcast.fd = m_bcast_sock;
pfd_bcast.events = POLLIN;
uint64_t old = get_hires_time();
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
while(m_running) {
RESULT_CHECK(digipeater_maintain(&digipeater));
RESULT_CHECK(digipeater_fill_packet_queues_from_tundev(&digipeater, m_tunfd));
// transmit anything available
if(digipeater_can_transmit(&digipeater)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = digipeater_encode_next_packet(&digipeater,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_WARN, "Ending burst without TX Request!");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
}
// make sure the receiver runs for a minimum amount of time
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON_MS);
// receive response
while(get_hires_time() < next_tx_switch_time) {
// ** Receive packets from the broadcast socket **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
return EXIT_FAILURE;
}
if(ret == 0) {
continue;
}
uint8_t packetbuf[65536];
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return EXIT_FAILURE;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return EXIT_FAILURE;
}
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
layer2_data_packet_t data_packet;
result_t result = digipeater_handle_packet(&digipeater, packetbuf, ret, &data_packet);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
if(data_packet.payload_len != 0) {
rx_data_to_tun(&data_packet);
}
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
total_bytes += ret;
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_bytes * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);
LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)",
m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)",
m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)",
m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found);
next_stats_print_time += HRTIME_MS(500);
total_bytes = 0;
old = new;
}
fprintf(stderr, "r");
}
}
// ** Cleanup **
close(m_bcast_sock);
digipeater_destroy(&digipeater);
jsonlogger_shutdown();
LOG(LVL_INFO, "Done.");
logger_shutdown();
}

View file

@ -0,0 +1,13 @@
add_executable(
call2ham64
../../src/layer2/ham64.c
../../src/layer2/ham64.h
../../src/logger.c
../../src/logger.h
call2ham64.c
)
target_link_libraries(
call2ham64
m
)

View file

@ -0,0 +1,59 @@
#include <layer2/ham64.h>
#include <logger.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
int main(int argc, char **argv)
{
logger_init();
if(argc < 2) {
LOG(LVL_FATAL, "Not enough arguments!");
LOG(LVL_FATAL, "");
LOG(LVL_FATAL, "usage: %s <callsign>", argv[0]);
return EXIT_FAILURE;
}
const char *callsign = argv[1];
size_t len = strlen(callsign);
if(len > 12) {
LOG(LVL_ERR, "Call sign too long: %zu chars (allowed: 12)", len);
return EXIT_FAILURE;
}
ham64_t ham64;
size_t encoded_len = ham64_encode(callsign, &ham64);
if(encoded_len == 0) {
LOG(LVL_ERR, "Encoding the call sign failed.");
return EXIT_FAILURE;
}
LOG(LVL_INFO, "Call sign encoded successfully: %zu words.", encoded_len);
char formatted[HAM64_FMT_MAX_LEN];
ham64_format(&ham64, formatted);
printf("ham64: %s\n", formatted);
// calculate IPv6 address
struct in6_addr ipv6_addr;
memset(&ipv6_addr, 0, sizeof(struct in6_addr));
// fill the host part from the ham64 address. The bytes are filled in
// reverse order to make the „readable“ IPv6 address as short as possible.
for(uint8_t i = 0; i < ham64.length; i++) {
ipv6_addr.s6_addr[15 - 2*i] = (ham64.addr[i] >> 8) & 0xFF;
ipv6_addr.s6_addr[14 - 2*i] = (ham64.addr[i] >> 0) & 0xFF;
}
// print the address
char ipv6_str[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, &ipv6_addr, ipv6_str, sizeof(ipv6_str));
printf("ipv6: %s\n", ipv6_str);
}