Browse Source

Fix: Added explicit type casts for ll_queue type conversions

- Fixed all incompatible pointer type warnings in src/etcp.c
- Fixed warnings in src/pkt_normalizer.c
- Fixed warnings in tests/test_etcp_simple_traffic.c
- Fixed warnings in tests/test_etcp_100_packets.c
- Fixed warnings in tests/test_ll_queue.c
- Fixed DEBUG_CATEGORY_ALL overflow warning in debug_config.h
- Fixed DEBUG_CATEGORY_LL_QUEUE redefinition warning in test_ll_queue.c
- Fixed write() unused result warning in test_u_async_comprehensive.c
nodeinfo-routing-update
Evgeny 2 months ago
parent
commit
a0c4727b8b
  1. 2
      lib/debug_config.h
  2. 48
      lib/ll_queue.c
  3. 16
      lib/ll_queue.h
  4. 254
      src/etcp.c
  5. 12
      src/pkt_normalizer.c
  6. 297
      src/pkt_normalizer.c2
  7. 4
      tests/debug_simple.c
  8. BIN
      tests/test_etcp_100_packets
  9. 14
      tests/test_etcp_100_packets.c
  10. BIN
      tests/test_etcp_minimal
  11. BIN
      tests/test_etcp_simple_traffic
  12. 4
      tests/test_etcp_simple_traffic.c
  13. BIN
      tests/test_etcp_two_instances
  14. BIN
      tests/test_intensive_memory_pool
  15. 2
      tests/test_intensive_memory_pool.c
  16. 2
      tests/test_intensive_memory_pool_new.c
  17. BIN
      tests/test_ll_queue
  18. 80
      tests/test_ll_queue.c
  19. BIN
      tests/test_memory_pool_and_config
  20. 2
      tests/test_memory_pool_and_config.c
  21. BIN
      tests/test_pkt_normalizer_etcp
  22. BIN
      tests/test_u_async_comprehensive
  23. 3
      tests/test_u_async_comprehensive.c

2
lib/debug_config.h

