Combine layer2_rx and layer2_tx in a new connection module

The new module is not used yet, but this is a preparation for the future
multi-client networking support.
This commit is contained in:
Thomas Kolb 2024-08-25 22:26:56 +02:00
parent 899152a530
commit fc9e5c5229
7 changed files with 460 additions and 11 deletions

View file

@ -53,6 +53,8 @@ set(sources
src/layer2/layer2_rx.h src/layer2/layer2_rx.h
src/layer2/ham64.c src/layer2/ham64.c
src/layer2/ham64.h src/layer2/ham64.h
src/layer2/connection.c
src/layer2/connection.h
src/sdr/sdr.c src/sdr/sdr.c
src/sdr/sdr.h src/sdr/sdr.h
) )

View file

@ -0,0 +1,297 @@
#include <string.h>
#include <assert.h>
#include "connection.h"
#include "config.h"
#include "layer2/ham64.h"
#include "results.h"
#define SEQ_NR_MASK 0xF
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr)
{
ctx->last_acked_seq = 0;
ctx->next_expected_seq = 0;
packet_queue_init(&ctx->packet_queue);
ctx->next_packet_index = 0;
ctx->next_seq_nr = 0;
ctx->my_addr = *my_addr;
ctx->peer_addr = *peer_addr;
ctx->conn_state = CONN_STATE_INITIALIZED;
return OK;
}
void connection_destroy(connection_ctx_t *ctx)
{
ctx->conn_state = CONN_STATE_UNINITIALIZED;
packet_queue_destroy(&ctx->packet_queue);
}
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len)
{
// check the CRC
size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME);
if(!crc_check_key(PAYLOAD_CRC_SCHEME, (unsigned char*)buf, packet_size)) {
LOG(LVL_ERR, "payload CRC check failed!");
return ERR_INTEGRITY;
}
// decode the header
layer2_packet_header_t header;
if(!layer2_decode_packet_header(buf, buf_len, &header)) {
LOG(LVL_ERR, "Header could not be decoded!");
return ERR_INTEGRITY;
}
// check if the packet really should be handled by us
if(!ham64_is_equal(&header.src_addr, &ctx->peer_addr)) {
char fmt_src_addr[HAM64_FMT_MAX_LEN];
char fmt_peer_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header.src_addr, fmt_src_addr);
ham64_format(&ctx->peer_addr, fmt_peer_addr);
LOG(LVL_ERR, "Packet has the wrong source address: got %s, expected %s",
fmt_src_addr, fmt_peer_addr);
return ERR_INVALID_ADDRESS;
}
if(!ham64_is_equal(&header.dst_addr, &ctx->my_addr)) {
char fmt_dst_addr[HAM64_FMT_MAX_LEN];
char fmt_my_addr[HAM64_FMT_MAX_LEN];
ham64_format(&header.dst_addr, fmt_dst_addr);
ham64_format(&ctx->my_addr, fmt_my_addr);
LOG(LVL_ERR, "Packet has the wrong destination address: got %s, expected %s",
fmt_dst_addr, fmt_my_addr);
return ERR_INVALID_ADDRESS;
}
LOG(LVL_DEBUG, "Handling %s packet with rx_seq_nr %u, tx_seq_nr %u.",
layer2_msg_type_to_string(header.msg_type), header.rx_seq_nr, header.tx_seq_nr);
ctx->last_acked_seq = header.rx_seq_nr;
switch(header.msg_type) {
case L2_MSG_TYPE_EMPTY:
LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq);
return OK; // do not ACK and call back
case L2_MSG_TYPE_CONN_MGMT:
case L2_MSG_TYPE_CONNECTIONLESS:
LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header.msg_type));
return OK;
case L2_MSG_TYPE_DATA:
break;
default:
LOG(LVL_ERR, "Invalid message type %d.", header.msg_type);
return ERR_INVALID_STATE;
}
if(ctx->next_expected_seq != header.tx_seq_nr) {
LOG(LVL_ERR, "Expected sequence number %u, received %u.", ctx->next_expected_seq, header.tx_seq_nr);
return ERR_SEQUENCE;
}
ctx->next_expected_seq++;
ctx->next_expected_seq &= 0xF;
LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
// handle the acknowledgement internally
connection_handle_ack(ctx, header.rx_seq_nr);
size_t header_size = layer2_get_encoded_header_size(&header);
// extract the payload and forward it to the tun device
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
ctx->data_cb(ctx, payload, payload_len);
return OK;
}
uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx)
{
return ctx->next_expected_seq;
}
uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx)
{
return ctx->last_acked_seq;
}
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len)
{
layer2_packet_header_t header;
if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
return ERR_NO_MEM;
}
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_DATA;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_request = 0;
header.tx_seq_nr = ctx->next_seq_nr;
// create a persistent copy of the packet data.
// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
uint8_t *packetbuf = malloc(buf_len);
if(!packetbuf) {
LOG(LVL_ERR, "malloc failed.");
return ERR_NO_MEM;
}
memcpy(packetbuf, buf, buf_len);
packet_queue_add(&ctx->packet_queue, &header, packetbuf, buf_len);
LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
header.tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;
return OK;
}
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
{
layer2_packet_header_t header;
header.dst_addr.addr[0] = 0xFFFF;
header.dst_addr.length = 1;
header.src_addr.addr[0] = 0x0001;
header.src_addr.length = 1;
header.msg_type = L2_MSG_TYPE_EMPTY;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_seq_nr = 0; // not used in empty packets
header.tx_request = tx_request;
if (!packet_queue_add(&ctx->packet_queue, &header, NULL, 0)) {
return ERR_NO_MEM;
}
return OK;
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len)
{
const packet_queue_entry_t *entry = packet_queue_get(&ctx->packet_queue, ctx->next_packet_index);
if(!entry) {
// no more entries
return 0;
}
unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME);
assert(buf_len >= LAYER2_PACKET_HEADER_ENCODED_SIZE_MAX + crc_size + entry->data_len);
layer2_packet_header_t header = entry->header;
header.rx_seq_nr = ack_seq_nr;
// encode the header
LOG(LVL_DEBUG, "Encoding packet with rx_seq_nr %u, tx_seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
size_t packet_size = layer2_encode_packet_header(&header, buf);
// add the payload data
if(entry->data) {
memcpy(buf + packet_size, entry->data, entry->data_len);
}
packet_size += entry->data_len;
// calculate CRC of everything and append it to the packet
crc_append_key(PAYLOAD_CRC_SCHEME, buf, packet_size);
packet_size += crc_size;
ctx->next_packet_index++;
return packet_size;
}
void connection_restart_tx(connection_ctx_t *ctx)
{
ctx->next_packet_index = 0;
}
void connection_tx_clean_empty_packet(connection_ctx_t *ctx)
{
const packet_queue_entry_t *entry = packet_queue_get(&ctx->packet_queue, 0);
if(entry && entry->header.msg_type == L2_MSG_TYPE_EMPTY) {
packet_queue_delete(&ctx->packet_queue, 1);
if(ctx->next_packet_index > 0) {
ctx->next_packet_index--;
}
}
}
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq)
{
ctx->next_packet_index = 0;
size_t packets_to_remove = 0;
size_t packets_available = packet_queue_get_used_space(&ctx->packet_queue);
for(size_t i = 0; i < packets_available; i++) {
const packet_queue_entry_t *entry = packet_queue_get(&ctx->packet_queue, i);
if(entry->header.tx_seq_nr == acked_seq) {
break;
}
packets_to_remove++;
}
packet_queue_delete(&ctx->packet_queue, packets_to_remove);
packets_available = packet_queue_get_used_space(&ctx->packet_queue);
LOG(LVL_DEBUG, "handling ack for seq_nr %u, removing %zu packets, %zu packets remaining.", acked_seq, packets_to_remove, packets_available);
if(packets_available == 0) {
// no packets left in queue, but an acknowledgement must be
// transmitted. Add an empty packet to do that.
result_t res = connection_add_empty_packet(ctx, false);
if (res != OK) {
LOG(LVL_WARN, "Failed to add empty packet: %d.", res);
}
}
}
bool connection_can_transmit(const connection_ctx_t *ctx)
{
return (packet_queue_get_used_space(&ctx->packet_queue) != 0)
&& (packet_queue_get(&ctx->packet_queue, ctx->next_packet_index) != NULL);
}

