Browse Source

reconnect fix

nodeinfo-routing-update
jeka 3 weeks ago
parent
commit
8c424fdf51
  1. 6
      src/etcp.c
  2. 94
      src/etcp_connections.c

6
src/etcp.c

@ -184,6 +184,7 @@ void etcp_connection_close(struct ETCP_CONN* etcp) {
// Cancel active timers to prevent memory leaks // Cancel active timers to prevent memory leaks
if (etcp->retrans_timer) { if (etcp->retrans_timer) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] RETRANS TMR CANCEL", etcp->log_name);
uasync_cancel_timeout(etcp->instance->ua, etcp->retrans_timer); uasync_cancel_timeout(etcp->instance->ua, etcp->retrans_timer);
etcp->retrans_timer = NULL; etcp->retrans_timer = NULL;
} }
@ -548,11 +549,14 @@ static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_
static void ack_timeout_check(void* arg) { static void ack_timeout_check(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
uint64_t now = get_time_tb(); uint64_t now = get_time_tb();
uint64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2); uint64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2);
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] RETRANS TMR HANDLER", etcp->log_name);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u", // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u",
// (unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter); // (unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter);
@ -565,6 +569,7 @@ static void ack_timeout_check(void* arg) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=0, not sent yet!", DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=0, not sent yet!",
etcp->log_name, pkt->seq); etcp->log_name, pkt->seq);
// Schedule timer to check again later // Schedule timer to check again later
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] RETRANS TMR SET", etcp->log_name);
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, timeout, etcp, ack_timeout_check); etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, timeout, etcp, ack_timeout_check);
return; return;
} }
@ -600,6 +605,7 @@ static void ack_timeout_check(void* arg) {
else {// не надо до конца сканировать - они уже сортированы по таймстемпу т.к. очередь fifo, а timestamp = время добавления в очередь = время отправки else {// не надо до конца сканировать - они уже сортированы по таймстемпу т.к. очередь fifo, а timestamp = время добавления в очередь = время отправки
// shedule timer // shedule timer
int64_t next_timeout=timeout - elapsed; int64_t next_timeout=timeout - elapsed;
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] RETRANS TMR SET", etcp->log_name);
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_check); etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_check);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: retransmission timer set for %llu units", etcp->log_name, next_timeout); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: retransmission timer set for %llu units", etcp->log_name, next_timeout);
return; return;

94
src/etcp_connections.c

@ -24,7 +24,7 @@ static void etcp_connections_read_callback_socket(socket_t sock, void* arg);
static void etcp_link_remove_from_connections(struct ETCP_SOCKET* conn, struct ETCP_LINK* link); static void etcp_link_remove_from_connections(struct ETCP_SOCKET* conn, struct ETCP_LINK* link);
static void etcp_link_send_init(struct ETCP_LINK* link, uint8_t reset); static void etcp_link_send_init(struct ETCP_LINK* link, uint8_t reset);
static int etcp_link_send_reset(struct ETCP_LINK* link); //static int etcp_link_send_reset(struct ETCP_LINK* link);
static void etcp_link_init_timer_cbk(void* arg); static void etcp_link_init_timer_cbk(void* arg);
static void etcp_link_send_keepalive(struct ETCP_LINK* link); static void etcp_link_send_keepalive(struct ETCP_LINK* link);
static void keepalive_timer_cb(void* arg); static void keepalive_timer_cb(void* arg);
@ -185,22 +185,6 @@ static int etcp_all_links_down(struct ETCP_CONN* etcp) {
return 1; // All links are down return 1; // All links are down
} }
// Cancel init_timer for all links of an ETCP_CONN
static void etcp_cancel_all_init_timers(struct ETCP_CONN* etcp) {
if (!etcp) return;
struct ETCP_LINK* link = etcp->links;
while (link) {
if (link->init_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Cancelled init_timer on link %p",
etcp->log_name, link);
}
link = link->next;
}
}
// Keepalive timer callback // Keepalive timer callback
static void keepalive_timer_cb(void* arg) { static void keepalive_timer_cb(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, ""); DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
@ -261,27 +245,6 @@ restart_timer:
} }
} }
static int etcp_link_send_reset(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!link) return -1;
struct ETCP_DGRAM* dgram = u_malloc(sizeof(struct ETCP_DGRAM) + 4);
if (!dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_link_send_reset: malloc failed");
return -1;
}
dgram->link = link;
dgram->data_len = 1;
dgram->noencrypt_len = 0;
dgram->data[0] = 0x06;
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Sending RESET to link");
int ret = etcp_encrypt_send(dgram);
u_free(dgram);
return ret;
}
static uint32_t sockaddr_hash(struct sockaddr_storage* addr) { static uint32_t sockaddr_hash(struct sockaddr_storage* addr) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, ""); DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
socklen_t addr_len = (addr->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); socklen_t addr_len = (addr->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
@ -645,46 +608,6 @@ void etcp_link_close(struct ETCP_LINK* link) {
u_free(link); u_free(link);
} }
// Reset link state (for INIT_RESPONSE with reset)
static void etcp_link_reset(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!link) return;
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "[%s] Resetting link %p (local_id=%d)",
link->etcp->log_name, link, link->local_link_id);
// Cancel all timers
if (link->init_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
}
if (link->shaper_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->shaper_timer);
link->shaper_timer = NULL;
}
// Reset state
link->initialized = 0;
link->link_status = 0;
link->recv_keepalive = 0;
link->remote_keepalive = 0;
link->init_retry_count = 0;
link->init_timeout = 0;
// Reset shaper state
link->shaper_load_time_tb = 0;
link->shaper_sub_nanotime = 0;
link->shaper_state = 0;
// Reset counters
link->encrypt_errors = 0;
link->decrypt_errors = 0;
link->send_errors = 0;
link->recv_errors = 0;
link->total_encrypted = 0;
link->total_decrypted = 0;
}
int etcp_encrypt_send(struct ETCP_DGRAM* dgram) { int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, ""); DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
@ -1019,9 +942,6 @@ process_decrypted:
link->keepalive_recv_count++; link->keepalive_recv_count++;
} }
// Cancel all init timers - link is alive, no need for recovery
etcp_cancel_all_init_timers(link->etcp);
size_t offset = 0; size_t offset = 0;
uint8_t code = pkt->data[offset++]; uint8_t code = pkt->data[offset++];
@ -1123,9 +1043,19 @@ process_decrypted:
return; // INIT_RESPONSE is handled, no further processing needed return; // INIT_RESPONSE is handled, no further processing needed
} }
if (link->link_state == 2) {// из recovery получен нормальный пакет - восстанавливаем линк в нормальный режим
if (link->init_timer) {// cancel init timer
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
}
etcp_link_send_keepalive(link); // Start keepalive timer
link->link_state = 3; // connected
}
// log_dump("RECV decrypted:", pkt->data, pkt->data_len, link); // log_dump("RECV decrypted:", pkt->data, pkt->data_len, link);
etcp_conn_input(pkt); if (link->link_state == 3) etcp_conn_input(pkt);
return; return;
ec_fr: ec_fr:

Loading…
Cancel
Save