From 559283d87fdc5f54ca6de94aac98419f7b06ecf1 Mon Sep 17 00:00:00 2001
From: Thomas Kolb <cfr34k-git@tkolb.de>
Date: Tue, 10 Dec 2024 22:39:26 +0100
Subject: [PATCH] connection: process beacons and send connection requests

---
 impl/src/layer2/connection.c                 | 118 +++++++++++++++----
 impl/src/layer2/connection.h                 |  31 ++++-
 impl/src/layer2/connection_list.c            |   2 +-
 impl/test/layer2_over_udp/l2udptest_client.c |  85 +++++++++++--
 4 files changed, 196 insertions(+), 40 deletions(-)

diff --git a/impl/src/layer2/connection.c b/impl/src/layer2/connection.c
index 08a9279..e2c6f89 100644
--- a/impl/src/layer2/connection.c
+++ b/impl/src/layer2/connection.c
@@ -72,6 +72,18 @@ void connection_destroy(connection_ctx_t *ctx)
 }
 
 
+void connection_setup_outgoing(connection_ctx_t *ctx)
+{
+	ctx->conn_state = CONN_STATE_CONNECTING;
+}
+
+
+void connection_setup_incoming(connection_ctx_t *ctx)
+{
+	ctx->conn_state = CONN_STATE_ESTABLISHED;
+}
+
+
 result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, size_t buf_len, layer2_data_packet_t *data_packet)
 {
 	// check the connection state
@@ -131,6 +143,48 @@ result_t connection_handle_packet(connection_ctx_t *ctx, const uint8_t *buf, siz
 	return connection_handle_packet_prechecked(ctx, &header, payload, payload_len, data_packet);
 }
 
+static result_t handle_conn_mgmt(
+		connection_ctx_t *ctx,
+		const layer2_packet_header_t *header,
+		const uint8_t *payload, size_t payload_len)
+{
+	(void)header;
+
+	if(payload_len < 1) {
+		LOG(LVL_ERR, "Connection management packet without any payload is invalid.");
+		return ERR_INVALID_PARAM;
+	}
+
+	uint8_t packet_type = payload[0];
+
+	if(packet_type == CONN_MGMT_TYPE_BEACON) {
+		if(ctx->conn_state == CONN_STATE_CONNECTING) {
+			LOG(LVL_INFO, "Received beacon; queueing connection request.");
+
+			// enqueue a connection request packet
+			layer2_packet_header_t conn_request_header;
+
+			conn_request_header.tx_request = true;
+			conn_request_header.dst_addr = ctx->peer_addr;
+			conn_request_header.src_addr = ctx->my_addr;
+			conn_request_header.msg_type = L2_MSG_TYPE_CONN_MGMT;
+			conn_request_header.rx_seq_nr = 0;
+			conn_request_header.tx_seq_nr = 0;
+
+			// create a persistent copy of the packet data.
+			uint8_t packetbuf[1];
+			packetbuf[0] = CONN_MGMT_TYPE_CONNECTION_REQUEST;
+
+			connection_enqueue_packet(ctx, &conn_request_header, packetbuf, 1);
+		} else {
+			LOG(LVL_WARN, "Beacons are ignored in states other than CONNECTING.");
+			return ERR_INVALID_STATE;
+		}
+	}
+
+	return OK;
+}
+
 result_t connection_handle_packet_prechecked(
 		connection_ctx_t *ctx,
 		const layer2_packet_header_t *header,
@@ -182,6 +236,8 @@ result_t connection_handle_packet_prechecked(
 			return OK; // do not ACK
 
 		case L2_MSG_TYPE_CONN_MGMT:
+			return handle_conn_mgmt(ctx, header, payload, payload_len);
+
 		case L2_MSG_TYPE_CONNECTIONLESS:
 			LOG(LVL_WARN, "Message type %s is not implemented yet.", layer2_msg_type_to_string(header->msg_type));
 			return OK;
@@ -230,7 +286,39 @@ uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx)
 }
 
 
-result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len)
+result_t connection_enqueue_packet(
+		connection_ctx_t *ctx,
+		const layer2_packet_header_t *header,
+		uint8_t *payload, size_t payload_len)
+{
+	if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
+		return ERR_NO_MEM;
+	}
+
+	uint8_t *packetbuf = NULL;
+
+	if(payload) {
+		// create a persistent copy of the packet data.
+		// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
+		packetbuf = malloc(payload_len);
+		if(!packetbuf) {
+			LOG(LVL_ERR, "malloc failed.");
+			return ERR_NO_MEM;
+		}
+	}
+
+	memcpy(packetbuf, payload, payload_len);
+
+	packet_queue_add(&ctx->packet_queue, header, packetbuf, payload_len);
+
+	LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
+			header->tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
+
+	return OK;
+}
+
+
+result_t connection_enqueue_data_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len)
 {
 	// check the connection state
 	switch(ctx->conn_state) {
@@ -248,10 +336,6 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
 
 	layer2_packet_header_t header;
 
-	if(packet_queue_get_free_space(&ctx->packet_queue) == 0) {
-		return ERR_NO_MEM;
-	}
-
 	header.dst_addr = ctx->peer_addr;
 	header.src_addr = ctx->my_addr;
 	header.msg_type = L2_MSG_TYPE_DATA;
@@ -259,20 +343,7 @@ result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t b
 	header.tx_request = 0;
 	header.tx_seq_nr = ctx->next_seq_nr;
 
-	// create a persistent copy of the packet data.
-	// TODO: possibly this copy operation can be removed by passing a malloc'd buffer in.
-	uint8_t *packetbuf = malloc(buf_len);
-	if(!packetbuf) {
-		LOG(LVL_ERR, "malloc failed.");
-		return ERR_NO_MEM;
-	}
-
-	memcpy(packetbuf, buf, buf_len);
-
-	packet_queue_add(&ctx->packet_queue, &header, packetbuf, buf_len);
-
-	LOG(LVL_INFO, "Added packet tx_seq %u to queue -> %zu entries",
-			header.tx_seq_nr, packet_queue_get_used_space(&ctx->packet_queue));
+	ERR_CHECK(connection_enqueue_packet(ctx, &header, buf, buf_len));
 
 	ctx->next_seq_nr++;
 	ctx->next_seq_nr &= SEQ_NR_MASK;
@@ -346,18 +417,13 @@ result_t connection_send_parameters(connection_ctx_t *ctx)
 	header.tx_request = 1;
 
 	size_t payload_len = 1;
-	uint8_t *payload = malloc(payload_len);
-	if(!payload) {
-		return ERR_NO_MEM;
-	}
+	uint8_t payload[payload_len];
 
 	payload[0] = CONN_MGMT_TYPE_CONNECTION_PARAMETERS;
 
 	// TODO: calculate IP addresses from client’s HAM64 address
 
-	if (!packet_queue_add(&ctx->packet_queue, &header, payload, payload_len)) {
-		return ERR_NO_MEM;
-	}
+	ERR_CHECK(connection_enqueue_packet(ctx, &header, payload, payload_len));
 
 	ctx->next_seq_nr++;
 	ctx->next_seq_nr &= SEQ_NR_MASK;
diff --git a/impl/src/layer2/connection.h b/impl/src/layer2/connection.h
index 410c0bc..017fe04 100644
--- a/impl/src/layer2/connection.h
+++ b/impl/src/layer2/connection.h
@@ -69,6 +69,20 @@ result_t connection_init(
  */
 void connection_destroy(connection_ctx_t *ctx);
 
+/*!\brief Set up a outgoing connection.
+ *
+ * This puts the connection into CONN_STATE_CONNECTING state, which causes
+ * beacons to be handled.
+ */
+void connection_setup_outgoing(connection_ctx_t *ctx);
+
+/*!\brief Set up an incoming connection.
+ *
+ * As this function is intended to be called after a connection request was
+ * handled, it puts the connection directly into CONN_STATE_ESTABLISHED state.
+ */
+void connection_setup_incoming(connection_ctx_t *ctx);
+
 /*!\brief Handle a received packet.
  *
  * \param[inout] ctx           The connection context.
@@ -112,9 +126,22 @@ uint8_t connection_get_next_expected_seq(const connection_ctx_t *ctx);
 uint8_t connection_get_last_acked_seq(const connection_ctx_t *ctx);
 
 /*!\brief Enqueue a packet for transmission.
- * \param ctx       The connection context.
+ * \param ctx          The connection context.
+ * \param header       Pointer to the packet header.
+ * \param payload      Pointer to the payload.
+ * \param payload_len  Length of the payload.
  */
-result_t connection_enqueue_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
+result_t connection_enqueue_packet(
+		connection_ctx_t *ctx,
+		const layer2_packet_header_t *header,
+		uint8_t *payload, size_t payload_len);
+
+/*!\brief Enqueue a data packet for transmission.
+ * \param ctx       The connection context.
+ * \param buf       Pointer to the data buffer.
+ * \param buf_len   Length of the data.
+ */
+result_t connection_enqueue_data_packet(connection_ctx_t *ctx, uint8_t *buf, size_t buf_len);
 
 /*!\brief Check if there is free space in the TX packet queue.
  * \param ctx       The connection context.
diff --git a/impl/src/layer2/connection_list.c b/impl/src/layer2/connection_list.c
index cfaf733..8ead247 100644
--- a/impl/src/layer2/connection_list.c
+++ b/impl/src/layer2/connection_list.c
@@ -213,7 +213,7 @@ result_t connection_list_enqueue_packet(connection_list_t *list, uint8_t *data,
 		return ERR_INVALID_ADDRESS;
 	}
 
-	return connection_enqueue_packet(&ptr->connection, data, data_len);
+	return connection_enqueue_data_packet(&ptr->connection, data, data_len);
 }
 
 
diff --git a/impl/test/layer2_over_udp/l2udptest_client.c b/impl/test/layer2_over_udp/l2udptest_client.c
index 6278a9f..69a6cb4 100644
--- a/impl/test/layer2_over_udp/l2udptest_client.c
+++ b/impl/test/layer2_over_udp/l2udptest_client.c
@@ -122,6 +122,71 @@ void rx_data_to_tun(const layer2_data_packet_t *data_packet)
 }
 
 
+result_t connect_to_digipeater(connection_ctx_t *conn)
+{
+	while(conn->conn_state != CONN_STATE_ESTABLISHED) {
+		uint8_t packetbuf[65536];
+
+		LOG(LVL_INFO, "Waiting for packets from digipeater (conn. state: %d)", conn->conn_state);
+
+		// note: recv() is blocking here
+		int ret = recv(m_bcast_sock, packetbuf, sizeof(packetbuf), 0);
+		if(ret < 0) {
+			LOG(LVL_ERR, "recv: %s", strerror(errno));
+			return ERR_SYSCALL;
+		} else if(ret == 0) {
+			LOG(LVL_ERR, "recv() returned zero.");
+			return ERR_SYSCALL;
+		}
+
+		layer2_data_packet_t data_packet;
+		result_t err_code = connection_handle_packet(conn, packetbuf, ret, &data_packet);
+
+		switch(err_code) {
+			case OK:
+				break; // state machine advanced and there are probably packets to send
+
+			case ERR_INTEGRITY:
+			case ERR_INVALID_STATE:
+				continue; // retry with the next packet
+
+			default:
+				LOG(LVL_ERR, "connection_handle_packet() returned %d.", err_code);
+				return err_code;
+		}
+
+		LOG(LVL_INFO, "Packet processed successfully (new conn. state: %d); Transmitting response (if any).", conn->conn_state);
+
+		// send any packets in the queue
+		while(connection_can_transmit(conn)) {
+			uint8_t packet_buf[2048];
+			size_t packet_size;
+			bool end_burst;
+
+			packet_size = connection_encode_next_packet(conn,
+					packet_buf, sizeof(packet_buf), &end_burst);
+
+			if(packet_size == 0) {
+				// no more packets available
+				LOG(LVL_DEBUG, "Ending burst due to empty packet queue.");
+				break;
+			}
+
+			LOG(LVL_DEBUG, "Adding packet with %d bytes to burst.", packet_size);
+
+			RESULT_CHECK(transmit(packet_buf, packet_size));
+
+			if(end_burst) {
+				LOG(LVL_DEBUG, "Ending burst on request.");
+				break;
+			}
+		}
+	}
+
+	return OK;
+}
+
+
 void conn_evt_cb(struct connection_ctx_s *conn, connection_evt_t evt, void *user_ctx)
 {
 	(void)conn;
@@ -151,14 +216,6 @@ int main(void)
 		return 1;
 	}
 
-	ham64_t my_address, peer_address;
-	ham64_encode(MY_CALL, &my_address);
-	ham64_encode(PEER_CALL, &peer_address);
-	RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address));
-
-	// force connection into the established state
-	l2conn.conn_state = CONN_STATE_ESTABLISHED;
-
 	// ** Set up signal handling
 
 	struct sigaction term_action = {0};
@@ -219,8 +276,14 @@ int main(void)
 	size_t total_bytes = 0;
 	uint64_t next_stats_print_time = old + HRTIME_MS(500);
 
-	// TODO: wait for beacon
-	// TODO: send connection request
+	ham64_t my_address, peer_address;
+	ham64_encode(MY_CALL, &my_address);
+	ham64_encode(PEER_CALL, &peer_address);
+	RESULT_CHECK(connection_init(&l2conn, &my_address, &peer_address));
+
+	connection_setup_outgoing(&l2conn);
+
+	RESULT_CHECK(connect_to_digipeater(&l2conn));
 
 	while(m_running) {
 		connection_evt_t evt;
@@ -256,7 +319,7 @@ int main(void)
 				LOG(LVL_DUMP, "TUN Flags: 0x%04x", *(uint16_t*)packetbuf);
 				LOG(LVL_DUMP, "TUN Proto: 0x%04x", *((uint16_t*)packetbuf + 1));
 
-				RESULT_CHECK(connection_enqueue_packet(&l2conn, packetbuf, ret));
+				RESULT_CHECK(connection_enqueue_data_packet(&l2conn, packetbuf, ret));
 			}
 		}