View file

@ -0,0 +1,126 @@
/*
* This file contains functions to handle a single layer 2 connection.
*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#ifndef CONNECTION_H
#define CONNECTION_H
#include <results.h>
#include <stdbool.h>
#include "packet_queue.h"
struct connection_ctx_s;
typedef enum {
CONN_STATE_UNINITIALIZED, //!< Uninitialized. Cannot be used in any way
CONN_STATE_INITIALIZED, //!< Initialized, no packets processed yet
CONN_STATE_CONNECTING, //!< Connection request sent, no two-way communication yet
CONN_STATE_ESTABLISHED, //!< Connection is established
CONN_STATE_CLOSED //!< Connection has been closed (gracefully or by timeout)
} connection_state_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len);
typedef struct connection_ctx_s {
connection_state_t conn_state; //!< State of the connection.
connection_data_callback_t data_cb; //!< Callback function for received data packets.
ham64_t my_addr; //!< The local link layer address.
ham64_t peer_addr; //!< The link layer address of the peer.
uint8_t last_acked_seq; //!< Next sequence number expected by the peer (from last Ack).
uint8_t next_expected_seq; //!< Next sequence number expected by us.
packet_queue_t packet_queue; //!< Transmission packet queue.
size_t next_packet_index; //!< Index in the packet queue of the next packet to transmit.
uint8_t next_seq_nr; //!< Sequence number to tag the next transmitted packet with.
} connection_ctx_t;
/*!\brief Initialize the layer 2 connection context.
*
* \param ctx The connection context to initialize.
* \param my_addr The local link layer address.
* \param peer_addr The remote link layer address.
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr);
/*!\brief Destroy the given layer 2 connection context.
*/
void connection_destroy(connection_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param ctx The receiver context.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \returns A result code from the packet handling procedure.
*/
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len);
/*!\brief Return the sequence number expected next by our side.
*/
uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx);
/*!\brief Return the sequence number expected next by the other side.
*/
uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx);
/*!\brief Enqueue a packet for transmission.
* \param ctx The connection context.
*/
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
/*!\brief Add an empty packet to ensure an acknowledgement is sent.
* \param ctx The connection context.
* \param tx_request Value of the TX Request field in the packet.
*/
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
/*!\brief Encode the next packet for transmission.
*
* \note
* If no more packets are available, this function returns zero. In that case,
* either \ref connection_restart() or \ref connection_handle_ack() must be
* called to handle retransmits correctly.
*
* \param ctx The connection context.
* \param ack_seq_nr The received sequence number to send as an acknowledgement.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \returns The number of bytes written to buf or zero if no packet was available.
*/
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len);
/*!\brief Restart the transmission from the beginning of the packet queue.
*/
void connection_restart_tx(connection_ctx_t *ctx);
/*!\brief Remove the first packet from the queue if it is an empty packet.
*/
void connection_tx_clean_empty_packet(connection_ctx_t *ctx);
/*!\brief Handle acknowledgements.
* \details
* Removes all packets before the given sequence number from the queue.
*
* \param ctx The connection context.
* \param acked_seq The acknowledged (= next expected) sequence number.
* \param do_ack Whether an empty packet shall be generated if the queue is empty.
*/
void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq);
/*!\brief Check if there are packets queued for transmission.
*/
bool connection_can_transmit(const connection_ctx_t *ctx);
#endif // CONNECTION_H

