You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1086 lines
46 KiB
1086 lines
46 KiB
// etcp.c - ETCP Protocol Implementation (refactored and expanded based on etcp_protocol.txt) |
|
|
|
#include "etcp.h" |
|
#include "etcp_debug.h" |
|
#include "etcp_loadbalancer.h" |
|
#include "routing.h" |
|
#include "route_bgp.h" |
|
#include "../lib/u_async.h" |
|
#include "../lib/ll_queue.h" |
|
#include "../lib/debug_config.h" |
|
#include "crc32.h" // For potential hashing, though not used yet. |
|
#include <stdlib.h> |
|
#include <string.h> |
|
#include <sys/time.h> |
|
#include <math.h> // For bandwidth calcs |
|
#include <limits.h> // For UINT16_MAX |
|
#include "../lib/mem.h" |
|
|
|
/* Enable comprehensive debug output for the ETCP module. */
#define DEBUG_CATEGORY_ETCP_DETAILED 1

/*
 * Protocol tuning constants (taken from the spec, adjusted for completeness).
 * Time values are expressed in timebase ticks; per the original notes one
 * tick is 0.1 ms.
 */

/* Initial window, in bytes. */
#define MAX_INFLIGHT_BYTES 65536
/* RTT multiplier for the retransmission timeout. */
#define RETRANS_K1 2.0f
/* Jitter multiplier for the retransmission timeout. */
#define RETRANS_K2 1.5f
/* ACK timer delay in ticks (2 ms in 0.1 ms units). */
#define ACK_DELAY_TB 20
/* Delay factor applied before a burst. */
#define BURST_DELAY_FACTOR 4
/* Packets per burst (1 delayed + 4 burst). */
#define BURST_SIZE 5
/* Number of RTT samples kept for the jitter calculation. */
#define RTT_HISTORY_SIZE 10
/* Limit for pending ACKs/retransmissions (arbitrary; adjust). */
#define MAX_PENDING 32
/* Section header size: type(1) + len(2). */
#define SECTION_HEADER_SIZE 3
|
|
|
// Container-of macro for getting struct from data pointer |
|
//#define CONTAINER_OF(ptr, type, member) ((type *)((char *)(ptr) - offsetof(type, member))) |
|
|
|
// Prototypes for functions that are referenced before their definitions.

// Queue callbacks (registered with queue_set_callback in etcp_connection_create).
static void input_queue_cb(struct ll_queue *q, void *arg);
static void input_send_q_cb(struct ll_queue *q, void *arg);
static void wait_ack_cb(struct ll_queue *q, void *arg);
static void send_ack_req_cb(struct ll_queue *q, void *arg);

