Browse Source

1

nodeinfo-routing-update
jeka 4 weeks ago
parent
commit
8880db0ca2
  1. 4
      lib/ll_queue.c
  2. 342
      lib/u_async.c
  3. 15
      lib/u_async.h
  4. 10
      src/control_server.c
  5. 34
      src/etcp.c
  6. 13
      src/etcp.h
  7. 67
      tools/etcpmon/etcpmon_gui.c
  8. 24
      tools/etcpmon/etcpmon_gui.h
  9. 8
      tools/etcpmon/etcpmon_protocol.h

4
lib/ll_queue.c

@ -117,7 +117,7 @@ void queue_free(struct ll_queue* q) {
// Отменить отложенное возобновление // Отменить отложенное возобновление
if (q->resume_timeout_id) { if (q->resume_timeout_id) {
uasync_cancel_timeout(q->ua, q->resume_timeout_id); uasync_call_soon_cancel(q->ua, q->resume_timeout_id);
q->resume_timeout_id = NULL; q->resume_timeout_id = NULL;
} }
@ -151,7 +151,7 @@ void queue_resume_callback(struct ll_queue* q) {
// Если есть элементы, запланировать вызов коллбэка // Если есть элементы, запланировать вызов коллбэка
if (q->head && q->callback && !q->resume_timeout_id) { if (q->head && q->callback && !q->resume_timeout_id) {
q->resume_timeout_id = uasync_set_timeout(q->ua, 0, q, queue_resume_timeout_cb); q->resume_timeout_id = uasync_call_soon(q->ua, q, queue_resume_timeout_cb);
} }
} }

342
lib/u_async.c

