diff --git a/impl/CMakeLists.txt b/impl/CMakeLists.txt index 554c6e1..a819d87 100644 --- a/impl/CMakeLists.txt +++ b/impl/CMakeLists.txt @@ -53,6 +53,8 @@ set(sources src/layer2/layer2_rx.h src/layer2/ham64.c src/layer2/ham64.h + src/layer2/connection.c + src/layer2/connection.h src/sdr/sdr.c src/sdr/sdr.h ) diff --git a/impl/src/layer2/connection.c b/impl/src/layer2/connection.c new file mode 100644 index 0000000..87e00ab --- /dev/null +++ b/impl/src/layer2/connection.c @@ -0,0 +1,297 @@ +#include +#include + +#include "connection.h" + +#include "config.h" +#include "layer2/ham64.h" +#include "results.h" + +#define SEQ_NR_MASK 0xF + +result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr) +{ + ctx->last_acked_seq = 0; + ctx->next_expected_seq = 0; + + packet_queue_init(&ctx->packet_queue); + ctx->next_packet_index = 0; + ctx->next_seq_nr = 0; + + ctx->my_addr = *my_addr; + ctx->peer_addr = *peer_addr; + + ctx->conn_state = CONN_STATE_INITIALIZED; + + return OK; +} + + +void connection_destroy(connection_ctx_t *ctx) +{ + ctx->conn_state = CONN_STATE_UNINITIALIZED; + packet_queue_destroy(&ctx->packet_queue); +} + + +result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len) +{ + // check the CRC + size_t packet_size = buf_len - crc_sizeof_key(PAYLOAD_CRC_SCHEME); + + if(!crc_check_key(PAYLOAD_CRC_SCHEME, (unsigned char*)buf, packet_size)) { + LOG(LVL_ERR, "payload CRC check failed!"); + return ERR_INTEGRITY; + } + + // decode the header + layer2_packet_header_t header; + + if(!layer2_decode_packet_header(buf, buf_len, &header)) { + LOG(LVL_ERR, "Header could not be decoded!"); + return ERR_INTEGRITY; + } + + // check if the packet really should be handled by us + if(!ham64_is_equal(&header.src_addr, &ctx->peer_addr)) { + char fmt_src_addr[HAM64_FMT_MAX_LEN]; + char fmt_peer_addr[HAM64_FMT_MAX_LEN]; + + ham64_format(&header.src_addr, fmt_src_addr); + ham64_format(&ctx->peer_addr, fmt_peer_addr); + + LOG(LVL_ERR, "Packet has the wrong source address: got %s, expected %s", + fmt_src_addr, fmt_peer_addr); + + return ERR_INVALID_ADDRESS; + } + + if(!ham64_is_equal(&header.dst_addr, &ctx->my_addr)) { + char fmt_dst_addr[HAM64_FMT_MAX_LEN]; + char fmt_my_addr[HAM64_FMT_MAX_LEN]; + + ham64_format(&header.dst_addr, fmt_dst_addr); + ham64_format(&ctx->my_addr, fmt_my_addr); + + LOG(LVL_ERR, "Packet has the wrong destination address: got %s, expected %s", + fmt_dst_addr, fmt_my_addr); + + return ERR_INVALID_ADDRESS; + } + + LOG(LVL_DEBUG, "Handling %s packet with rx_seq_nr %u, tx_seq_nr %u.", + layer2_msg_type_to_string(header.msg_type), header.rx_seq_nr, header.tx_seq_nr); + + ctx->last_acked_seq = header.rx_seq_nr; + + + switch(header.msg_type) { + case L2_MSG_TYPE_EMPTY: + LOG(LVL_DEBUG, "Empty packet: accepted ACK for %u.", ctx->last_acked_seq); + return OK; // do not ACK and call back + + case L2_MSG_TYPE_CONN_MGMT: + case L2_MSG_TYPE_CONNECTIONLESS: + LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header.msg_type)); + return OK; + + case L2_MSG_TYPE_DATA: + break; + + default: + LOG(LVL_ERR, "Invalid message type %d.", header.msg_type); + return ERR_INVALID_STATE; + } + + if(ctx->next_expected_seq != header.tx_seq_nr) { + LOG(LVL_ERR, "Expected sequence number %u, received %u.", ctx->next_expected_seq, header.tx_seq_nr); + return ERR_SEQUENCE; + } + + ctx->next_expected_seq++; + ctx->next_expected_seq &= 0xF; + + LOG(LVL_INFO, "Received ACK for seq_nr %u in packet seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr); + + // handle the acknowledgement internally + connection_handle_ack(ctx, header.rx_seq_nr); + + size_t header_size = layer2_get_encoded_header_size(&header); + + // extract the payload and forward it to the tun device + const uint8_t *payload = buf + header_size; + size_t payload_len = packet_size - header_size; + + ctx->data_cb(ctx, payload, payload_len); + + return OK; +} + + +uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx) +{ + return ctx->next_expected_seq; +} + + +uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx) +{ + return ctx->last_acked_seq; +} + + +result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len) +{ + layer2_packet_header_t header; + + if(packet_queue_get_free_space(&ctx->packet_queue) == 0) { + return ERR_NO_MEM; + } + + header.dst_addr = ctx->peer_addr; + header.src_addr = ctx->my_addr; + header.msg_type = L2_MSG_TYPE_DATA; + header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet() + header.tx_request = 0; + header.tx_seq_nr = ctx->next_seq_nr; + + // create a persistent copy of the packet data. + // TODO: possibly this copy operation can be removed by passing a malloc'd buffer in. + uint8_t *packetbuf = malloc(buf_len); + if(!packetbuf) { + LOG(LVL_ERR, "malloc failed."); + return ERR_NO_MEM; + } + + memcpy(packetbuf, buf, buf_len); + + packet_queue_add(&ctx->packet_queue, &header, packetbuf, buf_len); + + LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries", + header.tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue)); + + ctx->next_seq_nr++; + ctx->next_seq_nr &= SEQ_NR_MASK; + + return OK; +} + + +result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request) +{ + layer2_packet_header_t header; + + header.dst_addr.addr[0] = 0xFFFF; + header.dst_addr.length = 1; + header.src_addr.addr[0] = 0x0001; + header.src_addr.length = 1; + header.msg_type = L2_MSG_TYPE_EMPTY; + header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet() + header.tx_seq_nr = 0; // not used in empty packets + header.tx_request = tx_request; + + if (!packet_queue_add(&ctx->packet_queue, &header, NULL, 0)) { + return ERR_NO_MEM; + } + + return OK; +} + + +size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len) +{ + const packet_queue_entry_t *entry = packet_queue_get(&ctx->packet_queue, ctx->next_packet_index); + + if(!entry) { + // no more entries + return 0; + } + + unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME); + + assert(buf_len >= LAYER2_PACKET_HEADER_ENCODED_SIZE_MAX + crc_size + entry->data_len); + + layer2_packet_header_t header = entry->header; + header.rx_seq_nr = ack_seq_nr; + + // encode the header + LOG(LVL_DEBUG, "Encoding packet with rx_seq_nr %u, tx_seq_nr %u.", header.rx_seq_nr, header.tx_seq_nr); + + size_t packet_size = layer2_encode_packet_header(&header, buf); + + // add the payload data + if(entry->data) { + memcpy(buf + packet_size, entry->data, entry->data_len); + } + + packet_size += entry->data_len; + + // calculate CRC of everything and append it to the packet + crc_append_key(PAYLOAD_CRC_SCHEME, buf, packet_size); + + packet_size += crc_size; + + ctx->next_packet_index++; + + return packet_size; +} + + +void connection_restart_tx(connection_ctx_t *ctx) +{ + ctx->next_packet_index = 0; +} + + +void connection_tx_clean_empty_packet(connection_ctx_t *ctx) +{ + const packet_queue_entry_t *entry = packet_queue_get(&ctx->packet_queue, 0); + if(entry && entry->header.msg_type == L2_MSG_TYPE_EMPTY) { + packet_queue_delete(&ctx->packet_queue, 1); + + if(ctx->next_packet_index > 0) { + ctx->next_packet_index--; + } + } +} + + +void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq) +{ + ctx->next_packet_index = 0; + + size_t packets_to_remove = 0; + size_t packets_available = packet_queue_get_used_space(&ctx->packet_queue); + + for(size_t i = 0; i < packets_available; i++) { + const packet_queue_entry_t *entry = packet_queue_get(&ctx->packet_queue, i); + + if(entry->header.tx_seq_nr == acked_seq) { + break; + } + + packets_to_remove++; + } + + packet_queue_delete(&ctx->packet_queue, packets_to_remove); + + packets_available = packet_queue_get_used_space(&ctx->packet_queue); + + LOG(LVL_DEBUG, "handling ack for seq_nr %u, removing %zu packets, %zu packets remaining.", acked_seq, packets_to_remove, packets_available); + + if(packets_available == 0) { + // no packets left in queue, but an acknowledgement must be + // transmitted. Add an empty packet to do that. + result_t res = connection_add_empty_packet(ctx, false); + if (res != OK) { + LOG(LVL_WARN, "Failed to add empty packet: %d.", res); + } + } +} + + +bool connection_can_transmit(const connection_ctx_t *ctx) +{ + return (packet_queue_get_used_space(&ctx->packet_queue) != 0) + && (packet_queue_get(&ctx->packet_queue, ctx->next_packet_index) != NULL); +} diff --git a/impl/src/layer2/connection.h b/impl/src/layer2/connection.h new file mode 100644 index 0000000..d975f58 --- /dev/null +++ b/impl/src/layer2/connection.h @@ -0,0 +1,126 @@ +/* + * This file contains functions to handle a single layer 2 connection. + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + * Copyright (C) 2024 Thomas Kolb + */ + +#ifndef CONNECTION_H +#define CONNECTION_H + +#include +#include + +#include "packet_queue.h" + +struct connection_ctx_s; + +typedef enum { + CONN_STATE_UNINITIALIZED, //!< Uninitialized. Cannot be used in any way + CONN_STATE_INITIALIZED, //!< Initialized, no packets processed yet + CONN_STATE_CONNECTING, //!< Connection request sent, no two-way communication yet + CONN_STATE_ESTABLISHED, //!< Connection is established + CONN_STATE_CLOSED //!< Connection has been closed (gracefully or by timeout) +} connection_state_t; + +/*!\brief Type for a callback function that is called when a data packet was received. */ +typedef void (*connection_data_callback_t)(struct connection_ctx_s *conn, const uint8_t *data, size_t len); + +typedef struct connection_ctx_s { + connection_state_t conn_state; //!< State of the connection. + + connection_data_callback_t data_cb; //!< Callback function for received data packets. + + ham64_t my_addr; //!< The local link layer address. + ham64_t peer_addr; //!< The link layer address of the peer. + + uint8_t last_acked_seq; //!< Next sequence number expected by the peer (from last Ack). + uint8_t next_expected_seq; //!< Next sequence number expected by us. + + packet_queue_t packet_queue; //!< Transmission packet queue. + + size_t next_packet_index; //!< Index in the packet queue of the next packet to transmit. + uint8_t next_seq_nr; //!< Sequence number to tag the next transmitted packet with. +} connection_ctx_t; + + +/*!\brief Initialize the layer 2 connection context. + * + * \param ctx The connection context to initialize. + * \param my_addr The local link layer address. + * \param peer_addr The remote link layer address. + * \returns OK if everything worked or a fitting error code. + */ +result_t connection_init(connection_ctx_t *ctx, const ham64_t *my_addr, const ham64_t *peer_addr); + +/*!\brief Destroy the given layer 2 connection context. + */ +void connection_destroy(connection_ctx_t *ctx); + +/*!\brief Handle a received packet. + * + * \param ctx The receiver context. + * \param buf Where to write the encoded packet data. + * \param buf_len Space available in the buffer. + * \returns A result code from the packet handling procedure. + */ +result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len); + +/*!\brief Return the sequence number expected next by our side. + */ +uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx); + +/*!\brief Return the sequence number expected next by the other side. + */ +uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx); + +/*!\brief Enqueue a packet for transmission. + * \param ctx The connection context. + */ +result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len); + +/*!\brief Add an empty packet to ensure an acknowledgement is sent. + * \param ctx The connection context. + * \param tx_request Value of the TX Request field in the packet. + */ +result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request); + +/*!\brief Encode the next packet for transmission. + * + * \note + * If no more packets are available, this function returns zero. In that case, + * either \ref connection_restart() or \ref connection_handle_ack() must be + * called to handle retransmits correctly. + * + * \param ctx The connection context. + * \param ack_seq_nr The received sequence number to send as an acknowledgement. + * \param buf Where to write the encoded packet data. + * \param buf_len Space available in the buffer. + * \returns The number of bytes written to buf or zero if no packet was available. + */ +size_t connection_encode_next_packet(connection_ctx_t *ctx, uint8_t ack_seq_nr, uint8_t *buf, size_t buf_len); + +/*!\brief Restart the transmission from the beginning of the packet queue. + */ +void connection_restart_tx(connection_ctx_t *ctx); + +/*!\brief Remove the first packet from the queue if it is an empty packet. + */ +void connection_tx_clean_empty_packet(connection_ctx_t *ctx); + +/*!\brief Handle acknowledgements. + * \details + * Removes all packets before the given sequence number from the queue. + * + * \param ctx The connection context. + * \param acked_seq The acknowledged (= next expected) sequence number. + * \param do_ack Whether an empty packet shall be generated if the queue is empty. + */ +void connection_handle_ack(connection_ctx_t *ctx, uint8_t acked_seq); + +/*!\brief Check if there are packets queued for transmission. + */ +bool connection_can_transmit(const connection_ctx_t *ctx); + +#endif // CONNECTION_H diff --git a/impl/src/layer2/ham64.c b/impl/src/layer2/ham64.c index b4301ca..5d00292 100644 --- a/impl/src/layer2/ham64.c +++ b/impl/src/layer2/ham64.c @@ -187,3 +187,19 @@ void ham64_format(const ham64_t *ham64, char *out) out[5*ham64->length - 1] = '\0'; } + + +bool ham64_is_equal(const ham64_t *a, const ham64_t *b) +{ + if(a->length != b->length) { + return false; + } + + for(uint8_t i = 0; i < a->length; i++) { + if(a->addr[i] != b->addr[i]) { + return false; + } + } + + return true; +} diff --git a/impl/src/layer2/ham64.h b/impl/src/layer2/ham64.h index 71082cb..35798f5 100644 --- a/impl/src/layer2/ham64.h +++ b/impl/src/layer2/ham64.h @@ -9,6 +9,7 @@ #include #include +#include // buffer size required for the string representation of a maximum-length HAM64 // address, including terminating zero. @@ -62,4 +63,11 @@ const char *ham64_addr_type_to_string(ham64_addr_type_t addr_type); */ void ham64_format(const ham64_t *ham64, char *out); +/*!\brief Check if two ham64 addresses are equal. + * \param a Pointer to the first address. + * \param b Pointer to the second address. + * \returns True if a and b are equal, false otherwise. + */ +bool ham64_is_equal(const ham64_t *a, const ham64_t *b); + #endif // HAM64_H diff --git a/impl/src/layer2/layer2_rx.c b/impl/src/layer2/layer2_rx.c index 31dfa9b..bf504f4 100644 --- a/impl/src/layer2/layer2_rx.c +++ b/impl/src/layer2/layer2_rx.c @@ -6,7 +6,6 @@ */ #include -#include #include #include diff --git a/impl/src/results.h b/impl/src/results.h index 60f38c3..cb847f4 100644 --- a/impl/src/results.h +++ b/impl/src/results.h @@ -11,16 +11,17 @@ typedef enum { OK, - ERR_INVALID_STATE, - ERR_INVALID_PARAM, // invalid / nonsense parameters given - ERR_NO_MEM, // not enough memory or allocation error - ERR_SIZE, // a given size is invalid - ERR_LIQUID, // an error occurred in the LiquidDSP library. - ERR_SYSCALL, // a syscall failed. Use errno to determine the cause. - ERR_SOAPY, // an error occurred in the SoapySDR library. - ERR_SDR, // an error occurred in the SDR interface. - ERR_INTEGRITY, // an integrity check failed (e.g. CRC of received packet is wrong) - ERR_SEQUENCE, // an unexpected packet was received + ERR_INVALID_STATE, // module or context is in an invalid state + ERR_INVALID_PARAM, // invalid / nonsense parameters given + ERR_INVALID_ADDRESS, // invalid address received or given + ERR_NO_MEM, // not enough memory or allocation error + ERR_SIZE, // a given size is invalid + ERR_LIQUID, // an error occurred in the LiquidDSP library. + ERR_SYSCALL, // a syscall failed. Use errno to determine the cause. + ERR_SOAPY, // an error occurred in the SoapySDR library. + ERR_SDR, // an error occurred in the SDR interface. + ERR_INTEGRITY, // an integrity check failed (e.g. CRC of received packet is wrong) + ERR_SEQUENCE, // an unexpected packet was received } result_t; #ifdef DEBUG_LIQUID