diff --git a/src/etcp_connections.c b/src/etcp_connections.c index 53e1afa..81b1d56 100644 --- a/src/etcp_connections.c +++ b/src/etcp_connections.c @@ -29,6 +29,17 @@ static void etcp_link_init_timer_cbk(void* arg); static void etcp_link_send_keepalive(struct ETCP_LINK* link); static void keepalive_timer_cb(void* arg); +void etcp_link_update_inflight_lim(struct ETCP_LINK* link, uint32_t new_lim) { + if (!link) return; + uint32_t old = link->inflight_lim_bytes; + link->inflight_lim_bytes = new_lim; + if (old != new_lim && link->inflight_bytes < new_lim && link->send_blocked_inflight) { + link->send_blocked_inflight = 0; + loadbalancer_link_ready(link); + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_update_inflight_lim: unblocked link (lim %u->%u)", old, new_lim); + } +} + #define INIT_TIMEOUT_INITIAL 500 #define INIT_TIMEOUT_MAX 50000 @@ -552,6 +563,7 @@ struct ETCP_LINK* etcp_link_new(struct ETCP_CONN* etcp, struct ETCP_SOCKET* conn if (link->keepalive_interval < 10) link->keepalive_interval = 10; link->keepalive_sent_count = 0; link->keepalive_recv_count = 0; + link->inflight_lim_bytes = 30000; // Выделяем свободный local_link_id int free_id = etcp_find_free_local_link_id(etcp); diff --git a/src/etcp_connections.h b/src/etcp_connections.h index ded8939..d39ea09 100644 --- a/src/etcp_connections.h +++ b/src/etcp_connections.h @@ -92,6 +92,14 @@ struct ETCP_LINK { uint32_t inflight_bytes; uint32_t inflight_packets; + uint32_t inflight_lim_bytes; + + /* Биты блокировки отправки по причине (для корректного resume) */ + unsigned send_blocked_input_queue : 1; + unsigned send_blocked_sack_full : 1; + unsigned send_blocked_bandwidth : 1; + unsigned send_blocked_inflight : 1; + unsigned send_blocked_other : 4; size_t encrypt_errors; size_t decrypt_errors; @@ -147,6 +155,9 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram);// зашифровывает и // find link by address struct ETCP_LINK* etcp_link_find_by_addr(struct ETCP_SOCKET* e_sock, struct sockaddr_storage* addr); +// обновляет лимит inflight_bytes для линка и корректно снимает блокировку если нужно +void etcp_link_update_inflight_lim(struct ETCP_LINK* link, uint32_t new_lim); + // find free local_link_id for connection // scans all links in connection, marks used ids in bit array // returns first free id (0-255) or -1 if all occupied diff --git a/src/etcp_loadbalancer.c b/src/etcp_loadbalancer.c index 6a298f5..1588e4d 100644 --- a/src/etcp_loadbalancer.c +++ b/src/etcp_loadbalancer.c @@ -45,7 +45,11 @@ struct ETCP_LINK* etcp_loadbalancer_select_link(struct ETCP_CONN* etcp) { // link_index, link, link->initialized, link->shaper_timer==NULL?1:0, // (unsigned long long)link->shaper_load_time_tb, (unsigned long long)link->shaper_sub_nanotime); - if (!link->initialized || link->shaper_timer || link->link_status != 1) {// если установлен таймер - значит надо ждать. не рассматриваем этот линк + if (!link->initialized || link->link_status != 1) { + link = link->next; + continue; + } + if (loadbalancer_link_can_send(link) == 0) { link = link->next; continue; } @@ -144,22 +148,38 @@ void etcp_loadbalancer_send(struct ETCP_DGRAM* dgram) { memory_pool_free(etcp->instance->pkt_pool, dgram); } -// New: Notify when link is ready (called from timer or external) +int loadbalancer_link_can_send(struct ETCP_LINK* link) { + if (!link) return 0; + int can = 1; + if (link->inflight_bytes >= link->inflight_lim_bytes) { + link->send_blocked_inflight = 1; + can = 0; + } else { + link->send_blocked_inflight = 0; + } + if (link->shaper_timer) { + link->send_blocked_bandwidth = 1; + can = 0; + } else { + link->send_blocked_bandwidth = 0; + } + return can; +} + void loadbalancer_link_ready(struct ETCP_LINK* link) { if (!link || !link->etcp) { DEBUG_WARN(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: invalid link (%p)", link); return; } - + if (loadbalancer_link_can_send(link) == 0) { + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: link still blocked"); + return; + } DEBUG_INFO(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: link=%p now ready, notifying ETCP_CONN", link); - - // Call ETCP_CONN resume (assumes link_ready_for_send_fn in ETCP_CONN; add to etcp.h: void (*link_ready_for_send_fn)(struct ETCP_CONN*);) if (link->etcp->link_ready_for_send_fn) { link->etcp->link_ready_for_send_fn(link->etcp); } else { - // Fallback: request next packet (implement etcp_request_pkt in etcp.c if needed) - // etcp_request_pkt(link->etcp); - DEBUG_WARN(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: no link_ready_for_send_fn set, skipping notify"); + DEBUG_WARN(DEBUG_CATEGORY_ETCP, "loadbalancer_link_ready: no link_ready_for_send_fn set"); } } diff --git a/src/etcp_loadbalancer.h b/src/etcp_loadbalancer.h index 594484c..47c578e 100644 --- a/src/etcp_loadbalancer.h +++ b/src/etcp_loadbalancer.h @@ -23,9 +23,12 @@ struct ETCP_LINK* etcp_loadbalancer_select_link(struct ETCP_CONN* etcp); //void etcp_loadbalancer_update_after_send(struct ETCP_LINK* link, size_t pkt_size); - это надо заменить на: void etcp_loadbalancer_send(struct ETCP_DGRAM* dgram); -// сообщаем в loadbalancer о готовности линка +// сообщаем в loadbalancer о готовности линка (вызывается из таймера, ACK, изменения лимита) void loadbalancer_link_ready(struct ETCP_LINK* link); +// проверяет все условия блокировки отправки (включая inflight_bytes) +int loadbalancer_link_can_send(struct ETCP_LINK* link); + // Получить состояние связи ETCP: 1 - есть живой линк, 0 - все недоступны int etcp_loadbalancer_get_link_status(struct ETCP_CONN* etcp);