@ -1,9 +1,10 @@
// uasync.c // uasync.c
#include "u_async.h" #include "u_async.h"
#include "platform_compat.h" #include "platform_compat.h"
#include "debug_config.h" #include "debug_config.h"
#include "mem.h" #include "mem.h"
#include "memory_pool.h"
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
@ -27,13 +28,13 @@
// Timeout node with safe cancellation // Timeout node with safe cancellation
struct timeout_node { struct timeout_node {
void* arg; void* arg;
timeout_callback_t callback; timeout_callback_t callback;
uint64_t expiration_ms; // absolute expiration time in milliseconds uint64_t expiration_ms; // absolute expiration time in milliseconds
struct UASYNC* ua; // Pointer back to uasync instance for counter updates struct UASYNC* ua; // Pointer back to uasync instance for counter updates
int cancelled; // Cancellation flag struct timeout_node* next; // For immediate queue (FIFO)
}; };
// Socket node with array-based storage // Socket node with array-based storage
@ -273,13 +274,13 @@ static struct socket_node* socket_array_get_by_sock(struct socket_array* sa, soc
return &sa->sockets[index]; return &sa->sockets[index];
} }
// Callback to u_free timeout node and update counters // Callback to u_free timeout node and update counters
static void timeout_node_free_callback(void* user_data, void* data) { static void timeout_node_free_callback(void* user_data, void* data) {
struct UASYNC* ua = (struct UASYNC*)user_data; struct UASYNC* ua = (struct UASYNC*)user_data;
struct timeout_node* node = (struct timeout_node*)data; struct timeout_node* node = (struct timeout_node*)data;
(void)node; // Not used directly, but keep for consistency (void)node; // Not used directly, but keep for consistency
ua->timer_free_count++; ua->timer_free_count++;
u_free(data); memory_pool_free(ua->timeout_pool, data);
} }
// Helper to get current time // Helper to get current time
@ -398,35 +399,55 @@ static uint64_t timeval_to_ms(const struct timeval* tv) {
// Simplified timeout handling without reference counting // Simplified timeout handling without reference counting
// Process expired timeouts with safe cancellation // Process expired timeouts with safe cancellation
static void process_timeouts(struct UASYNC* ua) { static void process_timeouts(struct UASYNC* ua) {
if (!ua || !ua->timeout_heap) return; if (!ua) return;
struct timeval now_tv; // Сначала обрабатываем immediate_queue (FIFO)
get_current_time(&now_tv); while (ua->immediate_queue_head) {
uint64_t now_ms = timeval_to_ms(&now_tv); struct timeout_node* node = ua->immediate_queue_head;
ua->immediate_queue_head = node->next;
while (1) { if (!ua->immediate_queue_head) {
TimeoutEntry entry; ua->immediate_queue_tail = NULL;
if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) break; }
if (entry.expiration > now_ms) break;
if (node && node->callback) {
// Pop the expired timeout node->callback(node->arg);
timeout_heap_pop(ua->timeout_heap, &entry); }
struct timeout_node* node = (struct timeout_node*)entry.data;
if (node && node->ua) {
if (node && node->callback && !node->cancelled) { node->ua->timer_free_count++;
// Execute callback only if not cancelled }
node->callback(node->arg); memory_pool_free(ua->timeout_pool, node);
} }
// Always u_free the node after processing if (!ua->timeout_heap) return;
if (node && node->ua) {
node->ua->timer_free_count++; struct timeval now_tv;
} get_current_time(&now_tv);
u_free(node); uint64_t now_ms = timeval_to_ms(&now_tv);
continue; // Process next expired timeout
} while (1) {
TimeoutEntry entry;
if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) break;
if (entry.expiration > now_ms) break;
// Pop the expired timeout
timeout_heap_pop(ua->timeout_heap, &entry);
struct timeout_node* node = (struct timeout_node*)entry.data;
if (node && node->callback) {
// Execute callback only if not cancelled
node->callback(node->arg);
}
// Always u_free the node after processing
if (node && node->ua) {
node->ua->timer_free_count++;
}
memory_pool_free(ua->timeout_pool, node);
continue; // Process next expired timeout
}
} }
// Compute time to next timeout // Compute time to next timeout
@ -468,17 +489,16 @@ void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* arg, timeout_c
// DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: timeout=%d.%d ms, arg=%p, callback=%p", timeout_tb/10, timeout_tb%10, arg, callback); // DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: timeout=%d.%d ms, arg=%p, callback=%p", timeout_tb/10, timeout_tb%10, arg, callback);
struct timeout_node* node = u_malloc(sizeof(struct timeout_node)); struct timeout_node* node = memory_pool_alloc(ua->timeout_pool);
if (!node) { if (!node) {
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to allocate node"); DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to allocate node");
return NULL; return NULL;
} }
ua->timer_alloc_count++; ua->timer_alloc_count++;
node->arg = arg; node->arg = arg;
node->callback = callback; node->callback = callback;
node->ua = ua; node->ua = ua;
node->cancelled = 0;
// Calculate expiration time in milliseconds // Calculate expiration time in milliseconds
struct timeval now; struct timeval now;
@ -486,19 +506,61 @@ void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* arg, timeout_c
timeval_add_tb(&now, timeout_tb); timeval_add_tb(&now, timeout_tb);
node->expiration_ms = timeval_to_ms(&now); node->expiration_ms = timeval_to_ms(&now);
// Add to heap // Add to heap
if (timeout_heap_push(ua->timeout_heap, node->expiration_ms, node) != 0) { if (timeout_heap_push(ua->timeout_heap, node->expiration_ms, node) != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to push to heap"); DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to push to heap");
u_free(node); memory_pool_free(ua->timeout_pool, node);
ua->timer_free_count++; // Balance the alloc counter ua->timer_free_count++; // Balance the alloc counter
return NULL; return NULL;
} }
return node; return node;
} }
// Immediate execution in next mainloop (FIFO order)
void* uasync_call_soon(struct UASYNC* ua, void* user_arg, timeout_callback_t callback) {
if (!ua || !callback) return NULL;
if (!ua->timeout_pool) return NULL;
struct timeout_node* node = memory_pool_alloc(ua->timeout_pool);
if (!node) {
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_call_soon: failed to allocate node");
return NULL;
}
ua->timer_alloc_count++;
node->arg = user_arg;
node->callback = callback;
node->ua = ua;
node->expiration_ms = 0;
node->next = NULL;
// FIFO: добавляем в конец очереди
if (ua->immediate_queue_tail) {
ua->immediate_queue_tail->next = node;
ua->immediate_queue_tail = node;
} else {
ua->immediate_queue_head = ua->immediate_queue_tail = node;
}
return node;
}
// Cancel immediate callback by setting callback to NULL - O(1)
err_t uasync_call_soon_cancel(struct UASYNC* ua, void* t_id) {
if (!ua || !t_id) return ERR_FAIL;
struct timeout_node* node = (struct timeout_node*)t_id;
if (node->ua != ua) return ERR_FAIL;
// Simply nullify callback - will be skipped in process_timeouts
node->callback = NULL;
return ERR_OK;
}
// Instance version // Instance version
err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id) { err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id) {
if (!ua || !t_id || !ua->timeout_heap) { if (!ua || !t_id || !ua->timeout_heap) {
@ -512,7 +574,6 @@ err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id) {
// Try to cancel from heap first // Try to cancel from heap first
if (timeout_heap_cancel(ua->timeout_heap, node->expiration_ms, node) == 0) { if (timeout_heap_cancel(ua->timeout_heap, node->expiration_ms, node) == 0) {
// Successfully marked as deleted - u_free will happen lazily in heap // Successfully marked as deleted - u_free will happen lazily in heap
node->cancelled = 1;
node->callback = NULL; node->callback = NULL;
// DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: successfully cancelled timer %p from heap", node); // DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: successfully cancelled timer %p from heap", node);
return ERR_OK; return ERR_OK;
@ -1107,11 +1168,13 @@ struct UASYNC* uasync_create(void) {
ua->poll_fds_count = 0; ua->poll_fds_count = 0;
ua->poll_fds_dirty = 1; ua->poll_fds_dirty = 1;
ua->wakeup_pipe[0] = -1; ua->wakeup_pipe[0] = -1;
ua->wakeup_pipe[1] = -1; ua->wakeup_pipe[1] = -1;
ua->wakeup_initialized = 0; ua->wakeup_initialized = 0;
ua->posted_tasks_head = NULL; ua->posted_tasks_head = NULL;
ua->immediate_queue_head = NULL;
ua->immediate_queue_tail = NULL;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating SA..."); DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating SA...");
ua->sockets = socket_array_create(16); ua->sockets = socket_array_create(16);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating SA1..."); DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating SA1...");
@ -1129,22 +1192,40 @@ struct UASYNC* uasync_create(void) {
return NULL; return NULL;
} }
ua->timeout_heap = timeout_heap_create(16); ua->timeout_heap = timeout_heap_create(16);
if (!ua->timeout_heap) { if (!ua->timeout_heap) {
socket_array_destroy(ua->sockets); socket_array_destroy(ua->sockets);
if (ua->wakeup_initialized) { if (ua->wakeup_initialized) {
#ifdef _WIN32 #ifdef _WIN32
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]); closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]);
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]); closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]);
#else #else
close(ua->wakeup_pipe[0]); close(ua->wakeup_pipe[0]);
close(ua->wakeup_pipe[1]); close(ua->wakeup_pipe[1]);
#endif #endif
} }
u_free(ua); u_free(ua);
return NULL; return NULL;
} }
// Initialize timeout pool
ua->timeout_pool = memory_pool_init(sizeof(struct timeout_node));
if (!ua->timeout_pool) {
timeout_heap_destroy(ua->timeout_heap);
socket_array_destroy(ua->sockets);
if (ua->wakeup_initialized) {
#ifdef _WIN32
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]);
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]);
#else
close(ua->wakeup_pipe[0]);
close(ua->wakeup_pipe[1]);
#endif
}
u_free(ua);
return NULL;
}
// Set callback to u_free timeout nodes and update counters // Set callback to u_free timeout nodes and update counters
timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback); timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
@ -1256,8 +1337,8 @@ void uasync_print_resources(struct UASYNC* ua, const char* prefix) {
if (!ua->timeout_heap->heap[i].deleted) { if (!ua->timeout_heap->heap[i].deleted) {
active_timers++; active_timers++;
struct timeout_node* node = (struct timeout_node*)ua->timeout_heap->heap[i].data; struct timeout_node* node = (struct timeout_node*)ua->timeout_heap->heap[i].data;
printf(" Timer: node=%p, expires=%llu ms, cancelled=%d\n", printf(" Timer: node=%p, expires=%llu ms\n",
node, (unsigned long long)ua->timeout_heap->heap[i].expiration, node->cancelled); node, (unsigned long long)ua->timeout_heap->heap[i].expiration);
} }
} }
printf(" Active timers in heap: %zu\n", active_timers); printf(" Active timers in heap: %zu\n", active_timers);
@ -1306,21 +1387,41 @@ void uasync_destroy(struct UASYNC* ua, int close_fds) {
// Continue cleanup, will abort after if leaks remain // Continue cleanup, will abort after if leaks remain
} }
// Free all remaining timeouts // Free all remaining timeouts
if (ua->timeout_heap) {
size_t u_freed_count = 0; // Очистить immediate_queue
while (1) { while (ua->immediate_queue_head) {
TimeoutEntry entry; struct timeout_node* node = ua->immediate_queue_head;
if (timeout_heap_pop(ua->timeout_heap, &entry) != 0) break; ua->immediate_queue_head = node->next;
struct timeout_node* node = (struct timeout_node*)entry.data; if (node) {
node->ua->timer_free_count++;
// Free all timer nodes (avoid double-u_free bug) memory_pool_free(ua->timeout_pool, node);
if (node) { }
ua->timer_free_count++; }
u_free(node); ua->immediate_queue_tail = NULL;
}
} // Очистить heap
timeout_heap_destroy(ua->timeout_heap); if (ua->timeout_heap) {
size_t u_freed_count = 0;
while (1) {
TimeoutEntry entry;
if (timeout_heap_pop(ua->timeout_heap, &entry) != 0) break;
struct timeout_node* node = (struct timeout_node*)entry.data;
// Free all timer nodes (avoid double-u_free bug)
if (node) {
ua->timer_free_count++;
memory_pool_free(ua->timeout_pool, node);
}
}
timeout_heap_destroy(ua->timeout_heap);
ua->timeout_heap = NULL;
}
// Destroy timeout pool
if (ua->timeout_pool) {
memory_pool_destroy(ua->timeout_pool);
ua->timeout_pool = NULL;
} }
// Free all socket nodes using array approach // Free all socket nodes using array approach
@ -1401,20 +1502,29 @@ void uasync_destroy(struct UASYNC* ua, int close_fds) {
socket_platform_cleanup(); socket_platform_cleanup();
} }
void uasync_init_instance(struct UASYNC* ua) { void uasync_init_instance(struct UASYNC* ua) {
if (!ua) return; if (!ua) return;
// Initialize socket array if not present // Initialize socket array if not present
if (!ua->sockets) { if (!ua->sockets) {
ua->sockets = socket_array_create(16); ua->sockets = socket_array_create(16);
} }
if (!ua->timeout_heap) { if (!ua->timeout_pool) {
ua->timeout_heap = timeout_heap_create(16); ua->timeout_pool = memory_pool_init(sizeof(struct timeout_node));
if (ua->timeout_heap) { }
timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
} if (!ua->immediate_queue_head) {
} ua->immediate_queue_head = NULL;
ua->immediate_queue_tail = NULL;
}
if (!ua->timeout_heap) {
ua->timeout_heap = timeout_heap_create(16);
if (ua->timeout_heap) {
timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
}
}
} }
// Debug statistics // Debug statistics

