Browse Source

Add ETCP protocol implementation with window management and tests

- Implement Extended Transmission Control Protocol (etcp.c/h) for reliable
  UDP transmission with packet ordering and loss recovery
- Add window management (congestion control) with dynamic window sizing
  based on RTT and bandwidth
- Implement ACK handling, retransmission logic, and gap detection
- Add comprehensive test suite: unit tests, simple test, and stress test
  with network emulation (packet loss, reordering, delay)
- Include mock u_async implementation for testing
- Update Makefile with new ETCP build targets
v2_dev
jek 3 months ago
parent
commit
76c7c56de6
  1. 19
      Makefile
  2. 776
      etcp.c
  3. 164
      etcp.h
  4. 55
      etcp.txt
  5. 273
      etcp_plan.txt
  6. 145
      tests/simple_uasync.c
  7. 19
      tests/simple_uasync.h
  8. 274
      tests/test_etcp.c
  9. 116
      tests/test_etcp_simple.c
  10. 419
      tests/test_etcp_stress.c

19
Makefile

@ -19,10 +19,20 @@ SC_LIB_OBJS := sc_lib.o
UASYNC_OBJS := u_async.o
PN_OBJS := pkt_normalizer.o settings.o
LL_QUEUE_OBJS := ll_queue.o
ETCP_OBJS := etcp.o
all: $(TEST_DIR)/test_ecc_encrypt $(TEST_DIR)/test_sc_lib $(TEST_DIR)/test_udp_secure $(TEST_DIR)/test_pkt_normalizer
all: $(TEST_DIR)/test_ecc_encrypt $(TEST_DIR)/test_sc_lib $(TEST_DIR)/test_udp_secure $(TEST_DIR)/test_pkt_normalizer $(TEST_DIR)/test_etcp $(TEST_DIR)/test_etcp_stress $(TEST_DIR)/test_etcp_simple
$(TEST_DIR)/test_ecc_encrypt: $(TEST_DIR)/test_ecc_encrypt.o $(TINYCRYPT_OBJS)
$(TEST_DIR)/test_pkt_normalizer: $(TEST_DIR)/test_pkt_normalizer.o $(PN_OBJS) $(LL_QUEUE_OBJS) $(UASYNC_OBJS)
$(CC) $(CFLAGS) $(INCLUDES) -o $@ $^
$(TEST_DIR)/test_etcp: $(TEST_DIR)/test_etcp.o $(ETCP_OBJS) $(LL_QUEUE_OBJS) $(UASYNC_OBJS)
$(CC) $(CFLAGS) $(INCLUDES) -o $@ $^
$(TEST_DIR)/test_etcp_stress: $(TEST_DIR)/test_etcp_stress.o $(ETCP_OBJS) $(LL_QUEUE_OBJS) $(TEST_DIR)/simple_uasync.o
$(CC) $(CFLAGS) $(INCLUDES) -o $@ $^
$(TEST_DIR)/test_etcp_simple: $(TEST_DIR)/test_etcp_simple.o $(ETCP_OBJS) $(LL_QUEUE_OBJS) $(TEST_DIR)/simple_uasync.o
$(CC) $(CFLAGS) $(INCLUDES) -o $@ $^
$(TEST_DIR)/test_sc_lib: $(TEST_DIR)/test_sc_lib.o $(SC_LIB_OBJS) $(TINYCRYPT_OBJS)
@ -34,11 +44,14 @@ $(TEST_DIR)/test_udp_secure: $(TEST_DIR)/test_udp_secure.o $(SC_LIB_OBJS) $(UASY
$(TEST_DIR)/test_pkt_normalizer: $(TEST_DIR)/test_pkt_normalizer.o $(PN_OBJS) $(LL_QUEUE_OBJS) $(UASYNC_OBJS)
$(CC) $(CFLAGS) $(INCLUDES) -o $@ $^
$(TEST_DIR)/test_etcp: $(TEST_DIR)/test_etcp.o $(ETCP_OBJS) $(LL_QUEUE_OBJS) $(UASYNC_OBJS)
$(CC) $(CFLAGS) $(INCLUDES) -o $@ $^
%.o: %.c
$(CC) $(CFLAGS) $(INCLUDES) -c $< -o $@
clean:
rm -f $(TEST_DIR)/test_ecc_encrypt $(TEST_DIR)/test_sc_lib $(TEST_DIR)/test_udp_secure $(TEST_DIR)/test_pkt_normalizer \
rm -f $(TEST_DIR)/test_ecc_encrypt $(TEST_DIR)/test_sc_lib $(TEST_DIR)/test_udp_secure $(TEST_DIR)/test_pkt_normalizer $(TEST_DIR)/test_etcp $(TEST_DIR)/test_etcp_stress $(TEST_DIR)/test_etcp_simple \
*.o tinycrypt/lib/source/*.o $(TEST_DIR)/*.o
.PHONY: all clean

776
etcp.c

@ -0,0 +1,776 @@
// etcp.c - Extended Transmission Control Protocol
#include "etcp.h"
#include "u_async.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
// Internal structures
typedef struct rx_packet {
struct rx_packet* next;
uint16_t id;
uint16_t timestamp;
uint8_t* data;
uint16_t data_len;
uint8_t has_payload;
} rx_packet_t;
typedef struct sent_packet {
struct sent_packet* next;
uint16_t id;
uint16_t timestamp;
uint8_t* data;
uint16_t data_len; // Total packet length
uint16_t payload_len; // Payload length (for window accounting)
uint16_t send_time;
uint8_t need_ack;
} sent_packet_t;
// Forward declarations of internal functions
static void tx_process(epkt_t* epkt);
static void retransmit_check(epkt_t* epkt);
static void update_metrics(epkt_t* epkt, uint16_t rtt);
static uint16_t get_current_timestamp(void);
static uint16_t timestamp_diff(uint16_t t1, uint16_t t2);
static int id_compare(uint16_t id1, uint16_t id2);
// Timer callbacks
static void tx_timer_callback(void* arg);
static void retransmit_timer_callback(void* arg);
// Internal queue callbacks
static void tx_queue_callback(ll_queue_t* q, ll_entry_t* entry, void* arg);
static void schedule_ack_timer(epkt_t* epkt);
static void request_retransmission_for_gaps(epkt_t* epkt);
// Initialize new ETCP instance
epkt_t* etcp_init(void) {
epkt_t* epkt = calloc(1, sizeof(epkt_t));
if (!epkt) return NULL;
// Create queues
epkt->tx_queue = queue_new();
epkt->output_queue = queue_new();
if (!epkt->tx_queue || !epkt->output_queue) {
if (epkt->tx_queue) queue_free(epkt->tx_queue);
if (epkt->output_queue) queue_free(epkt->output_queue);
free(epkt);
return NULL;
}
// Set callback for tx queue
queue_set_callback(epkt->tx_queue, tx_queue_callback, epkt);
// Initialize state
epkt->bandwidth = 10000; // Default: 10000 bytes per timebase (0.1us)
epkt->last_sent_timestamp = get_current_timestamp();
epkt->bytes_allowed = 0;
etcp_update_window(epkt); // Initialize window size and retrans timer
// Initialize lists
epkt->rx_list = NULL;
epkt->sent_list = NULL;
// Initialize metrics
epkt->rtt_last = 0;
epkt->rtt_avg_10 = 0;
epkt->rtt_avg_100 = 0;
epkt->jitter = 0;
epkt->bytes_sent_total = 0;
// Initialize IDs
epkt->next_tx_id = 1;
epkt->last_rx_id = 0;
epkt->last_delivered_id = 0;
// Initialize history
epkt->rtt_history_idx = 0;
epkt->rtt_history_count = 0;
// Initialize pending arrays
epkt->pending_ack_count = 0;
epkt->pending_retransmit_count = 0;
// Initialize window management
epkt->unacked_bytes = 0;
epkt->window_size = 0;
epkt->last_acked_id = 0;
epkt->last_rx_ack_id = 0;
epkt->retrans_timer_period = 20; // Default 2ms (20 timebase units)
epkt->next_retrans_time = 0;
epkt->window_blocked = 0;
// No timers yet
epkt->next_tx_timer = NULL;
epkt->retransmit_timer = NULL;
return epkt;
}
// Free ETCP instance
void etcp_free(epkt_t* epkt) {
if (!epkt) return;
// Cancel timers
if (epkt->next_tx_timer) {
uasync_cancel_timeout(epkt->next_tx_timer);
}
if (epkt->retransmit_timer) {
uasync_cancel_timeout(epkt->retransmit_timer);
}
// Free queues
if (epkt->tx_queue) queue_free(epkt->tx_queue);
if (epkt->output_queue) queue_free(epkt->output_queue);
// Free rx_list
rx_packet_t* rx = epkt->rx_list;
while (rx) {
rx_packet_t* next = rx->next;
if (rx->data) free(rx->data);
free(rx);
rx = next;
}
// Free sent_list
sent_packet_t* sent = epkt->sent_list;
while (sent) {
sent_packet_t* next = sent->next;
if (sent->data) free(sent->data);
free(sent);
sent = next;
}
free(epkt);
}
// Set callback for sending packets
void etcp_set_callback(epkt_t* epkt, etcp_tx_callback_t cb, void* arg) {
if (!epkt) return;
epkt->tx_callback = cb;
epkt->tx_callback_arg = arg;
}
// Get output queue
ll_queue_t* etcp_get_output_queue(epkt_t* epkt) {
return epkt ? epkt->output_queue : NULL;
}
// Set bandwidth
void etcp_set_bandwidth(epkt_t* epkt, uint16_t bandwidth) {
if (!epkt) return;
epkt->bandwidth = bandwidth;
etcp_update_window(epkt);
}
// Update window size based on current RTT and bandwidth
void etcp_update_window(epkt_t* epkt) {
if (!epkt) return;
uint16_t rtt = epkt->rtt_avg_10;
if (rtt == 0) {
epkt->window_size = (uint32_t)-1; // Unlimited window until RTT measured
// Keep default retrans_timer_period (20 = 2ms)
return;
}
// window = RTT * bandwidth * 2
// RTT in timebase (0.1ms), bandwidth in bytes per timebase
// Multiply using 32-bit to avoid overflow
uint32_t rtt32 = rtt;
uint32_t bw32 = epkt->bandwidth;
epkt->window_size = rtt32 * bw32 * 2;
// Update retransmission timer period: max(RTT/2, 2ms)
uint16_t rtt_half = rtt / 2;
if (rtt_half < 20) { // 2ms = 20 timebase units
rtt_half = 20;
}
epkt->retrans_timer_period = rtt_half;
}
// Get RTT
uint16_t etcp_get_rtt(epkt_t* epkt) {
return epkt ? epkt->rtt_last : 0;
}
// Get jitter
uint16_t etcp_get_jitter(epkt_t* epkt) {
return epkt ? epkt->jitter : 0;
}
// Put data into transmission queue
int etcp_tx_put(epkt_t* epkt, uint8_t* data, uint16_t len) {
if (!epkt || !data || len == 0) return -1;
// Create queue entry
ll_entry_t* entry = queue_entry_new(len);
if (!entry) return -1;
// Copy data
memcpy(ll_entry_data(entry), data, len);
// Add to queue
int result = queue_entry_put(epkt->tx_queue, entry);
if (result != 0) {
queue_entry_free(entry);
}
return result;
}
// Get total number of packets in transmission queues
int etcp_tx_queue_size(epkt_t* epkt) {
if (!epkt) return 0;
int count = queue_entry_count(epkt->tx_queue);
// Add packets in sent_list waiting for ACK
sent_packet_t* sent = epkt->sent_list;
while (sent) {
if (sent->need_ack) {
count++;
}
sent = sent->next;
}
return count;
}
// ==================== Internal Functions ====================
// Get current timestamp (0.1us timebase)
static uint16_t get_current_timestamp(void) {
// TODO: Implement using u_async time or system time
// For now, return increasing counter
static uint16_t counter = 0;
return counter++;
}
// Calculate positive difference between timestamps (considering wrap-around)
static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) {
return (t1 - t2) & 0xFFFF; // modulo 65536
}
// Compare IDs considering wrap-around
static int id_compare(uint16_t id1, uint16_t id2) {
return (int16_t)(id1 - id2);
}
// Update metrics with new RTT measurement
static void update_metrics(epkt_t* epkt, uint16_t rtt) {
if (!epkt) return;
epkt->rtt_last = rtt;
// Update history
epkt->rtt_history[epkt->rtt_history_idx] = rtt;
epkt->rtt_history_idx = (epkt->rtt_history_idx + 1) % 100;
if (epkt->rtt_history_count < 100) {
epkt->rtt_history_count++;
}
// Calculate average of last 10
uint32_t sum10 = 0;
int count10 = (epkt->rtt_history_count < 10) ? epkt->rtt_history_count : 10;
for (int i = 0; i < count10; i++) {
int idx = (epkt->rtt_history_idx - 1 - i + 100) % 100;
sum10 += epkt->rtt_history[idx];
}
if (count10 > 0) {
epkt->rtt_avg_10 = sum10 / count10;
}
// Calculate average of last 100
uint32_t sum100 = 0;
int count100 = epkt->rtt_history_count;
for (int i = 0; i < count100; i++) {
sum100 += epkt->rtt_history[i];
}
if (count100 > 0) {
epkt->rtt_avg_100 = sum100 / count100;
}
// Update jitter
if (count10 > 0) {
int16_t diff = (int16_t)(epkt->rtt_avg_10 - rtt);
if (diff < 0) diff = -diff;
// jitter += (abs(rtt_avg_10 - rtt_last) - jitter) * 0.1
epkt->jitter += ((uint16_t)diff - epkt->jitter) / 10;
}
// Update window size based on new RTT
etcp_update_window(epkt);
}
// Process transmission queue
static void tx_process(epkt_t* epkt) {
if (!epkt || !epkt->tx_callback) return;
// Check bandwidth limit
uint16_t current_time = get_current_timestamp();
uint16_t delta = timestamp_diff(current_time, epkt->last_sent_timestamp);
if (delta > 0) {
epkt->bytes_allowed += delta * epkt->bandwidth;
epkt->last_sent_timestamp = current_time;
}
// If no bytes allowed, schedule timer
if (epkt->bytes_allowed == 0) {
// Schedule next attempt
if (!epkt->next_tx_timer) {
epkt->next_tx_timer = uasync_set_timeout(1, epkt, tx_timer_callback);
}
return;
}
// Get data from queue
ll_entry_t* entry = queue_entry_get(epkt->tx_queue);
int data_packet = 1; // 1 if packet contains payload, 0 if metrics only
uint16_t data_len = 0;
if (!entry) {
// No data to send, but we might need to send metrics
if (epkt->pending_ack_count == 0 && epkt->pending_retransmit_count == 0) {
return; // Nothing to send at all
}
data_packet = 0;
data_len = 0;
} else {
data_len = ll_entry_size(entry);
}
// Check window size for data packets
if (data_packet && epkt->window_size != (uint32_t)-1) {
if (epkt->unacked_bytes + data_len > epkt->window_size) {
// Window full, block transmission
epkt->window_blocked = 1;
// DEBUG
// printf("Window blocked: unacked=%u, data_len=%u, window=%u\n",
// epkt->unacked_bytes, data_len, epkt->window_size);
// Put entry back to queue
queue_entry_put_first(epkt->tx_queue, entry);
// Schedule check when window might open (after retransmission timer)
if (!epkt->next_tx_timer) {
epkt->next_tx_timer = uasync_set_timeout(epkt->retrans_timer_period, epkt, tx_timer_callback);
}
return;
}
}
// Calculate packet size (header + data + metrics)
uint16_t packet_size = 4; // id+timestamp
if (data_packet) {
packet_size += 1 + data_len; // hdr=0 + data
}
// Add space for ACKs
if (epkt->pending_ack_count > 0) {
packet_size += 1 + epkt->pending_ack_count * 4; // hdr=0x01 + ids+timestamps
}
// Add space for retransmission requests
if (epkt->pending_retransmit_count > 0) {
uint8_t count = epkt->pending_retransmit_count;
if (count > 32) count = 32;
packet_size += 1 + count * 2 + 2; // hdr + IDs + last delivered ID
}
// Check if we have enough bandwidth
if (epkt->bytes_allowed < packet_size) {
// Not enough bandwidth
if (data_packet) {
// Put entry back and schedule timer
queue_entry_put_first(epkt->tx_queue, entry);
}
if (!epkt->next_tx_timer) {
uint16_t wait_time = (packet_size - epkt->bytes_allowed) / epkt->bandwidth;
if (wait_time < 1) wait_time = 1;
epkt->next_tx_timer = uasync_set_timeout(wait_time, epkt, tx_timer_callback);
}
return;
}
// Allocate packet buffer
uint8_t* packet = malloc(packet_size);
if (!packet) {
queue_entry_free(entry);
return;
}
// Build packet
uint8_t* ptr = packet;
// ID and timestamp
uint16_t id;
if (data_packet) {
id = epkt->next_tx_id++;
} else {
id = 0; // metrics-only packet
}
uint16_t timestamp = current_time;
*ptr++ = id >> 8;
*ptr++ = id & 0xFF;
*ptr++ = timestamp >> 8;
*ptr++ = timestamp & 0xFF;
// Add ACKs if any
if (epkt->pending_ack_count > 0) {
*ptr++ = 0x01; // hdr for timestamp report
for (int i = 0; i < epkt->pending_ack_count; i++) {
*ptr++ = epkt->pending_ack_ids[i] >> 8;
*ptr++ = epkt->pending_ack_ids[i] & 0xFF;
*ptr++ = epkt->pending_ack_timestamps[i] >> 8;
*ptr++ = epkt->pending_ack_timestamps[i] & 0xFF;
}
epkt->pending_ack_count = 0;
}
// Add retransmission requests if any
if (epkt->pending_retransmit_count > 0) {
uint8_t count = epkt->pending_retransmit_count;
if (count > 32) count = 32;
*ptr++ = 0x10 + (count - 1); // hdr with count
for (int i = 0; i < count; i++) {
*ptr++ = epkt->pending_retransmit_ids[i] >> 8;
*ptr++ = epkt->pending_retransmit_ids[i] & 0xFF;
}
// Add last delivered ID
*ptr++ = epkt->last_delivered_id >> 8;
*ptr++ = epkt->last_delivered_id & 0xFF;
epkt->pending_retransmit_count = 0;
}
// Add payload if present
if (data_packet) {
*ptr++ = 0x00; // hdr=0 for payload
memcpy(ptr, ll_entry_data(entry), data_len);
ptr += data_len;
}
// Send packet
epkt->tx_callback(epkt, packet, packet_size, epkt->tx_callback_arg);
// Update bandwidth accounting
epkt->bytes_allowed -= packet_size;
epkt->bytes_sent_total += packet_size;
if (data_packet) {
// Store in sent list for possible retransmission
sent_packet_t* sent = malloc(sizeof(sent_packet_t));
if (sent) {
sent->id = id;
sent->timestamp = timestamp;
sent->data = packet; // Keep the packet for retransmission
sent->data_len = packet_size;
sent->send_time = current_time;
sent->need_ack = 1;
sent->payload_len = data_len;
// Update unacked bytes
epkt->unacked_bytes += data_len;
// Add to list
sent->next = epkt->sent_list;
epkt->sent_list = sent;
} else {
free(packet);
}
// Free queue entry
queue_entry_free(entry);
} else {
// Metrics-only packet, free packet buffer
free(packet);
}
// Schedule retransmit check if not already scheduled
if (!epkt->retransmit_timer && epkt->retrans_timer_period > 0) {
epkt->retransmit_timer = uasync_set_timeout(epkt->retrans_timer_period, epkt, retransmit_timer_callback);
}
// Resume queue callback for next packet
queue_resume_callback(epkt->tx_queue);
}
// Check for needed retransmissions
static void retransmit_check(epkt_t* epkt) {
if (!epkt) return;
uint16_t current_time = get_current_timestamp();
// Threshold = RTT * 1.5
uint16_t threshold = epkt->rtt_avg_10 + epkt->rtt_avg_10 / 2;
sent_packet_t* sent = epkt->sent_list;
while (sent) {
uint16_t age = timestamp_diff(current_time, sent->send_time);
if (age > threshold && sent->need_ack) {
// Add to retransmit queue
if (epkt->pending_retransmit_count < 32) {
epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = sent->id;
}
}
sent = sent->next;
}
// Reschedule check with updated period
if (epkt->retrans_timer_period > 0) {
epkt->retransmit_timer = uasync_set_timeout(epkt->retrans_timer_period, epkt, retransmit_timer_callback);
} else {
epkt->retransmit_timer = NULL;
}
}
// Process received packet
int etcp_rx_input(epkt_t* epkt, uint8_t* pkt, uint16_t len) {
if (!epkt || !pkt || len < 4) return -1;
// Parse header
uint8_t* ptr = pkt;
uint16_t id = (ptr[0] << 8) | ptr[1];
uint16_t timestamp = (ptr[2] << 8) | ptr[3];
ptr += 4;
len -= 4;
// Track last received ID
if (id_compare(id, epkt->last_rx_id) > 0) {
epkt->last_rx_id = id;
}
// Process headers
uint8_t has_payload = 0;
uint8_t* payload = NULL;
uint16_t payload_len = 0;
while (len > 0) {
uint8_t hdr = *ptr++;
len--;
if (hdr == 0x00) {
// Payload
has_payload = 1;
payload = ptr;
payload_len = len;
break; // Payload is the rest of the packet
} else if (hdr == 0x01) {
// Timestamp report
if (len >= 4) {
uint16_t ack_id = (ptr[0] << 8) | ptr[1];
uint16_t ack_timestamp = (ptr[2] << 8) | ptr[3];
ptr += 4;
len -= 4;
// Calculate RTT
uint16_t current_time = get_current_timestamp();
uint16_t rtt_raw = timestamp_diff(current_time, ack_timestamp);
if (rtt_raw > 0) {
update_metrics(epkt, rtt_raw);
}
// Remove acknowledged packet from sent_list
sent_packet_t* sent = epkt->sent_list;
sent_packet_t* prev = NULL;
while (sent) {
if (sent->id == ack_id) {
if (prev) {
prev->next = sent->next;
} else {
epkt->sent_list = sent->next;
}
// Update unacked bytes
epkt->unacked_bytes -= sent->payload_len;
// DEBUG
// printf("ACK received for id=%u, unacked_bytes now=%u, payload_len=%u\n",
// ack_id, epkt->unacked_bytes, sent->payload_len);
// Update last acknowledged ID
if (id_compare(ack_id, epkt->last_acked_id) > 0) {
epkt->last_acked_id = ack_id;
}
// Update last received ACK ID
if (id_compare(ack_id, epkt->last_rx_ack_id) > 0) {
epkt->last_rx_ack_id = ack_id;
}
// Window may have opened
epkt->window_blocked = 0;
free(sent->data);
free(sent);
break;
}
prev = sent;
sent = sent->next;
}
}
} else if (hdr >= 0x10 && hdr <= 0x2F) {
// Retransmission request
uint8_t count = (hdr & 0x0F) + 1;
if (len >= count * 2 + 2) {
// Read IDs to retransmit
for (int i = 0; i < count; i++) {
uint16_t retransmit_id = (ptr[0] << 8) | ptr[1];
ptr += 2;
len -= 2;
// Add to retransmit queue
if (epkt->pending_retransmit_count < 32) {
epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = retransmit_id;
}
}
// Read last delivered ID
uint16_t last_delivered = (ptr[0] << 8) | ptr[1];
ptr += 2;
len -= 2;
// Update our last delivered if newer
if (id_compare(last_delivered, epkt->last_delivered_id) > 0) {
epkt->last_delivered_id = last_delivered;
}
// Also update last_rx_ack_id (latest known received packet)
if (id_compare(last_delivered, epkt->last_rx_ack_id) > 0) {
epkt->last_rx_ack_id = last_delivered;
}
}
}
// Unknown hdr - skip?
}
// Add to rx_list if has payload
if (has_payload && payload_len > 0) {
// Check for duplicate
rx_packet_t* current = epkt->rx_list;
rx_packet_t* prev = NULL;
while (current) {
int cmp = id_compare(current->id, id);
if (cmp == 0) {
// Duplicate, ignore
return 0;
}
if (cmp > 0) {
// Found insertion point
break;
}
prev = current;
current = current->next;
}
// Create new rx_packet
rx_packet_t* new_pkt = malloc(sizeof(rx_packet_t));
if (!new_pkt) return -1;
new_pkt->id = id;
new_pkt->timestamp = timestamp;
new_pkt->data_len = payload_len;
new_pkt->data = malloc(payload_len);
if (!new_pkt->data) {
free(new_pkt);
return -1;
}
memcpy(new_pkt->data, payload, payload_len);
new_pkt->has_payload = 1;
// Insert into sorted list
new_pkt->next = current;
if (prev) {
prev->next = new_pkt;
} else {
epkt->rx_list = new_pkt;
}
// Add to pending ACKs
if (epkt->pending_ack_count < 32) {
epkt->pending_ack_ids[epkt->pending_ack_count] = id;
epkt->pending_ack_timestamps[epkt->pending_ack_count] = timestamp;
epkt->pending_ack_count++;
schedule_ack_timer(epkt);
}
// Check for gaps and request retransmission
request_retransmission_for_gaps(epkt);
// Move continuous sequence to output queue
uint16_t next_expected = epkt->last_delivered_id + 1;
rx_packet_t* rx = epkt->rx_list;
while (rx && rx->id == next_expected) {
// DEBUG
// printf("Delivering packet id=%u to output queue\n", rx->id);
// Add to output queue
ll_entry_t* entry = queue_entry_new(rx->data_len);
if (entry) {
memcpy(ll_entry_data(entry), rx->data, rx->data_len);
queue_entry_put(epkt->output_queue, entry);
}
// Update last delivered
epkt->last_delivered_id = next_expected;
next_expected++;
// Remove from rx_list
epkt->rx_list = rx->next;
free(rx->data);
free(rx);
rx = epkt->rx_list;
}
}
return 0;
}
// ==================== Timer Callbacks ====================
static void tx_timer_callback(void* arg) {
epkt_t* epkt = (epkt_t*)arg;
if (!epkt) return;
epkt->next_tx_timer = NULL;
tx_process(epkt);
}
static void retransmit_timer_callback(void* arg) {
epkt_t* epkt = (epkt_t*)arg;
if (!epkt) return;
epkt->retransmit_timer = NULL;
retransmit_check(epkt);
}
static void request_retransmission_for_gaps(epkt_t* epkt) {
if (!epkt || !epkt->rx_list) return;
// Find gaps in rx_list
uint16_t expected = epkt->last_delivered_id + 1;
rx_packet_t* current = epkt->rx_list;
while (current) {
if (id_compare(current->id, expected) > 0) {
// Gap detected: missing packets between expected and current->id
uint16_t missing = expected;
while (id_compare(missing, current->id) < 0) {
if (epkt->pending_retransmit_count < 32) {
epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = missing;
}
missing++;
}
}
expected = current->id + 1;
current = current->next;
}
}
static void schedule_ack_timer(epkt_t* epkt) {
if (!epkt) return;
// If we have pending ACKs or retransmission requests, try to send them
if ((epkt->pending_ack_count > 0 || epkt->pending_retransmit_count > 0) && !epkt->next_tx_timer) {
// Try to send immediately (if bandwidth allows)
tx_process(epkt);
}
}
// ==================== Queue Callbacks ====================
static void tx_queue_callback(ll_queue_t* q, ll_entry_t* entry, void* arg) {
(void)q;
(void)entry;
epkt_t* epkt = (epkt_t*)arg;
if (!epkt) return;
// Start transmission process
tx_process(epkt);
}

164
etcp.h

@ -0,0 +1,164 @@
// etcp.h - Extended Transmission Control Protocol
#ifndef ETCP_H
#define ETCP_H
#include <stdint.h>
#include <stddef.h>
#include "ll_queue.h"
#ifdef __cplusplus
extern "C" {
#endif
// Forward declarations
typedef struct epkt epkt_t;
// Callback type for sending packets via UDP
typedef void (*etcp_tx_callback_t)(epkt_t* epkt, uint8_t* pkt, uint16_t len, void* arg);
// Main ETCP structure
struct epkt {
// Queues
ll_queue_t* tx_queue; // Queue of data to send
ll_queue_t* output_queue; // Output queue (reassembled data)
// Received packets sorted linked list
struct rx_packet* rx_list;
// Sent packets (for retransmission)
struct sent_packet* sent_list;
// Metrics
uint16_t rtt_last; // Last RTT (timebase 0.1us)
uint16_t rtt_avg_10; // Average RTT last 10 packets
uint16_t rtt_avg_100; // Average RTT last 100 packets
uint16_t jitter; // Jitter (averaged)
uint16_t bandwidth; // Current bandwidth (bytes per timebase)
uint32_t bytes_sent_total; // Total bytes sent
uint16_t last_sent_timestamp; // Timestamp of last sent packet
uint32_t bytes_allowed; // Calculated bytes allowed to send
// State
uint16_t next_tx_id; // Next ID for transmission
uint16_t last_rx_id; // Last received ID (for ACK)
uint16_t last_delivered_id; // Last delivered to output_queue ID
// Timers
void* next_tx_timer; // Timer for next transmission
void* retransmit_timer; // Timer for retransmissions
// Callback
etcp_tx_callback_t tx_callback;
void* tx_callback_arg;
// RTT history for averaging
uint16_t rtt_history[100];
uint8_t rtt_history_idx;
uint8_t rtt_history_count;
// Pending ACKs
uint16_t pending_ack_ids[32];
uint16_t pending_ack_timestamps[32];
uint8_t pending_ack_count;
// Pending retransmission requests
uint16_t pending_retransmit_ids[32];
uint8_t pending_retransmit_count;
// Window management
uint32_t unacked_bytes; // Number of bytes sent but not yet acknowledged
uint32_t window_size; // Current window size in bytes (calculated)
uint16_t last_acked_id; // Last acknowledged packet ID
uint16_t last_rx_ack_id; // Latest received ACK ID from receiver
uint16_t retrans_timer_period; // Current retransmission timer period (timebase)
uint16_t next_retrans_time; // Time of next retransmission check
uint8_t window_blocked; // Flag: transmission blocked by window limit
};
// API Functions
/**
* @brief Initialize new ETCP instance
* @return Pointer to new instance or NULL on error
*/
epkt_t* etcp_init(void);
/**
* @brief Free ETCP instance and all associated resources
* @param epkt Instance to free
*/
void etcp_free(epkt_t* epkt);
/**
* @brief Set callback for sending packets via UDP
* @param epkt ETCP instance
* @param cb Callback function
* @param arg User argument passed to callback
*/
void etcp_set_callback(epkt_t* epkt, etcp_tx_callback_t cb, void* arg);
/**
* @brief Process received UDP packet
* @param epkt ETCP instance
* @param pkt Packet data
* @param len Packet length
* @return 0 on success, -1 on error
*/
int etcp_rx_input(epkt_t* epkt, uint8_t* pkt, uint16_t len);
/**
* @brief Get total number of packets waiting in transmission queues
* @param epkt ETCP instance
* @return Number of packets
*/
int etcp_tx_queue_size(epkt_t* epkt);
/**
* @brief Put data into transmission queue
* @param epkt ETCP instance
* @param data Data to send
* @param len Data length
* @return 0 on success, -1 on error
*/
int etcp_tx_put(epkt_t* epkt, uint8_t* data, uint16_t len);
/**
* @brief Get output queue for reading received data
* @param epkt ETCP instance
* @return Pointer to output queue (ll_queue_t*)
*/
ll_queue_t* etcp_get_output_queue(epkt_t* epkt);
/**
* @brief Set bandwidth limit
* @param epkt ETCP instance
* @param bandwidth Bytes per timebase (0.1us)
*/
void etcp_set_bandwidth(epkt_t* epkt, uint16_t bandwidth);
/**
* @brief Update window size based on current RTT and bandwidth
* @param epkt ETCP instance
* Window size = RTT * bandwidth * 2 (bytes in flight)
*/
void etcp_update_window(epkt_t* epkt);
/**
* @brief Get current RTT
* @param epkt ETCP instance
* @return RTT in timebase units
*/
uint16_t etcp_get_rtt(epkt_t* epkt);
/**
* @brief Get current jitter
* @param epkt ETCP instance
* @return Jitter in timebase units
*/
uint16_t etcp_get_jitter(epkt_t* epkt);
#ifdef __cplusplus
}
#endif
#endif // ETCP_H

