Integrate new layer2 TX packet handling

This commit is contained in:
Thomas Kolb 2024-07-18 22:23:24 +02:00
parent 5adbbca786
commit 7381c7e808
2 changed files with 43 additions and 50 deletions

View file

@ -46,14 +46,16 @@ result_t layer2_tx_fill_packet_queue(layer2_tx_t *ctx, int tun_fd)
header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet() header.rx_seq_nr = 0; // will be filled in layer2_tx_encode_next_packet()
header.tx_request = 0; header.tx_request = 0;
int ret = 0; while(packet_queue_get_free_space(&ctx->packet_queue) > 0) {
do {
int ret = poll(&pfd, 1, 0); int ret = poll(&pfd, 1, 0);
if(ret < 0) { if(ret < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno)); LOG(LVL_ERR, "poll: %s", strerror(errno));
return ERR_SYSCALL; return ERR_SYSCALL;
} else if(ret > 0) { } else if(ret == 0) {
// no more packets
break;
} else {
// a packet is available -> move it to the queue
header.tx_seq_nr = ctx->next_seq_nr; header.tx_seq_nr = ctx->next_seq_nr;
uint8_t packetbuf[2048]; uint8_t packetbuf[2048];
@ -71,7 +73,7 @@ result_t layer2_tx_fill_packet_queue(layer2_tx_t *ctx, int tun_fd)
ctx->next_seq_nr++; ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK; ctx->next_seq_nr &= SEQ_NR_MASK;
} }
} while((ret > 0) && (packet_queue_get_free_space(&ctx->packet_queue) > 0)); }
return OK; return OK;
} }
@ -140,5 +142,6 @@ void layer2_tx_handle_ack(layer2_tx_t *ctx, uint8_t acked_seq)
bool layer2_tx_can_transmit(const layer2_tx_t *ctx) bool layer2_tx_can_transmit(const layer2_tx_t *ctx)
{ {
return packet_queue_get_used_space(&ctx->packet_queue) != 0; return (packet_queue_get_used_space(&ctx->packet_queue) != 0)
&& (packet_queue_get(&ctx->packet_queue, ctx->next_packet_index) != NULL);
} }

View file