15
lib/u_async.h

@ -27,14 +27,21 @@ typedef int err_t;
typedef void (*uasync_post_callback_t)(void* user_arg); typedef void (*uasync_post_callback_t)(void* user_arg);
#include "memory_pool.h"
struct timeout_node; // Forward declaration
struct posted_task { struct posted_task {
uasync_post_callback_t callback; uasync_post_callback_t callback;
void* arg; void* arg;
struct posted_task* next; struct posted_task* next;
}; };
// Uasync instance structure // Uasync instance structure
struct UASYNC { struct UASYNC {
struct memory_pool* timeout_pool; // Pool for timeout_node allocation
struct timeout_node* immediate_queue_head; // FIFO queue for immediate execution
struct timeout_node* immediate_queue_tail;
TimeoutHeap* timeout_heap; // Heap for timeout management TimeoutHeap* timeout_heap; // Heap for timeout management
struct socket_array* sockets; // Array-based socket management struct socket_array* sockets; // Array-based socket management
// Debug counters for memory allocation tracking // Debug counters for memory allocation tracking
@ -79,6 +86,10 @@ uint64_t get_time_tb(void);
void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* user_arg, timeout_callback_t callback); void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* user_arg, timeout_callback_t callback);
err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id); err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id);
// Immediate execution in next mainloop (FIFO order)
void* uasync_call_soon(struct UASYNC* ua, void* user_arg, timeout_callback_t callback);
err_t uasync_call_soon_cancel(struct UASYNC* ua, void* t_id);
// Sockets - for regular file descriptors (pipe, file) // Sockets - for regular file descriptors (pipe, file)
void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_arg); void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_arg);
// Sockets - for socket_t (cross-platform sockets) // Sockets - for socket_t (cross-platform sockets)