55
etcp.txt

@ -0,0 +1,55 @@
etcp - extended transmission control protocol
Протокол для передачи-приёма, пободный TCP, реализованый отдельным модулем (etcp.c/h).
Задача протокола:
- передать пакеты через UDP (учитывая его особенности - потери, негарантированный порядок), восстанавливая порядок и потери.
- пакеты уже предварительно подогнаны под размер чтобы вмещались доп. заголовки и служебные фреймы.
-
На приёмной стороне создаются две очереди.
первая - сортированный linked-list - в нее добавляются принятые пакеты, но отсеиваются дубликаты и вставляются в нужное место.
и перемещаются в выходную очередь когда все нужные пакеты дошли.
при получении принятого пакета он сразу парсится, для всех hdr!=0 вызываем static upd_metric_for_transmitter(epkt*, buf*, size) - и он разгребает метрики, обновляя:
- rtt_last (roud-trip delay) для последнего пакета
- rtt average last 10
- rtt average last 100
- jitter как усредненное: jitter+=(abs(rtt_last_10-rtt_last)-jitter)*0.1f
- для retransmission request - если timestamp последней попытки передачи этого пакета больше rtt_last_10*1.2+jitter*2 то отправляем сейчас
при отправке также ограничиваем полосу пропускания: суммируем сколько байт отправлено всего (uint32_t), обновляем timestamp и расчетное число байт которое может быть отправлено на момент этого timestamp. и формируем таймер для отправки следующего пакета.
вторая - ll_queue - выходная осчередь с пакетами в строгом порядке (строгий инкремент по id).
struct epkt* = etcp_init() - инициализирует новый instance и выделяет под него память
etcp_free(struct epkt*)
etcp_rx_input(struct epkt*,uint8_t* pkt, uint16_t len) - принятый пакет на обработку
rx_output - через механизм ll_queue, функция не нужна.
новый пакет на передачу отправляется в очередь передачи ll_queue. используй callback для обработки очереди.
для передачи сформированных пакетов по udp:
etcp_set_callback(epkt*, &cbk) -> etcp_tx_output(struct epkt*,uint8_t* pkt, uint16_t len) - callback в управляющей структуре (отправка пакета в udp сокет)
внутренняя структура передачи:
при готовности отправить очередной пакет из очереди ll_queue пакеты перемещаются в linked_list (как отправленные но неподтвержденные), и освобождаются при получении подтверждения (ack).
int etcp_tx_queue_size(epkt*) - должна быть функция которая возвращает общее кол-во пакетов на передачу в очередях (входящей и рабочей)
Формат udp пакета:
<id> <timestamp> [<hdr> metrics] <hdr=0> <payload>
id - uint16_t циклический порядковый номер пакета (при передаче следующего пакета инкрементируется). при ретрансмиссии передается с этим же id и payload, но с обновленными остальными полями
timestamp - uint16_t текущий timestamp (циклическое, 16 бит, timebase = 0.1mS)
hdr - 1 байт:
payload - передаваемые полезные данные
metrics - опциональное поле для передачи служебных фреймов (ack, retransmission request, statistic reply)
hdr:
0x00 - hdr для payload (от следующего байта до конца пакета)
0x01 - hdr для отчета о timestamp (время приёма) принятого пакета, 4 байта: <id> <timestamp> - передаётся при очередной передаче пакета, для формирования статичтики на приёмной стороне. отчеты передаются для всех новых принятых пакетов с момента последней передачи. т.е. накапливаем timestamp-ы и передаём их. если пакет потерялся - не страшно.
0x10-0x2f - hdr для перезапроса пакетов (передачу каких пакетов надо повторить. значение определяет количество записей (номеров пакетов) от 1 до 32, если больше - 32 самых старых), далее по 2 байта идут ID пакетов. и в конце - 2 байта номер последнего пакета который ушел в выходную очередь (т.е. последний номер для успешно собранной цепочки)
если что-то еще надо можно добавить.
Если данных нет (очередь на передачу пустая) и нужно передать только метрику, то передаётся пакет с id=0 и без <hdr=0> <payload>. на приёмной стороне он определяется по отсутствию записи с hdr=0

