diff --git a/lib/debug_config.h b/lib/debug_config.h index 209ef11..b342cc3 100644 --- a/lib/debug_config.h +++ b/lib/debug_config.h @@ -41,7 +41,7 @@ typedef uint64_t debug_category_t; #define DEBUG_CATEGORY_TUN ((debug_category_t)1 << 8) // TUN interface #define DEBUG_CATEGORY_ROUTING ((debug_category_t)1 << 9) // routing table #define DEBUG_CATEGORY_TIMERS ((debug_category_t)1 << 10) // timer management -#define DEBUG_CATEGORY_ALL (~((debug_category_t)0)) +#define DEBUG_CATEGORY_ALL ((debug_category_t)(~((uint64_t)0))) /* Debug configuration structure */ typedef struct { diff --git a/lib/ll_queue.c b/lib/ll_queue.c index c8d5da8..ea6b74a 100755 --- a/lib/ll_queue.c +++ b/lib/ll_queue.c @@ -13,14 +13,6 @@ static void check_waiters(struct ll_queue* q); static void add_to_hash(struct ll_queue* q, struct ll_entry* entry); static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry); -#define xxx 0 - -// Вспомогательная функция для преобразования данных в запись -static inline struct ll_entry* data_to_entry(void* data) { - if (!data) return NULL; - return ((struct ll_entry*)data) - xxx; -} - // ==================== Управление очередью ==================== struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) { @@ -48,7 +40,7 @@ struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) { } struct ll_entry* ll_alloc_lldgram(uint16_t len) { - struct ll_entry* entry = queue_data_new(0); + struct ll_entry* entry = queue_entry_new(0); if (!entry) return NULL; entry->len=0; @@ -123,7 +115,7 @@ void queue_set_size_limit(struct ll_queue* q, int lim) { // ==================== Управление элементами ==================== -void* queue_data_new(size_t data_size) { +struct ll_entry* queue_entry_new(size_t data_size) { struct ll_entry* entry = malloc(sizeof(struct ll_entry) + data_size); if (!entry) return NULL; @@ -132,12 +124,12 @@ void* queue_data_new(size_t data_size) { entry->len = 0; entry->pool = NULL; // Выделено через malloc -// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_new: created entry %p, size=%zu", entry, data_size); +// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new: created entry %p, size=%zu", entry, data_size); - return (void*)(entry + xxx); + return entry; } -void* queue_entry_new_from_pool(struct memory_pool* pool) { +struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool) { if (!pool) return NULL; struct ll_entry* entry = memory_pool_alloc(pool); @@ -150,7 +142,7 @@ void* queue_entry_new_from_pool(struct memory_pool* pool) { // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new_from_pool: created entry %p from pool %p", entry, pool); - return (void*)(entry + xxx); + return entry; } //void ll_free_dgram(struct ll_entry* entry) { @@ -228,15 +220,14 @@ static void check_waiters(struct ll_queue* q) { } } -int queue_data_put(struct ll_queue* q, void* data, uint32_t id) { - if (!q || !data) return -1; +int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) { + if (!q || !entry) return -1; - struct ll_entry* entry = data_to_entry(data); entry->id = id; // Проверить лимит размера if (q->size_limit >= 0 && q->count >= q->size_limit) { - queue_entry_free(data); // Освободить элемент если превышен лимит + queue_entry_free(entry); // Освободить элемент если превышен лимит return -1; } @@ -271,15 +262,14 @@ int queue_data_put(struct ll_queue* q, void* data, uint32_t id) { return 0; } -int queue_data_put_first(struct ll_queue* q, void* data, uint32_t id) { - if (!q || !data) return -1; +int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id) { + if (!q || !entry) return -1; - struct ll_entry* entry = data_to_entry(data); entry->id = id; // Проверить лимит размера if (q->size_limit >= 0 && q->count >= q->size_limit) { - queue_entry_free(data); // Освободить элемент если превышен лимит + queue_entry_free(entry); // Освободить элемент если превышен лимит return -1; } @@ -311,7 +301,7 @@ int queue_data_put_first(struct ll_queue* q, void* data, uint32_t id) { return 0; } -void* queue_data_get(struct ll_queue* q) { +struct ll_entry* queue_data_get(struct ll_queue* q) { if (!q || !q->head) return NULL; struct ll_entry* entry = q->head; @@ -336,7 +326,7 @@ void* queue_data_get(struct ll_queue* q) { // Проверить ожидающие коллбэки check_waiters(q); - return (void*)(entry + xxx); + return entry; } int queue_entry_count(struct ll_queue* q) { @@ -375,7 +365,7 @@ void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter) { // ==================== Поиск и удаление по ID ==================== -void* queue_find_data_by_id(struct ll_queue* q, uint32_t id) { +struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id) { if (!q || q->hash_size == 0 || !q->hash_table) return NULL; size_t slot = id % q->hash_size; @@ -383,7 +373,7 @@ void* queue_find_data_by_id(struct ll_queue* q, uint32_t id) { while (entry) { if (entry->id == id) { - return (void*)(entry + xxx); + return entry; } entry = entry->hash_next; } @@ -391,10 +381,8 @@ void* queue_find_data_by_id(struct ll_queue* q, uint32_t id) { return NULL; } -int queue_remove_data(struct ll_queue* q, void* data) { - if (!q || !data) return -1; - - struct ll_entry* entry = data_to_entry(data); +int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) { + if (!q || !entry) return -1; // Удалить из двусвязного списка if (entry->prev) { diff --git a/lib/ll_queue.h b/lib/ll_queue.h index 257e574..02bf14d 100644 --- a/lib/ll_queue.h +++ b/lib/ll_queue.h @@ -85,7 +85,7 @@ void queue_free(struct ll_queue* q); struct ll_entry* ll_alloc_lldgram(uint16_t len); // Функция для освобождения только dgram в ll_entry -// Принимает void* data (как возвращается из queue_data_new или queue_data_get) +// Принимает void* data (как возвращается из queue_entry_new или queue_data_get) // Освобождает dgram с использованием dgram_pool (если указан) или free (если нет) // Если есть dgram_free_fn, использует её; иначе - pool или free // Устанавливает dgram в NULL после освобождения @@ -114,11 +114,11 @@ void queue_set_size_limit(struct ll_queue* q, int lim); // Создать новый элемент с областью данных указанного размера // Память выделяется одним блоком: [struct ll_entry][область данных data_size байт] // Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти -void* queue_data_new(size_t data_size); +struct ll_entry* queue_entry_new(size_t data_size); // Создать новый элемент из пула (размер был определен при создании пула) // Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти -void* queue_entry_new_from_pool(struct memory_pool* pool); +struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool); // Освободить только entry (не влияет на очереди, dgram не освобождает) void queue_entry_free(struct ll_entry* entry); @@ -132,18 +132,18 @@ void queue_dgram_free(struct ll_entry* entry); // Добавить элемент в конец очереди (FIFO) // Если очередь была пустой и коллбэки разрешены - вызывает коллбэк // Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден) -int queue_data_put(struct ll_queue* q, void* data, uint32_t id); +int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id); // Добавить элемент в начало очереди (LIFO, высокий приоритет) // Если очередь была пустой и коллбэки разрешены - вызывает коллбэк // Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден) -int queue_data_put_first(struct ll_queue* q, void* data, uint32_t id); +int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id); // Извлечь элемент из начала очереди // При извлечении приостанавливает коллбэки (callback_suspended = 1) чтобы предотвратить рекурсию // Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если очередь пуста // ПРИМЕЧАНИЕ: не освобождает память элемента -void* queue_data_get(struct ll_queue* q); +struct ll_entry* queue_data_get(struct ll_queue* q); // Получить текущее количество элементов в очереди int queue_entry_count(struct ll_queue* q); @@ -172,11 +172,11 @@ static inline size_t queue_total_bytes(struct ll_queue* q) { // Найти элемент по ID // Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если не найден -void* queue_find_data_by_id(struct ll_queue* q, uint32_t id); +struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id); // Удалить элемент из очереди по указателю на структуру элемента // Возвращает: 0 при успехе, -1 если элемент не найден // ПРИМЕЧАНИЕ: НЕ изменяет ref_count элемента, просто удаляет из очереди -int queue_remove_data(struct ll_queue* q, void* data); +int queue_remove_data(struct ll_queue* q, struct ll_entry* entry); #endif // LL_QUEUE_H diff --git a/src/etcp.c b/src/etcp.c index 60aba3c..56bbad7 100644 --- a/src/etcp.c +++ b/src/etcp.c @@ -121,10 +121,10 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) { void etcp_connection_close(struct ETCP_CONN* etcp) { if (!etcp) return; - // Drain and free input_queue (contains ETCP_FRAGMENT with pkt_data from data_pool) - if (etcp->input_queue) { - struct ETCP_FRAGMENT* pkt; - while ((pkt = queue_data_get(etcp->input_queue)) != NULL) { + // Drain and free input_queue (contains ETCP_FRAGMENT with pkt_data from data_pool) + if (etcp->input_queue) { + struct ETCP_FRAGMENT* pkt; + while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(etcp->input_queue)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } @@ -134,10 +134,10 @@ void etcp_connection_close(struct ETCP_CONN* etcp) { etcp->input_queue = NULL; } - // Drain and free output_queue (contains ETCP_FRAGMENT with pkt_data from data_pool) - if (etcp->output_queue) { - struct ETCP_FRAGMENT* pkt; - while ((pkt = queue_data_get(etcp->output_queue)) != NULL) { + // Drain and free output_queue (contains ETCP_FRAGMENT with pkt_data from data_pool) + if (etcp->output_queue) { + struct ETCP_FRAGMENT* pkt; + while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(etcp->output_queue)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } @@ -147,10 +147,10 @@ void etcp_connection_close(struct ETCP_CONN* etcp) { etcp->output_queue = NULL; } - // Drain and free input_send_q (contains INFLIGHT_PACKET with pkt_data from data_pool) - if (etcp->input_send_q) { - struct INFLIGHT_PACKET* pkt; - while ((pkt = queue_data_get(etcp->input_send_q)) != NULL) { + // Drain and free input_send_q (contains INFLIGHT_PACKET with pkt_data from data_pool) + if (etcp->input_send_q) { + struct INFLIGHT_PACKET* pkt; + while ((pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } @@ -160,10 +160,10 @@ void etcp_connection_close(struct ETCP_CONN* etcp) { etcp->input_send_q = NULL; } - // Drain and free input_wait_ack (contains INFLIGHT_PACKET with pkt_data from data_pool) - if (etcp->input_wait_ack) { - struct INFLIGHT_PACKET* pkt; - while ((pkt = queue_data_get(etcp->input_wait_ack)) != NULL) { + // Drain and free input_wait_ack (contains INFLIGHT_PACKET with pkt_data from data_pool) + if (etcp->input_wait_ack) { + struct INFLIGHT_PACKET* pkt; + while ((pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_wait_ack)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } @@ -173,20 +173,20 @@ void etcp_connection_close(struct ETCP_CONN* etcp) { etcp->input_wait_ack = NULL; } - // Drain and free ack_q (contains ACK_PACKET from ack_pool) - if (etcp->ack_q) { - struct ACK_PACKET* pkt; - while ((pkt = queue_data_get(etcp->ack_q)) != NULL) { - queue_entry_free(pkt); - } + // Drain and free ack_q (contains ACK_PACKET from ack_pool) + if (etcp->ack_q) { + struct ACK_PACKET* pkt; + while ((pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q)) != NULL) { + queue_entry_free((struct ll_entry*)pkt); + } queue_free(etcp->ack_q); etcp->ack_q = NULL; } - // Drain and free recv_q (contains ETCP_FRAGMENT with pkt_data from data_pool) - if (etcp->recv_q) { - struct ETCP_FRAGMENT* pkt; - while ((pkt = queue_data_get(etcp->recv_q)) != NULL) { + // Drain and free recv_q (contains ETCP_FRAGMENT with pkt_data from data_pool) + if (etcp->recv_q) { + struct ETCP_FRAGMENT* pkt; + while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(etcp->recv_q)) != NULL) { if (pkt->ll.dgram) { memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); } @@ -272,7 +272,7 @@ int etcp_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) { // Create queue entry - this allocates ll_entry + data pointer - struct ETCP_FRAGMENT* pkt = queue_entry_new_from_pool(etcp->rx_pool); + struct ETCP_FRAGMENT* pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->rx_pool); if (!pkt) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to allocate queue entry"); memory_pool_free(etcp->instance->data_pool, packet_data); @@ -286,8 +286,8 @@ int etcp_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_send: created PACKET %p with data %p (len=%zu)", pkt, packet_data, len); - // Add to input queue - input_queue_cb will process it - if (queue_data_put(etcp->input_queue, pkt, 0) != 0) { + // Add to input queue - input_queue_cb will process it + if (queue_data_put(etcp->input_queue, (struct ll_entry*)pkt, 0) != 0) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to add to input queue"); memory_pool_free(etcp->instance->data_pool, packet_data); memory_pool_free(etcp->rx_pool, pkt); @@ -317,8 +317,8 @@ static void input_queue_try_resume(struct ETCP_CONN* etcp) { // input_queue -> input_send_q static void input_queue_cb(struct ll_queue* q, void* arg) { - struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; - struct ETCP_FRAGMENT* in_pkt = queue_data_get(q); + struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; + struct ETCP_FRAGMENT* in_pkt = (struct ETCP_FRAGMENT*)queue_data_get(q); if (!in_pkt) { DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot get element (pool=%p etcp=%p)", etcp->inflight_pool, etcp); @@ -333,10 +333,10 @@ static void input_queue_cb(struct ll_queue* q, void* arg) { // Create INFLIGHT_PACKET struct INFLIGHT_PACKET* p = memory_pool_alloc(etcp->inflight_pool); if (!p) { - DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot allocate INFLIGHT_PACKET (pool=%p etcp=%p)", etcp->inflight_pool, etcp); - queue_entry_free(in_pkt); // Free the ETCP_FRAGMENT - queue_resume_callback(q); - return; + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot allocate INFLIGHT_PACKET (pool=%p etcp=%p)", etcp->inflight_pool, etcp); + queue_entry_free((struct ll_entry*)in_pkt); // Free the ETCP_FRAGMENT + queue_resume_callback(q); + return; } // Setup inflight packet (based on protocol.txt) @@ -347,13 +347,13 @@ static void input_queue_cb(struct ll_queue* q, void* arg) { p->ll.dgram = in_pkt->ll.dgram; p->ll.len = in_pkt->ll.len; - // Add to send queue - if (queue_data_put(etcp->input_send_q, p, p->seq) != 0) { - memory_pool_free(etcp->inflight_pool, p); - queue_entry_free(in_pkt); - DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "input_queue_cb: EXIT (queue put failed)"); - return; - } + // Add to send queue + if (queue_data_put(etcp->input_send_q, (struct ll_entry*)p, p->seq) != 0) { + memory_pool_free(etcp->inflight_pool, p); + queue_entry_free((struct ll_entry*)in_pkt); + DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "input_queue_cb: EXIT (queue put failed)"); + return; + } DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q"); input_queue_try_resume(etcp); @@ -396,12 +396,12 @@ static void ack_timeout_check(void* arg) { pkt->last_timestamp = now; pkt->last_link = NULL; // Reset last link for re-selection - // Remove from wait_ack - queue_remove_data(etcp->input_wait_ack, pkt); - - // Change state and add to send_q for retransmission - pkt->state = INFLIGHT_STATE_WAIT_SEND; - queue_data_put(etcp->input_send_q, pkt, pkt->seq); + // Remove from wait_ack + queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)pkt); + + // Change state and add to send_q for retransmission + pkt->state = INFLIGHT_STATE_WAIT_SEND; + queue_data_put(etcp->input_send_q, (struct ll_entry*)pkt, pkt->seq); // Update stats etcp->retransmissions_count++; @@ -441,17 +441,17 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { // return NULL; } - // First, check if there's a packet in input_send_q (retrans or new) -// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q"); - struct INFLIGHT_PACKET* inf_pkt = queue_data_get(etcp->input_send_q); - if (inf_pkt) { - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: prepare udp dgram for send packet %p (seq=%u, len=%u)", inf_pkt, inf_pkt->seq, inf_pkt->ll.len); - - inf_pkt->last_timestamp=get_current_time_units(); - inf_pkt->send_count++; - inf_pkt->state=INFLIGHT_STATE_WAIT_ACK; - queue_data_put(etcp->input_wait_ack, inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue - } + // First, check if there's a packet in input_send_q (retrans or new) +// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q"); + struct INFLIGHT_PACKET* inf_pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q); + if (inf_pkt) { + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: prepare udp dgram for send packet %p (seq=%u, len=%u)", inf_pkt, inf_pkt->seq, inf_pkt->ll.len); + + inf_pkt->last_timestamp=get_current_time_units(); + inf_pkt->send_count++; + inf_pkt->state=INFLIGHT_STATE_WAIT_ACK; + queue_data_put(etcp->input_wait_ack, (struct ll_entry*)inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue + } size_t ack_q_size = queue_entry_count(etcp->ack_q); @@ -480,24 +480,24 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { dgram->data[ptr++]=etcp->last_delivered_id>>24; // тут (потом) добавим опциональные заголовки - struct ACK_PACKET* ack_pkt; - while (ack_pkt = queue_data_get(etcp->ack_q)) { - // seq 4 байта - dgram->data[ptr++]=ack_pkt->seq; - dgram->data[ptr++]=ack_pkt->seq>>8; - dgram->data[ptr++]=ack_pkt->seq>>16; - dgram->data[ptr++]=ack_pkt->seq>>24; - - // ts приема 2 байта - dgram->data[ptr++]=ack_pkt->recv_timestamp; - dgram->data[ptr++]=ack_pkt->recv_timestamp>>8; - - // время задержки 2 байта между recv и ack - uint16_t dly=get_current_timestamp()-ack_pkt->recv_timestamp; - dgram->data[ptr++]=dly; - dgram->data[ptr++]=dly>>8; - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: add ACK N%d dTS=%d", ack_pkt->seq, dly); - queue_entry_free(ack_pkt); + struct ACK_PACKET* ack_pkt; + while ((ack_pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q))) { + // seq 4 байта + dgram->data[ptr++]=ack_pkt->seq; + dgram->data[ptr++]=ack_pkt->seq>>8; + dgram->data[ptr++]=ack_pkt->seq>>16; + dgram->data[ptr++]=ack_pkt->seq>>24; + + // ts приема 2 байта + dgram->data[ptr++]=ack_pkt->recv_timestamp; + dgram->data[ptr++]=ack_pkt->recv_timestamp>>8; + + // время задержки 2 байта между recv и ack + uint16_t dly=get_current_timestamp()-ack_pkt->recv_timestamp; + dgram->data[ptr++]=dly; + dgram->data[ptr++]=dly>>8; + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: add ACK N%d dTS=%d", ack_pkt->seq, dly); + queue_entry_free((struct ll_entry*)ack_pkt); if (inf_pkt && inf_pkt->ll.len+ptr>=etcp->mtu-10) break;// pkt len (надо просчитать точнее включая все заголовки) if (ptr>500) break; @@ -562,35 +562,35 @@ void etcp_output_try_assembly(struct ETCP_CONN* etcp) { int delivered_count = 0; uint32_t delivered_bytes = 0; - // Look for contiguous packets starting from next_expected_id - while (1) { - struct ETCP_FRAGMENT* rx_pkt = queue_find_data_by_id(etcp->recv_q, next_expected_id); - if (!rx_pkt) { - // No more contiguous packets found - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: no packet found for id=%u, stopping", next_expected_id); - break; - } - - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: assembling packet id=%u (len=%u)", - rx_pkt->seq, rx_pkt->ll.len); - - // Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed - // Remove from recv_q first - queue_remove_data(etcp->recv_q, rx_pkt); - - // Add to output_queue using the same ETCP_FRAGMENT structure - if (queue_data_put(etcp->output_queue, rx_pkt, next_expected_id) == 0) { + // Look for contiguous packets starting from next_expected_id + while (1) { + struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_find_data_by_id(etcp->recv_q, next_expected_id); + if (!rx_pkt) { + // No more contiguous packets found + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: no packet found for id=%u, stopping", next_expected_id); + break; + } + + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: assembling packet id=%u (len=%u)", + rx_pkt->seq, rx_pkt->ll.len); + + // Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed + // Remove from recv_q first + queue_remove_data(etcp->recv_q, (struct ll_entry*)rx_pkt); + + // Add to output_queue using the same ETCP_FRAGMENT structure + if (queue_data_put(etcp->output_queue, (struct ll_entry*)rx_pkt, next_expected_id) == 0) { delivered_bytes += rx_pkt->ll.len; delivered_count++; DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: moved packet id=%u to output_queue", next_expected_id); } else { - DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: failed to add packet id=%u to output_queue", - next_expected_id); - // Put it back in recv_q if we can't add to output_queue - queue_data_put(etcp->recv_q, rx_pkt, next_expected_id); - break; - } + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: failed to add packet id=%u to output_queue", + next_expected_id); + // Put it back in recv_q if we can't add to output_queue + queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, next_expected_id); + break; + } // Update state for next iteration etcp->last_delivered_id = next_expected_id; @@ -607,12 +607,12 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: processing ACK for seq=%u, ts=%u, dts=%u", seq, ts, dts); - // Find the acknowledged packet in the wait_ack queue - struct INFLIGHT_PACKET* acked_pkt = queue_find_data_by_id(etcp->input_wait_ack, seq); - if (acked_pkt) queue_remove_data(etcp->input_wait_ack, acked_pkt); - else { acked_pkt = queue_find_data_by_id(etcp->input_send_q, seq); - queue_remove_data(etcp->input_send_q, acked_pkt); - } + // Find the acknowledged packet in the wait_ack queue + struct INFLIGHT_PACKET* acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq); + if (acked_pkt) queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)acked_pkt); + else { acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_send_q, seq); + queue_remove_data(etcp->input_send_q, (struct ll_entry*)acked_pkt); + } if (!acked_pkt) { // Packet might be already acknowledged or not found @@ -705,30 +705,30 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) { case ETCP_SECTION_PAYLOAD: { if (len>=5) { - // формируем ACK - struct ACK_PACKET* p = queue_entry_new_from_pool(etcp->instance->ack_pool); - uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24); - p->seq=seq; - p->pkt_timestamp=pkt->timestamp; - p->recv_timestamp=get_current_timestamp(); - queue_data_put(etcp->ack_q, p, p->seq); + // формируем ACK + struct ACK_PACKET* p = (struct ACK_PACKET*)queue_entry_new_from_pool(etcp->instance->ack_pool); + uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24); + p->seq=seq; + p->pkt_timestamp=pkt->timestamp; + p->recv_timestamp=get_current_timestamp(); + queue_data_put(etcp->ack_q, (struct ll_entry*)p, p->seq); if (etcp->ack_resp_timer == NULL) { etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: set ack_timer for delayed ACK send"); } - if ((int32_t)(etcp->last_delivered_id-seq)<0) if (queue_find_data_by_id(etcp->recv_q, seq)==NULL) {// проверяем есть ли пакет с этим seq - uint32_t pkt_len=len-5; - DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: adding packet seq=%u to recv_q (last_delivered_id=%u)", seq, etcp->last_delivered_id); - // отправляем пакет в очередь на сборку - uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool); - struct ETCP_FRAGMENT* rx_pkt = queue_entry_new_from_pool(etcp->rx_pool); - rx_pkt->seq=seq; - rx_pkt->timestamp=pkt->timestamp; - rx_pkt->ll.dgram=payload_data; - rx_pkt->ll.len=pkt_len; - // Copy the actual payload data - memcpy(payload_data, data + 5, pkt_len); - queue_data_put(etcp->recv_q, rx_pkt, seq); + if ((int32_t)(etcp->last_delivered_id-seq)<0) if (queue_find_data_by_id(etcp->recv_q, seq)==NULL) {// проверяем есть ли пакет с этим seq + uint32_t pkt_len=len-5; + DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: adding packet seq=%u to recv_q (last_delivered_id=%u)", seq, etcp->last_delivered_id); + // отправляем пакет в очередь на сборку + uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool); + struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->rx_pool); + rx_pkt->seq=seq; + rx_pkt->timestamp=pkt->timestamp; + rx_pkt->ll.dgram=payload_data; + rx_pkt->ll.len=pkt_len; + // Copy the actual payload data + memcpy(payload_data, data + 5, pkt_len); + queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, seq); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: packet seq=%u added to recv_q, calling assembly (last_delivered_id=%u)", seq, etcp->last_delivered_id); if (etcp->last_delivered_id+1==seq) etcp_output_try_assembly(etcp);// пробуем собрать выходную очередь из фрагментов } diff --git a/src/pkt_normalizer.c b/src/pkt_normalizer.c index 9effe87..ae49540 100644 --- a/src/pkt_normalizer.c +++ b/src/pkt_normalizer.c @@ -85,7 +85,7 @@ void pn_pair_deinit(struct PKTNORM* pn) { } if (pn->sndpart.dgram) { - queue_entry_free(&pn->sndpart); + queue_dgram_free(&pn->sndpart); } if (pn->recvpart) { queue_dgram_free(pn->recvpart); @@ -112,6 +112,7 @@ void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) { DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: pn=%p, len=%d", pn, len); struct ll_entry* entry = ll_alloc_lldgram(len); + if (!entry) return; memcpy(entry->dgram, data, len); entry->len = len; entry->dgram_pool = NULL; @@ -137,11 +138,11 @@ static void packer_cb(struct ll_queue* q, void* arg) { // Helper to send sndpart to ETCP as ETCP_FRAGMENT static void pn_send_to_etcp(struct PKTNORM* pn) { - if (!pn || !pn->sndpart.data || !pn->sndpart.len==0) return; + if (!pn || !pn->sndpart.dgram || pn->sndpart.len == 0) return; DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer: pn_send_to_etcp"); // Allocate ETCP_FRAGMENT from rx_pool - struct ETCP_FRAGMENT* frag = queue_entry_new_from_pool(pn->etcp->rx_pool); + struct ETCP_FRAGMENT* frag = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(pn->etcp->rx_pool); if (!frag) {// drop data pn->sndpart.len = 0; return; @@ -153,9 +154,8 @@ static void pn_send_to_etcp(struct PKTNORM* pn) { frag->ll.len = pn->sndpart.len; frag->ll.memlen = pn->sndpart.memlen; - queue_data_put(pn->etcp->input_queue, frag, 0); - queue_entry_free(&pn->sndpart); - // Сбросить структуру после освобождения + queue_data_put(pn->etcp->input_queue, (struct ll_entry*)frag, 0); + // Сбросить структуру (dgram передан во фрагмент, не освобождаем) pn->sndpart.dgram = NULL; pn->sndpart.len = 0; pn->sndpart.memlen = 0; diff --git a/src/pkt_normalizer.c2 b/src/pkt_normalizer.c2 new file mode 100755 index 0000000..c93b7c6 --- /dev/null +++ b/src/pkt_normalizer.c2 @@ -0,0 +1,297 @@ +// pkt_normalizer.c - Implementation of packet normalizer for ETCP +#include "pkt_normalizer.h" +#include "etcp.h" // For ETCP_CONN and related structures +#include "ll_queue.h" // For queue operations +#include "u_async.h" // For UASYNC +#include +#include +#include // For debugging (can be removed if not needed) +#include "debug_config.h" // Assuming this for DEBUG_ERROR + +// Internal helper to convert void* data to struct ll_entry* +static inline struct ll_entry* data_to_entry(void* data) { + if (!data) return NULL; + return (struct ll_entry*)data; +} + +// Forward declarations +static void packer_cb(struct ll_queue* q, void* arg); +static void pn_flush_cb(void* arg); +static void etcp_input_ready_cb(struct ll_queue* q, void* arg); +static void pn_unpacker_cb(struct ll_queue* q, void* arg); +static void pn_send_to_etcp(struct PKTNORM* pn); + +// Initialization +struct PKTNORM* pn_init(struct ETCP_CONN* etcp) { + if (!etcp) return NULL; + + struct PKTNORM* pn = calloc(1, sizeof(struct PKTNORM)); + if (!pn) return NULL; + + pn->etcp = etcp; + pn->ua = etcp->instance->ua; + pn->frag_size = etcp->mtu - 100; // Use MTU as fixed packet size (adjust if headers need subtraction) + pn->tx_wait_time = 10; + + pn->input = queue_new(pn->ua, 0); // No hash needed + pn->output = queue_new(pn->ua, 0); // No hash needed + + if (!pn->input || !pn->output) { + pn_pair_deinit(pn); + return NULL; + } + + queue_set_callback(pn->input, packer_cb, pn); + queue_set_callback(etcp->output_queue, pn_unpacker_cb, pn); + + pn->sndpart.dgram = NULL; + pn->sndpart.len = 0; + pn->recvpart = NULL; + pn->flush_timer = NULL; + + return pn; +} + +// Deinitialization +void pn_pair_deinit(struct PKTNORM* pn) { + if (!pn) return; + + // Drain and free queues + if (pn->input) { + void* data; + while ((data = queue_data_get(pn->input)) != NULL) { + struct ll_entry* entry = data_to_entry(data); + if (entry->dgram) { + free(entry->dgram); + } + queue_entry_free(data); + } + queue_free(pn->input); + } + if (pn->output) { + void* data; + while ((data = queue_data_get(pn->output)) != NULL) { + struct ll_entry* entry = data_to_entry(data); + if (entry->dgram) { + free(entry->dgram); + } + queue_entry_free(data); + } + queue_free(pn->output); + } + + if (pn->flush_timer) { + uasync_cancel_timeout(pn->ua, pn->flush_timer); + } + + if (pn->sndpart.dgram) { + queue_entry_free(&pn->sndpart); + } + if (pn->recvpart) { + queue_dgram_free(pn->recvpart); + queue_entry_free(pn->recvpart); + } + + free(pn); +} + +// Reset unpacker state +void pn_unpacker_reset_state(struct PKTNORM* pn) { + if (!pn) return; + if (pn->recvpart) { + queue_dgram_free(pn->recvpart); + queue_entry_free(pn->recvpart); + pn->recvpart = NULL; + } +} + +// Send data to packer (copies and adds to input queue or pending, triggering callback) +void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) { + if (!pn || !data || len == 0) return; + + DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: pn=%p, len=%d", pn, len); + + struct ll_entry* entry = ll_alloc_lldgram(len); + memcpy(entry->dgram, data, len); + entry->len = len; + entry->dgram_pool = NULL; + + int ret = queue_data_put(pn->input, entry, 0); + DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: queue_data_put returned %d, input count=%d", ret, queue_entry_count(pn->input)); + + // Cancel flush timer if active + if (pn->flush_timer) { + uasync_cancel_timeout(pn->ua, pn->flush_timer); + pn->flush_timer = NULL; + } +} + +// Internal: Packer callback +static void packer_cb(struct ll_queue* q, void* arg) { + struct PKTNORM* pn = (struct PKTNORM*)arg; + if (!pn) return; + + queue_wait_threshold(pn->etcp->input_queue, 0, 0, etcp_input_ready_cb, pn); +} + +// Helper to send sndpart to ETCP as ETCP_FRAGMENT +static void pn_send_to_etcp(struct PKTNORM* pn) { + if (!pn || !pn->sndpart.data || !pn->sndpart.len==0) return; + + // Allocate ETCP_FRAGMENT from rx_pool + struct ETCP_FRAGMENT* frag = queue_entry_new_from_pool(pn->etcp->rx_pool); + if (!frag) {// drop data + pn->sndpart.len = 0; + return; + } + + frag->seq = 0; + frag->timestamp = 0; + frag->ll.dgram = pn->sndpart.dgram; + frag->ll.len = pn->sndpart.len; + frag->ll.memlen = pn->sndpart.memlen; + + queue_data_put(pn->etcp->input_queue, frag, 0); + queue_entry_free(&pn->sndpart); + // Сбросить структуру после освобождения + pn->sndpart.dgram = NULL; + pn->sndpart.len = 0; + pn->sndpart.memlen = 0; +} + +// Internal: Renew sndpart buffer +static void pn_buf_renew(struct PKTNORM* pn) { + if (pn->sndpart.dgram) { + int remain = pn->frag_size - pn->sndpart.len; + if (remain < 3) pn_send_to_etcp(pn); + } + if (!pn->sndpart.dgram) { + pn->sndpart.len=0; + pn->sndpart.dgram_pool = pn->etcp->instance->data_pool; + pn->sndpart.memlen=pn->etcp->instance->data_pool->object_size;//pn->frag_size; + pn->sndpart.dgram = memory_pool_alloc(pn->etcp->instance->data_pool); + } +} + +// Internal: Process input when etcp->input_queue is ready (empty) +static void etcp_input_ready_cb(struct ll_queue* q, void* arg) { + struct PKTNORM* pn = (struct PKTNORM*)arg; + if (!pn) return; + + void* data = queue_data_get(pn->input); + if (!data) { + queue_resume_callback(pn->input); + return; + } + + struct ll_entry* in_dgram = data_to_entry(data); + uint16_t ptr = 0; + + while (ptr < in_dgram->len) { + pn_buf_renew(pn); + if (!pn->sndpart.dgram) break; // Allocation failed + + int remain = pn->frag_size - pn->sndpart.len; + if (remain < 3) { + // Буфер почти полон, отправить его + pn_send_to_etcp(pn); + continue; // Продолжить с новым буфером + } + + if (ptr == 0) { + pn->sndpart.dgram[pn->sndpart.len++] = in_dgram->len & 0xFF; + pn->sndpart.dgram[pn->sndpart.len++] = (in_dgram->len >> 8) & 0xFF; + remain -= 2; + } + + int n = remain; + int rem = in_dgram->len - ptr; + if (n > rem) n = rem; + memcpy(pn->sndpart.dgram + pn->sndpart.len, in_dgram->dgram + ptr, n); + pn->sndpart.len += n; + ptr += n; + + // Проверить, не заполнился ли буфер + if (pn->sndpart.len >= pn->frag_size - 2) { + pn_send_to_etcp(pn); + } + } + + queue_dgram_free(in_dgram); + queue_entry_free(data); + + // Cancel flush timer if active + if (pn->flush_timer) { + uasync_cancel_timeout(pn->ua, pn->flush_timer); + pn->flush_timer = NULL; + } + + // Set flush timer if no more input + if (queue_entry_count(pn->input) == 0) { + pn->flush_timer = uasync_set_timeout(pn->ua, pn->tx_wait_time, pn, pn_flush_cb); + } + + queue_resume_callback(pn->input); +} + +// Internal: Flush callback on timeout +static void pn_flush_cb(void* arg) { + struct PKTNORM* pn = (struct PKTNORM*)arg; + if (!pn) return; + + pn->flush_timer = NULL; + pn_send_to_etcp(pn); +} + +// Internal: Unpacker callback (assembles fragments into original packets) +static void pn_unpacker_cb(struct ll_queue* q, void* arg) { + struct PKTNORM* pn = (struct PKTNORM*)arg; + if (!pn) return; + + while (1) { + void* data = queue_data_get(pn->etcp->output_queue); + if (!data) break; + + struct ETCP_FRAGMENT* frag = (struct ETCP_FRAGMENT*)data; // Since data is ll_entry* + uint8_t* payload = frag->ll.dgram; + uint16_t len = frag->ll.len; + uint16_t ptr = 0; + + while (ptr < len) { + if (!pn->recvpart) { + // Need length header for new packet + if (len - ptr < 2) { + // Incomplete header, reset + pn_unpacker_reset_state(pn); + break; + } + uint16_t pkt_len = payload[ptr] | (payload[ptr + 1] << 8); + ptr += 2; + + pn->recvpart = ll_alloc_lldgram(pkt_len); + if (!pn->recvpart) { + break; + } + pn->recvpart->len = 0; + } + + uint16_t rem = pn->recvpart->memlen - pn->recvpart->len; + uint16_t avail = len - ptr; + uint16_t cp = (rem < avail) ? rem : avail; + memcpy(pn->recvpart->dgram + pn->recvpart->len, payload + ptr, cp); + pn->recvpart->len += cp; + ptr += cp; + + if (pn->recvpart->len == pn->recvpart->memlen) { + queue_data_put(pn->output, pn->recvpart, 0); + pn->recvpart = NULL; + } + } + + // Free the fragment - dgram was malloc'd in pn_send_to_etcp + free(frag->ll.dgram); + memory_pool_free(pn->etcp->rx_pool, frag); + } + + queue_resume_callback(q); +} diff --git a/tests/debug_simple.c b/tests/debug_simple.c index 98752d9..63ed33a 100644 --- a/tests/debug_simple.c +++ b/tests/debug_simple.c @@ -15,8 +15,8 @@ int main() { struct ll_queue* q = queue_new(ua, 0); /* Create test data */ - test_data_t* data1 = (test_data_t*)queue_data_new(sizeof(test_data_t)); - printf("queue_data_new returned: %p\n", data1); + test_data_t* data1 = (test_data_t*)queue_entry_new(sizeof(test_data_t)); + printf("queue_entry_new returned: %p\n", data1); data1->id = 1; strcpy(data1->name, "test1"); diff --git a/tests/test_etcp_100_packets b/tests/test_etcp_100_packets index 0473c10..c129f73 100755 Binary files a/tests/test_etcp_100_packets and b/tests/test_etcp_100_packets differ diff --git a/tests/test_etcp_100_packets.c b/tests/test_etcp_100_packets.c index ef17b94..db183cf 100644 --- a/tests/test_etcp_100_packets.c +++ b/tests/test_etcp_100_packets.c @@ -149,7 +149,7 @@ static void check_received_packets_fwd(void) { if (!conn || !conn->output_queue) return; struct ETCP_FRAGMENT* pkt; - while ((pkt = queue_data_get(conn->output_queue)) != NULL) { + while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(conn->output_queue)) != NULL) { if (pkt->ll.len >= PACKET_SIZE) { int seq = pkt->ll.dgram[0]; @@ -167,7 +167,7 @@ static void check_received_packets_fwd(void) { if (pkt->ll.dgram) { memory_pool_free(conn->instance->data_pool, pkt->ll.dgram); } - queue_entry_free(pkt); + queue_entry_free((struct ll_entry*)pkt); } } @@ -179,13 +179,13 @@ static void check_received_packets_back(void) { if (!conn || !conn->output_queue) return; struct ETCP_FRAGMENT* pkt; - while ((pkt = queue_data_get(conn->output_queue)) != NULL) { + while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(conn->output_queue)) != NULL) { if (pkt->ll.len >= PACKET_SIZE) { int seq = pkt->ll.dgram[0]; - + uint8_t expected[PACKET_SIZE]; generate_packet_data(seq, expected, PACKET_SIZE); - + if (memcmp(pkt->ll.dgram, expected, PACKET_SIZE) == 0) { if (seq >= 0 && seq < TOTAL_PACKETS) { received_packets_back[seq] = 1; @@ -193,11 +193,11 @@ static void check_received_packets_back(void) { packets_received_back++; } } - + if (pkt->ll.dgram) { memory_pool_free(conn->instance->data_pool, pkt->ll.dgram); } - queue_entry_free(pkt); + queue_entry_free((struct ll_entry*)pkt); } } diff --git a/tests/test_etcp_minimal b/tests/test_etcp_minimal index e739de7..28ac67d 100755 Binary files a/tests/test_etcp_minimal and b/tests/test_etcp_minimal differ diff --git a/tests/test_etcp_simple_traffic b/tests/test_etcp_simple_traffic index 86046e5..bd53998 100755 Binary files a/tests/test_etcp_simple_traffic and b/tests/test_etcp_simple_traffic differ diff --git a/tests/test_etcp_simple_traffic.c b/tests/test_etcp_simple_traffic.c index 78520d7..44b965a 100644 --- a/tests/test_etcp_simple_traffic.c +++ b/tests/test_etcp_simple_traffic.c @@ -155,7 +155,7 @@ static void check_packet_received(void) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check_packet_received: Server output_queue count: %d", queue_entry_count(conn->output_queue)); // Check if there's any packet in output queue - struct ETCP_FRAGMENT* pkt = queue_data_get(conn->output_queue); + struct ETCP_FRAGMENT* pkt = (struct ETCP_FRAGMENT*)queue_data_get(conn->output_queue); if (pkt) { DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check_packet_received: Found packet in output queue"); @@ -179,7 +179,7 @@ static void check_packet_received(void) { if (pkt->ll.dgram) { memory_pool_free(conn->instance->data_pool, pkt->ll.dgram); } - queue_entry_free(pkt); + queue_entry_free((struct ll_entry*)pkt); } else { DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "check_packet_received: No packet found in output queue"); } diff --git a/tests/test_etcp_two_instances b/tests/test_etcp_two_instances index 6720838..6ff94d7 100755 Binary files a/tests/test_etcp_two_instances and b/tests/test_etcp_two_instances differ diff --git a/tests/test_intensive_memory_pool b/tests/test_intensive_memory_pool index 1c6819f..51265bf 100755 Binary files a/tests/test_intensive_memory_pool and b/tests/test_intensive_memory_pool differ diff --git a/tests/test_intensive_memory_pool.c b/tests/test_intensive_memory_pool.c index 895549b..af8148c 100644 --- a/tests/test_intensive_memory_pool.c +++ b/tests/test_intensive_memory_pool.c @@ -38,7 +38,7 @@ static double test_without_pools(int iterations) { // Добавить записи for (int i = 0; i < 10; i++) { - void* data = queue_data_new(64); + void* data = queue_entry_new(64); queue_data_put(queue, data, i); // Используем ID = i } diff --git a/tests/test_intensive_memory_pool_new.c b/tests/test_intensive_memory_pool_new.c index ec95330..cfc3778 100644 --- a/tests/test_intensive_memory_pool_new.c +++ b/tests/test_intensive_memory_pool_new.c @@ -38,7 +38,7 @@ static double test_without_pools(int iterations) { // Добавить записи for (int i = 0; i < 10; i++) { - void* data = queue_data_new(64); + void* data = queue_entry_new(64); queue_data_put(queue, data, i); // Используем ID = i } diff --git a/tests/test_ll_queue b/tests/test_ll_queue index 7d68202..331bb9a 100755 Binary files a/tests/test_ll_queue and b/tests/test_ll_queue differ diff --git a/tests/test_ll_queue.c b/tests/test_ll_queue.c index 0e37b19..1f1a1ff 100644 --- a/tests/test_ll_queue.c +++ b/tests/test_ll_queue.c @@ -15,7 +15,9 @@ #include "../lib/debug_config.h" #include "../lib/memory_pool.h" +#ifndef DEBUG_CATEGORY_LL_QUEUE #define DEBUG_CATEGORY_LL_QUEUE 1 +#endif static struct { int run, passed, failed; @@ -59,10 +61,10 @@ static void queue_cb(struct ll_queue *q, void *arg) { int *cnt = arg; (*cnt)++; stats.cb_queue++; - test_data_t *taken = queue_data_get(q); + test_data_t *taken = (test_data_t*)queue_data_get(q); ASSERT(checksum(taken) == taken->checksum, "corruption in cb"); - queue_entry_free(taken); + queue_entry_free((struct ll_entry*)taken); queue_resume_callback(q); // next item when ready } @@ -90,17 +92,17 @@ static void test_fifo(void) { struct ll_queue *q = queue_new(ua, 0); for (int i = 0; i < 10; i++) { - test_data_t *d = queue_data_new(sizeof(test_data_t)); + test_data_t *d = (test_data_t*)queue_entry_new(sizeof(test_data_t)); d->id = i; snprintf(d->name, sizeof(d->name), "item%d", i); d->value = i*10; d->checksum = checksum(d); - queue_data_put(q, d, d->id); + queue_data_put(q, (struct ll_entry*)d, d->id); } ASSERT_EQ(queue_entry_count(q), 10, ""); for (int i = 0; i < 10; i++) { - test_data_t *d = queue_data_get(q); + test_data_t *d = (test_data_t*)queue_data_get(q); ASSERT(d && d->id == i, "FIFO violation"); - queue_entry_free(d); + queue_entry_free((struct ll_entry*)d); } ASSERT_EQ(queue_entry_count(q), 0, ""); queue_free(q); uasync_destroy(ua, 0); @@ -113,25 +115,25 @@ static void test_lifo_priority(void) { struct ll_queue *q = queue_new(ua, 0); for (int i = 0; i < 3; i++) { - test_data_t *d = queue_data_new(sizeof(*d)); + test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d)); d->id = i; d->value = i; d->checksum = checksum(d); - queue_data_put(q, d, i); + queue_data_put(q, (struct ll_entry*)d, i); } - test_data_t *pri = queue_data_new(sizeof(*pri)); + test_data_t *pri = (test_data_t*)queue_entry_new(sizeof(*pri)); pri->id = 999; pri->value = 999; pri->checksum = checksum(pri); - queue_data_put_first(q, pri, 999); + queue_data_put_first(q, (struct ll_entry*)pri, 999); - test_data_t *first = queue_data_get(q); + test_data_t *first = (test_data_t*)queue_data_get(q); ASSERT(first && first->id == 999, "priority first"); - queue_entry_free(first); + queue_entry_free((struct ll_entry*)first); for (int i = 0; i < 3; i++) { - test_data_t *d = queue_data_get(q); + test_data_t *d = (test_data_t*)queue_data_get(q); ASSERT(d && d->id == i, "remaining FIFO"); - queue_entry_free(d); + queue_entry_free((struct ll_entry*)d); } queue_free(q); uasync_destroy(ua, 0); PASS(); @@ -146,9 +148,9 @@ static void test_callback(void) { queue_set_callback(q, queue_cb, &cnt); for (int i = 0; i < 5; i++) { - test_data_t *d = queue_data_new(sizeof(*d)); + test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d)); d->id = i; d->value = i*10; d->checksum = checksum(d); - queue_data_put(q, d, d->id); + queue_data_put(q, (struct ll_entry*)d, d->id); } for (int i = 0; i < 30 && cnt < 5; i++) uasync_poll(ua, 5); @@ -169,15 +171,15 @@ static void test_waiter(void) { ASSERT(w == NULL && called == 1, "immediate when condition met"); // empty queue for (int i = 0; i < 5; i++) { - test_data_t *d = queue_data_new(sizeof(*d)); d->id = i; - queue_data_put(q, d, i); + test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d)); d->id = i; + queue_data_put(q, (struct ll_entry*)d, i); } called = 0; w = queue_wait_threshold(q, 2, 0, waiter_cb, &called); ASSERT(w != NULL && called == 0, ""); - for (int i = 0; i < 3; i++) queue_entry_free(queue_data_get(q)); + for (int i = 0; i < 3; i++) queue_entry_free((struct ll_entry*)queue_data_get(q)); for (int i = 0; i < 15; i++) uasync_poll(ua, 1); ASSERT_EQ(called, 1, ""); @@ -194,18 +196,18 @@ static void test_limits_hash(void) { queue_set_size_limit(q, 3); for (int i = 0; i < 3; i++) { - test_data_t *d = queue_data_new(sizeof(*d)); d->id = i*10+1; - queue_data_put(q, d, d->id); + test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d)); d->id = i*10+1; + queue_data_put(q, (struct ll_entry*)d, d->id); } - test_data_t *ex = queue_data_new(sizeof(*ex)); - ASSERT_EQ(queue_data_put(q, ex, 999), -1, "limit reject"); + test_data_t *ex = (test_data_t*)queue_entry_new(sizeof(*ex)); + ASSERT_EQ(queue_data_put(q, (struct ll_entry*)ex, 999), -1, "limit reject"); - test_data_t *found = queue_find_data_by_id(q, 21); + test_data_t *found = (test_data_t*)queue_find_data_by_id(q, 21); ASSERT(found && found->id == 21, "hash find"); - queue_remove_data(q, found); + queue_remove_data(q, (struct ll_entry*)found); ASSERT(queue_find_data_by_id(q, 21) == NULL, "removed"); - while (queue_entry_count(q)) queue_entry_free(queue_data_get(q)); + while (queue_entry_count(q)) queue_entry_free((struct ll_entry*)queue_data_get(q)); queue_free(q); uasync_destroy(ua, 0); PASS(); } @@ -219,23 +221,23 @@ static void test_pool(void) { size_t alloc1 = 0, reuse1 = 0; memory_pool_get_stats(pool, &alloc1, &reuse1); - test_data_t *d1 = queue_entry_new_from_pool(pool); + test_data_t *d1 = (test_data_t*)queue_entry_new_from_pool(pool); d1->id = 1; d1->checksum = checksum(d1); - queue_data_put(q, d1, 1); + queue_data_put(q, (struct ll_entry*)d1, 1); - test_data_t *d2 = queue_entry_new_from_pool(pool); + test_data_t *d2 = (test_data_t*)queue_entry_new_from_pool(pool); d2->id = 2; d2->checksum = checksum(d2); - queue_data_put(q, d2, 2); + queue_data_put(q, (struct ll_entry*)d2, 2); - queue_entry_free(queue_data_get(q)); // free d1 back to pool - queue_entry_free(queue_data_get(q)); // free d2 back to pool + queue_entry_free((struct ll_entry*)queue_data_get(q)); // free d1 back to pool + queue_entry_free((struct ll_entry*)queue_data_get(q)); // free d2 back to pool // Now allocate again to trigger reuse - test_data_t *d3 = queue_entry_new_from_pool(pool); + test_data_t *d3 = (test_data_t*)queue_entry_new_from_pool(pool); ASSERT(d3 != NULL, "alloc after free failed"); d3->id = 3; d3->checksum = checksum(d3); - queue_data_put(q, d3, 3); - queue_entry_free(queue_data_get(q)); // free d3 back + queue_data_put(q, (struct ll_entry*)d3, 3); + queue_entry_free((struct ll_entry*)queue_data_get(q)); // free d3 back size_t alloc2 = 0, reuse2 = 0; memory_pool_get_stats(pool, &alloc2, &reuse2); @@ -253,15 +255,15 @@ static void test_stress(void) { for (int i = 0; i < 10000; i++) { if (queue_entry_count(q) > 80) { - queue_entry_free(queue_data_get(q)); + queue_entry_free((struct ll_entry*)queue_data_get(q)); } - test_data_t *d = queue_data_new(sizeof(*d)); + test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d)); d->id = rand() % 10000; d->checksum = checksum(d); - queue_data_put(q, d, d->id); + queue_data_put(q, (struct ll_entry*)d, d->id); stats.ops++; } - while (queue_entry_count(q)) queue_entry_free(queue_data_get(q)); + while (queue_entry_count(q)) queue_entry_free((struct ll_entry*)queue_data_get(q)); stats.time_ms += now_ms() - start; queue_free(q); uasync_destroy(ua, 0); diff --git a/tests/test_memory_pool_and_config b/tests/test_memory_pool_and_config index e4fb62f..d1f57e1 100755 Binary files a/tests/test_memory_pool_and_config and b/tests/test_memory_pool_and_config differ diff --git a/tests/test_memory_pool_and_config.c b/tests/test_memory_pool_and_config.c index c1f94cc..cedfb94 100644 --- a/tests/test_memory_pool_and_config.c +++ b/tests/test_memory_pool_and_config.c @@ -57,7 +57,7 @@ int main() { // Add some entries and trigger waiters for (int i = 0; i < 5; i++) { - void* data = queue_data_new(10); + void* data = queue_entry_new(10); queue_data_put(queue, data, i); // Используем ID = i } diff --git a/tests/test_pkt_normalizer_etcp b/tests/test_pkt_normalizer_etcp index 53c5a1a..f24605c 100755 Binary files a/tests/test_pkt_normalizer_etcp and b/tests/test_pkt_normalizer_etcp differ diff --git a/tests/test_u_async_comprehensive b/tests/test_u_async_comprehensive index 779d636..853e7f9 100755 Binary files a/tests/test_u_async_comprehensive and b/tests/test_u_async_comprehensive differ diff --git a/tests/test_u_async_comprehensive.c b/tests/test_u_async_comprehensive.c index d3c3c10..8117351 100644 --- a/tests/test_u_async_comprehensive.c +++ b/tests/test_u_async_comprehensive.c @@ -425,7 +425,8 @@ static void test_concurrent_operations(void) { /* Write to sockets to generate events */ if (cycle % 3 == 0) { char data = 'x'; - write(sockets[0], &data, 1); + ssize_t wret = write(sockets[0], &data, 1); + (void)wret; /* Suppress warning - best effort write for testing */ } /* Poll */