Browse Source

1

nodeinfo-routing-update
jeka 4 weeks ago
parent
commit
81f4b343a5
  1. 39
      src/pkt_normalizer.c
  2. 2
      src/pkt_normalizer.h

39
src/pkt_normalizer.c

@ -14,6 +14,7 @@
// Forward declarations // Forward declarations
static void packer_cb(struct ll_queue* q, void* arg); static void packer_cb(struct ll_queue* q, void* arg);
static void pn_flush_cb(void* arg);
static void etcp_input_ready_cb(struct ll_queue* q, void* arg); static void etcp_input_ready_cb(struct ll_queue* q, void* arg);
static void pn_unpacker_cb(struct ll_queue* q, void* arg); static void pn_unpacker_cb(struct ll_queue* q, void* arg);
static void pn_send_to_etcp(struct PKTNORM* pn); static void pn_send_to_etcp(struct PKTNORM* pn);
@ -62,6 +63,7 @@ struct PKTNORM* pn_init(struct ETCP_CONN* etcp) {
pn->data = NULL; pn->data = NULL;
pn->recvpart = NULL; pn->recvpart = NULL;
pn->flush_timer = NULL;
return pn; return pn;
} }
@ -98,6 +100,10 @@ void pn_deinit(struct PKTNORM* pn) {
queue_free(pn->output); queue_free(pn->output);
} }
if (pn->flush_timer) {
uasync_cancel_timeout(pn->ua, pn->flush_timer);
}
if (pn->data) { if (pn->data) {
memory_pool_free(pn->etcp->instance->data_pool, pn->data); memory_pool_free(pn->etcp->instance->data_pool, pn->data);
} }
@ -138,6 +144,12 @@ void pn_reset(struct PKTNORM* pn) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, ""); DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (!pn) return; if (!pn) return;
// Cancel flush timer
if (pn->flush_timer) {
uasync_cancel_timeout(pn->ua, pn->flush_timer);
pn->flush_timer = NULL;
}
// Reset packer state // Reset packer state
if (pn->data) { if (pn->data) {
memory_pool_free(pn->etcp->instance->data_pool, pn->data); memory_pool_free(pn->etcp->instance->data_pool, pn->data);
@ -184,6 +196,12 @@ void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
entry->len = len; entry->len = len;
entry->dgram_pool = NULL; entry->dgram_pool = NULL;
// Cancel flush timer if active
if (pn->flush_timer) {
uasync_cancel_timeout(pn->ua, pn->flush_timer);
pn->flush_timer = NULL;
}
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "PUT to input"); DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "PUT to input");
int ret = queue_data_put(pn->input, entry, 0); int ret = queue_data_put(pn->input, entry, 0);
pn->in_total_pkts++; pn->in_total_pkts++;
@ -291,9 +309,30 @@ exit:
queue_dgram_free(in_dgram); queue_dgram_free(in_dgram);
queue_entry_free(in_dgram); queue_entry_free(in_dgram);
// Cancel flush timer if active
if (pn->flush_timer) {
uasync_cancel_timeout(pn->ua, pn->flush_timer);
pn->flush_timer = NULL;
}
// Set flush timer if no more input
if (queue_entry_count(pn->input) == 0) {
pn->flush_timer = uasync_set_timeout(pn->ua, pn->tx_wait_time, pn, pn_flush_cb);
}
queue_resume_callback(pn->input); queue_resume_callback(pn->input);
} }
// Internal: Flush callback on timeout
static void pn_flush_cb(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return;
pn->flush_timer = NULL;
pn_send_to_etcp(pn);
}
// Internal: Unpacker callback (assembles fragments into original packets) // Internal: Unpacker callback (assembles fragments into original packets)
static void pn_unpacker_cb(struct ll_queue* q, void* arg) { static void pn_unpacker_cb(struct ll_queue* q, void* arg) {

2
src/pkt_normalizer.h

@ -32,7 +32,7 @@ struct PKTNORM {
uint16_t data_ptr; // число заполненных байт uint16_t data_ptr; // число заполненных байт
uint16_t data_size; // размер выделенной области uint16_t data_size; // размер выделенной области
// struct ll_entry sndpart; // блок ожидающий досборки // struct ll_entry sndpart; // блок ожидающий досборки
// void* flush_timer; // For timeout flush void* flush_timer; // For timeout flush
struct ll_entry* pending; // Partial processed input entry struct ll_entry* pending; // Partial processed input entry
uint16_t pending_in_ptr; // Pointer in pending entry uint16_t pending_in_ptr; // Pointer in pending entry

Loading…
Cancel
Save