Browse Source

add per-link inflight

nodeinfo-routing-update
jeka 2 weeks ago
parent
commit
26cfa3209a
  1. 2
      src/control_server.c
  2. 57
      src/etcp.c
  3. 1
      src/etcp.h
  4. 64
      src/etcp_connections.c
  5. 3
      src/etcp_connections.h
  6. 6
      tools/etcpmon/etcpmon_gui.c
  7. 12
      tools/etcpmon/etcpmon_protocol.h

2
src/control_server.c

@ -883,6 +883,8 @@ static void send_metrics(struct control_server* server, struct control_client* c
link_info[i].shaper_timer_active = (link->shaper_timer != NULL) ? 1 : 0;
link_info[i].keepalive_sent = (uint32_t)link->keepalive_sent_count;
link_info[i].keepalive_recv = (uint32_t)link->keepalive_recv_count;
link_info[i].inflight_bytes = link->inflight_bytes;
link_info[i].inflight_packets = link->inflight_packets;
link = link->next;
}

57
src/etcp.c

@ -268,6 +268,13 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) {
clear_queue(etcp->recv_q);
clear_queue(etcp->ack_q);
struct ETCP_LINK* l = etcp->links;
while (l) {
l->inflight_bytes = 0;
l->inflight_packets = 0;
l = l->next;
}
// Reset timers (just clear the pointers - timers will expire naturally)
etcp->retrans_timer = NULL;
etcp->ack_resp_timer = NULL;
@ -331,7 +338,7 @@ void etcp_conn_reinit(struct ETCP_CONN* etcp) {// Если сбой в обме
etcp->reinit_count++;
etcp->initialized = 0;
etcp->initialized = 0;// еще раз придёт conn_ready_callback
// Вызываем etcp_conn_reset для сброса состояния
etcp_conn_reset(etcp);
@ -714,6 +721,41 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
if (inf_pkt) {
uint64_t now=get_time_tb();
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] send_q->wait_ack seq=%d TS=%llu", etcp->log_name, inf_pkt->seq, now);
// === NEW: per-link inflight + send_hist logic ===
// Compute 1-based link number in the linked list (as requested)
uint8_t link_num = 0;
struct ETCP_LINK* tmp = etcp->links;
while (tmp) {
link_num++;
if (tmp == link) break;
tmp = tmp->next;
}
if (link_num == 0) link_num = 255; // safety (should never happen)
// Fill history (send_count is the index before this attempt)
inf_pkt->send_hist[inf_pkt->send_count % 8] = link_num;
// Always subtract from previous last_link (if any) – this handles retransmission
if (inf_pkt->last_link) {
// if (inf_pkt->last_link->inflight_bytes >= inf_pkt->ll.len) {
inf_pkt->last_link->inflight_bytes -= inf_pkt->ll.len;
// } else {
// inf_pkt->last_link->inflight_bytes = 0;
// }
// if (inf_pkt->last_link->inflight_packets > 0) {
inf_pkt->last_link->inflight_packets--;
// }
}
// Always add to the CURRENT link (first send or retransmission)
link->inflight_bytes += inf_pkt->ll.len;
link->inflight_packets++;
// Update last_link for future ACK/retrans
inf_pkt->last_link = link;
inf_pkt->last_timestamp=now;
inf_pkt->send_count++;
inf_pkt->state=INFLIGHT_STATE_WAIT_ACK;
@ -901,6 +943,19 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d
return;
}
// === NEW: subtract inflight from the LAST link the packet was sent on ===
if (acked_pkt->last_link) {
// if (acked_pkt->last_link->inflight_bytes >= acked_pkt->ll.len) {
acked_pkt->last_link->inflight_bytes -= acked_pkt->ll.len;
// } else {
// acked_pkt->last_link->inflight_bytes = 0;
// }
// if (acked_pkt->last_link->inflight_packets > 0) {
acked_pkt->last_link->inflight_packets--;
// }
}
// Update connection statistics
etcp->unacked_bytes -= acked_pkt->ll.len;
etcp->bytes_sent_total += acked_pkt->ll.len;

1
src/etcp.h

@ -48,6 +48,7 @@ struct INFLIGHT_PACKET {// выделяется из etcp->inflight_pool
uint8_t send_count; // Number of sends
uint8_t retrans_req_count; // Number of retrans requests
uint8_t state; // WAIT_ACK or WAIT_SEND
uint8_t send_hist[8]; // через какие каналы передавался пакет (NEW). send_count - head ptr
};
// Список пакетов для сборки. собирается в ll_queue (используем быстрый поиск с хешем)

64
src/etcp_connections.c

@ -186,6 +186,19 @@ static int etcp_all_links_down(struct ETCP_CONN* etcp) {
return 1; // All links are down
}
static void start_keepalive_timer(struct ETCP_LINK* link) {
// Start keepalive timer
if (link->init_timer) {// cancel init timer
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
}
if (link->keepalive_timer == NULL) {
link->keepalive_timer = uasync_set_timeout(link->etcp->instance->ua, link->keepalive_interval * 10, link, keepalive_timer_cb);
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Keepalive timer started on link %p (interval=%d ms)", link->etcp->log_name, link, link->keepalive_interval);
}
}
// Keepalive timer callback
static void keepalive_timer_cb(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
@ -227,25 +240,13 @@ static void keepalive_timer_cb(void* arg) {
(unsigned long long)(elapsed / 10), link->keepalive_timeout);
}
}
// else {
// if (link->recv_keepalive != 1) {
// link->recv_keepalive = 1; // up
// loadbalancer_link_ready(link);
// DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "[%s] Link %p (local_id=%d) recv status changed to UP", link->etcp->log_name, link, link->local_link_id);
// }
// }
// Send keepalive only if no packets were sent since last tick
if (!link->pkt_sent_since_keepalive) etcp_link_send_keepalive(link);
link->pkt_sent_since_keepalive = 0;
restart_timer:
// Restart timer
if (link->keepalive_interval > 0) {
link->keepalive_timer = uasync_set_timeout(link->etcp->instance->ua,
link->keepalive_interval * 10,
link, keepalive_timer_cb);
}
link->keepalive_timer = uasync_set_timeout(link->etcp->instance->ua, link->keepalive_interval * 10, link, keepalive_timer_cb);
}
static uint32_t sockaddr_hash(struct sockaddr_storage* addr) {
@ -548,6 +549,7 @@ struct ETCP_LINK* etcp_link_new(struct ETCP_CONN* etcp, struct ETCP_SOCKET* conn
link->keepalive_timeout = 2000; // Default 2 seconds
link->keepalive_interval = 200; // Default 0.2 s
}
if (link->keepalive_interval < 10) link->keepalive_interval = 10;
link->keepalive_sent_count = 0;
link->keepalive_recv_count = 0;
@ -883,6 +885,7 @@ static void etcp_connections_read_callback_socket(socket_t sock, void* arg) {
}
link->keepalive_interval=(ack_hdr->keepalive[0]<<8) | ack_hdr->keepalive[1];
link->recovery_interval=((ack_hdr->recovery[0]<<8) | ack_hdr->recovery[1])*100;// timebase в link, timebase/100 в кодограмме
if (link->keepalive_interval < 10) link->keepalive_interval = 10;
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "set keepalive for link=%d", link->keepalive_interval);
}
@ -941,14 +944,8 @@ static void etcp_connections_read_callback_socket(socket_t sock, void* arg) {
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "[%s] Link %p (local_id=%d) initialized and marked as UP (server)",
link->etcp->log_name, link, link->local_link_id);
// Start keepalive timer
if (!link->keepalive_timer && link->keepalive_interval > 0) {
link->keepalive_timer = uasync_set_timeout(link->etcp->instance->ua,
link->keepalive_interval * 10,
link, keepalive_timer_cb);
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Keepalive timer started on link %p (interval=%d ms)",
link->etcp->log_name, link, link->keepalive_interval);
}
start_keepalive_timer(link);
loadbalancer_link_ready(link);
@ -1044,21 +1041,7 @@ process_decrypted:
link->etcp->peer_node_id = server_node_id; // If not set
etcp_update_log_name(link->etcp); // Update log_name with peer_node_id
// Cancel init timer if exists
if (link->init_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
}
if (link->keepalive_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->keepalive_timer);
link->keepalive_timer = NULL;
}
// Mark link as initialized
// DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "Setting link->initialized=1, link=%p, is_server=%d", link, link->is_server);
link->initialized = 1;// получен init response (client)
// link->link_status = 1; // Link is up after successful initialization
link->link_state = 3; // connected
if (code == ETCP_INIT_RESPONSE) {
@ -1077,10 +1060,8 @@ process_decrypted:
// Start keepalive timer
etcp_link_send_keepalive(link);
if (link->keepalive_interval > 0) {
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Keepalive timer started on link %p (interval=%d ms)", link->etcp->log_name, link, link->keepalive_interval);
link->keepalive_timer = uasync_set_timeout(link->etcp->instance->ua, link->keepalive_interval * 10, link, keepalive_timer_cb);
}
start_keepalive_timer(link);
loadbalancer_link_ready(link);
@ -1091,10 +1072,7 @@ process_decrypted:
}
if (link->link_state == 2) {// из recovery получен нормальный пакет - восстанавливаем линк в нормальный режим
if (link->init_timer) {// cancel init timer
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
}
start_keepalive_timer(link);
etcp_link_send_keepalive(link); // Start keepalive timer
link->link_state = 3; // connected
}

