diff --git a/impl/test/layer2_over_udp/CMakeLists.txt b/impl/test/layer2_over_udp/CMakeLists.txt index 1745b9f..e1c17fe 100644 --- a/impl/test/layer2_over_udp/CMakeLists.txt +++ b/impl/test/layer2_over_udp/CMakeLists.txt @@ -1,5 +1,5 @@ add_executable( - l2udptest + l2udptest_client ../../src/utils.c ../../src/utils.h ../../src/logger.c @@ -22,11 +22,47 @@ add_executable( ../../src/layer2/connection.h ../../src/layer2/tundev.c ../../src/layer2/tundev.h - l2udptest.c + l2udptest_client.c ) target_link_libraries( - l2udptest + l2udptest_client + fec + m + liquid +) + +#--------------------------- + +add_executable( + l2udptest_digipeater + ../../src/utils.c + ../../src/utils.h + ../../src/logger.c + ../../src/logger.h + ../../src/options.c + ../../src/options.h + ../../src/var_array.c + ../../src/var_array.h + ../../src/config.h + ../../src/jsonlogger.c + ../../src/jsonlogger.h + ../../src/debug_structs.h + ../../src/layer2/packet_structs.c + ../../src/layer2/packet_structs.h + ../../src/layer2/ham64.c + ../../src/layer2/ham64.h + ../../src/layer2/packet_queue.c + ../../src/layer2/packet_queue.h + ../../src/layer2/connection.c + ../../src/layer2/connection.h + ../../src/layer2/tundev.c + ../../src/layer2/tundev.h + l2udptest_digipeater.c +) + +target_link_libraries( + l2udptest_digipeater fec m liquid diff --git a/impl/test/layer2_over_udp/l2udptest.c b/impl/test/layer2_over_udp/l2udptest_client.c similarity index 100% rename from impl/test/layer2_over_udp/l2udptest.c rename to impl/test/layer2_over_udp/l2udptest_client.c diff --git a/impl/test/layer2_over_udp/l2udptest_digipeater.c b/impl/test/layer2_over_udp/l2udptest_digipeater.c new file mode 100644 index 0000000..82d7efd --- /dev/null +++ b/impl/test/layer2_over_udp/l2udptest_digipeater.c @@ -0,0 +1,371 @@ +/* + * SPDX-License-Identifier: GPL-3.0-or-later + * + * Copyright (C) 2024 Thomas Kolb + */ + +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include "layer2/ham64.h" +#include "utils.h" + +#define LOGGER_MODULE_NAME "main" +#include "logger.h" +#include "options.h" +#include "jsonlogger.h" +#include "debug_structs.h" + +#include "layer2/connection.h" + +#include "layer2/tundev.h" + +#include "config.h" + +#define RESULT_CHECK(stmt) { \ + result_t res = stmt; \ + if(res != OK) { \ + LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \ + exit(1); \ + } \ +} + +#define BROADCAST_PORT 3737 + +static int m_tunfd = -1; +static bool m_running = true; + +static int m_bcast_sock = -1; + +static double next_tx_switch_time = 0.0; + +static rx_stats_t m_rx_stats; + +static connection_ctx_t l2conn; + +static void signal_handler(int signal, siginfo_t *info, void *ctx) +{ + (void)signal; + (void)info; + (void)ctx; + + LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal); + + m_running = false; +} + +static void block_tx_for(unsigned offset_ms) +{ + next_tx_switch_time = get_hires_time() + (double)offset_ms * 0.001; +} + + +void handle_received_packet(uint8_t *packet_data, size_t packet_len) +{ + block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS); + + result_t result = connection_handle_packet(&l2conn, packet_data, packet_len); + switch(result) { + case OK: + m_rx_stats.successful_decodes++; + break; + + case ERR_INTEGRITY: + LOG(LVL_ERR, "Packet could not be decoded by Layer 2."); + m_rx_stats.failed_decodes++; + break; + + case ERR_SEQUENCE: + LOG(LVL_ERR, "Packet not in the expected sequence."); + break; + + default: // all other errors + LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result); + break; + } +} + + +static result_t transmit(const uint8_t *data, size_t len) +{ + result_t result = OK; + + struct sockaddr_in bcast_addr = {0}; + + bcast_addr.sin_family = AF_INET; + bcast_addr.sin_port = htons(BROADCAST_PORT); + bcast_addr.sin_addr.s_addr = INADDR_BROADCAST; + + int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr)); + if(ret < 0) { + LOG(LVL_ERR, "sendto: %s", strerror(errno)); + exit(EXIT_FAILURE); + } + + fprintf(stderr, "t"); + return result; +} + + +void rx_data_to_tun(struct connection_ctx_s *conn, const uint8_t *data, size_t len) +{ + (void)conn; + + int ret = write(m_tunfd, data, len); + if(ret < 0) { + LOG(LVL_ERR, "write(tun): %s", strerror(errno)); + } +} + + +int main(int argc, char **argv) +{ + // initialize the console logger + logger_init(); + + if(!jsonlogger_init("jsonlog.fifo")) { + LOG(LVL_FATAL, "Could not initialize JSON logger."); + return EXIT_FAILURE; + } + + bool on_air = true; + + srand((int)(get_hires_time() * 1e6)); + + // ** Initialize ** + + char devname[IFNAMSIZ] = "hamnet70"; + m_tunfd = tundev_open(devname); + + if(m_tunfd < 0) { + return 1; + } + + ham64_t my_address, peer_address; + ham64_encode(MY_CALL, &my_address); + ham64_encode(PEER_CALL, &peer_address); + RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, rx_data_to_tun)); + + // force connection into the established state + l2conn.conn_state = CONN_STATE_ESTABLISHED; + + // ** Set up signal handling + + struct sigaction term_action = {0}; + term_action.sa_sigaction = signal_handler; + + if(sigaction(SIGTERM, &term_action, NULL) < 0) { + LOG(LVL_ERR, "sigaction: %s", strerror(errno)); + exit(EXIT_FAILURE); + } + + if(sigaction(SIGINT, &term_action, NULL) < 0) { + LOG(LVL_ERR, "sigaction: %s", strerror(errno)); + exit(EXIT_FAILURE); + } + + // ** Set up UDP socket + + m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0); + if(m_bcast_sock < 0) { + LOG(LVL_ERR, "socket: %s", strerror(errno)); + exit(EXIT_FAILURE); + } + + int broadcastEnable=1; + int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable)); + if(ret < 0) { + LOG(LVL_ERR, "setsockopt: %s", strerror(errno)); + exit(EXIT_FAILURE); + } + + struct sockaddr_in bind_addr = {0}; + + bind_addr.sin_family = AF_INET; + bind_addr.sin_port = htons(BROADCAST_PORT); + bind_addr.sin_addr.s_addr = INADDR_ANY; + + ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr)); + if(ret < 0) { + LOG(LVL_ERR, "bind: %s", strerror(errno)); + exit(EXIT_FAILURE); + } + + // ** Process packets ** + + struct pollfd pfd_tun; + memset(&pfd_tun, 0, sizeof(pfd_tun)); + + pfd_tun.fd = m_tunfd; + pfd_tun.events = POLLIN; + + struct pollfd pfd_bcast; + memset(&pfd_bcast, 0, sizeof(pfd_bcast)); + + pfd_bcast.fd = m_bcast_sock; + pfd_bcast.events = POLLIN; + + double old = get_hires_time(); + size_t total_bytes = 0; + double next_stats_print_time = old + 0.5; + + double retransmit_time = 0.0; + + while(m_running) { + double now = get_hires_time(); + + if(retransmit_time != 0.0 && now >= retransmit_time) { + LOG(LVL_INFO, "Retransmit triggered."); + retransmit_time = 0.0; + connection_restart_tx(&l2conn); + } + + // fill the TX queue from the TUN device + while(connection_can_enqueue_packet(&l2conn)) { + int ret = poll(&pfd_tun, 1, 0 /* timeout */); + if(ret < 0) { + LOG(LVL_ERR, "poll: %s", strerror(errno)); + break; + } else if(ret == 0) { + // no more packets + break; + } else { + // a packet is available -> move it to the queue + static const size_t packetbuf_size = 2048; + uint8_t packetbuf[packetbuf_size]; + ret = read(m_tunfd, packetbuf, packetbuf_size); + if(ret < 0) { + LOG(LVL_ERR, "read: %s", strerror(errno)); + return ERR_SYSCALL; + } else if(ret == 0) { + // no more data, should not happen + break; + } + + RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret)); + } + } + + if((now > next_tx_switch_time)) { + if(connection_can_transmit(&l2conn)) { + // there is a packet to be (re)transmitted. + + LOG(LVL_DEBUG, "Starting new burst."); + + size_t burst_len = 0; + + // add packets to the burst until only 50000 samples remain free in the SDR buffer + while(true) { + uint8_t packet_buf[2048]; + size_t packet_size; + + packet_size = connection_encode_next_packet(&l2conn, + connection_get_next_expected_seq(&l2conn), + packet_buf, sizeof(packet_buf)); + + if(packet_size == 0) { + // no more packets available + LOG(LVL_DEBUG, "Ending burst due to empty packet queue."); + break; + } + + LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size); + + burst_len++; + RESULT_CHECK(transmit(packet_buf, packet_size)); + } + + connection_tx_clean_empty_packet(&l2conn); + + LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len); + + if(!on_air) { + LOG(LVL_INFO, "RX -> TX"); + } + + on_air = true; + } else if(on_air) { // TX on, but no more bursts to send + LOG(LVL_INFO, "TX -> RX"); + on_air = false; + + retransmit_time = get_hires_time() + 1.0 + 1.0 * rand() / RAND_MAX; + + block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON); + } + } + + if(!on_air) { + // ** Receive signal ** + + int ret = poll(&pfd_bcast, 1, 10); + if(ret < 0) { + LOG(LVL_ERR, "poll: %s", strerror(errno)); + break; + } + + if(ret == 0) { + continue; + } + + uint8_t packetbuf[65536]; + ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0); + if(ret < 0) { + LOG(LVL_ERR, "recv: %s", strerror(errno)); + } + + if(ret <= 0) { + break; + } + + handle_received_packet(packetbuf, ret); + + total_bytes += ret; + + double new = get_hires_time(); + if(new >= next_stats_print_time) { + double rate = total_bytes / (new - old); + LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3); + LOG(LVL_INFO, "Receiver statistics:"); + LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found); + LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)", + m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found); + LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)", + m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found); + LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)", + m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found); + next_stats_print_time += 0.5; + + total_bytes = 0; + old = new; + } + + fprintf(stderr, "r"); + } + } + + // ** Cleanup ** + + close(m_bcast_sock); + + connection_destroy(&l2conn); + + jsonlogger_shutdown(); + + LOG(LVL_INFO, "Done."); + + logger_shutdown(); +} +