273
etcp_plan.txt

@ -0,0 +1,273 @@
# План реализации ETCP (Extended Transmission Control Protocol)
## Обзор
Протокол поверх UDP с восстановлением порядка, повторными передачами, метриками RTT/jitter и ограничением полосы. Использует две очереди: сортированный linked-list для принятых пакетов и ll_queue для выходных данных.
## Структуры данных
### struct epkt (etcp.h)
```c
typedef struct epkt epkt_t;
struct epkt {
// Очереди
ll_queue_t* tx_queue; // Очередь пакетов на передачу
ll_queue_t* output_queue; // Выходная очередь (собранные данные)
struct rx_packet* rx_list; // Сортированный linked-list принятых пакетов
// Метрики
uint16_t rtt_last; // Последний RTT (timebase 0.1ms)
uint16_t rtt_avg_10; // Среднее RTT за последние 10 пакетов
uint16_t rtt_avg_100; // Среднее RTT за последние 100 пакетов
uint16_t jitter; // Jitter (усреднённый)
uint16_t bandwidth; // Текущая полоса пропускания (байт/таймбазу)
uint32_t bytes_sent_total; // Всего отправлено байт
uint16_t last_sent_timestamp; // Timestamp последней отправки
uint32_t bytes_allowed; // Расчётное число байт, которое можно отправить
// Состояние
uint16_t next_tx_id; // Следующий ID для передачи
uint16_t last_rx_id; // Последний принятый ID (для ACK)
uint16_t last_delivered_id; // Последний доставленный в output_queue ID
// Таймеры
void* next_tx_timer; // Таймер следующей передачи
void* retransmit_timer; // Таймер повторных передач
// Callback'и
void (*tx_callback)(epkt_t*, uint8_t*, uint16_t); // Отправка в UDP
void* tx_callback_arg;
// Буферы для метрик
uint16_t rtt_history[100]; // История RTT для усреднения
uint8_t rtt_history_idx;
uint8_t rtt_history_count;
// Накопленные timestamp'ы для отчётов
uint16_t pending_ack_ids[32]; // ID пакетов, для которых нужно отправить ACK
uint16_t pending_ack_timestamps[32]; // Соответствующие timestamp'ы
uint8_t pending_ack_count;
};
```
### struct rx_packet (внутренняя)
```c
struct rx_packet {
struct rx_packet* next;
uint16_t id;
uint16_t timestamp;
uint8_t* data;
uint16_t data_len;
// uint8_t has_payload; // 1 если содержит payload (hdr=0) - пакеты без payload сюда не попадаютю просто парсим сразу метрици и всё.
};
```
## Формат пакета
```
<id:2> <timestamp:2> [<hdr:1> <metrics_data>]* <hdr=0> <payload>
```
### Заголовки (hdr):
- `0x00`: payload (данные начинаются со следующего байта)
- `0x01`: отчёт о timestamp принятого пакета (4 байта: id:2 + timestamp:2)
- `0x10`-`0x2F`: перезапрос пакетов + ACK:
- hdr & 0x0F = количество записей (1-32)
- Далее N*2 байт: ID пакетов для повторной передачи
- Последние 2 байта: номер последнего доставленного пакета (ACK)
### Пакет только с метриками:
Если очередь передачи пуста, отправляется пакет с id=0 и без hdr=0+payload.
## Алгоритмы
### 1. Инициализация
- Создать очереди tx_queue и output_queue через queue_new()
- Инициализировать метрики нулями
- Установить bandwidth по умолчанию (например, 100000 байт/таймбазу)
### 2. Приём пакетов (etcp_rx_input)
1. Парсинг:
- Читаем id, timestamp
- Пока есть данные, читаем hdr:
- hdr=0x00: запоминаем payload
- hdr=0x01: обрабатываем отчёт о timestamp (обновляем метрики)
- hdr=0x10-0x2F: обрабатываем перезапрос (добавляем ID в очередь повторной передачи)
2. Для пакетов с hdr!=0 вызываем upd_metric_for_transmitter:
- rtt_last = текущее время - timestamp
- Обновляем rtt_avg_10 и rtt_avg_100 (скользящее среднее)
- jitter += (abs(rtt_avg_10 - rtt_last) - jitter) * 0.1
3. Добавляем пакет в сортированный rx_list:
- Ищем позицию по id (сравнение через (int16_t)(id1-id2))
- Пропускаем дубликаты
- Вставляем в нужное место
4. Перемещаем непрерывную последовательность в output_queue:
- Начиная с last_delivered_id+1, проверяем наличие пакетов в rx_list
- При нахождении непрерывной цепочки перемещаем payload в output_queue
- Освобождаем rx_packet структуры
5. Накопление ACK:
- Для каждого принятого пакета сохраняем id и timestamp в pending_ack_*
- При следующей отправке включаем эти ACK в пакет
### 3. Передача пакетов
1. Очередь tx_queue содержит данные для отправки
2. Функция tx_process вызывается по таймеру или при добавлении в пустую очередь:
- Проверяем ограничение полосы:
- delta_time = текущее_время - last_sent_timestamp
- bytes_allowed += delta_time * bandwidth
- Если bytes_allowed < размер_пакета, планируем таймер и выходим
- Формируем пакет:
- Базовый заголовок: next_tx_id++, текущий timestamp
- Добавляем pending_ack (hdr=0x01 для каждого)
- Добавляем перезапросы если нужно (на основе метрик)
- Добавляем payload из tx_queue (hdr=0x00)
- Если payload нет и есть метрики - отправляем пакет с id=0
- Отправляем через tx_callback
- Обновляем bytes_sent_total, last_sent_timestamp, bytes_allowed
- Сохраняем пакет в список отправленных (для возможной ретрансмиссии)
3. Повторные передачи:
- Для каждого отправленного пакета отслеживаем время отправки
- Если (текущее_время - время_отправки) > rtt_avg_10*1.2 + jitter*2
- Добавляем ID в очередь повторной передачи
### 4. Ограничение полосы
- bandwidth: константа (байт/таймбазу), timebase = 0.1ms
- При инициализации: bytes_allowed = 0, last_sent_timestamp = текущее_время
- При отправке:
- current_time = uasync время
- delta = (int16_t)(current_time - last_sent_timestamp) (циклическое)
- bytes_allowed += delta * bandwidth
- Если bytes_allowed >= размер_пакета:
- bytes_allowed -= размер_пакета
- last_sent_timestamp = current_time
- Отправляем пакет
- Иначе:
- wait_time = (размер_пакета - bytes_allowed) / bandwidth
- Устанавливаем таймер на wait_time
## API функции
### Основные:
```c
epkt_t* etcp_init(void);
void etcp_free(epkt_t* epkt);
void etcp_set_callback(epkt_t* epkt, void (*cb)(epkt_t*, uint8_t*, uint16_t), void* arg);
int etcp_rx_input(epkt_t* epkt, uint8_t* pkt, uint16_t len);
int etcp_tx_queue_size(epkt_t* epkt);
```
### Вспомогательные:
```c
void etcp_set_bandwidth(epkt_t* epkt, uint16_t bandwidth);
uint16_t etcp_get_rtt(epkt_t* epkt);
uint16_t etcp_get_jitter(epkt_t* epkt);
ll_queue_t* etcp_get_output_queue(epkt_t* epkt); // Для извлечения данных
int etcp_tx_put(epkt_t* epkt, uint8_t* data, uint16_t len); // Добавить данные на передачу
```
## Интеграция с проектом
### Зависимости:
- `ll_queue.c/.h` - для очередей
- `u_async.h` - для таймеров
- `stdint.h`, `stdlib.h`, `string.h` - стандартные библиотеки
### Таймеры:
- Использовать `uasync_set_timeout` для:
- Планирования следующей передачи (при ограничении полосы)
- Повторных передач
- Очистки старых пакетов в rx_list
- Все callback'и должны быть быстрыми, не блокирующими
### Обработка циклических значений:
- ID: uint16_t, сравнение через `(int16_t)(a - b)`
- Timestamp: uint16_t, timebase 0.1us, сравнение аналогично
- При вычислении дельты времени учитывать переполнение
## План тестирования
1. **Unit-тесты для парсинга:**
- Корректность разбора различных hdr
- Обработка циклических ID
2. **Тесты очередей:**
- Сортировка в rx_list
- Перемещение в output_queue
- Обработка дубликатов
3. **Тесты метрик:**
- Расчет RTT (скользящее среднее)
- Расчет jitter
- Обновление bandwidth
4. **Интеграционный тест:**
- Два экземпляра ETCP, обмен данными через эмуляцию UDP
- Проверка восстановления порядка при потере пакетов
- Проверка ограничения полосы
5. **Тест производительности:**
- Минимальные задержки
- Корректность работы при высокой нагрузке
## Последовательность реализации
1. Создать etcp.h с определениями структур и API
2. Реализовать etcp.c в следующем порядке:
a) Базовая структура и init/free
b) Внутренние функции для работы с rx_list
c) Парсинг пакетов (etcp_rx_input)
d) Функции метрик (upd_metric_for_transmitter)
e) Механизм передачи (tx_process, ограничение полосы)
f) Повторные передачи
g) Интеграция с таймерами u_async
3. Создать тестовую программу test_etcp.c
4. Протестировать, исправить ошибки
5. Интегрировать в Makefile
## Риски и неопределённости
1. **Переполнение буферов:** Нужны лиматиры на размер rx_list и pending_ack
3. **Производительность:** Сортированный linked-list может быть медленным при большом количестве пакетов. Возможно, нужна оптимизация. - не должно быть много пакетов в списке перезапросов
4. **Интеграция с существующим кодом:** Проверить совместимость стиля кодирования и соглашений об именовании.
## Дополнительные вопросы
1. Нужны ли callback'и для событий (доставка данных, изменение метрик)? - нет, но нужна структура из которой эти метрики можно считывать.
2. Как обрабатывать очень старые пакеты в rx_list (таймаут)? - никак. надо сделать reset_connection - он обнуляет все очереди, метрики итд.
3. Как определять начальный bandwidth и адаптировать его? - пока константа. адаптировать позже.
## Статус реализации (13.01.2026)
### Реализовано:
1. Структуры epkt, rx_packet, sent_packet
2. API функции: etcp_init, etcp_free, etcp_set_callback, etcp_rx_input, etcp_tx_queue_size, etcp_tx_put, etcp_get_output_queue, etcp_set_bandwidth, etcp_get_rtt, etcp_get_jitter
3. Парсинг пакетов с поддержкой заголовков 0x00 (payload), 0x01 (timestamp report), 0x10-0x2F (retransmission request)
4. Сортированный rx_list с учётом циклических ID (сравнение через (int16_t)(id1-id2))
5. Перемещение непрерывной последовательности пакетов в output_queue
6. Ограничение полосы пропускания (bandwidth, bytes_allowed)
7. Отправка пакетов с данными и метриками, включая пакеты только с метриками (id=0)
8. Накопление ACK для полученных пакетов и отправка их в следующий пакет
9. Повторные передачи на основе RTT и jitter
10. Обновление метрик RTT (скользящее среднее за 10 и 100 пакетов) и jitter
11. Интеграция с u_async для таймеров передачи и проверки повторных отправок
12. Unit-тесты, покрывающие базовую функциональность (инициализация, передача, приём, реordering)
### Особенности реализации:
- Timebase: 0.1ms (совместимость с u_async)
- Bandwidth по умолчанию: 10000 байт/таймбазу
- Максимальное количество pending ACK: 32
- Максимальное количество pending retransmit: 32
- RTT история: 100 измерений
### Ограничения и упрощения:
- Отсутствует таймаут для старых пакетов в rx_list
- Отсутствует адаптация bandwidth
- get_current_timestamp использует статический счётчик (для тестов)
- Нет обработки reset_connection
- Нет защиты от переполнения буферов при очень большом количестве пакетов
### Планируемые улучшения:
1. Реализация reset_connection для сброса состояния
2. Использование системного времени для get_current_timestamp
3. Добавление таймаута для rx_list
4. Оптимизация производительности при большом количестве пакетов
5. Адаптация bandwidth на основе метрик