3
src/etcp_connections.h

@ -90,6 +90,9 @@ struct ETCP_LINK {
uint8_t shaper_state;
void* shaper_timer;
uint32_t inflight_bytes;
uint32_t inflight_packets;
size_t encrypt_errors;
size_t decrypt_errors;
size_t send_errors;

6
tools/etcpmon/etcpmon_gui.c

@ -1042,7 +1042,7 @@ void etcpmon_gui_update_metrics(struct etcpmon_app* app,
snprintf(line2, sizeof(line2),
" RTT=%.1f ms, avg10=%.1f ms, TT=%.1f ms Timers: Init=%s, KA=%s, Shaper=%s | "
"KAsent=%u, KArecv=%u",
"KAsent=%u, KArecv=%u, InB/P=%u/%u",
(float)links[i].rtt_last/10.0f,
(float)links[i].rtt_avg10/10.0f,
(float)links[i].tt_last/10.f,
@ -1050,7 +1050,9 @@ void etcpmon_gui_update_metrics(struct etcpmon_app* app,
links[i].keepalive_timer_active ? "ON" : "OFF",
links[i].shaper_timer_active ? "ON" : "OFF",
links[i].keepalive_sent,
links[i].keepalive_recv);
links[i].keepalive_recv,
links[i].inflight_bytes,
links[i].inflight_packets);
SendMessageA(app->hListLinks, LB_ADDSTRING, 0, (LPARAM)line1);
SendMessageA(app->hListLinks, LB_ADDSTRING, 0, (LPARAM)line2);

12
tools/etcpmon/etcpmon_protocol.h

@ -12,6 +12,7 @@
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
@ -50,13 +51,13 @@ extern "C" {
#define ETCPMON_ERR_NO_CONN_SELECTED 0x03 /* No connection selected */
#define ETCPMON_ERR_SERVER_BUSY 0x04 /* Server too busy */
#pragma pack(push, 1)
/* ============================================================================
* Message Header
* ============================================================================
* seq_id: 0 = broadcast/unsolicited, 1-255 = request/response pair */
#pragma pack(push, 1)
struct etcpmon_msg_header {
uint16_t size; /* Total message size including header */
uint8_t type; /* Message type (command or response) */
@ -107,7 +108,6 @@ struct etcpmon_etcp_metrics {
uint32_t ack_count; /* ACK packets received */
uint32_t unacked_bytes; /* Current unacknowledged bytes */
uint32_t optimal_inflight; /* Target inflight window */
uint8_t links_count; /* Number of active links */
/* Queue metrics */
uint32_t input_queue_bytes;
@ -128,6 +128,8 @@ struct etcpmon_etcp_metrics {
uint32_t reset_count;
uint32_t pkt_format_errors;
uint8_t links_count; /* Number of active links */
/* Timer flags (1 = active, 0 = NULL) */
uint8_t retrans_timer_active;
uint8_t ack_resp_timer_active;
@ -191,6 +193,10 @@ struct etcpmon_link_metrics {
/* Keepalive counters */
uint32_t keepalive_sent;
uint32_t keepalive_recv;
/* Inflight counters */
uint32_t inflight_bytes;
uint32_t inflight_packets;
};
/* TUN interface metrics */

Loading…
Cancel
Save