From 81f4b343a5641ce88c51ad13e2b4c0be60b26faa Mon Sep 17 00:00:00 2001 From: jeka Date: Sun, 8 Mar 2026 01:18:03 +0300 Subject: [PATCH] 1 --- src/pkt_normalizer.c | 39 +++++++++++++++++++++++++++++++++++++++ src/pkt_normalizer.h | 2 +- 2 files changed, 40 insertions(+), 1 deletion(-) diff --git a/src/pkt_normalizer.c b/src/pkt_normalizer.c index d5df72d..245a1a2 100644 --- a/src/pkt_normalizer.c +++ b/src/pkt_normalizer.c @@ -14,6 +14,7 @@ // Forward declarations static void packer_cb(struct ll_queue* q, void* arg); +static void pn_flush_cb(void* arg); static void etcp_input_ready_cb(struct ll_queue* q, void* arg); static void pn_unpacker_cb(struct ll_queue* q, void* arg); static void pn_send_to_etcp(struct PKTNORM* pn); @@ -62,6 +63,7 @@ struct PKTNORM* pn_init(struct ETCP_CONN* etcp) { pn->data = NULL; pn->recvpart = NULL; + pn->flush_timer = NULL; return pn; } @@ -98,6 +100,10 @@ void pn_deinit(struct PKTNORM* pn) { queue_free(pn->output); } + if (pn->flush_timer) { + uasync_cancel_timeout(pn->ua, pn->flush_timer); + } + if (pn->data) { memory_pool_free(pn->etcp->instance->data_pool, pn->data); } @@ -138,6 +144,12 @@ void pn_reset(struct PKTNORM* pn) { DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, ""); if (!pn) return; + // Cancel flush timer + if (pn->flush_timer) { + uasync_cancel_timeout(pn->ua, pn->flush_timer); + pn->flush_timer = NULL; + } + // Reset packer state if (pn->data) { memory_pool_free(pn->etcp->instance->data_pool, pn->data); @@ -184,6 +196,12 @@ void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) { entry->len = len; entry->dgram_pool = NULL; + // Cancel flush timer if active + if (pn->flush_timer) { + uasync_cancel_timeout(pn->ua, pn->flush_timer); + pn->flush_timer = NULL; + } + DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "PUT to input"); int ret = queue_data_put(pn->input, entry, 0); pn->in_total_pkts++; @@ -291,9 +309,30 @@ exit: queue_dgram_free(in_dgram); queue_entry_free(in_dgram); + // Cancel flush timer if active + if (pn->flush_timer) { + uasync_cancel_timeout(pn->ua, pn->flush_timer); + pn->flush_timer = NULL; + } + + // Set flush timer if no more input + if (queue_entry_count(pn->input) == 0) { + pn->flush_timer = uasync_set_timeout(pn->ua, pn->tx_wait_time, pn, pn_flush_cb); + } + queue_resume_callback(pn->input); } +// Internal: Flush callback on timeout +static void pn_flush_cb(void* arg) { + DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, ""); + struct PKTNORM* pn = (struct PKTNORM*)arg; + if (!pn) return; + + pn->flush_timer = NULL; + pn_send_to_etcp(pn); +} + // Internal: Unpacker callback (assembles fragments into original packets) static void pn_unpacker_cb(struct ll_queue* q, void* arg) { diff --git a/src/pkt_normalizer.h b/src/pkt_normalizer.h index 574aa33..848be88 100644 --- a/src/pkt_normalizer.h +++ b/src/pkt_normalizer.h @@ -32,7 +32,7 @@ struct PKTNORM { uint16_t data_ptr; // число заполненных байт uint16_t data_size; // размер выделенной области // struct ll_entry sndpart; // блок ожидающий досборки -// void* flush_timer; // For timeout flush + void* flush_timer; // For timeout flush struct ll_entry* pending; // Partial processed input entry uint16_t pending_in_ptr; // Pointer in pending entry