@ -1,7 +1,6 @@
#include <linux/if.h> #include <linux/if.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <ctype.h>
#include <string.h> #include <string.h>
#include <liquid/liquid.h> #include <liquid/liquid.h>
@ -23,6 +22,7 @@
#include "layer1/rx.h" #include "layer1/rx.h"
#include "layer2/tundev.h" #include "layer2/tundev.h"
#include "layer2/layer2_tx.h"
#include "sdr/sdr.h" #include "sdr/sdr.h"
@ -145,6 +145,8 @@ int main(int argc, char **argv)
layer1_tx_t tx; layer1_tx_t tx;
layer1_rx_t rx; layer1_rx_t rx;
layer2_tx_t l2tx;
sdr_ctx_t sdr; sdr_ctx_t sdr;
// initialize the console logger // initialize the console logger
@ -177,6 +179,8 @@ int main(int argc, char **argv)
RESULT_CHECK(layer1_tx_init(&tx)); RESULT_CHECK(layer1_tx_init(&tx));
RESULT_CHECK(layer1_rx_init(&rx, cb_rx)); RESULT_CHECK(layer1_rx_init(&rx, cb_rx));
RESULT_CHECK(layer2_tx_init(&l2tx));
// ** Set up signal handling // ** Set up signal handling
struct sigaction term_action = {0}; struct sigaction term_action = {0};
@ -212,13 +216,12 @@ int main(int argc, char **argv)
while(m_running) { while(m_running) {
double now = get_hires_time(); double now = get_hires_time();
// fill the TX queue from the TUN device
layer2_tx_fill_packet_queue(&l2tx, m_tunfd);
if((now > next_tx_switch_time) && (on_air || !layer1_rx_is_busy(&rx))) { if((now > next_tx_switch_time) && (on_air || !layer1_rx_is_busy(&rx))) {
int ret = poll(&pfd, 1, 0); if(layer2_tx_can_transmit(&l2tx)) {
if(ret < 0) { // there is a packet to be (re)transmitted.
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret > 0) {
// there is a packet to be read.
// check free buffer space (50 ms required corresponding to 5000 baseband symbols) // check free buffer space (50 ms required corresponding to 5000 baseband symbols)
size_t buffer_free_space_samples = sdr_get_tx_buffer_free_space(&sdr); size_t buffer_free_space_samples = sdr_get_tx_buffer_free_space(&sdr);
@ -231,57 +234,42 @@ int main(int argc, char **argv)
continue; continue;
} }
uint8_t packetbuf[2048]; // reset the TX (generates the ramp-up)
ret = read(m_tunfd, packetbuf, sizeof(packetbuf)); LOG(LVL_DEBUG, "Starting new burst.");
if(ret < 0) {
LOG(LVL_ERR, "read: %s", strerror(errno));
break;
} else if(ret == 0) {
// no more data
break;
}
/* append CRC to packet */
unsigned int crc_size = crc_sizeof_key(PAYLOAD_CRC_SCHEME);
if((unsigned)ret > sizeof(packetbuf) - crc_size) {
LOG(LVL_ERR, "Packet is too large to append CRC! %i bytes.", ret);
break;
}
crc_append_key(PAYLOAD_CRC_SCHEME, packetbuf, ret);
LOG(LVL_DEBUG, "Transmitting packet with %d bytes.", ret + crc_size);
RESULT_CHECK(layer1_tx_reset(&tx)); RESULT_CHECK(layer1_tx_reset(&tx));
// ** Modulate packet ** // add packets to the burst until only 50000 samples remain free in the SDR buffer
while(layer1_tx_get_sample_count(&tx) + 50000 < buffer_free_space_samples) {
uint8_t packet_buf[2048];
size_t packet_size;
RESULT_CHECK(layer1_tx_add_packet_to_burst(&tx, packetbuf, ret + crc_size)); packet_size = layer2_tx_encode_next_packet(&l2tx, 0 /* FIXME */, packet_buf, sizeof(packet_buf));
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
RESULT_CHECK(layer1_tx_add_packet_to_burst(&tx, packet_buf, packet_size));
}
// generate the ramp-down
RESULT_CHECK(layer1_tx_finalize_burst(&tx)); RESULT_CHECK(layer1_tx_finalize_burst(&tx));
size_t burst_len = layer1_tx_get_sample_count(&tx); size_t burst_len = layer1_tx_get_sample_count(&tx);
const float complex *whole_burst = layer1_tx_get_sample_data(&tx); const float complex *whole_burst = layer1_tx_get_sample_data(&tx);
LOG(LVL_DEBUG, "Burst finalized: %zd samples.", burst_len);
dump_array_cf(whole_burst, burst_len, 1.0f, "/tmp/tx.cpx"); dump_array_cf(whole_burst, burst_len, 1.0f, "/tmp/tx.cpx");
// ensure that the buffer is full before TX is turned on to avoid transmitting empty buffers // ensure that the buffer is full before TX is turned on to avoid transmitting empty buffers
RESULT_CHECK(transmit(&sdr, whole_burst, burst_len)); RESULT_CHECK(transmit(&sdr, whole_burst, burst_len));
if(!on_air) { if(!on_air) {
size_t buffer_used_samples = sdr_get_tx_buffer_used_space(&sdr);
const size_t min_required_samples = 4*128*1024;
int ret2 = poll(&pfd, 1, 0);
if(ret2 < 0) {
LOG(LVL_ERR, "poll: %s", strerror(errno));
break;
} else if(ret2 != 0 && (buffer_used_samples < min_required_samples)) {
// enqueue more packets before starting TX
LOG(LVL_INFO, "Pre-buffering more packets: %zd / %zd samples.", buffer_used_samples, min_required_samples);
continue;
}
LOG(LVL_INFO, "RX -> TX"); LOG(LVL_INFO, "RX -> TX");
RESULT_CHECK(sdr_stop_rx(&sdr)); RESULT_CHECK(sdr_stop_rx(&sdr));
@ -366,6 +354,8 @@ int main(int argc, char **argv)
// ** Cleanup ** // ** Cleanup **
layer2_tx_destroy(&l2tx);
layer1_tx_shutdown(&tx); layer1_tx_shutdown(&tx);
layer1_rx_shutdown(&rx); layer1_rx_shutdown(&rx);