145
tests/simple_uasync.c

@ -0,0 +1,145 @@
// simple_uasync.c - Minimal uasync implementation for tests
#include "u_async.h"
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
// Timer entry
typedef struct timer_entry {
void* id;
uint32_t expiry_time;
timeout_callback_t callback;
void* user_arg;
struct timer_entry* next;
} timer_entry_t;
// Socket entry (not used in ETCP tests)
typedef struct socket_entry {
void* id;
int fd;
socket_callback_t read_cbk;
socket_callback_t write_cbk;
socket_callback_t except_cbk;
void* user_arg;
struct socket_entry* next;
} socket_entry_t;
// Global state
static timer_entry_t* timer_list = NULL;
static socket_entry_t* socket_list = NULL;
static uint32_t current_time = 0;
static void* next_id = (void*)1;
// Generate unique ID
static void* generate_id(void) {
void* id = next_id;
next_id = (void*)((uintptr_t)next_id + 1);
return id;
}
// Initialize - does nothing in mock
void uasync_init(void) {
current_time = 0;
}
// Mainloop - not used in tests
void uasync_mainloop(void) {
// Should not be called in tests
while (1) {}
}
// Set timeout
void* uasync_set_timeout(int timeout_tb, void* user_arg, timeout_callback_t callback) {
timer_entry_t* timer = malloc(sizeof(timer_entry_t));
if (!timer) return NULL;
timer->id = generate_id();
timer->expiry_time = current_time + (uint32_t)timeout_tb;
timer->callback = callback;
timer->user_arg = user_arg;
timer->next = timer_list;
timer_list = timer;
return timer->id;
}
// Cancel timeout
err_t uasync_cancel_timeout(void* t_id) {
timer_entry_t** pp = &timer_list;
while (*pp) {
if ((*pp)->id == t_id) {
timer_entry_t* to_free = *pp;
*pp = to_free->next;
free(to_free);
return ERR_OK;
}
pp = &(*pp)->next;
}
return ERR_FAIL;
}
// Add socket (not implemented)
void* uasync_add_socket(int fd, socket_callback_t read_cbk,
socket_callback_t write_cbk,
socket_callback_t except_cbk,
void* user_arg) {
(void)fd; (void)read_cbk; (void)write_cbk; (void)except_cbk; (void)user_arg;
return NULL;
}
// Remove socket (not implemented)
err_t uasync_remove_socket(void* s_id) {
(void)s_id;
return ERR_FAIL;
}
// Test helper: advance time and process expired timers
void simple_uasync_advance_time(uint32_t delta_tb) {
current_time += delta_tb;
// Process all expired timers
while (1) {
timer_entry_t* earliest = NULL;
timer_entry_t** earliest_pp = NULL;
uint32_t earliest_time = UINT32_MAX;
// Find earliest expired timer
timer_entry_t** pp = &timer_list;
while (*pp) {
if ((*pp)->expiry_time <= current_time && (*pp)->expiry_time < earliest_time) {
earliest = *pp;
earliest_pp = pp;
earliest_time = (*pp)->expiry_time;
}
pp = &(*pp)->next;
}
if (!earliest) break;
// Remove from list
*earliest_pp = earliest->next;
// Call callback
timeout_callback_t cb = earliest->callback;
void* arg = earliest->user_arg;
free(earliest);
if (cb) {
cb(arg);
}
}
}
// Test helper: get current time
uint32_t simple_uasync_get_time(void) {
return current_time;
}
// Test helper: clear all timers
void simple_uasync_clear(void) {
while (timer_list) {
timer_entry_t* next = timer_list->next;
free(timer_list);
timer_list = next;
}
}

