You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

464 lines
15 KiB

// ll_queue.c - Упрощенная архитектура: разделение создания элементов и работы с очередью
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include "ll_queue.h"
#include "u_async.h"
#include "debug_config.h"
#include "mem.h"
// Предварительные объявления внутренних функций
static void queue_resume_timeout_cb(void* arg);
static void check_waiters(struct ll_queue* q);
static void add_to_hash(struct ll_queue* q, struct ll_entry* entry);
static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry);
// ==================== Управление очередью ====================
struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) {
if (!ua) return NULL;
struct ll_queue* q = u_calloc(1, sizeof(struct ll_queue));
if (!q) return NULL;
q->ua = ua;
q->size_limit = -1; // Без ограничения по умолчанию
q->hash_size = hash_size;
// Создать хеш-таблицу если нужно
if (hash_size > 0) {
q->hash_table = u_calloc(hash_size, sizeof(struct ll_entry*));
if (!q->hash_table) {
u_free(q);
return NULL;
}
}
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_new: created queue %p, hash_size=%zu", q, hash_size);
return q;
}
struct ll_entry* ll_alloc_lldgram(uint16_t len) {
struct ll_entry* entry = queue_entry_new(0);
if (!entry) return NULL;
entry->len=0;
entry->memlen=len;
entry->dgram = u_malloc(len);
// entry->dgram_pool = NULL; - уже null (memset)
// entry->dgram_free_fn = NULL;
if (!entry->dgram) {
queue_entry_free(entry);
return NULL;
}
return entry;
}
void queue_free(struct ll_queue* q) {
if (!q) return;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: freeing queue %p, head=%p, tail=%p, count=%d",
q, q->head, q->tail, q->count);
// ВАЖНО: Не освобождаем элементы в очереди - они должны быть извлечены отдельно
// Это упрощает архитектуру и предотвращает double-u_free
// Освободить хеш-таблицу
if (q->hash_table) {
u_free(q->hash_table);
}
// Отменить отложенное возобновление
if (q->resume_timeout_id) {
uasync_cancel_timeout(q->ua, q->resume_timeout_id);
q->resume_timeout_id = NULL;
}
u_free(q);
}
// ==================== Конфигурация очереди ====================
void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg) {
if (!q) return;
q->callback = cbk_fn;
q->callback_arg = arg;
}
static void queue_resume_timeout_cb(void* arg) {
struct ll_queue* q = (struct ll_queue*)arg;
if (!q) return;
q->resume_timeout_id = NULL;
// Вызвать коллбэк если есть элементы и коллбэки разрешены
if (q->head && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg);
}
}
void queue_resume_callback(struct ll_queue* q) {
if (!q) return;
q->callback_suspended = 0;
// Если есть элементы, запланировать вызов коллбэка
if (q->head && q->callback && !q->resume_timeout_id) {
q->resume_timeout_id = uasync_set_timeout(q->ua, 0, q, queue_resume_timeout_cb);
}
}
void queue_set_size_limit(struct ll_queue* q, int lim) {
if (!q) return;
q->size_limit = lim;
}
// ==================== Управление элементами ====================
struct ll_entry* queue_entry_new(size_t data_size) {
struct ll_entry* entry = u_malloc(sizeof(struct ll_entry) + data_size);
if (!entry) return NULL;
memset(entry, 0, sizeof(struct ll_entry) + data_size);
entry->size = data_size;
entry->len = 0;
entry->pool = NULL; // Выделено через u_malloc
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new: created entry %p, size=%zu", entry, data_size);
return entry;
}
struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool) {
if (!pool) return NULL;
struct ll_entry* entry = memory_pool_alloc(pool);
if (!entry) return NULL;
memset(entry, 0, pool->object_size);
entry->size = pool->object_size - sizeof(struct ll_entry);
entry->len = 0;
entry->pool = pool; // Выделено из пула
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new_from_pool: created entry %p from pool %p", entry, pool);
return entry;
}
//void ll_u_free_dgram(struct ll_entry* entry) {
void queue_dgram_free(struct ll_entry* entry) {
if (!entry) return;
if (entry->dgram) {
if (entry->dgram_free_fn) {
entry->dgram_free_fn(entry->dgram); // arg=NULL, если не задан
} else if (entry->dgram_pool) {
memory_pool_free(entry->dgram_pool, entry->dgram);
} else {
u_free(entry->dgram);
}
entry->dgram = NULL;
entry->len = 0; // Опционально сброс len
}
}
void queue_entry_free(struct ll_entry* entry) {
if (!entry) return;
if (entry->pool) {
memory_pool_free(entry->pool, entry);
} else {
u_free(entry);
}
}
// ==================== Операции с очередью ====================
// Внутренняя функция добавления в хеш-таблицу
static void add_to_hash(struct ll_queue* q, struct ll_entry* entry) {
if (!q || q->hash_size == 0 || !entry) return;
size_t slot = entry->id % q->hash_size;
entry->hash_next = q->hash_table[slot];
q->hash_table[slot] = entry;
}
// Внутренняя функция удаления из хеш-таблицы
static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry) {
if (!q || q->hash_size == 0 || !entry) return;
size_t slot = entry->id % q->hash_size;
struct ll_entry** ptr = &q->hash_table[slot];
while (*ptr) {
if (*ptr == entry) {
*ptr = entry->hash_next;
entry->hash_next = NULL;
return;
}
ptr = &(*ptr)->hash_next;
}
}
// Проверить и запустить ожидающие коллбэки
static void check_waiters(struct ll_queue* q) {
if (!q) return;
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: checking waiters, count=%d, bytes=%zu", q->count, q->total_bytes);
struct queue_waiter* waiter = &q->waiter;
if (waiter->callback) {
// Проверить условие: не больше max_packets и не больше max_bytes
// max_bytes = 0 означает "не проверять байты"
if (q->count <= waiter->max_packets && (waiter->max_bytes == 0 || q->total_bytes <= waiter->max_bytes)) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: condition met, calling callback, count=%d<=%d, bytes=%zu<=%zu (max_bytes_check=%s)",
q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes,
waiter->max_bytes == 0 ? "disabled" : "enabled");
waiter->callback(q, waiter->callback_arg);
memset(waiter, 0, sizeof(*waiter));
}
}
}
int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
if (!q || !entry) return -1;
entry->id = id;
// Проверить лимит размера
if (q->size_limit >= 0 && q->count >= q->size_limit) {
queue_entry_free(entry); // Освободить элемент если превышен лимит
return -1;
}
// Добавить в конец
entry->next = NULL;
entry->prev = q->tail;
if (q->tail) {
q->tail->next = entry;
} else {
q->head = entry;
}
q->tail = entry;
q->count++;
entry->int_len=entry->len;
q->total_bytes += entry->int_len;
size_t send_q_bytes = queue_total_bytes(q);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check total bytes: new_q_len=%d element_size:%d", send_q_bytes, entry->size);
add_to_hash(q, entry);
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put: added entry %p (id=%u), count=%d", entry, id, q->count);
// Если очередь была пуста и коллбэки разрешены - вызвать коллбэк
if (q->count == 1 && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg);
}
// Проверить ожидающие коллбэки
check_waiters(q);
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug
#endif
return 0;
}
int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
if (!q || !entry) return -1;
entry->id = id;
// Проверить лимит размера
if (q->size_limit >= 0 && q->count >= q->size_limit) {
queue_entry_free(entry); // Освободить элемент если превышен лимит
return -1;
}
// Добавить в начало
entry->next = q->head;
entry->prev = NULL;
if (q->head) {
q->head->prev = entry;
} else {
q->tail = entry;
}
q->head = entry;
q->count++;
entry->int_len=entry->len;
q->total_bytes += entry->int_len;
add_to_hash(q, entry);
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put_first: added entry %p (id=%u), count=%d", entry, id, q->count);
// Если очередь была пуста и коллбэки разрешены - вызвать коллбэк
if (q->count == 1 && !q->callback_suspended && q->callback) {
q->callback(q, q->callback_arg);
}
// Проверить ожидающие коллбэки
check_waiters(q);
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug
#endif
return 0;
}
struct ll_entry* queue_data_get(struct ll_queue* q) {
if (!q || !q->head) return NULL;
struct ll_entry* entry = q->head;
q->head = entry->next;
if (q->head) q->head->prev = NULL;
if (!q->head) q->tail = NULL;
q->count--;
q->total_bytes -= entry->int_len;
entry->next = NULL;
entry->prev = NULL;
remove_from_hash(q, entry);
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_get: got entry %p (id=%u), count=%d", entry, entry->id, q->count);
// Приостановить коллбэки для предотвращения рекурсии
q->callback_suspended = 1;
// Проверить ожидающие коллбэки
check_waiters(q);
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug
#endif
return entry;
}
int queue_entry_count(struct ll_queue* q) {
return q ? q->count : 0;
}
// Функция проверки консистентности count и total_bytes
// Возвращает 0 если ok, -1 если есть несоответствия
int queue_check_consistency(struct ll_queue* q) {
if (!q) return -1; // Недопустимая очередь
int actual_count = 0;
size_t actual_bytes = 0;
struct ll_entry* current = q->head;
while (current) {
actual_count++;
actual_bytes += current->int_len;
if (current->next) {
if (current->next->prev != current) {
// Несоответствие в связях prev/next
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Prev/next error");
return -1;
}
}
current = current->next;
}
// Проверить хвост
if (q->tail && q->tail->next != NULL) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Tail error");
return -1; // Хвост должен иметь next == NULL
}
// Сравнить с сохранёнными значениями
if (actual_count != q->count || actual_bytes != q->total_bytes) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, " count error entries: %d!=%d or bytes: %d!=%d", actual_count, q->count, actual_bytes, q->total_bytes);
return -1;
}
return 0;
}
// ==================== Асинхронное ожидание ====================
struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes,
queue_threshold_callback_fn callback, void* arg) {
if (!q || !callback) return NULL;
struct queue_waiter* waiter = &q->waiter;
// Проверить условие немедленно
if (q->count <= max_packets && (max_bytes == 0 || q->total_bytes <= max_bytes)) {
// Условие уже выполнено - вызвать коллбэк немедленно
callback(q, arg);
return NULL;
}
// Установить waiter для отложенного вызова
waiter->max_packets = max_packets;
waiter->max_bytes = max_bytes;
waiter->callback = callback;
waiter->callback_arg = arg;
return waiter;
}
void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter) {
if (!q || !waiter || waiter != &q->waiter) return;
memset(waiter, 0, sizeof(*waiter));
}
// ==================== Поиск и удаление по ID ====================
struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id) {
if (!q || q->hash_size == 0 || !q->hash_table) return NULL;
size_t slot = id % q->hash_size;
struct ll_entry* entry = q->hash_table[slot];
while (entry) {
if (entry->id == id) {
return entry;
}
entry = entry->hash_next;
}
return NULL;
}
int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return -1;
// Удалить из двусвязного списка
if (entry->prev) {
entry->prev->next = entry->next;
} else {
q->head = entry->next;
}
if (entry->next) {
entry->next->prev = entry->prev;
} else {
q->tail = entry->prev;
}
q->count--;
q->total_bytes -= entry->int_len;
entry->next = NULL;
entry->prev = NULL;
remove_from_hash(q, entry);
// DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_remove_data: removed entry %p (id=%u), count=%d", entry, entry->id, q->count);
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug
#endif
return 0;
}