/* * SPDX-License-Identifier: GPL-3.0-or-later * * Copyright (C) 2024 Thomas Kolb */ #include #include #include #include #include #include #include #include #include #include #include #include #include "layer2/ham64.h" #include "results.h" #include "utils.h" #define LOGGER_MODULE_NAME "main" #include "logger.h" #include "jsonlogger.h" #include "debug_structs.h" #include "layer2/connection.h" #include "layer2/tundev.h" #include "config.h" #define RESULT_CHECK(stmt) { \ result_t res = stmt; \ if(res != OK) { \ LOG(LVL_FATAL, "Error %d in %s:%d!", res, __FILE__, __LINE__); \ exit(1); \ } \ } #define BROADCAST_PORT 3737 static int m_tunfd = -1; static bool m_running = true; static int m_bcast_sock = -1; static uint64_t next_tx_switch_time = 0; static rx_stats_t m_rx_stats; static connection_ctx_t l2conn; static void signal_handler(int signal, siginfo_t *info, void *ctx) { (void)signal; (void)info; (void)ctx; LOG(LVL_INFO, "\nGracefully shutting down on signal %d.", signal); m_running = false; } static void block_tx_for(unsigned offset_ms) { next_tx_switch_time = get_hires_time() + HRTIME_MS(offset_ms); } static result_t transmit(const uint8_t *data, size_t len) { result_t result = OK; struct sockaddr_in bcast_addr = {0}; bcast_addr.sin_family = AF_INET; bcast_addr.sin_port = htons(BROADCAST_PORT); bcast_addr.sin_addr.s_addr = INADDR_BROADCAST; int ret = sendto(m_bcast_sock, data, len, 0, (struct sockaddr*)&bcast_addr, sizeof(bcast_addr)); if(ret < 0) { LOG(LVL_ERR, "sendto: %s", strerror(errno)); exit(EXIT_FAILURE); } fprintf(stderr, "t"); return result; } void rx_data_to_tun(const layer2_data_packet_t *data_packet) { uint8_t tun_packet[4 + data_packet->payload_len]; // flags tun_packet[0] = 0; tun_packet[1] = 0; switch(data_packet->payload_type) { case L2_PAYLOAD_TYPE_IPV6: *(uint16_t*)(tun_packet+2) = htons(0x86dd); break; case L2_PAYLOAD_TYPE_IPV4: *(uint16_t*)(tun_packet+2) = htons(0x0800); break; default: LOG(LVL_ERR, "Unsupported payload type: 0x%08x.", data_packet->payload_type); return; } memcpy(tun_packet+4, data_packet->payload, data_packet->payload_len); int ret = write(m_tunfd, tun_packet, sizeof(tun_packet)); if(ret < 0) { LOG(LVL_ERR, "write(tun): %s", strerror(errno)); } } result_t connect_to_digipeater(connection_ctx_t *conn) { while(conn->conn_state != CONN_STATE_ESTABLISHED) { uint8_t packetbuf[65536]; LOG(LVL_INFO, "Waiting for packets from digipeater (conn. state: %d)", conn->conn_state); // note: recv() is blocking here int ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0); if(ret < 0) { LOG(LVL_ERR, "recv: %s", strerror(errno)); return ERR_SYSCALL; } else if(ret == 0) { LOG(LVL_ERR, "recv() returned zero."); return ERR_SYSCALL; } layer2_data_packet_t data_packet; bool tx_request_received = false; result_t err_code = connection_handle_packet(conn, packetbuf, ret, &data_packet, &tx_request_received); switch(err_code) { case OK: break; // state machine advanced and there are probably packets to send case ERR_INTEGRITY: case ERR_INVALID_STATE: continue; // retry with the next packet default: LOG(LVL_ERR, "connection_handle_packet() returned %d.", err_code); return err_code; } LOG(LVL_INFO, "Packet processed successfully (new conn. state: %d); Transmitting response (if any).", conn->conn_state); // send any packets in the queue while(connection_can_transmit(conn)) { uint8_t packet_buf[2048]; size_t packet_size; bool end_burst; packet_size = connection_encode_next_packet(conn, packet_buf, sizeof(packet_buf), &end_burst); if(packet_size == 0) { // no more packets available LOG(LVL_DEBUG, "Ending burst due to empty packet queue."); break; } LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size); RESULT_CHECK(transmit(packet_buf, packet_size)); if(end_burst) { LOG(LVL_DEBUG, "Ending burst on request."); break; } } } return OK; } void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx) { (void)conn; (void)evt; (void)user_ctx; } int main(void) { // initialize the console logger logger_init(); if(!jsonlogger_init("jsonlog.fifo")) { LOG(LVL_FATAL, "Could not initialize JSON logger."); return EXIT_FAILURE; } srand(get_hires_time()); // ** Initialize ** char devname[IFNAMSIZ] = "hamnet70"; m_tunfd = tundev_open(devname); if(m_tunfd < 0) { return 1; } // ** Set up signal handling struct sigaction term_action = {0}; term_action.sa_sigaction = signal_handler; if(sigaction(SIGTERM, &term_action, NULL) < 0) { LOG(LVL_ERR, "sigaction: %s", strerror(errno)); exit(EXIT_FAILURE); } if(sigaction(SIGINT, &term_action, NULL) < 0) { LOG(LVL_ERR, "sigaction: %s", strerror(errno)); exit(EXIT_FAILURE); } // ** Set up UDP socket m_bcast_sock = socket(AF_INET, SOCK_DGRAM, 0); if(m_bcast_sock < 0) { LOG(LVL_ERR, "socket: %s", strerror(errno)); exit(EXIT_FAILURE); } int broadcastEnable=1; int ret = setsockopt(m_bcast_sock, SOL_SOCKET, SO_BROADCAST, &broadcastEnable, sizeof(broadcastEnable)); if(ret < 0) { LOG(LVL_ERR, "setsockopt: %s", strerror(errno)); exit(EXIT_FAILURE); } struct sockaddr_in bind_addr = {0}; bind_addr.sin_family = AF_INET; bind_addr.sin_port = htons(BROADCAST_PORT); bind_addr.sin_addr.s_addr = INADDR_ANY; ret = bind(m_bcast_sock, (struct sockaddr*)&bind_addr, sizeof(bind_addr)); if(ret < 0) { LOG(LVL_ERR, "bind: %s", strerror(errno)); exit(EXIT_FAILURE); } // ** Process packets ** struct pollfd pfd_tun; memset(&pfd_tun, 0, sizeof(pfd_tun)); pfd_tun.fd = m_tunfd; pfd_tun.events = POLLIN; struct pollfd pfd_bcast; memset(&pfd_bcast, 0, sizeof(pfd_bcast)); pfd_bcast.fd = m_bcast_sock; pfd_bcast.events = POLLIN; uint64_t old = get_hires_time(); size_t total_bytes = 0; uint64_t next_stats_print_time = old + HRTIME_MS(500); ham64_t my_address, peer_address; ham64_encode(MY_CALL, &my_address); ham64_encode(PEER_CALL, &peer_address); RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address)); connection_setup_outgoing(&l2conn); RESULT_CHECK(connect_to_digipeater(&l2conn)); while(m_running) { // fill the TX queue from the TUN device while(connection_can_enqueue_packet(&l2conn)) { int ret = poll(&pfd_tun, 1, 0 /* timeout */); if(ret < 0) { LOG(LVL_ERR, "poll: %s", strerror(errno)); break; } else if(ret == 0) { // no more packets break; } else { // a packet is available -> move it to the queue static const size_t packetbuf_size = 2048; uint8_t packetbuf[packetbuf_size]; ret = read(m_tunfd, packetbuf, packetbuf_size); if(ret < 0) { LOG(LVL_ERR, "read: %s", strerror(errno)); return ERR_SYSCALL; } else if(ret == 0) { // no more data, should not happen break; } else if(ret < 4) { LOG(LVL_ERR, "Not enough data from TUN read() to check packet type!"); return ERR_SYSCALL; } uint16_t flags = *(uint16_t*)packetbuf; uint16_t proto = *((uint16_t*)packetbuf + 1); LOG(LVL_DUMP, "TUN Flags: 0x%04x", flags); LOG(LVL_DUMP, "TUN Proto: 0x%04x", proto); uint8_t *packet_data = packetbuf + 4; size_t packet_length = ret - 4; if(proto != 0xdd86) { LOG(LVL_WARN, "Non-IPv6 packet ignored. Proto: 0x%04x", proto); continue; } RESULT_CHECK(connection_enqueue_data_packet(&l2conn, L2_PAYLOAD_TYPE_IPV6, packet_data, packet_length)); } } // transmit one burst if(connection_can_transmit(&l2conn)) { // there is a packet to be (re)transmitted. LOG(LVL_DEBUG, "Starting new burst."); size_t burst_len = 0; // add packets to the burst until only 50000 samples remain free in the SDR buffer while(true) { uint8_t packet_buf[2048]; size_t packet_size; bool end_burst; packet_size = connection_encode_next_packet(&l2conn, packet_buf, sizeof(packet_buf), &end_burst); if(packet_size == 0) { // no more packets available LOG(LVL_DEBUG, "Ending burst due to empty packet queue."); break; } LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size); burst_len++; RESULT_CHECK(transmit(packet_buf, packet_size)); if(end_burst) { LOG(LVL_DEBUG, "Ending burst on request."); break; } } connection_tx_clean_empty_packet(&l2conn); LOG(LVL_DEBUG, "Burst finished: %zd packets sent.", burst_len); } bool may_tx = false; while(!may_tx) { connection_evt_t evt; RESULT_CHECK(connection_maintain(&l2conn, &evt)); if(evt == CONN_EVT_TIMEOUT) { LOG(LVL_ERR, "Connection timed out. Shutting down."); m_running = false; break; } // ** Receive signal ** int ret = poll(&pfd_bcast, 1, 10); if(ret < 0) { LOG(LVL_ERR, "poll: %s", strerror(errno)); return EXIT_FAILURE; } if(ret == 0) { continue; } uint8_t packetbuf[65536]; ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0); if(ret < 0) { LOG(LVL_ERR, "recv: %s", strerror(errno)); return EXIT_FAILURE; } else if(ret == 0) { LOG(LVL_ERR, "recv() returned zero."); return EXIT_FAILURE; } block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS); layer2_data_packet_t data_packet; bool tx_request_received = false; result_t result = connection_handle_packet(&l2conn, packetbuf, ret, &data_packet, &tx_request_received); // Switch to TX when a packet with tx_request=1 was decoded successfully. // Note that this even triggers if the packet is out of sequence (but not for corrupted packets). if(tx_request_received) { may_tx = true; } switch(result) { case OK: // update statistics m_rx_stats.successful_decodes++; total_bytes += ret; if(data_packet.payload_len != 0) { rx_data_to_tun(&data_packet); } break; case ERR_INTEGRITY: LOG(LVL_ERR, "Packet could not be decoded by Layer 2."); m_rx_stats.failed_decodes++; break; case ERR_SEQUENCE: LOG(LVL_ERR, "Packet not in the expected sequence."); break; case ERR_INVALID_STATE: LOG(LVL_WARN, "Packet ignored due to invalid state."); break; default: // all other errors LOG(LVL_ERR, "layer2_rx_handle_packet() returned error code %u.", result); break; } uint64_t new = get_hires_time(); if(new >= next_stats_print_time) { double rate = total_bytes * 1e9 / (new - old); LOG(LVL_INFO, "\nEstimated rate: %.3f kB/s", rate / 1e3); LOG(LVL_INFO, "Receiver statistics:"); LOG(LVL_INFO, " Preambles found: %8zd", m_rx_stats.preambles_found); LOG(LVL_INFO, " Successful decodes: %8zd (%6.2f %%)", m_rx_stats.successful_decodes, m_rx_stats.successful_decodes * 100.0f / m_rx_stats.preambles_found); LOG(LVL_INFO, " Header errors: %8zd (%6.2f %%)", m_rx_stats.header_errors, m_rx_stats.header_errors * 100.0f / m_rx_stats.preambles_found); LOG(LVL_INFO, " Failed decodes: %8zd (%6.2f %%)", m_rx_stats.failed_decodes, m_rx_stats.failed_decodes * 100.0f / m_rx_stats.preambles_found); next_stats_print_time += HRTIME_MS(500); total_bytes = 0; old = new; } fprintf(stderr, "r"); } // give the other side some time to switch to rx sleep_until(next_tx_switch_time); } // ** Cleanup ** close(m_bcast_sock); connection_destroy(&l2conn); jsonlogger_shutdown(); LOG(LVL_INFO, "Done."); logger_shutdown(); }