You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

745 lines
31 KiB

// etcp.c - ETCP Protocol Implementation (refactored and expanded based on etcp_protocol.txt)
#include "etcp.h"
#include "etcp_loadbalancer.h"
#include "../lib/u_async.h"
#include "../lib/ll_queue.h"
#include "../lib/debug_config.h"
#include "crc32.h" // For potential hashing, though not used yet.
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <math.h> // For bandwidth calcs
#include <limits.h> // For UINT16_MAX
// Enable comprehensive debug output for ETCP module
#define DEBUG_CATEGORY_ETCP_DETAILED 1
// Constants from spec (adjusted for completeness)
#define MAX_INFLIGHT_BYTES 65536 // Initial window
#define RETRANS_K1 2.0f // RTT multiplier for retrans timeout
#define RETRANS_K2 1.5f // Jitter multiplier
#define ACK_DELAY_TB 20 // ACK timer delay (2ms in 0.1ms units)
#define BURST_DELAY_FACTOR 4 // Delay before burst
#define BURST_SIZE 5 // Packets in burst (1 delayed + 4 burst)
#define RTT_HISTORY_SIZE 10 // For jitter calc
#define MAX_PENDING 32 // For ACKs/retrans (arbitrary; adjust)
#define SECTION_HEADER_SIZE 3 // type(1) + len(2)
// Container-of macro for getting struct from data pointer
//#define CONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member)))
// Forward declarations
static void input_queue_cb(struct ll_queue* q, void* arg);
static void etcp_link_ready_callback(struct ETCP_CONN* etcp);
static void input_send_q_cb(struct ll_queue* q, void* arg);
static void wait_ack_cb(struct ll_queue* q, void* arg);
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp);
// Get current time in 0.1ms units
uint64_t get_current_time_units() {
struct timeval tv;
gettimeofday(&tv, NULL);
uint64_t time_units = ((uint64_t)tv.tv_sec * 10000ULL) + (tv.tv_usec / 100);
// DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "get_current_time_units: tv_sec=%ld, tv_usec=%ld, result=%llu",
// tv.tv_sec, tv.tv_usec, (unsigned long long)time_units);
return time_units;
}
uint16_t get_current_timestamp() {
uint16_t timestamp = (uint16_t)(get_current_time_units() & 0xFFFF);
// DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "get_current_timestamp: result=%u", timestamp);
return timestamp;
}
// Timestamp diff (with wrap-around)
static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) {
// DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "timestamp_diff: t1=%u, t2=%u", t1, t2);
if (t1 >= t2) {
return t1 - t2;
}
return (UINT16_MAX - t2) + t1 + 1;
}
// Create new ETCP connection
struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) {
if (!instance) return NULL;
struct ETCP_CONN* etcp = calloc(1, sizeof(struct ETCP_CONN));
if (!etcp) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: creating connection failed for instance %p", instance);
return NULL;
}
etcp->instance = instance;
etcp->input_queue = queue_new(instance->ua, 0); // No hash for input_queue
etcp->output_queue = queue_new(instance->ua, 0); // No hash for output_queue
etcp->input_send_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for send_q
etcp->input_wait_ack = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for wait_ack
etcp->recv_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for send_q
etcp->ack_q = queue_new(instance->ua, 0);
etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET));
etcp->rx_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT));
if (!etcp->input_queue || !etcp->output_queue || !etcp->input_send_q || !etcp->recv_q || !etcp->ack_q ||
!etcp->input_wait_ack || !etcp->inflight_pool || !etcp->rx_pool) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: error - closing - input:%p output:%p send:%p wait:%x pool:%p",
etcp->input_queue, etcp->output_queue, etcp->input_send_q, etcp->input_wait_ack, etcp->inflight_pool);
etcp_connection_close(etcp);
return NULL;
}
etcp->mtu = 1500; // Default MTU
etcp->window_size = MAX_INFLIGHT_BYTES;
etcp->next_tx_id = 1;
etcp->rtt_avg_10 = 10; // Initial guess (1ms)
etcp->rtt_avg_100 = 10;
etcp->rtt_history_idx = 0;
memset(etcp->rtt_history, 0, sizeof(etcp->rtt_history));
// Set input queue callback
queue_set_callback(etcp->input_queue, input_queue_cb, etcp);
queue_set_callback(etcp->input_send_q, input_send_q_cb, etcp);
queue_set_callback(etcp->input_wait_ack, wait_ack_cb, etcp);
etcp->link_ready_for_send_fn = etcp_link_ready_callback;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_connection_create: connection initialized. ETCP=%p mtu=%d, window_size=%u, next_tx_id=%u",
etcp, etcp->mtu, etcp->window_size, etcp->next_tx_id);
return etcp;
}
// Close connection with NULL pointer safety (prevents double free)
void etcp_connection_close(struct ETCP_CONN* etcp) {
if (!etcp) return;
// Drain and free input_queue (contains ETCP_FRAGMENT with pkt_data from data_pool)
if (etcp->input_queue) {
struct ETCP_FRAGMENT* pkt;
while ((pkt = queue_data_get(etcp->input_queue)) != NULL) {
if (pkt->pkt_data) {
memory_pool_free(etcp->instance->data_pool, pkt->pkt_data);
}
memory_pool_free(etcp->rx_pool, pkt);
}
queue_free(etcp->input_queue);
etcp->input_queue = NULL;
}
// Drain and free output_queue (contains ETCP_FRAGMENT with pkt_data from data_pool)
if (etcp->output_queue) {
struct ETCP_FRAGMENT* pkt;
while ((pkt = queue_data_get(etcp->output_queue)) != NULL) {
if (pkt->pkt_data) {
memory_pool_free(etcp->instance->data_pool, pkt->pkt_data);
}
memory_pool_free(etcp->rx_pool, pkt);
}
queue_free(etcp->output_queue);
etcp->output_queue = NULL;
}
// Drain and free input_send_q (contains INFLIGHT_PACKET with pkt_data from data_pool)
if (etcp->input_send_q) {
struct INFLIGHT_PACKET* pkt;
while ((pkt = queue_data_get(etcp->input_send_q)) != NULL) {
if (pkt->pkt_data) {
memory_pool_free(etcp->instance->data_pool, pkt->pkt_data);
}
memory_pool_free(etcp->inflight_pool, pkt);
}
queue_free(etcp->input_send_q);
etcp->input_send_q = NULL;
}
// Drain and free input_wait_ack (contains INFLIGHT_PACKET with pkt_data from data_pool)
if (etcp->input_wait_ack) {
struct INFLIGHT_PACKET* pkt;
while ((pkt = queue_data_get(etcp->input_wait_ack)) != NULL) {
if (pkt->pkt_data) {
memory_pool_free(etcp->instance->data_pool, pkt->pkt_data);
}
memory_pool_free(etcp->inflight_pool, pkt);
}
queue_free(etcp->input_wait_ack);
etcp->input_wait_ack = NULL;
}
// Drain and free ack_q (contains ACK_PACKET from ack_pool)
if (etcp->ack_q) {
struct ACK_PACKET* pkt;
while ((pkt = queue_data_get(etcp->ack_q)) != NULL) {
queue_data_free(pkt);
}
queue_free(etcp->ack_q);
etcp->ack_q = NULL;
}
// Drain and free recv_q (contains ETCP_FRAGMENT with pkt_data from data_pool)
if (etcp->recv_q) {
struct ETCP_FRAGMENT* pkt;
while ((pkt = queue_data_get(etcp->recv_q)) != NULL) {
if (pkt->pkt_data) {
memory_pool_free(etcp->instance->data_pool, pkt->pkt_data);
}
memory_pool_free(etcp->rx_pool, pkt);
}
queue_free(etcp->recv_q);
etcp->recv_q = NULL;
}
// Free memory pools after all elements are returned
if (etcp->inflight_pool) {
memory_pool_destroy(etcp->inflight_pool);
etcp->inflight_pool = NULL;
}
if (etcp->rx_pool) {
memory_pool_destroy(etcp->rx_pool);
etcp->rx_pool = NULL;
}
// Clear links list safely
if (etcp->links) {
struct ETCP_LINK* link = etcp->links;
while (link) {
struct ETCP_LINK* next = link->next;
etcp_link_close(link);
link = next;
}
etcp->links = NULL;
}
// Clear next pointer to prevent dangling references
etcp->next = NULL;
// TODO: Free rx_list, etc.
free(etcp);
}
// Reset connection (stub)
void etcp_conn_reset(struct ETCP_CONN* etcp) {
// Reset IDs, queues, etc. as per protocol.txt
etcp->next_tx_id = 1;
etcp->last_rx_id = 0;
etcp->last_delivered_id = 0;
// Clear inflight, rx_list, etc.
}
// ====================================================================== Отправка данных
// Send data through ETCP connection
// Allocates memory from data_pool and places in input queue
// Returns: 0 on success, -1 on failure
int etcp_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) {
// DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_send: ENTER etcp=%p, data=%p, len=%zu", etcp, data, len);
if (!etcp || !data || len == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: invalid parameters (etcp=%p, data=%p, len=%zu)", etcp, data, len);
return -1;
}
if (!etcp->input_queue) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: input_queue is NULL for etcp=%p", etcp);
return -1;
}
// Check length against maximum packet size
if (len > PACKET_DATA_SIZE) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: packet too large (len=%zu, max=%d)", len, PACKET_DATA_SIZE);
return -1;
}
// Allocate packet data from data_pool (following ETCP reception pattern)
uint8_t* packet_data = memory_pool_alloc(etcp->instance->data_pool);
if (!packet_data) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to allocate packet data from data_pool");
return -1;
}
// Copy user data to packet buffer
memcpy(packet_data, data, len);
// Create queue entry - this allocates ll_entry + data pointer
struct ETCP_FRAGMENT* pkt = queue_data_new_from_pool(etcp->rx_pool);
if (!pkt) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to allocate queue entry");
memory_pool_free(etcp->instance->data_pool, packet_data);
return -1;
}
pkt->seq = 0; // Will be assigned by input_queue_cb
pkt->timestamp = 0; // Will be set by input_queue_cb
pkt->pkt_data = packet_data; // Point to data_pool allocation
pkt->ll.size = len; // размер packet_data
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_send: created PACKET %p with data %p (len=%zu)", pkt, packet_data, len);
// Add to input queue - input_queue_cb will process it
if (queue_data_put(etcp->input_queue, pkt, 0) != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to add to input queue");
memory_pool_free(etcp->instance->data_pool, packet_data);
memory_pool_free(etcp->rx_pool, pkt);
return -1;
}
return 0;
}
static void input_queue_try_resume(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "input_queue_try_resume: ENTER etcp=%p", etcp);
// если размер input_wait_ack+input_send_q в байтах < optimal_inflight то resume сейчас.
size_t wait_ack_bytes = queue_total_bytes(etcp->input_wait_ack);
size_t send_q_bytes = queue_total_bytes(etcp->input_send_q);
size_t total_bytes = wait_ack_bytes + send_q_bytes;
if (total_bytes < etcp->optimal_inflight) {
queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно.
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_try_resume: resumed input_send_q callback");
}
}
// Input callback for input_queue (добавление новых кодограмм в стек)
// input_queue -> input_send_q
static void input_queue_cb(struct ll_queue* q, void* arg) {
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
struct ETCP_FRAGMENT* in_pkt = queue_data_get(q);
if (!in_pkt) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot get element (pool=%p etcp=%p)", etcp->inflight_pool, etcp);
queue_resume_callback(q);
return;
}
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: processing ETCP_FRAGMENT %p (seq=%u, len=%u)", in_pkt, in_pkt->seq, in_pkt->ll.size);
memory_pool_free(etcp->rx_pool, in_pkt);// перемещаем из rx_pool в inflight_pool
// Create INFLIGHT_PACKET
struct INFLIGHT_PACKET* p = memory_pool_alloc(etcp->inflight_pool);
if (!p) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot allocate INFLIGHT_PACKET (pool=%p etcp=%p)", etcp->inflight_pool, etcp);
queue_data_free(in_pkt); // Free the ETCP_FRAGMENT
queue_resume_callback(q);
return;
}
// Setup inflight packet (based on protocol.txt)
memset(p, 0, sizeof(*p));
p->seq = etcp->next_tx_id++; // Assign seq
p->state = INFLIGHT_STATE_WAIT_SEND;
p->last_timestamp = 0;
p->pkt_data = in_pkt->pkt_data;
p->ll.size = in_pkt->ll.size;
// Add to send queue
if (queue_data_put(etcp->input_send_q, p, p->seq) != 0) {
memory_pool_free(etcp->inflight_pool, p);
queue_data_free(in_pkt);
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "input_queue_cb: EXIT (queue put failed)");
return;
}
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q");
input_queue_try_resume(etcp);
// Resume input_queue callback to process next packet if any
queue_resume_callback(q);
}
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg;
size_t send_q_bytes = queue_total_bytes(etcp->input_send_q);
size_t send_q_pkts = queue_entry_count(etcp->input_send_q);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_send_q_cb: input_send_q status: %d pkt %d bytes", send_q_pkts, send_q_bytes);
etcp_conn_process_send_queue(etcp);
}
static void ack_timeout_check(void* arg) {
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
uint64_t now = get_current_time_units();
uint64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u",
(unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter);
struct ll_entry* current = etcp->input_wait_ack->head;
while (current) {
struct ll_entry* next = current->next; // Save next as we may remove current
struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current;
uint64_t elapsed = now - pkt->last_timestamp;
if (elapsed > timeout) {
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "ack_timeout_check: timeout for seq=%u, elapsed=%llu, timeout=%llu, send_count=%u",
pkt->seq, (unsigned long long)elapsed, (unsigned long long)timeout, pkt->send_count);
// Increment counters
pkt->send_count++;
pkt->retrans_req_count++; // Optional, if used for retrans request logic
pkt->last_timestamp = now;
pkt->last_link = NULL; // Reset last link for re-selection
// Remove from wait_ack
queue_remove_data(etcp->input_wait_ack, pkt);
// Change state and add to send_q for retransmission
pkt->state = INFLIGHT_STATE_WAIT_SEND;
queue_data_put(etcp->input_send_q, pkt, pkt->seq);
// Update stats
etcp->retransmissions_count++;
}
else {// не надо до конца сканировать - они уже сортированы по таймстемпу т.к. очередь fifo, а timestamp = время добавления в очередь = время отправки
// shedule timer
int64_t next_timeout=timeout - elapsed;
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_check);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: rescheduled timer for %llu units", next_timeout);
break;
}
current = next;
}
}
static void wait_ack_cb(struct ll_queue* q, void* arg) {
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
ack_timeout_check(etcp);
}
// Подготовить и отправить кодограмму
// вызывается линком когда освобождается или очередью если появляются данные на передачу
struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
struct ETCP_LINK* link = etcp_loadbalancer_select_link(etcp);
if (!link) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: no link available");
return NULL;// если линков нет - ждём появления свободного
}
size_t send_q_size = queue_entry_count(etcp->input_send_q);
if (send_q_size == 0) {// сгребаем из input_queue
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: input_send_q empty, check if avail input_queue -> inflight");
input_queue_try_resume(etcp);
// return NULL;
}
// First, check if there's a packet in input_send_q (retrans or new)
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q");
struct INFLIGHT_PACKET* inf_pkt = queue_data_get(etcp->input_send_q);
if (inf_pkt) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: prepare udp dgram for send packet %p (seq=%u, len=%u)", inf_pkt, inf_pkt->seq, inf_pkt->ll.size);
inf_pkt->last_timestamp=get_current_time_units();
inf_pkt->send_count++;
inf_pkt->state=INFLIGHT_STATE_WAIT_ACK;
queue_data_put(etcp->input_wait_ack, inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue
}
size_t ack_q_size = queue_entry_count(etcp->ack_q);
if (!inf_pkt && ack_q_size == 0) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: no data/ack to send");
return NULL;
}
struct ETCP_DGRAM* dgram = memory_pool_alloc(etcp->instance->pkt_pool);
if (!dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: failed to allocate ETCP_DGRAM");
return NULL;
}
dgram->link = link;
dgram->noencrypt_len=0;
dgram->timestamp=get_current_timestamp();
// формат ack: [01] [elements count] [4 байта last_delivered_id] и <[4 байта seq][2 байта recv_ts][2 байта txrx delay ts]> x count
dgram->data[0]=1;// ack
int ptr=2;
dgram->data[ptr++]=etcp->last_delivered_id;
dgram->data[ptr++]=etcp->last_delivered_id>>8;
dgram->data[ptr++]=etcp->last_delivered_id>>16;
dgram->data[ptr++]=etcp->last_delivered_id>>24;
// тут (потом) добавим опциональные заголовки
struct ACK_PACKET* ack_pkt;
while (ack_pkt = queue_data_get(etcp->ack_q)) {
// seq 4 байта
dgram->data[ptr++]=ack_pkt->seq;
dgram->data[ptr++]=ack_pkt->seq>>8;
dgram->data[ptr++]=ack_pkt->seq>>16;
dgram->data[ptr++]=ack_pkt->seq>>24;
// ts приема 2 байта
dgram->data[ptr++]=ack_pkt->recv_timestamp;
dgram->data[ptr++]=ack_pkt->recv_timestamp>>8;
// время задержки 2 байта между recv и ack
uint16_t dly=get_current_timestamp()-ack_pkt->recv_timestamp;
dgram->data[ptr++]=dly;
dgram->data[ptr++]=dly>>8;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: add ACK N%d dTS=%d", ack_pkt->seq, dly);
queue_data_free(ack_pkt);
if (inf_pkt && inf_pkt->ll.size+ptr>=etcp->mtu-10) break;// pkt len (надо просчитать точнее включая все заголовки)
if (ptr>500) break;
}
dgram->data[1]=ptr/8;
if (inf_pkt) {
// фрейм data (0) обязательно в конец
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: packet with payload (seq=%u, len=%u), ack_size=%d", inf_pkt->seq, inf_pkt->ll.size, dgram->data[1]);
dgram->data[ptr++]=0;// payload
dgram->data[ptr++]=inf_pkt->seq;
dgram->data[ptr++]=inf_pkt->seq>>8;
dgram->data[ptr++]=inf_pkt->seq>>16;
dgram->data[ptr++]=inf_pkt->seq>>24;
memcpy(&dgram->data[ptr], inf_pkt->pkt_data, inf_pkt->ll.size); ptr+=inf_pkt->ll.size;
}
else {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: built packet with %d bytes total", dgram->data_len);
}
dgram->data_len=ptr;
return dgram;
}
// Callback for when a link is ready to send data
static void etcp_link_ready_callback(struct ETCP_CONN* etcp) {
if (!etcp) return;
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp);
etcp_conn_process_send_queue(etcp);
}
// Process packets in send queue and transmit them
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) {
struct ETCP_DGRAM* dgram;
while(dgram = etcp_request_pkt(etcp)) {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_process_send_queue: sending packet");
etcp_loadbalancer_send(dgram);
}
}
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо.
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет)
// если ack все еще заняты - обновляем таймаут
if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb);
else etcp->ack_resp_timer=NULL;
}
// ====================================================================== Прием данных
void etcp_output_try_assembly(struct ETCP_CONN* etcp) {
// пробуем собрать выходную очередь из фрагментов
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: etcp=%p, last_delivered_id=%u, recv_q_count=%d",
etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q));
uint32_t next_expected_id = etcp->last_delivered_id + 1;
int delivered_count = 0;
uint32_t delivered_bytes = 0;
// Look for contiguous packets starting from next_expected_id
while (1) {
struct ETCP_FRAGMENT* rx_pkt = queue_find_data_by_id(etcp->recv_q, next_expected_id);
if (!rx_pkt) {
// No more contiguous packets found
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: no packet found for id=%u, stopping", next_expected_id);
break;
}
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: assembling packet id=%u (len=%u)",
rx_pkt->seq, rx_pkt->ll.size);
// Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed
// Remove from recv_q first
queue_remove_data(etcp->recv_q, rx_pkt);
// Add to output_queue using the same ETCP_FRAGMENT structure
if (queue_data_put(etcp->output_queue, rx_pkt, next_expected_id) == 0) {
delivered_bytes += rx_pkt->ll.size;
delivered_count++;
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: moved packet id=%u to output_queue",
next_expected_id);
} else {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: failed to add packet id=%u to output_queue",
next_expected_id);
// Put it back in recv_q if we can't add to output_queue
queue_data_put(etcp->recv_q, rx_pkt, next_expected_id);
break;
}
// Update state for next iteration
etcp->last_delivered_id = next_expected_id;
next_expected_id++;
}
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: delivered %u contiguous packets (%u bytes), last_delivered_id=%u, output_queue_count=%d",
delivered_count, delivered_bytes, etcp->last_delivered_id, queue_entry_count(etcp->output_queue));
}
// Process ACK receipt - remove acknowledged packet from inflight queues
void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t dts) {
if (!etcp) return;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: processing ACK for seq=%u, ts=%u, dts=%u", seq, ts, dts);
// Find the acknowledged packet in the wait_ack queue
struct INFLIGHT_PACKET* acked_pkt = queue_find_data_by_id(etcp->input_wait_ack, seq);
if (acked_pkt) queue_remove_data(etcp->input_wait_ack, acked_pkt);
else { acked_pkt = queue_find_data_by_id(etcp->input_send_q, seq);
queue_remove_data(etcp->input_send_q, acked_pkt);
}
if (!acked_pkt) {
// Packet might be already acknowledged or not found
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: packet seq=%u not found in wait_ack queue", seq);
return;
}
// Calculate RTT if timestamps are valid
if (ts != (uint16_t)-1 && dts != (uint16_t)-1) {
uint16_t rtt = timestamp_diff(ts, dts);
etcp->rtt_last = rtt;
// Update RTT averages (exponential smoothing)
if (etcp->rtt_avg_10 == 0) {
etcp->rtt_avg_10 = rtt;
etcp->rtt_avg_100 = rtt;
} else {
// RTT average over 10 packets
etcp->rtt_avg_10 = (etcp->rtt_avg_10 * 9 + rtt) / 10;
// RTT average over 100 packets
etcp->rtt_avg_100 = (etcp->rtt_avg_100 * 99 + rtt) / 100;
}
// Update jitter calculation (max - min of last 10 RTT samples)
etcp->rtt_history[etcp->rtt_history_idx] = rtt;
etcp->rtt_history_idx = (etcp->rtt_history_idx + 1) % 10;
uint16_t rtt_min = UINT16_MAX, rtt_max = 0;
for (int i = 0; i < 10; i++) {
if (etcp->rtt_history[i] < rtt_min) rtt_min = etcp->rtt_history[i];
if (etcp->rtt_history[i] > rtt_max) rtt_max = etcp->rtt_history[i];
}
etcp->jitter = rtt_max - rtt_min;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: RTT updated - last=%u, avg_10=%u, avg_100=%u, jitter=%u",
rtt, etcp->rtt_avg_10, etcp->rtt_avg_100, etcp->jitter);
}
// Update connection statistics
etcp->unacked_bytes -= acked_pkt->ll.size;
etcp->bytes_sent_total += acked_pkt->ll.size;
etcp->ack_packets_count++;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: removed packet seq=%u from wait_ack, unacked_bytes now %u", seq, etcp->unacked_bytes);
if (acked_pkt->pkt_data) {
memory_pool_free(etcp->instance->data_pool, acked_pkt->pkt_data);
}
memory_pool_free(etcp->inflight_pool, acked_pkt);
// Try to resume sending more packets if window space opened up
input_queue_try_resume(etcp);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: completed for seq=%u", seq);
}
// Process incoming decrypted packet
void etcp_conn_input(struct ETCP_DGRAM* pkt) {
if (!pkt || !pkt->data_len) return;
struct ETCP_CONN* etcp = pkt->link->etcp;
uint8_t* data = pkt->data;
uint16_t len = pkt->data_len;
uint16_t ts = pkt->timestamp; // Received timestamp
// Note: Assume packet starts with sections after timestamp (but timestamp is already extracted in connections?).
// Protocol.txt: timestamp is first 2B, then sections.
// But in conn_input, pkt->data is after timestamp? Assume data starts with first section.
while (len >= 1) {
uint8_t type = data[0];
// Process sections as per protocol.txt
switch (type) {
case ETCP_SECTION_ACK: {
int elm_cnt=data[1];
uint32_t till=data[2] | (data[3]<<8) | (data[4]<<16) | (data[5]<<24);
data+=6;
for (int i=0; i<elm_cnt; i++) {
uint32_t seq=data[0] | (data[1]<<8) | (data[2]<<16) | (data[3]<<24);
uint16_t ts=data[4] | (data[5]<<8);
uint16_t dts=data[6] | (data[7]<<8);
etcp_ack_recv(etcp, seq, ts, dts);
data+=8;
}
while (etcp->rx_ack_till-till<0) { etcp->rx_ack_till++; etcp_ack_recv(etcp, etcp->rx_ack_till, -1, -1); }// подтверждаем всё по till
break;
}
case ETCP_SECTION_PAYLOAD: {
if (len>=5) {
// формируем ACK
struct ACK_PACKET* p = queue_data_new_from_pool(etcp->instance->ack_pool);
uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24);
p->seq=seq;
p->pkt_timestamp=pkt->timestamp;
p->recv_timestamp=get_current_timestamp();
queue_data_put(etcp->ack_q, p, p->seq);
if (etcp->ack_resp_timer == NULL) {
etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: set ack_timer for delayed ACK send");
}
if ((int32_t)(etcp->last_delivered_id-seq)<0) if (queue_find_data_by_id(etcp->recv_q, seq)==NULL) {// проверяем есть ли пакет с этим seq
uint32_t pkt_len=len-5;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: adding packet seq=%u to recv_q (last_delivered_id=%u)", seq, etcp->last_delivered_id);
// отправляем пакет в очередь на сборку
uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool);
struct ETCP_FRAGMENT* rx_pkt = queue_data_new_from_pool(etcp->rx_pool);
rx_pkt->seq=seq;
rx_pkt->timestamp=pkt->timestamp;
rx_pkt->pkt_data=payload_data;
rx_pkt->ll.size=pkt_len;
// Copy the actual payload data
memcpy(payload_data, data + 5, pkt_len);
queue_data_put(etcp->recv_q, rx_pkt, seq);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: packet seq=%u added to recv_q, calling assembly (last_delivered_id=%u)", seq, etcp->last_delivered_id);
if (etcp->last_delivered_id+1==seq) etcp_output_try_assembly(etcp);// пробуем собрать выходную очередь из фрагментов
}
}
len=0;
break;
}
default:
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "etcp_conn_input: unknown section type=0x%02x", type);
len=0;
break;
}
}
memory_pool_free(etcp->instance->pkt_pool, pkt); // Free the incoming dgram
}