WIP: Layer 2-Implementierung #6

Draft
thomas wants to merge 39 commits from layer2_dev into main
4 changed files with 65 additions and 21 deletions
Showing only changes of commit 729d61feb7 - Show all commits

View file

@ -5,6 +5,7 @@
#include "config.h" #include "config.h"
#include "layer2/ham64.h" #include "layer2/ham64.h"
#include "layer2/packet_queue.h"
#include "results.h" #include "results.h"
#define SEQ_NR_MASK 0xF #define SEQ_NR_MASK 0xF
@ -71,6 +72,11 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
} }
// check if the packet really should be handled by us // check if the packet really should be handled by us
if(ham64_is_equal(&ctx->my_addr, &header.src_addr)) {
LOG(LVL_DEBUG, "Packet is from ourselves. Ignored.");
return OK;
}
if(!ham64_is_equal(&header.src_addr, &ctx->peer_addr)) { if(!ham64_is_equal(&header.src_addr, &ctx->peer_addr)) {
char fmt_src_addr[HAM64_FMT_MAX_LEN]; char fmt_src_addr[HAM64_FMT_MAX_LEN];
char fmt_peer_addr[HAM64_FMT_MAX_LEN]; char fmt_peer_addr[HAM64_FMT_MAX_LEN];
@ -209,6 +215,12 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
} }
bool connection_can_enqueue_packet(const connection_ctx_t *ctx)
{
return packet_queue_get_free_space(&ctx->packet_queue) > 0;
}
result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request) result_t connection_add_empty_packet(connection_ctx_t *ctx, bool tx_request)
{ {
// check the connection state // check the connection state

View file

@ -80,6 +80,11 @@ uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx);
*/ */
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len); result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
/*!\brief Check if there is free space in the TX packet queue.
* \param ctx The connection context.
*/
bool connection_can_enqueue_packet(const connection_ctx_t *ctx);
/*!\brief Add an empty packet to ensure an acknowledgement is sent. /*!\brief Add an empty packet to ensure an acknowledgement is sent.
* \param ctx The connection context. * \param ctx The connection context.
* \param tx_request Value of the TX Request field in the packet. * \param tx_request Value of the TX Request field in the packet.

View file

@ -18,10 +18,8 @@ add_executable(
../../src/layer2/ham64.h ../../src/layer2/ham64.h
../../src/layer2/packet_queue.c ../../src/layer2/packet_queue.c
../../src/layer2/packet_queue.h ../../src/layer2/packet_queue.h
../../src/layer2/layer2_tx.c ../../src/layer2/connection.c
../../src/layer2/layer2_tx.h ../../src/layer2/connection.h
../../src/layer2/layer2_rx.c
../../src/layer2/layer2_rx.h
../../src/layer2/tundev.c ../../src/layer2/tundev.c
../../src/layer2/tundev.h ../../src/layer2/tundev.h
l2udptest.c l2udptest.c

View file

@ -20,6 +20,7 @@
#include <signal.h> #include <signal.h>
#include <errno.h> #include <errno.h>
#include "layer2/ham64.h"
#include "utils.h" #include "utils.h"
#define LOGGER_MODULE_NAME "main" #define LOGGER_MODULE_NAME "main"
@ -28,8 +29,7 @@
#include "jsonlogger.h" #include "jsonlogger.h"
#include "debug_structs.h" #include "debug_structs.h"
#include "layer2/layer2_tx.h" #include "layer2/connection.h"
#include "layer2/layer2_rx.h"
#include "layer2/tundev.h" #include "layer2/tundev.h"
@ -54,9 +54,7 @@ static double next_tx_switch_time = 0.0;
static rx_stats_t m_rx_stats; static rx_stats_t m_rx_stats;
static layer2_rx_t l2rx; static connection_ctx_t l2conn;
static layer2_tx_t l2tx;
static void signal_handler(int signal, siginfo_t *info, void *ctx) static void signal_handler(int signal, siginfo_t *info, void *ctx)
{ {
@ -79,11 +77,9 @@ void handle_received_packet(uint8_t *packet_data, size_t packet_len)
{ {
block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS); block_tx_for(TX_SWITCH_BACKOFF_END_OF_PACKET_MS);
bool shall_ack; result_t result = connection_handle_packet(&l2conn, packet_data, packet_len);
result_t result = layer2_rx_handle_packet(&l2rx, packet_data, packet_len, &shall_ack);
switch(result) { switch(result) {
case OK: case OK:
layer2_tx_handle_ack(&l2tx, layer2_rx_get_last_acked_seq(&l2rx), shall_ack);
m_rx_stats.successful_decodes++; m_rx_stats.successful_decodes++;
break; break;
@ -147,8 +143,13 @@ int main(int argc, char **argv)
return 1; return 1;
} }
RESULT_CHECK(layer2_tx_init(&l2tx, m_tunfd)); ham64_t my_address, peer_address;
RESULT_CHECK(layer2_rx_init(&l2rx, m_tunfd)); ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address));
// force connection into the established state
l2conn.conn_state = CONN_STATE_ESTABLISHED;
// ** Set up signal handling // ** Set up signal handling
@ -194,6 +195,12 @@ int main(int argc, char **argv)
// ** Process packets ** // ** Process packets **
struct pollfd pfd_tun;
memset(&pfd_tun, 0, sizeof(pfd_tun));
pfd_tun.fd = m_tunfd;
pfd_tun.events = POLLIN;
struct pollfd pfd_bcast; struct pollfd pfd_bcast;
memset(&pfd_bcast, 0, sizeof(pfd_bcast)); memset(&pfd_bcast, 0, sizeof(pfd_bcast));
@ -214,14 +221,37 @@ int main(int argc, char **argv)
if(retransmit_time != 0.0 && now >= retransmit_time) { if(retransmit_time != 0.0 && now >= retransmit_time) {
LOG(LVL_INFO, "Retransmit triggered."); LOG(LVL_INFO, "Retransmit triggered.");
retransmit_time = 0.0; retransmit_time = 0.0;
layer2_tx_restart(&l2tx); connection_restart_tx(&l2conn);
} }
// fill the TX queue from the TUN device // fill the TX queue from the TUN device
RESULT_CHECK(layer2_tx_fill_packet_queue(&l2tx)); while(connection_can_enqueue_packet(&l2conn)) {
int ret = poll(&pfd_tun, 1, 0 /* timeout */);
if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> move it to the queue
static const size_t packetbuf_size = 2048;
uint8_t packetbuf[packetbuf_size];
ret = read(m_tunfd, packetbuf, packetbuf_size);
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
// no more data, should not happen
break;
}
RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret));
}
}
if((now > next_tx_switch_time)) { if((now > next_tx_switch_time)) {
if(layer2_tx_can_transmit(&l2tx)) { if(connection_can_transmit(&l2conn)) {
// there is a packet to be (re)transmitted. // there is a packet to be (re)transmitted.
LOG(LVL_DEBUG, "Starting new burst."); LOG(LVL_DEBUG, "Starting new burst.");
@ -233,8 +263,8 @@ int main(int argc, char **argv)
uint8_t packet_buf[2048]; uint8_t packet_buf[2048];
size_t packet_size; size_t packet_size;
packet_size = layer2_tx_encode_next_packet(&l2tx, packet_size = connection_encode_next_packet(&l2conn,
layer2_rx_get_next_expected_seq(&l2rx), connection_get_next_expected_seq(&l2conn),
packet_buf, sizeof(packet_buf)); packet_buf, sizeof(packet_buf));
if(packet_size == 0) { if(packet_size == 0) {
@ -321,8 +351,7 @@ int main(int argc, char **argv)
close(m_bcast_sock); close(m_bcast_sock);
layer2_tx_destroy(&l2tx); connection_destroy(&l2conn);
layer2_rx_destroy(&l2rx);
jsonlogger_shutdown(); jsonlogger_shutdown();