19
tests/simple_uasync.h

@ -0,0 +1,19 @@
// simple_uasync.h - Test helpers for uasync mock
#ifndef SIMPLE_UASYNC_H
#define SIMPLE_UASYNC_H
#include <stdint.h>
// These functions are only available when linking with simple_uasync.o
// instead of u_async.o
// Advance time by delta_tb timebase units and process expired timers
void simple_uasync_advance_time(uint32_t delta_tb);
// Get current time in timebase units
uint32_t simple_uasync_get_time(void);
// Clear all pending timers
void simple_uasync_clear(void);
#endif // SIMPLE_UASYNC_H

274
tests/test_etcp.c

@ -0,0 +1,274 @@
// test_etcp.c - Unit tests for ETCP protocol
#include "etcp.h"
#include "u_async.h"
#include "ll_queue.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#define TEST_ASSERT(cond, msg) \
do { \
if (!(cond)) { \
printf("FAIL: %s (line %d)\n", msg, __LINE__); \
return 1; \
} else { \
printf("PASS: %s\n", msg); \
} \
} while(0)
// Mock callback storage
typedef struct {
uint8_t* data;
uint16_t len;
epkt_t* epkt;
} mock_packet_t;
#define MAX_MOCK_PACKETS 100
static mock_packet_t mock_packets[MAX_MOCK_PACKETS];
static int mock_packet_count = 0;
static void reset_mock_packets(void) {
for (int i = 0; i < mock_packet_count; i++) {
free(mock_packets[i].data);
mock_packets[i].data = NULL;
}
mock_packet_count = 0;
}
static void mock_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) {
(void)arg;
assert(mock_packet_count < MAX_MOCK_PACKETS);
mock_packets[mock_packet_count].data = malloc(len);
assert(mock_packets[mock_packet_count].data);
memcpy(mock_packets[mock_packet_count].data, data, len);
mock_packets[mock_packet_count].len = len;
mock_packets[mock_packet_count].epkt = epkt;
mock_packet_count++;
}
// Test 1: Basic initialization and cleanup
int test_init_free(void) {
printf("\n=== Test 1: Initialization and cleanup ===\n");
epkt_t* epkt = etcp_init();
TEST_ASSERT(epkt != NULL, "etcp_init returns non-NULL");
TEST_ASSERT(epkt->tx_queue != NULL, "tx_queue created");
TEST_ASSERT(epkt->output_queue != NULL, "output_queue created");
etcp_free(epkt);
TEST_ASSERT(1, "etcp_free completes without crash");
return 0;
}
// Test 2: Set callback
int test_set_callback(void) {
printf("\n=== Test 2: Set callback ===\n");
epkt_t* epkt = etcp_init();
TEST_ASSERT(epkt != NULL, "etcp_init");
etcp_set_callback(epkt, mock_tx_callback, NULL);
// Callback set, no easy way to verify except through tx
etcp_free(epkt);
return 0;
}
// Test 3: Put data into tx queue
int test_tx_put(void) {
printf("\n=== Test 3: TX queue put ===\n");
epkt_t* epkt = etcp_init();
TEST_ASSERT(epkt != NULL, "etcp_init");
uint8_t test_data[] = {0x01, 0x02, 0x03, 0x04, 0x05};
int result = etcp_tx_put(epkt, test_data, sizeof(test_data));
TEST_ASSERT(result == 0, "etcp_tx_put succeeds");
int queue_size = etcp_tx_queue_size(epkt);
TEST_ASSERT(queue_size == 1, "tx queue size is 1");
etcp_free(epkt);
return 0;
}
// Test 4: Simple packet transmission (without bandwidth limit)
int test_simple_tx(void) {
printf("\n=== Test 4: Simple transmission ===\n");
reset_mock_packets();
epkt_t* epkt = etcp_init();
TEST_ASSERT(epkt != NULL, "etcp_init");
// Set high bandwidth to avoid limiting
etcp_set_bandwidth(epkt, 65535);
etcp_set_callback(epkt, mock_tx_callback, NULL);
uint8_t test_data[] = "Hello ETCP!";
int result = etcp_tx_put(epkt, test_data, sizeof(test_data));
TEST_ASSERT(result == 0, "etcp_tx_put succeeds");
// Transmission may happen via queue callback
// We can't easily verify transmission in this simple test
// Just ensure no crash occurred
TEST_ASSERT(etcp_tx_queue_size(epkt) >= 0, "queue size non-negative");
etcp_free(epkt);
return 0;
}
// Test 5: Packet parsing (rx_input)
int test_rx_input(void) {
printf("\n=== Test 5: RX input parsing ===\n");
epkt_t* epkt = etcp_init();
TEST_ASSERT(epkt != NULL, "etcp_init");
// Create a simple packet: id=1, timestamp=100, hdr=0, payload "test"
uint8_t packet[] = {
0x00, 0x01, // id = 1
0x00, 0x64, // timestamp = 100
0x00, // hdr = 0 (payload)
't', 'e', 's', 't'
};
int result = etcp_rx_input(epkt, packet, sizeof(packet));
TEST_ASSERT(result == 0, "etcp_rx_input succeeds");
// Check that output queue has the data
ll_queue_t* output = etcp_get_output_queue(epkt);
TEST_ASSERT(output != NULL, "output queue exists");
int output_count = queue_entry_count(output);
TEST_ASSERT(output_count == 1, "output queue has 1 packet");
// Verify payload
ll_entry_t* entry = queue_entry_get(output);
TEST_ASSERT(entry != NULL, "got entry from output queue");
uint8_t* data = ll_entry_data(entry);
size_t data_len = ll_entry_size(entry);
TEST_ASSERT(data_len == 4, "payload length is 4");
TEST_ASSERT(memcmp(data, "test", 4) == 0, "payload matches");
queue_entry_free(entry);
etcp_free(epkt);
return 0;
}
// Test 6: Packet reordering
int test_reordering(void) {
printf("\n=== Test 6: Packet reordering ===\n");
epkt_t* epkt = etcp_init();
TEST_ASSERT(epkt != NULL, "etcp_init");
// Create packets with IDs 1, 2, 3
uint8_t packet1[] = {
0x00, 0x01, // id = 1
0x00, 0x10, // timestamp
0x00, // hdr = 0
'a'
};
uint8_t packet2[] = {
0x00, 0x02, // id = 2
0x00, 0x20, // timestamp
0x00, // hdr = 0
'b'
};
uint8_t packet3[] = {
0x00, 0x03, // id = 3
0x00, 0x30, // timestamp
0x00, // hdr = 0
'c'
};
// Receive in wrong order: 2, 1, 3
etcp_rx_input(epkt, packet2, sizeof(packet2));
etcp_rx_input(epkt, packet1, sizeof(packet1));
etcp_rx_input(epkt, packet3, sizeof(packet3));
// Check output queue - should have all 3 packets in correct order
ll_queue_t* output = etcp_get_output_queue(epkt);
TEST_ASSERT(queue_entry_count(output) == 3, "all 3 packets in output");
// Verify order: 1, 2, 3
ll_entry_t* entry;
char expected[] = {'a', 'b', 'c'};
int idx = 0;
while ((entry = queue_entry_get(output)) != NULL) {
uint8_t* data = ll_entry_data(entry);
size_t len = ll_entry_size(entry);
TEST_ASSERT(len == 1, "payload length 1");
TEST_ASSERT(data[0] == expected[idx], "correct packet order");
idx++;
queue_entry_free(entry);
}
TEST_ASSERT(idx == 3, "all packets processed");
etcp_free(epkt);
return 0;
}
// Test 7: Metrics update
int test_metrics(void) {
printf("\n=== Test 7: Metrics ===\n");
epkt_t* epkt = etcp_init();
TEST_ASSERT(epkt != NULL, "etcp_init");
// Initial metrics should be zero
TEST_ASSERT(etcp_get_rtt(epkt) == 0, "initial RTT is 0");
TEST_ASSERT(etcp_get_jitter(epkt) == 0, "initial jitter is 0");
// Send a packet with ACK to update metrics
// This requires more complex setup with round-trip
etcp_free(epkt);
return 0;
}
int main(void) {
printf("Starting ETCP tests...\n");
// Initialize uasync for timers
uasync_init();
int failures = 0;
failures += test_init_free();
failures += test_set_callback();
failures += test_tx_put();
failures += test_simple_tx();
failures += test_rx_input();
failures += test_reordering();
failures += test_metrics();
printf("\n=== Summary ===\n");
if (failures == 0) {
printf("All tests passed!\n");
} else {
printf("%d test(s) failed.\n", failures);
}
reset_mock_packets();
return failures == 0 ? 0 : 1;
}

