WIP: managing multiple connections

Working towards handling multiple connections. A lot is still missing.
This commit is contained in:
Thomas Kolb 2024-11-10 17:02:14 +01:00
parent 6e303e8aed
commit 01edbb1db1
9 changed files with 349 additions and 42 deletions

View file

@ -1,6 +1,9 @@
#include <string.h>
#include <assert.h>
#define LOGGER_MODULE_NAME "conn"
#include "logger.h"
#include "connection.h"
#include "config.h"
@ -11,7 +14,13 @@
#define SEQ_NR_MASK 0xF
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr, connection_data_callback_t data_cb)
result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr,
connection_data_callback_t data_cb,
connection_event_callback_t event_cb,
void *user_ctx)
{
ctx->last_acked_seq = 0;
ctx->next_expected_seq = 0;
@ -24,6 +33,13 @@ result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ha
ctx->peer_addr = *peer_addr;
ctx->data_cb = data_cb;
ctx->event_cb = event_cb;
uint64_t now = get_hires_time();
ctx->last_rx_time = now;
ctx->retransmit_time = 0;
ctx->user_context = user_ctx;
ctx->conn_state = CONN_STATE_INITIALIZED;
@ -168,18 +184,15 @@ result_t connection_handle_packet_prechecked(
ctx->next_expected_seq++;
ctx->next_expected_seq &= 0xF;
ctx->last_rx_time = get_hires_time();
LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header->rx_seq_nr, header->tx_seq_nr);
// handle the acknowledgement internally
connection_handle_ack(ctx, header->rx_seq_nr, true);
size_t header_size = layer2_get_encoded_header_size(&header);
// extract the payload and forward it to the tun device
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
ctx->data_cb(ctx, payload, payload_len);
ctx->data_cb(ctx, payload, payload_len, ctx->user_context);
return OK;
}
@ -287,7 +300,56 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len)
result_t connection_send_parameters(connection_ctx_t *ctx)
{
// check the connection state
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_CLOSED:
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
LOG(LVL_ERR, "Trying to send connection parameters in state %u", ctx->conn_state);
return ERR_INVALID_STATE;
case CONN_STATE_INITIALIZED:
// in these states, packets can be handled
break;
}
layer2_packet_header_t header;
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_CONN_MGMT;
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_seq_nr = ctx->next_seq_nr;
header.tx_request = 1;
size_t payload_len = 1;
uint8_t *payload = malloc(payload_len);
if(!payload) {
return ERR_NO_MEM;
}
payload[0] = CONN_MGMT_TYPE_CONNECTION_PARAMETERS;
// TODO: calculate IP addresses from clients HAM64 address
if (!packet_queue_add(&ctx->packet_queue, &header, payload, payload_len)) {
return ERR_NO_MEM;
}
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;
// connection is considered established after the connection parameters are sent.
ctx->conn_state = CONN_STATE_ESTABLISHED;
return OK;
}
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
// check the connection state
switch(ctx->conn_state) {
@ -311,7 +373,7 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr,
}
layer2_packet_header_t header = entry->header;
header.rx_seq_nr = ack_seq_nr;
header.rx_seq_nr = ctx->next_expected_seq;
// encode the header
LOG(LVL_DEBUG, "Encoding packet with rx_seq_nr %u, tx_seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr);
@ -324,6 +386,7 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr,
ctx->next_packet_index++;
*end_burst = header.tx_request;
return packet_size;
}
@ -408,14 +471,37 @@ bool connection_can_transmit(const connection_ctx_t *ctx)
}
bool connection_is_closed(const connection_ctx_t *ctx)
{
switch(ctx->conn_state) {
case CONN_STATE_UNINITIALIZED:
case CONN_STATE_INITIALIZED:
case CONN_STATE_CLOSED:
return true;
case CONN_STATE_CONNECTING:
case CONN_STATE_ESTABLISHED:
return false;
}
}
result_t connection_maintain(connection_ctx_t *ctx)
{
uint64_t now = get_hires_time();
if(now > ctx->last_rx_time + HRTIME_MS(CONNECTION_TIMEOUT_MS)) {
LOG(LVL_INFO, "Connection timed out.");
ctx->conn_state = CONN_STATE_CLOSED;
ctx->event_cb(ctx, CONN_EVT_TIMEOUT, ctx->user_context);
return OK;
}
if(ctx->retransmit_time != 0 && now >= ctx->retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
ctx->retransmit_time = 0;
connection_restart_tx(ctx);
ctx->event_cb(ctx, CONN_EVT_RETRANSMIT, ctx->user_context);
}
return OK;

View file

@ -25,12 +25,21 @@ typedef enum {
} connection_state_t;
/*!\brief Type for a callback function that is called when a data packet was received. */
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len);
typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx);
typedef enum {
CONN_EVT_TIMEOUT, //!< The connection timed out because no packets were received
CONN_EVT_RETRANSMIT, //!< Packet queue transmission is restarted
} connection_evt_t;
/*!\brief Type for a callback function that is called on various connection events. */
typedef void (*connection_event_callback_t)(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx);
typedef struct connection_ctx_s {
connection_state_t conn_state; //!< State of the connection.
connection_data_callback_t data_cb; //!< Callback function for received data packets.
connection_event_callback_t event_cb; //!< Callback function for event signalling.
ham64_t my_addr; //!< The local link layer address.
ham64_t peer_addr; //!< The link layer address of the peer.
@ -45,6 +54,8 @@ typedef struct connection_ctx_s {
uint64_t retransmit_time; //!< Time when a retransmit shall be triggered.
uint64_t last_rx_time; //!< Time when a packet was last received and decoded.
void *user_context; //!< An arbitrary pointer that can set by the user.
} connection_ctx_t;
@ -54,9 +65,17 @@ typedef struct connection_ctx_s {
* \param my_addr The local link layer address.
* \param peer_addr The remote link layer address.
* \param data_cb Callback for handling received payload data.
* \param event_cb Callback for connection events.
* \param user_ctx User context pointer (for arbitrary data).
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr, connection_data_callback_t data_cb);
result_t connection_init(
connection_ctx_t *ctx,
const ham64_t *my_addr,
const ham64_t *peer_addr,
connection_data_callback_t data_cb,
connection_event_callback_t event_cb,
void *user_ctx);
/*!\brief Destroy the given layer 2 connection context.
*/
@ -113,6 +132,14 @@ bool connection_can_enqueue_packet(const connection_ctx_t *ctx);
*/
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
/*!\brief Send connection parameters.
*
* This packet accepts an incoming connection request.
*
* \param ctx The connection context.
*/
result_t connection_send_parameters(connection_ctx_t *ctx);
/*!\brief Encode the next packet for transmission.
*
* \note
@ -121,12 +148,12 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request);
* called to handle retransmits correctly.
*
* \param ctx The connection context.
* \param ack_seq_nr The received sequence number to send as an acknowledgement.
* \param buf Where to write the encoded packet data.
* \param buf_len Space available in the buffer.
* \param end_burst Output parameter that is set to true if this is the last packet in a burst.
* \returns The number of bytes written to buf or zero if no packet was available.
*/
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len);
size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst);
/*!\brief Restart the transmission from the beginning of the packet queue.
*/
@ -150,6 +177,10 @@ void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq, bool do_ack
*/
bool connection_can_transmit(const connection_ctx_t *ctx);
/*!\brief Check if this connection is closed.
*/
bool connection_is_closed(const connection_ctx_t *ctx);
/*!\brief Handle internal maintenance tasks.
*
* This should be called periodically to handle timeouts and retransmissions.

View file

@ -7,6 +7,12 @@
#define SEQ_NR_MASK 0xF
static destroy_entry(connection_list_entry_t *entry)
{
connection_destroy(&entry->connection);
free(entry);
}
// find the location where the timestamp should be inserted to maintain
// ascending timestamp order and return the then-previous entry.
//
@ -50,7 +56,7 @@ void connection_list_destroy(connection_list_t *list)
// delete all list entries
while(list->head) {
connection_list_entry_t *next = list->head->next;
free(list->head);
destroy_entry(list->head);
list->head = next;
}
}
@ -135,8 +141,39 @@ result_t connection_list_delete_head(connection_list_t *list)
connection_list_entry_t *new_head = list->head->next;
free(list->head);
destroy_entry(list->head);
list->head = new_head;
return OK;
}
result_t connection_list_delete_closed(connection_list_t *list)
{
if(!list->head) {
return OK;
}
connection_list_entry_t *prev = NULL;
connection_list_entry_t *cur = list->head;
while(cur) {
if(connection_is_closed(&cur->connection)) {
connection_list_entry_t *to_delete = cur;
cur = cur->next;
if(prev) {
prev->next = to_delete->next;
} else {
list->head = to_delete->next;
}
destroy_entry(to_delete);
} else {
prev = cur;
cur = cur->next;
}
}
return OK;
}

View file

@ -75,4 +75,12 @@ result_t connection_list_reschedule_head(connection_list_t *list, uint64_t next_
*/
result_t connection_list_delete_head(connection_list_t *list);
/*!\brief Delete all connections that are in a closed state.
*
* \param list Pointer to the list to clean.
*
* \returns OK if everything worked or a fitting error code.
*/
result_t connection_list_delete_closed(connection_list_t *list);
#endif // CONNECTION_LIST_H