@ -41,7 +41,7 @@ typedef uint64_t debug_category_t;
#define DEBUG_CATEGORY_TUN ((debug_category_t)1 << 8) // TUN interface #define DEBUG_CATEGORY_TUN ((debug_category_t)1 << 8) // TUN interface
#define DEBUG_CATEGORY_ROUTING ((debug_category_t)1 << 9) // routing table #define DEBUG_CATEGORY_ROUTING ((debug_category_t)1 << 9) // routing table
#define DEBUG_CATEGORY_TIMERS ((debug_category_t)1 << 10) // timer management #define DEBUG_CATEGORY_TIMERS ((debug_category_t)1 << 10) // timer management
#define DEBUG_CATEGORY_ALL (~((debug_category_t)0)) #define DEBUG_CATEGORY_ALL ((debug_category_t)(~((uint64_t)0)))
/* Debug configuration structure */ /* Debug configuration structure */
typedef struct { typedef struct {

48
lib/ll_queue.c

@ -13,14 +13,6 @@ static void check_waiters(struct ll_queue* q);
static void add_to_hash(struct ll_queue* q, struct ll_entry* entry); static void add_to_hash(struct ll_queue* q, struct ll_entry* entry);
static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry); static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry);
#define xxx 0
// Вспомогательная функция для преобразования данных в запись
static inline struct ll_entry* data_to_entry(void* data) {
if (!data) return NULL;
return ((struct ll_entry*)data) - xxx;
}
// ==================== Управление очередью ==================== // ==================== Управление очередью ====================
struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) { struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) {
@ -48,7 +40,7 @@ struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) {
} }
struct ll_entry* ll_alloc_lldgram(uint16_t len) { struct ll_entry* ll_alloc_lldgram(uint16_t len) {
struct ll_entry* entry = queue_data_new(0); struct ll_entry* entry = queue_entry_new(0);
if (!entry) return NULL; if (!entry) return NULL;
entry->len=0; entry->len=0;
@ -123,7 +115,7 @@ void queue_set_size_limit(struct ll_queue* q, int lim) {
// ==================== Управление элементами ==================== // ==================== Управление элементами ====================
void* queue_data_new(size_t data_size) { struct ll_entry* queue_entry_new(size_t data_size) {
struct ll_entry* entry = malloc(sizeof(struct ll_entry) + data_size); struct ll_entry* entry = malloc(sizeof(struct ll_entry) + data_size);
if (!entry) return NULL; if (!entry) return NULL;
@ -132,12 +124,12 @@ void* queue_data_new(size_t data_size) {
entry->len = 0; entry->len = 0;
entry->pool = NULL; // Выделено через malloc entry->pool = NULL; // Выделено через malloc
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_new: created entry %p, size=%zu", entry, data_size); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new: created entry %p, size=%zu", entry, data_size);
return (void*)(entry + xxx); return entry;
} }
void* queue_entry_new_from_pool(struct memory_pool* pool) { struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool) {
if (!pool) return NULL; if (!pool) return NULL;
struct ll_entry* entry = memory_pool_alloc(pool); struct ll_entry* entry = memory_pool_alloc(pool);
@ -150,7 +142,7 @@ void* queue_entry_new_from_pool(struct memory_pool* pool) {
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new_from_pool: created entry %p from pool %p", entry, pool); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new_from_pool: created entry %p from pool %p", entry, pool);
return (void*)(entry + xxx); return entry;
} }
//void ll_free_dgram(struct ll_entry* entry) { //void ll_free_dgram(struct ll_entry* entry) {
@ -228,15 +220,14 @@ static void check_waiters(struct ll_queue* q) {
} }
} }
int queue_data_put(struct ll_queue* q, void* data, uint32_t id) { int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
if (!q || !data) return -1; if (!q || !entry) return -1;
struct ll_entry* entry = data_to_entry(data);
entry->id = id; entry->id = id;
// Проверить лимит размера // Проверить лимит размера
if (q->size_limit >= 0 && q->count >= q->size_limit) { if (q->size_limit >= 0 && q->count >= q->size_limit) {
queue_entry_free(data); // Освободить элемент если превышен лимит queue_entry_free(entry); // Освободить элемент если превышен лимит
return -1; return -1;
} }
@ -271,15 +262,14 @@ int queue_data_put(struct ll_queue* q, void* data, uint32_t id) {
return 0; return 0;
} }
int queue_data_put_first(struct ll_queue* q, void* data, uint32_t id) { int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
if (!q || !data) return -1; if (!q || !entry) return -1;
struct ll_entry* entry = data_to_entry(data);
entry->id = id; entry->id = id;
// Проверить лимит размера // Проверить лимит размера
if (q->size_limit >= 0 && q->count >= q->size_limit) { if (q->size_limit >= 0 && q->count >= q->size_limit) {
queue_entry_free(data); // Освободить элемент если превышен лимит queue_entry_free(entry); // Освободить элемент если превышен лимит
return -1; return -1;
} }
@ -311,7 +301,7 @@ int queue_data_put_first(struct ll_queue* q, void* data, uint32_t id) {
return 0; return 0;
} }
void* queue_data_get(struct ll_queue* q) { struct ll_entry* queue_data_get(struct ll_queue* q) {
if (!q || !q->head) return NULL; if (!q || !q->head) return NULL;
struct ll_entry* entry = q->head; struct ll_entry* entry = q->head;
@ -336,7 +326,7 @@ void* queue_data_get(struct ll_queue* q) {
// Проверить ожидающие коллбэки // Проверить ожидающие коллбэки
check_waiters(q); check_waiters(q);
return (void*)(entry + xxx); return entry;
} }
int queue_entry_count(struct ll_queue* q) { int queue_entry_count(struct ll_queue* q) {
@ -375,7 +365,7 @@ void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter) {
// ==================== Поиск и удаление по ID ==================== // ==================== Поиск и удаление по ID ====================
void* queue_find_data_by_id(struct ll_queue* q, uint32_t id) { struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id) {
if (!q || q->hash_size == 0 || !q->hash_table) return NULL; if (!q || q->hash_size == 0 || !q->hash_table) return NULL;
size_t slot = id % q->hash_size; size_t slot = id % q->hash_size;
@ -383,7 +373,7 @@ void* queue_find_data_by_id(struct ll_queue* q, uint32_t id) {
while (entry) { while (entry) {
if (entry->id == id) { if (entry->id == id) {
return (void*)(entry + xxx); return entry;
} }
entry = entry->hash_next; entry = entry->hash_next;
} }
@ -391,10 +381,8 @@ void* queue_find_data_by_id(struct ll_queue* q, uint32_t id) {
return NULL; return NULL;
} }
int queue_remove_data(struct ll_queue* q, void* data) { int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !data) return -1; if (!q || !entry) return -1;
struct ll_entry* entry = data_to_entry(data);
// Удалить из двусвязного списка // Удалить из двусвязного списка
if (entry->prev) { if (entry->prev) {

16
lib/ll_queue.h

@ -85,7 +85,7 @@ void queue_free(struct ll_queue* q);
struct ll_entry* ll_alloc_lldgram(uint16_t len); struct ll_entry* ll_alloc_lldgram(uint16_t len);
// Функция для освобождения только dgram в ll_entry // Функция для освобождения только dgram в ll_entry
// Принимает void* data (как возвращается из queue_data_new или queue_data_get) // Принимает void* data (как возвращается из queue_entry_new или queue_data_get)
// Освобождает dgram с использованием dgram_pool (если указан) или free (если нет) // Освобождает dgram с использованием dgram_pool (если указан) или free (если нет)
// Если есть dgram_free_fn, использует её; иначе - pool или free // Если есть dgram_free_fn, использует её; иначе - pool или free
// Устанавливает dgram в NULL после освобождения // Устанавливает dgram в NULL после освобождения
@ -114,11 +114,11 @@ void queue_set_size_limit(struct ll_queue* q, int lim);
// Создать новый элемент с областью данных указанного размера // Создать новый элемент с областью данных указанного размера
// Память выделяется одним блоком: [struct ll_entry][область данных data_size байт] // Память выделяется одним блоком: [struct ll_entry][область данных data_size байт]
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти // Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти
void* queue_data_new(size_t data_size); struct ll_entry* queue_entry_new(size_t data_size);
// Создать новый элемент из пула (размер был определен при создании пула) // Создать новый элемент из пула (размер был определен при создании пула)
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти // Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти
void* queue_entry_new_from_pool(struct memory_pool* pool); struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool);
// Освободить только entry (не влияет на очереди, dgram не освобождает) // Освободить только entry (не влияет на очереди, dgram не освобождает)
void queue_entry_free(struct ll_entry* entry); void queue_entry_free(struct ll_entry* entry);
@ -132,18 +132,18 @@ void queue_dgram_free(struct ll_entry* entry);
// Добавить элемент в конец очереди (FIFO) // Добавить элемент в конец очереди (FIFO)
// Если очередь была пустой и коллбэки разрешены - вызывает коллбэк // Если очередь была пустой и коллбэки разрешены - вызывает коллбэк
// Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден) // Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден)
int queue_data_put(struct ll_queue* q, void* data, uint32_t id); int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id);
// Добавить элемент в начало очереди (LIFO, высокий приоритет) // Добавить элемент в начало очереди (LIFO, высокий приоритет)
// Если очередь была пустой и коллбэки разрешены - вызывает коллбэк // Если очередь была пустой и коллбэки разрешены - вызывает коллбэк
// Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден) // Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден)
int queue_data_put_first(struct ll_queue* q, void* data, uint32_t id); int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id);
// Извлечь элемент из начала очереди // Извлечь элемент из начала очереди
// При извлечении приостанавливает коллбэки (callback_suspended = 1) чтобы предотвратить рекурсию // При извлечении приостанавливает коллбэки (callback_suspended = 1) чтобы предотвратить рекурсию
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если очередь пуста // Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если очередь пуста
// ПРИМЕЧАНИЕ: не освобождает память элемента // ПРИМЕЧАНИЕ: не освобождает память элемента
void* queue_data_get(struct ll_queue* q); struct ll_entry* queue_data_get(struct ll_queue* q);
// Получить текущее количество элементов в очереди // Получить текущее количество элементов в очереди
int queue_entry_count(struct ll_queue* q); int queue_entry_count(struct ll_queue* q);
@ -172,11 +172,11 @@ static inline size_t queue_total_bytes(struct ll_queue* q) {
// Найти элемент по ID // Найти элемент по ID
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если не найден // Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если не найден
void* queue_find_data_by_id(struct ll_queue* q, uint32_t id); struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id);
// Удалить элемент из очереди по указателю на структуру элемента // Удалить элемент из очереди по указателю на структуру элемента
// Возвращает: 0 при успехе, -1 если элемент не найден // Возвращает: 0 при успехе, -1 если элемент не найден
// ПРИМЕЧАНИЕ: НЕ изменяет ref_count элемента, просто удаляет из очереди // ПРИМЕЧАНИЕ: НЕ изменяет ref_count элемента, просто удаляет из очереди
int queue_remove_data(struct ll_queue* q, void* data); int queue_remove_data(struct ll_queue* q, struct ll_entry* entry);
#endif // LL_QUEUE_H #endif // LL_QUEUE_H

254
src/etcp.c

@ -121,10 +121,10 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) {
void etcp_connection_close(struct ETCP_CONN* etcp) { void etcp_connection_close(struct ETCP_CONN* etcp) {
if (!etcp) return; if (!etcp) return;
// Drain and free input_queue (contains ETCP_FRAGMENT with pkt_data from data_pool) // Drain and free input_queue (contains ETCP_FRAGMENT with pkt_data from data_pool)
if (etcp->input_queue) { if (etcp->input_queue) {
struct ETCP_FRAGMENT* pkt; struct ETCP_FRAGMENT* pkt;
while ((pkt = queue_data_get(etcp->input_queue)) != NULL) { while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(etcp->input_queue)) != NULL) {
if (pkt->ll.dgram) { if (pkt->ll.dgram) {
memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram);
} }
@ -134,10 +134,10 @@ void etcp_connection_close(struct ETCP_CONN* etcp) {
etcp->input_queue = NULL; etcp->input_queue = NULL;
} }
// Drain and free output_queue (contains ETCP_FRAGMENT with pkt_data from data_pool) // Drain and free output_queue (contains ETCP_FRAGMENT with pkt_data from data_pool)
if (etcp->output_queue) { if (etcp->output_queue) {
struct ETCP_FRAGMENT* pkt; struct ETCP_FRAGMENT* pkt;
while ((pkt = queue_data_get(etcp->output_queue)) != NULL) { while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(etcp->output_queue)) != NULL) {
if (pkt->ll.dgram) { if (pkt->ll.dgram) {
memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram);
} }
@ -147,10 +147,10 @@ void etcp_connection_close(struct ETCP_CONN* etcp) {
etcp->output_queue = NULL; etcp->output_queue = NULL;
} }
// Drain and free input_send_q (contains INFLIGHT_PACKET with pkt_data from data_pool) // Drain and free input_send_q (contains INFLIGHT_PACKET with pkt_data from data_pool)
if (etcp->input_send_q) { if (etcp->input_send_q) {
struct INFLIGHT_PACKET* pkt; struct INFLIGHT_PACKET* pkt;
while ((pkt = queue_data_get(etcp->input_send_q)) != NULL) { while ((pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q)) != NULL) {
if (pkt->ll.dgram) { if (pkt->ll.dgram) {
memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram);
} }
@ -160,10 +160,10 @@ void etcp_connection_close(struct ETCP_CONN* etcp) {
etcp->input_send_q = NULL; etcp->input_send_q = NULL;
} }
// Drain and free input_wait_ack (contains INFLIGHT_PACKET with pkt_data from data_pool) // Drain and free input_wait_ack (contains INFLIGHT_PACKET with pkt_data from data_pool)
if (etcp->input_wait_ack) { if (etcp->input_wait_ack) {
struct INFLIGHT_PACKET* pkt; struct INFLIGHT_PACKET* pkt;
while ((pkt = queue_data_get(etcp->input_wait_ack)) != NULL) { while ((pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_wait_ack)) != NULL) {
if (pkt->ll.dgram) { if (pkt->ll.dgram) {
memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram);
} }
@ -173,20 +173,20 @@ void etcp_connection_close(struct ETCP_CONN* etcp) {
etcp->input_wait_ack = NULL; etcp->input_wait_ack = NULL;
} }
// Drain and free ack_q (contains ACK_PACKET from ack_pool) // Drain and free ack_q (contains ACK_PACKET from ack_pool)
if (etcp->ack_q) { if (etcp->ack_q) {
struct ACK_PACKET* pkt; struct ACK_PACKET* pkt;
while ((pkt = queue_data_get(etcp->ack_q)) != NULL) { while ((pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q)) != NULL) {
queue_entry_free(pkt); queue_entry_free((struct ll_entry*)pkt);
} }
queue_free(etcp->ack_q); queue_free(etcp->ack_q);
etcp->ack_q = NULL; etcp->ack_q = NULL;
} }
// Drain and free recv_q (contains ETCP_FRAGMENT with pkt_data from data_pool) // Drain and free recv_q (contains ETCP_FRAGMENT with pkt_data from data_pool)
if (etcp->recv_q) { if (etcp->recv_q) {
struct ETCP_FRAGMENT* pkt; struct ETCP_FRAGMENT* pkt;
while ((pkt = queue_data_get(etcp->recv_q)) != NULL) { while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(etcp->recv_q)) != NULL) {
if (pkt->ll.dgram) { if (pkt->ll.dgram) {
memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram); memory_pool_free(etcp->instance->data_pool, pkt->ll.dgram);
} }
@ -272,7 +272,7 @@ int etcp_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) {
// Create queue entry - this allocates ll_entry + data pointer // Create queue entry - this allocates ll_entry + data pointer
struct ETCP_FRAGMENT* pkt = queue_entry_new_from_pool(etcp->rx_pool); struct ETCP_FRAGMENT* pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->rx_pool);
if (!pkt) { if (!pkt) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to allocate queue entry"); DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to allocate queue entry");
memory_pool_free(etcp->instance->data_pool, packet_data); memory_pool_free(etcp->instance->data_pool, packet_data);
@ -286,8 +286,8 @@ int etcp_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_send: created PACKET %p with data %p (len=%zu)", pkt, packet_data, len); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_send: created PACKET %p with data %p (len=%zu)", pkt, packet_data, len);
// Add to input queue - input_queue_cb will process it // Add to input queue - input_queue_cb will process it
if (queue_data_put(etcp->input_queue, pkt, 0) != 0) { if (queue_data_put(etcp->input_queue, (struct ll_entry*)pkt, 0) != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to add to input queue"); DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_send: failed to add to input queue");
memory_pool_free(etcp->instance->data_pool, packet_data); memory_pool_free(etcp->instance->data_pool, packet_data);
memory_pool_free(etcp->rx_pool, pkt); memory_pool_free(etcp->rx_pool, pkt);
@ -317,8 +317,8 @@ static void input_queue_try_resume(struct ETCP_CONN* etcp) {
// input_queue -> input_send_q // input_queue -> input_send_q
static void input_queue_cb(struct ll_queue* q, void* arg) { static void input_queue_cb(struct ll_queue* q, void* arg) {
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
struct ETCP_FRAGMENT* in_pkt = queue_data_get(q); struct ETCP_FRAGMENT* in_pkt = (struct ETCP_FRAGMENT*)queue_data_get(q);
if (!in_pkt) { if (!in_pkt) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot get element (pool=%p etcp=%p)", etcp->inflight_pool, etcp); DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot get element (pool=%p etcp=%p)", etcp->inflight_pool, etcp);
@ -333,10 +333,10 @@ static void input_queue_cb(struct ll_queue* q, void* arg) {
// Create INFLIGHT_PACKET // Create INFLIGHT_PACKET
struct INFLIGHT_PACKET* p = memory_pool_alloc(etcp->inflight_pool); struct INFLIGHT_PACKET* p = memory_pool_alloc(etcp->inflight_pool);
if (!p) { if (!p) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot allocate INFLIGHT_PACKET (pool=%p etcp=%p)", etcp->inflight_pool, etcp); DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "input_queue_cb: cannot allocate INFLIGHT_PACKET (pool=%p etcp=%p)", etcp->inflight_pool, etcp);
queue_entry_free(in_pkt); // Free the ETCP_FRAGMENT queue_entry_free((struct ll_entry*)in_pkt); // Free the ETCP_FRAGMENT
queue_resume_callback(q); queue_resume_callback(q);
return; return;
} }
// Setup inflight packet (based on protocol.txt) // Setup inflight packet (based on protocol.txt)
@ -347,13 +347,13 @@ static void input_queue_cb(struct ll_queue* q, void* arg) {
p->ll.dgram = in_pkt->ll.dgram; p->ll.dgram = in_pkt->ll.dgram;
p->ll.len = in_pkt->ll.len; p->ll.len = in_pkt->ll.len;
// Add to send queue // Add to send queue
if (queue_data_put(etcp->input_send_q, p, p->seq) != 0) { if (queue_data_put(etcp->input_send_q, (struct ll_entry*)p, p->seq) != 0) {
memory_pool_free(etcp->inflight_pool, p); memory_pool_free(etcp->inflight_pool, p);
queue_entry_free(in_pkt); queue_entry_free((struct ll_entry*)in_pkt);
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "input_queue_cb: EXIT (queue put failed)"); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "input_queue_cb: EXIT (queue put failed)");
return; return;
} }
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q"); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q");
input_queue_try_resume(etcp); input_queue_try_resume(etcp);
@ -396,12 +396,12 @@ static void ack_timeout_check(void* arg) {
pkt->last_timestamp = now; pkt->last_timestamp = now;
pkt->last_link = NULL; // Reset last link for re-selection pkt->last_link = NULL; // Reset last link for re-selection
// Remove from wait_ack // Remove from wait_ack
queue_remove_data(etcp->input_wait_ack, pkt); queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)pkt);
// Change state and add to send_q for retransmission // Change state and add to send_q for retransmission
pkt->state = INFLIGHT_STATE_WAIT_SEND; pkt->state = INFLIGHT_STATE_WAIT_SEND;
queue_data_put(etcp->input_send_q, pkt, pkt->seq); queue_data_put(etcp->input_send_q, (struct ll_entry*)pkt, pkt->seq);
// Update stats // Update stats
etcp->retransmissions_count++; etcp->retransmissions_count++;
@ -441,17 +441,17 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
// return NULL; // return NULL;
} }
// First, check if there's a packet in input_send_q (retrans or new) // First, check if there's a packet in input_send_q (retrans or new)
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q"); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q");
struct INFLIGHT_PACKET* inf_pkt = queue_data_get(etcp->input_send_q); struct INFLIGHT_PACKET* inf_pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q);
if (inf_pkt) { if (inf_pkt) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: prepare udp dgram for send packet %p (seq=%u, len=%u)", inf_pkt, inf_pkt->seq, inf_pkt->ll.len); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: prepare udp dgram for send packet %p (seq=%u, len=%u)", inf_pkt, inf_pkt->seq, inf_pkt->ll.len);
inf_pkt->last_timestamp=get_current_time_units(); inf_pkt->last_timestamp=get_current_time_units();
inf_pkt->send_count++; inf_pkt->send_count++;
inf_pkt->state=INFLIGHT_STATE_WAIT_ACK; inf_pkt->state=INFLIGHT_STATE_WAIT_ACK;
queue_data_put(etcp->input_wait_ack, inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue queue_data_put(etcp->input_wait_ack, (struct ll_entry*)inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue
} }
size_t ack_q_size = queue_entry_count(etcp->ack_q); size_t ack_q_size = queue_entry_count(etcp->ack_q);
@ -480,24 +480,24 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
dgram->data[ptr++]=etcp->last_delivered_id>>24; dgram->data[ptr++]=etcp->last_delivered_id>>24;
// тут (потом) добавим опциональные заголовки // тут (потом) добавим опциональные заголовки
struct ACK_PACKET* ack_pkt; struct ACK_PACKET* ack_pkt;
while (ack_pkt = queue_data_get(etcp->ack_q)) { while ((ack_pkt = (struct ACK_PACKET*)queue_data_get(etcp->ack_q))) {
// seq 4 байта // seq 4 байта
dgram->data[ptr++]=ack_pkt->seq; dgram->data[ptr++]=ack_pkt->seq;
dgram->data[ptr++]=ack_pkt->seq>>8; dgram->data[ptr++]=ack_pkt->seq>>8;
dgram->data[ptr++]=ack_pkt->seq>>16; dgram->data[ptr++]=ack_pkt->seq>>16;
dgram->data[ptr++]=ack_pkt->seq>>24; dgram->data[ptr++]=ack_pkt->seq>>24;
// ts приема 2 байта // ts приема 2 байта
dgram->data[ptr++]=ack_pkt->recv_timestamp; dgram->data[ptr++]=ack_pkt->recv_timestamp;
dgram->data[ptr++]=ack_pkt->recv_timestamp>>8; dgram->data[ptr++]=ack_pkt->recv_timestamp>>8;
// время задержки 2 байта между recv и ack // время задержки 2 байта между recv и ack
uint16_t dly=get_current_timestamp()-ack_pkt->recv_timestamp; uint16_t dly=get_current_timestamp()-ack_pkt->recv_timestamp;
dgram->data[ptr++]=dly; dgram->data[ptr++]=dly;
dgram->data[ptr++]=dly>>8; dgram->data[ptr++]=dly>>8;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: add ACK N%d dTS=%d", ack_pkt->seq, dly); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: add ACK N%d dTS=%d", ack_pkt->seq, dly);
queue_entry_free(ack_pkt); queue_entry_free((struct ll_entry*)ack_pkt);
if (inf_pkt && inf_pkt->ll.len+ptr>=etcp->mtu-10) break;// pkt len (надо просчитать точнее включая все заголовки) if (inf_pkt && inf_pkt->ll.len+ptr>=etcp->mtu-10) break;// pkt len (надо просчитать точнее включая все заголовки)
if (ptr>500) break; if (ptr>500) break;
@ -562,35 +562,35 @@ void etcp_output_try_assembly(struct ETCP_CONN* etcp) {
int delivered_count = 0; int delivered_count = 0;
uint32_t delivered_bytes = 0; uint32_t delivered_bytes = 0;
// Look for contiguous packets starting from next_expected_id // Look for contiguous packets starting from next_expected_id
while (1) { while (1) {
struct ETCP_FRAGMENT* rx_pkt = queue_find_data_by_id(etcp->recv_q, next_expected_id); struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_find_data_by_id(etcp->recv_q, next_expected_id);
if (!rx_pkt) { if (!rx_pkt) {
// No more contiguous packets found // No more contiguous packets found
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: no packet found for id=%u, stopping", next_expected_id); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: no packet found for id=%u, stopping", next_expected_id);
break; break;
} }
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: assembling packet id=%u (len=%u)", DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: assembling packet id=%u (len=%u)",
rx_pkt->seq, rx_pkt->ll.len); rx_pkt->seq, rx_pkt->ll.len);
// Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed // Simply move ETCP_FRAGMENT from recv_q to output_queue - no data copying needed
// Remove from recv_q first // Remove from recv_q first
queue_remove_data(etcp->recv_q, rx_pkt); queue_remove_data(etcp->recv_q, (struct ll_entry*)rx_pkt);
// Add to output_queue using the same ETCP_FRAGMENT structure // Add to output_queue using the same ETCP_FRAGMENT structure
if (queue_data_put(etcp->output_queue, rx_pkt, next_expected_id) == 0) { if (queue_data_put(etcp->output_queue, (struct ll_entry*)rx_pkt, next_expected_id) == 0) {
delivered_bytes += rx_pkt->ll.len; delivered_bytes += rx_pkt->ll.len;
delivered_count++; delivered_count++;
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: moved packet id=%u to output_queue", DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: moved packet id=%u to output_queue",
next_expected_id); next_expected_id);
} else { } else {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: failed to add packet id=%u to output_queue", DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_output_try_assembly: failed to add packet id=%u to output_queue",
next_expected_id); next_expected_id);
// Put it back in recv_q if we can't add to output_queue // Put it back in recv_q if we can't add to output_queue
queue_data_put(etcp->recv_q, rx_pkt, next_expected_id); queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, next_expected_id);
break; break;
} }
// Update state for next iteration // Update state for next iteration
etcp->last_delivered_id = next_expected_id; etcp->last_delivered_id = next_expected_id;
@ -607,12 +607,12 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: processing ACK for seq=%u, ts=%u, dts=%u", seq, ts, dts); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_ack_recv: processing ACK for seq=%u, ts=%u, dts=%u", seq, ts, dts);
// Find the acknowledged packet in the wait_ack queue // Find the acknowledged packet in the wait_ack queue
struct INFLIGHT_PACKET* acked_pkt = queue_find_data_by_id(etcp->input_wait_ack, seq); struct INFLIGHT_PACKET* acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_wait_ack, seq);
if (acked_pkt) queue_remove_data(etcp->input_wait_ack, acked_pkt); if (acked_pkt) queue_remove_data(etcp->input_wait_ack, (struct ll_entry*)acked_pkt);
else { acked_pkt = queue_find_data_by_id(etcp->input_send_q, seq); else { acked_pkt = (struct INFLIGHT_PACKET*)queue_find_data_by_id(etcp->input_send_q, seq);
queue_remove_data(etcp->input_send_q, acked_pkt); queue_remove_data(etcp->input_send_q, (struct ll_entry*)acked_pkt);
} }
if (!acked_pkt) { if (!acked_pkt) {
// Packet might be already acknowledged or not found // Packet might be already acknowledged or not found
@ -705,30 +705,30 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) {
case ETCP_SECTION_PAYLOAD: { case ETCP_SECTION_PAYLOAD: {
if (len>=5) { if (len>=5) {
// формируем ACK // формируем ACK
struct ACK_PACKET* p = queue_entry_new_from_pool(etcp->instance->ack_pool); struct ACK_PACKET* p = (struct ACK_PACKET*)queue_entry_new_from_pool(etcp->instance->ack_pool);
uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24); uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24);
p->seq=seq; p->seq=seq;
p->pkt_timestamp=pkt->timestamp; p->pkt_timestamp=pkt->timestamp;
p->recv_timestamp=get_current_timestamp(); p->recv_timestamp=get_current_timestamp();
queue_data_put(etcp->ack_q, p, p->seq); queue_data_put(etcp->ack_q, (struct ll_entry*)p, p->seq);
if (etcp->ack_resp_timer == NULL) { if (etcp->ack_resp_timer == NULL) {
etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: set ack_timer for delayed ACK send"); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: set ack_timer for delayed ACK send");
} }
if ((int32_t)(etcp->last_delivered_id-seq)<0) if (queue_find_data_by_id(etcp->recv_q, seq)==NULL) {// проверяем есть ли пакет с этим seq if ((int32_t)(etcp->last_delivered_id-seq)<0) if (queue_find_data_by_id(etcp->recv_q, seq)==NULL) {// проверяем есть ли пакет с этим seq
uint32_t pkt_len=len-5; uint32_t pkt_len=len-5;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: adding packet seq=%u to recv_q (last_delivered_id=%u)", seq, etcp->last_delivered_id); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: adding packet seq=%u to recv_q (last_delivered_id=%u)", seq, etcp->last_delivered_id);
// отправляем пакет в очередь на сборку // отправляем пакет в очередь на сборку
uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool); uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool);
struct ETCP_FRAGMENT* rx_pkt = queue_entry_new_from_pool(etcp->rx_pool); struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->rx_pool);
rx_pkt->seq=seq; rx_pkt->seq=seq;
rx_pkt->timestamp=pkt->timestamp; rx_pkt->timestamp=pkt->timestamp;
rx_pkt->ll.dgram=payload_data; rx_pkt->ll.dgram=payload_data;
rx_pkt->ll.len=pkt_len; rx_pkt->ll.len=pkt_len;
// Copy the actual payload data // Copy the actual payload data
memcpy(payload_data, data + 5, pkt_len); memcpy(payload_data, data + 5, pkt_len);
queue_data_put(etcp->recv_q, rx_pkt, seq); queue_data_put(etcp->recv_q, (struct ll_entry*)rx_pkt, seq);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: packet seq=%u added to recv_q, calling assembly (last_delivered_id=%u)", seq, etcp->last_delivered_id); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_input: packet seq=%u added to recv_q, calling assembly (last_delivered_id=%u)", seq, etcp->last_delivered_id);
if (etcp->last_delivered_id+1==seq) etcp_output_try_assembly(etcp);// пробуем собрать выходную очередь из фрагментов if (etcp->last_delivered_id+1==seq) etcp_output_try_assembly(etcp);// пробуем собрать выходную очередь из фрагментов
} }

