You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

400 lines
15 KiB

#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include "ll_queue.h"
#include "u_async.h"
#include "debug_config.h"
// Предварительное объявление для отложенного возобновления
static void queue_resume_timeout_cb(void* arg);
// Проверить и запустить ожидающие коллбэки
static void check_waiters(struct ll_queue* q) {
if (!q || !q->waiters) return;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: checking waiters, count=%d, bytes=%zu",
q->count, q->total_bytes);
struct queue_waiter** pprev = &q->waiters;
struct queue_waiter* waiter = q->waiters;
while (waiter) {
struct queue_waiter* next = waiter->next;
// Проверить условие: не больше max_packets и не больше max_bytes
// max_bytes = 0 означает "не проверять байты"
if (q->count <= waiter->max_packets && (waiter->max_bytes == 0 || q->total_bytes <= waiter->max_bytes)) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: condition met, calling callback, count=%d<=%d, bytes=%zu<=%zu (max_bytes_check=%s)",
q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes,
waiter->max_bytes == 0 ? "disabled" : "enabled");
waiter->callback(q, waiter->callback_arg);
// Удалить waiter из списка
*pprev = next;
if (q->pool) {
memory_pool_free(q->pool, waiter);
} else {
free(waiter);
}
// pprev уже указывает на правильный следующий элемент
} else {
// Условие не выполнено - оставить в списке
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: condition NOT met, count=%d>%d or bytes=%zu>%zu (max_bytes_check=%s)",
q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes,
waiter->max_bytes == 0 ? "disabled" : "enabled");
pprev = &waiter->next;
}
waiter = next;
}
}
// ==================== Управление очередью ====================
struct ll_queue* queue_new(struct UASYNC* ua, struct memory_pool* pool) {
struct ll_queue* q = calloc(1, sizeof(struct ll_queue));
if (!q) return NULL;
q->head = NULL;
q->tail = NULL;
q->count = 0;
q->total_bytes = 0;
q->size_limit = -1; // По умолчанию без ограничения
q->callback = NULL;
q->callback_arg = NULL;
q->callback_suspended = 0; // Коллбэки разрешены изначально
q->resume_timeout_id = NULL;
q->ua = ua;
q->waiters = NULL;
q->pool=pool;
return q;
}
void queue_free(struct ll_queue* q) {
if (!q) return;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: freeing queue %p, head=%p, tail=%p, count=%d",
q, q->head, q->tail, q->count);
// Освободить все элементы (уменьшить счетчик ссылок)
struct ll_entry* entry = q->head;
int entry_count = 0;
while (entry) {
struct ll_entry* next = entry->next;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: releasing entry %p (entry %d), ref_count=%d",
entry, entry_count++, entry->ref_count);
queue_entry_free(entry); // Это уменьшит ref_count и освободит только если ref_count == 0
entry = next;
}
// Освободить все ожидающие коллбэки
struct queue_waiter* waiter = q->waiters;
int waiter_count = 0;
while (waiter) {
struct queue_waiter* next = waiter->next;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: freeing waiter %p (waiter %d)", waiter, waiter_count++);
if (q->pool) {
memory_pool_free(q->pool, waiter);
} else {
free(waiter);
}
waiter = next;
}
// Отменить отложенное возобновление если запланировано
if (q->resume_timeout_id) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: cancelling resume timeout %p", q->resume_timeout_id);
uasync_cancel_timeout(q->ua, q->resume_timeout_id);
}
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: freeing queue structure %p", q);
free(q);
}
// ==================== Конфигурация очереди ====================
void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg) {
if (!q) return;
q->callback = cbk_fn;
q->callback_arg = arg;
}
static void queue_resume_timeout_cb(void* arg) {
struct ll_queue* q = (struct ll_queue*)arg;
if (!q || !q->callback) return;
// Очистить ID таймаута (таймаут сработал)
q->resume_timeout_id = NULL;
// Разрешить коллбэки
q->callback_suspended = 0;
// Если в очереди есть элементы, вызвать коллбэк с первым элементом
// Обработчик должен извлечь этот элемент вызовом queue_entry_get()
if (q->head) {
q->callback(q, q->head, q->callback_arg);
}
}
void queue_resume_callback(struct ll_queue* q) {
if (!q || !q->callback) return;
// Если уже есть отложенное возобновление, ничего не делать
if (q->resume_timeout_id) {
return;
}
// Запланировать отложенное возобновление через uasync
q->resume_timeout_id = uasync_set_timeout(q->ua, 0, q, queue_resume_timeout_cb);
}
void queue_set_size_limit(struct ll_queue* q, int lim) {
if (!q) return;
q->size_limit = lim;
}
// ==================== Управление элементами ====================
struct ll_entry* queue_entry_new(size_t data_size) {
// Выделить память под структуру + область данных
struct ll_entry* entry = malloc(sizeof(struct ll_entry) + data_size);
if (!entry) return NULL;
entry->next = NULL;
entry->size = data_size;
entry->ref_count = 1; // Начальный счетчик ссылок
// Область данных оставить неинициализированной для производительности
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new: created entry %p, size=%zu, ref_count=%d",
entry, data_size, entry->ref_count);
return entry;
}
void queue_entry_free(struct ll_entry* entry) {
if (!entry) return;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_free: entry=%p, ref_count=%d",
entry, entry->ref_count);
if (entry->ref_count <= 0) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_free: entry %p has invalid ref_count=%d",
entry, entry->ref_count);
return; // Предотвратить double-free
}
entry->ref_count--;
if (entry->ref_count == 0) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_free: actually freeing entry %p", entry);
free(entry);
} else {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_free: decremented ref_count to %d for entry %p",
entry->ref_count, entry);
}
}
// ==================== Операции с очередью ====================
int queue_entry_put(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return -1;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_put: entry=%p, size=%zu, count=%d",
entry, entry->size, q->count);
// Проверить лимит размера
if (q->size_limit >= 0 && q->count >= q->size_limit) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_put: size limit exceeded, freeing entry %p", entry);
queue_entry_free(entry);
return -1;
}
// Увеличить счетчик ссылок при добавлении в очередь
entry->ref_count++;
// Добавить в хвост (FIFO)
entry->next = NULL;
if (q->tail) {
q->tail->next = entry;
} else {
q->head = entry;
}
q->tail = entry;
q->count++;
q->total_bytes += entry->size;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_put: added entry %p, new count=%d, total_bytes=%zu, ref_count=%d",
entry, q->count, q->total_bytes, entry->ref_count);
// Если коллбэки разрешены - вызвать коллбэк
// Это запускает автоматическую обработку очереди
if (!q->callback_suspended && q->callback) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_put: calling callback for entry %p", entry);
// Приостановить коллбэки во время выполнения коллбэка, чтобы предотвратить рекурсию
q->callback_suspended = 1;
q->callback(q, entry, q->callback_arg);
// Не восстанавливать здесь - восстановление происходит через queue_resume_callback
}
// Проверить ожидающие коллбэки
check_waiters(q);
return 0;
}
int queue_entry_put_first(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return -1;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_put_first: entry=%p, size=%zu, count=%d",
entry, entry->size, q->count);
// Проверить лимит размера
if (q->size_limit >= 0 && q->count >= q->size_limit) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_put_first: size limit exceeded, freeing entry %p", entry);
queue_entry_free(entry);
return -1;
}
// Увеличить счетчик ссылок при добавлении в очередь
entry->ref_count++;
// Добавить в голову (LIFO, высокий приоритет)
entry->next = q->head;
q->head = entry;
if (!q->tail) {
q->tail = entry;
}
q->count++;
q->total_bytes += entry->size;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_put_first: added entry %p, new count=%d, total_bytes=%zu, ref_count=%d",
entry, q->count, q->total_bytes, entry->ref_count);
// Если коллбэки разрешены - вызвать коллбэк
if (!q->callback_suspended && q->callback) {
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_put_first: calling callback for entry %p", entry);
q->callback(q, entry, q->callback_arg);
}
// Проверить ожидающие коллбэки
check_waiters(q);
return 0;
}
struct ll_entry* queue_entry_get(struct ll_queue* q) {
if (!q || !q->head) return NULL;
struct ll_entry* entry = q->head;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_get: retrieving entry %p, size=%zu, ref_count=%d",
entry, entry->size, entry->ref_count);
q->head = entry->next;
if (!q->head) {
q->tail = NULL;
}
q->count--;
q->total_bytes -= entry->size;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_get: removed entry %p, new count=%d, total_bytes=%zu",
entry, q->count, q->total_bytes);
entry->next = NULL; // Отсоединить от очереди
// Уменьшить счетчик ссылок при извлечении из очереди
// entry->ref_count был увеличен при добавлении в очередь
// теперь уменьшаем, но не освобождаем, так как вызывающий код должен это сделать
entry->ref_count--;
// При извлечении элемента приостанавливаем коллбэки
// Это предотвращает рекурсию если во время обработки добавляются новые элементы
q->callback_suspended = 1;
// Проверить ожидающие коллбэки при извлечении элемента
// Это важно для waiters, которые ожидают уменьшения очереди
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_get: about to call check_waiters, count=%d", q->count);
check_waiters(q);
return entry;
}
int queue_entry_count(struct ll_queue* q) {
if (!q) return 0;
return q->count;
}
// ==================== Асинхронное ожидание ====================
struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes,
queue_threshold_callback_fn callback, void* arg) {
if (!q || !callback) return NULL;
// Создать новый waiter
struct queue_waiter* waiter;
if (q->pool) {
waiter = (struct queue_waiter*)memory_pool_alloc(q->pool);
} else {
waiter = malloc(sizeof(struct queue_waiter));
}
if (!waiter) return NULL;
waiter->max_packets = max_packets;
waiter->max_bytes = max_bytes;
waiter->callback = callback;
waiter->callback_arg = arg;
waiter->next = NULL;
// Проверить условие немедленно
if (q->count <= max_packets && (max_bytes == 0 || q->total_bytes <= max_bytes)) {
// Условие уже выполнено - вызвать коллбэк и освободить waiter
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_wait_threshold: condition already met, count=%d<=%d, bytes=%zu<=%zu, calling callback",
q->count, max_packets, q->total_bytes, max_bytes);
callback(q, arg);
free(waiter);
return NULL;
}
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_wait_threshold: registering waiter, count=%d, bytes=%zu, max_packets=%d, max_bytes=%zu",
q->count, q->total_bytes, max_packets, max_bytes);
// Добавить в список ожидающих
waiter->next = q->waiters;
q->waiters = waiter;
DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_wait_threshold: waiter registered successfully");
return waiter;
}
void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter) {
if (!q || !waiter) return;
// Найти и удалить waiter из списка
struct queue_waiter** pprev = &q->waiters;
struct queue_waiter* w = q->waiters;
while (w) {
if (w == waiter) {
*pprev = w->next;
if (q->pool) {
memory_pool_free(q->pool, w);
} else {
free(w);
}
return;
}
pprev = &w->next;
w = w->next;
}
}
// ==================== Статистика и метрики ====================
void queue_get_pool_stats(struct ll_queue* q, size_t* waiter_allocations, size_t* waiter_reuse) {
if (!q || !q->pool) {
if (waiter_allocations) *waiter_allocations = 0;
if (waiter_reuse) *waiter_reuse = 0;
return;
}
memory_pool_get_stats(q->pool, waiter_allocations, waiter_reuse);
}