You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

342 lines
17 KiB

#ifndef LL_QUEUE_H
#define LL_QUEUE_H
#include <stddef.h> // для size_t
#include <stdint.h> // для uint64_t
#include "memory_pool.h" // для struct memory_pool
#ifdef _WIN32
#include <winsock2.h>
#include <windows.h>
#else
#include <pthread.h>
#endif
#define QUEUE_DEBUG 1
//#define QUEUE_THREAD_CHECK 1 // 0 to disable
/**
* @file ll_queue.h
* @brief Упрощённая двусвязная очередь с поддержкой автозабора элементов через callback,
* порогового ожидания освобождения места, поиска по ID и гибкого управления памятью.
*
* Основные особенности:
* - FIFO-очередь (с возможностью добавления в начало для приоритета)
* - Автоматический вызов callback при появлении элементов (один элемент за раз)
* - Обязательный вызов queue_resume_callback после обработки элемента
* - Поддержка пулов памяти для entry и отдельно для dgram
* - Хеш-таблица для быстрого поиска по ID (опционально)
* - Встроенный одиночный waiter для ожидания освобождения очереди до заданного порога
* - Использует uasync для отложенного возобновления callback'ов (без рекурсии в стеке)
*
* @note Важные правила использования автозабора (queue_set_callback):
* 1. Коллбэк получает управление только если в очереди есть элементы И !callback_suspended
* 2. Внутри коллбэка ОБЯЗАТЕЛЬНО: queue_data_get() → обработать элемент → queue_resume_callback()
* 3. Без вызова queue_resume_callback очередь навсегда застрянет
* 4. Не извлекайте элементы вручную вне коллбэка — это нарушит логику
* 5. queue_data_get() автоматически suspend'ит коллбэки для предотвращения рекурсии
*
* @note Память:
* - queue_free() освобождает ТОЛЬКО структуру очереди и хеш-таблицу
* - Все элементы должны быть извлечены через queue_data_get() и освобождены через queue_entry_free()
* - dgram освобождается отдельно через queue_dgram_free() или кастомную dgram_free_fn
*/
// Forward declaration
struct ll_queue;
/**
* @struct ll_entry
* @brief Элемент очереди (переменного размера).
*
* Память выделяется одним блоком: [struct ll_entry + data[size]].
* Поле dgram — отдельный блок (malloc / пул / внешняя память).
*/
struct ll_entry {
char* name;
struct ll_entry* next; // Следующий элемент в очереди
struct ll_entry* prev; // Предыдущий элемент в очереди
uint16_t size; // Размер пользовательского буфера data[] (байт)
uint16_t len; // Актуальная длина данных в dgram
uint16_t memlen; // Выделенный размер под dgram
uint16_t int_len; // Внутреннее (не использовать)
uint8_t* dgram; // Указатель на данные пакета
void (*dgram_free_fn)(uint8_t* data); // Кастомная функция освобождения dgram
struct memory_pool* dgram_pool; // Пул для dgram (если используется)
uint16_t index_offset; // Смещение индекса в data[]
uint16_t index_size; // Длина индекса (0 = без индекса)
uint32_t index_hash; // Хеш по первым 4 байтам индекса
struct ll_entry* hash_next; // Следующий в хеш-цепочке
struct memory_pool* pool; // Пул, из которого выделен сам entry (NULL = malloc)
uint8_t data[0]; // Гибкий массив пользовательских данных (размер = size)
};
/**
* @typedef queue_callback_fn
* @brief Коллбэк автозабора элементов из очереди.
* @param q указатель на очередь
* @param arg пользовательский аргумент (из queue_set_callback)
*
* @note Внутри функции:
* - Вызвать queue_data_get(q) для получения элемента
* - Обработать элемент (можно асинхронно)
* - После завершения обработки вызвать queue_resume_callback(q)
*/
typedef void (*queue_callback_fn)(struct ll_queue* q, void* arg);
/**
* @typedef queue_threshold_callback_fn
* @brief Одноразовый коллбэк при достижении порога очереди (queue_wait_threshold).
* @param q указатель на очередь
* @param arg пользовательский аргумент
*/
typedef void (*queue_threshold_callback_fn)(struct ll_queue* q, void* arg);
/**
* @struct queue_waiter
* @brief Описание ожидания освобождения места в очереди (встроен в ll_queue — только один).
*/
struct queue_waiter {
int max_packets; ///< Максимально допустимое количество элементов
size_t max_bytes; ///< Максимально допустимый объём данных (0 = не проверять)
queue_threshold_callback_fn callback; ///< Коллбэк (вызывается один раз)
void* callback_arg; ///< Аргумент коллбэка
};
/**
* @struct ll_queue
* @brief Структура очереди.
*/
struct ll_queue {
char* name;
struct ll_entry* head; // Голова очереди (отсюда извлекаем)
struct ll_entry* tail; // Хвост очереди (сюда добавляем)
int count; // Текущее количество элементов
size_t total_bytes; // Суммарный объём данных (сумма int_len)
int size_limit; // Максимальное количество элементов (-1 = без лимита)
queue_callback_fn callback; // Коллбэк автозабора
void* callback_arg; // Аргумент коллбэка
int callback_suspended; // 1 = коллбэки временно приостановлены
void* resume_timeout_id; // ID таймера uasync для отложенного resume
struct UASYNC* ua; // Экземпляр uasync (обязателен для таймеров)
struct queue_waiter waiter; // Встроенный waiter (только один)
struct ll_entry** hash_table; // Хеш-таблица для поиска по id (если hash_size > 0)
size_t hash_size; // Размер хеш-таблицы
#ifdef QUEUE_THREAD_CHECK
#ifdef _WIN32
DWORD owner_thread;
#else
pthread_t owner_thread;
#endif
#endif
};
/* ==================== Создание / уничтожение ==================== */
/**
* @brief Создаёт новую пустую очередь.
* @param ua экземпляр uasync (обязателен для отложенного resume коллбэков)
* @param hash_size размер хеш-таблицы (0 = поиск по id отключён)
* @return указатель на очередь или NULL при ошибке выделения памяти
*/
struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size, char* name);
/**
* @brief Освобождает структуру очереди.
* @param q очередь
*
* @warning НЕ ОСВОБОЖДАЕТ элементы в очереди!
* Их нужно извлечь через queue_data_get() и освободить через queue_entry_free().
*/
void queue_free(struct ll_queue* q);
/* ==================== Конфигурация ==================== */
/**
* @brief Устанавливает максимальное количество элементов в очереди.
* @param q очередь
* @param lim лимит (-1 = без ограничения)
*
* При превышении лимита новые элементы автоматически освобождаются.
*/
void queue_set_size_limit(struct ll_queue* q, int lim);
/* ==================== Автозабор элементов ==================== */
/**
* @brief Устанавливает коллбэк для автоматического извлечения элементов.
* @param q очередь
* @param cbk_fn функция-коллбэк
* @param arg пользовательский аргумент
*
* Коллбэк вызывается автоматически при наличии элементов и разрешённом состоянии.
*/
void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg);
/**
* @brief Возобновляет работу коллбэка после обработки элемента в следующем event loop.
* @param q очередь
*
* @warning ОБЯЗАТЕЛЬНО вызывать после завершения обработки элемента в коллбэке.
* Иначе автозабор элементов навсегда остановится.
*/
void queue_resume_callback(struct ll_queue* q);
/* ==================== Пороговое ожидание ==================== */
/**
* @brief Регистрирует одноразовый коллбэк, который сработает, когда очередь освободится до заданного порога.
* @param q очередь
* @param max_packets максимальное количество элементов
* @param max_bytes максимальный объём данных (0 = не проверять)
* @param callback функция, вызываемая при достижении порога
* @param arg аргумент коллбэка
* @return указатель на waiter (для отмены) или NULL, если условие уже выполнено (коллбэк вызван сразу)
*/
struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes,
queue_threshold_callback_fn callback, void* arg);
/**
* @brief Отменяет ранее установленное ожидание порога.
* @param q очередь
* @param waiter указатель, возвращённый queue_wait_threshold()
*/
void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter);
/* ==================== Работа с данными ==================== */
/**
* @brief Добавляет элемент в конец очереди (FIFO).
* @param q очередь
* @param entry элемент
* @param id идентификатор для поиска
* @return 0 — успех, -1 — превышен лимит (элемент освобождён)
*/
int queue_data_put(struct ll_queue* q, struct ll_entry* entry);
/**
* @brief Добавляет элемент в начало очереди (LIFO, высокий приоритет).
* @param q очередь
* @param entry элемент
* @param id идентификатор
* @return 0 — успех, -1 — превышен лимит (элемент освобождён)
*/
int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry);
/**
* @brief Добавляет элемент в конец очереди с явным указанием смещения индекса для hash.
* @param q очередь
* @param entry элемент
* @param index_offset смещение индекса (node_id) в data[]
* @param index_size размер индекса (8 для uint64_t node_id)
* @return 0 — успех, -1 — ошибка
*/
int queue_data_put_with_index(struct ll_queue* q, struct ll_entry* entry,
uint16_t index_offset, uint16_t index_size);
/**
* @brief Добавляет элемент в начало очереди с явным указанием смещения индекса для hash.
* @param q очередь
* @param entry элемент
* @param index_offset смещение индекса в data[]
* @param index_size размер индекса
* @return 0 — успех, -1 — ошибка
*/
int queue_data_put_first_with_index(struct ll_queue* q, struct ll_entry* entry,
uint16_t index_offset, uint16_t index_size);
/**
* @brief Извлекает элемент из начала очереди.
* @param q очередь
* @return элемент или NULL, если очередь пуста
*
* @note Автоматически приостанавливает коллбэки (callback_suspended = 1)
* После обработки элемента необходимо вызвать queue_resume_callback().
*/
struct ll_entry* queue_data_get(struct ll_queue* q);
/**
* @brief Возвращает текущее количество элементов в очереди.
* @param q очередь
* @return количество элементов
*/
int queue_entry_count(struct ll_queue* q);
/* ==================== Управление памятью элементов ==================== */
// проверить правильность очереди (корректность цепочки, счетчики элементов и байтов)
int queue_check_consistency(struct ll_queue* q);
/**
* @brief Выделяет элемент с отдельным буфером dgram (malloc).
* @param len требуемый размер dgram
* @return элемент или NULL при ошибке
*/
struct ll_entry* ll_alloc_lldgram(uint16_t len);
/**
* @brief Создаёт элемент с пользовательским буфером data[] (malloc одним блоком).
* @param data_size размер data[]
* @return элемент или NULL при ошибке
*/
struct ll_entry* queue_entry_new(size_t data_size);
/**
* @brief Создаёт элемент из пула памяти.
* @param pool пул
* @return элемент или NULL, если пул пуст
*/
struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool);
/**
* @brief Освобождает структуру элемента (возвращает в пул или free).
* @param entry элемент
*
* @note НЕ освобождает dgram! Используйте queue_dgram_free() отдельно.
*/
void queue_entry_free(struct ll_entry* entry);
/**
* @brief Освобождает буфер dgram элемента.
* @param entry элемент
*/
void queue_dgram_free(struct ll_entry* entry);
/* ==================== Поиск и удаление ==================== */
/**
* @brief Находит элемент по идентификатору (требуется hash_size > 0 при создании).
* @param q очередь
* @param id искомый идентификатор
* @return элемент или NULL
*/
struct ll_entry* queue_find_data_by_index(struct ll_queue* q,
const void* index_key,
uint16_t index_size);
/**
* @brief Удаляет элемент из очереди (не освобождает память).
* @param q очередь
* @param entry элемент
* @return 0 — успех, -1 — элемент не найден в очереди
*/
int queue_remove_data(struct ll_queue* q, struct ll_entry* entry);
/* ==================== Утилиты ==================== */
/**
* @brief Возвращает суммарный объём данных в очереди.
* @param q очередь
* @return количество байт
*/
static inline size_t queue_total_bytes(struct ll_queue* q) {
return q ? q->total_bytes : 0;
}
#endif // LL_QUEUE_H