// ll_queue.c - Упрощенная архитектура: разделение создания элементов и работы с очередью #include #include #include #include #include "ll_queue.h" #include "u_async.h" #include "debug_config.h" #include "mem.h" #ifdef _WIN32 #include #else #include #endif // ==================== Thread safety check ==================== #ifdef QUEUE_THREAD_CHECK static inline void queue_check_thread(struct ll_queue* q) { if (!q) return; #ifdef _WIN32 DWORD current = GetCurrentThreadId(); if (q->owner_thread != current) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': thread mismatch! owner=%lu current=%lu", q->name ? q->name : "unknown", (unsigned long)q->owner_thread, (unsigned long)current); printf("ERROR: Queue '%s' accessed from wrong thread!\n", q->name ? q->name : "unknown"); abort(); } #else pthread_t current = pthread_self(); if (!pthread_equal(q->owner_thread, current)) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': thread mismatch! owner=%lu current=%lu", q->name ? q->name : "unknown", (unsigned long)q->owner_thread, (unsigned long)current); printf("ERROR: Queue '%s' accessed from wrong thread!\n", q->name ? q->name : "unknown"); abort(); } #endif } #endif // Предварительные объявления внутренних функций static void queue_resume_timeout_cb(void* arg); static void check_waiters(struct ll_queue* q); static void add_to_hash(struct ll_queue* q, struct ll_entry* entry); static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry); // ==================== Управление очередью ==================== struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size, char* name) { if (!ua) return NULL; struct ll_queue* q = u_calloc(1, sizeof(struct ll_queue)); if (!q) return NULL; q->name = name; q->ua = ua; q->size_limit = -1; // Без ограничения по умолчанию q->hash_size = hash_size; #ifdef QUEUE_THREAD_CHECK #ifdef _WIN32 q->owner_thread = GetCurrentThreadId(); #else q->owner_thread = pthread_self(); #endif #endif // Создать хеш-таблицу если нужно if (hash_size > 0) { q->hash_table = u_calloc(hash_size, sizeof(struct ll_entry*)); if (!q->hash_table) { u_free(q); return NULL; } } DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_new: created queue %p, hash_size=%zu", q, hash_size); return q; } struct ll_entry* ll_alloc_lldgram(uint16_t len) { struct ll_entry* entry = queue_entry_new(0); if (!entry) return NULL; entry->len=0; entry->memlen=len; entry->dgram = u_malloc(len); // entry->dgram_pool = NULL; - уже null (memset) // entry->dgram_free_fn = NULL; if (!entry->dgram) { queue_entry_free(entry); return NULL; } return entry; } void queue_free(struct ll_queue* q) { if (!q) return; DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: freeing queue %p, head=%p, tail=%p, count=%d", q, q->head, q->tail, q->count); // ВАЖНО: Не освобождаем элементы в очереди - они должны быть извлечены отдельно // Это упрощает архитектуру и предотвращает double-u_free // Освободить хеш-таблицу if (q->hash_table) { u_free(q->hash_table); } // Отменить отложенное возобновление if (q->resume_timeout_id) { uasync_call_soon_cancel(q->ua, q->resume_timeout_id); q->resume_timeout_id = NULL; } u_free(q); } // ==================== Конфигурация очереди ==================== void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg) { if (!q) return; q->callback = cbk_fn; q->callback_arg = arg; } static void queue_resume_timeout_cb(void* arg) { struct ll_queue* q = (struct ll_queue*)arg; if (!q) return; q->resume_timeout_id = NULL; // Вызвать коллбэк если есть элементы и коллбэки разрешены if (q->head && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } } void queue_resume_callback(struct ll_queue* q) { if (!q) return; q->callback_suspended = 0; // Если есть элементы, запланировать вызов коллбэка if (q->head && q->callback && !q->resume_timeout_id) { q->resume_timeout_id = uasync_call_soon(q->ua, q, queue_resume_timeout_cb); } } void queue_set_size_limit(struct ll_queue* q, int lim) { if (!q) return; q->size_limit = lim; } // ==================== Управление элементами ==================== struct ll_entry* queue_entry_new(size_t data_size) { struct ll_entry* entry = u_malloc(sizeof(struct ll_entry) + data_size); if (!entry) return NULL; memset(entry, 0, sizeof(struct ll_entry) + data_size); entry->size = data_size; entry->len = 0; entry->pool = NULL; // Выделено через u_malloc // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new: created entry %p, size=%zu", entry, data_size); return entry; } struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool) { if (!pool) return NULL; struct ll_entry* entry = memory_pool_alloc(pool); if (!entry) return NULL; memset(entry, 0, pool->object_size); entry->size = pool->object_size - sizeof(struct ll_entry); entry->len = 0; entry->pool = pool; // Выделено из пула // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new_from_pool: created entry %p from pool %p", entry, pool); return entry; } //void ll_u_free_dgram(struct ll_entry* entry) { void queue_dgram_free(struct ll_entry* entry) { if (!entry) return; if (entry->dgram) { if (entry->dgram_free_fn) { entry->dgram_free_fn(entry->dgram); // arg=NULL, если не задан } else if (entry->dgram_pool) { memory_pool_free(entry->dgram_pool, entry->dgram); } else { u_free(entry->dgram); } entry->dgram = NULL; entry->len = 0; // Опционально сброс len } } void queue_entry_free(struct ll_entry* entry) { if (!entry) return; if (entry->pool) { memory_pool_free(entry->pool, entry); } else { u_free(entry); } } // ==================== Операции с очередью ==================== // Внутренняя функция добавления в хеш-таблицу static void add_to_hash(struct ll_queue* q, struct ll_entry* entry) { if (!q || !entry) return; if (q->hash_size == 0 || entry->index_size == 0) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] Empty hash_size=%d or index_size=%d",q->name, q->hash_size, entry->index_size); return; } uint32_t slot = entry->index_hash % q->hash_size; entry->hash_next = q->hash_table[slot]; q->hash_table[slot] = entry; } static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry) { if (!q || q->hash_size == 0 || !entry) return; uint32_t slot = entry->index_hash % q->hash_size; // теперь используем index_hash struct ll_entry** ptr = &q->hash_table[slot]; while (*ptr) { if (*ptr == entry) { *ptr = entry->hash_next; entry->hash_next = NULL; return; } ptr = &(*ptr)->hash_next; } } static uint32_t make_hash(const void* data, uint16_t len) {// алгоритм FNV-1a if (len == 0 || data == NULL) return 0; const uint8_t* p = (const uint8_t*)data; uint32_t hash = 0x811C9DC5u; // FNV-1a offset basis (32-bit) for (uint16_t i = 0; i < len; ++i) { hash ^= (uint32_t)p[i]; // XOR с байтом hash *= 0x01000193u; // FNV-1a prime } return hash; } // Проверить и запустить ожидающие коллбэки static void check_waiters(struct ll_queue* q) { if (!q) return; // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: checking waiters, count=%d, bytes=%zu", q->count, q->total_bytes); struct queue_waiter* waiter = &q->waiter; if (waiter->callback) { // Проверить условие: не больше max_packets и не больше max_bytes // max_bytes = 0 означает "не проверять байты" if (q->count <= waiter->max_packets && (waiter->max_bytes == 0 || q->total_bytes <= waiter->max_bytes)) { DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: condition met, calling callback, count=%d<=%d, bytes=%zu<=%zu (max_bytes_check=%s)", q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes, waiter->max_bytes == 0 ? "disabled" : "enabled"); waiter->callback(q, waiter->callback_arg); memset(waiter, 0, sizeof(*waiter)); } } } int queue_data_put(struct ll_queue* q, struct ll_entry* entry) { if (!q || !entry) return -1; #ifdef QUEUE_THREAD_CHECK queue_check_thread(q); #endif entry->index_offset = 0; entry->index_size = 0; entry->index_hash = 0; if (q->hash_size > 0) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] Put no-hash data to hash queue",q->name); queue_dgram_free(entry); queue_entry_free(entry); return -1; } if (q->size_limit >= 0 && q->count >= q->size_limit) { queue_dgram_free(entry); queue_entry_free(entry); return -1; } // Добавляем в конец entry->next = NULL; entry->prev = q->tail; if (q->tail) q->tail->next = entry; else q->head = entry; q->tail = entry; q->count++; entry->int_len = entry->len; q->total_bytes += entry->int_len; // НЕ вызываем add_to_hash if (q->count == 1 && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } #ifdef QUEUE_DEBUG queue_check_consistency(q); #endif return 0; } int queue_data_put_with_index(struct ll_queue* q, struct ll_entry* entry, uint16_t index_offset, uint16_t index_size) { if (!q || !entry) return -1; #ifdef QUEUE_THREAD_CHECK queue_check_thread(q); #endif if (index_size == 0 || (size_t)index_offset + index_size > (size_t)entry->size) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] invalid index: offset=%u size=%u (data_size=%u)",q->name, index_offset, index_size, entry->size); queue_dgram_free(entry); queue_entry_free(entry); return -1; } entry->index_offset = index_offset; entry->index_size = index_size; entry->index_hash = make_hash(entry->data + index_offset, index_size); if (q->size_limit >= 0 && q->count >= q->size_limit) { queue_dgram_free(entry); queue_entry_free(entry); return -1; } // Добавляем в конец entry->next = NULL; entry->prev = q->tail; if (q->tail) q->tail->next = entry; else q->head = entry; q->tail = entry; q->count++; entry->int_len = entry->len; q->total_bytes += entry->int_len; add_to_hash(q, entry); if (q->count == 1 && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } #ifdef QUEUE_DEBUG queue_check_consistency(q); #endif return 0; } int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry) { if (!q || !entry) return -1; #ifdef QUEUE_THREAD_CHECK queue_check_thread(q); #endif entry->index_offset = 0; entry->index_size = 0; entry->index_hash = 0; if (q->hash_size > 0) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] Put no-hash data to hash queue",q->name); queue_dgram_free(entry); queue_entry_free(entry); return -1; } if (q->size_limit >= 0 && q->count >= q->size_limit) { queue_dgram_free(entry); queue_entry_free(entry); // как было в оригинале return -1; } entry->next = q->head; entry->prev = NULL; if (q->head) q->head->prev = entry; else q->tail = entry; q->head = entry; q->count++; entry->int_len = entry->len; q->total_bytes += entry->int_len; if (q->count == 1 && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } #ifdef QUEUE_DEBUG queue_check_consistency(q); #endif return 0; } // С хешем для put_first int queue_data_put_first_with_index(struct ll_queue* q, struct ll_entry* entry, uint16_t index_offset, uint16_t index_size) { if (!q || !entry) return -1; #ifdef QUEUE_THREAD_CHECK queue_check_thread(q); #endif if (index_size == 0 || (size_t)index_offset + index_size > (size_t)entry->size) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] invalid index: offset=%u size=%u (data_size=%u)",q->name, index_offset, index_size, entry->size); queue_dgram_free(entry); queue_entry_free(entry); return -1; } entry->index_offset = index_offset; entry->index_size = index_size; entry->index_hash = make_hash(entry->data + index_offset, index_size); if (q->size_limit >= 0 && q->count >= q->size_limit) { queue_dgram_free(entry); queue_entry_free(entry); return -1; } entry->next = q->head; entry->prev = NULL; if (q->head) q->head->prev = entry; else q->tail = entry; q->head = entry; q->count++; entry->int_len = entry->len; q->total_bytes += entry->int_len; add_to_hash(q, entry); if (q->count == 1 && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } #ifdef QUEUE_DEBUG queue_check_consistency(q); #endif return 0; } struct ll_entry* queue_data_get(struct ll_queue* q) { if (!q || !q->head) return NULL; #ifdef QUEUE_THREAD_CHECK queue_check_thread(q); #endif struct ll_entry* entry = q->head; q->head = entry->next; if (q->head) q->head->prev = NULL; if (!q->head) q->tail = NULL; q->count--; q->total_bytes -= entry->int_len; entry->next = NULL; entry->prev = NULL; remove_from_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_get: got entry %p (id=%u), count=%d", entry, entry->id, q->count); // Приостановить коллбэки для предотвращения рекурсии q->callback_suspended = 1; // Проверить ожидающие коллбэки check_waiters(q); #ifdef QUEUE_DEBUG queue_check_consistency(q);// !!!! for debug #endif return entry; } int queue_entry_count(struct ll_queue* q) { return q ? q->count : 0; } // Функция проверки консистентности count и total_bytes // Возвращает 0 если ok, -1 если есть несоответствия int queue_check_consistency(struct ll_queue* q) { if (!q) return -1; // Недопустимая очередь // Проверка: если count > 0, то head не должен быть NULL if (q->count > 0 && !q->head) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': count=%d but head is NULL!", q->name ? q->name : "unknown", q->count); return -1; } int actual_count = 0; size_t actual_bytes = 0; struct ll_entry* current = q->head; while (current) { actual_count++; actual_bytes += current->int_len; if (current->next) { if (current->next->prev != current) { // Несоответствие в связях prev/next DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': prev/next error at entry %p != %p entries: %d!=%d bytes: %zu!=%zu", q->name ? q->name : "unknown", (void*)current, (void*)current->next->prev, actual_count, q->count, actual_bytes, q->total_bytes); return -1; } } current = current->next; } // Проверить хвост if (q->tail && q->tail->next != NULL) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': tail error", q->name ? q->name : "unknown"); return -1; // Хвост должен иметь next == NULL } // Сравнить с сохранёнными значениями if (actual_count != q->count || actual_bytes != q->total_bytes) { DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': count error entries: %d!=%d or bytes: %zu!=%zu", q->name ? q->name : "unknown", actual_count, q->count, actual_bytes, q->total_bytes); return -1; } return 0; } // ==================== Асинхронное ожидание ==================== struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes, queue_threshold_callback_fn callback, void* arg) { if (!q || !callback) return NULL; struct queue_waiter* waiter = &q->waiter; // Проверить условие немедленно if (q->count <= max_packets && (max_bytes == 0 || q->total_bytes <= max_bytes)) { // Условие уже выполнено - вызвать коллбэк немедленно callback(q, arg); return NULL; } // Установить waiter для отложенного вызова waiter->max_packets = max_packets; waiter->max_bytes = max_bytes; waiter->callback = callback; waiter->callback_arg = arg; return waiter; } void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter) { if (!q || !waiter || waiter != &q->waiter) return; memset(waiter, 0, sizeof(*waiter)); } // ==================== Поиск и удаление по ID ==================== struct ll_entry* queue_find_data_by_index(struct ll_queue* q, const void* index_key, uint16_t index_size) { if (!q || q->hash_size == 0 || !q->hash_table || index_size == 0 || !index_key) return NULL; uint32_t hash_val = make_hash(index_key, index_size); uint32_t slot = hash_val % q->hash_size; struct ll_entry* entry = q->hash_table[slot]; while (entry) { if (entry->index_size == index_size && (size_t)entry->index_offset + index_size <= (size_t)entry->size && memcmp(entry->data + entry->index_offset, index_key, index_size) == 0) { return entry; } entry = entry->hash_next; } return NULL; } int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) { if (!q || !entry) return -1; #ifdef QUEUE_THREAD_CHECK queue_check_thread(q); #endif // Удалить из двусвязного списка if (entry->prev) { entry->prev->next = entry->next; } else { q->head = entry->next; } if (entry->next) { entry->next->prev = entry->prev; } else { q->tail = entry->prev; } q->count--; q->total_bytes -= entry->int_len; entry->next = NULL; entry->prev = NULL; remove_from_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_remove_data: removed entry %p (id=%u), count=%d", entry, entry->id, q->count); #ifdef QUEUE_DEBUG queue_check_consistency(q);// !!!! for debug #endif check_waiters(q); return 0; }