View file

@ -1,14 +1,81 @@
#include <string.h>
#include <assert.h>
#define LOGGER_MODULE_NAME "digi"
#include "logger.h"
#include "digipeater.h"
#include "config.h"
#include "layer2/connection.h"
#include "layer2/connection_list.h"
#include "layer2/ham64.h"
#include "layer2/packet_structs.h"
#include "results.h"
#include "utils.h"
void conn_data_cb(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx)
{
(void)conn;
digipeater_ctx_t *digi_ctx = user_ctx;
digi_ctx->data_cb(digi_ctx, data, len);
}
void conn_event_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{
(void)conn;
digipeater_ctx_t *digi_ctx = user_ctx;
switch(evt) {
case CONN_EVT_TIMEOUT:
// connection has been closed by timeout -> clean up the list
connection_list_delete_closed(&digi_ctx->conn_list);
break;
default:
// do nothing
break;
}
}
static result_t digipeater_handle_beacon_responses(digipeater_ctx_t *ctx, const layer2_packet_header_t *header, const uint8_t *buf, size_t buf_len)
{
LOG(LVL_DEBUG, "Handling beacon response packet.");
layer2_dump_packet_header(LVL_DUMP, header);
if(header->msg_type != L2_MSG_TYPE_CONN_MGMT) {
LOG(LVL_ERR, "Beacon response with invalid message type %i", header->msg_type);
return ERR_INVALID_PARAM;
}
if(buf_len == 0) {
LOG(LVL_ERR, "Missing payload in beacon response");
return ERR_INVALID_PARAM;
}
uint8_t conn_mgmt_type = buf[0];
if(conn_mgmt_type != CONN_MGMT_TYPE_CONNECTION_REQUEST) {
LOG(LVL_ERR, "Unexpected connection management type in beacon response: 0x%02x", conn_mgmt_type);
return ERR_INVALID_PARAM;
}
// packet is valid -> create a new connection, enqueue the parameters message
// and add it at the beginning of the connection list.
connection_ctx_t new_conn;
ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr, conn_data_cb, conn_event_cb, ctx));
ERR_CHECK(connection_send_parameters(&new_conn));
uint64_t now = get_hires_time();
ERR_CHECK(connection_list_insert(&ctx->conn_list, &new_conn, now));
return OK;
}
result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr, digipeater_data_callback_t data_cb)
{
ctx->my_addr = *my_addr;
@ -69,14 +136,28 @@ result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, siz
}
// FIXME: handle connection management packets here
// FIXME: handle data and empty packets in the current connection
size_t header_size = layer2_get_encoded_header_size(&header);
const uint8_t *payload = buf + header_size;
size_t payload_len = packet_size - header_size;
connection_handle_packet_prechecked(current_conn, &header, payload, payload_len);
switch(ctx->state) {
case DIGIPEATER_STATE_CONN:
{
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
connection_ctx_t *current_conn = &head->connection;
return connection_handle_packet_prechecked(current_conn, &header, payload, payload_len);
} else {
LOG(LVL_WARN, "Digipeater in CONN state, but there is no active connection! Packet dropped.");
return OK;
}
}
case DIGIPEATER_STATE_BEACON:
return digipeater_handle_beacon_responses(ctx, &header, payload, payload_len);
}
return OK;
}
@ -85,21 +166,50 @@ result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, siz
size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst)
{
uint64_t now = get_hires_time();
size_t packet_size = 0;
*end_burst = false;
if(now > ctx->next_beacon_time) {
// TODO: build a beacon packet
// build a beacon packet
layer2_packet_header_t header;
ham64_t broadcast = {{0xFFFF, 0, 0, 0}, 1};
header.dst_addr = broadcast;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_CONN_MGMT;
header.rx_seq_nr = 0; // unused
header.tx_seq_nr = 0; // unused
header.tx_request = 1;
uint8_t payload[1] = {CONN_MGMT_TYPE_BEACON};
packet_size = layer2_encode_packet(&header, payload, 1, buf, buf_len);
ctx->state = DIGIPEATER_STATE_BEACON;
*end_burst = true;
return packet_size;
} else {
// pull packets from the current connection
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(!head) {
return 0;
}
// TODO: pull packets from the current connection
connection_ctx_t *conn = &head->connection;
if(connection_can_transmit(conn)) {
packet_size = connection_encode_next_packet(conn, buf, buf_len, end_burst);
return 0;
ctx->state = DIGIPEATER_STATE_CONN;
}
return packet_size;
}
}
bool digipeater_can_transmit(const digipeater_ctx_t *ctx)
bool digipeater_can_transmit(digipeater_ctx_t *ctx)
{
uint64_t now = get_hires_time();
@ -107,16 +217,34 @@ bool digipeater_can_transmit(const digipeater_ctx_t *ctx)
return true;
}
// TODO: also true if next TX time has expired and there are packets waiting for this connection.
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(head) {
return connection_can_transmit(&head->connection);
}
/*return (packet_queue_get_used_space(&ctx->packet_queue) != 0)
&& (packet_queue_get(&ctx->packet_queue, ctx->next_packet_index) != NULL); */
return false;
}
result_t digipeater_maintain(digipeater_ctx_t *ctx)
{
(void)ctx;
switch(ctx->state) {
case DIGIPEATER_STATE_CONN:
{
connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list);
if(!head) {
LOG(LVL_INFO, "No active connection -> force beacon state for packet handling.");
ctx->state = DIGIPEATER_STATE_BEACON;
} else {
ERR_CHECK(connection_maintain(&head->connection));
}
break;
}
case DIGIPEATER_STATE_BEACON:
// nothing to do here
break;
}
return OK;
}

