Browse Source

ack fix

nodeinfo-routing-update
jeka 3 weeks ago
parent
commit
771cbc1c3f
  1. 52
      src/etcp.c
  2. 2
      src/etcp.h
  3. 2
      src/etcp_connections.c

52
src/etcp.c

@ -548,42 +548,23 @@ static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_
}
static void ack_timeout_check(void* arg) {
static void ack_timeout_cb(void* arg);
static void ack_timeout_check(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
uint64_t now = get_time_tb();
uint64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2);
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] RETRANS TMR HANDLER", etcp->log_name);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u",
// (unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u",
(unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter);
struct ll_entry* current;
while (current = etcp->input_wait_ack->head) {
struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current;
// Check for invalid timestamp
if (pkt->last_timestamp == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=0, not sent yet!",
etcp->log_name, pkt->seq);
// Schedule timer to check again later
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] RETRANS TMR SET", etcp->log_name);
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, timeout, etcp, ack_timeout_check);
return;
}
if (pkt->last_timestamp > now) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=%llu > now=%llu, clock went backwards!",
etcp->log_name, pkt->seq, (unsigned long long)pkt->last_timestamp, (unsigned long long)now);
// Fix the timestamp to prevent overflow
pkt->last_timestamp = now;
}
uint64_t elapsed = now - pkt->last_timestamp;
int64_t elapsed = now - pkt->last_timestamp;
if (elapsed > timeout) {
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: timeout for seq=%u, elapsed=%llu, timeout=%llu, send_count=%u. Moving: wait_ack -> send_q",
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: timeout for seq=%u, elapsed=%llu, timeout=%llu, send_count=%u. Moving: wait_ack -> send_q",
etcp->log_name, pkt->seq, (unsigned long long)elapsed, (unsigned long long)timeout, pkt->send_count);
// Increment counters
@ -605,21 +586,26 @@ static void ack_timeout_check(void* arg) {
else {// не надо до конца сканировать - они уже сортированы по таймстемпу т.к. очередь fifo, а timestamp = время добавления в очередь = время отправки
// shedule timer
int64_t next_timeout=timeout - elapsed;
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] RETRANS TMR SET", etcp->log_name);
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_check);
if (next_timeout<0) next_timeout=0;
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_cb);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: retransmission timer set for %llu units", etcp->log_name, next_timeout);
return;
}
}
// если всё выгребли - надо взвести resume
etcp->retrans_timer = NULL;
queue_resume_callback(etcp->input_wait_ack);
}
static void wait_ack_cb(struct ll_queue* q, void* arg) {
static void ack_timeout_cb(void* arg) {// сработал таймер переотправки
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
etcp->retrans_timer=NULL;
ack_timeout_check(etcp);// он установит новый таймер или выгребет всё и установит ожидание на очередь
}
static void wait_ack_cb(struct ll_queue* q, void* arg) {// добавили пакет в ожидание ACK
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
ack_timeout_check(etcp);// ack_cb срабатывает когда init (таймер не инициализирован) или когда empty (таймер не активен)
if (!etcp->retrans_timer) ack_timeout_check(etcp);// если таймер ретрансмиссий взведен - дожидаемся таймера.
}
// Подготовить и отправить кодограмму

2
src/etcp.h

@ -162,7 +162,7 @@ struct ETCP_CONN {
// Logging identifier (format: "XXXX→XXXX [name]" - last 4 digits of local and peer node_id + optional name)
char log_name[256];
const char* name; // Connection name from config (e.g., "client_test1"), or NULL/empty
char* name; // Connection name from config (e.g., "client_test1"), or NULL/empty
};
// Functions

2
src/etcp_connections.c

@ -629,6 +629,7 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
size_t enc_buf_len=0;
dgram->timestamp=get_current_timestamp();
dgram->link->total_encrypted += dgram->data_len;
// DUMP: Show packet before encryption
if (debug_should_output(DEBUG_LEVEL_DEBUG, DEBUG_CATEGORY_CRYPTO)) log_dump("ECTP_ENCRYPT_SEND", dgram->data, dgram->data_len);
// DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Encrypt start");
@ -672,7 +673,6 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
dgram->link->send_errors++; errcode=4; goto es_err;
} else {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "sendto succeeded, sent=%zd bytes to port %d", sent, ntohs(((struct sockaddr_in*)addr)->sin_port));
dgram->link->total_encrypted += sent;
}
return (int)sent;
es_err:

Loading…
Cancel
Save