Browse Source

+queue names

nodeinfo-routing-update
jeka 4 weeks ago
parent
commit
1811764f6d
  1. 34
      lib/ll_queue.c
  2. 3
      lib/ll_queue.h
  3. 2
      src/dummynet.c
  4. 12
      src/etcp.c
  5. 33
      src/pkt_normalizer.c
  6. 2
      src/route_bgp.c
  7. 4
      src/tun_if.c
  8. 2
      tests/debug_simple.c
  9. 4
      tests/test_intensive_memory_pool.c
  10. 4
      tests/test_intensive_memory_pool_new.c
  11. 18
      tests/test_ll_queue.c
  12. 2
      tests/test_memory_pool_and_config.c
  13. 4
      tests/test_pkt_normalizer_standalone.c

34
lib/ll_queue.c

@ -16,12 +16,13 @@ static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry);
// ==================== Управление очередью ==================== // ==================== Управление очередью ====================
struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) { struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size, char* name) {
if (!ua) return NULL; if (!ua) return NULL;
struct ll_queue* q = u_calloc(1, sizeof(struct ll_queue)); struct ll_queue* q = u_calloc(1, sizeof(struct ll_queue));
if (!q) return NULL; if (!q) return NULL;
q->name = name;
q->ua = ua; q->ua = ua;
q->size_limit = -1; // Без ограничения по умолчанию q->size_limit = -1; // Без ограничения по умолчанию
q->hash_size = hash_size; q->hash_size = hash_size;
@ -255,6 +256,11 @@ int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put: added entry %p (id=%u), count=%d", entry, id, q->count); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put: added entry %p (id=%u), count=%d", entry, id, q->count);
// ВАЖНО: проверка консистентности ДО коллбэка, так как коллбэк может модифицировать очередь
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug - BEFORE callback
#endif
// Если очередь была пуста и коллбэки разрешены - вызвать коллбэк // Если очередь была пуста и коллбэки разрешены - вызвать коллбэк
if (q->count == 1 && !q->callback_suspended && q->callback) { if (q->count == 1 && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg); q->callback(q, q->callback_arg);
@ -264,7 +270,7 @@ int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
check_waiters(q); check_waiters(q);
#ifdef QUEUE_DEBUG #ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug queue_check_consistency(q);// !!!! for debug - AFTER callback
#endif #endif
return 0; return 0;
} }
@ -298,6 +304,11 @@ int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put_first: added entry %p (id=%u), count=%d", entry, id, q->count); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put_first: added entry %p (id=%u), count=%d", entry, id, q->count);
// ВАЖНО: проверка консистентности ДО коллбэка
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug - BEFORE callback
#endif
// Если очередь была пуста и коллбэки разрешены - вызвать коллбэк // Если очередь была пуста и коллбэки разрешены - вызвать коллбэк
if (q->count == 1 && !q->callback_suspended && q->callback) { if (q->count == 1 && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg); q->callback(q, q->callback_arg);
@ -306,6 +317,10 @@ int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id
// Проверить ожидающие коллбэки // Проверить ожидающие коллбэки
check_waiters(q); check_waiters(q);
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug - AFTER callback
#endif
#ifdef QUEUE_DEBUG #ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug queue_check_consistency(q);// !!!! for debug
#endif #endif
@ -352,6 +367,13 @@ int queue_entry_count(struct ll_queue* q) {
int queue_check_consistency(struct ll_queue* q) { int queue_check_consistency(struct ll_queue* q) {
if (!q) return -1; // Недопустимая очередь if (!q) return -1; // Недопустимая очередь
// Проверка: если count > 0, то head не должен быть NULL
if (q->count > 0 && !q->head) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': count=%d but head is NULL!",
q->name ? q->name : "unknown", q->count);
return -1;
}
int actual_count = 0; int actual_count = 0;
size_t actual_bytes = 0; size_t actual_bytes = 0;
struct ll_entry* current = q->head; struct ll_entry* current = q->head;
@ -362,7 +384,8 @@ int queue_check_consistency(struct ll_queue* q) {
if (current->next) { if (current->next) {
if (current->next->prev != current) { if (current->next->prev != current) {
// Несоответствие в связях prev/next // Несоответствие в связях prev/next
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Prev/next error"); DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': prev/next error at entry %p",
q->name ? q->name : "unknown", (void*)current);
return -1; return -1;
} }
} }
@ -371,13 +394,14 @@ int queue_check_consistency(struct ll_queue* q) {
// Проверить хвост // Проверить хвост
if (q->tail && q->tail->next != NULL) { if (q->tail && q->tail->next != NULL) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Tail error"); DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': tail error", q->name ? q->name : "unknown");
return -1; // Хвост должен иметь next == NULL return -1; // Хвост должен иметь next == NULL
} }
// Сравнить с сохранёнными значениями // Сравнить с сохранёнными значениями
if (actual_count != q->count || actual_bytes != q->total_bytes) { if (actual_count != q->count || actual_bytes != q->total_bytes) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, " count error entries: %d!=%d or bytes: %d!=%d", actual_count, q->count, actual_bytes, q->total_bytes); DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': count error entries: %d!=%d or bytes: %zu!=%zu",
q->name ? q->name : "unknown", actual_count, q->count, actual_bytes, q->total_bytes);
return -1; return -1;
} }