View file

@ -83,7 +83,7 @@ size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t
/*!\brief Check if there are packets queued for transmission.
*/
bool digipeater_can_transmit(const digipeater_ctx_t *ctx);
bool digipeater_can_transmit(digipeater_ctx_t *ctx);
/*!\brief Handle internal maintenance tasks.
*

View file

@ -33,6 +33,7 @@ typedef struct layer2_packet_header_s {
ham64_t dst_addr; //!< destination HAM-64 address
} layer2_packet_header_t;
// maximum header size
// - 1 byte packet info
// - 1 byte sequence numbers
@ -104,6 +105,22 @@ typedef struct layer2_data_header_s {
/* Connection Management Structs */
// TODO
typedef enum {
CONN_MGMT_TYPE_BEACON = 0x00,
CONN_MGMT_TYPE_CONNECTION_REQUEST = 0x01,
CONN_MGMT_TYPE_CONNECTION_PARAMETERS = 0x02,
CONN_MGMT_TYPE_CONNECTION_RESET = 0x03,
CONN_MGMT_TYPE_DISCONNECT_REQUEST = 0x04,
CONN_MGMT_TYPE_DISCONNECT = 0x05,
} conn_mgmt_type_t;
typedef enum {
CONN_PARAM_TYPE_IPV6_ADDRESS = 0x00,
CONN_PARAM_TYPE_IPV6_GATEWAY = 0x01,
CONN_PARAM_TYPE_IPV6_DNS = 0x02,
CONN_PARAM_TYPE_IPV4_ADDRESS = 0x08,
CONN_PARAM_TYPE_IPV4_GATEWAY = 0x09,
CONN_PARAM_TYPE_IPV4_DNS = 0x0A,
} conn_param_type_t;
#endif // PACKET_STRUCTS_H

