Browse Source

+link stats

nodeinfo-routing-update
jeka 2 weeks ago
parent
commit
ec2a075f79
  1. 22
      src/etcp.c
  2. 76
      src/etcp_connections.c
  3. 29
      src/etcp_connections.h
  4. 4
      src/etcp_loadbalancer.c

22
src/etcp.c

@ -268,8 +268,15 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) {
clear_queue(etcp->recv_q);
clear_queue(etcp->ack_q);
// В etcp_conn_reset(), после очистки очередей добавьте:
struct ETCP_LINK* l = etcp->links;
while (l) {
l->window_pkt_transmitted = 0;
l->window_retransmissions = 0;
l->win_timebase = 50000;
l->win_ptr = 0;
memset(l->stat_win, 0, sizeof(l->stat_win));
start_stats_timer(l); // перезапускаем с дефолтным интервалом
l->inflight_bytes = 0;
l->inflight_packets = 0;
l = l->next;
@ -738,27 +745,24 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
// Always subtract from previous last_link (if any) – this handles retransmission
if (inf_pkt->last_link) {
// if (inf_pkt->last_link->inflight_bytes >= inf_pkt->ll.len) {
inf_pkt->last_link->inflight_bytes -= inf_pkt->ll.len;
// } else {
// inf_pkt->last_link->inflight_bytes = 0;
// }
// if (inf_pkt->last_link->inflight_packets > 0) {
inf_pkt->last_link->inflight_packets--;
// }
inf_pkt->last_link->window_retransmissions++;
inf_pkt->last_link->total_retransmissions++;
inf_pkt->last_link->inflight_bytes -= inf_pkt->ll.len;
inf_pkt->last_link->inflight_packets--;
}
// Always add to the CURRENT link (first send or retransmission)
link->inflight_bytes += inf_pkt->ll.len;
link->inflight_packets++;
link->window_pkt_transmitted++;
// Update last_link for future ACK/retrans
inf_pkt->last_link = link;
inf_pkt->last_timestamp=now;
inf_pkt->send_count++;
inf_pkt->state=INFLIGHT_STATE_WAIT_ACK;
queue_data_put(etcp->input_wait_ack, &inf_pkt->ll, inf_pkt->seq);// move dgram to wait_ack queue
}
size_t ack_q_size = queue_entry_count(etcp->ack_q);

76
src/etcp_connections.c

@ -28,6 +28,8 @@ static void etcp_link_send_init(struct ETCP_LINK* link, uint8_t reset);
static void etcp_link_init_timer_cbk(void* arg);
static void etcp_link_send_keepalive(struct ETCP_LINK* link);
static void keepalive_timer_cb(void* arg);
static void link_stats_timer_cb(void* arg);
void etcp_link_update_inflight_lim(struct ETCP_LINK* link, uint32_t new_lim) {
if (!link) return;
@ -389,7 +391,7 @@ int etcp_find_free_local_link_id(struct ETCP_CONN* etcp) {
// ===============================
struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct sockaddr_storage* ip, uint32_t netif_index, int so_mark, uint8_t type, int mtu, int loss_rate) {
struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct sockaddr_storage* ip, uint32_t netif_index, int so_mark, uint8_t type, int mtu, int loss_rate, char* name) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!instance) return NULL;
@ -399,6 +401,13 @@ struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct socka
return NULL;
}
e_sock->fd = SOCKET_INVALID; // Initialize to invalid socket
if (name && name[0]) {
strncpy(e_sock->name, name, MAX_CONN_NAME_LEN - 1);
e_sock->name[MAX_CONN_NAME_LEN - 1] = '\0';
} else {
e_sock->name[0] = '\0';
}
int family = AF_INET;
if (ip) {
@ -587,6 +596,16 @@ struct ETCP_LINK* etcp_link_new(struct ETCP_CONN* etcp, struct ETCP_SOCKET* conn
link->rtt_max_idx = 0;
// rtt_history[] is already zeroed by calloc
// Инициализация статистики
link->win_timebase = 50000; // 50 ms в микросекундах
link->win_ptr = 0;
link->window_pkt_transmitted = 0;
link->window_retransmissions = 0;
link->total_retransmissions = 0;
memset(link->stat_win, 0, sizeof(link->stat_win));
start_stats_timer(link);
// insert_link(conn, link);
if (insert_link(conn, link) < 0) {
// откатываем то, что успели
@ -619,6 +638,11 @@ void etcp_link_close(struct ETCP_LINK* link) {
return;
}
if (link->stats_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->stats_timer);
link->stats_timer = NULL;
}
// Cancel init timer if active
if (link->init_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
@ -652,6 +676,54 @@ void etcp_link_close(struct ETCP_LINK* link) {
u_free(link);
}
void start_stats_timer(struct ETCP_LINK* link) {
if (!link || !link->etcp || !link->etcp->instance) return;
if (link->stats_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->stats_timer);
}
uint32_t tb = link->win_timebase / 100; // us → 0.1 ms units
if (tb < 50) tb = 50; // минимум 5 ms
if (tb > 5000) tb = 5000; // max 500 ms
link->stats_timer = uasync_set_timeout(
link->etcp->instance->ua,
tb,
link,
link_stats_timer_cb
);
}
// === Новый callback таймера ===
static void link_stats_timer_cb(void* arg) {
struct ETCP_LINK* link = (struct ETCP_LINK*)arg;
if (!link || !link->etcp) return;
// 1. Сохраняем снимок текущего окна
link->stat_win[link->win_ptr].rtt = link->rtt_avg10;
link->stat_win[link->win_ptr].pkt_loss = (uint16_t)link->window_retransmissions;
link->stat_win[link->win_ptr].pkt_transmitted = link->window_pkt_transmitted;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] stats window updated (win_timebase=%u us, rtt=%u, retrans=%u, transmitted=%u)",
link->etcp->log_name, link->win_timebase, link->rtt_avg10, link->window_retransmissions, link->window_pkt_transmitted);
// 2. Переходим к следующему слоту
link->win_ptr = (link->win_ptr + 1) % 32;
// 3. Обнуляем накопители для нового окна
link->window_pkt_transmitted = 0;
link->window_retransmissions = 0;
// 4. Плавная подстройка win_timebase под rtt/2 (в микросекундах)
uint32_t target_us = (uint32_t)link->rtt_avg10 * 50ULL; // rtt_avg10 (0.1 ms) → rtt/2 в us
if (target_us < 10000) target_us = 10000; // минимум 10 ms
if (target_us > 500000) target_us = 500000; // максимум 0.5 s
link->win_timebase = (link->win_timebase * 7 + target_us) / 8;
// 5. Перезапускаем таймер с новым интервалом
start_stats_timer(link);
}
int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
@ -1127,7 +1199,7 @@ int init_connections(struct UTUN_INSTANCE* instance) {
}
}
struct ETCP_SOCKET* e_sock = etcp_socket_add(instance, &server->ip, server->netif_index, server->so_mark, server->type, server->mtu ? server->mtu : instance->config->global.mtu, server->loss_rate);
struct ETCP_SOCKET* e_sock = etcp_socket_add(instance, &server->ip, server->netif_index, server->so_mark, server->type, server->mtu ? server->mtu : instance->config->global.mtu, server->loss_rate, server->name);
if (e_sock && default_ip != 0) {
struct in_addr addr;
addr.s_addr = default_ip;

29
src/etcp_connections.h

@ -32,6 +32,7 @@ struct ETCP_DGRAM {// пакет (незашифрованный)
struct ETCP_SOCKET {
struct ETCP_SOCKET* next; // Linked list для всех соединений
struct UTUN_INSTANCE* instance;
char name[MAX_CONN_NAME_LEN]; // Socket name from config (e.g., "lan1" from [server: lan1])
socket_t fd; // UDP socket (cross-platform)
struct sockaddr_storage local_addr; // Локальный адрес
int mtu; // MTU для этого сокета
@ -95,18 +96,33 @@ struct ETCP_LINK {
uint32_t inflight_lim_bytes;
/* Биты блокировки отправки по причине (для корректного resume) */
unsigned send_blocked_input_queue : 1;
unsigned send_blocked_sack_full : 1;
unsigned send_blocked_bandwidth : 1;
// unsigned send_blocked_input_queue : 1;
// unsigned send_blocked_sack_full : 1;
// unsigned send_blocked_bandwidth : 1;
unsigned send_blocked_inflight : 1;
unsigned send_blocked_other : 4;
// unsigned send_blocked_other : 4;
// lim периодически обновляется. = (rtt_avg10 + jitter * K2[=1.0]) * K1[=1.5] * channel_bandwidth
uint32_t win_timebase; // x1 us (по умолчанию 50 000 = 50 ms, потом подстраивается под rtt/2)
uint32_t window_pkt_transmitted; // текущие накопители для следующего окна (обнуляются в таймере)
uint32_t window_retransmissions;
void* stats_timer; // Таймер для обновления окна
uint32_t win_ptr; // текущий слот в кольцевом буфере (0..31)
struct {
uint16_t rtt; // x 0.1ms (avg10 в конеце окна)
// uint16_t avg_rtt; // x 0.1ms (avg10 усредненный за период)
uint16_t pkt_loss; // штук (за период)
uint32_t pkt_transmitted; // штук (за период)
} stat_win[32]; // последние 32 замера статистики
// statistics
size_t encrypt_errors;
size_t decrypt_errors;
size_t send_errors;
size_t recv_errors;
size_t total_encrypted;
size_t total_decrypted;
uint32_t total_retransmissions;
uint16_t rtt_last; // round trip (время отправки + приёма)
uint16_t rtt_history[10]; // Circular buffer for last 10 RTT values
@ -142,7 +158,7 @@ int init_connections(struct UTUN_INSTANCE* instance);
// SOCKET FUNCTIONS
// добавляет новый версер (сокет для приёма и отправки кодограмм. обслуживает много подключений)
struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct sockaddr_storage* ip, uint32_t netif_index, int so_mark, uint8_t type, int mtu, int loss_rate);
struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct sockaddr_storage* ip, uint32_t netif_index, int so_mark, uint8_t type, int mtu, int loss_rate, char* name);
// удаляет сокет и освобождает ресурсы (грохает все его подключения и сокет)
void etcp_socket_remove(struct ETCP_SOCKET* conn);
@ -162,5 +178,6 @@ void etcp_link_update_inflight_lim(struct ETCP_LINK* link, uint32_t new_lim);
// scans all links in connection, marks used ids in bit array
// returns first free id (0-255) or -1 if all occupied
int etcp_find_free_local_link_id(struct ETCP_CONN* etcp);
void start_stats_timer(struct ETCP_LINK* link);
#endif // ETCP_CONNECTIONS_H

4
src/etcp_loadbalancer.c

@ -158,10 +158,10 @@ int loadbalancer_link_can_send(struct ETCP_LINK* link) {
link->send_blocked_inflight = 0;
}
if (link->shaper_timer) {
link->send_blocked_bandwidth = 1;
// link->send_blocked_bandwidth = 1;
can = 0;
} else {
link->send_blocked_bandwidth = 0;
// link->send_blocked_bandwidth = 0;
}
return can;
}

Loading…
Cancel
Save