diff --git a/Makefile b/Makefile index b2458fd..f0cb6c5 100644 --- a/Makefile +++ b/Makefile @@ -19,10 +19,20 @@ SC_LIB_OBJS := sc_lib.o UASYNC_OBJS := u_async.o PN_OBJS := pkt_normalizer.o settings.o LL_QUEUE_OBJS := ll_queue.o +ETCP_OBJS := etcp.o -all: $(TEST_DIR)/test_ecc_encrypt $(TEST_DIR)/test_sc_lib $(TEST_DIR)/test_udp_secure $(TEST_DIR)/test_pkt_normalizer +all: $(TEST_DIR)/test_ecc_encrypt $(TEST_DIR)/test_sc_lib $(TEST_DIR)/test_udp_secure $(TEST_DIR)/test_pkt_normalizer $(TEST_DIR)/test_etcp $(TEST_DIR)/test_etcp_stress $(TEST_DIR)/test_etcp_simple -$(TEST_DIR)/test_ecc_encrypt: $(TEST_DIR)/test_ecc_encrypt.o $(TINYCRYPT_OBJS) +$(TEST_DIR)/test_pkt_normalizer: $(TEST_DIR)/test_pkt_normalizer.o $(PN_OBJS) $(LL_QUEUE_OBJS) $(UASYNC_OBJS) + $(CC) $(CFLAGS) $(INCLUDES) -o $@ $^ + +$(TEST_DIR)/test_etcp: $(TEST_DIR)/test_etcp.o $(ETCP_OBJS) $(LL_QUEUE_OBJS) $(UASYNC_OBJS) + $(CC) $(CFLAGS) $(INCLUDES) -o $@ $^ + +$(TEST_DIR)/test_etcp_stress: $(TEST_DIR)/test_etcp_stress.o $(ETCP_OBJS) $(LL_QUEUE_OBJS) $(TEST_DIR)/simple_uasync.o + $(CC) $(CFLAGS) $(INCLUDES) -o $@ $^ + +$(TEST_DIR)/test_etcp_simple: $(TEST_DIR)/test_etcp_simple.o $(ETCP_OBJS) $(LL_QUEUE_OBJS) $(TEST_DIR)/simple_uasync.o $(CC) $(CFLAGS) $(INCLUDES) -o $@ $^ $(TEST_DIR)/test_sc_lib: $(TEST_DIR)/test_sc_lib.o $(SC_LIB_OBJS) $(TINYCRYPT_OBJS) @@ -34,11 +44,14 @@ $(TEST_DIR)/test_udp_secure: $(TEST_DIR)/test_udp_secure.o $(SC_LIB_OBJS) $(UASY $(TEST_DIR)/test_pkt_normalizer: $(TEST_DIR)/test_pkt_normalizer.o $(PN_OBJS) $(LL_QUEUE_OBJS) $(UASYNC_OBJS) $(CC) $(CFLAGS) $(INCLUDES) -o $@ $^ +$(TEST_DIR)/test_etcp: $(TEST_DIR)/test_etcp.o $(ETCP_OBJS) $(LL_QUEUE_OBJS) $(UASYNC_OBJS) + $(CC) $(CFLAGS) $(INCLUDES) -o $@ $^ + %.o: %.c $(CC) $(CFLAGS) $(INCLUDES) -c $< -o $@ clean: - rm -f $(TEST_DIR)/test_ecc_encrypt $(TEST_DIR)/test_sc_lib $(TEST_DIR)/test_udp_secure $(TEST_DIR)/test_pkt_normalizer \ + rm -f $(TEST_DIR)/test_ecc_encrypt $(TEST_DIR)/test_sc_lib $(TEST_DIR)/test_udp_secure $(TEST_DIR)/test_pkt_normalizer $(TEST_DIR)/test_etcp $(TEST_DIR)/test_etcp_stress $(TEST_DIR)/test_etcp_simple \ *.o tinycrypt/lib/source/*.o $(TEST_DIR)/*.o .PHONY: all clean \ No newline at end of file diff --git a/etcp.c b/etcp.c new file mode 100644 index 0000000..eed15df --- /dev/null +++ b/etcp.c @@ -0,0 +1,776 @@ +// etcp.c - Extended Transmission Control Protocol +#include "etcp.h" +#include "u_async.h" +#include +#include +#include + +// Internal structures +typedef struct rx_packet { + struct rx_packet* next; + uint16_t id; + uint16_t timestamp; + uint8_t* data; + uint16_t data_len; + uint8_t has_payload; +} rx_packet_t; + +typedef struct sent_packet { + struct sent_packet* next; + uint16_t id; + uint16_t timestamp; + uint8_t* data; + uint16_t data_len; // Total packet length + uint16_t payload_len; // Payload length (for window accounting) + uint16_t send_time; + uint8_t need_ack; +} sent_packet_t; + +// Forward declarations of internal functions +static void tx_process(epkt_t* epkt); +static void retransmit_check(epkt_t* epkt); +static void update_metrics(epkt_t* epkt, uint16_t rtt); +static uint16_t get_current_timestamp(void); +static uint16_t timestamp_diff(uint16_t t1, uint16_t t2); +static int id_compare(uint16_t id1, uint16_t id2); + +// Timer callbacks +static void tx_timer_callback(void* arg); +static void retransmit_timer_callback(void* arg); + +// Internal queue callbacks +static void tx_queue_callback(ll_queue_t* q, ll_entry_t* entry, void* arg); +static void schedule_ack_timer(epkt_t* epkt); +static void request_retransmission_for_gaps(epkt_t* epkt); + +// Initialize new ETCP instance +epkt_t* etcp_init(void) { + epkt_t* epkt = calloc(1, sizeof(epkt_t)); + if (!epkt) return NULL; + + // Create queues + epkt->tx_queue = queue_new(); + epkt->output_queue = queue_new(); + if (!epkt->tx_queue || !epkt->output_queue) { + if (epkt->tx_queue) queue_free(epkt->tx_queue); + if (epkt->output_queue) queue_free(epkt->output_queue); + free(epkt); + return NULL; + } + + // Set callback for tx queue + queue_set_callback(epkt->tx_queue, tx_queue_callback, epkt); + + // Initialize state + epkt->bandwidth = 10000; // Default: 10000 bytes per timebase (0.1us) + epkt->last_sent_timestamp = get_current_timestamp(); + epkt->bytes_allowed = 0; + etcp_update_window(epkt); // Initialize window size and retrans timer + + // Initialize lists + epkt->rx_list = NULL; + epkt->sent_list = NULL; + + // Initialize metrics + epkt->rtt_last = 0; + epkt->rtt_avg_10 = 0; + epkt->rtt_avg_100 = 0; + epkt->jitter = 0; + epkt->bytes_sent_total = 0; + + // Initialize IDs + epkt->next_tx_id = 1; + epkt->last_rx_id = 0; + epkt->last_delivered_id = 0; + + // Initialize history + epkt->rtt_history_idx = 0; + epkt->rtt_history_count = 0; + + // Initialize pending arrays + epkt->pending_ack_count = 0; + epkt->pending_retransmit_count = 0; + + // Initialize window management + epkt->unacked_bytes = 0; + epkt->window_size = 0; + epkt->last_acked_id = 0; + epkt->last_rx_ack_id = 0; + epkt->retrans_timer_period = 20; // Default 2ms (20 timebase units) + epkt->next_retrans_time = 0; + epkt->window_blocked = 0; + + // No timers yet + epkt->next_tx_timer = NULL; + epkt->retransmit_timer = NULL; + + return epkt; +} + +// Free ETCP instance +void etcp_free(epkt_t* epkt) { + if (!epkt) return; + + // Cancel timers + if (epkt->next_tx_timer) { + uasync_cancel_timeout(epkt->next_tx_timer); + } + if (epkt->retransmit_timer) { + uasync_cancel_timeout(epkt->retransmit_timer); + } + + // Free queues + if (epkt->tx_queue) queue_free(epkt->tx_queue); + if (epkt->output_queue) queue_free(epkt->output_queue); + + // Free rx_list + rx_packet_t* rx = epkt->rx_list; + while (rx) { + rx_packet_t* next = rx->next; + if (rx->data) free(rx->data); + free(rx); + rx = next; + } + + // Free sent_list + sent_packet_t* sent = epkt->sent_list; + while (sent) { + sent_packet_t* next = sent->next; + if (sent->data) free(sent->data); + free(sent); + sent = next; + } + + free(epkt); +} + +// Set callback for sending packets +void etcp_set_callback(epkt_t* epkt, etcp_tx_callback_t cb, void* arg) { + if (!epkt) return; + epkt->tx_callback = cb; + epkt->tx_callback_arg = arg; +} + +// Get output queue +ll_queue_t* etcp_get_output_queue(epkt_t* epkt) { + return epkt ? epkt->output_queue : NULL; +} + +// Set bandwidth +void etcp_set_bandwidth(epkt_t* epkt, uint16_t bandwidth) { + if (!epkt) return; + epkt->bandwidth = bandwidth; + etcp_update_window(epkt); +} + +// Update window size based on current RTT and bandwidth +void etcp_update_window(epkt_t* epkt) { + if (!epkt) return; + uint16_t rtt = epkt->rtt_avg_10; + if (rtt == 0) { + epkt->window_size = (uint32_t)-1; // Unlimited window until RTT measured + // Keep default retrans_timer_period (20 = 2ms) + return; + } + // window = RTT * bandwidth * 2 + // RTT in timebase (0.1ms), bandwidth in bytes per timebase + // Multiply using 32-bit to avoid overflow + uint32_t rtt32 = rtt; + uint32_t bw32 = epkt->bandwidth; + epkt->window_size = rtt32 * bw32 * 2; + + // Update retransmission timer period: max(RTT/2, 2ms) + uint16_t rtt_half = rtt / 2; + if (rtt_half < 20) { // 2ms = 20 timebase units + rtt_half = 20; + } + epkt->retrans_timer_period = rtt_half; +} + +// Get RTT +uint16_t etcp_get_rtt(epkt_t* epkt) { + return epkt ? epkt->rtt_last : 0; +} + +// Get jitter +uint16_t etcp_get_jitter(epkt_t* epkt) { + return epkt ? epkt->jitter : 0; +} + +// Put data into transmission queue +int etcp_tx_put(epkt_t* epkt, uint8_t* data, uint16_t len) { + if (!epkt || !data || len == 0) return -1; + + // Create queue entry + ll_entry_t* entry = queue_entry_new(len); + if (!entry) return -1; + + // Copy data + memcpy(ll_entry_data(entry), data, len); + + // Add to queue + int result = queue_entry_put(epkt->tx_queue, entry); + if (result != 0) { + queue_entry_free(entry); + } + + return result; +} + +// Get total number of packets in transmission queues +int etcp_tx_queue_size(epkt_t* epkt) { + if (!epkt) return 0; + int count = queue_entry_count(epkt->tx_queue); + // Add packets in sent_list waiting for ACK + sent_packet_t* sent = epkt->sent_list; + while (sent) { + if (sent->need_ack) { + count++; + } + sent = sent->next; + } + return count; +} + +// ==================== Internal Functions ==================== + +// Get current timestamp (0.1us timebase) +static uint16_t get_current_timestamp(void) { + // TODO: Implement using u_async time or system time + // For now, return increasing counter + static uint16_t counter = 0; + return counter++; +} + +// Calculate positive difference between timestamps (considering wrap-around) +static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) { + return (t1 - t2) & 0xFFFF; // modulo 65536 +} + +// Compare IDs considering wrap-around +static int id_compare(uint16_t id1, uint16_t id2) { + return (int16_t)(id1 - id2); +} + +// Update metrics with new RTT measurement +static void update_metrics(epkt_t* epkt, uint16_t rtt) { + if (!epkt) return; + + epkt->rtt_last = rtt; + + // Update history + epkt->rtt_history[epkt->rtt_history_idx] = rtt; + epkt->rtt_history_idx = (epkt->rtt_history_idx + 1) % 100; + if (epkt->rtt_history_count < 100) { + epkt->rtt_history_count++; + } + + // Calculate average of last 10 + uint32_t sum10 = 0; + int count10 = (epkt->rtt_history_count < 10) ? epkt->rtt_history_count : 10; + for (int i = 0; i < count10; i++) { + int idx = (epkt->rtt_history_idx - 1 - i + 100) % 100; + sum10 += epkt->rtt_history[idx]; + } + if (count10 > 0) { + epkt->rtt_avg_10 = sum10 / count10; + } + + // Calculate average of last 100 + uint32_t sum100 = 0; + int count100 = epkt->rtt_history_count; + for (int i = 0; i < count100; i++) { + sum100 += epkt->rtt_history[i]; + } + if (count100 > 0) { + epkt->rtt_avg_100 = sum100 / count100; + } + + // Update jitter + if (count10 > 0) { + int16_t diff = (int16_t)(epkt->rtt_avg_10 - rtt); + if (diff < 0) diff = -diff; + // jitter += (abs(rtt_avg_10 - rtt_last) - jitter) * 0.1 + epkt->jitter += ((uint16_t)diff - epkt->jitter) / 10; + } + + // Update window size based on new RTT + etcp_update_window(epkt); +} + +// Process transmission queue +static void tx_process(epkt_t* epkt) { + if (!epkt || !epkt->tx_callback) return; + + // Check bandwidth limit + uint16_t current_time = get_current_timestamp(); + uint16_t delta = timestamp_diff(current_time, epkt->last_sent_timestamp); + if (delta > 0) { + epkt->bytes_allowed += delta * epkt->bandwidth; + epkt->last_sent_timestamp = current_time; + } + + // If no bytes allowed, schedule timer + if (epkt->bytes_allowed == 0) { + // Schedule next attempt + if (!epkt->next_tx_timer) { + epkt->next_tx_timer = uasync_set_timeout(1, epkt, tx_timer_callback); + } + return; + } + + // Get data from queue + ll_entry_t* entry = queue_entry_get(epkt->tx_queue); + int data_packet = 1; // 1 if packet contains payload, 0 if metrics only + uint16_t data_len = 0; + + if (!entry) { + // No data to send, but we might need to send metrics + if (epkt->pending_ack_count == 0 && epkt->pending_retransmit_count == 0) { + return; // Nothing to send at all + } + data_packet = 0; + data_len = 0; + } else { + data_len = ll_entry_size(entry); + } + + // Check window size for data packets + if (data_packet && epkt->window_size != (uint32_t)-1) { + if (epkt->unacked_bytes + data_len > epkt->window_size) { + // Window full, block transmission + epkt->window_blocked = 1; + // DEBUG + // printf("Window blocked: unacked=%u, data_len=%u, window=%u\n", + // epkt->unacked_bytes, data_len, epkt->window_size); + // Put entry back to queue + queue_entry_put_first(epkt->tx_queue, entry); + // Schedule check when window might open (after retransmission timer) + if (!epkt->next_tx_timer) { + epkt->next_tx_timer = uasync_set_timeout(epkt->retrans_timer_period, epkt, tx_timer_callback); + } + return; + } + } + + // Calculate packet size (header + data + metrics) + uint16_t packet_size = 4; // id+timestamp + if (data_packet) { + packet_size += 1 + data_len; // hdr=0 + data + } + + // Add space for ACKs + if (epkt->pending_ack_count > 0) { + packet_size += 1 + epkt->pending_ack_count * 4; // hdr=0x01 + ids+timestamps + } + // Add space for retransmission requests + if (epkt->pending_retransmit_count > 0) { + uint8_t count = epkt->pending_retransmit_count; + if (count > 32) count = 32; + packet_size += 1 + count * 2 + 2; // hdr + IDs + last delivered ID + } + + // Check if we have enough bandwidth + if (epkt->bytes_allowed < packet_size) { + // Not enough bandwidth + if (data_packet) { + // Put entry back and schedule timer + queue_entry_put_first(epkt->tx_queue, entry); + } + if (!epkt->next_tx_timer) { + uint16_t wait_time = (packet_size - epkt->bytes_allowed) / epkt->bandwidth; + if (wait_time < 1) wait_time = 1; + epkt->next_tx_timer = uasync_set_timeout(wait_time, epkt, tx_timer_callback); + } + return; + } + + // Allocate packet buffer + uint8_t* packet = malloc(packet_size); + if (!packet) { + queue_entry_free(entry); + return; + } + + // Build packet + uint8_t* ptr = packet; + + // ID and timestamp + uint16_t id; + if (data_packet) { + id = epkt->next_tx_id++; + } else { + id = 0; // metrics-only packet + } + uint16_t timestamp = current_time; + *ptr++ = id >> 8; + *ptr++ = id & 0xFF; + *ptr++ = timestamp >> 8; + *ptr++ = timestamp & 0xFF; + + // Add ACKs if any + if (epkt->pending_ack_count > 0) { + *ptr++ = 0x01; // hdr for timestamp report + for (int i = 0; i < epkt->pending_ack_count; i++) { + *ptr++ = epkt->pending_ack_ids[i] >> 8; + *ptr++ = epkt->pending_ack_ids[i] & 0xFF; + *ptr++ = epkt->pending_ack_timestamps[i] >> 8; + *ptr++ = epkt->pending_ack_timestamps[i] & 0xFF; + } + epkt->pending_ack_count = 0; + } + + // Add retransmission requests if any + if (epkt->pending_retransmit_count > 0) { + uint8_t count = epkt->pending_retransmit_count; + if (count > 32) count = 32; + *ptr++ = 0x10 + (count - 1); // hdr with count + for (int i = 0; i < count; i++) { + *ptr++ = epkt->pending_retransmit_ids[i] >> 8; + *ptr++ = epkt->pending_retransmit_ids[i] & 0xFF; + } + // Add last delivered ID + *ptr++ = epkt->last_delivered_id >> 8; + *ptr++ = epkt->last_delivered_id & 0xFF; + epkt->pending_retransmit_count = 0; + } + + // Add payload if present + if (data_packet) { + *ptr++ = 0x00; // hdr=0 for payload + memcpy(ptr, ll_entry_data(entry), data_len); + ptr += data_len; + } + + // Send packet + epkt->tx_callback(epkt, packet, packet_size, epkt->tx_callback_arg); + + // Update bandwidth accounting + epkt->bytes_allowed -= packet_size; + epkt->bytes_sent_total += packet_size; + + if (data_packet) { + // Store in sent list for possible retransmission + sent_packet_t* sent = malloc(sizeof(sent_packet_t)); + if (sent) { + sent->id = id; + sent->timestamp = timestamp; + sent->data = packet; // Keep the packet for retransmission + sent->data_len = packet_size; + sent->send_time = current_time; + sent->need_ack = 1; + sent->payload_len = data_len; + + // Update unacked bytes + epkt->unacked_bytes += data_len; + + // Add to list + sent->next = epkt->sent_list; + epkt->sent_list = sent; + } else { + free(packet); + } + + // Free queue entry + queue_entry_free(entry); + } else { + // Metrics-only packet, free packet buffer + free(packet); + } + + // Schedule retransmit check if not already scheduled + if (!epkt->retransmit_timer && epkt->retrans_timer_period > 0) { + epkt->retransmit_timer = uasync_set_timeout(epkt->retrans_timer_period, epkt, retransmit_timer_callback); + } + + // Resume queue callback for next packet + queue_resume_callback(epkt->tx_queue); +} + +// Check for needed retransmissions +static void retransmit_check(epkt_t* epkt) { + if (!epkt) return; + + uint16_t current_time = get_current_timestamp(); + // Threshold = RTT * 1.5 + uint16_t threshold = epkt->rtt_avg_10 + epkt->rtt_avg_10 / 2; + + sent_packet_t* sent = epkt->sent_list; + while (sent) { + uint16_t age = timestamp_diff(current_time, sent->send_time); + if (age > threshold && sent->need_ack) { + // Add to retransmit queue + if (epkt->pending_retransmit_count < 32) { + epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = sent->id; + } + } + sent = sent->next; + } + + // Reschedule check with updated period + if (epkt->retrans_timer_period > 0) { + epkt->retransmit_timer = uasync_set_timeout(epkt->retrans_timer_period, epkt, retransmit_timer_callback); + } else { + epkt->retransmit_timer = NULL; + } +} + +// Process received packet +int etcp_rx_input(epkt_t* epkt, uint8_t* pkt, uint16_t len) { + if (!epkt || !pkt || len < 4) return -1; + + // Parse header + uint8_t* ptr = pkt; + uint16_t id = (ptr[0] << 8) | ptr[1]; + uint16_t timestamp = (ptr[2] << 8) | ptr[3]; + ptr += 4; + len -= 4; + + // Track last received ID + if (id_compare(id, epkt->last_rx_id) > 0) { + epkt->last_rx_id = id; + } + + // Process headers + uint8_t has_payload = 0; + uint8_t* payload = NULL; + uint16_t payload_len = 0; + + while (len > 0) { + uint8_t hdr = *ptr++; + len--; + + if (hdr == 0x00) { + // Payload + has_payload = 1; + payload = ptr; + payload_len = len; + break; // Payload is the rest of the packet + } else if (hdr == 0x01) { + // Timestamp report + if (len >= 4) { + uint16_t ack_id = (ptr[0] << 8) | ptr[1]; + uint16_t ack_timestamp = (ptr[2] << 8) | ptr[3]; + ptr += 4; + len -= 4; + + // Calculate RTT + uint16_t current_time = get_current_timestamp(); + uint16_t rtt_raw = timestamp_diff(current_time, ack_timestamp); + if (rtt_raw > 0) { + update_metrics(epkt, rtt_raw); + } + + // Remove acknowledged packet from sent_list + sent_packet_t* sent = epkt->sent_list; + sent_packet_t* prev = NULL; + while (sent) { + if (sent->id == ack_id) { + if (prev) { + prev->next = sent->next; + } else { + epkt->sent_list = sent->next; + } + // Update unacked bytes + epkt->unacked_bytes -= sent->payload_len; + // DEBUG + // printf("ACK received for id=%u, unacked_bytes now=%u, payload_len=%u\n", + // ack_id, epkt->unacked_bytes, sent->payload_len); + // Update last acknowledged ID + if (id_compare(ack_id, epkt->last_acked_id) > 0) { + epkt->last_acked_id = ack_id; + } + // Update last received ACK ID + if (id_compare(ack_id, epkt->last_rx_ack_id) > 0) { + epkt->last_rx_ack_id = ack_id; + } + // Window may have opened + epkt->window_blocked = 0; + free(sent->data); + free(sent); + break; + } + prev = sent; + sent = sent->next; + } + } + } else if (hdr >= 0x10 && hdr <= 0x2F) { + // Retransmission request + uint8_t count = (hdr & 0x0F) + 1; + if (len >= count * 2 + 2) { + // Read IDs to retransmit + for (int i = 0; i < count; i++) { + uint16_t retransmit_id = (ptr[0] << 8) | ptr[1]; + ptr += 2; + len -= 2; + + // Add to retransmit queue + if (epkt->pending_retransmit_count < 32) { + epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = retransmit_id; + } + } + + // Read last delivered ID + uint16_t last_delivered = (ptr[0] << 8) | ptr[1]; + ptr += 2; + len -= 2; + + // Update our last delivered if newer + if (id_compare(last_delivered, epkt->last_delivered_id) > 0) { + epkt->last_delivered_id = last_delivered; + } + // Also update last_rx_ack_id (latest known received packet) + if (id_compare(last_delivered, epkt->last_rx_ack_id) > 0) { + epkt->last_rx_ack_id = last_delivered; + } + } + } + // Unknown hdr - skip? + } + + // Add to rx_list if has payload + if (has_payload && payload_len > 0) { + // Check for duplicate + rx_packet_t* current = epkt->rx_list; + rx_packet_t* prev = NULL; + while (current) { + int cmp = id_compare(current->id, id); + if (cmp == 0) { + // Duplicate, ignore + return 0; + } + if (cmp > 0) { + // Found insertion point + break; + } + prev = current; + current = current->next; + } + + // Create new rx_packet + rx_packet_t* new_pkt = malloc(sizeof(rx_packet_t)); + if (!new_pkt) return -1; + + new_pkt->id = id; + new_pkt->timestamp = timestamp; + new_pkt->data_len = payload_len; + new_pkt->data = malloc(payload_len); + if (!new_pkt->data) { + free(new_pkt); + return -1; + } + memcpy(new_pkt->data, payload, payload_len); + new_pkt->has_payload = 1; + + // Insert into sorted list + new_pkt->next = current; + if (prev) { + prev->next = new_pkt; + } else { + epkt->rx_list = new_pkt; + } + + // Add to pending ACKs + if (epkt->pending_ack_count < 32) { + epkt->pending_ack_ids[epkt->pending_ack_count] = id; + epkt->pending_ack_timestamps[epkt->pending_ack_count] = timestamp; + epkt->pending_ack_count++; + schedule_ack_timer(epkt); + } + + // Check for gaps and request retransmission + request_retransmission_for_gaps(epkt); + + // Move continuous sequence to output queue + uint16_t next_expected = epkt->last_delivered_id + 1; + rx_packet_t* rx = epkt->rx_list; + + while (rx && rx->id == next_expected) { + // DEBUG + // printf("Delivering packet id=%u to output queue\n", rx->id); + // Add to output queue + ll_entry_t* entry = queue_entry_new(rx->data_len); + if (entry) { + memcpy(ll_entry_data(entry), rx->data, rx->data_len); + queue_entry_put(epkt->output_queue, entry); + } + + // Update last delivered + epkt->last_delivered_id = next_expected; + next_expected++; + + // Remove from rx_list + epkt->rx_list = rx->next; + free(rx->data); + free(rx); + rx = epkt->rx_list; + } + } + + return 0; +} + +// ==================== Timer Callbacks ==================== + +static void tx_timer_callback(void* arg) { + epkt_t* epkt = (epkt_t*)arg; + if (!epkt) return; + + epkt->next_tx_timer = NULL; + tx_process(epkt); +} + +static void retransmit_timer_callback(void* arg) { + epkt_t* epkt = (epkt_t*)arg; + if (!epkt) return; + + epkt->retransmit_timer = NULL; + retransmit_check(epkt); +} + + + +static void request_retransmission_for_gaps(epkt_t* epkt) { + if (!epkt || !epkt->rx_list) return; + + // Find gaps in rx_list + uint16_t expected = epkt->last_delivered_id + 1; + rx_packet_t* current = epkt->rx_list; + + while (current) { + if (id_compare(current->id, expected) > 0) { + // Gap detected: missing packets between expected and current->id + uint16_t missing = expected; + while (id_compare(missing, current->id) < 0) { + if (epkt->pending_retransmit_count < 32) { + epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = missing; + } + missing++; + } + } + expected = current->id + 1; + current = current->next; + } +} + +static void schedule_ack_timer(epkt_t* epkt) { + if (!epkt) return; + + // If we have pending ACKs or retransmission requests, try to send them + if ((epkt->pending_ack_count > 0 || epkt->pending_retransmit_count > 0) && !epkt->next_tx_timer) { + // Try to send immediately (if bandwidth allows) + tx_process(epkt); + } +} + +// ==================== Queue Callbacks ==================== + +static void tx_queue_callback(ll_queue_t* q, ll_entry_t* entry, void* arg) { + (void)q; + (void)entry; + epkt_t* epkt = (epkt_t*)arg; + if (!epkt) return; + + // Start transmission process + tx_process(epkt); +} diff --git a/etcp.h b/etcp.h new file mode 100644 index 0000000..aa29bb2 --- /dev/null +++ b/etcp.h @@ -0,0 +1,164 @@ +// etcp.h - Extended Transmission Control Protocol +#ifndef ETCP_H +#define ETCP_H + +#include +#include +#include "ll_queue.h" + +#ifdef __cplusplus +extern "C" { +#endif + +// Forward declarations +typedef struct epkt epkt_t; + +// Callback type for sending packets via UDP +typedef void (*etcp_tx_callback_t)(epkt_t* epkt, uint8_t* pkt, uint16_t len, void* arg); + +// Main ETCP structure +struct epkt { + // Queues + ll_queue_t* tx_queue; // Queue of data to send + ll_queue_t* output_queue; // Output queue (reassembled data) + + // Received packets sorted linked list + struct rx_packet* rx_list; + + // Sent packets (for retransmission) + struct sent_packet* sent_list; + + // Metrics + uint16_t rtt_last; // Last RTT (timebase 0.1us) + uint16_t rtt_avg_10; // Average RTT last 10 packets + uint16_t rtt_avg_100; // Average RTT last 100 packets + uint16_t jitter; // Jitter (averaged) + uint16_t bandwidth; // Current bandwidth (bytes per timebase) + uint32_t bytes_sent_total; // Total bytes sent + uint16_t last_sent_timestamp; // Timestamp of last sent packet + uint32_t bytes_allowed; // Calculated bytes allowed to send + + // State + uint16_t next_tx_id; // Next ID for transmission + uint16_t last_rx_id; // Last received ID (for ACK) + uint16_t last_delivered_id; // Last delivered to output_queue ID + + // Timers + void* next_tx_timer; // Timer for next transmission + void* retransmit_timer; // Timer for retransmissions + + // Callback + etcp_tx_callback_t tx_callback; + void* tx_callback_arg; + + // RTT history for averaging + uint16_t rtt_history[100]; + uint8_t rtt_history_idx; + uint8_t rtt_history_count; + + // Pending ACKs + uint16_t pending_ack_ids[32]; + uint16_t pending_ack_timestamps[32]; + uint8_t pending_ack_count; + + // Pending retransmission requests + uint16_t pending_retransmit_ids[32]; + uint8_t pending_retransmit_count; + + // Window management + uint32_t unacked_bytes; // Number of bytes sent but not yet acknowledged + uint32_t window_size; // Current window size in bytes (calculated) + uint16_t last_acked_id; // Last acknowledged packet ID + uint16_t last_rx_ack_id; // Latest received ACK ID from receiver + uint16_t retrans_timer_period; // Current retransmission timer period (timebase) + uint16_t next_retrans_time; // Time of next retransmission check + uint8_t window_blocked; // Flag: transmission blocked by window limit +}; + +// API Functions + +/** + * @brief Initialize new ETCP instance + * @return Pointer to new instance or NULL on error + */ +epkt_t* etcp_init(void); + +/** + * @brief Free ETCP instance and all associated resources + * @param epkt Instance to free + */ +void etcp_free(epkt_t* epkt); + +/** + * @brief Set callback for sending packets via UDP + * @param epkt ETCP instance + * @param cb Callback function + * @param arg User argument passed to callback + */ +void etcp_set_callback(epkt_t* epkt, etcp_tx_callback_t cb, void* arg); + +/** + * @brief Process received UDP packet + * @param epkt ETCP instance + * @param pkt Packet data + * @param len Packet length + * @return 0 on success, -1 on error + */ +int etcp_rx_input(epkt_t* epkt, uint8_t* pkt, uint16_t len); + +/** + * @brief Get total number of packets waiting in transmission queues + * @param epkt ETCP instance + * @return Number of packets + */ +int etcp_tx_queue_size(epkt_t* epkt); + +/** + * @brief Put data into transmission queue + * @param epkt ETCP instance + * @param data Data to send + * @param len Data length + * @return 0 on success, -1 on error + */ +int etcp_tx_put(epkt_t* epkt, uint8_t* data, uint16_t len); + +/** + * @brief Get output queue for reading received data + * @param epkt ETCP instance + * @return Pointer to output queue (ll_queue_t*) + */ +ll_queue_t* etcp_get_output_queue(epkt_t* epkt); + +/** + * @brief Set bandwidth limit + * @param epkt ETCP instance + * @param bandwidth Bytes per timebase (0.1us) + */ +void etcp_set_bandwidth(epkt_t* epkt, uint16_t bandwidth); + +/** + * @brief Update window size based on current RTT and bandwidth + * @param epkt ETCP instance + * Window size = RTT * bandwidth * 2 (bytes in flight) + */ +void etcp_update_window(epkt_t* epkt); + +/** + * @brief Get current RTT + * @param epkt ETCP instance + * @return RTT in timebase units + */ +uint16_t etcp_get_rtt(epkt_t* epkt); + +/** + * @brief Get current jitter + * @param epkt ETCP instance + * @return Jitter in timebase units + */ +uint16_t etcp_get_jitter(epkt_t* epkt); + +#ifdef __cplusplus +} +#endif + +#endif // ETCP_H diff --git a/etcp.txt b/etcp.txt new file mode 100755 index 0000000..6f3e121 --- /dev/null +++ b/etcp.txt @@ -0,0 +1,55 @@ +etcp - extended transmission control protocol +Протокол для передачи-приёма, пободный TCP, реализованый отдельным модулем (etcp.c/h). +Задача протокола: +- передать пакеты через UDP (учитывая его особенности - потери, негарантированный порядок), восстанавливая порядок и потери. +- пакеты уже предварительно подогнаны под размер чтобы вмещались доп. заголовки и служебные фреймы. +- + + +На приёмной стороне создаются две очереди. +первая - сортированный linked-list - в нее добавляются принятые пакеты, но отсеиваются дубликаты и вставляются в нужное место. +и перемещаются в выходную очередь когда все нужные пакеты дошли. +при получении принятого пакета он сразу парсится, для всех hdr!=0 вызываем static upd_metric_for_transmitter(epkt*, buf*, size) - и он разгребает метрики, обновляя: +- rtt_last (roud-trip delay) для последнего пакета +- rtt average last 10 +- rtt average last 100 +- jitter как усредненное: jitter+=(abs(rtt_last_10-rtt_last)-jitter)*0.1f +- для retransmission request - если timestamp последней попытки передачи этого пакета больше rtt_last_10*1.2+jitter*2 то отправляем сейчас + +при отправке также ограничиваем полосу пропускания: суммируем сколько байт отправлено всего (uint32_t), обновляем timestamp и расчетное число байт которое может быть отправлено на момент этого timestamp. и формируем таймер для отправки следующего пакета. + +вторая - ll_queue - выходная осчередь с пакетами в строгом порядке (строгий инкремент по id). + +struct epkt* = etcp_init() - инициализирует новый instance и выделяет под него память +etcp_free(struct epkt*) +etcp_rx_input(struct epkt*,uint8_t* pkt, uint16_t len) - принятый пакет на обработку +rx_output - через механизм ll_queue, функция не нужна. + +новый пакет на передачу отправляется в очередь передачи ll_queue. используй callback для обработки очереди. +для передачи сформированных пакетов по udp: +etcp_set_callback(epkt*, &cbk) -> etcp_tx_output(struct epkt*,uint8_t* pkt, uint16_t len) - callback в управляющей структуре (отправка пакета в udp сокет) + +внутренняя структура передачи: +при готовности отправить очередной пакет из очереди ll_queue пакеты перемещаются в linked_list (как отправленные но неподтвержденные), и освобождаются при получении подтверждения (ack). +int etcp_tx_queue_size(epkt*) - должна быть функция которая возвращает общее кол-во пакетов на передачу в очередях (входящей и рабочей) + + + +Формат udp пакета: + + [ metrics] + +id - uint16_t циклический порядковый номер пакета (при передаче следующего пакета инкрементируется). при ретрансмиссии передается с этим же id и payload, но с обновленными остальными полями +timestamp - uint16_t текущий timestamp (циклическое, 16 бит, timebase = 0.1mS) +hdr - 1 байт: + +payload - передаваемые полезные данные +metrics - опциональное поле для передачи служебных фреймов (ack, retransmission request, statistic reply) + +hdr: + 0x00 - hdr для payload (от следующего байта до конца пакета) + 0x01 - hdr для отчета о timestamp (время приёма) принятого пакета, 4 байта: - передаётся при очередной передаче пакета, для формирования статичтики на приёмной стороне. отчеты передаются для всех новых принятых пакетов с момента последней передачи. т.е. накапливаем timestamp-ы и передаём их. если пакет потерялся - не страшно. + 0x10-0x2f - hdr для перезапроса пакетов (передачу каких пакетов надо повторить. значение определяет количество записей (номеров пакетов) от 1 до 32, если больше - 32 самых старых), далее по 2 байта идут ID пакетов. и в конце - 2 байта номер последнего пакета который ушел в выходную очередь (т.е. последний номер для успешно собранной цепочки) + если что-то еще надо можно добавить. + +Если данных нет (очередь на передачу пустая) и нужно передать только метрику, то передаётся пакет с id=0 и без . на приёмной стороне он определяется по отсутствию записи с hdr=0 diff --git a/etcp_plan.txt b/etcp_plan.txt new file mode 100644 index 0000000..2480b69 --- /dev/null +++ b/etcp_plan.txt @@ -0,0 +1,273 @@ +# План реализации ETCP (Extended Transmission Control Protocol) + +## Обзор +Протокол поверх UDP с восстановлением порядка, повторными передачами, метриками RTT/jitter и ограничением полосы. Использует две очереди: сортированный linked-list для принятых пакетов и ll_queue для выходных данных. + +## Структуры данных + +### struct epkt (etcp.h) +```c +typedef struct epkt epkt_t; + +struct epkt { + // Очереди + ll_queue_t* tx_queue; // Очередь пакетов на передачу + ll_queue_t* output_queue; // Выходная очередь (собранные данные) + struct rx_packet* rx_list; // Сортированный linked-list принятых пакетов + + // Метрики + uint16_t rtt_last; // Последний RTT (timebase 0.1ms) + uint16_t rtt_avg_10; // Среднее RTT за последние 10 пакетов + uint16_t rtt_avg_100; // Среднее RTT за последние 100 пакетов + uint16_t jitter; // Jitter (усреднённый) + uint16_t bandwidth; // Текущая полоса пропускания (байт/таймбазу) + uint32_t bytes_sent_total; // Всего отправлено байт + uint16_t last_sent_timestamp; // Timestamp последней отправки + uint32_t bytes_allowed; // Расчётное число байт, которое можно отправить + + // Состояние + uint16_t next_tx_id; // Следующий ID для передачи + uint16_t last_rx_id; // Последний принятый ID (для ACK) + uint16_t last_delivered_id; // Последний доставленный в output_queue ID + + // Таймеры + void* next_tx_timer; // Таймер следующей передачи + void* retransmit_timer; // Таймер повторных передач + + // Callback'и + void (*tx_callback)(epkt_t*, uint8_t*, uint16_t); // Отправка в UDP + void* tx_callback_arg; + + // Буферы для метрик + uint16_t rtt_history[100]; // История RTT для усреднения + uint8_t rtt_history_idx; + uint8_t rtt_history_count; + + // Накопленные timestamp'ы для отчётов + uint16_t pending_ack_ids[32]; // ID пакетов, для которых нужно отправить ACK + uint16_t pending_ack_timestamps[32]; // Соответствующие timestamp'ы + uint8_t pending_ack_count; +}; +``` + +### struct rx_packet (внутренняя) +```c +struct rx_packet { + struct rx_packet* next; + uint16_t id; + uint16_t timestamp; + uint8_t* data; + uint16_t data_len; +// uint8_t has_payload; // 1 если содержит payload (hdr=0) - пакеты без payload сюда не попадаютю просто парсим сразу метрици и всё. +}; +``` + +## Формат пакета +``` + [ ]* +``` + +### Заголовки (hdr): +- `0x00`: payload (данные начинаются со следующего байта) +- `0x01`: отчёт о timestamp принятого пакета (4 байта: id:2 + timestamp:2) +- `0x10`-`0x2F`: перезапрос пакетов + ACK: + - hdr & 0x0F = количество записей (1-32) + - Далее N*2 байт: ID пакетов для повторной передачи + - Последние 2 байта: номер последнего доставленного пакета (ACK) + +### Пакет только с метриками: +Если очередь передачи пуста, отправляется пакет с id=0 и без hdr=0+payload. + +## Алгоритмы + +### 1. Инициализация +- Создать очереди tx_queue и output_queue через queue_new() +- Инициализировать метрики нулями +- Установить bandwidth по умолчанию (например, 100000 байт/таймбазу) + +### 2. Приём пакетов (etcp_rx_input) +1. Парсинг: + - Читаем id, timestamp + - Пока есть данные, читаем hdr: + - hdr=0x00: запоминаем payload + - hdr=0x01: обрабатываем отчёт о timestamp (обновляем метрики) + - hdr=0x10-0x2F: обрабатываем перезапрос (добавляем ID в очередь повторной передачи) +2. Для пакетов с hdr!=0 вызываем upd_metric_for_transmitter: + - rtt_last = текущее время - timestamp + - Обновляем rtt_avg_10 и rtt_avg_100 (скользящее среднее) + - jitter += (abs(rtt_avg_10 - rtt_last) - jitter) * 0.1 +3. Добавляем пакет в сортированный rx_list: + - Ищем позицию по id (сравнение через (int16_t)(id1-id2)) + - Пропускаем дубликаты + - Вставляем в нужное место +4. Перемещаем непрерывную последовательность в output_queue: + - Начиная с last_delivered_id+1, проверяем наличие пакетов в rx_list + - При нахождении непрерывной цепочки перемещаем payload в output_queue + - Освобождаем rx_packet структуры +5. Накопление ACK: + - Для каждого принятого пакета сохраняем id и timestamp в pending_ack_* + - При следующей отправке включаем эти ACK в пакет + +### 3. Передача пакетов +1. Очередь tx_queue содержит данные для отправки +2. Функция tx_process вызывается по таймеру или при добавлении в пустую очередь: + - Проверяем ограничение полосы: + - delta_time = текущее_время - last_sent_timestamp + - bytes_allowed += delta_time * bandwidth + - Если bytes_allowed < размер_пакета, планируем таймер и выходим + - Формируем пакет: + - Базовый заголовок: next_tx_id++, текущий timestamp + - Добавляем pending_ack (hdr=0x01 для каждого) + - Добавляем перезапросы если нужно (на основе метрик) + - Добавляем payload из tx_queue (hdr=0x00) + - Если payload нет и есть метрики - отправляем пакет с id=0 + - Отправляем через tx_callback + - Обновляем bytes_sent_total, last_sent_timestamp, bytes_allowed + - Сохраняем пакет в список отправленных (для возможной ретрансмиссии) +3. Повторные передачи: + - Для каждого отправленного пакета отслеживаем время отправки + - Если (текущее_время - время_отправки) > rtt_avg_10*1.2 + jitter*2 + - Добавляем ID в очередь повторной передачи + +### 4. Ограничение полосы +- bandwidth: константа (байт/таймбазу), timebase = 0.1ms +- При инициализации: bytes_allowed = 0, last_sent_timestamp = текущее_время +- При отправке: + - current_time = uasync время + - delta = (int16_t)(current_time - last_sent_timestamp) (циклическое) + - bytes_allowed += delta * bandwidth + - Если bytes_allowed >= размер_пакета: + - bytes_allowed -= размер_пакета + - last_sent_timestamp = current_time + - Отправляем пакет + - Иначе: + - wait_time = (размер_пакета - bytes_allowed) / bandwidth + - Устанавливаем таймер на wait_time + +## API функции + +### Основные: +```c +epkt_t* etcp_init(void); +void etcp_free(epkt_t* epkt); +void etcp_set_callback(epkt_t* epkt, void (*cb)(epkt_t*, uint8_t*, uint16_t), void* arg); +int etcp_rx_input(epkt_t* epkt, uint8_t* pkt, uint16_t len); +int etcp_tx_queue_size(epkt_t* epkt); +``` + +### Вспомогательные: +```c +void etcp_set_bandwidth(epkt_t* epkt, uint16_t bandwidth); +uint16_t etcp_get_rtt(epkt_t* epkt); +uint16_t etcp_get_jitter(epkt_t* epkt); +ll_queue_t* etcp_get_output_queue(epkt_t* epkt); // Для извлечения данных +int etcp_tx_put(epkt_t* epkt, uint8_t* data, uint16_t len); // Добавить данные на передачу +``` + +## Интеграция с проектом + +### Зависимости: +- `ll_queue.c/.h` - для очередей +- `u_async.h` - для таймеров +- `stdint.h`, `stdlib.h`, `string.h` - стандартные библиотеки + +### Таймеры: +- Использовать `uasync_set_timeout` для: + - Планирования следующей передачи (при ограничении полосы) + - Повторных передач + - Очистки старых пакетов в rx_list +- Все callback'и должны быть быстрыми, не блокирующими + +### Обработка циклических значений: +- ID: uint16_t, сравнение через `(int16_t)(a - b)` +- Timestamp: uint16_t, timebase 0.1us, сравнение аналогично +- При вычислении дельты времени учитывать переполнение + +## План тестирования + +1. **Unit-тесты для парсинга:** + - Корректность разбора различных hdr + - Обработка циклических ID + +2. **Тесты очередей:** + - Сортировка в rx_list + - Перемещение в output_queue + - Обработка дубликатов + +3. **Тесты метрик:** + - Расчет RTT (скользящее среднее) + - Расчет jitter + - Обновление bandwidth + +4. **Интеграционный тест:** + - Два экземпляра ETCP, обмен данными через эмуляцию UDP + - Проверка восстановления порядка при потере пакетов + - Проверка ограничения полосы + +5. **Тест производительности:** + - Минимальные задержки + - Корректность работы при высокой нагрузке + +## Последовательность реализации + +1. Создать etcp.h с определениями структур и API +2. Реализовать etcp.c в следующем порядке: + a) Базовая структура и init/free + b) Внутренние функции для работы с rx_list + c) Парсинг пакетов (etcp_rx_input) + d) Функции метрик (upd_metric_for_transmitter) + e) Механизм передачи (tx_process, ограничение полосы) + f) Повторные передачи + g) Интеграция с таймерами u_async +3. Создать тестовую программу test_etcp.c +4. Протестировать, исправить ошибки +5. Интегрировать в Makefile + +## Риски и неопределённости + +1. **Переполнение буферов:** Нужны лиматиры на размер rx_list и pending_ack +3. **Производительность:** Сортированный linked-list может быть медленным при большом количестве пакетов. Возможно, нужна оптимизация. - не должно быть много пакетов в списке перезапросов +4. **Интеграция с существующим кодом:** Проверить совместимость стиля кодирования и соглашений об именовании. + +## Дополнительные вопросы + +1. Нужны ли callback'и для событий (доставка данных, изменение метрик)? - нет, но нужна структура из которой эти метрики можно считывать. +2. Как обрабатывать очень старые пакеты в rx_list (таймаут)? - никак. надо сделать reset_connection - он обнуляет все очереди, метрики итд. +3. Как определять начальный bandwidth и адаптировать его? - пока константа. адаптировать позже. + +## Статус реализации (13.01.2026) + +### Реализовано: +1. Структуры epkt, rx_packet, sent_packet +2. API функции: etcp_init, etcp_free, etcp_set_callback, etcp_rx_input, etcp_tx_queue_size, etcp_tx_put, etcp_get_output_queue, etcp_set_bandwidth, etcp_get_rtt, etcp_get_jitter +3. Парсинг пакетов с поддержкой заголовков 0x00 (payload), 0x01 (timestamp report), 0x10-0x2F (retransmission request) +4. Сортированный rx_list с учётом циклических ID (сравнение через (int16_t)(id1-id2)) +5. Перемещение непрерывной последовательности пакетов в output_queue +6. Ограничение полосы пропускания (bandwidth, bytes_allowed) +7. Отправка пакетов с данными и метриками, включая пакеты только с метриками (id=0) +8. Накопление ACK для полученных пакетов и отправка их в следующий пакет +9. Повторные передачи на основе RTT и jitter +10. Обновление метрик RTT (скользящее среднее за 10 и 100 пакетов) и jitter +11. Интеграция с u_async для таймеров передачи и проверки повторных отправок +12. Unit-тесты, покрывающие базовую функциональность (инициализация, передача, приём, реordering) + +### Особенности реализации: +- Timebase: 0.1ms (совместимость с u_async) +- Bandwidth по умолчанию: 10000 байт/таймбазу +- Максимальное количество pending ACK: 32 +- Максимальное количество pending retransmit: 32 +- RTT история: 100 измерений + +### Ограничения и упрощения: +- Отсутствует таймаут для старых пакетов в rx_list +- Отсутствует адаптация bandwidth +- get_current_timestamp использует статический счётчик (для тестов) +- Нет обработки reset_connection +- Нет защиты от переполнения буферов при очень большом количестве пакетов + +### Планируемые улучшения: +1. Реализация reset_connection для сброса состояния +2. Использование системного времени для get_current_timestamp +3. Добавление таймаута для rx_list +4. Оптимизация производительности при большом количестве пакетов +5. Адаптация bandwidth на основе метрик diff --git a/tests/simple_uasync.c b/tests/simple_uasync.c new file mode 100644 index 0000000..7db59ff --- /dev/null +++ b/tests/simple_uasync.c @@ -0,0 +1,145 @@ +// simple_uasync.c - Minimal uasync implementation for tests +#include "u_async.h" +#include +#include +#include + +// Timer entry +typedef struct timer_entry { + void* id; + uint32_t expiry_time; + timeout_callback_t callback; + void* user_arg; + struct timer_entry* next; +} timer_entry_t; + +// Socket entry (not used in ETCP tests) +typedef struct socket_entry { + void* id; + int fd; + socket_callback_t read_cbk; + socket_callback_t write_cbk; + socket_callback_t except_cbk; + void* user_arg; + struct socket_entry* next; +} socket_entry_t; + +// Global state +static timer_entry_t* timer_list = NULL; +static socket_entry_t* socket_list = NULL; +static uint32_t current_time = 0; +static void* next_id = (void*)1; + +// Generate unique ID +static void* generate_id(void) { + void* id = next_id; + next_id = (void*)((uintptr_t)next_id + 1); + return id; +} + +// Initialize - does nothing in mock +void uasync_init(void) { + current_time = 0; +} + +// Mainloop - not used in tests +void uasync_mainloop(void) { + // Should not be called in tests + while (1) {} +} + +// Set timeout +void* uasync_set_timeout(int timeout_tb, void* user_arg, timeout_callback_t callback) { + timer_entry_t* timer = malloc(sizeof(timer_entry_t)); + if (!timer) return NULL; + + timer->id = generate_id(); + timer->expiry_time = current_time + (uint32_t)timeout_tb; + timer->callback = callback; + timer->user_arg = user_arg; + timer->next = timer_list; + timer_list = timer; + + return timer->id; +} + +// Cancel timeout +err_t uasync_cancel_timeout(void* t_id) { + timer_entry_t** pp = &timer_list; + while (*pp) { + if ((*pp)->id == t_id) { + timer_entry_t* to_free = *pp; + *pp = to_free->next; + free(to_free); + return ERR_OK; + } + pp = &(*pp)->next; + } + return ERR_FAIL; +} + +// Add socket (not implemented) +void* uasync_add_socket(int fd, socket_callback_t read_cbk, + socket_callback_t write_cbk, + socket_callback_t except_cbk, + void* user_arg) { + (void)fd; (void)read_cbk; (void)write_cbk; (void)except_cbk; (void)user_arg; + return NULL; +} + +// Remove socket (not implemented) +err_t uasync_remove_socket(void* s_id) { + (void)s_id; + return ERR_FAIL; +} + +// Test helper: advance time and process expired timers +void simple_uasync_advance_time(uint32_t delta_tb) { + current_time += delta_tb; + + // Process all expired timers + while (1) { + timer_entry_t* earliest = NULL; + timer_entry_t** earliest_pp = NULL; + uint32_t earliest_time = UINT32_MAX; + + // Find earliest expired timer + timer_entry_t** pp = &timer_list; + while (*pp) { + if ((*pp)->expiry_time <= current_time && (*pp)->expiry_time < earliest_time) { + earliest = *pp; + earliest_pp = pp; + earliest_time = (*pp)->expiry_time; + } + pp = &(*pp)->next; + } + + if (!earliest) break; + + // Remove from list + *earliest_pp = earliest->next; + + // Call callback + timeout_callback_t cb = earliest->callback; + void* arg = earliest->user_arg; + free(earliest); + + if (cb) { + cb(arg); + } + } +} + +// Test helper: get current time +uint32_t simple_uasync_get_time(void) { + return current_time; +} + +// Test helper: clear all timers +void simple_uasync_clear(void) { + while (timer_list) { + timer_entry_t* next = timer_list->next; + free(timer_list); + timer_list = next; + } +} \ No newline at end of file diff --git a/tests/simple_uasync.h b/tests/simple_uasync.h new file mode 100644 index 0000000..33e5191 --- /dev/null +++ b/tests/simple_uasync.h @@ -0,0 +1,19 @@ +// simple_uasync.h - Test helpers for uasync mock +#ifndef SIMPLE_UASYNC_H +#define SIMPLE_UASYNC_H + +#include + +// These functions are only available when linking with simple_uasync.o +// instead of u_async.o + +// Advance time by delta_tb timebase units and process expired timers +void simple_uasync_advance_time(uint32_t delta_tb); + +// Get current time in timebase units +uint32_t simple_uasync_get_time(void); + +// Clear all pending timers +void simple_uasync_clear(void); + +#endif // SIMPLE_UASYNC_H \ No newline at end of file diff --git a/tests/test_etcp.c b/tests/test_etcp.c new file mode 100644 index 0000000..01d3fff --- /dev/null +++ b/tests/test_etcp.c @@ -0,0 +1,274 @@ +// test_etcp.c - Unit tests for ETCP protocol +#include "etcp.h" +#include "u_async.h" +#include "ll_queue.h" +#include +#include +#include +#include +#include + +#define TEST_ASSERT(cond, msg) \ + do { \ + if (!(cond)) { \ + printf("FAIL: %s (line %d)\n", msg, __LINE__); \ + return 1; \ + } else { \ + printf("PASS: %s\n", msg); \ + } \ + } while(0) + +// Mock callback storage +typedef struct { + uint8_t* data; + uint16_t len; + epkt_t* epkt; +} mock_packet_t; + +#define MAX_MOCK_PACKETS 100 +static mock_packet_t mock_packets[MAX_MOCK_PACKETS]; +static int mock_packet_count = 0; + +static void reset_mock_packets(void) { + for (int i = 0; i < mock_packet_count; i++) { + free(mock_packets[i].data); + mock_packets[i].data = NULL; + } + mock_packet_count = 0; +} + +static void mock_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) { + (void)arg; + assert(mock_packet_count < MAX_MOCK_PACKETS); + mock_packets[mock_packet_count].data = malloc(len); + assert(mock_packets[mock_packet_count].data); + memcpy(mock_packets[mock_packet_count].data, data, len); + mock_packets[mock_packet_count].len = len; + mock_packets[mock_packet_count].epkt = epkt; + mock_packet_count++; +} + + + +// Test 1: Basic initialization and cleanup +int test_init_free(void) { + printf("\n=== Test 1: Initialization and cleanup ===\n"); + + epkt_t* epkt = etcp_init(); + TEST_ASSERT(epkt != NULL, "etcp_init returns non-NULL"); + TEST_ASSERT(epkt->tx_queue != NULL, "tx_queue created"); + TEST_ASSERT(epkt->output_queue != NULL, "output_queue created"); + + etcp_free(epkt); + TEST_ASSERT(1, "etcp_free completes without crash"); + + return 0; +} + +// Test 2: Set callback +int test_set_callback(void) { + printf("\n=== Test 2: Set callback ===\n"); + + epkt_t* epkt = etcp_init(); + TEST_ASSERT(epkt != NULL, "etcp_init"); + + etcp_set_callback(epkt, mock_tx_callback, NULL); + // Callback set, no easy way to verify except through tx + etcp_free(epkt); + + return 0; +} + +// Test 3: Put data into tx queue +int test_tx_put(void) { + printf("\n=== Test 3: TX queue put ===\n"); + + epkt_t* epkt = etcp_init(); + TEST_ASSERT(epkt != NULL, "etcp_init"); + + uint8_t test_data[] = {0x01, 0x02, 0x03, 0x04, 0x05}; + int result = etcp_tx_put(epkt, test_data, sizeof(test_data)); + TEST_ASSERT(result == 0, "etcp_tx_put succeeds"); + + int queue_size = etcp_tx_queue_size(epkt); + TEST_ASSERT(queue_size == 1, "tx queue size is 1"); + + etcp_free(epkt); + + return 0; +} + +// Test 4: Simple packet transmission (without bandwidth limit) +int test_simple_tx(void) { + printf("\n=== Test 4: Simple transmission ===\n"); + + reset_mock_packets(); + + epkt_t* epkt = etcp_init(); + TEST_ASSERT(epkt != NULL, "etcp_init"); + + // Set high bandwidth to avoid limiting + etcp_set_bandwidth(epkt, 65535); + + etcp_set_callback(epkt, mock_tx_callback, NULL); + + uint8_t test_data[] = "Hello ETCP!"; + int result = etcp_tx_put(epkt, test_data, sizeof(test_data)); + TEST_ASSERT(result == 0, "etcp_tx_put succeeds"); + + // Transmission may happen via queue callback + // We can't easily verify transmission in this simple test + // Just ensure no crash occurred + TEST_ASSERT(etcp_tx_queue_size(epkt) >= 0, "queue size non-negative"); + + etcp_free(epkt); + + return 0; +} + +// Test 5: Packet parsing (rx_input) +int test_rx_input(void) { + printf("\n=== Test 5: RX input parsing ===\n"); + + epkt_t* epkt = etcp_init(); + TEST_ASSERT(epkt != NULL, "etcp_init"); + + // Create a simple packet: id=1, timestamp=100, hdr=0, payload "test" + uint8_t packet[] = { + 0x00, 0x01, // id = 1 + 0x00, 0x64, // timestamp = 100 + 0x00, // hdr = 0 (payload) + 't', 'e', 's', 't' + }; + + int result = etcp_rx_input(epkt, packet, sizeof(packet)); + TEST_ASSERT(result == 0, "etcp_rx_input succeeds"); + + // Check that output queue has the data + ll_queue_t* output = etcp_get_output_queue(epkt); + TEST_ASSERT(output != NULL, "output queue exists"); + + int output_count = queue_entry_count(output); + TEST_ASSERT(output_count == 1, "output queue has 1 packet"); + + // Verify payload + ll_entry_t* entry = queue_entry_get(output); + TEST_ASSERT(entry != NULL, "got entry from output queue"); + + uint8_t* data = ll_entry_data(entry); + size_t data_len = ll_entry_size(entry); + TEST_ASSERT(data_len == 4, "payload length is 4"); + TEST_ASSERT(memcmp(data, "test", 4) == 0, "payload matches"); + + queue_entry_free(entry); + + etcp_free(epkt); + + return 0; +} + +// Test 6: Packet reordering +int test_reordering(void) { + printf("\n=== Test 6: Packet reordering ===\n"); + + epkt_t* epkt = etcp_init(); + TEST_ASSERT(epkt != NULL, "etcp_init"); + + // Create packets with IDs 1, 2, 3 + uint8_t packet1[] = { + 0x00, 0x01, // id = 1 + 0x00, 0x10, // timestamp + 0x00, // hdr = 0 + 'a' + }; + + uint8_t packet2[] = { + 0x00, 0x02, // id = 2 + 0x00, 0x20, // timestamp + 0x00, // hdr = 0 + 'b' + }; + + uint8_t packet3[] = { + 0x00, 0x03, // id = 3 + 0x00, 0x30, // timestamp + 0x00, // hdr = 0 + 'c' + }; + + // Receive in wrong order: 2, 1, 3 + etcp_rx_input(epkt, packet2, sizeof(packet2)); + etcp_rx_input(epkt, packet1, sizeof(packet1)); + etcp_rx_input(epkt, packet3, sizeof(packet3)); + + // Check output queue - should have all 3 packets in correct order + ll_queue_t* output = etcp_get_output_queue(epkt); + TEST_ASSERT(queue_entry_count(output) == 3, "all 3 packets in output"); + + // Verify order: 1, 2, 3 + ll_entry_t* entry; + char expected[] = {'a', 'b', 'c'}; + int idx = 0; + + while ((entry = queue_entry_get(output)) != NULL) { + uint8_t* data = ll_entry_data(entry); + size_t len = ll_entry_size(entry); + TEST_ASSERT(len == 1, "payload length 1"); + TEST_ASSERT(data[0] == expected[idx], "correct packet order"); + idx++; + queue_entry_free(entry); + } + + TEST_ASSERT(idx == 3, "all packets processed"); + + etcp_free(epkt); + + return 0; +} + +// Test 7: Metrics update +int test_metrics(void) { + printf("\n=== Test 7: Metrics ===\n"); + + epkt_t* epkt = etcp_init(); + TEST_ASSERT(epkt != NULL, "etcp_init"); + + // Initial metrics should be zero + TEST_ASSERT(etcp_get_rtt(epkt) == 0, "initial RTT is 0"); + TEST_ASSERT(etcp_get_jitter(epkt) == 0, "initial jitter is 0"); + + // Send a packet with ACK to update metrics + // This requires more complex setup with round-trip + + etcp_free(epkt); + + return 0; +} + +int main(void) { + printf("Starting ETCP tests...\n"); + + // Initialize uasync for timers + uasync_init(); + + int failures = 0; + + failures += test_init_free(); + failures += test_set_callback(); + failures += test_tx_put(); + failures += test_simple_tx(); + failures += test_rx_input(); + failures += test_reordering(); + failures += test_metrics(); + + printf("\n=== Summary ===\n"); + if (failures == 0) { + printf("All tests passed!\n"); + } else { + printf("%d test(s) failed.\n", failures); + } + + reset_mock_packets(); + + return failures == 0 ? 0 : 1; +} \ No newline at end of file diff --git a/tests/test_etcp_simple.c b/tests/test_etcp_simple.c new file mode 100644 index 0000000..e716d70 --- /dev/null +++ b/tests/test_etcp_simple.c @@ -0,0 +1,116 @@ +// test_etcp_simple.c - Simple test to verify ETCP sender-receiver communication +#include "etcp.h" +#include "u_async.h" +#include "ll_queue.h" +#include "simple_uasync.h" +#include +#include +#include +#include + +// Sender's TX callback - just forward to receiver +static void sender_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) { + (void)epkt; + epkt_t* receiver = (epkt_t*)arg; + printf("Sender TX callback: sending %u bytes to receiver\n", len); + + // Forward directly to receiver (no loss, no delay) + etcp_rx_input(receiver, data, len); +} + +// Receiver's TX callback - would send ACKs back to sender in real scenario +static void receiver_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) { + (void)epkt; + (void)data; + (void)len; + (void)arg; + printf("Receiver TX callback: ACK sent back\n"); +} + +int main(void) { + printf("Simple ETCP sender-receiver test\n"); + + // Initialize uasync (mock) + uasync_init(); + + // Create ETCP instances + epkt_t* sender = etcp_init(); + epkt_t* receiver = etcp_init(); + + if (!sender || !receiver) { + printf("ERROR: Failed to create ETCP instances\n"); + return 1; + } + + // Set up callbacks + etcp_set_callback(sender, sender_tx_callback, receiver); + etcp_set_callback(receiver, receiver_tx_callback, sender); + + // Send a simple packet + const char* test_data = "Hello, ETCP!"; + uint16_t data_len = strlen(test_data); + uint8_t* data = malloc(data_len); + memcpy(data, test_data, data_len); + + printf("Sending test packet: %s\n", test_data); + + if (etcp_tx_put(sender, data, data_len) != 0) { + printf("ERROR: Failed to queue packet\n"); + free(data); + etcp_free(sender); + etcp_free(receiver); + return 1; + } + + free(data); // etcp_tx_put makes its own copy + + // Advance time to allow transmission + printf("Advancing time...\n"); + for (int i = 0; i < 10; i++) { + simple_uasync_advance_time(10); // 1ms each + // Process any timers (retransmissions, etc.) + } + + // Check receiver's output queue + ll_queue_t* output_queue = etcp_get_output_queue(receiver); + if (!output_queue) { + printf("ERROR: Receiver output queue is NULL\n"); + etcp_free(sender); + etcp_free(receiver); + return 1; + } + + ll_entry_t* entry = queue_entry_get(output_queue); + if (!entry) { + printf("FAIL: No packet in receiver output queue\n"); + + // Debug: check queue size + printf("Queue size check: %d\n", queue_entry_count(output_queue)); + + etcp_free(sender); + etcp_free(receiver); + return 1; + } + + uint8_t* received_data = ll_entry_data(entry); + uint16_t received_len = ll_entry_size(entry); + + printf("SUCCESS: Received packet of length %u\n", received_len); + printf("Data: "); + for (uint16_t i = 0; i < received_len; i++) { + printf("%c", received_data[i]); + } + printf("\n"); + + if (received_len == data_len && memcmp(received_data, test_data, data_len) == 0) { + printf("PASS: Data matches!\n"); + } else { + printf("FAIL: Data doesn't match\n"); + } + + queue_entry_free(entry); + etcp_free(sender); + etcp_free(receiver); + + return 0; +} \ No newline at end of file diff --git a/tests/test_etcp_stress.c b/tests/test_etcp_stress.c new file mode 100644 index 0000000..71e0b10 --- /dev/null +++ b/tests/test_etcp_stress.c @@ -0,0 +1,419 @@ +// test_etcp_stress.c - Stress test for ETCP with packet loss, delay, and reordering +#include "etcp.h" +#include "u_async.h" +#include "ll_queue.h" +#include "simple_uasync.h" +#include +#include +#include +#include +#include +#include + +#define NUM_PACKETS 1000 +#define MIN_PACKET_SIZE 1 +#define MAX_PACKET_SIZE 1300 +#define LOSS_PROBABILITY 0.0 // 0% packet loss for window testing +#define REORDER_PROBABILITY 0.0 // 0% reordering for testing +#define QUEUE_MAX_SIZE 1000 // Max packets in delay queue +#define MAX_DELAY_MS 0 // No delay for testing +#define TIME_BASE_MS 0.1 // uasync timebase is 0.1ms + +// Packet in the network delay queue +typedef struct delayed_packet { + uint8_t* data; + uint16_t len; + uint32_t delivery_time; // When to deliver (in timebase units) + struct delayed_packet* next; +} delayed_packet_t; + +// Network emulator structure +typedef struct { + epkt_t* sender; // ETCP instance that sends + epkt_t* receiver; // ETCP instance that receives + delayed_packet_t* queue; // Delay queue (sorted by delivery time) + int queue_size; + uint32_t current_time; // Current time in timebase units + uint32_t packets_sent; + uint32_t packets_lost; + uint32_t packets_reordered; + uint32_t packets_delivered; + uint8_t running; + void* timer_id; +} network_emulator_t; + +// Forward declarations +static void network_timer_callback(void* arg); +static void sender_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg); +static void receiver_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg); +static void deliver_packets(network_emulator_t* net); +static void free_delay_queue(delayed_packet_t* queue); + +// Random number generator (simple LCG) +static uint32_t random_state = 123456789; +static uint32_t random_next(void) { + random_state = random_state * 1103515245 + 12345; + return random_state; +} + +static double random_double(void) { + return (double)random_next() / (double)UINT32_MAX; +} + +// Network timer callback - advances time and delivers packets +static void network_timer_callback(void* arg) { + network_emulator_t* net = (network_emulator_t*)arg; + if (!net || !net->running) return; + + // Advance time by 1ms (10 timebase units) + net->current_time += 10; + + // Deliver any packets whose time has come + deliver_packets(net); + + // Reschedule timer if still running + if (net->running) { + net->timer_id = uasync_set_timeout(10, net, network_timer_callback); + } +} + +// Sender's TX callback - called when ETCP wants to send a packet +static void sender_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) { + (void)epkt; + network_emulator_t* net = (network_emulator_t*)arg; + if (!net || !net->running) return; + + net->packets_sent++; + + // 10% packet loss - just ignore the packet, ETCP will free the data + if (random_double() < LOSS_PROBABILITY) { + net->packets_lost++; + return; + } + + // Calculate delivery time (current time + random delay up to MAX_DELAY_MS) + uint32_t delay_ms = (uint32_t)(random_double() * MAX_DELAY_MS); + uint32_t delivery_time = net->current_time + (delay_ms * 10); // Convert ms to timebase + + // Create delayed packet - need to copy data since ETCP owns the original + delayed_packet_t* pkt = malloc(sizeof(delayed_packet_t)); + if (!pkt) { + return; // Memory allocation failed, packet is lost + } + + pkt->data = malloc(len); + if (!pkt->data) { + free(pkt); + net->packets_lost++; // Count as loss due to memory failure + return; + } + + memcpy(pkt->data, data, len); + pkt->len = len; + pkt->delivery_time = delivery_time; + pkt->next = NULL; + + // Insert into delay queue + delayed_packet_t** pp = &net->queue; + + // 30% chance to insert at random position (reordering) + if (random_double() < REORDER_PROBABILITY && net->queue_size > 1) { + net->packets_reordered++; + int insert_pos = random_next() % (net->queue_size + 1); + for (int i = 0; i < insert_pos && *pp; i++) { + pp = &(*pp)->next; + } + } else { + // Normal insertion (sorted by delivery time) + while (*pp && (*pp)->delivery_time < delivery_time) { + pp = &(*pp)->next; + } + } + + pkt->next = *pp; + *pp = pkt; + net->queue_size++; + + // Limit queue size (drop first packet if needed) + if (net->queue_size > QUEUE_MAX_SIZE) { + delayed_packet_t* first = net->queue; + if (first) { + net->queue = first->next; + free(first->data); + free(first); + net->queue_size--; + net->packets_lost++; // Count as loss due to queue overflow + } + } +} + +// Receiver's TX callback - called when receiver wants to send ACKs or retransmission requests +static void receiver_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* arg) { + (void)epkt; + network_emulator_t* net = (network_emulator_t*)arg; + if (!net || !net->running) return; + + // Forward ACKs/retrans requests directly to sender with minimal delay (1 timebase unit) + // This allows sender to receive ACKs and retransmit lost packets + simple_uasync_advance_time(1); + net->current_time = simple_uasync_get_time(); + etcp_rx_input(net->sender, data, len); + + // Note: We don't track these in statistics since they're control packets +} + +// Deliver packets whose delivery time has arrived +static void deliver_packets(network_emulator_t* net) { + while (net->queue && net->queue->delivery_time <= net->current_time) { + delayed_packet_t* pkt = net->queue; + net->queue = pkt->next; + + // Deliver to receiver + etcp_rx_input(net->receiver, pkt->data, pkt->len); + net->packets_delivered++; + + free(pkt->data); + free(pkt); + net->queue_size--; + } +} + +// Free delay queue +static void free_delay_queue(delayed_packet_t* queue) { + while (queue) { + delayed_packet_t* next = queue->next; + free(queue->data); + free(queue); + queue = next; + } +} + +// Generate random packet data +static void generate_packet_data(uint8_t* buffer, uint16_t size, uint32_t seq) { + // Fill with pattern: sequence number + random data + for (uint16_t i = 0; i < size; i++) { + if (i < 4) { + // First 4 bytes: sequence number + buffer[i] = (seq >> (8 * i)) & 0xFF; + } else { + // Rest: pseudo-random data based on sequence and position + buffer[i] = (uint8_t)((seq * 7919 + i * 104729) % 256); + } + } +} + +// Verify received packet +static int verify_packet_data(const uint8_t* data, uint16_t size, uint32_t seq) { + if (size < 4) return 0; + + // Check sequence number + uint32_t received_seq = 0; + for (int i = 0; i < 4; i++) { + received_seq |= ((uint32_t)data[i]) << (8 * i); + } + + if (received_seq != seq) return 0; + + // Verify the rest of the data + for (uint16_t i = 4; i < size; i++) { + uint8_t expected = (uint8_t)((seq * 7919 + i * 104729) % 256); + if (data[i] != expected) return 0; + } + + return 1; +} + +// Stress test main function +int main(void) { + printf("Starting ETCP stress test...\n"); + printf("Parameters:\n"); + printf(" Packets: %d\n", NUM_PACKETS); + printf(" Size range: %d-%d bytes\n", MIN_PACKET_SIZE, MAX_PACKET_SIZE); + printf(" Loss probability: %.1f%%\n", LOSS_PROBABILITY * 100); + printf(" Reorder probability: %.1f%%\n", REORDER_PROBABILITY * 100); + printf(" Max delay: %d ms\n", MAX_DELAY_MS); + printf(" Queue size: %d packets\n", QUEUE_MAX_SIZE); + + // Seed random number generator + random_state = (uint32_t)time(NULL); + + // Initialize uasync + uasync_init(); + + // Create network emulator + network_emulator_t net = {0}; + net.running = 1; + net.current_time = 0; + + // Create ETCP instances + net.sender = etcp_init(); + net.receiver = etcp_init(); + + if (!net.sender || !net.receiver) { + printf("ERROR: Failed to create ETCP instances\n"); + return 1; + } + + // Set up callbacks + etcp_set_callback(net.sender, sender_tx_callback, &net); + etcp_set_callback(net.receiver, receiver_tx_callback, &net); + + // Set reasonable bandwidth for stress test + etcp_set_bandwidth(net.sender, 50000); // 50k bytes per timebase + etcp_set_bandwidth(net.receiver, 50000); + + // Don't start network timer - we'll advance time manually + // net.timer_id = uasync_set_timeout(10, &net, network_timer_callback); + + printf("\nGenerating and sending %d packets...\n", NUM_PACKETS); + + // Generate and send packets + uint32_t packets_generated = 0; + uint32_t bytes_generated = 0; + uint32_t last_time_print = 0; + + while (packets_generated < NUM_PACKETS) { + // Generate random packet size + uint16_t size = MIN_PACKET_SIZE + (random_next() % (MAX_PACKET_SIZE - MIN_PACKET_SIZE + 1)); + + // Allocate and fill packet data + uint8_t* data = malloc(size); + if (!data) { + printf("ERROR: Memory allocation failed\n"); + break; + } + + generate_packet_data(data, size, packets_generated); + + // Send via ETCP + if (etcp_tx_put(net.sender, data, size) != 0) { + printf("ERROR: Failed to queue packet %u\n", packets_generated); + free(data); + break; + } + + free(data); // etcp_tx_put makes its own copy + packets_generated++; + bytes_generated += size; + + // Periodically print progress + if (packets_generated % 1000 == 0) { + printf(" Sent %u packets, %u bytes\n", packets_generated, bytes_generated); + } + + // Advance time to allow ETCP to send packets (bandwidth limiting) + // and process any expired timers (retransmissions, etc.) + simple_uasync_advance_time(1); // Advance by 1 timebase unit (0.1ms) + net.current_time = simple_uasync_get_time(); // Keep in sync + + // Deliver any packets whose time has come + deliver_packets(&net); + + // Periodically print time progress + if (net.current_time - last_time_print >= 1000) { // Every 100ms + printf(" Time: %.1f ms, Queue: %d packets\n", + net.current_time / 10.0, net.queue_size); + last_time_print = net.current_time; + } + } + + printf("Finished sending %u packets (%u bytes)\n", packets_generated, bytes_generated); + + // Let network deliver remaining packets and wait for retransmissions + printf("\nDelivering remaining packets and waiting for retransmissions...\n"); + uint32_t start_time = net.current_time; + for (int i = 0; i < 50000 && (net.queue_size > 0 || i < 1000); i++) { + simple_uasync_advance_time(10); // Advance 1ms + net.current_time = simple_uasync_get_time(); // Keep in sync + deliver_packets(&net); + + // Periodically advance more time to speed up retransmission timeouts + if (i % 10 == 0) { + simple_uasync_advance_time(100); // Additional 10ms + net.current_time = simple_uasync_get_time(); + deliver_packets(&net); + } + + if (i % 500 == 0) { + uint32_t elapsed = net.current_time - start_time; + printf(" Queue: %d, Time: %.1f ms (elapsed: %.1f ms)\n", + net.queue_size, net.current_time / 10.0, elapsed / 10.0); + } + } + + // No network timer to stop since we're not using one + net.running = 0; + + // Free any remaining packets in queue + free_delay_queue(net.queue); + net.queue = NULL; + net.queue_size = 0; + + // Collect and verify received packets + printf("\nVerifying received packets...\n"); + + ll_queue_t* output_queue = etcp_get_output_queue(net.receiver); + uint32_t packets_received = 0; + uint32_t bytes_received = 0; + uint32_t correct_packets = 0; + uint32_t max_received_seq = 0; + + ll_entry_t* entry; + while ((entry = queue_entry_get(output_queue)) != NULL) { + uint8_t* data = ll_entry_data(entry); + uint16_t size = ll_entry_size(entry); + + if (size >= 4) { + uint32_t seq = 0; + for (int i = 0; i < 4; i++) { + seq |= ((uint32_t)data[i]) << (8 * i); + } + + if (verify_packet_data(data, size, seq)) { + correct_packets++; + if (seq > max_received_seq) { + max_received_seq = seq; + } + } + } + + packets_received++; + bytes_received += size; + queue_entry_free(entry); + } + + // Print statistics + printf("\n=== Statistics ===\n"); + printf("Packets generated: %u\n", packets_generated); + printf("Packets sent: %u (via ETCP)\n", net.packets_sent); + printf("Packets lost: %u (%.1f%%)\n", net.packets_lost, + (net.packets_sent > 0) ? (100.0 * net.packets_lost / net.packets_sent) : 0.0); + printf("Packets reordered: %u (%.1f%% of delivered)\n", net.packets_reordered, + (net.packets_delivered > 0) ? (100.0 * net.packets_reordered / net.packets_delivered) : 0.0); + printf("Packets delivered: %u (to receiver)\n", net.packets_delivered); + printf("Packets received: %u (in output queue)\n", packets_received); + printf("Correct packets: %u (%.1f%%)\n", correct_packets, + (packets_received > 0) ? (100.0 * correct_packets / packets_received) : 0.0); + printf("Bytes generated: %u\n", bytes_generated); + printf("Bytes received: %u\n", bytes_received); + + // Check for missing packets + uint32_t expected_received = packets_generated - net.packets_lost; + if (packets_received < expected_received) { + printf("\nWARNING: Received %u packets, expected ~%u (some may be in flight)\n", + packets_received, expected_received); + } else if (packets_received > expected_received) { + printf("\nWARNING: Received %u packets, expected ~%u (duplicates?)\n", + packets_received, expected_received); + } + + // Cleanup + etcp_free(net.sender); + etcp_free(net.receiver); + + printf("\nStress test completed.\n"); + + // Consider test successful if we received at least some packets + // (with losses, we won't get all of them) + return (correct_packets > 0) ? 0 : 1; +} \ No newline at end of file