From 01edbb1db1789add8dd33f7269fab4678efb3058 Mon Sep 17 00:00:00 2001 From: Thomas Kolb Date: Sun, 10 Nov 2024 17:02:14 +0100 Subject: [PATCH] WIP: managing multiple connections Working towards handling multiple connections. A lot is still missing. --- impl/src/layer2/connection.c | 104 +++++++++++- impl/src/layer2/connection.h | 39 ++++- impl/src/layer2/connection_list.c | 41 ++++- impl/src/layer2/connection_list.h | 8 + impl/src/layer2/digipeater.c | 154 ++++++++++++++++-- impl/src/layer2/digipeater.h | 2 +- impl/src/layer2/packet_structs.h | 19 ++- impl/test/layer2_over_udp/l2udptest_client.c | 9 +- .../layer2_over_udp/l2udptest_digipeater.c | 15 +- 9 files changed, 349 insertions(+), 42 deletions(-) diff --git a/impl/src/layer2/connection.c b/impl/src/layer2/connection.c index d6f6f5e..069b444 100644 --- a/impl/src/layer2/connection.c +++ b/impl/src/layer2/connection.c @@ -1,6 +1,9 @@ #include #include +#define LOGGER_MODULE_NAME "conn" +#include "logger.h" + #include "connection.h" #include "config.h" @@ -11,7 +14,13 @@ #define SEQ_NR_MASK 0xF -result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr, connection_data_callback_t data_cb) +result_t connection_init( + connection_ctx_t *ctx, + const ham64_t *my_addr, + const ham64_t *peer_addr, + connection_data_callback_t data_cb, + connection_event_callback_t event_cb, + void *user_ctx) { ctx->last_acked_seq = 0; ctx->next_expected_seq = 0; @@ -24,6 +33,13 @@ result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ha ctx->peer_addr = *peer_addr; ctx->data_cb = data_cb; + ctx->event_cb = event_cb; + + uint64_t now = get_hires_time(); + ctx->last_rx_time = now; + ctx->retransmit_time = 0; + + ctx->user_context = user_ctx; ctx->conn_state = CONN_STATE_INITIALIZED; @@ -168,18 +184,15 @@ result_t connection_handle_packet_prechecked( ctx->next_expected_seq++; ctx->next_expected_seq &= 0xF; + ctx->last_rx_time = get_hires_time(); + LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header->rx_seq_nr, header->tx_seq_nr); // handle the acknowledgement internally connection_handle_ack(ctx, header->rx_seq_nr, true); - size_t header_size = layer2_get_encoded_header_size(&header); - // extract the payload and forward it to the tun device - const uint8_t *payload = buf + header_size; - size_t payload_len = packet_size - header_size; - - ctx->data_cb(ctx, payload, payload_len); + ctx->data_cb(ctx, payload, payload_len, ctx->user_context); return OK; } @@ -287,7 +300,56 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request) } -size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len) +result_t connection_send_parameters(connection_ctx_t *ctx) +{ + // check the connection state + switch(ctx->conn_state) { + case CONN_STATE_UNINITIALIZED: + case CONN_STATE_CLOSED: + case CONN_STATE_CONNECTING: + case CONN_STATE_ESTABLISHED: + LOG(LVL_ERR, "Trying to send connection parameters in state %u", ctx->conn_state); + return ERR_INVALID_STATE; + + case CONN_STATE_INITIALIZED: + // in these states, packets can be handled + break; + } + + layer2_packet_header_t header; + + header.dst_addr = ctx->peer_addr; + header.src_addr = ctx->my_addr; + header.msg_type = L2_MSG_TYPE_CONN_MGMT; + header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet() + header.tx_seq_nr = ctx->next_seq_nr; + header.tx_request = 1; + + size_t payload_len = 1; + uint8_t *payload = malloc(payload_len); + if(!payload) { + return ERR_NO_MEM; + } + + payload[0] = CONN_MGMT_TYPE_CONNECTION_PARAMETERS; + + // TODO: calculate IP addresses from client’s HAM64 address + + if (!packet_queue_add(&ctx->packet_queue, &header, payload, payload_len)) { + return ERR_NO_MEM; + } + + ctx->next_seq_nr++; + ctx->next_seq_nr &= SEQ_NR_MASK; + + // connection is considered established after the connection parameters are sent. + ctx->conn_state = CONN_STATE_ESTABLISHED; + + return OK; +} + + +size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst) { // check the connection state switch(ctx->conn_state) { @@ -311,7 +373,7 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, } layer2_packet_header_t header = entry->header; - header.rx_seq_nr = ack_seq_nr; + header.rx_seq_nr = ctx->next_expected_seq; // encode the header LOG(LVL_DEBUG, "Encoding packet with rx_seq_nr %u, tx_seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr); @@ -324,6 +386,7 @@ size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, ctx->next_packet_index++; + *end_burst = header.tx_request; return packet_size; } @@ -408,14 +471,37 @@ bool connection_can_transmit(const connection_ctx_t *ctx) } +bool connection_is_closed(const connection_ctx_t *ctx) +{ + switch(ctx->conn_state) { + case CONN_STATE_UNINITIALIZED: + case CONN_STATE_INITIALIZED: + case CONN_STATE_CLOSED: + return true; + + case CONN_STATE_CONNECTING: + case CONN_STATE_ESTABLISHED: + return false; + } +} + + result_t connection_maintain(connection_ctx_t *ctx) { uint64_t now = get_hires_time(); + if(now > ctx->last_rx_time + HRTIME_MS(CONNECTION_TIMEOUT_MS)) { + LOG(LVL_INFO, "Connection timed out."); + ctx->conn_state = CONN_STATE_CLOSED; + ctx->event_cb(ctx, CONN_EVT_TIMEOUT, ctx->user_context); + return OK; + } + if(ctx->retransmit_time != 0 && now >= ctx->retransmit_time) { LOG(LVL_INFO, "Retransmit triggered."); ctx->retransmit_time = 0; connection_restart_tx(ctx); + ctx->event_cb(ctx, CONN_EVT_RETRANSMIT, ctx->user_context); } return OK; diff --git a/impl/src/layer2/connection.h b/impl/src/layer2/connection.h index 8ea5791..46979be 100644 --- a/impl/src/layer2/connection.h +++ b/impl/src/layer2/connection.h @@ -25,12 +25,21 @@ typedef enum { } connection_state_t; /*!\brief Type for a callback function that is called when a data packet was received. */ -typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len); +typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx); + +typedef enum { + CONN_EVT_TIMEOUT, //!< The connection timed out because no packets were received + CONN_EVT_RETRANSMIT, //!< Packet queue transmission is restarted +} connection_evt_t; + +/*!\brief Type for a callback function that is called on various connection events. */ +typedef void (*connection_event_callback_t)(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx); typedef struct connection_ctx_s { connection_state_t conn_state; //!< State of the connection. connection_data_callback_t data_cb; //!< Callback function for received data packets. + connection_event_callback_t event_cb; //!< Callback function for event signalling. ham64_t my_addr; //!< The local link layer address. ham64_t peer_addr; //!< The link layer address of the peer. @@ -45,6 +54,8 @@ typedef struct connection_ctx_s { uint64_t retransmit_time; //!< Time when a retransmit shall be triggered. uint64_t last_rx_time; //!< Time when a packet was last received and decoded. + + void *user_context; //!< An arbitrary pointer that can set by the user. } connection_ctx_t; @@ -54,9 +65,17 @@ typedef struct connection_ctx_s { * \param my_addr The local link layer address. * \param peer_addr The remote link layer address. * \param data_cb Callback for handling received payload data. + * \param event_cb Callback for connection events. + * \param user_ctx User context pointer (for arbitrary data). * \returns OK if everything worked or a fitting error code. */ -result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr, connection_data_callback_t data_cb); +result_t connection_init( + connection_ctx_t *ctx, + const ham64_t *my_addr, + const ham64_t *peer_addr, + connection_data_callback_t data_cb, + connection_event_callback_t event_cb, + void *user_ctx); /*!\brief Destroy the given layer 2 connection context. */ @@ -113,6 +132,14 @@ bool connection_can_enqueue_packet(const connection_ctx_t *ctx); */ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request); +/*!\brief Send connection parameters. + * + * This packet accepts an incoming connection request. + * + * \param ctx The connection context. + */ +result_t connection_send_parameters(connection_ctx_t *ctx); + /*!\brief Encode the next packet for transmission. * * \note @@ -121,12 +148,12 @@ result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request); * called to handle retransmits correctly. * * \param ctx The connection context. - * \param ack_seq_nr The received sequence number to send as an acknowledgement. * \param buf Where to write the encoded packet data. * \param buf_len Space available in the buffer. + * \param end_burst Output parameter that is set to true if this is the last packet in a burst. * \returns The number of bytes written to buf or zero if no packet was available. */ -size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len); +size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst); /*!\brief Restart the transmission from the beginning of the packet queue. */ @@ -150,6 +177,10 @@ void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq, bool do_ack */ bool connection_can_transmit(const connection_ctx_t *ctx); +/*!\brief Check if this connection is closed. + */ +bool connection_is_closed(const connection_ctx_t *ctx); + /*!\brief Handle internal maintenance tasks. * * This should be called periodically to handle timeouts and retransmissions. diff --git a/impl/src/layer2/connection_list.c b/impl/src/layer2/connection_list.c index 1a58990..5015de2 100644 --- a/impl/src/layer2/connection_list.c +++ b/impl/src/layer2/connection_list.c @@ -7,6 +7,12 @@ #define SEQ_NR_MASK 0xF +static destroy_entry(connection_list_entry_t *entry) +{ + connection_destroy(&entry->connection); + free(entry); +} + // find the location where the timestamp should be inserted to maintain // ascending timestamp order and return the then-previous entry. // @@ -50,7 +56,7 @@ void connection_list_destroy(connection_list_t *list) // delete all list entries while(list->head) { connection_list_entry_t *next = list->head->next; - free(list->head); + destroy_entry(list->head); list->head = next; } } @@ -135,8 +141,39 @@ result_t connection_list_delete_head(connection_list_t *list) connection_list_entry_t *new_head = list->head->next; - free(list->head); + destroy_entry(list->head); list->head = new_head; return OK; } + + +result_t connection_list_delete_closed(connection_list_t *list) +{ + if(!list->head) { + return OK; + } + + connection_list_entry_t *prev = NULL; + connection_list_entry_t *cur = list->head; + + while(cur) { + if(connection_is_closed(&cur->connection)) { + connection_list_entry_t *to_delete = cur; + cur = cur->next; + + if(prev) { + prev->next = to_delete->next; + } else { + list->head = to_delete->next; + } + + destroy_entry(to_delete); + } else { + prev = cur; + cur = cur->next; + } + } + + return OK; +} diff --git a/impl/src/layer2/connection_list.h b/impl/src/layer2/connection_list.h index 3b16807..b3636c6 100644 --- a/impl/src/layer2/connection_list.h +++ b/impl/src/layer2/connection_list.h @@ -75,4 +75,12 @@ result_t connection_list_reschedule_head(connection_list_t *list, uint64_t next_ */ result_t connection_list_delete_head(connection_list_t *list); +/*!\brief Delete all connections that are in a closed state. + * + * \param list Pointer to the list to clean. + * + * \returns OK if everything worked or a fitting error code. + */ +result_t connection_list_delete_closed(connection_list_t *list); + #endif // CONNECTION_LIST_H diff --git a/impl/src/layer2/digipeater.c b/impl/src/layer2/digipeater.c index 06f324b..bdde98e 100644 --- a/impl/src/layer2/digipeater.c +++ b/impl/src/layer2/digipeater.c @@ -1,14 +1,81 @@ #include #include +#define LOGGER_MODULE_NAME "digi" +#include "logger.h" + #include "digipeater.h" #include "config.h" #include "layer2/connection.h" +#include "layer2/connection_list.h" #include "layer2/ham64.h" +#include "layer2/packet_structs.h" #include "results.h" #include "utils.h" +void conn_data_cb(struct connection_ctx_s *conn, const uint8_t *data, size_t len, void *user_ctx) +{ + (void)conn; + digipeater_ctx_t *digi_ctx = user_ctx; + + digi_ctx->data_cb(digi_ctx, data, len); +} + +void conn_event_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx) +{ + (void)conn; + digipeater_ctx_t *digi_ctx = user_ctx; + + switch(evt) { + case CONN_EVT_TIMEOUT: + // connection has been closed by timeout -> clean up the list + connection_list_delete_closed(&digi_ctx->conn_list); + break; + + default: + // do nothing + break; + } +} + +static result_t digipeater_handle_beacon_responses(digipeater_ctx_t *ctx, const layer2_packet_header_t *header, const uint8_t *buf, size_t buf_len) +{ + LOG(LVL_DEBUG, "Handling beacon response packet."); + layer2_dump_packet_header(LVL_DUMP, header); + + if(header->msg_type != L2_MSG_TYPE_CONN_MGMT) { + LOG(LVL_ERR, "Beacon response with invalid message type %i", header->msg_type); + return ERR_INVALID_PARAM; + } + + if(buf_len == 0) { + LOG(LVL_ERR, "Missing payload in beacon response"); + return ERR_INVALID_PARAM; + } + + uint8_t conn_mgmt_type = buf[0]; + + if(conn_mgmt_type != CONN_MGMT_TYPE_CONNECTION_REQUEST) { + LOG(LVL_ERR, "Unexpected connection management type in beacon response: 0x%02x", conn_mgmt_type); + return ERR_INVALID_PARAM; + } + + // packet is valid -> create a new connection, enqueue the parameters message + // and add it at the beginning of the connection list. + + connection_ctx_t new_conn; + ERR_CHECK(connection_init(&new_conn, &ctx->my_addr, &header->src_addr, conn_data_cb, conn_event_cb, ctx)); + + ERR_CHECK(connection_send_parameters(&new_conn)); + + uint64_t now = get_hires_time(); + ERR_CHECK(connection_list_insert(&ctx->conn_list, &new_conn, now)); + + return OK; +} + + result_t digipeater_init(digipeater_ctx_t *ctx, const ham64_t *my_addr, digipeater_data_callback_t data_cb) { ctx->my_addr = *my_addr; @@ -69,14 +136,28 @@ result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, siz } // FIXME: handle connection management packets here - // FIXME: handle data and empty packets in the current connection size_t header_size = layer2_get_encoded_header_size(&header); const uint8_t *payload = buf + header_size; size_t payload_len = packet_size - header_size; - connection_handle_packet_prechecked(current_conn, &header, payload, payload_len); + switch(ctx->state) { + case DIGIPEATER_STATE_CONN: + { + connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list); + if(head) { + connection_ctx_t *current_conn = &head->connection; + return connection_handle_packet_prechecked(current_conn, &header, payload, payload_len); + } else { + LOG(LVL_WARN, "Digipeater in CONN state, but there is no active connection! Packet dropped."); + return OK; + } + } + + case DIGIPEATER_STATE_BEACON: + return digipeater_handle_beacon_responses(ctx, &header, payload, payload_len); + } return OK; } @@ -85,21 +166,50 @@ result_t digipeater_handle_packet(digipeater_ctx_t *ctx, const uint8_t *buf, siz size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t buf_len, bool *end_burst) { uint64_t now = get_hires_time(); + size_t packet_size = 0; *end_burst = false; if(now > ctx->next_beacon_time) { - // TODO: build a beacon packet - return 0; + // build a beacon packet + layer2_packet_header_t header; + + ham64_t broadcast = {{0xFFFF, 0, 0, 0}, 1}; + + header.dst_addr = broadcast; + header.src_addr = ctx->my_addr; + header.msg_type = L2_MSG_TYPE_CONN_MGMT; + header.rx_seq_nr = 0; // unused + header.tx_seq_nr = 0; // unused + header.tx_request = 1; + + uint8_t payload[1] = {CONN_MGMT_TYPE_BEACON}; + packet_size = layer2_encode_packet(&header, payload, 1, buf, buf_len); + + ctx->state = DIGIPEATER_STATE_BEACON; + *end_burst = true; + return packet_size; + } else { + // pull packets from the current connection + + connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list); + if(!head) { + return 0; + } + + connection_ctx_t *conn = &head->connection; + if(connection_can_transmit(conn)) { + packet_size = connection_encode_next_packet(conn, buf, buf_len, end_burst); + + ctx->state = DIGIPEATER_STATE_CONN; + } + + return packet_size; } - - // TODO: pull packets from the current connection - - return 0; } -bool digipeater_can_transmit(const digipeater_ctx_t *ctx) +bool digipeater_can_transmit(digipeater_ctx_t *ctx) { uint64_t now = get_hires_time(); @@ -107,16 +217,34 @@ bool digipeater_can_transmit(const digipeater_ctx_t *ctx) return true; } - // TODO: also true if next TX time has expired and there are packets waiting for this connection. + connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list); + if(head) { + return connection_can_transmit(&head->connection); + } - /*return (packet_queue_get_used_space(&ctx->packet_queue) != 0) - && (packet_queue_get(&ctx->packet_queue, ctx->next_packet_index) != NULL); */ return false; } result_t digipeater_maintain(digipeater_ctx_t *ctx) { - (void)ctx; + switch(ctx->state) { + case DIGIPEATER_STATE_CONN: + { + connection_list_entry_t *head = connection_list_get_head(&ctx->conn_list); + if(!head) { + LOG(LVL_INFO, "No active connection -> force beacon state for packet handling."); + ctx->state = DIGIPEATER_STATE_BEACON; + } else { + ERR_CHECK(connection_maintain(&head->connection)); + } + break; + } + + case DIGIPEATER_STATE_BEACON: + // nothing to do here + break; + } + return OK; } diff --git a/impl/src/layer2/digipeater.h b/impl/src/layer2/digipeater.h index 5fb75b8..f806864 100644 --- a/impl/src/layer2/digipeater.h +++ b/impl/src/layer2/digipeater.h @@ -83,7 +83,7 @@ size_t digipeater_encode_next_packet(digipeater_ctx_t *ctx, uint8_t *buf, size_t /*!\brief Check if there are packets queued for transmission. */ -bool digipeater_can_transmit(const digipeater_ctx_t *ctx); +bool digipeater_can_transmit(digipeater_ctx_t *ctx); /*!\brief Handle internal maintenance tasks. * diff --git a/impl/src/layer2/packet_structs.h b/impl/src/layer2/packet_structs.h index 3c50545..52d1f88 100644 --- a/impl/src/layer2/packet_structs.h +++ b/impl/src/layer2/packet_structs.h @@ -33,6 +33,7 @@ typedef struct layer2_packet_header_s { ham64_t dst_addr; //!< destination HAM-64 address } layer2_packet_header_t; + // maximum header size // - 1 byte packet info // - 1 byte sequence numbers @@ -104,6 +105,22 @@ typedef struct layer2_data_header_s { /* Connection Management Structs */ -// TODO +typedef enum { + CONN_MGMT_TYPE_BEACON = 0x00, + CONN_MGMT_TYPE_CONNECTION_REQUEST = 0x01, + CONN_MGMT_TYPE_CONNECTION_PARAMETERS = 0x02, + CONN_MGMT_TYPE_CONNECTION_RESET = 0x03, + CONN_MGMT_TYPE_DISCONNECT_REQUEST = 0x04, + CONN_MGMT_TYPE_DISCONNECT = 0x05, +} conn_mgmt_type_t; + +typedef enum { + CONN_PARAM_TYPE_IPV6_ADDRESS = 0x00, + CONN_PARAM_TYPE_IPV6_GATEWAY = 0x01, + CONN_PARAM_TYPE_IPV6_DNS = 0x02, + CONN_PARAM_TYPE_IPV4_ADDRESS = 0x08, + CONN_PARAM_TYPE_IPV4_GATEWAY = 0x09, + CONN_PARAM_TYPE_IPV4_DNS = 0x0A, +} conn_param_type_t; #endif // PACKET_STRUCTS_H diff --git a/impl/test/layer2_over_udp/l2udptest_client.c b/impl/test/layer2_over_udp/l2udptest_client.c index 1d53eab..d4ae4c4 100644 --- a/impl/test/layer2_over_udp/l2udptest_client.c +++ b/impl/test/layer2_over_udp/l2udptest_client.c @@ -222,16 +222,13 @@ int main(int argc, char **argv) size_t total_bytes = 0; uint64_t next_stats_print_time = old + HRTIME_MS(500); - uint64_t retransmit_time = 0; + // TODO: wait for beacon + // TODO: send connection request while(m_running) { uint64_t now = get_hires_time(); - if(retransmit_time != 0 && now >= retransmit_time) { - LOG(LVL_INFO, "Retransmit triggered."); - retransmit_time = 0; - connection_restart_tx(&l2conn); - } + RESULT_CHECK(connection_maintain(&l2conn)); // fill the TX queue from the TUN device while(connection_can_enqueue_packet(&l2conn)) { diff --git a/impl/test/layer2_over_udp/l2udptest_digipeater.c b/impl/test/layer2_over_udp/l2udptest_digipeater.c index 1d53eab..5064875 100644 --- a/impl/test/layer2_over_udp/l2udptest_digipeater.c +++ b/impl/test/layer2_over_udp/l2udptest_digipeater.c @@ -30,6 +30,7 @@ #include "debug_structs.h" #include "layer2/connection.h" +#include "layer2/connection_list.h" #include "layer2/tundev.h" @@ -154,6 +155,9 @@ int main(int argc, char **argv) return 1; } + connection_list_t conn_list; + RESULT_CHECK(connection_list_init(&conn_list)); + ham64_t my_address, peer_address; ham64_encode(MY_CALL, &my_address); ham64_encode(PEER_CALL, &peer_address); @@ -222,18 +226,17 @@ int main(int argc, char **argv) size_t total_bytes = 0; uint64_t next_stats_print_time = old + HRTIME_MS(500); - uint64_t retransmit_time = 0; + uint64_t next_beacon_time = old + HRTIME_MS(5000); while(m_running) { uint64_t now = get_hires_time(); - if(retransmit_time != 0 && now >= retransmit_time) { - LOG(LVL_INFO, "Retransmit triggered."); - retransmit_time = 0; - connection_restart_tx(&l2conn); + if(now >= next_beacon_time) { + // TODO: encode and transmit beacon + next_beacon_time += HRTIME_MS(5000); } - // fill the TX queue from the TUN device + // FIXME: fill the TX queues from the TUN device while(connection_can_enqueue_packet(&l2conn)) { int ret = poll(&pfd_tun, 1, 0 /* timeout */); if(ret < 0) {