hamnet70/impl/test/layer2_over_udp/l2udptest_client.c
Thomas Kolb 01edbb1db1 WIP: managing multiple connections
Working towards handling multiple connections. A lot is still missing.
2024-12-12 19:44:07 +01:00

369 lines
8.2 KiB
C

/*
* SPDX-License-Identifier: GPL-3.0-or-later
*
* Copyright (C) 2024 Thomas Kolb
*/
#include <linux/if.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <liquid/liquid.h>
#include <sys/poll.h>
#include <unistd.h>
#include <fcntl.h>
#include <poll.h>
#include <signal.h>
#include <errno.h>
#include "layer2/ham64.h"
#include "utils.h"
#define LOGGER_MODULE_NAME "main"
#include "logger.h"
#include "options.h"
#include "jsonlogger.h"
#include "debug_structs.h"
#include "layer2/connection.h"
#include "layer2/tundev.h"
#include "config.h"
#define RESULT_CHECK(stmt) { \
result_t res = stmt; \
if(res != OK) { \
LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \
exit(1); \
} \
}
#define BROADCAST_PORT 3737
static int m_tunfd = -1;
static bool m_running = true;
static int m_bcast_sock = -1;
static uint64_t next_tx_switch_time = 0;
static rx_stats_t m_rx_stats;
static connection_ctx_t l2conn;
static void signal_handler(int signal, siginfo_t *info, void *ctx)
{
(void)signal;
(void)info;
(void)ctx;
LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal);
m_running = false;
}
static void block_tx_for(unsigned offset_ms)
{
next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms);
}
void handle_received_packet(uint8_t *packet_data, size_t packet_len)
{
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
result_t result = connection_handle_packet(&l2conn, packet_data, packet_len);
switch(result) {
case OK:
m_rx_stats.successful_decodes++;
break;
case ERR_INTEGRITY:
LOG(LVL_ERR, "Packet could not be decoded by Layer 2.");
m_rx_stats.failed_decodes++;
break;
case ERR_SEQUENCE:
LOG(LVL_ERR, "Packet not in the expected sequence.");
break;
default: // all other errors
LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result);
break;
}
}
static result_t transmit(const uint8_t *data, size_t len)
{
result_t result = OK;
struct sockaddr_in bcast_addr = {0};
bcast_addr.sin_family = AF_INET;
bcast_addr.sin_port = htons(BROADCAST_PORT);
bcast_addr.sin_addr.s_addr = INADDR_BROADCAST;
int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr));
if(ret < 0) {
LOG(LVL_ERR, "sendto: %s", strerror(errno));
exit(EXIT_FAILURE);
}
fprintf(stderr, "t");
return result;
}
void rx_data_to_tun(struct connection_ctx_s *conn, const uint8_t *data, size_t len)
{
(void)conn;
int ret = write(m_tunfd, data, len);
if(ret < 0) {
LOG(LVL_ERR, "write(tun): %s", strerror(errno));
}
}
int main(int argc, char **argv)
{
// initialize the console logger
logger_init();
if(!jsonlogger_init("jsonlog.fifo")) {
LOG(LVL_FATAL, "Could not initialize JSON logger.");
return EXIT_FAILURE;
}
bool on_air = true;
srand(get_hires_time());
// ** Initialize **
char devname[IFNAMSIZ] = "hamnet70";
m_tunfd = tundev_open(devname);
if(m_tunfd < 0) {
return 1;
}
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address, rx_data_to_tun));
// force connection into the established state
l2conn.conn_state = CONN_STATE_ESTABLISHED;
// ** Set up signal handling
struct sigaction term_action = {0};
term_action.sa_sigaction = signal_handler;
if(sigaction(SIGTERM, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
if(sigaction(SIGINT, &term_action, NULL) < 0) {
LOG(LVL_ERR, "sigaction: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Set up UDP socket
m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0);
if(m_bcast_sock < 0) {
LOG(LVL_ERR, "socket: %s", strerror(errno));
exit(EXIT_FAILURE);
}
int broadcastEnable=1;
int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable));
if(ret < 0) {
LOG(LVL_ERR, "setsockopt: %s", strerror(errno));
exit(EXIT_FAILURE);
}
struct sockaddr_in bind_addr = {0};
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = htons(BROADCAST_PORT);
bind_addr.sin_addr.s_addr = INADDR_ANY;
ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr));
if(ret < 0) {
LOG(LVL_ERR, "bind: %s", strerror(errno));
exit(EXIT_FAILURE);
}
// ** Process packets **
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = m_tunfd;
pfd_tun.events = POLLIN;
struct pollfd pfd_bcast;
memset(&pfd_bcast, 0, sizeof(pfd_bcast));
pfd_bcast.fd = m_bcast_sock;
pfd_bcast.events = POLLIN;
uint64_t old = get_hires_time();
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
// TODO: wait for beacon
// TODO: send connection request
while(m_running) {
uint64_t now = get_hires_time();
RESULT_CHECK(connection_maintain(&l2conn));
// fill the TX queue from the TUN device
while(connection_can_enqueue_packet(&l2conn)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> move it to the queue
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(m_tunfd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
}
RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret));
}
}
if((now > next_tx_switch_time)) {
if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst.");
size_t burst_len = 0;
// add packets to the burst until only 50000 samples remain free in the SDR buffer
while(true) {
uint8_t packet_buf[2048];
size_t packet_size;
packet_size = connection_encode_next_packet(&l2conn,
connection_get_next_expected_seq(&l2conn),
packet_buf, sizeof(packet_buf));
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
burst_len++;
RESULT_CHECK(transmit(packet_buf, packet_size));
}
connection_tx_clean_empty_packet(&l2conn);
LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len);
if(!on_air) {
LOG(LVL_INFO, "RX -> TX");
}
on_air = true;
} else if(on_air) { // TX on, but no more bursts to send
LOG(LVL_INFO, "TX -> RX");
on_air = false;
retransmit_time = get_hires_time() + HRTIME_SEC(1) + HRTIME_SEC(1.0 * rand() / RAND_MAX);
block_tx_for(TX_SWITCH_BACKOFF_AFTER_RX_ON);
}
}
if(!on_air) {
// ** Receive signal **
int ret = poll(&pfd_bcast, 1, 10);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
}
if(ret == 0) {
continue;
}
uint8_t packetbuf[65536];
ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
}
if(ret <= 0) {
break;
}
handle_received_packet(packetbuf, ret);
total_bytes += ret;
uint64_t new = get_hires_time();
if(new >= next_stats_print_time) {
double rate = total_bytes * 1e9 / (new - old);
LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3);
LOG(LVL_INFO, "Receiver statistics:");
LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found);
LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)",
m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)",
m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found);
LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)",
m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found);
next_stats_print_time += HRTIME_MS(500);
total_bytes = 0;
old = new;
}
fprintf(stderr, "r");
}
}
// ** Cleanup **
close(m_bcast_sock);
connection_destroy(&l2conn);
jsonlogger_shutdown();
LOG(LVL_INFO, "Done.");
logger_shutdown();
}