116
tests/test_etcp_simple.c

@ -0,0 +1,116 @@
// test_etcp_simple.c - Simple test to verify ETCP sender-receiver communication
#include "etcp.h"
#include "u_async.h"
#include "ll_queue.h"
#include "simple_uasync.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
// Sender's TX callback - just forward to receiver
static void sender_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) {
(void)epkt;
epkt_t* receiver = (epkt_t*)arg;
printf("Sender TX callback: sending %u bytes to receiver\n", len);
// Forward directly to receiver (no loss, no delay)
etcp_rx_input(receiver, data, len);
}
// Receiver's TX callback - would send ACKs back to sender in real scenario
static void receiver_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) {
(void)epkt;
(void)data;
(void)len;
(void)arg;
printf("Receiver TX callback: ACK sent back\n");
}
int main(void) {
printf("Simple ETCP sender-receiver test\n");
// Initialize uasync (mock)
uasync_init();
// Create ETCP instances
epkt_t* sender = etcp_init();
epkt_t* receiver = etcp_init();
if (!sender || !receiver) {
printf("ERROR: Failed to create ETCP instances\n");
return 1;
}
// Set up callbacks
etcp_set_callback(sender, sender_tx_callback, receiver);
etcp_set_callback(receiver, receiver_tx_callback, sender);
// Send a simple packet
const char* test_data = "Hello, ETCP!";
uint16_t data_len = strlen(test_data);
uint8_t* data = malloc(data_len);
memcpy(data, test_data, data_len);
printf("Sending test packet: %s\n", test_data);
if (etcp_tx_put(sender, data, data_len) != 0) {
printf("ERROR: Failed to queue packet\n");
free(data);
etcp_free(sender);
etcp_free(receiver);
return 1;
}
free(data); // etcp_tx_put makes its own copy
// Advance time to allow transmission
printf("Advancing time...\n");
for (int i = 0; i < 10; i++) {
simple_uasync_advance_time(10); // 1ms each
// Process any timers (retransmissions, etc.)
}
// Check receiver's output queue
ll_queue_t* output_queue = etcp_get_output_queue(receiver);
if (!output_queue) {
printf("ERROR: Receiver output queue is NULL\n");
etcp_free(sender);
etcp_free(receiver);
return 1;
}
ll_entry_t* entry = queue_entry_get(output_queue);
if (!entry) {
printf("FAIL: No packet in receiver output queue\n");
// Debug: check queue size
printf("Queue size check: %d\n", queue_entry_count(output_queue));
etcp_free(sender);
etcp_free(receiver);
return 1;
}
uint8_t* received_data = ll_entry_data(entry);
uint16_t received_len = ll_entry_size(entry);
printf("SUCCESS: Received packet of length %u\n", received_len);
printf("Data: ");
for (uint16_t i = 0; i < received_len; i++) {
printf("%c", received_data[i]);
}
printf("\n");
if (received_len == data_len && memcmp(received_data, test_data, data_len) == 0) {
printf("PASS: Data matches!\n");
} else {
printf("FAIL: Data doesn't match\n");
}
queue_entry_free(entry);
etcp_free(sender);
etcp_free(receiver);
return 0;
}