View file

@ -222,16 +222,13 @@ int main(int argc, char **argv)
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
uint64_t retransmit_time = 0;
// TODO: wait for beacon
// TODO: send connection request
while(m_running) {
uint64_t now = get_hires_time();
if(retransmit_time != 0 && now >= retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
retransmit_time = 0;
connection_restart_tx(&l2conn);
}
RESULT_CHECK(connection_maintain(&l2conn));
// fill the TX queue from the TUN device
while(connection_can_enqueue_packet(&l2conn)) {

View file

@ -30,6 +30,7 @@
#include "debug_structs.h"
#include "layer2/connection.h"
#include "layer2/connection_list.h"
#include "layer2/tundev.h"
@ -154,6 +155,9 @@ int main(int argc, char **argv)
return 1;
}
connection_list_t conn_list;
RESULT_CHECK(connection_list_init(&conn_list));
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
@ -222,18 +226,17 @@ int main(int argc, char **argv)
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
uint64_t retransmit_time = 0;
uint64_t next_beacon_time = old + HRTIME_MS(5000);
while(m_running) {
uint64_t now = get_hires_time();
if(retransmit_time != 0 && now >= retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered.");
retransmit_time = 0;
connection_restart_tx(&l2conn);
if(now >= next_beacon_time) {
// TODO: encode and transmit beacon
next_beacon_time += HRTIME_MS(5000);
}
// fill the TX queue from the TUN device
// FIXME: fill the TX queues from the TUN device
while(connection_can_enqueue_packet(&l2conn)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {