You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1245 lines
42 KiB

// etcp.c - Extended Transmission Control Protocol
#include "etcp.h"
#include "u_async.h"
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <sys/time.h>
#include <time.h>
// Service packet headers
#define ETCP_RESET_HEADER 0x02
#define ETCP_RESET_ACK_HEADER 0x03
// Internal structures
typedef struct rx_packet {
struct rx_packet* next;
uint16_t id;
uint16_t timestamp;
uint8_t* data;
uint16_t data_len;
uint8_t has_payload;
} rx_packet_t;
typedef struct sent_packet {
struct sent_packet* next;
uint16_t id;
uint16_t timestamp;
uint8_t* data;
uint16_t data_len; // Total packet length
uint16_t payload_len; // Payload length (for window accounting)
uint16_t send_time;
uint8_t need_ack;
uint8_t need_retransmit;
} sent_packet_t;
// Forward declarations of internal functions
static void tx_process(epkt_t* epkt);
static void retransmit_check(epkt_t* epkt);
static void update_metrics(epkt_t* epkt, uint16_t rtt);
static uint16_t get_current_timestamp(void);
static uint16_t timestamp_diff(uint16_t t1, uint16_t t2);
static int id_compare(uint16_t id1, uint16_t id2);
static void retransmit_packet(epkt_t* epkt, uint16_t id);
// Timer callbacks
static void tx_timer_callback(void* arg);
static void retransmit_timer_callback(void* arg);
// Internal queue callbacks
static void tx_queue_callback(ll_queue_t* q, ll_entry_t* entry, void* arg);
static void schedule_ack_timer(epkt_t* epkt);
static void request_retransmission_for_gaps(epkt_t* epkt);
// Reset handling
static void etcp_send_reset(epkt_t* epkt);
static void etcp_send_reset_ack(epkt_t* epkt);
static void reset_timer_callback(void* arg);
// Initialize new ETCP instance
epkt_t* etcp_init(uasync_t* ua) {
epkt_t* epkt = calloc(1, sizeof(epkt_t));
if (!epkt) return NULL;
epkt->ua = ua;
// Create queues
epkt->tx_queue = queue_new(ua);
epkt->output_queue = queue_new(ua);
if (!epkt->tx_queue || !epkt->output_queue) {
if (epkt->tx_queue) queue_free(epkt->tx_queue);
if (epkt->output_queue) queue_free(epkt->output_queue);
free(epkt);
return NULL;
}
// Set callback for tx queue
queue_set_callback(epkt->tx_queue, tx_queue_callback, epkt);
// Initialize state
epkt->bandwidth = 10000; // Default: 10000 bytes per timebase (0.1us)
epkt->last_sent_timestamp = get_current_timestamp();
epkt->bytes_allowed = 0;
etcp_update_window(epkt); // Initialize window size and retrans timer
// Initialize lists
epkt->rx_list = NULL;
epkt->sent_list = NULL;
// Initialize metrics
epkt->rtt_last = 0;
epkt->rtt_avg_10 = 0;
epkt->rtt_avg_100 = 0;
epkt->jitter = 0;
epkt->bytes_sent_total = 0;
// Initialize statistics
epkt->retransmissions_count = 0;
epkt->ack_packets_count = 0;
epkt->control_packets_count = 0;
epkt->total_packets_sent = 0;
epkt->unique_packets_sent = 0;
epkt->bytes_received_total = 0;
// Initialize IDs
epkt->next_tx_id = 1;
epkt->last_sent_id = 0;
epkt->last_rx_id = 0;
epkt->last_delivered_id = 0;
// Initialize history
epkt->rtt_history_idx = 0;
epkt->rtt_history_count = 0;
// Initialize pending arrays
epkt->pending_ack_count = 0;
epkt->pending_retransmit_count = 0;
// Initialize window management
epkt->unacked_bytes = 0;
epkt->last_acked_id = 0;
epkt->last_rx_ack_id = 0;
epkt->retrans_timer_period = 20; // Default 2ms (20 timebase units)
epkt->next_retrans_time = 0;
epkt->window_blocked = 0;
// Forward progress tracking
epkt->oldest_missing_id = 0;
epkt->missing_since_time = 0;
// No timers yet
epkt->next_tx_timer = NULL;
epkt->retransmit_timer = NULL;
return epkt;
}
// Free ETCP instance
void etcp_free(epkt_t* epkt) {
if (!epkt) return;
// Cancel timers
if (epkt->next_tx_timer) {
uasync_cancel_timeout(epkt->ua, epkt->next_tx_timer);
epkt->next_tx_timer = NULL;
}
if (epkt->retransmit_timer) {
uasync_cancel_timeout(epkt->ua, epkt->retransmit_timer);
epkt->retransmit_timer = NULL;
}
if (epkt->reset_timer) {
uasync_cancel_timeout(epkt->ua, epkt->reset_timer);
epkt->reset_timer = NULL;
}
// Free queues
if (epkt->tx_queue) queue_free(epkt->tx_queue);
if (epkt->output_queue) queue_free(epkt->output_queue);
// Free rx_list
rx_packet_t* rx = epkt->rx_list;
while (rx) {
rx_packet_t* next = rx->next;
if (rx->data) free(rx->data);
free(rx);
rx = next;
}
// Free sent_list
sent_packet_t* sent = epkt->sent_list;
while (sent) {
sent_packet_t* next = sent->next;
if (sent->data) free(sent->data);
free(sent);
sent = next;
}
free(epkt);
}
// Set callback for sending packets
void etcp_set_callback(epkt_t* epkt, etcp_tx_callback_t cb, void* arg) {
if (!epkt) return;
epkt->tx_callback = cb;
epkt->tx_callback_arg = arg;
}
// Get output queue
ll_queue_t* etcp_get_output_queue(epkt_t* epkt) {
return epkt ? epkt->output_queue : NULL;
}
// Set bandwidth
void etcp_set_bandwidth(epkt_t* epkt, uint16_t bandwidth) {
if (!epkt) return;
epkt->bandwidth = bandwidth;
etcp_update_window(epkt);
}
// Update window size based on current RTT and bandwidth
void etcp_update_window(epkt_t* epkt) {
if (!epkt) return;
uint16_t rtt = epkt->rtt_avg_10;
if (rtt == 0) {
epkt->window_size = (uint32_t)-1; // Unlimited window until RTT measured
// Keep default retrans_timer_period (20 = 2ms)
return;
}
// window = RTT * bandwidth * 2
// RTT in timebase (0.1ms), bandwidth in bytes per timebase
// Multiply using 64-bit to avoid overflow, cap at UINT32_MAX
uint64_t rtt64 = rtt;
uint64_t bw64 = epkt->bandwidth;
uint64_t window64 = rtt64 * bw64 * 2;
if (window64 > UINT32_MAX) {
epkt->window_size = UINT32_MAX;
} else {
epkt->window_size = (uint32_t)window64;
}
// Update retransmission timer period: max(RTT/2, 2ms)
uint16_t rtt_half = rtt / 2;
if (rtt_half < 20) { // 2ms = 20 timebase units
rtt_half = 20;
}
epkt->retrans_timer_period = rtt_half;
}
// Reset connection state
void etcp_reset(epkt_t* epkt) {
if (!epkt) return;
// Cancel timers
if (epkt->next_tx_timer) {
uasync_cancel_timeout(epkt->ua, epkt->next_tx_timer);
epkt->next_tx_timer = NULL;
}
if (epkt->retransmit_timer) {
uasync_cancel_timeout(epkt->ua, epkt->retransmit_timer);
epkt->retransmit_timer = NULL;
}
// Clear tx queue
ll_entry_t* entry;
while ((entry = queue_entry_get(epkt->tx_queue)) != NULL) {
queue_entry_free(entry);
}
// Clear output queue
while ((entry = queue_entry_get(epkt->output_queue)) != NULL) {
queue_entry_free(entry);
}
// Free rx_list
rx_packet_t* rx = epkt->rx_list;
while (rx) {
rx_packet_t* next = rx->next;
if (rx->data) free(rx->data);
free(rx);
rx = next;
}
epkt->rx_list = NULL;
// Free sent_list
sent_packet_t* sent = epkt->sent_list;
while (sent) {
sent_packet_t* next = sent->next;
if (sent->data) free(sent->data);
free(sent);
sent = next;
}
epkt->sent_list = NULL;
// Reset state
epkt->last_sent_timestamp = get_current_timestamp();
epkt->bytes_allowed = 0;
etcp_update_window(epkt);
// Reset metrics
epkt->rtt_last = 0;
epkt->rtt_avg_10 = 0;
epkt->rtt_avg_100 = 0;
epkt->jitter = 0;
epkt->bytes_sent_total = 0;
// Reset statistics
epkt->retransmissions_count = 0;
epkt->ack_packets_count = 0;
epkt->control_packets_count = 0;
epkt->total_packets_sent = 0;
epkt->unique_packets_sent = 0;
epkt->bytes_received_total = 0;
// Reset IDs
epkt->next_tx_id = 1;
epkt->last_rx_id = 0;
epkt->last_delivered_id = 0;
// Reset history
epkt->rtt_history_idx = 0;
epkt->rtt_history_count = 0;
// Reset pending arrays
epkt->pending_ack_count = 0;
epkt->pending_retransmit_count = 0;
// Reset window management
epkt->unacked_bytes = 0;
epkt->window_size = 0;
epkt->last_acked_id = 0;
epkt->last_rx_ack_id = 0;
epkt->retrans_timer_period = 20;
epkt->next_retrans_time = 0;
epkt->window_blocked = 0;
}
// Get RTT
uint16_t etcp_get_rtt(epkt_t* epkt) {
return epkt ? epkt->rtt_last : 0;
}
// Get jitter
uint16_t etcp_get_jitter(epkt_t* epkt) {
return epkt ? epkt->jitter : 0;
}
// Put data into transmission queue
int etcp_tx_put(epkt_t* epkt, uint8_t* data, uint16_t len) {
if (!epkt || !data || len == 0) return -1;
// Create queue entry
ll_entry_t* entry = queue_entry_new(len);
if (!entry) return -1;
// Copy data
memcpy(ll_entry_data(entry), data, len);
// Add to queue
ETCP_DEBUG_LOG("etcp_tx_put: adding packet len=%u, tx_queue count before=%d\n", len, queue_entry_count(epkt->tx_queue));
int result = queue_entry_put(epkt->tx_queue, entry);
if (result != 0) {
ETCP_DEBUG_LOG("etcp_tx_put: queue_entry_put failed, result=%d\n", result);
queue_entry_free(entry);
} else {
ETCP_DEBUG_LOG("etcp_tx_put: queued successfully, tx_queue count after=%d\n", queue_entry_count(epkt->tx_queue));
}
return result;
}
// Get total number of packets in transmission queues
int etcp_tx_queue_size(epkt_t* epkt) {
if (!epkt) return 0;
int count = queue_entry_count(epkt->tx_queue);
// Add packets in sent_list waiting for ACK
sent_packet_t* sent = epkt->sent_list;
while (sent) {
if (sent->need_ack) {
count++;
}
sent = sent->next;
}
return count;
}
// ==================== Internal Functions ====================
// Get current timestamp (0.1us timebase)
static uint16_t get_current_timestamp(void) {
#ifdef ENABLE_TESTS
// For tests, use a simple counter to ensure deterministic behavior
static uint16_t counter = 0;
return counter++;
#else
// Production: use monotonic clock if available
#ifdef CLOCK_MONOTONIC
struct timespec ts;
if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) {
// Convert to 0.1ms units (100us)
// 1 second = 10,000,000 timebase units (0.1us each)
// But we need modulo 65536 for 16-bit timestamp
uint64_t ns = (uint64_t)ts.tv_sec * 1000000000ULL + (uint64_t)ts.tv_nsec;
uint64_t timebase_units = ns / 100000; // 0.1ms = 100,000ns
return (uint16_t)(timebase_units & 0xFFFF);
}
#endif
// Fallback to gettimeofday (not monotonic but available everywhere)
struct timeval tv;
gettimeofday(&tv, NULL);
uint64_t us = (uint64_t)tv.tv_sec * 1000000ULL + (uint64_t)tv.tv_usec;
uint64_t timebase_units = us / 100; // 0.1ms = 100us
return (uint16_t)(timebase_units & 0xFFFF);
#endif
}
// Calculate positive difference between timestamps (considering wrap-around)
static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) {
return (t1 - t2) & 0xFFFF; // modulo 65536
}
// Compare IDs considering wrap-around
static int id_compare(uint16_t id1, uint16_t id2) {
return (int16_t)(id1 - id2);
}
// Update metrics with new RTT measurement
static void update_metrics(epkt_t* epkt, uint16_t rtt) {
if (!epkt) return;
epkt->rtt_last = rtt;
// Update history
epkt->rtt_history[epkt->rtt_history_idx] = rtt;
epkt->rtt_history_idx = (epkt->rtt_history_idx + 1) % 100;
if (epkt->rtt_history_count < 100) {
epkt->rtt_history_count++;
}
// Calculate average of last 10
uint32_t sum10 = 0;
int count10 = (epkt->rtt_history_count < 10) ? epkt->rtt_history_count : 10;
for (int i = 0; i < count10; i++) {
int idx = (epkt->rtt_history_idx - 1 - i + 100) % 100;
sum10 += epkt->rtt_history[idx];
}
if (count10 > 0) {
epkt->rtt_avg_10 = sum10 / count10;
}
// Calculate average of last 100
uint32_t sum100 = 0;
int count100 = epkt->rtt_history_count;
for (int i = 0; i < count100; i++) {
sum100 += epkt->rtt_history[i];
}
if (count100 > 0) {
epkt->rtt_avg_100 = sum100 / count100;
}
// Update jitter
if (count10 > 0) {
int16_t diff = (int16_t)(epkt->rtt_avg_10 - rtt);
if (diff < 0) diff = -diff;
// jitter += (abs(rtt_avg_10 - rtt_last) - jitter) * 0.1
epkt->jitter += ((uint16_t)diff - epkt->jitter) / 10;
}
// Update window size based on new RTT
etcp_update_window(epkt);
}
// Process transmission queue
static void tx_process(epkt_t* epkt) {
if (!epkt || !epkt->tx_callback) return;
// Check bandwidth limit
uint16_t current_time = get_current_timestamp();
uint16_t delta = timestamp_diff(current_time, epkt->last_sent_timestamp);
if (delta > 0) {
epkt->bytes_allowed += delta * epkt->bandwidth;
epkt->last_sent_timestamp = current_time;
}
// If no bytes allowed, schedule timer
if (epkt->bytes_allowed == 0) {
// Schedule next attempt
if (!epkt->next_tx_timer) {
epkt->next_tx_timer = uasync_set_timeout(epkt->ua, 1, epkt, tx_timer_callback);
}
return;
}
// Get data from queue
ll_entry_t* entry = queue_entry_get(epkt->tx_queue);
int data_packet = 1; // 1 if packet contains payload, 0 if metrics only
uint16_t data_len = 0;
if (!entry) {
// No data to send, but we might need to send metrics
if (epkt->pending_ack_count == 0 && epkt->pending_retransmit_count == 0) {
return; // Nothing to send at all
}
data_packet = 0;
data_len = 0;
} else {
data_len = ll_entry_size(entry);
}
// Check window size for data packets
if (data_packet && epkt->window_size != (uint32_t)-1) {
if (epkt->unacked_bytes + data_len > epkt->window_size) {
// Window full, block transmission
epkt->window_blocked = 1;
ETCP_LOG("Window blocked: unacked=%u, data_len=%u, window=%u\n",
epkt->unacked_bytes, data_len, epkt->window_size);
// Put entry back to queue
queue_entry_put_first(epkt->tx_queue, entry);
// Schedule check when window might open (after retransmission timer)
if (!epkt->next_tx_timer) {
epkt->next_tx_timer = uasync_set_timeout(epkt->ua, epkt->retrans_timer_period, epkt, tx_timer_callback);
}
return;
}
}
// Calculate packet size (header + data + metrics)
uint16_t packet_size = 4; // id+timestamp
if (data_packet) {
packet_size += 1 + data_len; // hdr=0 + data
}
// Add space for ACKs
if (epkt->pending_ack_count > 0) {
packet_size += 1 + 1 + epkt->pending_ack_count * 4 + 4; // hdr=0x01 + count + ids+timestamps + 2 IDs
}
// Add space for retransmission requests
if (epkt->pending_retransmit_count > 0) {
uint8_t count = epkt->pending_retransmit_count;
if (count > 32) count = 32;
packet_size += 1 + count * 2 + 4; // hdr + IDs + last delivered ID + last received ID
}
// Check if we have enough bandwidth
if (epkt->bytes_allowed < packet_size) {
// Not enough bandwidth
if (data_packet) {
// Put entry back and schedule timer
queue_entry_put_first(epkt->tx_queue, entry);
}
if (!epkt->next_tx_timer) {
uint16_t wait_time = (packet_size - epkt->bytes_allowed) / epkt->bandwidth;
if (wait_time < 1) wait_time = 1;
epkt->next_tx_timer = uasync_set_timeout(epkt->ua, wait_time, epkt, tx_timer_callback);
}
return;
}
// Allocate packet buffer
uint8_t* packet = malloc(packet_size);
if (!packet) {
queue_entry_free(entry);
return;
}
// Build packet
uint8_t* ptr = packet;
// ID and timestamp
uint16_t id;
if (data_packet) {
id = epkt->next_tx_id++;
// Update last sent ID if newer
if (id != 0 && id_compare(id, epkt->last_sent_id) > 0) {
epkt->last_sent_id = id;
}
} else {
id = 0; // metrics-only packet
}
uint16_t timestamp = current_time;
*ptr++ = id >> 8;
*ptr++ = id & 0xFF;
*ptr++ = timestamp >> 8;
*ptr++ = timestamp & 0xFF;
// Add ACKs if any
if (epkt->pending_ack_count > 0) {
uint8_t count = epkt->pending_ack_count;
if (count > 32) count = 32;
*ptr++ = 0x01; // hdr for timestamp report
*ptr++ = count; // number of timestamp pairs
for (int i = 0; i < count; i++) {
*ptr++ = epkt->pending_ack_ids[i] >> 8;
*ptr++ = epkt->pending_ack_ids[i] & 0xFF;
*ptr++ = epkt->pending_ack_timestamps[i] >> 8;
*ptr++ = epkt->pending_ack_timestamps[i] & 0xFF;
}
// Add last delivered ID and last received ID
*ptr++ = epkt->last_delivered_id >> 8;
*ptr++ = epkt->last_delivered_id & 0xFF;
*ptr++ = epkt->last_rx_id >> 8;
*ptr++ = epkt->last_rx_id & 0xFF;
epkt->pending_ack_count = 0;
}
// Add retransmission requests if any
if (epkt->pending_retransmit_count > 0) {
uint8_t count = epkt->pending_retransmit_count;
if (count > 32) count = 32;
ETCP_LOG("Sending retransmit requests: count=%u, IDs: ", count);
for (int i = 0; i < count; i++) {
ETCP_LOG("%u ", epkt->pending_retransmit_ids[i]);
}
ETCP_LOG("last_delivered=%u, last_rx=%u\n", epkt->last_delivered_id, epkt->last_rx_id);
*ptr++ = 0x10 + (count - 1); // hdr with count
for (int i = 0; i < count; i++) {
*ptr++ = epkt->pending_retransmit_ids[i] >> 8;
*ptr++ = epkt->pending_retransmit_ids[i] & 0xFF;
}
// Add last delivered ID and last received ID
*ptr++ = epkt->last_delivered_id >> 8;
*ptr++ = epkt->last_delivered_id & 0xFF;
*ptr++ = epkt->last_rx_id >> 8;
*ptr++ = epkt->last_rx_id & 0xFF;
epkt->pending_retransmit_count = 0;
}
// Add payload if present
if (data_packet) {
*ptr++ = 0x00; // hdr=0 for payload
memcpy(ptr, ll_entry_data(entry), data_len);
ptr += data_len;
}
// Update statistics before sending
epkt->total_packets_sent++;
if (data_packet) {
epkt->unique_packets_sent++;
}
if (epkt->pending_ack_count > 0) {
epkt->ack_packets_count++;
epkt->control_packets_count++;
}
if (epkt->pending_retransmit_count > 0) {
epkt->control_packets_count++;
}
// Send packet
epkt->tx_callback(epkt, packet, packet_size, epkt->tx_callback_arg);
// Update bandwidth accounting
epkt->bytes_allowed -= packet_size;
epkt->bytes_sent_total += packet_size;
if (data_packet) {
// Store in sent list for possible retransmission
sent_packet_t* sent = malloc(sizeof(sent_packet_t));
if (sent) {
sent->id = id;
sent->timestamp = timestamp;
sent->data = packet; // Keep the packet for retransmission
sent->data_len = packet_size;
sent->send_time = current_time;
sent->need_ack = 1;
sent->payload_len = data_len;
// Update unacked bytes
epkt->unacked_bytes += data_len;
// Add to list
sent->next = epkt->sent_list;
epkt->sent_list = sent;
} else {
free(packet);
}
// Free queue entry
queue_entry_free(entry);
} else {
// Metrics-only packet, free packet buffer
free(packet);
}
// Schedule retransmit check if not already scheduled
if (!epkt->retransmit_timer && epkt->retrans_timer_period > 0) {
epkt->retransmit_timer = uasync_set_timeout(epkt->ua, epkt->retrans_timer_period, epkt, retransmit_timer_callback);
}
// Resume queue callback for next packet
queue_resume_callback(epkt->tx_queue);
}
// Check for needed retransmissions
static void retransmit_check(epkt_t* epkt) {
if (!epkt) return;
uint16_t current_time = get_current_timestamp();
// Threshold = RTT * 1.5
uint16_t threshold = epkt->rtt_avg_10 + epkt->rtt_avg_10 / 2;
sent_packet_t* sent = epkt->sent_list;
while (sent) {
uint16_t age = timestamp_diff(current_time, sent->send_time);
if (age > threshold && sent->need_ack) {
// Add to retransmit queue
if (epkt->pending_retransmit_count < 32) {
epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = sent->id;
}
}
sent = sent->next;
}
// Special retransmission for the newest unacked packet after 2×RTT
if (epkt->last_sent_id != 0 && id_compare(epkt->last_sent_id, epkt->last_acked_id) > 0) {
// Find the newest packet in sent_list
sent_packet_t* sent = epkt->sent_list;
while (sent) {
if (sent->id == epkt->last_sent_id && sent->need_ack) {
uint16_t age = timestamp_diff(current_time, sent->send_time);
uint16_t threshold_new = epkt->rtt_avg_10 * 2;
if (age > threshold_new) {
// Check if already in retransmit queue
int already = 0;
for (int i = 0; i < epkt->pending_retransmit_count; i++) {
if (epkt->pending_retransmit_ids[i] == sent->id) {
already = 1;
break;
}
}
if (!already && epkt->pending_retransmit_count < 32) {
epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = sent->id;
}
}
break;
}
sent = sent->next;
}
}
// Reschedule check with updated period
if (epkt->retrans_timer_period > 0) {
epkt->retransmit_timer = uasync_set_timeout(epkt->ua, epkt->retrans_timer_period, epkt, retransmit_timer_callback);
} else {
epkt->retransmit_timer = NULL;
}
}
// Process received packet
int etcp_rx_input(epkt_t* epkt, uint8_t* pkt, uint16_t len) {
if (!epkt || !pkt || len < 4) return -1;
// Update received bytes statistics
epkt->bytes_received_total += len;
// Parse header
uint8_t* ptr = pkt;
uint16_t id = (ptr[0] << 8) | ptr[1];
uint16_t timestamp = (ptr[2] << 8) | ptr[3];
ptr += 4;
len -= 4;
// Track last received ID
if (id_compare(id, epkt->last_rx_id) > 0) {
epkt->last_rx_id = id;
}
// Process headers
uint8_t has_payload = 0;
uint8_t* payload = NULL;
uint16_t payload_len = 0;
while (len > 0) {
uint8_t hdr = *ptr++;
len--;
if (hdr == 0x00) {
// Payload
has_payload = 1;
payload = ptr;
payload_len = len;
break; // Payload is the rest of the packet
} else if (hdr == 0x01) {
// Timestamp report with count
if (len >= 1) {
uint8_t count = *ptr++;
len--;
if (len >= count * 4 + 4) {
// Process each timestamp pair
for (int i = 0; i < count; i++) {
uint16_t ack_id = (ptr[0] << 8) | ptr[1];
uint16_t ack_timestamp = (ptr[2] << 8) | ptr[3];
ptr += 4;
len -= 4;
// Calculate RTT
uint16_t current_time = get_current_timestamp();
uint16_t rtt_raw = timestamp_diff(current_time, ack_timestamp);
if (rtt_raw > 0) {
update_metrics(epkt, rtt_raw);
}
// Remove acknowledged packet from sent_list
sent_packet_t* sent = epkt->sent_list;
sent_packet_t* prev = NULL;
while (sent) {
if (sent->id == ack_id) {
if (prev) {
prev->next = sent->next;
} else {
epkt->sent_list = sent->next;
}
// Update unacked bytes (with underflow protection)
if (sent->payload_len > epkt->unacked_bytes) {
epkt->unacked_bytes = 0;
} else {
epkt->unacked_bytes -= sent->payload_len;
}
ETCP_LOG("ACK received for id=%u, unacked_bytes now=%u, payload_len=%u\n",
ack_id, epkt->unacked_bytes, sent->payload_len);
// Update last acknowledged ID
if (id_compare(ack_id, epkt->last_acked_id) > 0) {
epkt->last_acked_id = ack_id;
}
// Update last received ACK ID
if (id_compare(ack_id, epkt->last_rx_ack_id) > 0) {
epkt->last_rx_ack_id = ack_id;
}
// Window may have opened
epkt->window_blocked = 0;
// Forward progress tracking
epkt->oldest_missing_id = 0;
epkt->missing_since_time = 0;
free(sent->data);
free(sent);
break;
}
prev = sent;
sent = sent->next;
}
}
// Read last delivered ID and last received ID
uint16_t last_delivered = (ptr[0] << 8) | ptr[1];
uint16_t last_received = (ptr[2] << 8) | ptr[3];
ptr += 4;
len -= 4;
// Update our last delivered if newer
if (id_compare(last_delivered, epkt->last_delivered_id) > 0) {
epkt->last_delivered_id = last_delivered;
// Clear oldest missing if it's now delivered or skipped
if (epkt->oldest_missing_id != 0 && id_compare(epkt->oldest_missing_id, epkt->last_delivered_id) <= 0) {
epkt->oldest_missing_id = 0;
epkt->missing_since_time = 0;
}
}
// Update last_rx_ack_id (latest known received packet)
if (id_compare(last_received, epkt->last_rx_ack_id) > 0) {
epkt->last_rx_ack_id = last_received;
}
}
}
} else if (hdr >= 0x10 && hdr <= 0x2F) {
// Retransmission request with two IDs
uint8_t count = (hdr & 0x0F) + 1;
if (len >= count * 2 + 4) {
ETCP_LOG("Retransmit request received: count=%u, IDs: ", count);
// Read IDs to retransmit
for (int i = 0; i < count; i++) {
uint16_t retransmit_id = (ptr[0] << 8) | ptr[1];
ptr += 2;
len -= 2;
// Retransmit the requested packet
retransmit_packet(epkt, retransmit_id);
ETCP_LOG("%u ", retransmit_id);
}
ETCP_LOG("\n");
// Read last delivered ID and last received ID
uint16_t last_delivered = (ptr[0] << 8) | ptr[1];
uint16_t last_received = (ptr[2] << 8) | ptr[3];
ptr += 4;
len -= 4;
// Update our last delivered if newer
ETCP_LOG("Comparing last_delivered: remote=%u, local=%u, cmp=%d\n",
last_delivered, epkt->last_delivered_id, id_compare(last_delivered, epkt->last_delivered_id));
if (id_compare(last_delivered, epkt->last_delivered_id) > 0) {
epkt->last_delivered_id = last_delivered;
ETCP_LOG("Updated last_delivered_id to %u\n", epkt->last_delivered_id);
// Clear oldest missing if it's now delivered or skipped
if (epkt->oldest_missing_id != 0 && id_compare(epkt->oldest_missing_id, epkt->last_delivered_id) <= 0) {
epkt->oldest_missing_id = 0;
epkt->missing_since_time = 0;
}
}
// Update last_rx_ack_id (latest known received packet)
if (id_compare(last_received, epkt->last_rx_ack_id) > 0) {
epkt->last_rx_ack_id = last_received;
}
ETCP_LOG("Updated from retransmit request: last_delivered=%u, last_received=%u, our_last_delivered=%u\n",
last_delivered, last_received, epkt->last_delivered_id);
}
} else if (hdr == ETCP_RESET_HEADER) {
// Reset request
ETCP_LOG("Reset request received\n");
// Send reset ACK
etcp_send_reset_ack(epkt);
// Reset our own state
etcp_reset(epkt);
} else if (hdr == ETCP_RESET_ACK_HEADER) {
// Reset ACK received
ETCP_LOG("Reset ACK received\n");
epkt->reset_ack_received = 1;
epkt->reset_pending = 0;
if (epkt->reset_timer) {
uasync_cancel_timeout(epkt->ua, epkt->reset_timer);
epkt->reset_timer = NULL;
}
}
// Unknown hdr - skip?
}
// Add to rx_list if has payload and id != 0 (id=0 is for metrics-only)
if (has_payload && payload_len > 0 && id != 0) {
// Check for duplicate
rx_packet_t* current = epkt->rx_list;
rx_packet_t* prev = NULL;
while (current) {
int cmp = id_compare(current->id, id);
if (cmp == 0) {
// Duplicate, ignore
return 0;
}
if (cmp > 0) {
// Found insertion point
break;
}
prev = current;
current = current->next;
}
// Create new rx_packet
rx_packet_t* new_pkt = malloc(sizeof(rx_packet_t));
if (!new_pkt) return -1;
new_pkt->id = id;
new_pkt->timestamp = timestamp;
new_pkt->data_len = payload_len;
new_pkt->data = malloc(payload_len);
if (!new_pkt->data) {
free(new_pkt);
return -1;
}
memcpy(new_pkt->data, payload, payload_len);
new_pkt->has_payload = 1;
// Insert into sorted list
new_pkt->next = current;
if (prev) {
prev->next = new_pkt;
} else {
epkt->rx_list = new_pkt;
}
ETCP_LOG("Received packet id=%u, last_delivered=%u\n", id, epkt->last_delivered_id);
// If this packet was the oldest missing, clear the tracking
if (epkt->oldest_missing_id != 0 && epkt->oldest_missing_id == id) {
epkt->oldest_missing_id = 0;
epkt->missing_since_time = 0;
}
// Add to pending ACKs
if (epkt->pending_ack_count < 32) {
epkt->pending_ack_ids[epkt->pending_ack_count] = id;
epkt->pending_ack_timestamps[epkt->pending_ack_count] = timestamp;
epkt->pending_ack_count++;
schedule_ack_timer(epkt);
}
// Check for gaps and request retransmission
request_retransmission_for_gaps(epkt);
// Move continuous sequence to output queue
uint16_t next_expected = epkt->last_delivered_id + 1;
ETCP_LOG("Delivery check: next_expected=%u\n", next_expected);
// Continue delivering as long as we find the next expected packet
int delivered;
do {
delivered = 0;
rx_packet_t* rx = epkt->rx_list;
rx_packet_t* prev = NULL;
// Search for packet with id == next_expected
while (rx) {
if (rx->id == next_expected) {
ETCP_LOG("Delivering packet id=%u to output queue\n", rx->id);
// Add to output queue
ll_entry_t* entry = queue_entry_new(rx->data_len);
if (entry) {
memcpy(ll_entry_data(entry), rx->data, rx->data_len);
queue_entry_put(epkt->output_queue, entry);
}
// Update last delivered
epkt->last_delivered_id = next_expected;
// Clear oldest missing if it's now delivered or skipped
if (epkt->oldest_missing_id != 0 && id_compare(epkt->oldest_missing_id, epkt->last_delivered_id) <= 0) {
epkt->oldest_missing_id = 0;
epkt->missing_since_time = 0;
}
next_expected++;
// Remove from rx_list
if (prev) {
prev->next = rx->next;
} else {
epkt->rx_list = rx->next;
}
free(rx->data);
free(rx);
delivered = 1;
break;
}
prev = rx;
rx = rx->next;
}
} while (delivered);
}
return 0;
}
// ==================== Timer Callbacks ====================
static void tx_timer_callback(void* arg) {
epkt_t* epkt = (epkt_t*)arg;
if (!epkt) return;
epkt->next_tx_timer = NULL;
tx_process(epkt);
}
static void retransmit_timer_callback(void* arg) {
epkt_t* epkt = (epkt_t*)arg;
if (!epkt) return;
epkt->retransmit_timer = NULL;
retransmit_check(epkt);
}
static void request_retransmission_for_gaps(epkt_t* epkt) {
if (!epkt || !epkt->rx_list) return;
// Forward progress: if oldest missing packet has been missing for > 3×RTT, skip it
if (epkt->oldest_missing_id != 0) {
uint16_t current_time = get_current_timestamp();
uint16_t missing_duration = timestamp_diff(current_time, epkt->missing_since_time);
uint16_t threshold = epkt->rtt_avg_10 * 3;
if (threshold < 60) threshold = 60; // Minimum 6ms
if (missing_duration > threshold) {
ETCP_LOG("Forward progress: skipping missing packet id=%u (missing for %u > threshold %u)\n",
epkt->oldest_missing_id, missing_duration, threshold);
// Advance last_delivered_id past the missing packet
epkt->last_delivered_id = epkt->oldest_missing_id;
epkt->oldest_missing_id = 0;
epkt->missing_since_time = 0;
// Continue to deliver any now-contiguous packets
// The function will be called again after this update
}
}
// Find gaps in rx_list
uint16_t expected = epkt->last_delivered_id + 1;
rx_packet_t* current = epkt->rx_list;
while (current) {
if (id_compare(current->id, expected) > 0) {
// Gap detected: missing packets between expected and current->id
uint16_t missing = expected;
while (id_compare(missing, current->id) < 0) {
if (epkt->pending_retransmit_count < 32) {
epkt->pending_retransmit_ids[epkt->pending_retransmit_count++] = missing;
ETCP_LOG("Gap: requesting retransmit for id=%u (last_delivered=%u)\n",
missing, epkt->last_delivered_id);
// Track oldest missing packet for forward progress
if (epkt->oldest_missing_id == 0 || id_compare(missing, epkt->oldest_missing_id) < 0) {
epkt->oldest_missing_id = missing;
epkt->missing_since_time = get_current_timestamp();
}
}
missing++;
}
}
expected = current->id + 1;
current = current->next;
}
}
static void schedule_ack_timer(epkt_t* epkt) {
if (!epkt) return;
// If we have pending ACKs or retransmission requests, try to send them
if ((epkt->pending_ack_count > 0 || epkt->pending_retransmit_count > 0) && !epkt->next_tx_timer) {
// Try to send immediately (if bandwidth allows)
tx_process(epkt);
}
}
// Retransmit a specific packet
static void retransmit_packet(epkt_t* epkt, uint16_t id) {
if (!epkt || !epkt->tx_callback) return;
// Find packet in sent_list
sent_packet_t* sent = epkt->sent_list;
while (sent) {
if (sent->id == id) {
// Update statistics
epkt->retransmissions_count++;
epkt->total_packets_sent++;
// Resend the packet (with same data)
epkt->tx_callback(epkt, sent->data, sent->data_len, epkt->tx_callback_arg);
// Update send time for retransmission timeout
sent->send_time = get_current_timestamp();
ETCP_LOG("Retransmitted packet id=%u\n", id);
return;
}
sent = sent->next;
}
ETCP_LOG("Cannot retransmit packet id=%u: not found in sent_list\n", id);
}
// ==================== Statistics API ====================
void etcp_get_stats(epkt_t* epkt,
uint32_t* retransmissions,
uint32_t* total_packets_sent,
uint32_t* unique_packets_sent,
uint32_t* bytes_sent_total,
uint32_t* bytes_received_total,
uint32_t* ack_packets_count,
uint32_t* control_packets_count) {
if (!epkt) return;
if (retransmissions) *retransmissions = epkt->retransmissions_count;
if (total_packets_sent) *total_packets_sent = epkt->total_packets_sent;
if (unique_packets_sent) *unique_packets_sent = epkt->unique_packets_sent;
if (bytes_sent_total) *bytes_sent_total = epkt->bytes_sent_total;
if (bytes_received_total) *bytes_received_total = epkt->bytes_received_total;
if (ack_packets_count) *ack_packets_count = epkt->ack_packets_count;
if (control_packets_count) *control_packets_count = epkt->control_packets_count;
}
// ==================== Reset Handling ====================
static void reset_timer_callback(void* arg) {
epkt_t* epkt = (epkt_t*)arg;
if (!epkt) return;
if (epkt->reset_ack_received) {
// ACK received, stop retrying
epkt->reset_timer = NULL;
return;
}
epkt->reset_retry_count++;
if (epkt->reset_retry_count > 10) {
// Too many retries, give up
ETCP_LOG("Reset retry limit exceeded\n");
epkt->reset_pending = 0;
epkt->reset_timer = NULL;
return;
}
// Resend reset packet
ETCP_LOG("Resending reset packet (retry %u)\n", epkt->reset_retry_count);
etcp_send_reset(epkt);
// Schedule next retry in 100ms (1000 timebase units)
epkt->reset_timer = uasync_set_timeout(epkt->ua, 1000, epkt, reset_timer_callback);
}
static void etcp_send_reset(epkt_t* epkt) {
if (!epkt) return;
// Create reset packet: 4-byte header (ID=0, timestamp=0) + reset header
uint8_t packet[5];
packet[0] = 0; // ID high byte (0)
packet[1] = 0; // ID low byte (0)
packet[2] = 0; // timestamp high byte
packet[3] = 0; // timestamp low byte
packet[4] = ETCP_RESET_HEADER;
epkt->reset_pending = 1;
epkt->reset_ack_received = 0;
if (epkt->tx_callback) {
epkt->tx_callback(epkt, packet, sizeof(packet), epkt->tx_callback_arg);
}
}
static void etcp_send_reset_ack(epkt_t* epkt) {
if (!epkt) return;
// Create reset ACK packet: 4-byte header (ID=0, timestamp=0) + reset ACK header
uint8_t packet[5];
packet[0] = 0; // ID high byte (0)
packet[1] = 0; // ID low byte (0)
packet[2] = 0; // timestamp high byte
packet[3] = 0; // timestamp low byte
packet[4] = ETCP_RESET_ACK_HEADER;
if (epkt->tx_callback) {
epkt->tx_callback(epkt, packet, sizeof(packet), epkt->tx_callback_arg);
}
}
// Public reset function
void etcp_reset_connection(epkt_t* epkt) {
if (!epkt) return;
ETCP_LOG("Initiating connection reset\n");
// Cancel any existing reset timer
if (epkt->reset_timer) {
uasync_cancel_timeout(epkt->ua, epkt->reset_timer);
epkt->reset_timer = NULL;
}
epkt->reset_pending = 1;
epkt->reset_ack_received = 0;
epkt->reset_retry_count = 0;
// Send first reset packet
etcp_send_reset(epkt);
// Start retry timer (100ms = 1000 timebase units)
epkt->reset_timer = uasync_set_timeout(epkt->ua, 1000, epkt, reset_timer_callback);
}
// ==================== Queue Callbacks ====================
static void tx_queue_callback(ll_queue_t* q, ll_entry_t* entry, void* arg) {
(void)q;
(void)entry;
epkt_t* epkt = (epkt_t*)arg;
if (!epkt) return;
ETCP_LOG("tx_queue_callback triggered\n");
// Start transmission process
tx_process(epkt);
}