From 994a4bf77973fca095d7fda0b5b082ca689562d0 Mon Sep 17 00:00:00 2001 From: Evgeny Date: Thu, 5 Feb 2026 03:47:34 +0300 Subject: [PATCH] Fix ETCP timestamp overflow bug and enable epoll for Linux --- lib/u_async.c | 3 +-- src/etcp.c | 68 +++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 50 insertions(+), 21 deletions(-) diff --git a/lib/u_async.c b/lib/u_async.c index dadbbed..d18caea 100644 --- a/lib/u_async.c +++ b/lib/u_async.c @@ -399,8 +399,7 @@ void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, s ev.events = 0; if (read_cbk) ev.events |= EPOLLIN; if (write_cbk) ev.events |= EPOLLOUT; - // Use edge-triggered mode for better performance - ev.events |= EPOLLET; + // Use level-triggered mode (default) for compatibility with UDP sockets ev.data.ptr = &ua->sockets->sockets[index]; if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { diff --git a/src/etcp.c b/src/etcp.c index 071ba56..893f325 100644 --- a/src/etcp.c +++ b/src/etcp.c @@ -81,6 +81,7 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) { etcp->ack_q = queue_new(instance->ua, 0); etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET)); etcp->io_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT)); + etcp->optimal_inflight=10000; if (!etcp->input_queue || !etcp->output_queue || !etcp->input_send_q || !etcp->recv_q || !etcp->ack_q || !etcp->input_wait_ack || !etcp->inflight_pool || !etcp->io_pool) { @@ -320,9 +321,11 @@ static void input_queue_try_resume(struct ETCP_CONN* etcp) { size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); size_t total_bytes = wait_ack_bytes + send_q_bytes; - if (total_bytes < etcp->optimal_inflight) { + if (total_bytes <= etcp->optimal_inflight) { + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resume callbacks: inflight_bytes=%d, input_len=%d", etcp->log_name, total_bytes, etcp->input_queue->total_bytes); queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно. - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resumed input_send_q callback", etcp->log_name); +//etcp_stats(etcp); + if (queue_entry_count(etcp->input_send_q) == 0) queue_resume_callback(etcp->input_queue);// и только когда больше нечего отправлять - забираем новый пакет } } @@ -406,6 +409,7 @@ static void input_queue_cb(struct ll_queue* q, void* arg) { // Add to send queue if (queue_data_put(etcp->input_send_q, (struct ll_entry*)p, p->seq) != 0) { + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add packet seq=%u to input_send_q", etcp->log_name, p->seq); memory_pool_free(etcp->inflight_pool, p); queue_entry_free((struct ll_entry*)in_pkt); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "[%s] EXIT (queue put failed)", etcp->log_name); @@ -415,7 +419,7 @@ static void input_queue_cb(struct ll_queue* q, void* arg) { etcp_conn_process_send_queue(etcp);// сразу обработаем этот пакет // input_queue_try_resume(etcp); - queue_resume_callback(etcp->input_queue); + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] nextloop, input_queue size=%d ", etcp->log_name, q->count); } @@ -437,11 +441,28 @@ static void ack_timeout_check(void* arg) { // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u", // (unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter); - struct ll_entry* current; - while (current = etcp->input_wait_ack->head) { - struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current; - uint64_t elapsed = now - pkt->last_timestamp; - if (elapsed > timeout) { + struct ll_entry* current; + while (current = etcp->input_wait_ack->head) { + struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current; + + // Check for invalid timestamp + if (pkt->last_timestamp == 0) { + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=0, not sent yet!", + etcp->log_name, pkt->seq); + // Schedule timer to check again later + etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, timeout, etcp, ack_timeout_check); + return; + } + + if (pkt->last_timestamp > now) { + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=%llu > now=%llu, clock went backwards!", + etcp->log_name, pkt->seq, (unsigned long long)pkt->last_timestamp, (unsigned long long)now); + // Fix the timestamp to prevent overflow + pkt->last_timestamp = now; + } + + uint64_t elapsed = now - pkt->last_timestamp; + if (elapsed > timeout) { DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: timeout for seq=%u, elapsed=%llu, timeout=%llu, send_count=%u. Moving: wait_ack -> send_q", etcp->log_name, pkt->seq, (unsigned long long)elapsed, (unsigned long long)timeout, pkt->send_count); @@ -512,7 +533,7 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { size_t ack_q_size = queue_entry_count(etcp->ack_q); if (!inf_pkt && ack_q_size == 0) { - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name); +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name); return NULL; } @@ -611,8 +632,8 @@ static void ack_response_timer_cb(void* arg) {// проверяем неотпр void etcp_output_try_assembly(struct ETCP_CONN* etcp) { // пробуем собрать выходную очередь из фрагментов - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] etcp=%p, last_delivered_id=%u, recv_q_count=%d", - etcp->log_name, etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q)); +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] etcp=%p, last_delivered_id=%u, recv_q_count=%d", +// etcp->log_name, etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q)); uint32_t next_expected_id = etcp->last_delivered_id + 1; int delivered_count = 0; @@ -623,12 +644,12 @@ void etcp_output_try_assembly(struct ETCP_CONN* etcp) { struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_find_data_by_id(etcp->recv_q, next_expected_id); if (!rx_pkt) { // No more contiguous packets found - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no packet found for id=%u, stopping", etcp->log_name, next_expected_id); +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no packet found for id=%u, stopping", etcp->log_name, next_expected_id); break; } - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] assembling packet id=%u (len=%u)", etcp->log_name, - rx_pkt->seq, rx_pkt->ll.len); +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] assembling packet id=%u (len=%u)", etcp->log_name, +// rx_pkt->seq, rx_pkt->ll.len); // Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed // Remove from recv_q first @@ -661,7 +682,7 @@ void etcp_output_try_assembly(struct ETCP_CONN* etcp) { void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t dts) { if (!etcp) return; - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts); +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts); // Find the acknowledged packet in the wait_ack queue struct INFLIGHT_PACKET* acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq); @@ -703,8 +724,8 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d } etcp->jitter = rtt_max - rtt_min; - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT updated - last=%u, avg_10=%u, avg_100=%u, jitter=%u", - etcp->log_name, rtt, etcp->rtt_avg_10, etcp->rtt_avg_100, etcp->jitter); +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT updated - last=%u, avg_10=%u, avg_100=%u, jitter=%u", +// etcp->log_name, rtt, etcp->rtt_avg_10, etcp->rtt_avg_100, etcp->jitter); } // Update connection statistics @@ -712,7 +733,7 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d etcp->bytes_sent_total += acked_pkt->ll.len; etcp->ack_packets_count++; - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] removed packet seq=%u from wait_ack, unacked_bytes now %u", etcp->log_name, seq, etcp->unacked_bytes); + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] removed packet seq=%u from wait_ack, unacked_bytes now %u total acked=%u", etcp->log_name, seq, etcp->unacked_bytes, etcp->ack_packets_count); if (acked_pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, acked_pkt->ll.dgram); @@ -722,7 +743,7 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d // Try to resume sending more packets if window space opened up input_queue_try_resume(etcp); - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] completed for seq=%u", etcp->log_name, seq); +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] completed for seq=%u", etcp->log_name, seq); } @@ -778,7 +799,16 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] adding packet seq=%u to recv_q (last_delivered_id=%u)", etcp->log_name, seq, etcp->last_delivered_id); // отправляем пакет в очередь на сборку uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool); + if (!payload_data) { + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate payload_data from data_pool", etcp->log_name); + break; + } struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool); + if (!rx_pkt) { + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate rx_pkt from io_pool", etcp->log_name); + memory_pool_free(etcp->instance->data_pool, payload_data); + break; + } rx_pkt->seq=seq; rx_pkt->timestamp=pkt->timestamp; rx_pkt->ll.dgram=payload_data;