// etcp.c - ETCP Protocol Implementation (refactored and expanded based on etcp_protocol.txt) #include "etcp.h" #include "etcp_debug.h" #include "etcp_loadbalancer.h" #include "routing.h" #include "route_bgp.h" #include "../lib/u_async.h" #include "../lib/ll_queue.h" #include "../lib/debug_config.h" #include "crc32.h" // For potential hashing, though not used yet. #include #include #include #include // For bandwidth calcs #include // For UINT16_MAX // Enable comprehensive debug output for ETCP module #define DEBUG_CATEGORY_ETCP_DETAILED 1 // Constants from spec (adjusted for completeness) #define MAX_INFLIGHT_BYTES 65536 // Initial window #define RETRANS_K1 2.0f // RTT multiplier for retrans timeout #define RETRANS_K2 1.5f // Jitter multiplier #define ACK_DELAY_TB 20 // ACK timer delay (2ms in 0.1ms units) #define BURST_DELAY_FACTOR 4 // Delay before burst #define BURST_SIZE 5 // Packets in burst (1 delayed + 4 burst) #define RTT_HISTORY_SIZE 10 // For jitter calc #define MAX_PENDING 32 // For ACKs/retrans (arbitrary; adjust) #define SECTION_HEADER_SIZE 3 // type(1) + len(2) // Container-of macro for getting struct from data pointer //#define CONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member))) // Forward declarations static void input_queue_cb(struct ll_queue* q, void* arg); static void etcp_link_ready_callback(struct ETCP_CONN* etcp); static void input_send_q_cb(struct ll_queue* q, void* arg); static void wait_ack_cb(struct ll_queue* q, void* arg); static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp); // Helper function to drain and free ETCP_FRAGMENT queue static void drain_and_free_fragment_queue(struct ETCP_CONN* etcp, struct ll_queue** q) { if (!*q) return; struct ETCP_FRAGMENT* pkt; while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(*q)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } memory_pool_free(etcp->io_pool, pkt); } queue_free(*q); *q = NULL; } // Helper function to clear INFLIGHT_PACKET queue (keep queue, free elements) static void clear_inflight_queue(struct ll_queue* q, struct ETCP_CONN* etcp) { if (!q) return; struct INFLIGHT_PACKET* pkt; while ((pkt = (struct INFLIGHT_PACKET*)queue_data_get(q)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } memory_pool_free(etcp->inflight_pool, pkt); } } // Helper function to clear ETCP_FRAGMENT queue (keep queue, free elements) static void clear_fragment_queue(struct ll_queue* q, struct ETCP_CONN* etcp) { if (!q) return; struct ETCP_FRAGMENT* pkt; while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(q)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } memory_pool_free(etcp->io_pool, pkt); } } // Helper function to drain and free INFLIGHT_PACKET queue static void drain_and_free_inflight_queue(struct ETCP_CONN* etcp, struct ll_queue** q) { if (!*q) return; struct INFLIGHT_PACKET* pkt; while ((pkt = (struct INFLIGHT_PACKET*)queue_data_get(*q)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } memory_pool_free(etcp->inflight_pool, pkt); } queue_free(*q); *q = NULL; } uint16_t get_current_timestamp() { return (uint16_t)get_time_tb(); } // Timestamp diff (with wrap-around) static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (t1 >= t2) { return t1 - t2; } return (UINT16_MAX - t2) + t1 + 1; } // Create new ETCP connection struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!instance) return NULL; struct ETCP_CONN* etcp = calloc(1, sizeof(struct ETCP_CONN)); if (!etcp) { DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: creating connection failed for instance %p", instance); return NULL; } etcp->instance = instance; etcp->input_queue = queue_new(instance->ua, 0); // No hash for input_queue etcp->output_queue = queue_new(instance->ua, 0); // No hash for output_queue etcp->input_send_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for send_q etcp->input_wait_ack = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for wait_ack etcp->recv_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for send_q etcp->ack_q = queue_new(instance->ua, 0); etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET)); etcp->io_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT)); etcp->optimal_inflight=10000; // Initialize log_name with local node_id (peer will be updated later when known) snprintf(etcp->log_name, sizeof(etcp->log_name), "%04llu->????", (unsigned long long)(instance->node_id % 10000)); if (!etcp->input_queue || !etcp->output_queue || !etcp->input_send_q || !etcp->recv_q || !etcp->ack_q || !etcp->input_wait_ack || !etcp->inflight_pool || !etcp->io_pool) { DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: error - closing - input:%p output:%p send:%p wait:%x pool:%p", etcp->input_queue, etcp->output_queue, etcp->input_send_q, etcp->input_wait_ack, etcp->inflight_pool); etcp_connection_close(etcp); return NULL; } // etcp->window_size = MAX_INFLIGHT_BYTES; // Not used etcp->mtu = 1500; // Default MTU etcp->next_tx_id = 1; etcp->rtt_avg_10 = 10; // Initial guess (1ms) etcp->rtt_avg_100 = 10; etcp->rtt_history_idx = 0; memset(etcp->rtt_history, 0, sizeof(etcp->rtt_history)); etcp->normalizer = pn_init(etcp); if (!etcp->normalizer) { etcp_connection_close(etcp); return NULL; } // Set input queue callback queue_set_callback(etcp->input_queue, input_queue_cb, etcp); queue_set_callback(etcp->input_send_q, input_send_q_cb, etcp); queue_set_callback(etcp->input_wait_ack, wait_ack_cb, etcp); etcp->link_ready_for_send_fn = etcp_link_ready_callback; // Register with routing module (must be done after normalizer is assigned) routing_add_conn(etcp); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] connection initialized. ETCP=%p mtu=%d, next_tx_id=%u", etcp->log_name, etcp, etcp->mtu, etcp->next_tx_id); return etcp; } // Close connection with NULL pointer safety (prevents double free) void etcp_connection_close(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp) return; // Cancel active timers to prevent memory leaks if (etcp->retrans_timer) { uasync_cancel_timeout(etcp->instance->ua, etcp->retrans_timer); etcp->retrans_timer = NULL; } if (etcp->ack_resp_timer) { uasync_cancel_timeout(etcp->instance->ua, etcp->ack_resp_timer); etcp->ack_resp_timer = NULL; } routing_del_conn(etcp); // Notify BGP about connection closure (send withdraws, remove from senders_list) if (etcp->instance && etcp->instance->bgp) { route_bgp_remove_conn(etcp); } // Deinitialize packet normalizer (this will call routing_del_conn) if (etcp->normalizer) { pn_deinit((struct PKTNORM*)etcp->normalizer); etcp->normalizer = NULL; } // Drain and free all queues using helper functions drain_and_free_fragment_queue(etcp, &etcp->input_queue); drain_and_free_fragment_queue(etcp, &etcp->output_queue); drain_and_free_inflight_queue(etcp, &etcp->input_send_q); drain_and_free_inflight_queue(etcp, &etcp->input_wait_ack); drain_and_free_fragment_queue(etcp, &etcp->recv_q); // Drain and free ack_q (contains ACK_PACKET from ack_pool - special handling) if (etcp->ack_q) { struct ACK_PACKET* pkt; while ((pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q)) != NULL) { queue_entry_free((struct ll_entry*)pkt); } queue_free(etcp->ack_q); etcp->ack_q = NULL; } // Free memory pools after all elements are returned if (etcp->inflight_pool) { memory_pool_destroy(etcp->inflight_pool); etcp->inflight_pool = NULL; } if (etcp->io_pool) { memory_pool_destroy(etcp->io_pool); etcp->io_pool = NULL; } // Clear links list safely if (etcp->links) { struct ETCP_LINK* link = etcp->links; while (link) { struct ETCP_LINK* next = link->next; etcp_link_close(link); link = next; } etcp->links = NULL; } // Clear next pointer to prevent dangling references etcp->next = NULL; // TODO: Free rx_list, etc. free(etcp); } // Reset connection void etcp_conn_reset(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); // Reset IDs etcp->next_tx_id = 1; etcp->last_rx_id = 0; etcp->last_delivered_id = 0; etcp->rx_ack_till = 0; // Reset metrics etcp->unacked_bytes = 0; etcp->rtt_last = 0; etcp->rtt_avg_10 = 0; etcp->rtt_avg_100 = 0; etcp->jitter = 0; etcp->bytes_sent_total = 0; etcp->retransmissions_count = 0; etcp->ack_packets_count = 0; // Reset RTT history memset(etcp->rtt_history, 0, sizeof(etcp->rtt_history)); etcp->rtt_history_idx = 0; // Clear queues (keep queue structures) clear_inflight_queue(etcp->input_send_q, etcp); clear_inflight_queue(etcp->input_wait_ack, etcp); clear_fragment_queue(etcp->recv_q, etcp); // Reset timers (just clear the pointers - timers will expire naturally) etcp->retrans_timer = NULL; etcp->ack_resp_timer = NULL; } // Update log_name when peer_node_id becomes known void etcp_update_log_name(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp || !etcp->instance) return; uint64_t local_id = etcp->instance->node_id % 10000; uint64_t peer_id = etcp->peer_node_id % 10000; snprintf(etcp->log_name, sizeof(etcp->log_name), "%04llu->%04llu", (unsigned long long)local_id, (unsigned long long)peer_id); } // ====================================================================== Отправка данных // Send data through ETCP connection // Allocates memory from data_pool and places in input queue // Returns: 0 on success, -1 on failure int etcp_int_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp || !data || len == 0) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] invalid parameters (etcp=%p, data=%p, len=%zu)", etcp->log_name, etcp, data, len); return -1; } if (!etcp->input_queue) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] input_queue is NULL for etcp=%p", etcp->log_name, etcp); return -1; } // Check length against maximum packet size if (len > PACKET_DATA_SIZE) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] packet too large (len=%zu, max=%d)", etcp->log_name, len, PACKET_DATA_SIZE); return -1; } // Allocate packet data from data_pool (following ETCP reception pattern) uint8_t* packet_data = memory_pool_alloc(etcp->instance->data_pool); if (!packet_data) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate packet data from data_pool", etcp->log_name); return -1; } // Copy user data to packet buffer memcpy(packet_data, data, len); // Create queue entry - this allocates ll_entry + data pointer struct ETCP_FRAGMENT* pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool); if (!pkt) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate queue entry", etcp->log_name); memory_pool_free(etcp->instance->data_pool, packet_data); return -1; } pkt->seq = 0; // Will be assigned by input_queue_cb pkt->timestamp = 0; // Will be set by input_queue_cb pkt->ll.dgram = packet_data; // Point to data_pool allocation pkt->ll.len = len; // размер packet_data DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] created PACKET %p with data %p (len=%zu)", etcp->log_name, pkt, packet_data, len); // Add to input queue - input_queue_cb will process it if (queue_data_put(etcp->input_queue, (struct ll_entry*)pkt, 0) != 0) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add to input queue", etcp->log_name); memory_pool_free(etcp->instance->data_pool, packet_data); memory_pool_free(etcp->io_pool, pkt); return -1; } return 0; } static void input_queue_try_resume(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); // если размер input_wait_ack+input_send_q в байтах < optimal_inflight то resume сейчас. size_t wait_ack_bytes = queue_total_bytes(etcp->input_wait_ack); size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); size_t total_bytes = wait_ack_bytes + send_q_bytes; if (total_bytes <= etcp->optimal_inflight) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resume callbacks: inflight_bytes=%d, input_len=%d", etcp->log_name, total_bytes, etcp->input_queue->total_bytes); queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно. //etcp_stats(etcp); if (queue_entry_count(etcp->input_send_q) == 0) queue_resume_callback(etcp->input_queue);// и только когда больше нечего отправлять - забираем новый пакет } } void etcp_stats(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp) return; DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] stats for conn=%p:", etcp->log_name, etcp); // Queue statistics DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] Queues:", etcp->log_name); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input_queue: %zu pkts, %zu bytes", etcp->log_name, queue_entry_count(etcp->input_queue), queue_total_bytes(etcp->input_queue)); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input_send_q: %zu pkts, %zu bytes", etcp->log_name, queue_entry_count(etcp->input_send_q), queue_total_bytes(etcp->input_send_q)); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input_wait_ack: %zu pkts, %zu bytes", etcp->log_name, queue_entry_count(etcp->input_wait_ack), queue_total_bytes(etcp->input_wait_ack)); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_q: %zu pkts", etcp->log_name, queue_entry_count(etcp->ack_q)); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] recv_q: %zu pkts", etcp->log_name, queue_entry_count(etcp->recv_q)); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] output_queue: %zu pkts, %zu bytes", etcp->log_name, queue_entry_count(etcp->output_queue), queue_total_bytes(etcp->output_queue)); // RTT metrics DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT metrics:", etcp->log_name); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rtt_last: %u (0.1ms)", etcp->log_name, etcp->rtt_last); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rtt_avg_10: %u (0.1ms)", etcp->log_name, etcp->rtt_avg_10); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rtt_avg_100: %u (0.1ms)", etcp->log_name, etcp->rtt_avg_100); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] jitter: %u (0.1ms)", etcp->log_name, etcp->jitter); // Counters DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] Counters:", etcp->log_name); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] bytes_sent_total: %u", etcp->log_name, etcp->bytes_sent_total); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] retransmissions_count: %u", etcp->log_name, etcp->retransmissions_count); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_packets_count: %u", etcp->log_name, etcp->ack_packets_count); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] unacked_bytes: %u", etcp->log_name, etcp->unacked_bytes); // IDs DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] IDs:", etcp->log_name); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] next_tx_id: %u", etcp->log_name, etcp->next_tx_id); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] last_rx_id: %u", etcp->log_name, etcp->last_rx_id); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] last_delivered_id:%u", etcp->log_name, etcp->last_delivered_id); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rx_ack_till: %u", etcp->log_name, etcp->rx_ack_till); } // Input callback for input_queue (добавление новых кодограмм в стек) // input_queue -> input_send_q static void input_queue_cb(struct ll_queue* q, void* arg) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; struct ETCP_FRAGMENT* in_pkt = (struct ETCP_FRAGMENT*)queue_data_get(q); if (!in_pkt) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] cannot get element (pool=%p etcp=%p)", etcp->log_name, etcp->inflight_pool, etcp); queue_resume_callback(q); return; } memory_pool_free(etcp->io_pool, in_pkt);// перемещаем из io_pool в inflight_pool // Create INFLIGHT_PACKET struct INFLIGHT_PACKET* p = memory_pool_alloc(etcp->inflight_pool); if (!p) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] cannot allocate INFLIGHT_PACKET (pool=%p etcp=%p)", etcp->log_name, etcp->inflight_pool, etcp); queue_entry_free((struct ll_entry*)in_pkt); // Free the ETCP_FRAGMENT queue_resume_callback(q); return; } // Setup inflight packet (based on protocol.txt) memset(p, 0, sizeof(*p)); p->seq = etcp->next_tx_id++; // Assign seq p->state = INFLIGHT_STATE_WAIT_SEND; p->last_timestamp = 0; p->ll.dgram = in_pkt->ll.dgram; p->ll.dgram_pool = in_pkt->ll.dgram_pool; p->ll.len = in_pkt->ll.len; DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input -> inflight (seq=%u, len=%u)", etcp->log_name, p->seq, p->ll.len); int len=p->ll.len;// сохраним len // Add to send queue if (queue_data_put(etcp->input_send_q, (struct ll_entry*)p, p->seq) != 0) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add packet seq=%u to input_send_q", etcp->log_name, p->seq); memory_pool_free(etcp->inflight_pool, p); queue_entry_free((struct ll_entry*)in_pkt); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "[%s] EXIT (queue put failed)", etcp->log_name); return; } etcp->unacked_bytes += len; // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q"); etcp_conn_process_send_queue(etcp);// сразу обработаем этот пакет // input_queue_try_resume(etcp); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] nextloop, input_queue size=%d ", etcp->log_name, q->count); } static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg; // size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); // size_t send_q_pkts = queue_entry_count(etcp->input_send_q); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_send_q_cb: input_send_q status: %d pkt %d bytes", send_q_pkts, send_q_bytes); etcp_conn_process_send_queue(etcp); } static void ack_timeout_check(void* arg) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; uint64_t now = get_time_tb(); uint64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u", // (unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter); struct ll_entry* current; while (current = etcp->input_wait_ack->head) { struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current; // Check for invalid timestamp if (pkt->last_timestamp == 0) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=0, not sent yet!", etcp->log_name, pkt->seq); // Schedule timer to check again later etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, timeout, etcp, ack_timeout_check); return; } if (pkt->last_timestamp > now) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=%llu > now=%llu, clock went backwards!", etcp->log_name, pkt->seq, (unsigned long long)pkt->last_timestamp, (unsigned long long)now); // Fix the timestamp to prevent overflow pkt->last_timestamp = now; } uint64_t elapsed = now - pkt->last_timestamp; if (elapsed > timeout) { DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: timeout for seq=%u, elapsed=%llu, timeout=%llu, send_count=%u. Moving: wait_ack -> send_q", etcp->log_name, pkt->seq, (unsigned long long)elapsed, (unsigned long long)timeout, pkt->send_count); // Increment counters pkt->send_count++; pkt->retrans_req_count++; // Optional, if used for retrans request logic pkt->last_timestamp = now; pkt->last_link = NULL; // Reset last link for re-selection // Remove from wait_ack queue_data_get(etcp->input_wait_ack); // Change state and add to send_q for retransmission pkt->state = INFLIGHT_STATE_WAIT_SEND; queue_data_put(etcp->input_send_q, (struct ll_entry*)pkt, pkt->seq); // Update stats etcp->retransmissions_count++; } else {// не надо до конца сканировать - они уже сортированы по таймстемпу т.к. очередь fifo, а timestamp = время добавления в очередь = время отправки // shedule timer int64_t next_timeout=timeout - elapsed; etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_check); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: retransmission timer set for %llu units", etcp->log_name, next_timeout); return; } } // если всё выгребли - надо взвести resume etcp->retrans_timer = NULL; queue_resume_callback(etcp->input_wait_ack); } static void wait_ack_cb(struct ll_queue* q, void* arg) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; ack_timeout_check(etcp);// ack_cb срабатывает когда init (таймер не инициализирован) или когда empty (таймер не активен) } // Подготовить и отправить кодограмму // вызывается линком когда освобождается или очередью если появляются данные на передачу struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_LINK* link = etcp_loadbalancer_select_link(etcp); if (!link) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no link available", etcp->log_name); return NULL;// если линков нет - ждём появления свободного } size_t send_q_size = queue_entry_count(etcp->input_send_q); if (send_q_size == 0) {// сгребаем из input_queue // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: input_send_q empty, check if avail input_queue -> inflight"); input_queue_try_resume(etcp); // return NULL; } // First, check if there's a packet in input_send_q (retrans or new) // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q"); struct INFLIGHT_PACKET* inf_pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q); if (inf_pkt) { // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] prepare udp dgram for send packet %p (seq=%u, len=%u)", etcp->log_name, inf_pkt, inf_pkt->seq, inf_pkt->ll.len); inf_pkt->last_timestamp=get_time_tb(); inf_pkt->send_count++; inf_pkt->state=INFLIGHT_STATE_WAIT_ACK; queue_data_put(etcp->input_wait_ack, (struct ll_entry*)inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue } size_t ack_q_size = queue_entry_count(etcp->ack_q); if (!inf_pkt && ack_q_size == 0) { // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name); return NULL; } struct ETCP_DGRAM* dgram = memory_pool_alloc(etcp->instance->pkt_pool); if (!dgram) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate ETCP_DGRAM", etcp->log_name); return NULL; } dgram->link = link; dgram->noencrypt_len=0; dgram->timestamp=get_current_timestamp(); // формат ack: [01] [elements count] [4 байта last_delivered_id] и <[4 байта seq][2 байта recv_ts][2 байта txrx delay ts]> x count dgram->data[0]=1;// ack int ptr=2; dgram->data[ptr++]=etcp->last_delivered_id; dgram->data[ptr++]=etcp->last_delivered_id>>8; dgram->data[ptr++]=etcp->last_delivered_id>>16; dgram->data[ptr++]=etcp->last_delivered_id>>24; // тут (потом) добавим опциональные заголовки struct ACK_PACKET* ack_pkt; while ((ack_pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q))) { // seq 4 байта dgram->data[ptr++]=ack_pkt->seq; dgram->data[ptr++]=ack_pkt->seq>>8; dgram->data[ptr++]=ack_pkt->seq>>16; dgram->data[ptr++]=ack_pkt->seq>>24; // ts приема 2 байта dgram->data[ptr++]=ack_pkt->recv_timestamp; dgram->data[ptr++]=ack_pkt->recv_timestamp>>8; // время задержки 2 байта между recv и ack uint16_t dly=get_current_timestamp()-ack_pkt->recv_timestamp; dgram->data[ptr++]=dly; dgram->data[ptr++]=dly>>8; DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] add ACK N%d dTS=%d", etcp->log_name, ack_pkt->seq, dly); queue_entry_free((struct ll_entry*)ack_pkt); if (inf_pkt && inf_pkt->ll.len+ptr>=etcp->mtu-10) break;// pkt len (надо просчитать точнее включая все заголовки) if (ptr>500) break; } dgram->data[1]=ptr/8; if (inf_pkt) { // фрейм data (0) обязательно в конец DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] add DATA (seq=%u, len=%u), ack_size=%d", etcp->log_name, inf_pkt->seq, inf_pkt->ll.len, dgram->data[1]); dgram->data[ptr++]=0;// payload dgram->data[ptr++]=inf_pkt->seq; dgram->data[ptr++]=inf_pkt->seq>>8; dgram->data[ptr++]=inf_pkt->seq>>16; dgram->data[ptr++]=inf_pkt->seq>>24; memcpy(&dgram->data[ptr], inf_pkt->ll.dgram, inf_pkt->ll.len); ptr+=inf_pkt->ll.len; } else { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] only ACK packet with %d bytes total", etcp->log_name, dgram->data_len); } dgram->data_len=ptr; etcp_dump_pkt_sections(dgram, link, 1); return dgram; } // Callback for when a link is ready to send data static void etcp_link_ready_callback(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp) return; // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp); etcp_conn_process_send_queue(etcp); } // Process packets in send queue and transmit them static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_DGRAM* dgram; while(dgram = etcp_request_pkt(etcp)) { // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_process_send_queue: sending packet"); etcp_loadbalancer_send(dgram); } } static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо. DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет) // если ack все еще заняты - обновляем таймаут if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); else etcp->ack_resp_timer=NULL; } // ====================================================================== Прием данных void etcp_output_try_assembly(struct ETCP_CONN* etcp) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); // пробуем собрать выходную очередь из фрагментов // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] etcp=%p, last_delivered_id=%u, recv_q_count=%d", // etcp->log_name, etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q)); uint32_t next_expected_id = etcp->last_delivered_id + 1; int delivered_count = 0; uint32_t delivered_bytes = 0; // Look for contiguous packets starting from next_expected_id while (1) { struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_find_data_by_id(etcp->recv_q, next_expected_id); if (!rx_pkt) { // No more contiguous packets found // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no packet found for id=%u, stopping", etcp->log_name, next_expected_id); break; } // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] assembling packet id=%u (len=%u)", etcp->log_name, // rx_pkt->seq, rx_pkt->ll.len); // Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed // Remove from recv_q first queue_remove_data(etcp->recv_q, (struct ll_entry*)rx_pkt); // Add to output_queue using the same ETCP_FRAGMENT structure if (queue_data_put(etcp->output_queue, (struct ll_entry*)rx_pkt, next_expected_id) == 0) { delivered_bytes += rx_pkt->ll.len; delivered_count++; DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "[%s] moved packet id=%u to output_queue", etcp->log_name, next_expected_id); } else { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add packet id=%u to output_queue", etcp->log_name, next_expected_id); // Put it back in recv_q if we can't add to output_queue queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, next_expected_id); break; } // Update state for next iteration etcp->last_delivered_id = next_expected_id; next_expected_id++; } DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] delivered %u contiguous packets (%u bytes), last_delivered_id=%u, output_queue_count=%d", etcp->log_name, delivered_count, delivered_bytes, etcp->last_delivered_id, queue_entry_count(etcp->output_queue)); } // Process ACK receipt - remove acknowledged packet from inflight queues void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t dts) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!etcp) return; // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts); // Find the acknowledged packet in the wait_ack queue struct INFLIGHT_PACKET* acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq); if (acked_pkt) queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)acked_pkt); else { acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_send_q, seq); queue_remove_data(etcp->input_send_q, (struct ll_entry*)acked_pkt); } if (!acked_pkt) { // Packet might be already acknowledged or not found DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: packet seq=%u not found in wait_ack queue", seq); return; } // Calculate RTT if timestamps are valid if (ts != (uint16_t)-1 && dts != (uint16_t)-1) { uint16_t rtt = timestamp_diff(ts, dts); etcp->rtt_last = rtt; // Update RTT averages (exponential smoothing) if (etcp->rtt_avg_10 == 0) { etcp->rtt_avg_10 = rtt; etcp->rtt_avg_100 = rtt; } else { // RTT average over 10 packets etcp->rtt_avg_10 = (etcp->rtt_avg_10 * 9 + rtt) / 10; // RTT average over 100 packets etcp->rtt_avg_100 = (etcp->rtt_avg_100 * 99 + rtt) / 100; } // Update jitter calculation (max - min of last 10 RTT samples) etcp->rtt_history[etcp->rtt_history_idx] = rtt; etcp->rtt_history_idx = (etcp->rtt_history_idx + 1) % 10; uint16_t rtt_min = UINT16_MAX, rtt_max = 0; for (int i = 0; i < 10; i++) { if (etcp->rtt_history[i] < rtt_min) rtt_min = etcp->rtt_history[i]; if (etcp->rtt_history[i] > rtt_max) rtt_max = etcp->rtt_history[i]; } etcp->jitter = rtt_max - rtt_min; // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT updated - last=%u, avg_10=%u, avg_100=%u, jitter=%u", // etcp->log_name, rtt, etcp->rtt_avg_10, etcp->rtt_avg_100, etcp->jitter); } // Update connection statistics etcp->unacked_bytes -= acked_pkt->ll.len; etcp->bytes_sent_total += acked_pkt->ll.len; etcp->ack_packets_count++; DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] removed packet seq=%u from wait_ack, unacked_bytes now %u total acked=%u", etcp->log_name, seq, etcp->unacked_bytes, etcp->ack_packets_count); if (acked_pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, acked_pkt->ll.dgram); } memory_pool_free(etcp->inflight_pool, acked_pkt); // Try to resume sending more packets if window space opened up input_queue_try_resume(etcp); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] completed for seq=%u", etcp->log_name, seq); } // Process incoming decrypted packet void etcp_conn_input(struct ETCP_DGRAM* pkt) { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); if (!pkt || !pkt->data_len) return; etcp_dump_pkt_sections(pkt, pkt->link, 0); struct ETCP_CONN* etcp = pkt->link->etcp; uint8_t* data = pkt->data; uint16_t len = pkt->data_len; uint16_t ts = pkt->timestamp; // Received timestamp // Note: Assume packet starts with sections after timestamp (but timestamp is already extracted in connections?). // Protocol.txt: timestamp is first 2B, then sections. // But in conn_input, pkt->data is after timestamp? Assume data starts with first section. while (len >= 1) { uint8_t type = data[0]; // Process sections as per protocol.txt switch (type) { case ETCP_SECTION_ACK: { int elm_cnt=data[1]; uint32_t till=data[2] | (data[3]<<8) | (data[4]<<16) | (data[5]<<24); int ack_section_len = 6 + elm_cnt * 8; data+=ack_section_len; len-=ack_section_len; for (int i=0; irx_ack_till-till)<0) { etcp->rx_ack_till++; etcp_ack_recv(etcp, etcp->rx_ack_till, -1, -1); }// подтверждаем всё по till break; } case ETCP_SECTION_PAYLOAD: { if (len>=5) { // формируем ACK struct ACK_PACKET* p = (struct ACK_PACKET*)queue_entry_new_from_pool(etcp->instance->ack_pool); uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24); p->seq=seq; p->pkt_timestamp=pkt->timestamp; p->recv_timestamp=get_current_timestamp(); queue_data_put(etcp->ack_q, (struct ll_entry*)p, p->seq); if (etcp->ack_resp_timer == NULL) { etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] set ack_timer for delayed ACK send", etcp->log_name); } if ((int32_t)(etcp->last_delivered_id-seq)<0) if (queue_find_data_by_id(etcp->recv_q, seq)==NULL) {// проверяем есть ли пакет с этим seq uint32_t pkt_len=len-5; DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] adding packet seq=%u to recv_q (last_delivered_id=%u)", etcp->log_name, seq, etcp->last_delivered_id); // отправляем пакет в очередь на сборку uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool); if (!payload_data) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate payload_data from data_pool", etcp->log_name); break; } struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool); if (!rx_pkt) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate rx_pkt from io_pool", etcp->log_name); memory_pool_free(etcp->instance->data_pool, payload_data); break; } rx_pkt->seq=seq; rx_pkt->timestamp=pkt->timestamp; rx_pkt->ll.dgram=payload_data; rx_pkt->ll.dgram_pool=etcp->instance->data_pool; rx_pkt->ll.len=pkt_len; // Copy the actual payload data memcpy(payload_data, data + 5, pkt_len); queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, seq); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] packet seq=%u added to recv_q, calling assembly (last_delivered_id=%u)", etcp->log_name, seq, etcp->last_delivered_id); if (etcp->last_delivered_id+1==seq) etcp_output_try_assembly(etcp);// пробуем собрать выходную очередь из фрагментов } } len=0; break; } default: DEBUG_WARN(DEBUG_CATEGORY_ETCP, "etcp_conn_input: unknown section type=0x%02x", type); len=0; break; } } memory_pool_free(etcp->instance->pkt_pool, pkt); // Free the incoming dgram }