diff --git a/src/etcp.c b/src/etcp.c index 30d842b..6e6d46f 100644 --- a/src/etcp.c +++ b/src/etcp.c @@ -1,5 +1,5 @@ -// etcp.c - ETCP Protocol Implementation (refactored and expanded based on etcp_protocol.txt) - +// etcp.c - ETCP Protocol Implementation (refactored and expanded based on etcp_protocol.txt) + #include "etcp.h" #include "etcp_debug.h" #include "etcp_loadbalancer.h" @@ -9,30 +9,30 @@ #include "../lib/ll_queue.h" #include "../lib/debug_config.h" #include "crc32.h" // For potential hashing, though not used yet. -#include -#include -#include -#include // For bandwidth calcs -#include // For UINT16_MAX -#include "../lib/mem.h" - -// Enable comprehensive debug output for ETCP module -#define DEBUG_CATEGORY_ETCP_DETAILED 1 - -// Constants from spec (adjusted for completeness) -#define MAX_INFLIGHT_BYTES 65536 // Initial window -#define RETRANS_K1 2.0f // RTT multiplier for retrans timeout -#define RETRANS_K2 1.5f // Jitter multiplier -#define ACK_DELAY_TB 20 // ACK timer delay (2ms in 0.1ms units) -#define BURST_DELAY_FACTOR 4 // Delay before burst -#define BURST_SIZE 5 // Packets in burst (1 delayed + 4 burst) -#define RTT_HISTORY_SIZE 10 // For jitter calc -#define MAX_PENDING 32 // For ACKs/retrans (arbitrary; adjust) -#define SECTION_HEADER_SIZE 3 // type(1) + len(2) - -// Container-of macro for getting struct from data pointer -//#define CONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member))) - +#include +#include +#include +#include // For bandwidth calcs +#include // For UINT16_MAX +#include "../lib/mem.h" + +// Enable comprehensive debug output for ETCP module +#define DEBUG_CATEGORY_ETCP_DETAILED 1 + +// Constants from spec (adjusted for completeness) +#define MAX_INFLIGHT_BYTES 65536 // Initial window +#define RETRANS_K1 2.0f // RTT multiplier for retrans timeout +#define RETRANS_K2 1.5f // Jitter multiplier +#define ACK_DELAY_TB 20 // ACK timer delay (2ms in 0.1ms units) +#define BURST_DELAY_FACTOR 4 // Delay before burst +#define BURST_SIZE 5 // Packets in burst (1 delayed + 4 burst) +#define RTT_HISTORY_SIZE 10 // For jitter calc +#define MAX_PENDING 32 // For ACKs/retrans (arbitrary; adjust) +#define SECTION_HEADER_SIZE 3 // type(1) + len(2) + +// Container-of macro for getting struct from data pointer +//#define CONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member))) + // Forward declarations static void input_queue_cb(struct ll_queue* q, void* arg); static void etcp_link_ready_callback(struct ETCP_CONN* etcp); @@ -91,75 +91,75 @@ static void drain_and_free_inflight_queue(struct ETCP_CONN* etcp, struct ll_queu queue_free(*q); *q = NULL; } - + uint16_t get_current_timestamp() { - return (uint16_t)get_time_tb(); -} - + return (uint16_t)get_time_tb(); +} + // Timestamp diff (with wrap-around) static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (t1 >= t2) { - return t1 - t2; - } - return (UINT16_MAX - t2) + t1 + 1; -} - + return t1 - t2; + } + return (UINT16_MAX - t2) + t1 + 1; +} + // Create new ETCP connection struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!instance) return NULL; - - - struct ETCP_CONN* etcp = u_calloc(1, sizeof(struct ETCP_CONN)); - if (!etcp) { - DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: creating connection failed for instance %p", instance); - return NULL; - } - - etcp->instance = instance; - etcp->input_queue = queue_new(instance->ua, 0, "ETCP input"); // No hash for input_queue - etcp->output_queue = queue_new(instance->ua, 0, "ETCP output"); // No hash for output_queue - etcp->input_send_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "input_send_q"); // Hash for send_q - etcp->input_wait_ack = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "input_wait_ack"); // Hash for wait_ack - etcp->recv_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "recv_q"); // Hash for send_q - etcp->ack_q = queue_new(instance->ua, 0, "ack_q"); - etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET)); - etcp->io_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT)); - etcp->optimal_inflight=10000; - // Initialize log_name with local node_id (peer will be updated later when known) - snprintf(etcp->log_name, sizeof(etcp->log_name), "%04llu->????", - (unsigned long long)(instance->node_id % 10000)); - - if (!etcp->input_queue || !etcp->output_queue || !etcp->input_send_q || !etcp->recv_q || !etcp->ack_q || - !etcp->input_wait_ack || !etcp->inflight_pool || !etcp->io_pool) { - DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: error - closing - input:%p output:%p send:%p wait:%x pool:%p", - etcp->input_queue, etcp->output_queue, etcp->input_send_q, etcp->input_wait_ack, etcp->inflight_pool); - etcp_connection_close(etcp); - return NULL; - } + + + struct ETCP_CONN* etcp = u_calloc(1, sizeof(struct ETCP_CONN)); + if (!etcp) { + DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: creating connection failed for instance %p", instance); + return NULL; + } + + etcp->instance = instance; + etcp->input_queue = queue_new(instance->ua, 0, "ETCP input"); // No hash for input_queue + etcp->output_queue = queue_new(instance->ua, 0, "ETCP output"); // No hash for output_queue + etcp->input_send_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "input_send_q"); // Hash for send_q + etcp->input_wait_ack = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "input_wait_ack"); // Hash for wait_ack + etcp->recv_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "recv_q"); // Hash for send_q + etcp->ack_q = queue_new(instance->ua, 0, "ack_q"); + etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET)); + etcp->io_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT)); + etcp->optimal_inflight=10000; + // Initialize log_name with local node_id (peer will be updated later when known) + snprintf(etcp->log_name, sizeof(etcp->log_name), "%04llu->????", + (unsigned long long)(instance->node_id % 10000)); + + if (!etcp->input_queue || !etcp->output_queue || !etcp->input_send_q || !etcp->recv_q || !etcp->ack_q || + !etcp->input_wait_ack || !etcp->inflight_pool || !etcp->io_pool) { + DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: error - closing - input:%p output:%p send:%p wait:%x pool:%p", + etcp->input_queue, etcp->output_queue, etcp->input_send_q, etcp->input_wait_ack, etcp->inflight_pool); + etcp_connection_close(etcp); + return NULL; + } // etcp->window_size = MAX_INFLIGHT_BYTES; // Not used etcp->mtu = 1500; // Default MTU etcp->next_tx_id = 1; - etcp->rtt_avg_10 = 10; // Initial guess (1ms) - etcp->rtt_avg_100 = 10; - etcp->rtt_history_idx = 0; - memset(etcp->rtt_history, 0, sizeof(etcp->rtt_history)); - - - etcp->normalizer = pn_init(etcp); - if (!etcp->normalizer) { - etcp_connection_close(etcp); - return NULL; - } - - // Set input queue callback - queue_set_callback(etcp->input_queue, input_queue_cb, etcp); - queue_set_callback(etcp->input_send_q, input_send_q_cb, etcp); - queue_set_callback(etcp->input_wait_ack, wait_ack_cb, etcp); - + etcp->rtt_avg_10 = 10; // Initial guess (1ms) + etcp->rtt_avg_100 = 10; + etcp->rtt_history_idx = 0; + memset(etcp->rtt_history, 0, sizeof(etcp->rtt_history)); + + + etcp->normalizer = pn_init(etcp); + if (!etcp->normalizer) { + etcp_connection_close(etcp); + return NULL; + } + + // Set input queue callback + queue_set_callback(etcp->input_queue, input_queue_cb, etcp); + queue_set_callback(etcp->input_send_q, input_send_q_cb, etcp); + queue_set_callback(etcp->input_wait_ack, wait_ack_cb, etcp); + etcp->link_ready_for_send_fn = etcp_link_ready_callback; // Register with routing module (must be done after normalizer is assigned) @@ -170,7 +170,7 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) { return etcp; } - + // Close connection with NULL pointer safety (prevents double free) void etcp_connection_close(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); @@ -215,36 +215,36 @@ void etcp_connection_close(struct ETCP_CONN* etcp) { queue_free(etcp->ack_q); etcp->ack_q = NULL; } - - // Free memory pools after all elements are returned - if (etcp->inflight_pool) { - memory_pool_destroy(etcp->inflight_pool); - etcp->inflight_pool = NULL; - } - - if (etcp->io_pool) { - memory_pool_destroy(etcp->io_pool); - etcp->io_pool = NULL; - } - - // Clear links list safely - if (etcp->links) { - struct ETCP_LINK* link = etcp->links; - while (link) { - struct ETCP_LINK* next = link->next; - etcp_link_close(link); - link = next; - } - etcp->links = NULL; - } - - // Clear next pointer to prevent dangling references - etcp->next = NULL; - - // TODO: Free rx_list, etc. - u_free(etcp); -} - + + // Free memory pools after all elements are returned + if (etcp->inflight_pool) { + memory_pool_destroy(etcp->inflight_pool); + etcp->inflight_pool = NULL; + } + + if (etcp->io_pool) { + memory_pool_destroy(etcp->io_pool); + etcp->io_pool = NULL; + } + + // Clear links list safely + if (etcp->links) { + struct ETCP_LINK* link = etcp->links; + while (link) { + struct ETCP_LINK* next = link->next; + etcp_link_close(link); + link = next; + } + etcp->links = NULL; + } + + // Clear next pointer to prevent dangling references + etcp->next = NULL; + + // TODO: Free rx_list, etc. + u_free(etcp); +} + // Reset connection void etcp_conn_reset(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); @@ -255,6 +255,9 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) { etcp->last_delivered_id = 0; etcp->rx_ack_till = 0; + // Устанавливаем флаг ожидания первого пакета + etcp->got_initial_pkt = 0; + // Reset metrics etcp->unacked_bytes = 0; etcp->rtt_last = 0; @@ -288,6 +291,23 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "end"); } +void etcp_conn_reinit(struct ETCP_CONN* etcp) { + DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); + DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Reinitializing ETCP connection [%s]", etcp->log_name); + + // Сбрасываем initialized во всех линках + struct ETCP_LINK* link = etcp->links; + while (link) { + link->initialized = 0; + link = link->next; + } + + // Вызываем etcp_conn_reset для сброса состояния + etcp_conn_reset(etcp); + + DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "end"); +} + // Update log_name when peer_node_id becomes known void etcp_update_log_name(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); @@ -300,42 +320,42 @@ void etcp_update_log_name(struct ETCP_CONN* etcp) { // ====================================================================== Отправка данных - + // Send data through ETCP connection // Allocates memory from data_pool and places in input queue // Returns: 0 on success, -1 on failure int etcp_int_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); - + if (!etcp || !data || len == 0) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] invalid parameters (etcp=%p, data=%p, len=%zu)", etcp->log_name, etcp, data, len); return -1; } - + if (!etcp->input_queue) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] input_queue is NULL for etcp=%p", etcp->log_name, etcp); return -1; } - + // Check length against maximum packet size if (len > PACKET_DATA_SIZE) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] packet too large (len=%zu, max=%d)", etcp->log_name, len, PACKET_DATA_SIZE); return -1; } - + // Allocate packet data from data_pool (following ETCP reception pattern) uint8_t* packet_data = memory_pool_alloc(etcp->instance->data_pool); if (!packet_data) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate packet data from data_pool", etcp->log_name); return -1; } - - // Copy user data to packet buffer - memcpy(packet_data, data, len); - - // Create queue entry - this allocates ll_entry + data pointer - - + + // Copy user data to packet buffer + memcpy(packet_data, data, len); + + // Create queue entry - this allocates ll_entry + data pointer + + struct ETCP_FRAGMENT* pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool); if (!pkt) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate queue entry", etcp->log_name); @@ -349,7 +369,7 @@ int etcp_int_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) { pkt->ll.len = len; // размер packet_data DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] created PACKET %p with data %p (len=%zu)", etcp->log_name, pkt, packet_data, len); - + // Add to input queue - input_queue_cb will process it if (queue_data_put(etcp->input_queue, (struct ll_entry*)pkt, 0) != 0) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add to input queue", etcp->log_name); @@ -357,20 +377,20 @@ int etcp_int_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) { memory_pool_free(etcp->io_pool, pkt); return -1; } - - return 0; -} - - - + + return 0; +} + + + static void input_queue_try_resume(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); - - // если размер input_wait_ack+input_send_q в байтах < optimal_inflight то resume сейчас. - size_t wait_ack_bytes = queue_total_bytes(etcp->input_wait_ack); - size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); - size_t total_bytes = wait_ack_bytes + send_q_bytes; - + + // если размер input_wait_ack+input_send_q в байтах < optimal_inflight то resume сейчас. + size_t wait_ack_bytes = queue_total_bytes(etcp->input_wait_ack); + size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); + size_t total_bytes = wait_ack_bytes + send_q_bytes; + if (total_bytes <= etcp->optimal_inflight) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resume callbacks: inflight_bytes=%d, input_len=%d", etcp->log_name, total_bytes, etcp->input_queue->total_bytes); queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно. @@ -378,7 +398,7 @@ static void input_queue_try_resume(struct ETCP_CONN* etcp) { if (queue_entry_count(etcp->input_send_q) == 0) queue_resume_callback(etcp->input_queue);// и только когда больше нечего отправлять - забираем новый пакет } } - + void etcp_stats(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp) return; @@ -421,23 +441,23 @@ void etcp_stats(struct ETCP_CONN* etcp) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] last_delivered_id:%u", etcp->log_name, etcp->last_delivered_id); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rx_ack_till: %u", etcp->log_name, etcp->rx_ack_till); } - + // Input callback for input_queue (добавление новых кодограмм в стек) // input_queue -> input_send_q static void input_queue_cb(struct ll_queue* q, void* arg) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; struct ETCP_FRAGMENT* in_pkt = (struct ETCP_FRAGMENT*)queue_data_get(q); - + if (!in_pkt) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] cannot get element (pool=%p etcp=%p)", etcp->log_name, etcp->inflight_pool, etcp); queue_resume_callback(q); return; } - - - memory_pool_free(etcp->io_pool, in_pkt);// перемещаем из io_pool в inflight_pool - + + + memory_pool_free(etcp->io_pool, in_pkt);// перемещаем из io_pool в inflight_pool + // Create INFLIGHT_PACKET struct INFLIGHT_PACKET* p = memory_pool_alloc(etcp->inflight_pool); if (!p) { @@ -446,19 +466,19 @@ static void input_queue_cb(struct ll_queue* q, void* arg) { queue_resume_callback(q); return; } - - // Setup inflight packet (based on protocol.txt) - memset(p, 0, sizeof(*p)); - p->seq = etcp->next_tx_id++; // Assign seq - p->state = INFLIGHT_STATE_WAIT_SEND; - p->last_timestamp = 0; + + // Setup inflight packet (based on protocol.txt) + memset(p, 0, sizeof(*p)); + p->seq = etcp->next_tx_id++; // Assign seq + p->state = INFLIGHT_STATE_WAIT_SEND; + p->last_timestamp = 0; p->ll.dgram = in_pkt->ll.dgram; p->ll.dgram_pool = in_pkt->ll.dgram_pool; p->ll.len = in_pkt->ll.len; - - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input -> inflight (seq=%u, len=%u)", etcp->log_name, p->seq, p->ll.len); - int len=p->ll.len;// сохраним len - + + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input -> inflight (seq=%u, len=%u)", etcp->log_name, p->seq, p->ll.len); + int len=p->ll.len;// сохраним len + // Add to send queue if (queue_data_put(etcp->input_send_q, (struct ll_entry*)p, p->seq) != 0) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add packet seq=%u to input_send_q", etcp->log_name, p->seq); @@ -468,35 +488,35 @@ static void input_queue_cb(struct ll_queue* q, void* arg) { return; } etcp->unacked_bytes += len; - -// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q"); - - etcp_conn_process_send_queue(etcp);// сразу обработаем этот пакет -// input_queue_try_resume(etcp); - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] nextloop, input_queue size=%d ", etcp->log_name, q->count); -} - - + +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q"); + + etcp_conn_process_send_queue(etcp);// сразу обработаем этот пакет +// input_queue_try_resume(etcp); + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] nextloop, input_queue size=%d ", etcp->log_name, q->count); +} + + static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg; - -// size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); -// size_t send_q_pkts = queue_entry_count(etcp->input_send_q); -// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_send_q_cb: input_send_q status: %d pkt %d bytes", send_q_pkts, send_q_bytes); - etcp_conn_process_send_queue(etcp); - } - - + +// size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); +// size_t send_q_pkts = queue_entry_count(etcp->input_send_q); +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_send_q_cb: input_send_q status: %d pkt %d bytes", send_q_pkts, send_q_bytes); + etcp_conn_process_send_queue(etcp); + } + + static void ack_timeout_check(void* arg) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; uint64_t now = get_time_tb(); - uint64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2); - -// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u", -// (unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter); - + uint64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2); + +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u", +// (unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter); + struct ll_entry* current; while (current = etcp->input_wait_ack->head) { struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current; @@ -521,42 +541,42 @@ static void ack_timeout_check(void* arg) { if (elapsed > timeout) { DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: timeout for seq=%u, elapsed=%llu, timeout=%llu, send_count=%u. Moving: wait_ack -> send_q", etcp->log_name, pkt->seq, (unsigned long long)elapsed, (unsigned long long)timeout, pkt->send_count); - - // Increment counters - pkt->send_count++; - pkt->retrans_req_count++; // Optional, if used for retrans request logic - pkt->last_timestamp = now; - pkt->last_link = NULL; // Reset last link for re-selection - + + // Increment counters + pkt->send_count++; + pkt->retrans_req_count++; // Optional, if used for retrans request logic + pkt->last_timestamp = now; + pkt->last_link = NULL; // Reset last link for re-selection + // Remove from wait_ack queue_data_get(etcp->input_wait_ack); // Change state and add to send_q for retransmission pkt->state = INFLIGHT_STATE_WAIT_SEND; queue_data_put(etcp->input_send_q, (struct ll_entry*)pkt, pkt->seq); - - // Update stats - etcp->retransmissions_count++; - } - else {// не надо до конца сканировать - они уже сортированы по таймстемпу т.к. очередь fifo, а timestamp = время добавления в очередь = время отправки - // shedule timer - int64_t next_timeout=timeout - elapsed; - etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_check); - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: retransmission timer set for %llu units", etcp->log_name, next_timeout); - return; - } - } - // если всё выгребли - надо взвести resume - etcp->retrans_timer = NULL; - queue_resume_callback(etcp->input_wait_ack); -} - + + // Update stats + etcp->retransmissions_count++; + } + else {// не надо до конца сканировать - они уже сортированы по таймстемпу т.к. очередь fifo, а timestamp = время добавления в очередь = время отправки + // shedule timer + int64_t next_timeout=timeout - elapsed; + etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_check); + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: retransmission timer set for %llu units", etcp->log_name, next_timeout); + return; + } + } + // если всё выгребли - надо взвести resume + etcp->retrans_timer = NULL; + queue_resume_callback(etcp->input_wait_ack); +} + static void wait_ack_cb(struct ll_queue* q, void* arg) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; ack_timeout_check(etcp);// ack_cb срабатывает когда init (таймер не инициализирован) или когда empty (таймер не активен) -} - +} + // Подготовить и отправить кодограмму // вызывается линком когда освобождается или очередью если появляются данные на передачу struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { @@ -566,15 +586,15 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no link available", etcp->log_name); return NULL;// если линков нет - ждём появления свободного } - - size_t send_q_size = queue_entry_count(etcp->input_send_q); - - if (send_q_size == 0) {// сгребаем из input_queue -// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: input_send_q empty, check if avail input_queue -> inflight"); - input_queue_try_resume(etcp); -// return NULL; - } - + + size_t send_q_size = queue_entry_count(etcp->input_send_q); + + if (send_q_size == 0) {// сгребаем из input_queue +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: input_send_q empty, check if avail input_queue -> inflight"); + input_queue_try_resume(etcp); +// return NULL; + } + // First, check if there's a packet in input_send_q (retrans or new) // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q"); struct INFLIGHT_PACKET* inf_pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q); @@ -586,39 +606,39 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { inf_pkt->state=INFLIGHT_STATE_WAIT_ACK; queue_data_put(etcp->input_wait_ack, (struct ll_entry*)inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue } - - size_t ack_q_size = queue_entry_count(etcp->ack_q); - + + size_t ack_q_size = queue_entry_count(etcp->ack_q); + if (!inf_pkt && ack_q_size == 0) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name); return NULL; } - + struct ETCP_DGRAM* dgram = memory_pool_alloc(etcp->instance->pkt_pool); if (!dgram) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate ETCP_DGRAM", etcp->log_name); return NULL; } - - dgram->link = link; - dgram->noencrypt_len=0; - dgram->timestamp=get_current_timestamp(); - -// формат ack: [01] [elements count] [4 байта last_delivered_id] и <[4 байта seq][2 байта recv_ts][2 байта txrx delay ts]> x count - dgram->data[0]=1;// ack - int ptr=2; - - dgram->data[ptr++]=etcp->last_delivered_id; - dgram->data[ptr++]=etcp->last_delivered_id>>8; - dgram->data[ptr++]=etcp->last_delivered_id>>16; - dgram->data[ptr++]=etcp->last_delivered_id>>24; - - int data_len=0; - if (inf_pkt) data_len=inf_pkt->ll.len; - int remain_len=link->mtu -28/*udp headers*/ -13-8-4/*sc_nonce+tag size+crc*/ -9/*hdr[3] + min ack[6]*/ -5/*payload hdr*/ - data_len; -// DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "remain_len= %d pl=%d", remain_len, data_len); - -// добавим опциональные заголовки + + dgram->link = link; + dgram->noencrypt_len=0; + dgram->timestamp=get_current_timestamp(); + +// формат ack: [01] [elements count] [4 байта last_delivered_id] и <[4 байта seq][2 байта recv_ts][2 байта txrx delay ts]> x count + dgram->data[0]=1;// ack + int ptr=2; + + dgram->data[ptr++]=etcp->last_delivered_id; + dgram->data[ptr++]=etcp->last_delivered_id>>8; + dgram->data[ptr++]=etcp->last_delivered_id>>16; + dgram->data[ptr++]=etcp->last_delivered_id>>24; + + int data_len=0; + if (inf_pkt) data_len=inf_pkt->ll.len; + int remain_len=link->mtu -28/*udp headers*/ -13-8-4/*sc_nonce+tag size+crc*/ -9/*hdr[3] + min ack[6]*/ -5/*payload hdr*/ - data_len; +// DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "remain_len= %d pl=%d", remain_len, data_len); + +// добавим опциональные заголовки struct ACK_PACKET* ack_pkt; while (remain_len>=8) { ack_pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q); @@ -640,98 +660,98 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { dgram->data[ptr++]=dly>>8; DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] add ACK N%d dTS=%d", etcp->log_name, ack_pkt->seq, dly); queue_entry_free((struct ll_entry*)ack_pkt); - - if (inf_pkt && inf_pkt->ll.len+ptr>=etcp->mtu-10) break;// pkt len (надо просчитать точнее включая все заголовки) - if (ptr>500) break; - } - - dgram->data[1]=ptr/8; - - - if (link->last_recv_updated && remain_len>=5) {// если есть данные - добавим channel_timestamp - uint64_t now=get_time_tb(); - uint64_t dt=now - link->last_recv_local_time; - link->last_recv_updated=0; - if (dt<1000000) { - dgram->data[ptr++]=ETCP_SECTION_TIMESTAMP; - - uint16_t t=link->last_recv_timestamp + dt; - dgram->data[ptr++]=t; - dgram->data[ptr++]=t>>8; - - t=link->last_recv_local_time - link->last_recv_timestamp; - dgram->data[ptr++]=t; - dgram->data[ptr++]=t>>8; - remain_len-=5; - } - } - -// DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "remain_len(2)= %d", remain_len); - + + if (inf_pkt && inf_pkt->ll.len+ptr>=etcp->mtu-10) break;// pkt len (надо просчитать точнее включая все заголовки) + if (ptr>500) break; + } + + dgram->data[1]=ptr/8; + + + if (link->last_recv_updated && remain_len>=5) {// если есть данные - добавим channel_timestamp + uint64_t now=get_time_tb(); + uint64_t dt=now - link->last_recv_local_time; + link->last_recv_updated=0; + if (dt<1000000) { + dgram->data[ptr++]=ETCP_SECTION_TIMESTAMP; + + uint16_t t=link->last_recv_timestamp + dt; + dgram->data[ptr++]=t; + dgram->data[ptr++]=t>>8; + + t=link->last_recv_local_time - link->last_recv_timestamp; + dgram->data[ptr++]=t; + dgram->data[ptr++]=t>>8; + remain_len-=5; + } + } + +// DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "remain_len(2)= %d", remain_len); + if (inf_pkt) { // фрейм data (0) обязательно в конец DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] add DATA (seq=%u, len=%u), ack_size=%d", etcp->log_name, inf_pkt->seq, inf_pkt->ll.len, dgram->data[1]); - dgram->data[ptr++]=0;// payload - dgram->data[ptr++]=inf_pkt->seq; - dgram->data[ptr++]=inf_pkt->seq>>8; - dgram->data[ptr++]=inf_pkt->seq>>16; - dgram->data[ptr++]=inf_pkt->seq>>24; - - memcpy(&dgram->data[ptr], inf_pkt->ll.dgram, inf_pkt->ll.len); ptr+=inf_pkt->ll.len; - } + dgram->data[ptr++]=0;// payload + dgram->data[ptr++]=inf_pkt->seq; + dgram->data[ptr++]=inf_pkt->seq>>8; + dgram->data[ptr++]=inf_pkt->seq>>16; + dgram->data[ptr++]=inf_pkt->seq>>24; + + memcpy(&dgram->data[ptr], inf_pkt->ll.dgram, inf_pkt->ll.len); ptr+=inf_pkt->ll.len; + } else { int chk=queue_check_consistency(etcp->ack_q); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] only ACK (size=%d) packet with %d bytes total (chk=%d) rem=%d", etcp->log_name, ack_q_size, ptr, chk, remain_len); } if (ptr>=PACKET_DATA_SIZE-50) DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] SIZE ERROR!!! %d", ptr); - + dgram->data_len=ptr; - + etcp_dump_pkt_sections(dgram, link, 1); - + return dgram; -} - +} + // Callback for when a link is ready to send data static void etcp_link_ready_callback(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp) return; -// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp); - etcp_conn_process_send_queue(etcp); -} - +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp); + etcp_conn_process_send_queue(etcp); +} + // Process packets in send queue and transmit them static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_DGRAM* dgram; - while(dgram = etcp_request_pkt(etcp)) { -// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_process_send_queue: sending packet"); - etcp_loadbalancer_send(dgram); - } -} - + while(dgram = etcp_request_pkt(etcp)) { +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_process_send_queue: sending packet"); + etcp_loadbalancer_send(dgram); + } +} + static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо. DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; - etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет) -// если ack все еще заняты - обновляем таймаут - if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); - else etcp->ack_resp_timer=NULL; -} - -// ====================================================================== Прием данных - - + etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет) +// если ack все еще заняты - обновляем таймаут + if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); + else etcp->ack_resp_timer=NULL; +} + +// ====================================================================== Прием данных + + void etcp_output_try_assembly(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); // пробуем собрать выходную очередь из фрагментов // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] etcp=%p, last_delivered_id=%u, recv_q_count=%d", // etcp->log_name, etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q)); - - uint32_t next_expected_id = etcp->last_delivered_id + 1; - int delivered_count = 0; - uint32_t delivered_bytes = 0; - + + uint32_t next_expected_id = etcp->last_delivered_id + 1; + int delivered_count = 0; + uint32_t delivered_bytes = 0; + // Look for contiguous packets starting from next_expected_id while (1) { struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_find_data_by_id(etcp->recv_q, next_expected_id); @@ -761,107 +781,107 @@ void etcp_output_try_assembly(struct ETCP_CONN* etcp) { queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, next_expected_id); break; } - - // Update state for next iteration - etcp->last_delivered_id = next_expected_id; - next_expected_id++; - } - + + // Update state for next iteration + etcp->last_delivered_id = next_expected_id; + next_expected_id++; + } + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] delivered %u contiguous packets (%u bytes), last_delivered_id=%u, output_queue_count=%d", etcp->log_name, delivered_count, delivered_bytes, etcp->last_delivered_id, queue_entry_count(etcp->output_queue)); } - + // Process ACK receipt - remove acknowledged packet from inflight queues void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t dts) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp) return; - -// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts); - + +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts); + // Find the acknowledged packet in the wait_ack queue struct INFLIGHT_PACKET* acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq); if (acked_pkt) queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)acked_pkt); else { acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_send_q, seq); queue_remove_data(etcp->input_send_q, (struct ll_entry*)acked_pkt); } - - if (!acked_pkt) { - // Packet might be already acknowledged or not found - DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: packet seq=%u not found in wait_ack queue", seq); - return; - } - - // Calculate RTT if timestamps are valid -/* if (ts != (uint16_t)-1 && dts != (uint16_t)-1) { - uint16_t rtt = timestamp_diff(ts, dts); - etcp->rtt_last = rtt; - - // Update RTT averages (exponential smoothing) - if (etcp->rtt_avg_10 == 0) { - etcp->rtt_avg_10 = rtt; - etcp->rtt_avg_100 = rtt; - } else { - // RTT average over 10 packets - etcp->rtt_avg_10 = (etcp->rtt_avg_10 * 9 + rtt) / 10; - // RTT average over 100 packets - etcp->rtt_avg_100 = (etcp->rtt_avg_100 * 99 + rtt) / 100; - } - - // Update jitter calculation (max - min of last 10 RTT samples) - etcp->rtt_history[etcp->rtt_history_idx] = rtt; - etcp->rtt_history_idx = (etcp->rtt_history_idx + 1) % 10; - - uint16_t rtt_min = UINT16_MAX, rtt_max = 0; - for (int i = 0; i < 10; i++) { - if (etcp->rtt_history[i] < rtt_min) rtt_min = etcp->rtt_history[i]; - if (etcp->rtt_history[i] > rtt_max) rtt_max = etcp->rtt_history[i]; - } - etcp->jitter = rtt_max - rtt_min; - + + if (!acked_pkt) { + // Packet might be already acknowledged or not found + DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: packet seq=%u not found in wait_ack queue", seq); + return; + } + + // Calculate RTT if timestamps are valid +/* if (ts != (uint16_t)-1 && dts != (uint16_t)-1) { + uint16_t rtt = timestamp_diff(ts, dts); + etcp->rtt_last = rtt; + + // Update RTT averages (exponential smoothing) + if (etcp->rtt_avg_10 == 0) { + etcp->rtt_avg_10 = rtt; + etcp->rtt_avg_100 = rtt; + } else { + // RTT average over 10 packets + etcp->rtt_avg_10 = (etcp->rtt_avg_10 * 9 + rtt) / 10; + // RTT average over 100 packets + etcp->rtt_avg_100 = (etcp->rtt_avg_100 * 99 + rtt) / 100; + } + + // Update jitter calculation (max - min of last 10 RTT samples) + etcp->rtt_history[etcp->rtt_history_idx] = rtt; + etcp->rtt_history_idx = (etcp->rtt_history_idx + 1) % 10; + + uint16_t rtt_min = UINT16_MAX, rtt_max = 0; + for (int i = 0; i < 10; i++) { + if (etcp->rtt_history[i] < rtt_min) rtt_min = etcp->rtt_history[i]; + if (etcp->rtt_history[i] > rtt_max) rtt_max = etcp->rtt_history[i]; + } + etcp->jitter = rtt_max - rtt_min; + // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT updated - last=%u, avg_10=%u, avg_100=%u, jitter=%u", // etcp->log_name, rtt, etcp->rtt_avg_10, etcp->rtt_avg_100, etcp->jitter); } -*/ - // Update connection statistics - etcp->unacked_bytes -= acked_pkt->ll.len; - etcp->bytes_sent_total += acked_pkt->ll.len; - etcp->ack_packets_count++; - - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] removed packet seq=%u from wait_ack, unacked_bytes now %u total acked=%u", etcp->log_name, seq, etcp->unacked_bytes, etcp->ack_packets_count); - - if (acked_pkt->ll.dgram) { - memory_pool_free(etcp->instance->data_pool, acked_pkt->ll.dgram); - } - memory_pool_free(etcp->inflight_pool, acked_pkt); - - // Try to resume sending more packets if window space opened up - input_queue_try_resume(etcp); - +*/ + // Update connection statistics + etcp->unacked_bytes -= acked_pkt->ll.len; + etcp->bytes_sent_total += acked_pkt->ll.len; + etcp->ack_packets_count++; + + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] removed packet seq=%u from wait_ack, unacked_bytes now %u total acked=%u", etcp->log_name, seq, etcp->unacked_bytes, etcp->ack_packets_count); + + if (acked_pkt->ll.dgram) { + memory_pool_free(etcp->instance->data_pool, acked_pkt->ll.dgram); + } + memory_pool_free(etcp->inflight_pool, acked_pkt); + + // Try to resume sending more packets if window space opened up + input_queue_try_resume(etcp); + // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] completed for seq=%u", etcp->log_name, seq); } - - + + // Process incoming decrypted packet void etcp_conn_input(struct ETCP_DGRAM* pkt) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!pkt || !pkt->data_len) return; - + etcp_dump_pkt_sections(pkt, pkt->link, 0); - + struct ETCP_CONN* etcp = pkt->link->etcp; - uint8_t* data = pkt->data; - uint16_t len = pkt->data_len; - uint16_t ts = pkt->timestamp; // Received timestamp - - // Note: Assume packet starts with sections after timestamp (but timestamp is already extracted in connections?). - // Protocol.txt: timestamp is first 2B, then sections. - // But in conn_input, pkt->data is after timestamp? Assume data starts with first section. - - while (len >= 1) { - uint8_t type = data[0]; - - // Process sections as per protocol.txt - switch (type) { + uint8_t* data = pkt->data; + uint16_t len = pkt->data_len; + uint16_t ts = pkt->timestamp; // Received timestamp + + // Note: Assume packet starts with sections after timestamp (but timestamp is already extracted in connections?). + // Protocol.txt: timestamp is first 2B, then sections. + // But in conn_input, pkt->data is after timestamp? Assume data starts with first section. + + while (len >= 1) { + uint8_t type = data[0]; + + // Process sections as per protocol.txt + switch (type) { case ETCP_SECTION_ACK: { int elm_cnt=data[1]; uint32_t till=data[2] | (data[3]<<8) | (data[4]<<16) | (data[5]<<24); @@ -877,34 +897,50 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) { while ((int32_t)(etcp->rx_ack_till-till)<0) { etcp->rx_ack_till++; etcp_ack_recv(etcp, etcp->rx_ack_till, -1, -1); }// подтверждаем всё по till break; } - case ETCP_SECTION_TIMESTAMP: { - uint16_t cur_ts=get_current_timestamp(); - uint16_t ret_ts=data[1] | (data[2]<<8);// cur_ts=ret_ts = RTT - uint16_t recv_dt=data[3] | (data[4]<<8); - pkt->link->rtt_last=cur_ts-ret_ts; - pkt->link->recv_dt_last=recv_dt; - data+=5; len-=5; - - struct ETCP_LINK* c=etcp->links; - int rtt_sum=0; - int tt_sum=0; - int cnt=0; - while (c) { - rtt_sum += c->rtt_last; - tt_sum += c->tt_last; - cnt++; - c=c->next; - } - etcp->rtt_last=rtt_sum/cnt; - etcp->tt_last=tt_sum/cnt; - break; - } - case ETCP_SECTION_PAYLOAD: { - - if (len>=5) { + case ETCP_SECTION_TIMESTAMP: { + uint16_t cur_ts=get_current_timestamp(); + uint16_t ret_ts=data[1] | (data[2]<<8);// cur_ts=ret_ts = RTT + uint16_t recv_dt=data[3] | (data[4]<<8); + pkt->link->rtt_last=cur_ts-ret_ts; + pkt->link->recv_dt_last=recv_dt; + data+=5; len-=5; + + struct ETCP_LINK* c=etcp->links; + int rtt_sum=0; + int tt_sum=0; + int cnt=0; + while (c) { + rtt_sum += c->rtt_last; + tt_sum += c->tt_last; + cnt++; + c=c->next; + } + etcp->rtt_last=rtt_sum/cnt; + etcp->tt_last=tt_sum/cnt; + break; + } + case ETCP_SECTION_PAYLOAD: { + + if (len>=5) { // формируем ACK - struct ACK_PACKET* p = (struct ACK_PACKET*)queue_entry_new_from_pool(etcp->instance->ack_pool); uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24); + if (etcp->got_initial_pkt == 0) { + if (seq==1) etcp->got_initial_pkt = 1; + else { + DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] Waiting for initial packet but recv seq=%d, ignoring", etcp->log_name, seq); + len=0; + break; + } + } + else { + uint32_t d=seq - etcp->last_delivered_id; + if (d>MAX_INFLIGHT_SIZE) { + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] Received packet out of inflight bounds: seq=%d last delivered=%d", etcp->log_name, seq, etcp->last_delivered_id); + len=0; + break; + } + } + struct ACK_PACKET* p = (struct ACK_PACKET*)queue_entry_new_from_pool(etcp->instance->ack_pool); p->seq=seq; p->pkt_timestamp=pkt->timestamp; p->recv_timestamp=get_current_timestamp(); @@ -920,12 +956,14 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) { uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool); if (!payload_data) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate payload_data from data_pool", etcp->log_name); + len=0; break; } struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool); if (!rx_pkt) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate rx_pkt from io_pool", etcp->log_name); memory_pool_free(etcp->instance->data_pool, payload_data); + len=0; break; } rx_pkt->seq=seq; @@ -936,23 +974,22 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) { // Copy the actual payload data memcpy(payload_data, data + 5, pkt_len); queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, seq); - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] packet seq=%u added to recv_q, calling assembly (last_delivered_id=%u)", etcp->log_name, seq, etcp->last_delivered_id); - if ((int32_t)(seq - etcp->last_delivered_id) == 1) etcp_output_try_assembly(etcp);// пробуем собрать выходную очередь из фрагментов - } - } - len=0; - break; - } - - default: - DEBUG_WARN(DEBUG_CATEGORY_ETCP, "etcp_conn_input: unknown section type=0x%02x", type); - len=0; - break; - } - - } - - memory_pool_free(etcp->instance->pkt_pool, pkt); // Free the incoming dgram -} - - + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] packet seq=%u added to recv_q, calling assembly (last_delivered_id=%u)", etcp->log_name, seq, etcp->last_delivered_id); + if ((int32_t)(seq - etcp->last_delivered_id) == 1) etcp_output_try_assembly(etcp);// пробуем собрать выходную очередь из фрагментов + } + } + len=0; + break; + } + + default: + DEBUG_WARN(DEBUG_CATEGORY_ETCP, "etcp_conn_input: unknown section type=0x%02x", type); + len=0; + break; + } + + } + + memory_pool_free(etcp->instance->pkt_pool, pkt); // Free the incoming dgram +} + diff --git a/src/etcp.h b/src/etcp.h index 77a63d3..a5ffaa3 100644 --- a/src/etcp.h +++ b/src/etcp.h @@ -34,6 +34,8 @@ uint16_t get_current_timestamp(void); #define INFLIGHT_STATE_WAIT_SEND 1 #define INFLIGHT_INITIAL_HASH_SIZE 1024 +#define MAX_INFLIGHT_SIZE 16384 // максимальное число элементов в inflight приёмной очереди (для предотвращения атак) + // в этот список пакет добавляется когда перемещается из input_queue в input_send_q, при этом к пакету добавляется struct INFLIGHT_PACKET из inflight_pool. // пакет полностью удаляется когда приходит ACK (либо conn_reset/close) struct INFLIGHT_PACKET {// выделяется из etcp->inflight_pool @@ -96,6 +98,7 @@ struct ETCP_CONN { // IDs and state + uint8_t got_initial_pkt; // uint32_t next_tx_id; // Next TX ID uint32_t last_rx_id; // Last received ID uint32_t last_delivered_id; // Last delivered to output_queue @@ -147,6 +150,8 @@ void etcp_connection_close(struct ETCP_CONN* etcp); void etcp_conn_reset(struct ETCP_CONN* etcp); +void etcp_conn_reinit(struct ETCP_CONN* etcp); + // Отправка: используем api ll_queue для очереди ETCP_CONN.input_queue // Прием: используем api ll_queue для очереди ETCP_CONN.output_queue // для очередей используется формат diff --git a/src/pkt_normalizer.c b/src/pkt_normalizer.c index b7b6362..7783091 100644 --- a/src/pkt_normalizer.c +++ b/src/pkt_normalizer.c @@ -351,14 +351,19 @@ static void pn_unpacker_cb(struct ll_queue* q, void* arg) { if (len - ptr < 2) { // Incomplete header, reset DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "reset state"); - pn_unpacker_reset_state(pn); +// pn_unpacker_reset_state(pn); + etcp_conn_reinit(pn->etcp); break; } uint16_t part_size = payload[ptr] | (payload[ptr + 1] << 8); // DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker: new fragment pkt_len=%d (at %d)", part_size, ptr); ptr += 2; - if (part_size<1 || part_size>1500) DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "PART_SIZE ERROR!!! %d", part_size); + if (part_size<1 || part_size>16384) { + DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "PART_SIZE ERROR!!! %d", part_size); + etcp_conn_reinit(pn->etcp); + break; + } pn->recvpart = ll_alloc_lldgram(part_size); if (!pn->recvpart) { diff --git a/tests/bench_timeout_heap b/tests/bench_timeout_heap index f7b2cc0..8900813 100755 Binary files a/tests/bench_timeout_heap and b/tests/bench_timeout_heap differ diff --git a/tests/bench_uasync_timeouts b/tests/bench_uasync_timeouts index 12c310c..3f522ae 100755 Binary files a/tests/bench_uasync_timeouts and b/tests/bench_uasync_timeouts differ diff --git a/tools/bping/bping.c b/tools/bping/bping.c index c82a2ad..f8b63d2 100644 --- a/tools/bping/bping.c +++ b/tools/bping/bping.c @@ -19,7 +19,8 @@ #define RECV_TIMEOUT_SEC 5 static char *host = NULL; -static int data_size = 56; +static int data_size_min = 56; +static int data_size_max = 56; static int pings_per_burst = 10; static int bursts = 0; // 0 = бесконечно static double interval_sec = 1.0; @@ -61,9 +62,29 @@ void send_one_ping(int sock, const struct sockaddr_in *to, int datalen, uint16_t (struct sockaddr *)to, sizeof(*to)); } +static int parse_size_arg(const char *arg, int *min_val, int *max_val) { + const char *colon = strchr(arg, ':'); + if (colon) { + char *end; + long min = strtol(arg, &end, 10); + if (end != colon || min < 0 || min > MAX_DATA_SIZE) return -1; + long max = strtol(colon + 1, &end, 10); + if (*end != '\0' || max < 0 || max > MAX_DATA_SIZE || max < min) return -1; + *min_val = (int)min; + *max_val = (int)max; + } else { + char *end; + long val = strtol(arg, &end, 10); + if (*end != '\0' || val < 0 || val > MAX_DATA_SIZE) return -1; + *min_val = *max_val = (int)val; + } + return 0; +} + void usage() { printf("Использование: bping [-s размер] [-p посылок] [-b пачек] [-i интервал] host\n\n"); printf(" -s размер данные в байтах (по умолчанию 56, макс ~65k)\n"); + printf(" формат: -s 1472 (фиксированный) или -s 100:600 (диапазон)\n"); printf(" -p посылок пингов в одной пачке (по умолчанию 10, макс 10000)\n"); printf(" -b пачек количество пачек (0 = ∞, по умолчанию 0)\n"); printf(" -i интервал между пачками в секундах (0.001, 0.05 и т.д., по умолчанию 1.0)\n\n"); @@ -71,7 +92,7 @@ void usage() { printf("Примеры:\n"); printf(" sudo bping 8.8.8.8\n"); printf(" bping -s 1472 -p 200 -i 0.05 1.1.1.1\n"); - printf(" bping -p 500 -b 0 -i 0.01 192.168.1.1\n"); + printf(" bping -s 100:600 -p 500 -i 0.01 192.168.1.1\n"); exit(1); } @@ -84,7 +105,12 @@ int main(int argc, char **argv) { int opt; while ((opt = getopt(argc, argv, "s:p:b:i:h")) != -1) { switch (opt) { - case 's': data_size = atoi(optarg); break; + case 's': + if (parse_size_arg(optarg, &data_size_min, &data_size_max) != 0) { + fprintf(stderr, "Ошибка: неверный формат размера. Используйте -s число или -s мин:макс\n"); + usage(); + } + break; case 'p': pings_per_burst = atoi(optarg); break; case 'b': bursts = atoi(optarg); break; case 'i': interval_sec = strtod(optarg, NULL); break; @@ -95,7 +121,7 @@ int main(int argc, char **argv) { if (optind >= argc) usage(); host = argv[optind]; - if (data_size < 0 || data_size > MAX_DATA_SIZE || pings_per_burst < 1 || pings_per_burst > MAX_PINGS_PER_BURST) { + if (data_size_min < 0 || data_size_max > MAX_DATA_SIZE || pings_per_burst < 1 || pings_per_burst > MAX_PINGS_PER_BURST) { usage(); } @@ -130,8 +156,13 @@ int main(int argc, char **argv) { uint16_t global_seq = 0; printf("🚀 bping → %s (raw ICMP)\n", host); - printf("Данные: %d байт | В пачке: %d | Пачек: %s | Интервал: %.3f сек\n\n", - data_size, pings_per_burst, (bursts == 0 ? "∞" : "ограничено"), interval_sec); + if (data_size_min == data_size_max) { + printf("Данные: %d байт | В пачке: %d | Пачек: %s | Интервал: %.3f сек\n\n", + data_size_min, pings_per_burst, (bursts == 0 ? "∞" : "ограничено"), interval_sec); + } else { + printf("Данные: %d:%d байт (случайно) | В пачке: %d | Пачек: %s | Интервал: %.3f сек\n\n", + data_size_min, data_size_max, pings_per_burst, (bursts == 0 ? "∞" : "ограничено"), interval_sec); + } // Выделяем память один раз struct timeval *send_times = malloc(pings_per_burst * sizeof(struct timeval)); @@ -150,10 +181,11 @@ int main(int argc, char **argv) { // === Отправляем пачку максимально быстро === int sent = 0; for (int i = 0; i < pings_per_burst; i++) { + int pkt_size = data_size_min + (rand() % (data_size_max - data_size_min + 1)); uint16_t seq = ++global_seq; seq_list[i] = seq; gettimeofday(&send_times[i], NULL); - send_one_ping(sock, &dest, data_size, ident, seq); + send_one_ping(sock, &dest, pkt_size, ident, seq); sent++; }