3
lib/ll_queue.h

@ -97,6 +97,7 @@ struct queue_waiter {
* @brief Структура очереди. * @brief Структура очереди.
*/ */
struct ll_queue { struct ll_queue {
char* name;
struct ll_entry* head; ///< Голова очереди (отсюда извлекаем) struct ll_entry* head; ///< Голова очереди (отсюда извлекаем)
struct ll_entry* tail; ///< Хвост очереди (сюда добавляем) struct ll_entry* tail; ///< Хвост очереди (сюда добавляем)
int count; ///< Текущее количество элементов int count; ///< Текущее количество элементов
@ -124,7 +125,7 @@ struct ll_queue {
* @param hash_size размер хеш-таблицы (0 = поиск по id отключён) * @param hash_size размер хеш-таблицы (0 = поиск по id отключён)
* @return указатель на очередь или NULL при ошибке выделения памяти * @return указатель на очередь или NULL при ошибке выделения памяти
*/ */
struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size); struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size, char* name);
/** /**
* @brief Освобождает структуру очереди. * @brief Освобождает структуру очереди.

2
src/dummynet.c

@ -359,7 +359,7 @@ struct dummynet* dummynet_create(struct UASYNC* ua, const char* bind_ip, uint16_
/* Создаём очереди для направлений */ /* Создаём очереди для направлений */
for (int i = 0; i < DUMMYNET_DIR_COUNT; i++) { for (int i = 0; i < DUMMYNET_DIR_COUNT; i++) {
dn->dirs[i].queue = queue_new(ua, 0); dn->dirs[i].queue = queue_new(ua, 0, "dummynet1");
if (!dn->dirs[i].queue) { if (!dn->dirs[i].queue) {
DEBUG_ERROR(DEBUG_CATEGORY_DUMMYNET, "Failed to create queue for direction %d", i); DEBUG_ERROR(DEBUG_CATEGORY_DUMMYNET, "Failed to create queue for direction %d", i);
dummynet_destroy(dn); dummynet_destroy(dn);

12
src/etcp.c

@ -118,12 +118,12 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) {
} }
etcp->instance = instance; etcp->instance = instance;
etcp->input_queue = queue_new(instance->ua, 0); // No hash for input_queue etcp->input_queue = queue_new(instance->ua, 0, "ETCP input"); // No hash for input_queue
etcp->output_queue = queue_new(instance->ua, 0); // No hash for output_queue etcp->output_queue = queue_new(instance->ua, 0, "ETCP output"); // No hash for output_queue
etcp->input_send_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for send_q etcp->input_send_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "input_send_q"); // Hash for send_q
etcp->input_wait_ack = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for wait_ack etcp->input_wait_ack = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "input_wait_ack"); // Hash for wait_ack
etcp->recv_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE); // Hash for send_q etcp->recv_q = queue_new(instance->ua, INFLIGHT_INITIAL_HASH_SIZE, "recv_q"); // Hash for send_q
etcp->ack_q = queue_new(instance->ua, 0); etcp->ack_q = queue_new(instance->ua, 0, "ack_q");
etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET)); etcp->inflight_pool = memory_pool_init(sizeof(struct INFLIGHT_PACKET));
etcp->io_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT)); etcp->io_pool = memory_pool_init(sizeof(struct ETCP_FRAGMENT));
etcp->optimal_inflight=10000; etcp->optimal_inflight=10000;