12
src/pkt_normalizer.c

@ -85,7 +85,7 @@ void pn_pair_deinit(struct PKTNORM* pn) {
} }
if (pn->sndpart.dgram) { if (pn->sndpart.dgram) {
queue_entry_free(&pn->sndpart); queue_dgram_free(&pn->sndpart);
} }
if (pn->recvpart) { if (pn->recvpart) {
queue_dgram_free(pn->recvpart); queue_dgram_free(pn->recvpart);
@ -112,6 +112,7 @@ void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: pn=%p, len=%d", pn, len); DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: pn=%p, len=%d", pn, len);
struct ll_entry* entry = ll_alloc_lldgram(len); struct ll_entry* entry = ll_alloc_lldgram(len);
if (!entry) return;
memcpy(entry->dgram, data, len); memcpy(entry->dgram, data, len);
entry->len = len; entry->len = len;
entry->dgram_pool = NULL; entry->dgram_pool = NULL;
@ -137,11 +138,11 @@ static void packer_cb(struct ll_queue* q, void* arg) {
// Helper to send sndpart to ETCP as ETCP_FRAGMENT // Helper to send sndpart to ETCP as ETCP_FRAGMENT
static void pn_send_to_etcp(struct PKTNORM* pn) { static void pn_send_to_etcp(struct PKTNORM* pn) {
if (!pn || !pn->sndpart.data || !pn->sndpart.len==0) return; if (!pn || !pn->sndpart.dgram || pn->sndpart.len == 0) return;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer: pn_send_to_etcp"); DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer: pn_send_to_etcp");
// Allocate ETCP_FRAGMENT from rx_pool // Allocate ETCP_FRAGMENT from rx_pool
struct ETCP_FRAGMENT* frag = queue_entry_new_from_pool(pn->etcp->rx_pool); struct ETCP_FRAGMENT* frag = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(pn->etcp->rx_pool);
if (!frag) {// drop data if (!frag) {// drop data
pn->sndpart.len = 0; pn->sndpart.len = 0;
return; return;
@ -153,9 +154,8 @@ static void pn_send_to_etcp(struct PKTNORM* pn) {
frag->ll.len = pn->sndpart.len; frag->ll.len = pn->sndpart.len;
frag->ll.memlen = pn->sndpart.memlen; frag->ll.memlen = pn->sndpart.memlen;
queue_data_put(pn->etcp->input_queue, frag, 0); queue_data_put(pn->etcp->input_queue, (struct ll_entry*)frag, 0);
queue_entry_free(&pn->sndpart); // Сбросить структуру (dgram передан во фрагмент, не освобождаем)
// Сбросить структуру после освобождения
pn->sndpart.dgram = NULL; pn->sndpart.dgram = NULL;
pn->sndpart.len = 0; pn->sndpart.len = 0;
pn->sndpart.memlen = 0; pn->sndpart.memlen = 0;

297
src/pkt_normalizer.c2

@ -0,0 +1,297 @@
// pkt_normalizer.c - Implementation of packet normalizer for ETCP
#include "pkt_normalizer.h"
#include "etcp.h" // For ETCP_CONN and related structures
#include "ll_queue.h" // For queue operations
#include "u_async.h" // For UASYNC
#include <stdlib.h>
#include <string.h>
#include <stdio.h> // For debugging (can be removed if not needed)
#include "debug_config.h" // Assuming this for DEBUG_ERROR
// Internal helper to convert void* data to struct ll_entry*
static inline struct ll_entry* data_to_entry(void* data) {
if (!data) return NULL;
return (struct ll_entry*)data;
}
// Forward declarations
static void packer_cb(struct ll_queue* q, void* arg);
static void pn_flush_cb(void* arg);
static void etcp_input_ready_cb(struct ll_queue* q, void* arg);
static void pn_unpacker_cb(struct ll_queue* q, void* arg);
static void pn_send_to_etcp(struct PKTNORM* pn);
// Initialization
struct PKTNORM* pn_init(struct ETCP_CONN* etcp) {
if (!etcp) return NULL;
struct PKTNORM* pn = calloc(1, sizeof(struct PKTNORM));
if (!pn) return NULL;
pn->etcp = etcp;
pn->ua = etcp->instance->ua;
pn->frag_size = etcp->mtu - 100; // Use MTU as fixed packet size (adjust if headers need subtraction)
pn->tx_wait_time = 10;
pn->input = queue_new(pn->ua, 0); // No hash needed
pn->output = queue_new(pn->ua, 0); // No hash needed
if (!pn->input || !pn->output) {
pn_pair_deinit(pn);
return NULL;
}
queue_set_callback(pn->input, packer_cb, pn);
queue_set_callback(etcp->output_queue, pn_unpacker_cb, pn);
pn->sndpart.dgram = NULL;
pn->sndpart.len = 0;
pn->recvpart = NULL;
pn->flush_timer = NULL;
return pn;
}
// Deinitialization
void pn_pair_deinit(struct PKTNORM* pn) {
if (!pn) return;
// Drain and free queues
if (pn->input) {
void* data;
while ((data = queue_data_get(pn->input)) != NULL) {
struct ll_entry* entry = data_to_entry(data);
if (entry->dgram) {
free(entry->dgram);
}
queue_entry_free(data);
}
queue_free(pn->input);
}
if (pn->output) {
void* data;
while ((data = queue_data_get(pn->output)) != NULL) {
struct ll_entry* entry = data_to_entry(data);
if (entry->dgram) {
free(entry->dgram);
}
queue_entry_free(data);
}
queue_free(pn->output);
}
if (pn->flush_timer) {
uasync_cancel_timeout(pn->ua, pn->flush_timer);
}
if (pn->sndpart.dgram) {
queue_entry_free(&pn->sndpart);
}
if (pn->recvpart) {
queue_dgram_free(pn->recvpart);
queue_entry_free(pn->recvpart);
}
free(pn);
}
// Reset unpacker state
void pn_unpacker_reset_state(struct PKTNORM* pn) {
if (!pn) return;
if (pn->recvpart) {
queue_dgram_free(pn->recvpart);
queue_entry_free(pn->recvpart);
pn->recvpart = NULL;
}
}
// Send data to packer (copies and adds to input queue or pending, triggering callback)
void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
if (!pn || !data || len == 0) return;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: pn=%p, len=%d", pn, len);
struct ll_entry* entry = ll_alloc_lldgram(len);
memcpy(entry->dgram, data, len);
entry->len = len;
entry->dgram_pool = NULL;
int ret = queue_data_put(pn->input, entry, 0);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "pn_packer_send: queue_data_put returned %d, input count=%d", ret, queue_entry_count(pn->input));
// Cancel flush timer if active
if (pn->flush_timer) {
uasync_cancel_timeout(pn->ua, pn->flush_timer);
pn->flush_timer = NULL;
}
}
// Internal: Packer callback
static void packer_cb(struct ll_queue* q, void* arg) {
struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return;
queue_wait_threshold(pn->etcp->input_queue, 0, 0, etcp_input_ready_cb, pn);
}
// Helper to send sndpart to ETCP as ETCP_FRAGMENT
static void pn_send_to_etcp(struct PKTNORM* pn) {
if (!pn || !pn->sndpart.data || !pn->sndpart.len==0) return;
// Allocate ETCP_FRAGMENT from rx_pool
struct ETCP_FRAGMENT* frag = queue_entry_new_from_pool(pn->etcp->rx_pool);
if (!frag) {// drop data
pn->sndpart.len = 0;
return;
}
frag->seq = 0;
frag->timestamp = 0;
frag->ll.dgram = pn->sndpart.dgram;
frag->ll.len = pn->sndpart.len;
frag->ll.memlen = pn->sndpart.memlen;
queue_data_put(pn->etcp->input_queue, frag, 0);
queue_entry_free(&pn->sndpart);
// Сбросить структуру после освобождения
pn->sndpart.dgram = NULL;
pn->sndpart.len = 0;
pn->sndpart.memlen = 0;
}
// Internal: Renew sndpart buffer
static void pn_buf_renew(struct PKTNORM* pn) {
if (pn->sndpart.dgram) {
int remain = pn->frag_size - pn->sndpart.len;
if (remain < 3) pn_send_to_etcp(pn);
}
if (!pn->sndpart.dgram) {
pn->sndpart.len=0;
pn->sndpart.dgram_pool = pn->etcp->instance->data_pool;
pn->sndpart.memlen=pn->etcp->instance->data_pool->object_size;//pn->frag_size;
pn->sndpart.dgram = memory_pool_alloc(pn->etcp->instance->data_pool);
}
}
// Internal: Process input when etcp->input_queue is ready (empty)
static void etcp_input_ready_cb(struct ll_queue* q, void* arg) {
struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return;
void* data = queue_data_get(pn->input);
if (!data) {
queue_resume_callback(pn->input);
return;
}
struct ll_entry* in_dgram = data_to_entry(data);
uint16_t ptr = 0;
while (ptr < in_dgram->len) {
pn_buf_renew(pn);
if (!pn->sndpart.dgram) break; // Allocation failed
int remain = pn->frag_size - pn->sndpart.len;
if (remain < 3) {
// Буфер почти полон, отправить его
pn_send_to_etcp(pn);
continue; // Продолжить с новым буфером
}
if (ptr == 0) {
pn->sndpart.dgram[pn->sndpart.len++] = in_dgram->len & 0xFF;
pn->sndpart.dgram[pn->sndpart.len++] = (in_dgram->len >> 8) & 0xFF;
remain -= 2;
}
int n = remain;
int rem = in_dgram->len - ptr;
if (n > rem) n = rem;
memcpy(pn->sndpart.dgram + pn->sndpart.len, in_dgram->dgram + ptr, n);
pn->sndpart.len += n;
ptr += n;
// Проверить, не заполнился ли буфер
if (pn->sndpart.len >= pn->frag_size - 2) {
pn_send_to_etcp(pn);
}
}
queue_dgram_free(in_dgram);
queue_entry_free(data);
// Cancel flush timer if active
if (pn->flush_timer) {
uasync_cancel_timeout(pn->ua, pn->flush_timer);
pn->flush_timer = NULL;
}
// Set flush timer if no more input
if (queue_entry_count(pn->input) == 0) {
pn->flush_timer = uasync_set_timeout(pn->ua, pn->tx_wait_time, pn, pn_flush_cb);
}
queue_resume_callback(pn->input);
}
// Internal: Flush callback on timeout
static void pn_flush_cb(void* arg) {
struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return;
pn->flush_timer = NULL;
pn_send_to_etcp(pn);
}
// Internal: Unpacker callback (assembles fragments into original packets)
static void pn_unpacker_cb(struct ll_queue* q, void* arg) {
struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return;
while (1) {
void* data = queue_data_get(pn->etcp->output_queue);
if (!data) break;
struct ETCP_FRAGMENT* frag = (struct ETCP_FRAGMENT*)data; // Since data is ll_entry*
uint8_t* payload = frag->ll.dgram;
uint16_t len = frag->ll.len;
uint16_t ptr = 0;
while (ptr < len) {
if (!pn->recvpart) {
// Need length header for new packet
if (len - ptr < 2) {
// Incomplete header, reset
pn_unpacker_reset_state(pn);
break;
}
uint16_t pkt_len = payload[ptr] | (payload[ptr + 1] << 8);
ptr += 2;
pn->recvpart = ll_alloc_lldgram(pkt_len);
if (!pn->recvpart) {
break;
}
pn->recvpart->len = 0;
}
uint16_t rem = pn->recvpart->memlen - pn->recvpart->len;
uint16_t avail = len - ptr;
uint16_t cp = (rem < avail) ? rem : avail;
memcpy(pn->recvpart->dgram + pn->recvpart->len, payload + ptr, cp);
pn->recvpart->len += cp;
ptr += cp;
if (pn->recvpart->len == pn->recvpart->memlen) {
queue_data_put(pn->output, pn->recvpart, 0);
pn->recvpart = NULL;
}
}
// Free the fragment - dgram was malloc'd in pn_send_to_etcp
free(frag->ll.dgram);
memory_pool_free(pn->etcp->rx_pool, frag);
}
queue_resume_callback(q);
}

4
tests/debug_simple.c

@ -15,8 +15,8 @@ int main() {
struct ll_queue* q = queue_new(ua, 0); struct ll_queue* q = queue_new(ua, 0);
/* Create test data */ /* Create test data */
test_data_t* data1 = (test_data_t*)queue_data_new(sizeof(test_data_t)); test_data_t* data1 = (test_data_t*)queue_entry_new(sizeof(test_data_t));
printf("queue_data_new returned: %p\n", data1); printf("queue_entry_new returned: %p\n", data1);
data1->id = 1; data1->id = 1;
strcpy(data1->name, "test1"); strcpy(data1->name, "test1");

BIN
tests/test_etcp_100_packets

Binary file not shown.

14
tests/test_etcp_100_packets.c

@ -149,7 +149,7 @@ static void check_received_packets_fwd(void) {
if (!conn || !conn->output_queue) return; if (!conn || !conn->output_queue) return;
struct ETCP_FRAGMENT* pkt; struct ETCP_FRAGMENT* pkt;
while ((pkt = queue_data_get(conn->output_queue)) != NULL) { while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(conn->output_queue)) != NULL) {
if (pkt->ll.len >= PACKET_SIZE) { if (pkt->ll.len >= PACKET_SIZE) {
int seq = pkt->ll.dgram[0]; int seq = pkt->ll.dgram[0];
@ -167,7 +167,7 @@ static void check_received_packets_fwd(void) {
if (pkt->ll.dgram) { if (pkt->ll.dgram) {
memory_pool_free(conn->instance->data_pool, pkt->ll.dgram); memory_pool_free(conn->instance->data_pool, pkt->ll.dgram);
} }
queue_entry_free(pkt); queue_entry_free((struct ll_entry*)pkt);
} }
} }
@ -179,13 +179,13 @@ static void check_received_packets_back(void) {
if (!conn || !conn->output_queue) return; if (!conn || !conn->output_queue) return;
struct ETCP_FRAGMENT* pkt; struct ETCP_FRAGMENT* pkt;
while ((pkt = queue_data_get(conn->output_queue)) != NULL) { while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(conn->output_queue)) != NULL) {
if (pkt->ll.len >= PACKET_SIZE) { if (pkt->ll.len >= PACKET_SIZE) {
int seq = pkt->ll.dgram[0]; int seq = pkt->ll.dgram[0];
uint8_t expected[PACKET_SIZE]; uint8_t expected[PACKET_SIZE];
generate_packet_data(seq, expected, PACKET_SIZE); generate_packet_data(seq, expected, PACKET_SIZE);
if (memcmp(pkt->ll.dgram, expected, PACKET_SIZE) == 0) { if (memcmp(pkt->ll.dgram, expected, PACKET_SIZE) == 0) {
if (seq >= 0 && seq < TOTAL_PACKETS) { if (seq >= 0 && seq < TOTAL_PACKETS) {
received_packets_back[seq] = 1; received_packets_back[seq] = 1;
@ -193,11 +193,11 @@ static void check_received_packets_back(void) {
packets_received_back++; packets_received_back++;
} }
} }
if (pkt->ll.dgram) { if (pkt->ll.dgram) {
memory_pool_free(conn->instance->data_pool, pkt->ll.dgram); memory_pool_free(conn->instance->data_pool, pkt->ll.dgram);
} }
queue_entry_free(pkt); queue_entry_free((struct ll_entry*)pkt);
} }
} }

BIN
tests/test_etcp_minimal

Binary file not shown.

BIN
tests/test_etcp_simple_traffic

Binary file not shown.

4
tests/test_etcp_simple_traffic.c

@ -155,7 +155,7 @@ static void check_packet_received(void) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check_packet_received: Server output_queue count: %d", queue_entry_count(conn->output_queue)); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check_packet_received: Server output_queue count: %d", queue_entry_count(conn->output_queue));
// Check if there's any packet in output queue // Check if there's any packet in output queue
struct ETCP_FRAGMENT* pkt = queue_data_get(conn->output_queue); struct ETCP_FRAGMENT* pkt = (struct ETCP_FRAGMENT*)queue_data_get(conn->output_queue);
if (pkt) { if (pkt) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check_packet_received: Found packet in output queue"); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check_packet_received: Found packet in output queue");
@ -179,7 +179,7 @@ static void check_packet_received(void) {
if (pkt->ll.dgram) { if (pkt->ll.dgram) {
memory_pool_free(conn->instance->data_pool, pkt->ll.dgram); memory_pool_free(conn->instance->data_pool, pkt->ll.dgram);
} }
queue_entry_free(pkt); queue_entry_free((struct ll_entry*)pkt);
} else { } else {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "check_packet_received: No packet found in output queue"); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "check_packet_received: No packet found in output queue");
} }

