connection: process beacons and send connection requests
Some checks failed
/ build-hamnet70 (push) Failing after 17s
/ build-doc (push) Successful in 17s
/ deploy-doc (push) Has been skipped

This commit is contained in:
Thomas Kolb 2024-12-10 22:39:26 +01:00
parent fefacc69c3
commit 559283d87f
4 changed files with 196 additions and 40 deletions

View file

@ -72,6 +72,18 @@ void connection_destroy(connection_ctx_t *ctx)
}
void connection_setup_outgoing(connection_ctx_t *ctx)
{
ctx->conn_state = CONN_STATE_CONNECTING;
}
void connection_setup_incoming(connection_ctx_t *ctx)
{
ctx->conn_state = CONN_STATE_ESTABLISHED;
}
result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len, layer2_data_packet_t *data_packet)
{
// check the connection state
@ -131,6 +143,48 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
return connection_handle_packet_prechecked(ctx, &header, payload, payload_len, data_packet);
}
static result_t handle_conn_mgmt(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
const uint8_t *payload, size_t payload_len)
{
(void)header;
if(payload_len < 1) {
LOG(LVL_ERR, "Connection management packet without any payload is invalid.");
return ERR_INVALID_PARAM;
}
uint8_t packet_type = payload[0];
if(packet_type == CONN_MGMT_TYPE_BEACON) {
if(ctx->conn_state == CONN_STATE_CONNECTING) {
LOG(LVL_INFO, "Received beacon; queueing connection request.");
// enqueue a connection request packet
layer2_packet_header_t conn_request_header;
conn_request_header.tx_request = true;
conn_request_header.dst_addr = ctx->peer_addr;
conn_request_header.src_addr = ctx->my_addr;
conn_request_header.msg_type = L2_MSG_TYPE_CONN_MGMT;
conn_request_header.rx_seq_nr = 0;
conn_request_header.tx_seq_nr = 0;
// create a persistent copy of the packet data.
uint8_t packetbuf[1];
packetbuf[0] = CONN_MGMT_TYPE_CONNECTION_REQUEST;
connection_enqueue_packet(ctx, &conn_request_header, packetbuf, 1);
} else {
LOG(LVL_WARN, "Beacons are ignored in states other than CONNECTING.");
return ERR_INVALID_STATE;
}
}
return OK;
}
result_t connection_handle_packet_prechecked(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
@ -182,6 +236,8 @@ result_t connection_handle_packet_prechecked(
return OK; // do not ACK
case L2_MSG_TYPE_CONN_MGMT:
return handle_conn_mgmt(ctx, header, payload, payload_len);
case L2_MSG_TYPE_CONNECTIONLESS:
LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header->msg_type));
return OK;
@ -230,7 +286,39 @@ uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx)
}
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len)
result_t connection_enqueue_packet(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len)
{
if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
return ERR_NO_MEM;
}
uint8_t *packetbuf = NULL;
if(payload) {
// create a persistent copy of the packet data.
// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
packetbuf = malloc(payload_len);
if(!packetbuf) {
LOG(LVL_ERR, "malloc failed.");
return ERR_NO_MEM;
}
}
memcpy(packetbuf, payload, payload_len);
packet_queue_add(&ctx->packet_queue, header, packetbuf, payload_len);
LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
header->tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
return OK;
}
result_t connection_enqueue_data_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len)
{
// check the connection state
switch(ctx->conn_state) {
@ -248,10 +336,6 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
layer2_packet_header_t header;
if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
return ERR_NO_MEM;
}
header.dst_addr = ctx->peer_addr;
header.src_addr = ctx->my_addr;
header.msg_type = L2_MSG_TYPE_DATA;
@ -259,20 +343,7 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
header.tx_request = 0;
header.tx_seq_nr = ctx->next_seq_nr;
// create a persistent copy of the packet data.
// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
uint8_t *packetbuf = malloc(buf_len);
if(!packetbuf) {
LOG(LVL_ERR, "malloc failed.");
return ERR_NO_MEM;
}
memcpy(packetbuf, buf, buf_len);
packet_queue_add(&ctx->packet_queue, &header, packetbuf, buf_len);
LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
header.tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
ERR_CHECK(connection_enqueue_packet(ctx, &header, buf, buf_len));
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;
@ -346,18 +417,13 @@ result_t connection_send_parameters(connection_ctx_t *ctx)
header.tx_request = 1;
size_t payload_len = 1;
uint8_t *payload = malloc(payload_len);
if(!payload) {
return ERR_NO_MEM;
}
uint8_t payload[payload_len];
payload[0] = CONN_MGMT_TYPE_CONNECTION_PARAMETERS;
// TODO: calculate IP addresses from clients HAM64 address
if (!packet_queue_add(&ctx->packet_queue, &header, payload, payload_len)) {
return ERR_NO_MEM;
}
ERR_CHECK(connection_enqueue_packet(ctx, &header, payload, payload_len));
ctx->next_seq_nr++;
ctx->next_seq_nr &= SEQ_NR_MASK;

View file

@ -69,6 +69,20 @@ result_t connection_init(
*/
void connection_destroy(connection_ctx_t *ctx);
/*!\brief Set up a outgoing connection.
*
* This puts the connection into CONN_STATE_CONNECTING state, which causes
* beacons to be handled.
*/
void connection_setup_outgoing(connection_ctx_t *ctx);
/*!\brief Set up an incoming connection.
*
* As this function is intended to be called after a connection request was
* handled, it puts the connection directly into CONN_STATE_ESTABLISHED state.
*/
void connection_setup_incoming(connection_ctx_t *ctx);
/*!\brief Handle a received packet.
*
* \param[inout] ctx The connection context.
@ -112,9 +126,22 @@ uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx);
uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx);
/*!\brief Enqueue a packet for transmission.
* \param ctx The connection context.
* \param ctx The connection context.
* \param header Pointer to the packet header.
* \param payload Pointer to the payload.
* \param payload_len Length of the payload.
*/
result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
result_t connection_enqueue_packet(
connection_ctx_t *ctx,
const layer2_packet_header_t *header,
uint8_t *payload, size_t payload_len);
/*!\brief Enqueue a data packet for transmission.
* \param ctx The connection context.
* \param buf Pointer to the data buffer.
* \param buf_len Length of the data.
*/
result_t connection_enqueue_data_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
/*!\brief Check if there is free space in the TX packet queue.
* \param ctx The connection context.

View file

@ -213,7 +213,7 @@ result_t connection_list_enqueue_packet(connection_list_t *list, uint8_t *data,
return ERR_INVALID_ADDRESS;
}
return connection_enqueue_packet(&ptr->connection, data, data_len);
return connection_enqueue_data_packet(&ptr->connection, data, data_len);
}

View file

@ -122,6 +122,71 @@ void rx_data_to_tun(const layer2_data_packet_t *data_packet)
}
result_t connect_to_digipeater(connection_ctx_t *conn)
{
while(conn->conn_state != CONN_STATE_ESTABLISHED) {
uint8_t packetbuf[65536];
LOG(LVL_INFO, "Waiting for packets from digipeater (conn. state: %d)", conn->conn_state);
// note: recv() is blocking here
int ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
if(ret < 0) {
LOG(LVL_ERR, "recv: %s", strerror(errno));
return ERR_SYSCALL;
} else if(ret == 0) {
LOG(LVL_ERR, "recv() returned zero.");
return ERR_SYSCALL;
}
layer2_data_packet_t data_packet;
result_t err_code = connection_handle_packet(conn, packetbuf, ret, &data_packet);
switch(err_code) {
case OK:
break; // state machine advanced and there are probably packets to send
case ERR_INTEGRITY:
case ERR_INVALID_STATE:
continue; // retry with the next packet
default:
LOG(LVL_ERR, "connection_handle_packet() returned %d.", err_code);
return err_code;
}
LOG(LVL_INFO, "Packet processed successfully (new conn. state: %d); Transmitting response (if any).", conn->conn_state);
// send any packets in the queue
while(connection_can_transmit(conn)) {
uint8_t packet_buf[2048];
size_t packet_size;
bool end_burst;
packet_size = connection_encode_next_packet(conn,
packet_buf, sizeof(packet_buf), &end_burst);
if(packet_size == 0) {
// no more packets available
LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
break;
}
LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
RESULT_CHECK(transmit(packet_buf, packet_size));
if(end_burst) {
LOG(LVL_DEBUG, "Ending burst on request.");
break;
}
}
}
return OK;
}
void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
{
(void)conn;
@ -151,14 +216,6 @@ int main(void)
return 1;
}
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address));
// force connection into the established state
l2conn.conn_state = CONN_STATE_ESTABLISHED;
// ** Set up signal handling
struct sigaction term_action = {0};
@ -219,8 +276,14 @@ int main(void)
size_t total_bytes = 0;
uint64_t next_stats_print_time = old + HRTIME_MS(500);
// TODO: wait for beacon
// TODO: send connection request
ham64_t my_address, peer_address;
ham64_encode(MY_CALL, &my_address);
ham64_encode(PEER_CALL, &peer_address);
RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address));
connection_setup_outgoing(&l2conn);
RESULT_CHECK(connect_to_digipeater(&l2conn));
while(m_running) {
connection_evt_t evt;
@ -256,7 +319,7 @@ int main(void)
LOG(LVL_DUMP, "TUN Flags: 0x%04x", *(uint16_t*)packetbuf);
LOG(LVL_DUMP, "TUN Proto: 0x%04x", *((uint16_t*)packetbuf + 1));
RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret));
RESULT_CHECK(connection_enqueue_data_packet(&l2conn, packetbuf, ret));
}
}