10
src/control_server.c

@ -797,6 +797,16 @@ static void send_metrics(struct control_server* server, struct control_client* c
rsp->etcp.norm_out_total_pkts = 0; rsp->etcp.norm_out_total_pkts = 0;
rsp->etcp.norm_out_total_bytes = 0; rsp->etcp.norm_out_total_bytes = 0;
} }
/* ACK debug counters */
rsp->etcp.cnt_ack_hit_inf = conn->cnt_ack_hit_inf;
rsp->etcp.cnt_ack_hit_sndq = conn->cnt_ack_hit_sndq;
rsp->etcp.cnt_ack_miss = conn->cnt_ack_miss;
rsp->etcp.cnt_link_wait = conn->cnt_link_wait;
rsp->etcp.tx_state = conn->tx_state;
for (int i = 0; i < 8; i++) {
rsp->etcp.debug[i] = conn->debug[i];
}
/* Fill TUN metrics */ /* Fill TUN metrics */
if (instance->tun) { if (instance->tun) {

34
src/etcp.c

@ -391,7 +391,18 @@ int etcp_int_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) {
static void input_queue_try_resume(struct ETCP_CONN* etcp) { static void input_queue_try_push(struct ETCP_CONN* etcp) {// пробуем протолкнуть при отправке
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
// когда очередь отправки пуста - пробуем взять новый пакет на обработку
size_t wait_ack_bytes = queue_total_bytes(etcp->input_wait_ack);
if (wait_ack_bytes <= etcp->optimal_inflight) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resume input queue: inflight_bytes=%d, input_len=%d", etcp->log_name, wait_ack_bytes, etcp->input_queue->total_bytes);
queue_resume_callback(etcp->input_queue);// и только когда больше нечего отправлять - забираем новый пакет
}
}
static void input_queue_try_resume(struct ETCP_CONN* etcp) {// при ACK
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
// Сперва отправим всё из очереди отправки // Сперва отправим всё из очереди отправки
@ -590,6 +601,8 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_LINK* link = etcp_loadbalancer_select_link(etcp); struct ETCP_LINK* link = etcp_loadbalancer_select_link(etcp);
if (!link) { if (!link) {
etcp->tx_state=ETCP_TX_STATE_WAIT_LINKS;
etcp->cnt_link_wait++;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no link available", etcp->log_name); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no link available", etcp->log_name);
return NULL;// если линков нет - ждём появления свободного return NULL;// если линков нет - ждём появления свободного
} }
@ -598,8 +611,7 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
if (send_q_size == 0) {// сгребаем из input_queue if (send_q_size == 0) {// сгребаем из input_queue
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: input_send_q empty, check if avail input_queue -> inflight"); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: input_send_q empty, check if avail input_queue -> inflight");
input_queue_try_resume(etcp); input_queue_try_push(etcp);
return NULL;
} }
// First, check if there's a packet in input_send_q (retrans or new) // First, check if there's a packet in input_send_q (retrans or new)
@ -618,12 +630,14 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
if (!inf_pkt && ack_q_size == 0) { if (!inf_pkt && ack_q_size == 0) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name);
etcp->tx_state=ETCP_TX_STATE_NO_DATA;
return NULL; return NULL;
} }
struct ETCP_DGRAM* dgram = memory_pool_alloc(etcp->instance->pkt_pool); struct ETCP_DGRAM* dgram = memory_pool_alloc(etcp->instance->pkt_pool);
if (!dgram) { if (!dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate ETCP_DGRAM", etcp->log_name); DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate ETCP_DGRAM", etcp->log_name);
etcp->tx_state=ETCP_TX_STATE_ERR_MEM;
return NULL; return NULL;
} }
@ -724,7 +738,8 @@ static void etcp_link_ready_callback(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp) return; if (!etcp) return;
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp);
etcp_conn_process_send_queue(etcp); queue_resume_callback(etcp->input_send_q);
// etcp_conn_process_send_queue(etcp);
} }
// Process packets in send queue and transmit them // Process packets in send queue and transmit them
@ -806,11 +821,16 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts);
// Find the acknowledged packet in the wait_ack queue // Find the acknowledged packet in the wait_ack queue
struct INFLIGHT_PACKET* acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq); struct INFLIGHT_PACKET* acked_pkt;
if (acked_pkt) queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)acked_pkt); if (acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq)) {
else { acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_send_q, seq); etcp->cnt_ack_hit_inf++;
queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)acked_pkt);
}
else if ( acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_send_q, seq) ) {
etcp->cnt_ack_hit_sndq++;
queue_remove_data(etcp->input_send_q, (struct ll_entry*)acked_pkt); queue_remove_data(etcp->input_send_q, (struct ll_entry*)acked_pkt);
} }
else etcp->cnt_ack_miss++;
if (!acked_pkt) { if (!acked_pkt) {
// Packet might be already acknowledged or not found // Packet might be already acknowledged or not found

13
src/etcp.h

@ -62,6 +62,10 @@ struct ACK_PACKET {
uint32_t recv_timestamp;// время приема (локальное) uint32_t recv_timestamp;// время приема (локальное)
}; };
#define ETCP_TX_STATE_WAIT_LINKS 1
#define ETCP_TX_STATE_NO_DATA 2
#define ETCP_TX_STATE_ERR_MEM 100
// ETCP connection structure (refactored) // ETCP connection structure (refactored)
struct ETCP_CONN { struct ETCP_CONN {
struct ETCP_CONN* next; struct ETCP_CONN* next;
@ -98,7 +102,6 @@ struct ETCP_CONN {
// IDs and state // IDs and state
uint8_t got_initial_pkt; //
uint32_t next_tx_id; // Next TX ID uint32_t next_tx_id; // Next TX ID
uint32_t last_rx_id; // Last received ID uint32_t last_rx_id; // Last received ID
uint32_t last_delivered_id; // Last delivered to output_queue uint32_t last_delivered_id; // Last delivered to output_queue
@ -141,6 +144,14 @@ struct ETCP_CONN {
// Flags // Flags
uint8_t routing_exchange_active; // 0 - не активен, 1 - надо инициировать обмен маршрутами (клиент), 2 - обмен маршрутами активен uint8_t routing_exchange_active; // 0 - не активен, 1 - надо инициировать обмен маршрутами (клиент), 2 - обмен маршрутами активен
uint8_t got_initial_pkt; //
uint32_t cnt_ack_hit_inf; // счетчик удлений из inflight
uint32_t cnt_ack_hit_sndq; // счетчик удалений inflight пакетов из sndq
uint32_t cnt_ack_miss; // счетчик не найденных ack
uint32_t cnt_link_wait; // счетчик переходов в ожидание когда link busy
uint32_t tx_state; // 1 - wait link ready, 2
uint32_t debug[8]; // 8 значений для дебага (live watch)
// Logging identifier (format: "XXXX→XXXX" - last 4 digits of local and peer node_id) // Logging identifier (format: "XXXX→XXXX" - last 4 digits of local and peer node_id)
char log_name[16]; char log_name[16];

67
tools/etcpmon/etcpmon_gui.c

@ -19,7 +19,7 @@
//#pragma comment(lib, "user32.lib") //#pragma comment(lib, "user32.lib")
//#pragma comment(lib, "gdi32.lib") //#pragma comment(lib, "gdi32.lib")
#define WINDOW_WIDTH 900 #define WINDOW_WIDTH 900
#define WINDOW_HEIGHT 1150 #define WINDOW_HEIGHT 1250
#define UPDATE_INTERVAL 10 /* 50ms → 20 samples per second */ #define UPDATE_INTERVAL 10 /* 50ms → 20 samples per second */
/* Global app pointer for callbacks */ /* Global app pointer for callbacks */
static struct etcpmon_app* g_app = NULL; static struct etcpmon_app* g_app = NULL;
@ -436,7 +436,7 @@ static void CreateControls(struct etcpmon_app* app) {
y = 915; y = 915;
CreateWindowExA(0, "BUTTON", "Queues & Errors", CreateWindowExA(0, "BUTTON", "Queues & Errors",
WS_CHILD | WS_VISIBLE | BS_GROUPBOX, WS_CHILD | WS_VISIBLE | BS_GROUPBOX,
10, y, 880, 235, hWnd, (HMENU)IDC_STATIC_QUEUES, hInst, NULL); 10, y, 880, 335, hWnd, (HMENU)IDC_STATIC_QUEUES, hInst, NULL);
int qy = y + 22; int qy = y + 22;
int q_col1 = 20; int q_col1 = 20;
@ -612,6 +612,49 @@ static void CreateControls(struct etcpmon_app* app) {
app->hEditNormOutTotBytes = CreateWindowExA(WS_EX_CLIENTEDGE, "EDIT", "", app->hEditNormOutTotBytes = CreateWindowExA(WS_EX_CLIENTEDGE, "EDIT", "",
WS_CHILD | WS_VISIBLE | ES_READONLY | ES_CENTER, WS_CHILD | WS_VISIBLE | ES_READONLY | ES_CENTER,
q_col1 + 265, qy - 2, 70, 18, hWnd, (HMENU)IDC_EDIT_NORM_OUT_TOT_BYTES, hInst, NULL); q_col1 + 265, qy - 2, 70, 18, hWnd, (HMENU)IDC_EDIT_NORM_OUT_TOT_BYTES, hInst, NULL);
/* ACK Debug section */
qy += 25;
CreateWindowExA(0, "STATIC", "ACK Debug:",
WS_CHILD | WS_VISIBLE, q_col1, qy, 70, 16, hWnd, (HMENU)IDC_STATIC, hInst, NULL);
CreateWindowExA(0, "STATIC", "HitInf:",
WS_CHILD | WS_VISIBLE, q_col1 + 80, qy, 40, 16, hWnd, (HMENU)IDC_STATIC, hInst, NULL);
app->hEditAckHitInf = CreateWindowExA(WS_EX_CLIENTEDGE, "EDIT", "",
WS_CHILD | WS_VISIBLE | ES_READONLY | ES_CENTER,
q_col1 + 125, qy - 2, 60, 18, hWnd, (HMENU)IDC_EDIT_ACK_HIT_INF, hInst, NULL);
CreateWindowExA(0, "STATIC", "HitSndQ:",
WS_CHILD | WS_VISIBLE, q_col1 + 195, qy, 50, 16, hWnd, (HMENU)IDC_STATIC, hInst, NULL);
app->hEditAckHitSndq = CreateWindowExA(WS_EX_CLIENTEDGE, "EDIT", "",
WS_CHILD | WS_VISIBLE | ES_READONLY | ES_CENTER,
q_col1 + 250, qy - 2, 60, 18, hWnd, (HMENU)IDC_EDIT_ACK_HIT_SNDQ, hInst, NULL);
CreateWindowExA(0, "STATIC", "Miss:",
WS_CHILD | WS_VISIBLE, q_col2 + 80, qy, 40, 16, hWnd, (HMENU)IDC_STATIC, hInst, NULL);
app->hEditAckMiss = CreateWindowExA(WS_EX_CLIENTEDGE, "EDIT", "",
WS_CHILD | WS_VISIBLE | ES_READONLY | ES_CENTER,
q_col2 + 125, qy - 2, 60, 18, hWnd, (HMENU)IDC_EDIT_ACK_MISS, hInst, NULL);
CreateWindowExA(0, "STATIC", "LnkWait:",
WS_CHILD | WS_VISIBLE, q_col2 + 195, qy, 50, 16, hWnd, (HMENU)IDC_STATIC, hInst, NULL);
app->hEditCntLinkWait = CreateWindowExA(WS_EX_CLIENTEDGE, "EDIT", "",
WS_CHILD | WS_VISIBLE | ES_READONLY | ES_CENTER,
q_col2 + 250, qy - 2, 60, 18, hWnd, (HMENU)IDC_EDIT_CNT_LINK_WAIT, hInst, NULL);
CreateWindowExA(0, "STATIC", "TxState:",
WS_CHILD | WS_VISIBLE, q_col3 + 80, qy, 40, 16, hWnd, (HMENU)IDC_STATIC, hInst, NULL);
app->hEditTxState = CreateWindowExA(WS_EX_CLIENTEDGE, "EDIT", "",
WS_CHILD | WS_VISIBLE | ES_READONLY | ES_CENTER,
q_col3 + 125, qy - 2, 60, 18, hWnd, (HMENU)IDC_EDIT_TX_STATE, hInst, NULL);
qy += 22;
CreateWindowExA(0, "STATIC", "Debug:",
WS_CHILD | WS_VISIBLE, q_col1 + 30, qy, 50, 16, hWnd, (HMENU)IDC_STATIC, hInst, NULL);
for (int i = 0; i < 8; i++) {
app->hEditDebug[i] = CreateWindowExA(WS_EX_CLIENTEDGE, "EDIT", "",
WS_CHILD | WS_VISIBLE | ES_READONLY | ES_CENTER,
q_col1 + 85 + i * 65, qy - 2, 60, 18, hWnd, (HMENU)(IDC_EDIT_DEBUG_0 + i), hInst, NULL);
}
} }
static LRESULT CALLBACK WndProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam) { static LRESULT CALLBACK WndProc(HWND hWnd, UINT message, WPARAM wParam, LPARAM lParam) {
struct etcpmon_app* app = NULL; struct etcpmon_app* app = NULL;
@ -949,6 +992,16 @@ void etcpmon_gui_update_metrics(struct etcpmon_app* app,
UpdateEditIfChanged(hMain, IDC_EDIT_NORM_OUT_TOT_PKTS, "%llu", (unsigned long long)metrics->etcp.norm_out_total_pkts); UpdateEditIfChanged(hMain, IDC_EDIT_NORM_OUT_TOT_PKTS, "%llu", (unsigned long long)metrics->etcp.norm_out_total_pkts);
UpdateEditIfChanged(hMain, IDC_EDIT_NORM_OUT_TOT_BYTES, "%llu", (unsigned long long)metrics->etcp.norm_out_total_bytes); UpdateEditIfChanged(hMain, IDC_EDIT_NORM_OUT_TOT_BYTES, "%llu", (unsigned long long)metrics->etcp.norm_out_total_bytes);
/* ACK Debug counters */
UpdateEditIfChanged(hMain, IDC_EDIT_ACK_HIT_INF, "%u", metrics->etcp.cnt_ack_hit_inf);
UpdateEditIfChanged(hMain, IDC_EDIT_ACK_HIT_SNDQ, "%u", metrics->etcp.cnt_ack_hit_sndq);
UpdateEditIfChanged(hMain, IDC_EDIT_ACK_MISS, "%u", metrics->etcp.cnt_ack_miss);
UpdateEditIfChanged(hMain, IDC_EDIT_CNT_LINK_WAIT, "%u", metrics->etcp.cnt_link_wait);
UpdateEditIfChanged(hMain, IDC_EDIT_TX_STATE, "%u", metrics->etcp.tx_state);
for (int i = 0; i < 8; i++) {
UpdateEditIfChanged(hMain, IDC_EDIT_DEBUG_0 + i, "%u", metrics->etcp.debug[i]);
}
/* Links list */ /* Links list */
if (app->hListLinks) { if (app->hListLinks) {
SendMessage(app->hListLinks, LB_RESETCONTENT, 0, 0); SendMessage(app->hListLinks, LB_RESETCONTENT, 0, 0);
@ -1064,6 +1117,16 @@ void etcpmon_gui_clear_metrics(struct etcpmon_app* app) {
SetDlgItemTextA(app->hWndMain, IDC_EDIT_NORM_IN_TOT_BYTES, ""); SetDlgItemTextA(app->hWndMain, IDC_EDIT_NORM_IN_TOT_BYTES, "");
SetDlgItemTextA(app->hWndMain, IDC_EDIT_NORM_OUT_TOT_PKTS, ""); SetDlgItemTextA(app->hWndMain, IDC_EDIT_NORM_OUT_TOT_PKTS, "");
SetDlgItemTextA(app->hWndMain, IDC_EDIT_NORM_OUT_TOT_BYTES, ""); SetDlgItemTextA(app->hWndMain, IDC_EDIT_NORM_OUT_TOT_BYTES, "");
/* ACK Debug counters */
SetDlgItemTextA(app->hWndMain, IDC_EDIT_ACK_HIT_INF, "");
SetDlgItemTextA(app->hWndMain, IDC_EDIT_ACK_HIT_SNDQ, "");
SetDlgItemTextA(app->hWndMain, IDC_EDIT_ACK_MISS, "");
SetDlgItemTextA(app->hWndMain, IDC_EDIT_CNT_LINK_WAIT, "");
SetDlgItemTextA(app->hWndMain, IDC_EDIT_TX_STATE, "");
for (int i = 0; i < 8; i++) {
SetDlgItemTextA(app->hWndMain, IDC_EDIT_DEBUG_0 + i, "");
}
if (app->hListLinks) { if (app->hListLinks) {
SendMessage(app->hListLinks, LB_RESETCONTENT, 0, 0); SendMessage(app->hListLinks, LB_RESETCONTENT, 0, 0);

24
tools/etcpmon/etcpmon_gui.h

@ -125,6 +125,22 @@ extern "C" {
#define IDC_EDIT_NORM_OUT_TOT_PKTS 672 #define IDC_EDIT_NORM_OUT_TOT_PKTS 672
#define IDC_EDIT_NORM_OUT_TOT_BYTES 673 #define IDC_EDIT_NORM_OUT_TOT_BYTES 673
/* ACK Debug IDs */
#define IDC_STATIC_ACKDBG 680
#define IDC_EDIT_ACK_HIT_INF 681
#define IDC_EDIT_ACK_HIT_SNDQ 682
#define IDC_EDIT_ACK_MISS 683
#define IDC_EDIT_CNT_LINK_WAIT 684
#define IDC_EDIT_TX_STATE 685
#define IDC_EDIT_DEBUG_0 686
#define IDC_EDIT_DEBUG_1 687
#define IDC_EDIT_DEBUG_2 688
#define IDC_EDIT_DEBUG_3 689
#define IDC_EDIT_DEBUG_4 690
#define IDC_EDIT_DEBUG_5 691
#define IDC_EDIT_DEBUG_6 692
#define IDC_EDIT_DEBUG_7 693
/* Graph control ID */ /* Graph control ID */
#define IDC_GRAPH 500 #define IDC_GRAPH 500
@ -251,6 +267,14 @@ struct etcpmon_app {
HWND hEditNormInTotBytes; HWND hEditNormInTotBytes;
HWND hEditNormOutTotPkts; HWND hEditNormOutTotPkts;
HWND hEditNormOutTotBytes; HWND hEditNormOutTotBytes;
/* ACK Debug counters */
HWND hEditAckHitInf;
HWND hEditAckHitSndq;
HWND hEditAckMiss;
HWND hEditCntLinkWait;
HWND hEditTxState;
HWND hEditDebug[8];
/* Graph controls */ /* Graph controls */
HWND hGraphWnd; HWND hGraphWnd;

8
tools/etcpmon/etcpmon_protocol.h

@ -155,6 +155,14 @@ struct etcpmon_etcp_metrics {
uint64_t norm_in_total_bytes; uint64_t norm_in_total_bytes;
uint64_t norm_out_total_pkts; uint64_t norm_out_total_pkts;
uint64_t norm_out_total_bytes; uint64_t norm_out_total_bytes;
/* ACK debug counters */
uint32_t cnt_ack_hit_inf;
uint32_t cnt_ack_hit_sndq;
uint32_t cnt_ack_miss;
uint32_t cnt_link_wait;
uint32_t tx_state;
uint32_t debug[8];
}; };
/* Link metrics */ /* Link metrics */

Loading…
Cancel
Save