419
tests/test_etcp_stress.c

@ -0,0 +1,419 @@
// test_etcp_stress.c - Stress test for ETCP with packet loss, delay, and reordering
#include "etcp.h"
#include "u_async.h"
#include "ll_queue.h"
#include "simple_uasync.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <time.h>
#define NUM_PACKETS 1000
#define MIN_PACKET_SIZE 1
#define MAX_PACKET_SIZE 1300
#define LOSS_PROBABILITY 0.0 // 0% packet loss for window testing
#define REORDER_PROBABILITY 0.0 // 0% reordering for testing
#define QUEUE_MAX_SIZE 1000 // Max packets in delay queue
#define MAX_DELAY_MS 0 // No delay for testing
#define TIME_BASE_MS 0.1 // uasync timebase is 0.1ms
// Packet in the network delay queue
typedef struct delayed_packet {
uint8_t* data;
uint16_t len;
uint32_t delivery_time; // When to deliver (in timebase units)
struct delayed_packet* next;
} delayed_packet_t;
// Network emulator structure
typedef struct {
epkt_t* sender; // ETCP instance that sends
epkt_t* receiver; // ETCP instance that receives
delayed_packet_t* queue; // Delay queue (sorted by delivery time)
int queue_size;
uint32_t current_time; // Current time in timebase units
uint32_t packets_sent;
uint32_t packets_lost;
uint32_t packets_reordered;
uint32_t packets_delivered;
uint8_t running;
void* timer_id;
} network_emulator_t;
// Forward declarations
static void network_timer_callback(void* arg);
static void sender_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg);
static void receiver_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg);
static void deliver_packets(network_emulator_t* net);
static void free_delay_queue(delayed_packet_t* queue);
// Random number generator (simple LCG)
static uint32_t random_state = 123456789;
static uint32_t random_next(void) {
random_state = random_state * 1103515245 + 12345;
return random_state;
}
static double random_double(void) {
return (double)random_next() / (double)UINT32_MAX;
}
// Network timer callback - advances time and delivers packets
static void network_timer_callback(void* arg) {
network_emulator_t* net = (network_emulator_t*)arg;
if (!net || !net->running) return;
// Advance time by 1ms (10 timebase units)
net->current_time += 10;
// Deliver any packets whose time has come
deliver_packets(net);
// Reschedule timer if still running
if (net->running) {
net->timer_id = uasync_set_timeout(10, net, network_timer_callback);
}
}
// Sender's TX callback - called when ETCP wants to send a packet
static void sender_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) {
(void)epkt;
network_emulator_t* net = (network_emulator_t*)arg;
if (!net || !net->running) return;
net->packets_sent++;
// 10% packet loss - just ignore the packet, ETCP will free the data
if (random_double() < LOSS_PROBABILITY) {
net->packets_lost++;
return;
}
// Calculate delivery time (current time + random delay up to MAX_DELAY_MS)
uint32_t delay_ms = (uint32_t)(random_double() * MAX_DELAY_MS);
uint32_t delivery_time = net->current_time + (delay_ms * 10); // Convert ms to timebase
// Create delayed packet - need to copy data since ETCP owns the original
delayed_packet_t* pkt = malloc(sizeof(delayed_packet_t));
if (!pkt) {
return; // Memory allocation failed, packet is lost
}
pkt->data = malloc(len);
if (!pkt->data) {
free(pkt);
net->packets_lost++; // Count as loss due to memory failure
return;
}
memcpy(pkt->data, data, len);
pkt->len = len;
pkt->delivery_time = delivery_time;
pkt->next = NULL;
// Insert into delay queue
delayed_packet_t** pp = &net->queue;
// 30% chance to insert at random position (reordering)
if (random_double() < REORDER_PROBABILITY && net->queue_size > 1) {
net->packets_reordered++;
int insert_pos = random_next() % (net->queue_size + 1);
for (int i = 0; i < insert_pos && *pp; i++) {
pp = &(*pp)->next;
}
} else {
// Normal insertion (sorted by delivery time)
while (*pp && (*pp)->delivery_time < delivery_time) {
pp = &(*pp)->next;
}
}
pkt->next = *pp;
*pp = pkt;
net->queue_size++;
// Limit queue size (drop first packet if needed)
if (net->queue_size > QUEUE_MAX_SIZE) {
delayed_packet_t* first = net->queue;
if (first) {
net->queue = first->next;
free(first->data);
free(first);
net->queue_size--;
net->packets_lost++; // Count as loss due to queue overflow
}
}
}
// Receiver's TX callback - called when receiver wants to send ACKs or retransmission requests
static void receiver_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) {
(void)epkt;
network_emulator_t* net = (network_emulator_t*)arg;
if (!net || !net->running) return;
// Forward ACKs/retrans requests directly to sender with minimal delay (1 timebase unit)
// This allows sender to receive ACKs and retransmit lost packets
simple_uasync_advance_time(1);
net->current_time = simple_uasync_get_time();
etcp_rx_input(net->sender, data, len);
// Note: We don't track these in statistics since they're control packets
}
// Deliver packets whose delivery time has arrived
static void deliver_packets(network_emulator_t* net) {
while (net->queue && net->queue->delivery_time <= net->current_time) {
delayed_packet_t* pkt = net->queue;
net->queue = pkt->next;
// Deliver to receiver
etcp_rx_input(net->receiver, pkt->data, pkt->len);
net->packets_delivered++;
free(pkt->data);
free(pkt);
net->queue_size--;
}
}
// Free delay queue
static void free_delay_queue(delayed_packet_t* queue) {
while (queue) {
delayed_packet_t* next = queue->next;
free(queue->data);
free(queue);
queue = next;
}
}
// Generate random packet data
static void generate_packet_data(uint8_t* buffer, uint16_t size, uint32_t seq) {
// Fill with pattern: sequence number + random data
for (uint16_t i = 0; i < size; i++) {
if (i < 4) {
// First 4 bytes: sequence number
buffer[i] = (seq >> (8 * i)) & 0xFF;
} else {
// Rest: pseudo-random data based on sequence and position
buffer[i] = (uint8_t)((seq * 7919 + i * 104729) % 256);
}
}
}
// Verify received packet
static int verify_packet_data(const uint8_t* data, uint16_t size, uint32_t seq) {
if (size < 4) return 0;
// Check sequence number
uint32_t received_seq = 0;
for (int i = 0; i < 4; i++) {
received_seq |= ((uint32_t)data[i]) << (8 * i);
}
if (received_seq != seq) return 0;
// Verify the rest of the data
for (uint16_t i = 4; i < size; i++) {
uint8_t expected = (uint8_t)((seq * 7919 + i * 104729) % 256);
if (data[i] != expected) return 0;
}
return 1;
}
// Stress test main function
int main(void) {
printf("Starting ETCP stress test...\n");
printf("Parameters:\n");
printf(" Packets: %d\n", NUM_PACKETS);
printf(" Size range: %d-%d bytes\n", MIN_PACKET_SIZE, MAX_PACKET_SIZE);
printf(" Loss probability: %.1f%%\n", LOSS_PROBABILITY * 100);
printf(" Reorder probability: %.1f%%\n", REORDER_PROBABILITY * 100);
printf(" Max delay: %d ms\n", MAX_DELAY_MS);
printf(" Queue size: %d packets\n", QUEUE_MAX_SIZE);
// Seed random number generator
random_state = (uint32_t)time(NULL);
// Initialize uasync
uasync_init();
// Create network emulator
network_emulator_t net = {0};
net.running = 1;
net.current_time = 0;
// Create ETCP instances
net.sender = etcp_init();
net.receiver = etcp_init();
if (!net.sender || !net.receiver) {
printf("ERROR: Failed to create ETCP instances\n");
return 1;
}
// Set up callbacks
etcp_set_callback(net.sender, sender_tx_callback, &net);
etcp_set_callback(net.receiver, receiver_tx_callback, &net);
// Set reasonable bandwidth for stress test
etcp_set_bandwidth(net.sender, 50000); // 50k bytes per timebase
etcp_set_bandwidth(net.receiver, 50000);
// Don't start network timer - we'll advance time manually
// net.timer_id = uasync_set_timeout(10, &net, network_timer_callback);
printf("\nGenerating and sending %d packets...\n", NUM_PACKETS);
// Generate and send packets
uint32_t packets_generated = 0;
uint32_t bytes_generated = 0;
uint32_t last_time_print = 0;
while (packets_generated < NUM_PACKETS) {
// Generate random packet size
uint16_t size = MIN_PACKET_SIZE + (random_next() % (MAX_PACKET_SIZE - MIN_PACKET_SIZE + 1));
// Allocate and fill packet data
uint8_t* data = malloc(size);
if (!data) {
printf("ERROR: Memory allocation failed\n");
break;
}
generate_packet_data(data, size, packets_generated);
// Send via ETCP
if (etcp_tx_put(net.sender, data, size) != 0) {
printf("ERROR: Failed to queue packet %u\n", packets_generated);
free(data);
break;
}
free(data); // etcp_tx_put makes its own copy
packets_generated++;
bytes_generated += size;
// Periodically print progress
if (packets_generated % 1000 == 0) {
printf(" Sent %u packets, %u bytes\n", packets_generated, bytes_generated);
}
// Advance time to allow ETCP to send packets (bandwidth limiting)
// and process any expired timers (retransmissions, etc.)
simple_uasync_advance_time(1); // Advance by 1 timebase unit (0.1ms)
net.current_time = simple_uasync_get_time(); // Keep in sync
// Deliver any packets whose time has come
deliver_packets(&net);
// Periodically print time progress
if (net.current_time - last_time_print >= 1000) { // Every 100ms
printf(" Time: %.1f ms, Queue: %d packets\n",
net.current_time / 10.0, net.queue_size);
last_time_print = net.current_time;
}
}
printf("Finished sending %u packets (%u bytes)\n", packets_generated, bytes_generated);
// Let network deliver remaining packets and wait for retransmissions
printf("\nDelivering remaining packets and waiting for retransmissions...\n");
uint32_t start_time = net.current_time;
for (int i = 0; i < 50000 && (net.queue_size > 0 || i < 1000); i++) {
simple_uasync_advance_time(10); // Advance 1ms
net.current_time = simple_uasync_get_time(); // Keep in sync
deliver_packets(&net);
// Periodically advance more time to speed up retransmission timeouts
if (i % 10 == 0) {
simple_uasync_advance_time(100); // Additional 10ms
net.current_time = simple_uasync_get_time();
deliver_packets(&net);
}
if (i % 500 == 0) {
uint32_t elapsed = net.current_time - start_time;
printf(" Queue: %d, Time: %.1f ms (elapsed: %.1f ms)\n",
net.queue_size, net.current_time / 10.0, elapsed / 10.0);
}
}
// No network timer to stop since we're not using one
net.running = 0;
// Free any remaining packets in queue
free_delay_queue(net.queue);
net.queue = NULL;
net.queue_size = 0;
// Collect and verify received packets
printf("\nVerifying received packets...\n");
ll_queue_t* output_queue = etcp_get_output_queue(net.receiver);
uint32_t packets_received = 0;
uint32_t bytes_received = 0;
uint32_t correct_packets = 0;
uint32_t max_received_seq = 0;
ll_entry_t* entry;
while ((entry = queue_entry_get(output_queue)) != NULL) {
uint8_t* data = ll_entry_data(entry);
uint16_t size = ll_entry_size(entry);
if (size >= 4) {
uint32_t seq = 0;
for (int i = 0; i < 4; i++) {
seq |= ((uint32_t)data[i]) << (8 * i);
}
if (verify_packet_data(data, size, seq)) {
correct_packets++;
if (seq > max_received_seq) {
max_received_seq = seq;
}
}
}
packets_received++;
bytes_received += size;
queue_entry_free(entry);
}
// Print statistics
printf("\n=== Statistics ===\n");
printf("Packets generated: %u\n", packets_generated);
printf("Packets sent: %u (via ETCP)\n", net.packets_sent);
printf("Packets lost: %u (%.1f%%)\n", net.packets_lost,
(net.packets_sent > 0) ? (100.0 * net.packets_lost / net.packets_sent) : 0.0);
printf("Packets reordered: %u (%.1f%% of delivered)\n", net.packets_reordered,
(net.packets_delivered > 0) ? (100.0 * net.packets_reordered / net.packets_delivered) : 0.0);
printf("Packets delivered: %u (to receiver)\n", net.packets_delivered);
printf("Packets received: %u (in output queue)\n", packets_received);
printf("Correct packets: %u (%.1f%%)\n", correct_packets,
(packets_received > 0) ? (100.0 * correct_packets / packets_received) : 0.0);
printf("Bytes generated: %u\n", bytes_generated);
printf("Bytes received: %u\n", bytes_received);
// Check for missing packets
uint32_t expected_received = packets_generated - net.packets_lost;
if (packets_received < expected_received) {
printf("\nWARNING: Received %u packets, expected ~%u (some may be in flight)\n",
packets_received, expected_received);
} else if (packets_received > expected_received) {
printf("\nWARNING: Received %u packets, expected ~%u (duplicates?)\n",
packets_received, expected_received);
}
// Cleanup
etcp_free(net.sender);
etcp_free(net.receiver);
printf("\nStress test completed.\n");
// Consider test successful if we received at least some packets
// (with losses, we won't get all of them)
return (correct_packets > 0) ? 0 : 1;
}
Loading…
Cancel
Save