Browse Source

add per-connection inflight limit

nodeinfo-routing-update
Evgeny 2 weeks ago
parent
commit
30a76bd576
  1. 12
      src/etcp_connections.c
  2. 11
      src/etcp_connections.h
  3. 36
      src/etcp_loadbalancer.c
  4. 5
      src/etcp_loadbalancer.h

12
src/etcp_connections.c

@ -29,6 +29,17 @@ static void etcp_link_init_timer_cbk(void* arg);
static void etcp_link_send_keepalive(struct ETCP_LINK* link);
static void keepalive_timer_cb(void* arg);
void etcp_link_update_inflight_lim(struct ETCP_LINK* link, uint32_t new_lim) {
if (!link) return;
uint32_t old = link->inflight_lim_bytes;
link->inflight_lim_bytes = new_lim;
if (old != new_lim && link->inflight_bytes < new_lim && link->send_blocked_inflight) {
link->send_blocked_inflight = 0;
loadbalancer_link_ready(link);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_update_inflight_lim: unblocked link (lim %u->%u)", old, new_lim);
}
}
#define INIT_TIMEOUT_INITIAL 500
#define INIT_TIMEOUT_MAX 50000
@ -552,6 +563,7 @@ struct ETCP_LINK* etcp_link_new(struct ETCP_CONN* etcp, struct ETCP_SOCKET* conn
if (link->keepalive_interval < 10) link->keepalive_interval = 10;
link->keepalive_sent_count = 0;
link->keepalive_recv_count = 0;
link->inflight_lim_bytes = 30000;
// Выделяем свободный local_link_id
int free_id = etcp_find_free_local_link_id(etcp);

11
src/etcp_connections.h

@ -92,6 +92,14 @@ struct ETCP_LINK {
uint32_t inflight_bytes;
uint32_t inflight_packets;
uint32_t inflight_lim_bytes;
/* Биты блокировки отправки по причине (для корректного resume) */
unsigned send_blocked_input_queue : 1;
unsigned send_blocked_sack_full : 1;
unsigned send_blocked_bandwidth : 1;
unsigned send_blocked_inflight : 1;
unsigned send_blocked_other : 4;
size_t encrypt_errors;
size_t decrypt_errors;
@ -147,6 +155,9 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram);// зашифровывает и
// find link by address
struct ETCP_LINK* etcp_link_find_by_addr(struct ETCP_SOCKET* e_sock, struct sockaddr_storage* addr);
// обновляет лимит inflight_bytes для линка и корректно снимает блокировку если нужно
void etcp_link_update_inflight_lim(struct ETCP_LINK* link, uint32_t new_lim);
// find free local_link_id for connection
// scans all links in connection, marks used ids in bit array
// returns first free id (0-255) or -1 if all occupied

36
src/etcp_loadbalancer.c

@ -45,7 +45,11 @@ struct ETCP_LINK* etcp_loadbalancer_select_link(struct ETCP_CONN* etcp) {
// link_index, link, link->initialized, link->shaper_timer==NULL?1:0,
// (unsigned long long)link->shaper_load_time_tb, (unsigned long long)link->shaper_sub_nanotime);
if (!link->initialized || link->shaper_timer || link->link_status != 1) {// если установлен таймер - значит надо ждать. не рассматриваем этот линк
if (!link->initialized || link->link_status != 1) {
link = link->next;
continue;
}
if (loadbalancer_link_can_send(link) == 0) {
link = link->next;
continue;
}
@ -144,22 +148,38 @@ void etcp_loadbalancer_send(struct ETCP_DGRAM* dgram) {
memory_pool_free(etcp->instance->pkt_pool, dgram);
}
// New: Notify when link is ready (called from timer or external)
int loadbalancer_link_can_send(struct ETCP_LINK* link) {
if (!link) return 0;
int can = 1;
if (link->inflight_bytes >= link->inflight_lim_bytes) {
link->send_blocked_inflight = 1;
can = 0;
} else {
link->send_blocked_inflight = 0;
}
if (link->shaper_timer) {
link->send_blocked_bandwidth = 1;
can = 0;
} else {
link->send_blocked_bandwidth = 0;
}
return can;
}
void loadbalancer_link_ready(struct ETCP_LINK* link) {
if (!link || !link->etcp) {
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: invalid link (%p)", link);
return;
}
if (loadbalancer_link_can_send(link) == 0) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: link still blocked");
return;
}
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: link=%p now ready, notifying ETCP_CONN", link);
// Call ETCP_CONN resume (assumes link_ready_for_send_fn in ETCP_CONN; add to etcp.h: void (*link_ready_for_send_fn)(struct ETCP_CONN*);)
if (link->etcp->link_ready_for_send_fn) {
link->etcp->link_ready_for_send_fn(link->etcp);
} else {
// Fallback: request next packet (implement etcp_request_pkt in etcp.c if needed)
// etcp_request_pkt(link->etcp);
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: no link_ready_for_send_fn set, skipping notify");
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: no link_ready_for_send_fn set");
}
}

5
src/etcp_loadbalancer.h

@ -23,9 +23,12 @@ struct ETCP_LINK* etcp_loadbalancer_select_link(struct ETCP_CONN* etcp);
//void etcp_loadbalancer_update_after_send(struct ETCP_LINK* link, size_t pkt_size); - это надо заменить на:
void etcp_loadbalancer_send(struct ETCP_DGRAM* dgram);
// сообщаем в loadbalancer о готовности линка
// сообщаем в loadbalancer о готовности линка (вызывается из таймера, ACK, изменения лимита)
void loadbalancer_link_ready(struct ETCP_LINK* link);
// проверяет все условия блокировки отправки (включая inflight_bytes)
int loadbalancer_link_can_send(struct ETCP_LINK* link);
// Получить состояние связи ETCP: 1 - есть живой линк, 0 - все недоступны
int etcp_loadbalancer_get_link_status(struct ETCP_CONN* etcp);

Loading…
Cancel
Save