View file

@ -187,3 +187,19 @@ void ham64_format(const ham64_t *ham64, char *out)
out[5*ham64->length - 1] = '\0'; out[5*ham64->length - 1] = '\0';
} }
bool ham64_is_equal(const ham64_t *a, const ham64_t *b)
{
if(a->length != b->length) {
return false;
}
for(uint8_t i = 0; i < a->length; i++) {
if(a->addr[i] != b->addr[i]) {
return false;
}
}
return true;
}

View file

@ -9,6 +9,7 @@
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <stdbool.h>
// buffer size required for the string representation of a maximum-length HAM64 // buffer size required for the string representation of a maximum-length HAM64
// address, including terminating zero. // address, including terminating zero.
@ -62,4 +63,11 @@ const char *ham64_addr_type_to_string(ham64_addr_type_t addr_type);
*/ */
void ham64_format(const ham64_t *ham64, char *out); void ham64_format(const ham64_t *ham64, char *out);
/*!\brief Check if two ham64 addresses are equal.
* \param a Pointer to the first address.
* \param b Pointer to the second address.
* \returns True if a and b are equal, false otherwise.
*/
bool ham64_is_equal(const ham64_t *a, const ham64_t *b);
#endif // HAM64_H #endif // HAM64_H

View file

@ -6,7 +6,6 @@
*/ */
#include <string.h> #include <string.h>
#include <assert.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>

View file

@ -11,16 +11,17 @@
typedef enum { typedef enum {
OK, OK,
ERR_INVALID_STATE, ERR_INVALID_STATE, // module or context is in an invalid state
ERR_INVALID_PARAM, // invalid / nonsense parameters given ERR_INVALID_PARAM, // invalid / nonsense parameters given
ERR_NO_MEM, // not enough memory or allocation error ERR_INVALID_ADDRESS, // invalid address received or given
ERR_SIZE, // a given size is invalid ERR_NO_MEM, // not enough memory or allocation error
ERR_LIQUID, // an error occurred in the LiquidDSP library. ERR_SIZE, // a given size is invalid
ERR_SYSCALL, // a syscall failed. Use errno to determine the cause. ERR_LIQUID, // an error occurred in the LiquidDSP library.
ERR_SOAPY, // an error occurred in the SoapySDR library. ERR_SYSCALL, // a syscall failed. Use errno to determine the cause.
ERR_SDR, // an error occurred in the SDR interface. ERR_SOAPY, // an error occurred in the SoapySDR library.
ERR_INTEGRITY, // an integrity check failed (e.g. CRC of received packet is wrong) ERR_SDR, // an error occurred in the SDR interface.
ERR_SEQUENCE, // an unexpected packet was received ERR_INTEGRITY, // an integrity check failed (e.g. CRC of received packet is wrong)
ERR_SEQUENCE, // an unexpected packet was received
} result_t; } result_t;
#ifdef DEBUG_LIQUID #ifdef DEBUG_LIQUID