Browse Source

Fix ETCP timestamp overflow bug and enable epoll for Linux

nodeinfo-routing-update
Evgeny 2 months ago
parent
commit
994a4bf779
  1. 3
      lib/u_async.c
  2. 68
      src/etcp.c

3
lib/u_async.c

@ -399,8 +399,7 @@ void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, s
ev.events = 0;
if (read_cbk) ev.events |= EPOLLIN;
if (write_cbk) ev.events |= EPOLLOUT;
// Use edge-triggered mode for better performance
ev.events |= EPOLLET;
// Use level-triggered mode (default) for compatibility with UDP sockets
ev.data.ptr = &ua->sockets->sockets[index];
if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {

68
src/etcp.c

@ -81,6 +81,7 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) {
etcp->ack_q = queue_new(instance->ua, 0);
etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET));
etcp->io_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT));
etcp->optimal_inflight=10000;
if (!etcp->input_queue || !etcp->output_queue || !etcp->input_send_q || !etcp->recv_q || !etcp->ack_q ||
!etcp->input_wait_ack || !etcp->inflight_pool || !etcp->io_pool) {
@ -320,9 +321,11 @@ static void input_queue_try_resume(struct ETCP_CONN* etcp) {
size_t send_q_bytes = queue_total_bytes(etcp->input_send_q);
size_t total_bytes = wait_ack_bytes + send_q_bytes;
if (total_bytes < etcp->optimal_inflight) {
if (total_bytes <= etcp->optimal_inflight) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resume callbacks: inflight_bytes=%d, input_len=%d", etcp->log_name, total_bytes, etcp->input_queue->total_bytes);
queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно.
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resumed input_send_q callback", etcp->log_name);
//etcp_stats(etcp);
if (queue_entry_count(etcp->input_send_q) == 0) queue_resume_callback(etcp->input_queue);// и только когда больше нечего отправлять - забираем новый пакет
}
}
@ -406,6 +409,7 @@ static void input_queue_cb(struct ll_queue* q, void* arg) {
// Add to send queue
if (queue_data_put(etcp->input_send_q, (struct ll_entry*)p, p->seq) != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add packet seq=%u to input_send_q", etcp->log_name, p->seq);
memory_pool_free(etcp->inflight_pool, p);
queue_entry_free((struct ll_entry*)in_pkt);
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "[%s] EXIT (queue put failed)", etcp->log_name);
@ -415,7 +419,7 @@ static void input_queue_cb(struct ll_queue* q, void* arg) {
etcp_conn_process_send_queue(etcp);// сразу обработаем этот пакет
// input_queue_try_resume(etcp);
queue_resume_callback(etcp->input_queue);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] nextloop, input_queue size=%d ", etcp->log_name, q->count);
}
@ -437,11 +441,28 @@ static void ack_timeout_check(void* arg) {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u",
// (unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter);
struct ll_entry* current;
while (current = etcp->input_wait_ack->head) {
struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current;
uint64_t elapsed = now - pkt->last_timestamp;
if (elapsed > timeout) {
struct ll_entry* current;
while (current = etcp->input_wait_ack->head) {
struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current;
// Check for invalid timestamp
if (pkt->last_timestamp == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=0, not sent yet!",
etcp->log_name, pkt->seq);
// Schedule timer to check again later
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, timeout, etcp, ack_timeout_check);
return;
}
if (pkt->last_timestamp > now) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=%llu > now=%llu, clock went backwards!",
etcp->log_name, pkt->seq, (unsigned long long)pkt->last_timestamp, (unsigned long long)now);
// Fix the timestamp to prevent overflow
pkt->last_timestamp = now;
}
uint64_t elapsed = now - pkt->last_timestamp;
if (elapsed > timeout) {
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: timeout for seq=%u, elapsed=%llu, timeout=%llu, send_count=%u. Moving: wait_ack -> send_q",
etcp->log_name, pkt->seq, (unsigned long long)elapsed, (unsigned long long)timeout, pkt->send_count);
@ -512,7 +533,7 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
size_t ack_q_size = queue_entry_count(etcp->ack_q);
if (!inf_pkt && ack_q_size == 0) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name);
return NULL;
}
@ -611,8 +632,8 @@ static void ack_response_timer_cb(void* arg) {// проверяем неотпр
void etcp_output_try_assembly(struct ETCP_CONN* etcp) {
// пробуем собрать выходную очередь из фрагментов
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] etcp=%p, last_delivered_id=%u, recv_q_count=%d",
etcp->log_name, etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q));
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] etcp=%p, last_delivered_id=%u, recv_q_count=%d",
// etcp->log_name, etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q));
uint32_t next_expected_id = etcp->last_delivered_id + 1;
int delivered_count = 0;
@ -623,12 +644,12 @@ void etcp_output_try_assembly(struct ETCP_CONN* etcp) {
struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_find_data_by_id(etcp->recv_q, next_expected_id);
if (!rx_pkt) {
// No more contiguous packets found
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no packet found for id=%u, stopping", etcp->log_name, next_expected_id);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no packet found for id=%u, stopping", etcp->log_name, next_expected_id);
break;
}
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] assembling packet id=%u (len=%u)", etcp->log_name,
rx_pkt->seq, rx_pkt->ll.len);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] assembling packet id=%u (len=%u)", etcp->log_name,
// rx_pkt->seq, rx_pkt->ll.len);
// Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed
// Remove from recv_q first
@ -661,7 +682,7 @@ void etcp_output_try_assembly(struct ETCP_CONN* etcp) {
void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t dts) {
if (!etcp) return;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts);
// Find the acknowledged packet in the wait_ack queue
struct INFLIGHT_PACKET* acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq);
@ -703,8 +724,8 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d
}
etcp->jitter = rtt_max - rtt_min;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT updated - last=%u, avg_10=%u, avg_100=%u, jitter=%u",
etcp->log_name, rtt, etcp->rtt_avg_10, etcp->rtt_avg_100, etcp->jitter);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT updated - last=%u, avg_10=%u, avg_100=%u, jitter=%u",
// etcp->log_name, rtt, etcp->rtt_avg_10, etcp->rtt_avg_100, etcp->jitter);
}
// Update connection statistics
@ -712,7 +733,7 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d
etcp->bytes_sent_total += acked_pkt->ll.len;
etcp->ack_packets_count++;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] removed packet seq=%u from wait_ack, unacked_bytes now %u", etcp->log_name, seq, etcp->unacked_bytes);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] removed packet seq=%u from wait_ack, unacked_bytes now %u total acked=%u", etcp->log_name, seq, etcp->unacked_bytes, etcp->ack_packets_count);
if (acked_pkt->ll.dgram) {
memory_pool_free(etcp->instance->data_pool, acked_pkt->ll.dgram);
@ -722,7 +743,7 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d
// Try to resume sending more packets if window space opened up
input_queue_try_resume(etcp);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] completed for seq=%u", etcp->log_name, seq);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] completed for seq=%u", etcp->log_name, seq);
}
@ -778,7 +799,16 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] adding packet seq=%u to recv_q (last_delivered_id=%u)", etcp->log_name, seq, etcp->last_delivered_id);
// отправляем пакет в очередь на сборку
uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool);
if (!payload_data) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate payload_data from data_pool", etcp->log_name);
break;
}
struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool);
if (!rx_pkt) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate rx_pkt from io_pool", etcp->log_name);
memory_pool_free(etcp->instance->data_pool, payload_data);
break;
}
rx_pkt->seq=seq;
rx_pkt->timestamp=pkt->timestamp;
rx_pkt->ll.dgram=payload_data;

Loading…
Cancel
Save