// Link / transmit-path helpers.
static void etcp_link_ready_callback(struct ETCP_CONN *etcp);
static void etcp_conn_process_send_queue(struct ETCP_CONN *etcp);
struct ETCP_DGRAM *etcp_request_pkt(struct ETCP_CONN *etcp);
|
|
|
// Empty a queue without destroying it: every entry is taken out, its datagram
// buffer is released and then the entry itself is released.
// A NULL queue is ignored.
static void clear_queue(struct ll_queue* q) {
    if (q == NULL) {
        return;
    }

    for (;;) {
        struct ll_entry* entry = queue_data_get(q);
        if (entry == NULL) {
            break;  // queue is empty
        }
        queue_dgram_free(entry);
        queue_entry_free(entry);
    }
}
|
|
|
// Release all entries of *q, destroy the queue object and set the caller's
// pointer to NULL. Does nothing when *q is already NULL.
static void drain_and_free_queue(struct ll_queue** q) {
    struct ll_queue* queue = *q;
    if (queue == NULL) {
        return;
    }

    clear_queue(queue);
    queue_free(queue);
    *q = NULL;
}
|
|
|
// Current timebase value from get_time_tb(), truncated to its low 16 bits.
uint16_t get_current_timestamp() {
    uint16_t ticks16 = (uint16_t)get_time_tb();
    return ticks16;
}
|
|
|
// Timestamp diff (with wrap-around) |
|
static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
if (t1 >= t2) { |
|
return t1 - t2; |
|
} |
|
return (UINT16_MAX - t2) + t1 + 1; |
|
} |
|
|
|
static const char EMPTY_NAME[] = ""; |
|
|
|
// Create new ETCP connection |
|
struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance, char* name) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
if (!instance) return NULL; |
|
|
|
|
|
struct ETCP_CONN* etcp = u_calloc(1, sizeof(struct ETCP_CONN)); |
|
if (!etcp) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: creating connection failed for instance %p", instance); |
|
return NULL; |
|
} |
|
|
|
etcp->instance = instance; |
|
etcp->input_queue = queue_new(instance->ua, 0, "ETCP input"); // No hash for input_queue |
|
etcp->output_queue = queue_new(instance->ua, 0, "ETCP output"); // No hash for output_queue |
|
etcp->input_send_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "input_send_q"); // Hash for send_q |
|
etcp->input_wait_ack = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "input_wait_ack"); // Hash for wait_ack |
|
etcp->recv_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "recv_q"); // Hash for send_q |
|
etcp->ack_q = queue_new(instance->ua, 0, "ack_q"); |
|
etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET)); |
|
etcp->io_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT)); |
|
etcp->optimal_inflight=10000; |
|
etcp->initialized=0; |
|
etcp->links_up=0; |
|
etcp->name = u_strdup(name); |
|
// Initialize log_name with local node_id (peer will be updated later when known) |
|
snprintf(etcp->log_name, sizeof(etcp->log_name), "%04X->???? [%s]", (uint16_t)instance->node_id, etcp->name); |
|
|
|
if (!etcp->input_queue || !etcp->output_queue || !etcp->input_send_q || !etcp->recv_q || !etcp->ack_q || |
|
!etcp->input_wait_ack || !etcp->inflight_pool || !etcp->io_pool) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connection_create: error - closing - input:%p output:%p send:%p wait:%x pool:%p", |
|
etcp->input_queue, etcp->output_queue, etcp->input_send_q, etcp->input_wait_ack, etcp->inflight_pool); |
|
etcp_connection_close(etcp); |
|
return NULL; |
|
} |
|
|
|
|
|
// etcp->window_size = MAX_INFLIGHT_BYTES; // Not used |
|
etcp->mtu = 1500; // Default MTU |
|
etcp->next_tx_id = 1; |
|
etcp->rtt_avg_10 = 10; // Initial guess (1ms) |
|
etcp->rtt_avg_100 = 10; |
|
etcp->rtt_history_idx = 0; |
|
memset(etcp->rtt_history, 0, sizeof(etcp->rtt_history)); |
|
|
|
|
|
etcp->normalizer = pn_init(etcp); |
|
if (!etcp->normalizer) { |
|
etcp_connection_close(etcp); |
|
return NULL; |
|
} |
|
|
|
// Set input queue callback |
|
queue_set_callback(etcp->input_queue, input_queue_cb, etcp); |
|
queue_set_callback(etcp->input_send_q, input_send_q_cb, etcp); |
|
queue_set_callback(etcp->input_wait_ack, wait_ack_cb, etcp); |
|
queue_set_callback(etcp->ack_q, send_ack_req_cb, etcp); |
|
|
|
etcp->link_ready_for_send_fn = etcp_link_ready_callback; |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] connection initialized. ETCP=%p mtu=%d, next_tx_id=%u", |
|
etcp->log_name, etcp, etcp->mtu, etcp->next_tx_id); |
|
|
|
// Вызываем callback для нового соединения если установлен |
|
if (instance && instance->etcp_new_conn_cbk) { |
|
instance->etcp_new_conn_cbk(etcp, instance->etcp_new_conn_arg); |
|
} |
|
|
|
return etcp; |
|
} |
|
|
|
|
|
static void etcp_on_up(struct ETCP_CONN* etcp) { |
|
if (etcp->up_cbk) etcp->up_cbk(etcp, etcp->up_arg); |
|
} |
|
|
|
static void etcp_on_down(struct ETCP_CONN* etcp) { |
|
if (etcp->down_cbk) etcp->down_cbk(etcp, etcp->down_arg); |
|
} |
|
|
|
|
|
// Close connection with NULL pointer safety (prevents double free) |
|
void etcp_connection_close(struct ETCP_CONN* etcp) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
if (!etcp) return; |
|
|
|
if (etcp->links_up!=0) { |
|
etcp->links_up=0; |
|
etcp_on_down(etcp); |
|
} |
|
|
|
// Cancel active timers to prevent memory leaks |
|
if (etcp->retrans_timer) { |
|
uasync_cancel_timeout(etcp->instance->ua, etcp->retrans_timer); |
|
etcp->retrans_timer = NULL; |
|
} |
|
if (etcp->ack_resp_timer) { |
|
uasync_cancel_timeout(etcp->instance->ua, etcp->ack_resp_timer); |
|
etcp->ack_resp_timer = NULL; |
|
} |
|
|
|
routing_del_conn(etcp); |
|
|
|
// Notify BGP about connection closure (send withdraws, remove from senders_list) |
|
if (etcp->instance && etcp->instance->bgp) { |
|
route_bgp_remove_conn(etcp); |
|
} |
|
|
|
// Deinitialize packet normalizer (this will call routing_del_conn) |
|
if (etcp->normalizer) { |
|
pn_deinit((struct PKTNORM*)etcp->normalizer); |
|
etcp->normalizer = NULL; |
|
} |
|
|
|
// Drain and free all queues using helper functions |
|
drain_and_free_queue(&etcp->input_queue); |
|
drain_and_free_queue(&etcp->output_queue); |
|
drain_and_free_queue(&etcp->input_send_q); |
|
drain_and_free_queue(&etcp->input_wait_ack); |
|
drain_and_free_queue(&etcp->recv_q); |
|
|
|
// Drain and free ack_q (contains ACK_PACKET from ack_pool - special handling) |
|
if (etcp->ack_q) { |
|
struct ACK_PACKET* pkt; |
|
while ((pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q)) != NULL) { |
|
queue_entry_free((struct ll_entry*)pkt); |
|
} |
|
queue_free(etcp->ack_q); |
|
etcp->ack_q = NULL; |
|
} |
|
|
|
// Free memory pools after all elements are returned |
|
if (etcp->inflight_pool) { |
|
memory_pool_destroy(etcp->inflight_pool); |
|
etcp->inflight_pool = NULL; |
|
} |
|
|
|
if (etcp->io_pool) { |
|
memory_pool_destroy(etcp->io_pool); |
|
etcp->io_pool = NULL; |
|
} |
|
|
|
// Clear links list safely |
|
if (etcp->links) { |
|
struct ETCP_LINK* link = etcp->links; |
|
while (link) { |
|
struct ETCP_LINK* next = link->next; |
|
etcp_link_close(link); |
|
link = next; |
|
} |
|
etcp->links = NULL; |
|
} |
|
|
|
u_free(etcp->name); |
|
|
|
// Clear next pointer to prevent dangling references |
|
etcp->next = NULL; |
|
|
|
// TODO: Free rx_list, etc. |
|
u_free(etcp); |
|
} |
|
|
|
// Reset connection |
|
void etcp_conn_reset(struct ETCP_CONN* etcp) { |
|
// Reset IDs |
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Resetting ETCP instance [%d]", etcp->log_name); |
|
etcp->next_tx_id = 1; |
|
etcp->last_rx_id = 0; |
|
etcp->last_delivered_id = 0; |
|
etcp->rx_ack_till = 0; |
|
|
|
// Устанавливаем флаг ожидания первого пакета |
|
etcp->got_initial_pkt = 0; |
|
|
|
// Reset metrics |
|
etcp->unacked_bytes = 0; |
|
etcp->rtt_last = 0; |
|
etcp->rtt_avg_10 = 0; |
|
etcp->rtt_avg_100 = 0; |
|
etcp->jitter = 0; |
|
etcp->bytes_sent_total = 0; |
|
etcp->retransmissions_count = 0; |
|
etcp->ack_packets_count = 0; |
|
|
|
// Reset RTT history |
|
memset(etcp->rtt_history, 0, sizeof(etcp->rtt_history)); |
|
etcp->rtt_history_idx = 0; |
|
|
|
// Clear queues (keep queue structures) |
|
clear_queue(etcp->input_queue); |
|
clear_queue(etcp->output_queue); |
|
clear_queue(etcp->input_send_q); |
|
clear_queue(etcp->input_wait_ack); |
|
clear_queue(etcp->recv_q); |
|
|
|
// Reset timers (just clear the pointers - timers will expire naturally) |
|
etcp->retrans_timer = NULL; |
|
etcp->ack_resp_timer = NULL; |
|
|
|
etcp->reset_count++; |
|
|
|
// Reset normalizer (packer/unpacker state for reconnection) |
|
if (etcp->normalizer) { |
|
pn_reset(etcp->normalizer); |
|
} |
|
|
|
queue_resume_callback(etcp->input_queue); |
|
queue_resume_callback(etcp->output_queue); |
|
queue_resume_callback(etcp->input_send_q); |
|
queue_resume_callback(etcp->input_wait_ack); |
|
queue_resume_callback(etcp->recv_q); |
|
// queue_resume_callback(etcp->ack_q); |
|
|
|
etcp->tx_state=ETCP_TX_STATE_DATA_WAIT; |
|
|
|
|
|
int up=0; |
|
struct ETCP_LINK* link = etcp->links; |
|
while (link) { |
|
if (link->link_status) up=1; |
|
link = link->next; |
|
} |
|
|
|
if (up) { |
|
if (etcp->links_up==0) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "reset conn: set link up"); |
|
etcp->links_up=1; |
|
etcp_on_up(etcp); |
|
} else { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "reset conn: set link down/up"); |
|
etcp_on_down(etcp); |
|
etcp_on_up(etcp); |
|
} |
|
} else DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "links not up - skip on_down/on_up"); |
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "end"); |
|
} |
|
|
|
void etcp_conn_reinit(struct ETCP_CONN* etcp) {// Если сбой в обмене или ребутнулась одна из сторон -> необходимо заново переинициализировать соединение |
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Reinitializing ETCP connection [%s]", etcp->log_name); |
|
|
|
etcp->reinit_count++; |
|
|
|
etcp->initialized = 0; |
|
|
|
// Сбрасываем initialized во всех линках |
|
struct ETCP_LINK* link = etcp->links; |
|
while (link) { |
|
link->initialized = 0; |
|
link = link->next; |
|
} |
|
|
|
// Вызываем etcp_conn_reset для сброса состояния |
|
etcp_conn_reset(etcp); |
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "end"); |
|
} |
|
|
|
// внутренняя функция. Вызывается один раз когда первый линк готов. |
|
void etcp_conn_ready(struct ETCP_CONN* conn) { |
|
if (!conn) return; |
|
|
|
conn->initialized = 1; |
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[%s] Connection ready", conn->log_name); |
|
|
|
// Вызываем callback если установлен |
|
if (conn->ready_cbk) conn->ready_cbk(conn, conn->ready_arg); |
|
} |
|
|
|
|
|
// Update log_name when peer_node_id becomes known |
|
void etcp_update_log_name(struct ETCP_CONN* etcp) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
if (!etcp || !etcp->instance) return; |
|
uint16_t local_id = etcp->instance->node_id; |
|
uint16_t peer_id = etcp->peer_node_id; |
|
const char* name = etcp->name ? etcp->name : EMPTY_NAME; |
|
snprintf(etcp->log_name, sizeof(etcp->log_name), "%04X->%04X [%s]", local_id, peer_id, name); |
|
} |
|
|
|
|
|
// ====================================================================== Отправка данных в etcp после нормализации |
|
|
|
// Send data through ETCP connection |
|
// Allocates memory from data_pool and places in input queue |
|
// Returns: 0 on success, -1 on failure |
|
int etcp_int_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
if (!etcp || !data || len == 0) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] invalid parameters (etcp=%p, data=%p, len=%zu)", etcp->log_name, etcp, data, len); |
|
return -1; |
|
} |
|
|
|
if (!etcp->input_queue) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] input_queue is NULL for etcp=%p", etcp->log_name, etcp); |
|
return -1; |
|
} |
|
|
|
// Check length against maximum packet size |
|
if (len > PACKET_DATA_SIZE) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] packet too large (len=%zu, max=%d)", etcp->log_name, len, PACKET_DATA_SIZE); |
|
return -1; |
|
} |
|
|
|
// Allocate packet data from data_pool (following ETCP reception pattern) |
|
uint8_t* packet_data = memory_pool_alloc(etcp->instance->data_pool); |
|
if (!packet_data) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate packet data from data_pool", etcp->log_name); |
|
return -1; |
|
} |
|
|
|
// Copy user data to packet buffer |
|
memcpy(packet_data, data, len); |
|
|
|
struct ETCP_FRAGMENT* pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool); |
|
if (!pkt) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate queue entry", etcp->log_name); |
|
memory_pool_free(etcp->instance->data_pool, packet_data); |
|
return -1; |
|
} |
|
|
|
pkt->seq = 0; // Will be assigned by input_queue_cb |
|
pkt->timestamp = 0; // Will be set by input_queue_cb |
|
pkt->ll.dgram = packet_data; // Point to data_pool allocation |
|
pkt->ll.len = len; // размер packet_data |
|
pkt->ll.dgram_pool = etcp->instance->data_pool; |
|
pkt->ll.memlen = etcp->instance->data_pool->object_size; |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] created PACKET %p with data %p (len=%zu)", etcp->log_name, pkt, packet_data, len); |
|
|
|
// Add to input queue - input_queue_cb will process it |
|
if (queue_data_put(etcp->input_queue, (struct ll_entry*)pkt, 0) != 0) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add to input queue", etcp->log_name); |
|
queue_dgram_free(&pkt->ll); |
|
queue_entry_free(&pkt->ll); |
|
return -1; |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
|
|
|
|
|
|
|
|
static void input_queue_try_push(struct ETCP_CONN* etcp) {// пробуем протолкнуть при отправке |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
// когда очередь отправки пуста - пробуем взять новый пакет на обработку |
|
size_t wait_ack_bytes = queue_total_bytes(etcp->input_wait_ack); |
|
if (wait_ack_bytes <= etcp->optimal_inflight) { |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resume input queue: inflight_bytes=%d, input_len=%d", etcp->log_name, wait_ack_bytes, etcp->input_queue->total_bytes); |
|
queue_resume_callback(etcp->input_queue);// и только когда больше нечего отправлять - забираем новый пакет |
|
} |
|
} |
|
|
|
static void input_queue_try_resume(struct ETCP_CONN* etcp) {// при ACK |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
// Сперва отправим всё из очереди отправки |
|
size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); |
|
// queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно. |
|
if (send_q_bytes>0) return; |
|
|
|
// когда очередь отправки пуста - пробуем взять новый пакет на обработку |
|
size_t wait_ack_bytes = queue_total_bytes(etcp->input_wait_ack); |
|
if (wait_ack_bytes <= etcp->optimal_inflight) { |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] resume input queue: inflight_bytes=%d, input_len=%d", etcp->log_name, wait_ack_bytes, etcp->input_queue->total_bytes); |
|
queue_resume_callback(etcp->input_queue);// и только когда больше нечего отправлять - забираем новый пакет |
|
} |
|
} |
|
|
|
void etcp_stats(struct ETCP_CONN* etcp) { |
|
if (!etcp) return; |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] stats for conn=%p:", etcp->log_name, etcp); |
|
|
|
// Queue statistics |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] Queues:", etcp->log_name); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input_queue: %zu pkts, %zu bytes", etcp->log_name, |
|
queue_entry_count(etcp->input_queue), queue_total_bytes(etcp->input_queue)); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input_send_q: %zu pkts, %zu bytes", etcp->log_name, |
|
queue_entry_count(etcp->input_send_q), queue_total_bytes(etcp->input_send_q)); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input_wait_ack: %zu pkts, %zu bytes", etcp->log_name, |
|
queue_entry_count(etcp->input_wait_ack), queue_total_bytes(etcp->input_wait_ack)); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_q: %zu pkts", etcp->log_name, |
|
queue_entry_count(etcp->ack_q)); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] recv_q: %zu pkts", etcp->log_name, |
|
queue_entry_count(etcp->recv_q)); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] output_queue: %zu pkts, %zu bytes", etcp->log_name, |
|
queue_entry_count(etcp->output_queue), queue_total_bytes(etcp->output_queue)); |
|
|
|
// RTT metrics |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT metrics:", etcp->log_name); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rtt_last: %u (0.1ms)", etcp->log_name, etcp->rtt_last); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rtt_avg_10: %u (0.1ms)", etcp->log_name, etcp->rtt_avg_10); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rtt_avg_100: %u (0.1ms)", etcp->log_name, etcp->rtt_avg_100); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] jitter: %u (0.1ms)", etcp->log_name, etcp->jitter); |
|
|
|
// Counters |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] Counters:", etcp->log_name); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] bytes_sent_total: %u", etcp->log_name, etcp->bytes_sent_total); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] retransmissions_count: %u", etcp->log_name, etcp->retransmissions_count); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] ack_packets_count: %u", etcp->log_name, etcp->ack_packets_count); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] unacked_bytes: %u", etcp->log_name, etcp->unacked_bytes); |
|
|
|
// IDs |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] IDs:", etcp->log_name); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] next_tx_id: %u", etcp->log_name, etcp->next_tx_id); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] last_rx_id: %u", etcp->log_name, etcp->last_rx_id); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] last_delivered_id:%u", etcp->log_name, etcp->last_delivered_id); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rx_ack_till: %u", etcp->log_name, etcp->rx_ack_till); |
|
} |
|
|
|
// Input callback for input_queue (добавление новых кодограмм в стек) |
|
// input_queue -> input_send_q |
|
static void input_queue_cb(struct ll_queue* q, void* arg) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; |
|
struct ETCP_FRAGMENT* in_pkt = (struct ETCP_FRAGMENT*)queue_data_get(q); |
|
|
|
if (!in_pkt) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] cannot get element (pool=%p etcp=%p)", etcp->log_name, etcp->inflight_pool, etcp); |
|
queue_resume_callback(q); |
|
return; |
|
} |
|
|
|
|
|
|
|
// Create INFLIGHT_PACKET |
|
|
|
struct INFLIGHT_PACKET* p = (struct INFLIGHT_PACKET*)queue_entry_new_from_pool(etcp->inflight_pool); |
|
if (!p) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] cannot allocate INFLIGHT_PACKET (pool=%p etcp=%p)", etcp->log_name, etcp->inflight_pool, etcp); |
|
queue_entry_free((struct ll_entry*)in_pkt); // Free the ETCP_FRAGMENT |
|
queue_resume_callback(q); |
|
return; |
|
} |
|
|
|
// Setup inflight packet (based on protocol.txt) |
|
memset(p, 0, sizeof(*p)); |
|
p->seq = etcp->next_tx_id++; // Assign seq |
|
p->state = INFLIGHT_STATE_WAIT_SEND; |
|
p->last_timestamp = 0; |
|
p->ll.dgram = in_pkt->ll.dgram; |
|
p->ll.dgram_pool = in_pkt->ll.dgram_pool; |
|
p->ll.len = in_pkt->ll.len; |
|
|
|
// memory_pool_free(etcp->io_pool, in_pkt);// перемещаем из io_pool в inflight_pool |
|
queue_entry_free(&in_pkt->ll); |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] input -> inflight (seq=%u, len=%u)", etcp->log_name, p->seq, p->ll.len); |
|
int len=p->ll.len;// сохраним len |
|
|
|
// Add to send queue |
|
if (queue_data_put(etcp->input_send_q, &p->ll, p->seq) != 0) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add packet seq=%u to input_send_q", etcp->log_name, p->seq); |
|
queue_dgram_free(&p->ll); |
|
queue_entry_free(&p->ll); |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "[%s] EXIT (queue put failed)", etcp->log_name); |
|
return; |
|
} |
|
etcp->unacked_bytes += len; |
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q"); |
|
|
|
// etcp_conn_process_send_queue(etcp);// сразу обработаем этот пакет |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] nextloop, input_queue size=%d ", etcp->log_name, q->count); |
|
} |
|
|
|
static void ack_timeout_cb(void* arg); |
|
static void ack_timeout_check(struct ETCP_CONN* etcp) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
uint64_t now = get_time_tb(); |
|
int64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2); |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u", |
|
(unsigned long long)now, (unsigned long long)timeout, etcp->rtt_avg_10, etcp->jitter); |
|
|
|
|
|
struct ll_entry* current; |
|
while (current = etcp->input_wait_ack->head) { |
|
struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current; |
|
|
|
int64_t elapsed = now - pkt->last_timestamp; |
|
if (elapsed > timeout) { |
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[%s] timeout for seq=%u, elapsed=%lld, now=%llu, timeout=%llu, send_count=%u. Moving: wait_ack -> send_q", |
|
etcp->log_name, pkt->seq, (unsigned long long)elapsed, (unsigned long long)now, (unsigned long long)timeout, pkt->send_count); |
|
|
|
// Remove from wait_ack |
|
pkt=(struct INFLIGHT_PACKET*)queue_data_get(etcp->input_wait_ack); |
|
if (!pkt) break; |
|
// Increment counters |
|
pkt->send_count++; |
|
pkt->retrans_req_count++; // Optional, if used for retrans request logic |
|
pkt->last_timestamp = now; |
|
|
|
// Change state and add to send_q for retransmission |
|
pkt->state = INFLIGHT_STATE_WAIT_SEND; |
|
queue_data_put(etcp->input_send_q, (struct ll_entry*)pkt, pkt->seq); |
|
|
|
// Update stats |
|
etcp->retransmissions_count++; |
|
} |
|
else {// не надо до конца сканировать - они уже сортированы по таймстемпу т.к. очередь fifo, а timestamp = время добавления в очередь = время отправки |
|
// shedule timer |
|
int64_t next_timeout=timeout - elapsed; |
|
if (next_timeout<0) next_timeout=0; |
|
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, next_timeout+10, etcp, ack_timeout_cb); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] retransmission timer set for %llu units", etcp->log_name, next_timeout); |
|
return; |
|
} |
|
} |
|
queue_resume_callback(etcp->input_wait_ack); |
|
} |
|
|
|
static void ack_timeout_cb(void* arg) {// сработал таймер переотправки |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; |
|
etcp->retrans_timer=NULL; |
|
ack_timeout_check(etcp);// он установит новый таймер или выгребет всё и установит ожидание на очередь |
|
} |
|
|
|
static void wait_ack_cb(struct ll_queue* q, void* arg) {// добавили пакет в ожидание ACK |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; |
|
if (!etcp->retrans_timer) ack_timeout_check(etcp);// если таймер ретрансмиссий взведен - дожидаемся таймера. |
|
} |
|
|
|
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q data ready |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg; |
|
etcp_conn_process_send_queue(etcp); |
|
if (etcp->tx_state==ETCP_TX_STATE_DATA_WAIT) queue_resume_callback(etcp->input_send_q); |
|
} |
|
|
|
static void send_ack_req_cb(struct ll_queue* q, void* arg) {// etcp->ack_q data ready |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg; |
|
etcp_conn_process_send_queue(etcp); |
|
if (etcp->tx_state==ETCP_TX_STATE_DATA_WAIT) queue_resume_callback(etcp->ack_q); |
|
} |
|
|
|
void etcp_on_link_down(struct ETCP_CONN* etcp) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
// scan all links -> |
|
int up=0; |
|
struct ETCP_LINK* link = etcp->links; |
|
while (link) { |
|
if (link->link_status) up=1; |
|
link = link->next; |
|
} |
|
etcp->links_up=up; |
|
if (up==0 && etcp->links_up!=0) { |
|
etcp->links_up=0; |
|
etcp_on_down(etcp); |
|
} |
|
} |
|
|
|
static void etcp_link_ready_callback(struct ETCP_CONN* etcp) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
if (!etcp) return; |
|
|
|
if (etcp->tx_state==0) { |
|
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "ETCP not ready, skip link state change"); |
|
return;// not initialized |
|
} |
|
|
|
if (etcp->links_up==0) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "skip on_up - ETCP already up"); |
|
etcp->links_up=1; |
|
etcp_on_up(etcp); |
|
} else DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "skip on_up - ETCP already up"); |
|
|
|
|
|
if (etcp->tx_state!=ETCP_TX_STATE_LINK_WAIT) return; |
|
etcp->tx_state = ETCP_TX_STATE_DATA_WAIT; |
|
queue_resume_callback(etcp->input_send_q); |
|
queue_resume_callback(etcp->ack_q); |
|
} |
|
|
|
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо. |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; |
|
etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет) |
|
// если ack все еще заняты - обновляем таймаут |
|
if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); |
|
else etcp->ack_resp_timer=NULL; |
|
} |
|
|
|
// Process packets in send queue and transmit them |
|
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) {// вызываем когда есть элемент в send_q или надо отправить ack |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
struct ETCP_DGRAM* dgram; |
|
if (etcp->tx_state!=ETCP_TX_STATE_DATA_WAIT) { |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] TX state: %d", etcp->log_name, etcp->tx_state); |
|
return; |
|
} |
|
while(dgram = etcp_request_pkt(etcp)) { |
|
etcp_loadbalancer_send(dgram); |
|
} |
|
} |
|
|
|
// Подготовить и отправить кодограмму |
|
// вызывается линком когда освобождается или очередью если появляются данные на передачу |
|
struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
struct ETCP_LINK* link = etcp_loadbalancer_select_link(etcp); |
|
if (!link) { |
|
etcp->tx_state=ETCP_TX_STATE_LINK_WAIT; |
|
etcp->cnt_link_wait++; |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no link available", etcp->log_name); |
|
return NULL;// если линков нет - ждём появления свободного |
|
} |
|
etcp->tx_state=ETCP_TX_STATE_DATA_WAIT; |
|
|
|
size_t send_q_size = queue_entry_count(etcp->input_send_q); |
|
|
|
if (send_q_size == 0) {// сгребаем из input_queue |
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_send_q empty, check if avail input_queue -> inflight"); |
|
input_queue_try_push(etcp); |
|
} |
|
|
|
|
|
// First, check if there's a packet in input_send_q (retrans or new) |
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "getting packet from input_send_q"); |
|
struct INFLIGHT_PACKET* inf_pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q); |
|
if (inf_pkt) { |
|
uint64_t now=get_time_tb(); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] send_q->wait_ack seq=%d TS=%llu", etcp->log_name, inf_pkt->seq, now); |
|
inf_pkt->last_timestamp=now; |
|
inf_pkt->send_count++; |
|
inf_pkt->state=INFLIGHT_STATE_WAIT_ACK; |
|
queue_data_put(etcp->input_wait_ack, &inf_pkt->ll, inf_pkt->seq);// move dgram to wait_ack queue |
|
} |
|
size_t ack_q_size = queue_entry_count(etcp->ack_q); |
|
|
|
if (!inf_pkt && ack_q_size == 0) { |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name); |
|
return NULL; |
|
} |
|
|
|
struct ETCP_DGRAM* dgram = memory_pool_alloc(etcp->instance->pkt_pool); |
|
if (!dgram) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate ETCP_DGRAM", etcp->log_name); |
|
return NULL; |
|
} |
|
|
|
dgram->link = link; |
|
dgram->noencrypt_len=0; |
|
dgram->timestamp = get_current_timestamp(); |
|
|
|
// формат ack: [01] [elements count] [4 байта last_delivered_id] и <[4 байта seq][2 байта recv_ts][2 байта txrx delay ts]> x count |
|
dgram->data[0]=1;// ack |
|
int ptr=2; |
|
|
|
dgram->data[ptr++]=etcp->last_delivered_id; |
|
dgram->data[ptr++]=etcp->last_delivered_id>>8; |
|
dgram->data[ptr++]=etcp->last_delivered_id>>16; |
|
dgram->data[ptr++]=etcp->last_delivered_id>>24; |
|
|
|
int data_len=0; |
|
if (inf_pkt) data_len=inf_pkt->ll.len; |
|
int remain_len=link->mtu -28/*udp headers*/ -13-8-4/*sc_nonce+tag size+crc*/ -9/*hdr[3] + min ack[6]*/ -5/*payload hdr*/ - data_len; |
|
// DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "remain_len= %d pl=%d", remain_len, data_len); |
|
|
|
// добавим опциональные заголовки |
|
struct ACK_PACKET* ack_pkt; |
|
while (remain_len>=8) { |
|
ack_pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q); |
|
if (!ack_pkt) break; |
|
remain_len-=8; |
|
// seq 4 байта |
|
dgram->data[ptr++]=ack_pkt->seq; |
|
dgram->data[ptr++]=ack_pkt->seq>>8; |
|
dgram->data[ptr++]=ack_pkt->seq>>16; |
|
dgram->data[ptr++]=ack_pkt->seq>>24; |
|
|
|
// ts приема 2 байта |
|
dgram->data[ptr++]=ack_pkt->recv_timestamp; |
|
dgram->data[ptr++]=ack_pkt->recv_timestamp>>8; |
|
|
|
// время задержки 2 байта между recv и ack |
|
uint16_t dly=get_current_timestamp()-ack_pkt->recv_timestamp; |
|
dgram->data[ptr++]=dly; |
|
dgram->data[ptr++]=dly>>8; |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] add ACK N%d dTS=%d", etcp->log_name, ack_pkt->seq, dly); |
|
queue_entry_free((struct ll_entry*)ack_pkt); |
|
|
|
if (inf_pkt && inf_pkt->ll.len+ptr>=etcp->mtu-10) break;// pkt len (надо просчитать точнее включая все заголовки) |
|
if (ptr>500) break; |
|
} |
|
|
|
dgram->data[1]=ptr/8; |
|
|
|
|
|
if (link->last_recv_updated && remain_len>=5) {// если есть данные - добавим channel_timestamp |
|
uint64_t now=get_time_tb(); |
|
uint64_t dt=now - link->last_recv_local_time; |
|
link->last_recv_updated=0; |
|
if (dt<1000000) { |
|
dgram->data[ptr++]=ETCP_SECTION_TIMESTAMP; |
|
|
|
uint16_t t=link->last_recv_timestamp + dt; |
|
dgram->data[ptr++]=t; |
|
dgram->data[ptr++]=t>>8; |
|
|
|
t=link->last_recv_local_time - link->last_recv_timestamp; |
|
dgram->data[ptr++]=t; |
|
dgram->data[ptr++]=t>>8; |
|
remain_len-=5; |
|
} |
|
} |
|
|
|
// DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "remain_len(2)= %d", remain_len); |
|
|
|
if (inf_pkt) { |
|
// фрейм data (0) обязательно в конец |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] add DATA (seq=%u, len=%u), ack_size=%d", etcp->log_name, inf_pkt->seq, inf_pkt->ll.len, dgram->data[1]); |
|
dgram->data[ptr++]=0;// payload |
|
dgram->data[ptr++]=inf_pkt->seq; |
|
dgram->data[ptr++]=inf_pkt->seq>>8; |
|
dgram->data[ptr++]=inf_pkt->seq>>16; |
|
dgram->data[ptr++]=inf_pkt->seq>>24; |
|
|
|
memcpy(&dgram->data[ptr], inf_pkt->ll.dgram, inf_pkt->ll.len); ptr+=inf_pkt->ll.len; |
|
} |
|
else { |
|
int chk=queue_check_consistency(etcp->ack_q); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] only ACK (size=%d) packet with %d bytes total (chk=%d) rem=%d", etcp->log_name, ack_q_size, ptr, chk, remain_len); |
|
} |
|
if (ptr>=PACKET_DATA_SIZE-50) DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] SIZE ERROR!!! %d", ptr); |
|
|
|
dgram->data_len=ptr; |
|
|
|
etcp_dump_pkt_sections(dgram, link, 1); |
|
|
|
return dgram; |
|
} |
|
|
|
// ====================================================================== Receiving data
|
|
|
|
|
void etcp_output_try_assembly(struct ETCP_CONN* etcp) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
// пробуем собрать выходную очередь из фрагментов |
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] etcp=%p, last_delivered_id=%u, recv_q_count=%d", |
|
// etcp->log_name, etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q)); |
|
|
|
uint32_t next_expected_id = etcp->last_delivered_id + 1; |
|
int delivered_count = 0; |
|
uint32_t delivered_bytes = 0; |
|
|
|
// Look for contiguous packets starting from next_expected_id |
|
while (1) { |
|
struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_find_data_by_id(etcp->recv_q, next_expected_id); |
|
if (!rx_pkt) { |
|
// No more contiguous packets found |
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no packet found for id=%u, stopping", etcp->log_name, next_expected_id); |
|
break; |
|
} |
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] assembling packet id=%u (len=%u)", etcp->log_name, |
|
// rx_pkt->seq, rx_pkt->ll.len); |
|
|
|
// Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed |
|
// Remove from recv_q first |
|
queue_remove_data(etcp->recv_q, (struct ll_entry*)rx_pkt); |
|
|
|
// Add to output_queue using the same ETCP_FRAGMENT structure |
|
if (queue_data_put(etcp->output_queue, (struct ll_entry*)rx_pkt, next_expected_id) == 0) { |
|
delivered_bytes += rx_pkt->ll.len; |
|
delivered_count++; |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "[%s] moved packet id=%u to output_queue (qlen=%d)", etcp->log_name, |
|
next_expected_id, etcp->output_queue->count); |
|
} else { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to add packet id=%u to output_queue", etcp->log_name, |
|
next_expected_id); |
|
// Put it back in recv_q if we can't add to output_queue |
|
queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, next_expected_id); |
|
break; |
|
} |
|
|
|
// Update state for next iteration |
|
etcp->last_delivered_id = next_expected_id; |
|
next_expected_id++; |
|
} |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] delivered %u contiguous packets (%u bytes), last_delivered_id=%u, output_queue_count=%d", |
|
etcp->log_name, delivered_count, delivered_bytes, etcp->last_delivered_id, queue_entry_count(etcp->output_queue)); |
|
} |
|
|
|
// Process ACK receipt - remove acknowledged packet from inflight queues |
|
void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t dts) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
if (!etcp) return; |
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts); |
|
|
|
// Find the acknowledged packet in the wait_ack queue |
|
struct INFLIGHT_PACKET* acked_pkt; |
|
if (acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq)) { |
|
etcp->cnt_ack_hit_inf++; |
|
queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)acked_pkt); |
|
} |
|
else if ( acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_send_q, seq) ) { |
|
etcp->cnt_ack_hit_sndq++; |
|
queue_remove_data(etcp->input_send_q, (struct ll_entry*)acked_pkt); |
|
} |
|
else etcp->cnt_ack_miss++; |
|
|
|
if (!acked_pkt) { |
|
// Packet might be already acknowledged or not found |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: packet seq=%u not found in wait_ack queue", seq); |
|
return; |
|
} |
|
|
|
// Calculate RTT if timestamps are valid |
|
/* if (ts != (uint16_t)-1 && dts != (uint16_t)-1) { |
|
uint16_t rtt = timestamp_diff(ts, dts); |
|
etcp->rtt_last = rtt; |
|
|
|
// Update RTT averages (exponential smoothing) |
|
if (etcp->rtt_avg_10 == 0) { |
|
etcp->rtt_avg_10 = rtt; |
|
etcp->rtt_avg_100 = rtt; |
|
} else { |
|
// RTT average over 10 packets |
|
etcp->rtt_avg_10 = (etcp->rtt_avg_10 * 9 + rtt) / 10; |
|
// RTT average over 100 packets |
|
etcp->rtt_avg_100 = (etcp->rtt_avg_100 * 99 + rtt) / 100; |
|
} |
|
|
|
// Update jitter calculation (max - min of last 10 RTT samples) |
|
etcp->rtt_history[etcp->rtt_history_idx] = rtt; |
|
etcp->rtt_history_idx = (etcp->rtt_history_idx + 1) % 10; |
|
|
|
uint16_t rtt_min = UINT16_MAX, rtt_max = 0; |
|
for (int i = 0; i < 10; i++) { |
|
if (etcp->rtt_history[i] < rtt_min) rtt_min = etcp->rtt_history[i]; |
|
if (etcp->rtt_history[i] > rtt_max) rtt_max = etcp->rtt_history[i]; |
|
} |
|
etcp->jitter = rtt_max - rtt_min; |
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] RTT updated - last=%u, avg_10=%u, avg_100=%u, jitter=%u", |
|
// etcp->log_name, rtt, etcp->rtt_avg_10, etcp->rtt_avg_100, etcp->jitter); |
|
} |
|
*/ |
|
// Update connection statistics |
|
etcp->unacked_bytes -= acked_pkt->ll.len; |
|
etcp->bytes_sent_total += acked_pkt->ll.len; |
|
etcp->ack_packets_count++; |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] removed packet seq=%u from wait_ack, unacked_bytes now %u total acked=%u", etcp->log_name, seq, etcp->unacked_bytes, etcp->ack_packets_count); |
|
|
|
if (acked_pkt->ll.dgram) { |
|
memory_pool_free(etcp->instance->data_pool, acked_pkt->ll.dgram); |
|
} |
|
memory_pool_free(etcp->inflight_pool, acked_pkt); |
|
|
|
// Try to resume sending more packets if window space opened up |
|
input_queue_try_resume(etcp); |
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] completed for seq=%u", etcp->log_name, seq); |
|
} |
|
|
|
|
|
// Process incoming decrypted packet |
|
void etcp_conn_input(struct ETCP_DGRAM* pkt) { |
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
if (!pkt || !pkt->data_len) return; |
|
|
|
etcp_dump_pkt_sections(pkt, pkt->link, 0); |
|
|
|
struct ETCP_CONN* etcp = pkt->link->etcp; |
|
uint8_t* data = pkt->data; |
|
uint16_t len = pkt->data_len; |
|
uint16_t ts = pkt->timestamp; // Received timestamp |
|
|
|
// Note: Assume packet starts with sections after timestamp (but timestamp is already extracted in connections?). |
|
// Protocol.txt: timestamp is first 2B, then sections. |
|
// But in conn_input, pkt->data is after timestamp? Assume data starts with first section. |
|
|
|
while (len >= 1) { |
|
uint8_t type = data[0]; |
|
|
|
// Process sections as per protocol.txt |
|
switch (type) { |
|
case ETCP_SECTION_ACK: { |
|
int elm_cnt=data[1]; |
|
uint32_t till=data[2] | (data[3]<<8) | (data[4]<<16) | (data[5]<<24); |
|
int ack_section_len = 6 + elm_cnt * 8; |
|
data+=ack_section_len; |
|
len-=ack_section_len; |
|
for (int i=0; i<elm_cnt; i++) { |
|
uint32_t seq=data[-ack_section_len+6+i*8] | (data[-ack_section_len+7+i*8]<<8) | (data[-ack_section_len+8+i*8]<<16) | (data[-ack_section_len+9+i*8]<<24); |
|
uint16_t ts=data[-ack_section_len+10+i*8] | (data[-ack_section_len+11+i*8]<<8); |
|
uint16_t dts=data[-ack_section_len+12+i*8] | (data[-ack_section_len+13+i*8]<<8); |
|
etcp_ack_recv(etcp, seq, ts, dts); |
|
} |
|
while ((int32_t)(etcp->rx_ack_till-till)<0) { etcp->rx_ack_till++; etcp_ack_recv(etcp, etcp->rx_ack_till, -1, -1); }// подтверждаем всё по till |
|
break; |
|
} |
|
case ETCP_SECTION_TIMESTAMP: { |
|
uint16_t cur_ts=get_current_timestamp(); |
|
uint16_t ret_ts=data[1] | (data[2]<<8);// cur_ts=ret_ts = RTT |
|
pkt->link->rtt_last=cur_ts-ret_ts; |
|
int recv_dt_tx1=data[3] | (data[4]<<8);// localtime удаленной стороны момента принятия пакета - timestamp этого пакета (на стороне отправителя, т.е. у нас) |
|
|
|
int recv_dt_tx=recv_dt_tx1 - pkt->link->rtt_last/2; |
|
int recv_dt_rx=cur_ts - pkt->link->rtt_last/2 - ts; |
|
if (pkt->link->recv_dt_avg_rx==0) pkt->link->recv_dt_avg_rx=recv_dt_rx*256; |
|
if (pkt->link->recv_dt_avg_tx==0) pkt->link->recv_dt_avg_tx=recv_dt_tx*256; |
|
pkt->link->recv_dt_avg_rx +=(recv_dt_rx*256 - pkt->link->recv_dt_avg_rx)/256; |
|
pkt->link->recv_dt_avg_tx +=(recv_dt_tx*256 - pkt->link->recv_dt_avg_tx)/256; |
|
pkt->link->tt_last = recv_dt_tx1;// - pkt->link->recv_dt_avg_tx/256; |
|
pkt->link->rt_last = cur_ts - ts - pkt->link->recv_dt_avg_tx/256; |
|
//tts_correction += ((NOW - RTT/2 - TTS) - tts_correction)/16 (инициализируем сразу по 1 пакету) |
|
data+=5; len-=5; |
|
|
|
struct ETCP_LINK* c=etcp->links; |
|
int rtt_sum=0; |
|
int tt_sum=0; |
|
int cnt=0; |
|
while (c) { |
|
rtt_sum += c->rtt_last; |
|
tt_sum += c->tt_last; |
|
cnt++; |
|
c=c->next; |
|
} |
|
etcp->rtt_last=rtt_sum/cnt; |
|
etcp->tt_last=tt_sum/cnt; |
|
break; |
|
} |
|
case ETCP_SECTION_PAYLOAD: { |
|
|
|
if (len>=5) { |
|
// формируем ACK |
|
uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24); |
|
if (etcp->got_initial_pkt == 0) { |
|
if (seq==1) etcp->got_initial_pkt = 1; |
|
else { |
|
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] Waiting for initial packet but recv seq=%d, ignoring", etcp->log_name, seq); |
|
len=0; |
|
break; |
|
} |
|
} |
|
else { |
|
int32_t d=seq - etcp->last_delivered_id; |
|
if (d>MAX_INFLIGHT_SIZE || d<-MAX_INFLIGHT_SIZE) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] Received packet out of inflight bounds: seq=%d last delivered=%d", etcp->log_name, seq, etcp->last_delivered_id); |
|
len=0; |
|
break; |
|
} |
|
} |
|
struct ACK_PACKET* p = (struct ACK_PACKET*)queue_entry_new_from_pool(etcp->instance->ack_pool); |
|
p->seq=seq; |
|
p->pkt_timestamp=pkt->timestamp; |
|
p->recv_timestamp=get_current_timestamp(); |
|
queue_data_put(etcp->ack_q, (struct ll_entry*)p, p->seq); |
|
if (etcp->ack_resp_timer == NULL) { |
|
etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] set ack_timer for delayed ACK send", etcp->log_name); |
|
} |
|
if ((int32_t)(etcp->last_delivered_id-seq)<0) if (queue_find_data_by_id(etcp->recv_q, seq)==NULL) {// проверяем есть ли пакет с этим seq |
|
uint32_t pkt_len=len-5; |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] adding packet seq=%u to recv_q (last_delivered_id=%u)", etcp->log_name, seq, etcp->last_delivered_id); |
|
// отправляем пакет в очередь на сборку |
|
uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool); |
|
if (!payload_data) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate payload_data from data_pool", etcp->log_name); |
|
len=0; |
|
break; |
|
} |
|
struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool); |
|
if (!rx_pkt) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate rx_pkt from io_pool", etcp->log_name); |
|
memory_pool_free(etcp->instance->data_pool, payload_data); |
|
len=0; |
|
break; |
|
} |
|
rx_pkt->seq = seq; |
|
rx_pkt->timestamp = pkt->timestamp; |
|
rx_pkt->ll.dgram = payload_data; |
|
rx_pkt->ll.len = pkt_len; |
|
rx_pkt->ll.dgram_pool = etcp->instance->data_pool; |
|
rx_pkt->ll.memlen = etcp->instance->data_pool->object_size; |
|
// Copy the actual payload data |
|
memcpy(payload_data, data + 5, pkt_len); |
|
queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, seq); |
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] packet seq=%u added to recv_q, calling assembly (last_delivered_id=%u)", etcp->log_name, seq, etcp->last_delivered_id); |
|
if ((int32_t)(seq - etcp->last_delivered_id) == 1) etcp_output_try_assembly(etcp);// пробуем собрать выходную очередь из фрагментов |
|
} |
|
} |
|
len=0; |
|
break; |
|
} |
|
|
|
default: |
|
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "etcp_conn_input: unknown section type=0x%02x", type); |
|
len=0; |
|
break; |
|
} |
|
|
|
} |
|
|
|
memory_pool_free(etcp->instance->pkt_pool, pkt); // Free the incoming dgram |
|
} |
|
|
|
|