// pkt_normalizer.c - Implementation of packet normalizer for ETCP #include "pkt_normalizer.h" #include "etcp.h" // For ETCP_CONN and related structures #include "ll_queue.h" // For queue operations #include "u_async.h" // For UASYNC #include #include #include // For debugging (can be removed if not needed) #include "debug_config.h" // Assuming this for DEBUG_ERROR // Forward declarations static void packer_cb(struct ll_queue* q, void* arg); static void pn_flush_cb(void* arg); static void etcp_input_ready_cb(struct ll_queue* q, void* arg); static void pn_unpacker_cb(struct ll_queue* q, void* arg); static void pn_send_to_etcp(struct PKTNORM* pn); // Initialization struct PKTNORM* pn_init(struct ETCP_CONN* etcp) { if (!etcp) return NULL; struct PKTNORM* pn = calloc(1, sizeof(struct PKTNORM)); if (!pn) return NULL; pn->etcp = etcp; pn->ua = etcp->instance->ua; pn->frag_size = etcp->mtu - 100; // Use MTU as fixed packet size (adjust if headers need subtraction) pn->tx_wait_time = 10; pn->input = queue_new(pn->ua, 0); // No hash needed pn->output = queue_new(pn->ua, 0); // No hash needed if (!pn->input || !pn->output) { pn_pair_deinit(pn); return NULL; } queue_set_callback(pn->input, packer_cb, pn); queue_set_callback(etcp->output_queue, pn_unpacker_cb, pn); pn->data = NULL; pn->recvpart = NULL; pn->flush_timer = NULL; return pn; } // Deinitialization void pn_pair_deinit(struct PKTNORM* pn) { if (!pn) return; // Drain and free queues if (pn->input) { struct ll_entry* entry; while ((entry = queue_data_get(pn->input)) != NULL) { if (entry->dgram) { free(entry->dgram); } queue_entry_free(entry); } queue_free(pn->input); } if (pn->output) { struct ll_entry* entry; while ((entry = queue_data_get(pn->output)) != NULL) { if (entry->dgram) { free(entry->dgram); } queue_entry_free(entry); } queue_free(pn->output); } if (pn->flush_timer) { uasync_cancel_timeout(pn->ua, pn->flush_timer); } if (pn->data) { memory_pool_free(pn->etcp->instance->data_pool, pn->data); } if (pn->recvpart) { queue_dgram_free(pn->recvpart); queue_entry_free(pn->recvpart); } free(pn); } // Reset unpacker state void pn_unpacker_reset_state(struct PKTNORM* pn) { if (!pn) return; if (pn->recvpart) { queue_dgram_free(pn->recvpart); queue_entry_free(pn->recvpart); pn->recvpart = NULL; } } // Send data to packer (copies and adds to input queue or pending, triggering callback) void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) { if (!pn || !data || len == 0) return; DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: pn=%p, len=%d", pn, len); struct ll_entry* entry = ll_alloc_lldgram(len); if (!entry) return; memcpy(entry->dgram, data, len); entry->len = len; entry->dgram_pool = NULL; int ret = queue_data_put(pn->input, entry, 0); DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: queue_data_put returned %d, input count=%d", ret, queue_entry_count(pn->input)); // Cancel flush timer if active if (pn->flush_timer) { uasync_cancel_timeout(pn->ua, pn->flush_timer); pn->flush_timer = NULL; } } // Internal: Packer callback static void packer_cb(struct ll_queue* q, void* arg) { struct PKTNORM* pn = (struct PKTNORM*)arg; if (!pn) return; DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer: packer_cb"); queue_wait_threshold(pn->etcp->input_queue, 0, 0, etcp_input_ready_cb, pn); } // Helper to send block to ETCP as ETCP_FRAGMENT static void pn_send_to_etcp(struct PKTNORM* pn) { if (!pn || !pn->data || pn->data_ptr == 0) return; DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer: pn_send_to_etcp"); // Allocate ETCP_FRAGMENT from io_pool struct ETCP_FRAGMENT* frag = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(pn->etcp->io_pool); if (!frag) {// drop data DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "pn_packer: send to etcp alloc error"); pn->alloc_errors++; pn->data_ptr = 0; return; } frag->seq = 0; frag->timestamp = 0; frag->ll.dgram = pn->data; frag->ll.len = pn->data_ptr; frag->ll.memlen = pn->etcp->instance->data_pool->object_size; queue_data_put(pn->etcp->input_queue, (struct ll_entry*)frag, 0); // Сбросить структуру (dgram передан во фрагмент, не освобождаем) pn->data = NULL; } // Internal: Renew sndpart buffer static void pn_buf_renew(struct PKTNORM* pn) { if (pn->data) { int remain = pn->data_size - pn->data_ptr; if (remain < 3) pn_send_to_etcp(pn); } if (!pn->data) { pn->data = memory_pool_alloc(pn->etcp->instance->data_pool); int size=pn->etcp->instance->data_pool->object_size; if (size>pn->frag_size) size=pn->frag_size; pn->data_size = size; pn->data_ptr=0; } } // Internal: Process input when etcp->input_queue is ready (empty) static void etcp_input_ready_cb(struct ll_queue* q, void* arg) { struct PKTNORM* pn = (struct PKTNORM*)arg; if (!pn) return; DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer: etcp_input_ready_cb"); struct ll_entry* in_dgram = queue_data_get(pn->input); if (!in_dgram) { queue_resume_callback(pn->input); return; } uint16_t in_ptr = 0;// while (in_ptr < in_dgram->len) { pn_buf_renew(pn); if (!pn->data) break; // Allocation failed int remain = pn->data_size - pn->data_ptr; if (remain<3) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "pn_packer: fatal logic error"); pn->logic_errors++; break; } if (in_ptr == 0) { pn->data[pn->data_ptr++] = in_dgram->len & 0xFF; pn->data[pn->data_ptr++] = (in_dgram->len >> 8) & 0xFF; remain -= 2; } memcpy(pn->data + pn->data_ptr, in_dgram->dgram + in_ptr, remain); pn->data_ptr += remain; in_ptr += remain; } queue_dgram_free(in_dgram); queue_entry_free(in_dgram); // Cancel flush timer if active if (pn->flush_timer) { uasync_cancel_timeout(pn->ua, pn->flush_timer); pn->flush_timer = NULL; } // Set flush timer if no more input if (queue_entry_count(pn->input) == 0) { pn->flush_timer = uasync_set_timeout(pn->ua, pn->tx_wait_time, pn, pn_flush_cb); } queue_resume_callback(pn->input); } // Internal: Flush callback on timeout static void pn_flush_cb(void* arg) { struct PKTNORM* pn = (struct PKTNORM*)arg; if (!pn) return; pn->flush_timer = NULL; pn_send_to_etcp(pn); } // Internal: Unpacker callback (assembles fragments into original packets) static void pn_unpacker_cb(struct ll_queue* q, void* arg) { struct PKTNORM* pn = (struct PKTNORM*)arg; if (!pn) return; while (1) { void* data = queue_data_get(pn->etcp->output_queue); if (!data) break; struct ETCP_FRAGMENT* frag = (struct ETCP_FRAGMENT*)data; // Since data is ll_entry* uint8_t* payload = frag->ll.dgram; uint16_t len = frag->ll.len; uint16_t ptr = 0; while (ptr < len) { if (!pn->recvpart) { // Need length header for new packet if (len - ptr < 2) { // Incomplete header, reset pn_unpacker_reset_state(pn); break; } uint16_t pkt_len = payload[ptr] | (payload[ptr + 1] << 8); ptr += 2; pn->recvpart = ll_alloc_lldgram(pkt_len); if (!pn->recvpart) { break; } pn->recvpart->len = 0; } else { // We are in the middle of assembling a packet // Skip the 2-byte length header at the start of subsequent fragments if (ptr == 0) { ptr += 2; if (ptr >= len) break; // Fragment contains only the length header } } uint16_t rem = pn->recvpart->memlen - pn->recvpart->len; uint16_t avail = len - ptr; uint16_t cp = (rem < avail) ? rem : avail; memcpy(pn->recvpart->dgram + pn->recvpart->len, payload + ptr, cp); pn->recvpart->len += cp; ptr += cp; if (pn->recvpart->len == pn->recvpart->memlen) { queue_data_put(pn->output, pn->recvpart, 0); pn->recvpart = NULL; } } // Free the fragment - dgram was malloc'd in pn_send_to_etcp memory_pool_free(pn->etcp->instance->data_pool, frag->ll.dgram); memory_pool_free(pn->etcp->io_pool, frag); } queue_resume_callback(q); }