You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

637 lines
20 KiB

// ll_queue.c - Упрощенная архитектура: разделение создания элементов и работы с очередью
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include "ll_queue.h"
#include "u_async.h"
#include "debug_config.h"
#include "mem.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
// ==================== Thread safety check ====================
#ifdef QUEUE_THREAD_CHECK
static inline void queue_check_thread(struct ll_queue* q) {
if (!q) return;
#ifdef _WIN32
DWORD current = GetCurrentThreadId();
if (q->owner_thread != current) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': thread mismatch! owner=%lu current=%lu",
q->name ? q->name : "unknown",
(unsigned long)q->owner_thread,
(unsigned long)current);
printf("ERROR: Queue '%s' accessed from wrong thread!\n", q->name ? q->name : "unknown");
abort();
}
#else
pthread_t current = pthread_self();
if (!pthread_equal(q->owner_thread, current)) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': thread mismatch! owner=%lu current=%lu",
q->name ? q->name : "unknown",
(unsigned long)q->owner_thread,
(unsigned long)current);
printf("ERROR: Queue '%s' accessed from wrong thread!\n", q->name ? q->name : "unknown");
abort();
}
#endif
}
#endif
// Предварительные объявления внутренних функций
static void queue_resume_timeout_cb(void* arg);
static void check_waiters(struct ll_queue* q);
static void add_to_hash(struct ll_queue* q, struct ll_entry* entry);
static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry);
// ==================== Управление очередью ====================
struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size, char* name) {
if (!ua) return NULL;
struct ll_queue* q = u_calloc(1, sizeof(struct ll_queue));
if (!q) return NULL;
q->name = name;
q->ua = ua;
q->size_limit = -1; // Без ограничения по умолчанию
q->hash_size = hash_size;
#ifdef QUEUE_THREAD_CHECK
#ifdef _WIN32
q->owner_thread = GetCurrentThreadId();
#else
q->owner_thread = pthread_self();
#endif
#endif
// Создать хеш-таблицу если нужно
if (hash_size > 0) {
q->hash_table = u_calloc(hash_size, sizeof(struct ll_entry*));
if (!q->hash_table) {
u_free(q);
return NULL;
}
}
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_new: created queue %p, hash_size=%zu", q, hash_size);
return q;
}
struct ll_entry* ll_alloc_lldgram(uint16_t len) {
struct ll_entry* entry = queue_entry_new(0);
if (!entry) return NULL;
entry->len=0;
entry->memlen=len;
entry->dgram = u_malloc(len);
// entry->dgram_pool = NULL; - уже null (memset)
// entry->dgram_free_fn = NULL;
if (!entry->dgram) {
queue_entry_free(entry);
return NULL;
}
return entry;
}
void queue_free(struct ll_queue* q) {
if (!q) return;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: freeing queue %p, head=%p, tail=%p, count=%d",
q, q->head, q->tail, q->count);
// ВАЖНО: Не освобождаем элементы в очереди - они должны быть извлечены отдельно
// Это упрощает архитектуру и предотвращает double-u_free
// Освободить хеш-таблицу
if (q->hash_table) {
u_free(q->hash_table);
}
// Отменить отложенное возобновление
if (q->resume_timeout_id) {
uasync_call_soon_cancel(q->ua, q->resume_timeout_id);
q->resume_timeout_id = NULL;
}
u_free(q);
}
// ==================== Конфигурация очереди ====================
void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg) {
if (!q) return;
q->callback = cbk_fn;
q->callback_arg = arg;
}
static void queue_resume_timeout_cb(void* arg) {
struct ll_queue* q = (struct ll_queue*)arg;
if (!q) return;
q->resume_timeout_id = NULL;
// Вызвать коллбэк если есть элементы и коллбэки разрешены
if (q->head && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg);
}
}
void queue_resume_callback(struct ll_queue* q) {
if (!q) return;
q->callback_suspended = 0;
// Если есть элементы, запланировать вызов коллбэка
if (q->head && q->callback && !q->resume_timeout_id) {
q->resume_timeout_id = uasync_call_soon(q->ua, q, queue_resume_timeout_cb);
}
}
void queue_set_size_limit(struct ll_queue* q, int lim) {
if (!q) return;
q->size_limit = lim;
}
// ==================== Управление элементами ====================
struct ll_entry* queue_entry_new(size_t data_size) {
struct ll_entry* entry = u_malloc(sizeof(struct ll_entry) + data_size);
if (!entry) return NULL;
memset(entry, 0, sizeof(struct ll_entry) + data_size);
entry->size = data_size;
entry->len = 0;
entry->pool = NULL; // Выделено через u_malloc
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new: created entry %p, size=%zu", entry, data_size);
return entry;
}
struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool) {
if (!pool) return NULL;
struct ll_entry* entry = memory_pool_alloc(pool);
if (!entry) return NULL;
memset(entry, 0, pool->object_size);
entry->size = pool->object_size - sizeof(struct ll_entry);
entry->len = 0;
entry->pool = pool; // Выделено из пула
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new_from_pool: created entry %p from pool %p", entry, pool);
return entry;
}
//void ll_u_free_dgram(struct ll_entry* entry) {
void queue_dgram_free(struct ll_entry* entry) {
if (!entry) return;
if (entry->dgram) {
if (entry->dgram_free_fn) {
entry->dgram_free_fn(entry->dgram); // arg=NULL, если не задан
} else if (entry->dgram_pool) {
memory_pool_free(entry->dgram_pool, entry->dgram);
} else {
u_free(entry->dgram);
}
entry->dgram = NULL;
entry->len = 0; // Опционально сброс len
}
}
void queue_entry_free(struct ll_entry* entry) {
if (!entry) return;
if (entry->pool) {
memory_pool_free(entry->pool, entry);
} else {
u_free(entry);
}
}
// ==================== Операции с очередью ====================
// Внутренняя функция добавления в хеш-таблицу
static void add_to_hash(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return;
if (q->hash_size == 0 || entry->index_size == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] Empty hash_size=%d or index_size=%d",q->name, q->hash_size, entry->index_size);
return;
}
uint32_t slot = entry->index_hash % q->hash_size;
entry->hash_next = q->hash_table[slot];
q->hash_table[slot] = entry;
}
static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry) {
if (!q || q->hash_size == 0 || !entry) return;
uint32_t slot = entry->index_hash % q->hash_size; // теперь используем index_hash
struct ll_entry** ptr = &q->hash_table[slot];
while (*ptr) {
if (*ptr == entry) {
*ptr = entry->hash_next;
entry->hash_next = NULL;
return;
}
ptr = &(*ptr)->hash_next;
}
}
static uint32_t make_hash(const void* data, uint16_t len) {// алгоритм FNV-1a
if (len == 0 || data == NULL) return 0;
const uint8_t* p = (const uint8_t*)data;
uint32_t hash = 0x811C9DC5u; // FNV-1a offset basis (32-bit)
for (uint16_t i = 0; i < len; ++i) {
hash ^= (uint32_t)p[i]; // XOR с байтом
hash *= 0x01000193u; // FNV-1a prime
}
return hash;
}
// Проверить и запустить ожидающие коллбэки
static void check_waiters(struct ll_queue* q) {
if (!q) return;
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: checking waiters, count=%d, bytes=%zu", q->count, q->total_bytes);
struct queue_waiter* waiter = &q->waiter;
if (waiter->callback) {
// Проверить условие: не больше max_packets и не больше max_bytes
// max_bytes = 0 означает "не проверять байты"
if (q->count <= waiter->max_packets && (waiter->max_bytes == 0 || q->total_bytes <= waiter->max_bytes)) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: condition met, calling callback, count=%d<=%d, bytes=%zu<=%zu (max_bytes_check=%s)",
q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes,
waiter->max_bytes == 0 ? "disabled" : "enabled");
waiter->callback(q, waiter->callback_arg);
memset(waiter, 0, sizeof(*waiter));
}
}
}
int queue_data_put(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return -1;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
entry->index_offset = 0;
entry->index_size = 0;
entry->index_hash = 0;
if (q->hash_size > 0) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] Put no-hash data to hash queue",q->name);
queue_dgram_free(entry);
queue_entry_free(entry);
return -1;
}
if (q->size_limit >= 0 && q->count >= q->size_limit) {
queue_dgram_free(entry);
queue_entry_free(entry);
return -1;
}
// Добавляем в конец
entry->next = NULL;
entry->prev = q->tail;
if (q->tail) q->tail->next = entry; else q->head = entry;
q->tail = entry;
q->count++;
entry->int_len = entry->len;
q->total_bytes += entry->int_len;
// НЕ вызываем add_to_hash
if (q->count == 1 && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg);
}
#ifdef QUEUE_DEBUG
queue_check_consistency(q);
#endif
return 0;
}
int queue_data_put_with_index(struct ll_queue* q, struct ll_entry* entry, uint16_t index_offset, uint16_t index_size) {
if (!q || !entry) return -1;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
if (index_size == 0 || (size_t)index_offset + index_size > (size_t)entry->size) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] invalid index: offset=%u size=%u (data_size=%u)",q->name, index_offset, index_size, entry->size);
queue_dgram_free(entry);
queue_entry_free(entry);
return -1;
}
entry->index_offset = index_offset;
entry->index_size = index_size;
entry->index_hash = make_hash(entry->data + index_offset, index_size);
if (q->size_limit >= 0 && q->count >= q->size_limit) {
queue_dgram_free(entry);
queue_entry_free(entry);
return -1;
}
// Добавляем в конец
entry->next = NULL;
entry->prev = q->tail;
if (q->tail) q->tail->next = entry; else q->head = entry;
q->tail = entry;
q->count++;
entry->int_len = entry->len;
q->total_bytes += entry->int_len;
add_to_hash(q, entry);
if (q->count == 1 && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg);
}
#ifdef QUEUE_DEBUG
queue_check_consistency(q);
#endif
return 0;
}
int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return -1;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
entry->index_offset = 0;
entry->index_size = 0;
entry->index_hash = 0;
if (q->hash_size > 0) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] Put no-hash data to hash queue",q->name);
queue_dgram_free(entry);
queue_entry_free(entry);
return -1;
}
if (q->size_limit >= 0 && q->count >= q->size_limit) {
queue_dgram_free(entry);
queue_entry_free(entry); // как было в оригинале
return -1;
}
entry->next = q->head;
entry->prev = NULL;
if (q->head) q->head->prev = entry; else q->tail = entry;
q->head = entry;
q->count++;
entry->int_len = entry->len;
q->total_bytes += entry->int_len;
if (q->count == 1 && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg);
}
#ifdef QUEUE_DEBUG
queue_check_consistency(q);
#endif
return 0;
}
// С хешем для put_first
int queue_data_put_first_with_index(struct ll_queue* q, struct ll_entry* entry, uint16_t index_offset, uint16_t index_size) {
if (!q || !entry) return -1;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
if (index_size == 0 || (size_t)index_offset + index_size > (size_t)entry->size) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "[%s] invalid index: offset=%u size=%u (data_size=%u)",q->name, index_offset, index_size, entry->size);
queue_dgram_free(entry);
queue_entry_free(entry);
return -1;
}
entry->index_offset = index_offset;
entry->index_size = index_size;
entry->index_hash = make_hash(entry->data + index_offset, index_size);
if (q->size_limit >= 0 && q->count >= q->size_limit) {
queue_dgram_free(entry);
queue_entry_free(entry);
return -1;
}
entry->next = q->head;
entry->prev = NULL;
if (q->head) q->head->prev = entry; else q->tail = entry;
q->head = entry;
q->count++;
entry->int_len = entry->len;
q->total_bytes += entry->int_len;
add_to_hash(q, entry);
if (q->count == 1 && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg);
}
#ifdef QUEUE_DEBUG
queue_check_consistency(q);
#endif
return 0;
}
struct ll_entry* queue_data_get(struct ll_queue* q) {
if (!q || !q->head) return NULL;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
struct ll_entry* entry = q->head;
q->head = entry->next;
if (q->head) q->head->prev = NULL;
if (!q->head) q->tail = NULL;
q->count--;
q->total_bytes -= entry->int_len;
entry->next = NULL;
entry->prev = NULL;
remove_from_hash(q, entry);
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_get: got entry %p (id=%u), count=%d", entry, entry->id, q->count);
// Приостановить коллбэки для предотвращения рекурсии
q->callback_suspended = 1;
// Проверить ожидающие коллбэки
check_waiters(q);
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug
#endif
return entry;
}
int queue_entry_count(struct ll_queue* q) {
return q ? q->count : 0;
}
// Функция проверки консистентности count и total_bytes
// Возвращает 0 если ok, -1 если есть несоответствия
int queue_check_consistency(struct ll_queue* q) {
if (!q) return -1; // Недопустимая очередь
// Проверка: если count > 0, то head не должен быть NULL
if (q->count > 0 && !q->head) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': count=%d but head is NULL!",
q->name ? q->name : "unknown", q->count);
return -1;
}
int actual_count = 0;
size_t actual_bytes = 0;
struct ll_entry* current = q->head;
while (current) {
actual_count++;
actual_bytes += current->int_len;
if (current->next) {
if (current->next->prev != current) {
// Несоответствие в связях prev/next
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': prev/next error at entry %p != %p entries: %d!=%d bytes: %zu!=%zu",
q->name ? q->name : "unknown", (void*)current, (void*)current->next->prev, actual_count, q->count, actual_bytes, q->total_bytes);
return -1;
}
}
current = current->next;
}
// Проверить хвост
if (q->tail && q->tail->next != NULL) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': tail error", q->name ? q->name : "unknown");
return -1; // Хвост должен иметь next == NULL
}
// Сравнить с сохранёнными значениями
if (actual_count != q->count || actual_bytes != q->total_bytes) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': count error entries: %d!=%d or bytes: %zu!=%zu",
q->name ? q->name : "unknown", actual_count, q->count, actual_bytes, q->total_bytes);
return -1;
}
return 0;
}
// ==================== Асинхронное ожидание ====================
struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes,
queue_threshold_callback_fn callback, void* arg) {
if (!q || !callback) return NULL;
struct queue_waiter* waiter = &q->waiter;
// Проверить условие немедленно
if (q->count <= max_packets && (max_bytes == 0 || q->total_bytes <= max_bytes)) {
// Условие уже выполнено - вызвать коллбэк немедленно
callback(q, arg);
return NULL;
}
// Установить waiter для отложенного вызова
waiter->max_packets = max_packets;
waiter->max_bytes = max_bytes;
waiter->callback = callback;
waiter->callback_arg = arg;
return waiter;
}
void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter) {
if (!q || !waiter || waiter != &q->waiter) return;
memset(waiter, 0, sizeof(*waiter));
}
// ==================== Поиск и удаление по ID ====================
struct ll_entry* queue_find_data_by_index(struct ll_queue* q, const void* index_key, uint16_t index_size) {
if (!q || q->hash_size == 0 || !q->hash_table || index_size == 0 || !index_key) return NULL;
uint32_t hash_val = make_hash(index_key, index_size);
uint32_t slot = hash_val % q->hash_size;
struct ll_entry* entry = q->hash_table[slot];
while (entry) {
if (entry->index_size == index_size &&
(size_t)entry->index_offset + index_size <= (size_t)entry->size &&
memcmp(entry->data + entry->index_offset, index_key, index_size) == 0) {
return entry;
}
entry = entry->hash_next;
}
return NULL;
}
int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return -1;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
// Удалить из двусвязного списка
if (entry->prev) {
entry->prev->next = entry->next;
} else {
q->head = entry->next;
}
if (entry->next) {
entry->next->prev = entry->prev;
} else {
q->tail = entry->prev;
}
q->count--;
q->total_bytes -= entry->int_len;
entry->next = NULL;
entry->prev = NULL;
remove_from_hash(q, entry);
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_remove_data: removed entry %p (id=%u), count=%d", entry, entry->id, q->count);
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug
#endif
check_waiters(q);
return 0;
}