BIN
tests/test_etcp_two_instances

Binary file not shown.

BIN
tests/test_intensive_memory_pool

Binary file not shown.

2
tests/test_intensive_memory_pool.c

@ -38,7 +38,7 @@ static double test_without_pools(int iterations) {
// Добавить записи // Добавить записи
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
void* data = queue_data_new(64); void* data = queue_entry_new(64);
queue_data_put(queue, data, i); // Используем ID = i queue_data_put(queue, data, i); // Используем ID = i
} }

2
tests/test_intensive_memory_pool_new.c

@ -38,7 +38,7 @@ static double test_without_pools(int iterations) {
// Добавить записи // Добавить записи
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
void* data = queue_data_new(64); void* data = queue_entry_new(64);
queue_data_put(queue, data, i); // Используем ID = i queue_data_put(queue, data, i); // Используем ID = i
} }

BIN
tests/test_ll_queue

Binary file not shown.

80
tests/test_ll_queue.c

@ -15,7 +15,9 @@
#include "../lib/debug_config.h" #include "../lib/debug_config.h"
#include "../lib/memory_pool.h" #include "../lib/memory_pool.h"
#ifndef DEBUG_CATEGORY_LL_QUEUE
#define DEBUG_CATEGORY_LL_QUEUE 1 #define DEBUG_CATEGORY_LL_QUEUE 1
#endif
static struct { static struct {
int run, passed, failed; int run, passed, failed;
@ -59,10 +61,10 @@ static void queue_cb(struct ll_queue *q, void *arg) {
int *cnt = arg; int *cnt = arg;
(*cnt)++; stats.cb_queue++; (*cnt)++; stats.cb_queue++;
test_data_t *taken = queue_data_get(q); test_data_t *taken = (test_data_t*)queue_data_get(q);
ASSERT(checksum(taken) == taken->checksum, "corruption in cb"); ASSERT(checksum(taken) == taken->checksum, "corruption in cb");
queue_entry_free(taken); queue_entry_free((struct ll_entry*)taken);
queue_resume_callback(q); // next item when ready queue_resume_callback(q); // next item when ready
} }
@ -90,17 +92,17 @@ static void test_fifo(void) {
struct ll_queue *q = queue_new(ua, 0); struct ll_queue *q = queue_new(ua, 0);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
test_data_t *d = queue_data_new(sizeof(test_data_t)); test_data_t *d = (test_data_t*)queue_entry_new(sizeof(test_data_t));
d->id = i; snprintf(d->name, sizeof(d->name), "item%d", i); d->id = i; snprintf(d->name, sizeof(d->name), "item%d", i);
d->value = i*10; d->checksum = checksum(d); d->value = i*10; d->checksum = checksum(d);
queue_data_put(q, d, d->id); queue_data_put(q, (struct ll_entry*)d, d->id);
} }
ASSERT_EQ(queue_entry_count(q), 10, ""); ASSERT_EQ(queue_entry_count(q), 10, "");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
test_data_t *d = queue_data_get(q); test_data_t *d = (test_data_t*)queue_data_get(q);
ASSERT(d && d->id == i, "FIFO violation"); ASSERT(d && d->id == i, "FIFO violation");
queue_entry_free(d); queue_entry_free((struct ll_entry*)d);
} }
ASSERT_EQ(queue_entry_count(q), 0, ""); ASSERT_EQ(queue_entry_count(q), 0, "");
queue_free(q); uasync_destroy(ua, 0); queue_free(q); uasync_destroy(ua, 0);
@ -113,25 +115,25 @@ static void test_lifo_priority(void) {
struct ll_queue *q = queue_new(ua, 0); struct ll_queue *q = queue_new(ua, 0);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
test_data_t *d = queue_data_new(sizeof(*d)); test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d));
d->id = i; d->id = i;
d->value = i; d->value = i;
d->checksum = checksum(d); d->checksum = checksum(d);
queue_data_put(q, d, i); queue_data_put(q, (struct ll_entry*)d, i);
} }
test_data_t *pri = queue_data_new(sizeof(*pri)); test_data_t *pri = (test_data_t*)queue_entry_new(sizeof(*pri));
pri->id = 999; pri->value = 999; pri->checksum = checksum(pri); pri->id = 999; pri->value = 999; pri->checksum = checksum(pri);
queue_data_put_first(q, pri, 999); queue_data_put_first(q, (struct ll_entry*)pri, 999);
test_data_t *first = queue_data_get(q); test_data_t *first = (test_data_t*)queue_data_get(q);
ASSERT(first && first->id == 999, "priority first"); ASSERT(first && first->id == 999, "priority first");
queue_entry_free(first); queue_entry_free((struct ll_entry*)first);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
test_data_t *d = queue_data_get(q); test_data_t *d = (test_data_t*)queue_data_get(q);
ASSERT(d && d->id == i, "remaining FIFO"); ASSERT(d && d->id == i, "remaining FIFO");
queue_entry_free(d); queue_entry_free((struct ll_entry*)d);
} }
queue_free(q); uasync_destroy(ua, 0); queue_free(q); uasync_destroy(ua, 0);
PASS(); PASS();
@ -146,9 +148,9 @@ static void test_callback(void) {
queue_set_callback(q, queue_cb, &cnt); queue_set_callback(q, queue_cb, &cnt);
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
test_data_t *d = queue_data_new(sizeof(*d)); test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d));
d->id = i; d->value = i*10; d->checksum = checksum(d); d->id = i; d->value = i*10; d->checksum = checksum(d);
queue_data_put(q, d, d->id); queue_data_put(q, (struct ll_entry*)d, d->id);
} }
for (int i = 0; i < 30 && cnt < 5; i++) uasync_poll(ua, 5); for (int i = 0; i < 30 && cnt < 5; i++) uasync_poll(ua, 5);
@ -169,15 +171,15 @@ static void test_waiter(void) {
ASSERT(w == NULL && called == 1, "immediate when condition met"); // empty queue ASSERT(w == NULL && called == 1, "immediate when condition met"); // empty queue
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
test_data_t *d = queue_data_new(sizeof(*d)); d->id = i; test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d)); d->id = i;
queue_data_put(q, d, i); queue_data_put(q, (struct ll_entry*)d, i);
} }
called = 0; called = 0;
w = queue_wait_threshold(q, 2, 0, waiter_cb, &called); w = queue_wait_threshold(q, 2, 0, waiter_cb, &called);
ASSERT(w != NULL && called == 0, ""); ASSERT(w != NULL && called == 0, "");
for (int i = 0; i < 3; i++) queue_entry_free(queue_data_get(q)); for (int i = 0; i < 3; i++) queue_entry_free((struct ll_entry*)queue_data_get(q));
for (int i = 0; i < 15; i++) uasync_poll(ua, 1); for (int i = 0; i < 15; i++) uasync_poll(ua, 1);
ASSERT_EQ(called, 1, ""); ASSERT_EQ(called, 1, "");
@ -194,18 +196,18 @@ static void test_limits_hash(void) {
queue_set_size_limit(q, 3); queue_set_size_limit(q, 3);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
test_data_t *d = queue_data_new(sizeof(*d)); d->id = i*10+1; test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d)); d->id = i*10+1;
queue_data_put(q, d, d->id); queue_data_put(q, (struct ll_entry*)d, d->id);
} }
test_data_t *ex = queue_data_new(sizeof(*ex)); test_data_t *ex = (test_data_t*)queue_entry_new(sizeof(*ex));
ASSERT_EQ(queue_data_put(q, ex, 999), -1, "limit reject"); ASSERT_EQ(queue_data_put(q, (struct ll_entry*)ex, 999), -1, "limit reject");
test_data_t *found = queue_find_data_by_id(q, 21); test_data_t *found = (test_data_t*)queue_find_data_by_id(q, 21);
ASSERT(found && found->id == 21, "hash find"); ASSERT(found && found->id == 21, "hash find");
queue_remove_data(q, found); queue_remove_data(q, (struct ll_entry*)found);
ASSERT(queue_find_data_by_id(q, 21) == NULL, "removed"); ASSERT(queue_find_data_by_id(q, 21) == NULL, "removed");
while (queue_entry_count(q)) queue_entry_free(queue_data_get(q)); while (queue_entry_count(q)) queue_entry_free((struct ll_entry*)queue_data_get(q));
queue_free(q); uasync_destroy(ua, 0); queue_free(q); uasync_destroy(ua, 0);
PASS(); PASS();
} }
@ -219,23 +221,23 @@ static void test_pool(void) {
size_t alloc1 = 0, reuse1 = 0; size_t alloc1 = 0, reuse1 = 0;
memory_pool_get_stats(pool, &alloc1, &reuse1); memory_pool_get_stats(pool, &alloc1, &reuse1);
test_data_t *d1 = queue_entry_new_from_pool(pool); test_data_t *d1 = (test_data_t*)queue_entry_new_from_pool(pool);
d1->id = 1; d1->checksum = checksum(d1); d1->id = 1; d1->checksum = checksum(d1);
queue_data_put(q, d1, 1); queue_data_put(q, (struct ll_entry*)d1, 1);
test_data_t *d2 = queue_entry_new_from_pool(pool); test_data_t *d2 = (test_data_t*)queue_entry_new_from_pool(pool);
d2->id = 2; d2->checksum = checksum(d2); d2->id = 2; d2->checksum = checksum(d2);
queue_data_put(q, d2, 2); queue_data_put(q, (struct ll_entry*)d2, 2);
queue_entry_free(queue_data_get(q)); // free d1 back to pool queue_entry_free((struct ll_entry*)queue_data_get(q)); // free d1 back to pool
queue_entry_free(queue_data_get(q)); // free d2 back to pool queue_entry_free((struct ll_entry*)queue_data_get(q)); // free d2 back to pool
// Now allocate again to trigger reuse // Now allocate again to trigger reuse
test_data_t *d3 = queue_entry_new_from_pool(pool); test_data_t *d3 = (test_data_t*)queue_entry_new_from_pool(pool);
ASSERT(d3 != NULL, "alloc after free failed"); ASSERT(d3 != NULL, "alloc after free failed");
d3->id = 3; d3->checksum = checksum(d3); d3->id = 3; d3->checksum = checksum(d3);
queue_data_put(q, d3, 3); queue_data_put(q, (struct ll_entry*)d3, 3);
queue_entry_free(queue_data_get(q)); // free d3 back queue_entry_free((struct ll_entry*)queue_data_get(q)); // free d3 back
size_t alloc2 = 0, reuse2 = 0; size_t alloc2 = 0, reuse2 = 0;
memory_pool_get_stats(pool, &alloc2, &reuse2); memory_pool_get_stats(pool, &alloc2, &reuse2);
@ -253,15 +255,15 @@ static void test_stress(void) {
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
if (queue_entry_count(q) > 80) { if (queue_entry_count(q) > 80) {
queue_entry_free(queue_data_get(q)); queue_entry_free((struct ll_entry*)queue_data_get(q));
} }
test_data_t *d = queue_data_new(sizeof(*d)); test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d));
d->id = rand() % 10000; d->id = rand() % 10000;
d->checksum = checksum(d); d->checksum = checksum(d);
queue_data_put(q, d, d->id); queue_data_put(q, (struct ll_entry*)d, d->id);
stats.ops++; stats.ops++;
} }
while (queue_entry_count(q)) queue_entry_free(queue_data_get(q)); while (queue_entry_count(q)) queue_entry_free((struct ll_entry*)queue_data_get(q));
stats.time_ms += now_ms() - start; stats.time_ms += now_ms() - start;
queue_free(q); uasync_destroy(ua, 0); queue_free(q); uasync_destroy(ua, 0);

BIN
tests/test_memory_pool_and_config

Binary file not shown.

2
tests/test_memory_pool_and_config.c

@ -57,7 +57,7 @@ int main() {
// Add some entries and trigger waiters // Add some entries and trigger waiters
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
void* data = queue_data_new(10); void* data = queue_entry_new(10);
queue_data_put(queue, data, i); // Используем ID = i queue_data_put(queue, data, i); // Используем ID = i
} }

BIN
tests/test_pkt_normalizer_etcp

Binary file not shown.

BIN
tests/test_u_async_comprehensive

Binary file not shown.

3
tests/test_u_async_comprehensive.c

@ -425,7 +425,8 @@ static void test_concurrent_operations(void) {
/* Write to sockets to generate events */ /* Write to sockets to generate events */
if (cycle % 3 == 0) { if (cycle % 3 == 0) {
char data = 'x'; char data = 'x';
write(sockets[0], &data, 1); ssize_t wret = write(sockets[0], &data, 1);
(void)wret; /* Suppress warning - best effort write for testing */
} }
/* Poll */ /* Poll */

Loading…
Cancel
Save