33
src/pkt_normalizer.c

@ -21,6 +21,7 @@ static void pn_send_to_etcp(struct PKTNORM* pn);
// Initialization // Initialization
struct PKTNORM* pn_init(struct ETCP_CONN* etcp) { struct PKTNORM* pn_init(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (!etcp) return NULL; if (!etcp) return NULL;
if (etcp->mtu<200) { if (etcp->mtu<200) {
@ -41,14 +42,14 @@ struct PKTNORM* pn_init(struct ETCP_CONN* etcp) {
pn->frag_size = etcp->mtu - 100; // Use MTU as fixed packet size (adjust if headers need subtraction) pn->frag_size = etcp->mtu - 100; // Use MTU as fixed packet size (adjust if headers need subtraction)
pn->tx_wait_time = 10; pn->tx_wait_time = 10;
pn->input = queue_new(pn->ua, 0); // No hash needed pn->input = queue_new(pn->ua, 0, "pn_input"); // No hash needed
if (!pn->input) { if (!pn->input) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_init: queue_new(input) failed"); DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_init: queue_new(input) failed");
pn_deinit(pn); pn_deinit(pn);
return NULL; return NULL;
} }
pn->output = queue_new(pn->ua, 0); // No hash needed pn->output = queue_new(pn->ua, 0, "pn_output"); // No hash needed
if (!pn->output) { if (!pn->output) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_init: queue_new(output) failed"); DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_init: queue_new(output) failed");
pn_deinit(pn); pn_deinit(pn);
@ -68,6 +69,7 @@ struct PKTNORM* pn_init(struct ETCP_CONN* etcp) {
// Deinitialization // Deinitialization
void pn_deinit(struct PKTNORM* pn) { void pn_deinit(struct PKTNORM* pn) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (!pn) return; if (!pn) return;
// Unregister from routing module // Unregister from routing module
@ -125,6 +127,7 @@ void pn_deinit(struct PKTNORM* pn) {
// Reset unpacker state // Reset unpacker state
void pn_unpacker_reset_state(struct PKTNORM* pn) { void pn_unpacker_reset_state(struct PKTNORM* pn) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (!pn) return; if (!pn) return;
if (pn->recvpart) { if (pn->recvpart) {
queue_dgram_free(pn->recvpart); queue_dgram_free(pn->recvpart);
@ -135,6 +138,7 @@ void pn_unpacker_reset_state(struct PKTNORM* pn) {
// Reset packer and unpacker state (for reconnection) // Reset packer and unpacker state (for reconnection)
void pn_reset(struct PKTNORM* pn) { void pn_reset(struct PKTNORM* pn) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (!pn) return; if (!pn) return;
// Cancel flush timer // Cancel flush timer
@ -150,15 +154,33 @@ void pn_reset(struct PKTNORM* pn) {
} }
pn->data_ptr = 0; pn->data_ptr = 0;
pn->data_size = 0; pn->data_size = 0;
pn->pending = NULL;
// Free pending if any
if (pn->pending) {
queue_dgram_free(pn->pending);
queue_entry_free(pn->pending);
pn->pending = NULL;
}
pn->pending_in_ptr = 0; pn->pending_in_ptr = 0;
// Reset unpacker state // Reset unpacker state
pn_unpacker_reset_state(pn); pn_unpacker_reset_state(pn);
// Drain input and output queues to discard pending data and maintain consistency
struct ll_entry* entry;
while ((entry = queue_data_get(pn->input)) != NULL) {
queue_dgram_free(entry);
queue_entry_free(entry);
}
while ((entry = queue_data_get(pn->output)) != NULL) {
queue_dgram_free(entry);
queue_entry_free(entry);
}
} }
// Send data to packer (copies and adds to input queue or pending, triggering callback) // Send data to packer (copies and adds to input queue or pending, triggering callback)
void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) { void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (!pn || !data || len == 0) return; if (!pn || !data || len == 0) return;
struct ll_entry* entry = ll_alloc_lldgram(len); struct ll_entry* entry = ll_alloc_lldgram(len);
@ -182,6 +204,7 @@ void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
// Internal: Packer callback // Internal: Packer callback
static void packer_cb(struct ll_queue* q, void* arg) { static void packer_cb(struct ll_queue* q, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
struct PKTNORM* pn = (struct PKTNORM*)arg; struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return; if (!pn) return;
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "input_q->pn: waiting etcp input threshold"); DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "input_q->pn: waiting etcp input threshold");
@ -190,6 +213,7 @@ static void packer_cb(struct ll_queue* q, void* arg) {
// Helper to send block to ETCP as ETCP_FRAGMENT // Helper to send block to ETCP as ETCP_FRAGMENT
static void pn_send_to_etcp(struct PKTNORM* pn) { static void pn_send_to_etcp(struct PKTNORM* pn) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (!pn || !pn->data || pn->data_ptr == 0) return; if (!pn || !pn->data || pn->data_ptr == 0) return;
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: pn_send_to_etcp"); // DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: pn_send_to_etcp");
@ -218,6 +242,7 @@ static void pn_send_to_etcp(struct PKTNORM* pn) {
// Internal: Renew sndpart buffer // Internal: Renew sndpart buffer
static void pn_buf_renew(struct PKTNORM* pn) { static void pn_buf_renew(struct PKTNORM* pn) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (pn->data) { if (pn->data) {
int remain = pn->data_size - pn->data_ptr; int remain = pn->data_size - pn->data_ptr;
if (remain < 3) pn_send_to_etcp(pn); if (remain < 3) pn_send_to_etcp(pn);
@ -238,6 +263,7 @@ static void pn_buf_renew(struct PKTNORM* pn) {
// Internal: Process input when etcp->input_queue is ready (empty) // Internal: Process input when etcp->input_queue is ready (empty)
static void etcp_input_ready_cb(struct ll_queue* q, void* arg) { static void etcp_input_ready_cb(struct ll_queue* q, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
struct PKTNORM* pn = (struct PKTNORM*)arg; struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return; if (!pn) return;
@ -286,6 +312,7 @@ exit:
// Internal: Flush callback on timeout // Internal: Flush callback on timeout
static void pn_flush_cb(void* arg) { static void pn_flush_cb(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
struct PKTNORM* pn = (struct PKTNORM*)arg; struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return; if (!pn) return;

2
src/route_bgp.c

@ -460,7 +460,7 @@ struct ROUTE_BGP* route_bgp_init(struct UTUN_INSTANCE* instance)
bgp->instance = instance; bgp->instance = instance;
// Создаем очередь для рассылки // Создаем очередь для рассылки
bgp->senders_list = queue_new(instance->ua, 0); bgp->senders_list = queue_new(instance->ua, 0, "BGP sender_list");
if (!bgp->senders_list) { if (!bgp->senders_list) {
DEBUG_ERROR(DEBUG_CATEGORY_BGP, "Failed to create senders queue"); DEBUG_ERROR(DEBUG_CATEGORY_BGP, "Failed to create senders queue");
u_free(bgp); u_free(bgp);

4
src/tun_if.c

@ -148,8 +148,8 @@ struct tun_if* tun_init(struct UASYNC* ua, struct utun_config* config)
tun->pool = memory_pool_init(sizeof(struct ll_entry)); tun->pool = memory_pool_init(sizeof(struct ll_entry));
if (!tun->pool) goto fail; if (!tun->pool) goto fail;
tun->output_queue = queue_new(ua, 0); tun->output_queue = queue_new(ua, 0, "TUN output");
tun->input_queue = queue_new(ua, 0); tun->input_queue = queue_new(ua, 0, "TUN input");
if (!tun->output_queue || !tun->input_queue) goto fail; if (!tun->output_queue || !tun->input_queue) goto fail;
queue_set_callback(tun->input_queue, tun_input_queue_callback, tun); queue_set_callback(tun->input_queue, tun_input_queue_callback, tun);

2
tests/debug_simple.c

@ -12,7 +12,7 @@ typedef struct {
int main() { int main() {
struct UASYNC* ua = uasync_create(); struct UASYNC* ua = uasync_create();
struct ll_queue* q = queue_new(ua, 0); struct ll_queue* q = queue_new(ua, 0,"s1");
/* Create test data */ /* Create test data */
test_data_t* data1 = (test_data_t*)queue_entry_new(sizeof(test_data_t)); test_data_t* data1 = (test_data_t*)queue_entry_new(sizeof(test_data_t));

4
tests/test_intensive_memory_pool.c

@ -27,7 +27,7 @@ static double test_without_pools(int iterations) {
clock_t start = clock(); clock_t start = clock();
struct UASYNC* ua = uasync_create(); struct UASYNC* ua = uasync_create();
struct ll_queue* queue = queue_new(ua, 0); // Без пулов struct ll_queue* queue = queue_new(ua, 0,"q1"); // Без пулов
for (int cycle = 0; cycle < iterations; cycle++) { for (int cycle = 0; cycle < iterations; cycle++) {
// Создать много waiters // Создать много waiters
@ -70,7 +70,7 @@ static double test_with_pools(int iterations) {
clock_t start = clock(); clock_t start = clock();
struct UASYNC* ua = uasync_create(); struct UASYNC* ua = uasync_create();
struct ll_queue* queue = queue_new(ua, 0); // С пулами struct ll_queue* queue = queue_new(ua, 0,"q2"); // С пулами
// Создать пул памяти для данных // Создать пул памяти для данных
struct memory_pool* pool = memory_pool_init(sizeof(struct ll_entry) + 64); struct memory_pool* pool = memory_pool_init(sizeof(struct ll_entry) + 64);

4
tests/test_intensive_memory_pool_new.c

@ -27,7 +27,7 @@ static double test_without_pools(int iterations) {
clock_t start = clock(); clock_t start = clock();
struct UASYNC* ua = uasync_create(); struct UASYNC* ua = uasync_create();
struct ll_queue* queue = queue_new(ua, 0); // Без пулов struct ll_queue* queue = queue_new(ua, 0,"q1"); // Без пулов
for (int cycle = 0; cycle < iterations; cycle++) { for (int cycle = 0; cycle < iterations; cycle++) {
// Создать много waiters // Создать много waiters
@ -70,7 +70,7 @@ static double test_with_pools(int iterations) {
clock_t start = clock(); clock_t start = clock();
struct UASYNC* ua = uasync_create(); struct UASYNC* ua = uasync_create();
struct ll_queue* queue = queue_new(ua, 0); // С пулами struct ll_queue* queue = queue_new(ua, 0,"q2"); // С пулами
// Создать пул памяти для данных // Создать пул памяти для данных
struct memory_pool* pool = memory_pool_init(sizeof(struct ll_entry) + 64); struct memory_pool* pool = memory_pool_init(sizeof(struct ll_entry) + 64);

18
tests/test_ll_queue.c

@ -77,8 +77,8 @@ static void waiter_cb(struct ll_queue *q, void *arg) {
static void test_basic(void) { static void test_basic(void) {
TEST("basic creation / free"); TEST("basic creation / free");
struct UASYNC *ua = uasync_create(); struct UASYNC *ua = uasync_create();
struct ll_queue *q1 = queue_new(ua, 0); struct ll_queue *q1 = queue_new(ua, 0,"q1");
struct ll_queue *q2 = queue_new(ua, 16); struct ll_queue *q2 = queue_new(ua, 16,"q2");
ASSERT(q1 && q2, "queue_new failed"); ASSERT(q1 && q2, "queue_new failed");
ASSERT_EQ(queue_entry_count(q1), 0, ""); ASSERT_EQ(queue_entry_count(q1), 0, "");
queue_free(q1); queue_free(q2); queue_free(q1); queue_free(q2);
@ -89,7 +89,7 @@ static void test_basic(void) {
static void test_fifo(void) { static void test_fifo(void) {
TEST("FIFO ordering"); TEST("FIFO ordering");
struct UASYNC *ua = uasync_create(); struct UASYNC *ua = uasync_create();
struct ll_queue *q = queue_new(ua, 0); struct ll_queue *q = queue_new(ua, 0,"q3");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
test_data_t *d = (test_data_t*)queue_entry_new(sizeof(test_data_t)); test_data_t *d = (test_data_t*)queue_entry_new(sizeof(test_data_t));
@ -112,7 +112,7 @@ static void test_fifo(void) {
static void test_lifo_priority(void) { static void test_lifo_priority(void) {
TEST("put_first (LIFO priority)"); TEST("put_first (LIFO priority)");
struct UASYNC *ua = uasync_create(); struct UASYNC *ua = uasync_create();
struct ll_queue *q = queue_new(ua, 0); struct ll_queue *q = queue_new(ua, 0,"q4");
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d)); test_data_t *d = (test_data_t*)queue_entry_new(sizeof(*d));
@ -142,7 +142,7 @@ static void test_lifo_priority(void) {
static void test_callback(void) { static void test_callback(void) {
TEST("callback serial processing"); TEST("callback serial processing");
struct UASYNC *ua = uasync_create(); struct UASYNC *ua = uasync_create();
struct ll_queue *q = queue_new(ua, 0); struct ll_queue *q = queue_new(ua, 0,"q5");
int cnt = 0; int cnt = 0;
queue_set_callback(q, queue_cb, &cnt); queue_set_callback(q, queue_cb, &cnt);
@ -164,7 +164,7 @@ static void test_callback(void) {
static void test_waiter(void) { static void test_waiter(void) {
TEST("wait_threshold + cancel"); TEST("wait_threshold + cancel");
struct UASYNC *ua = uasync_create(); struct UASYNC *ua = uasync_create();
struct ll_queue *q = queue_new(ua, 0); struct ll_queue *q = queue_new(ua, 0,"q6");
int called = 0; int called = 0;
struct queue_waiter *w = queue_wait_threshold(q, 2, 0, waiter_cb, &called); struct queue_waiter *w = queue_wait_threshold(q, 2, 0, waiter_cb, &called);
@ -192,7 +192,7 @@ static void test_waiter(void) {
static void test_limits_hash(void) { static void test_limits_hash(void) {
TEST("size limit + hash find/remove"); TEST("size limit + hash find/remove");
struct UASYNC *ua = uasync_create(); struct UASYNC *ua = uasync_create();
struct ll_queue *q = queue_new(ua, 16); struct ll_queue *q = queue_new(ua, 16,"q7");
queue_set_size_limit(q, 3); queue_set_size_limit(q, 3);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
@ -216,7 +216,7 @@ static void test_pool(void) {
TEST("memory_pool integration + reuse"); TEST("memory_pool integration + reuse");
struct UASYNC *ua = uasync_create(); struct UASYNC *ua = uasync_create();
struct memory_pool *pool = memory_pool_init(sizeof(test_data_t)); struct memory_pool *pool = memory_pool_init(sizeof(test_data_t));
struct ll_queue *q = queue_new(ua, 0); struct ll_queue *q = queue_new(ua, 0,"q8");
size_t alloc1 = 0, reuse1 = 0; size_t alloc1 = 0, reuse1 = 0;
memory_pool_get_stats(pool, &alloc1, &reuse1); memory_pool_get_stats(pool, &alloc1, &reuse1);
@ -251,7 +251,7 @@ static void test_stress(void) {
TEST("stress 10k ops"); TEST("stress 10k ops");
double start = now_ms(); double start = now_ms();
struct UASYNC *ua = uasync_create(); struct UASYNC *ua = uasync_create();
struct ll_queue *q = queue_new(ua, 64); struct ll_queue *q = queue_new(ua, 64,"q9");
for (int i = 0; i < 10000; i++) { for (int i = 0; i < 10000; i++) {
if (queue_entry_count(q) > 80) { if (queue_entry_count(q) > 80) {

2
tests/test_memory_pool_and_config.c

@ -43,7 +43,7 @@ int main() {
} }
// Create queue with memory pools enabled // Create queue with memory pools enabled
struct ll_queue* queue = queue_new(ua, 0); struct ll_queue* queue = queue_new(ua, 0,"q1");
if (!queue) { if (!queue) {
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Failed to create queue with pools"); DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Failed to create queue with pools");
uasync_destroy(ua, 0); uasync_destroy(ua, 0);

4
tests/test_pkt_normalizer_standalone.c

@ -182,8 +182,8 @@ static int init_mock_etcp(void) {
} }
// Create queues // Create queues
mock_etcp.input_queue = queue_new(mock_instance.ua, 0); mock_etcp.input_queue = queue_new(mock_instance.ua, 0,"q1");
mock_etcp.output_queue = queue_new(mock_instance.ua, 0); mock_etcp.output_queue = queue_new(mock_instance.ua, 0,"q2");
if (!mock_etcp.input_queue || !mock_etcp.output_queue) { if (!mock_etcp.input_queue || !mock_etcp.output_queue) {
printf("Failed to create queues\n"); printf("Failed to create queues\n");

Loading…
Cancel
Save