#ifndef LL_QUEUE_H #define LL_QUEUE_H #include // для size_t #include // для uint64_t #include "memory_pool.h" // для struct memory_pool #ifdef _WIN32 #include #include #else #include #endif #define QUEUE_DEBUG 1 //#define QUEUE_THREAD_CHECK 1 // 0 to disable /** * @file ll_queue.h * @brief Упрощённая двусвязная очередь с поддержкой автозабора элементов через callback, * порогового ожидания освобождения места, поиска по ID и гибкого управления памятью. * * Основные особенности: * - FIFO-очередь (с возможностью добавления в начало для приоритета) * - Автоматический вызов callback при появлении элементов (один элемент за раз) * - Обязательный вызов queue_resume_callback после обработки элемента * - Поддержка пулов памяти для entry и отдельно для dgram * - Хеш-таблица для быстрого поиска по ID (опционально) * - Встроенный одиночный waiter для ожидания освобождения очереди до заданного порога * - Использует uasync для отложенного возобновления callback'ов (без рекурсии в стеке) * * @note Важные правила использования автозабора (queue_set_callback): * 1. Коллбэк получает управление только если в очереди есть элементы И !callback_suspended * 2. Внутри коллбэка ОБЯЗАТЕЛЬНО: queue_data_get() → обработать элемент → queue_resume_callback() * 3. Без вызова queue_resume_callback очередь навсегда застрянет * 4. Не извлекайте элементы вручную вне коллбэка — это нарушит логику * 5. queue_data_get() автоматически suspend'ит коллбэки для предотвращения рекурсии * * @note Память: * - queue_free() освобождает ТОЛЬКО структуру очереди и хеш-таблицу * - Все элементы должны быть извлечены через queue_data_get() и освобождены через queue_entry_free() * - dgram освобождается отдельно через queue_dgram_free() или кастомную dgram_free_fn */ // Forward declaration struct ll_queue; /** * @struct ll_entry * @brief Элемент очереди (переменного размера). * * Память выделяется одним блоком: [struct ll_entry + data[size]]. * Поле dgram — отдельный блок (malloc / пул / внешняя память). */ struct ll_entry { char* name; struct ll_entry* next; // Следующий элемент в очереди struct ll_entry* prev; // Предыдущий элемент в очереди uint16_t size; // Размер пользовательского буфера data[] (байт) uint16_t len; // Актуальная длина данных в dgram uint16_t memlen; // Выделенный размер под dgram uint16_t int_len; // Внутреннее (не использовать) uint8_t* dgram; // Указатель на данные пакета void (*dgram_free_fn)(uint8_t* data); // Кастомная функция освобождения dgram struct memory_pool* dgram_pool; // Пул для dgram (если используется) uint16_t index_offset; // Смещение индекса в data[] uint16_t index_size; // Длина индекса (0 = без индекса) uint32_t index_hash; // Хеш по первым 4 байтам индекса struct ll_entry* hash_next; // Следующий в хеш-цепочке struct memory_pool* pool; // Пул, из которого выделен сам entry (NULL = malloc) uint8_t data[0]; // Гибкий массив пользовательских данных (размер = size) }; /** * @typedef queue_callback_fn * @brief Коллбэк автозабора элементов из очереди. * @param q указатель на очередь * @param arg пользовательский аргумент (из queue_set_callback) * * @note Внутри функции: * - Вызвать queue_data_get(q) для получения элемента * - Обработать элемент (можно асинхронно) * - После завершения обработки вызвать queue_resume_callback(q) */ typedef void (*queue_callback_fn)(struct ll_queue* q, void* arg); /** * @typedef queue_threshold_callback_fn * @brief Одноразовый коллбэк при достижении порога очереди (queue_wait_threshold). * @param q указатель на очередь * @param arg пользовательский аргумент */ typedef void (*queue_threshold_callback_fn)(struct ll_queue* q, void* arg); /** * @struct queue_waiter * @brief Описание ожидания освобождения места в очереди (встроен в ll_queue — только один). */ struct queue_waiter { int max_packets; ///< Максимально допустимое количество элементов size_t max_bytes; ///< Максимально допустимый объём данных (0 = не проверять) queue_threshold_callback_fn callback; ///< Коллбэк (вызывается один раз) void* callback_arg; ///< Аргумент коллбэка }; /** * @struct ll_queue * @brief Структура очереди. */ struct ll_queue { char* name; struct ll_entry* head; // Голова очереди (отсюда извлекаем) struct ll_entry* tail; // Хвост очереди (сюда добавляем) int count; // Текущее количество элементов size_t total_bytes; // Суммарный объём данных (сумма int_len) int size_limit; // Максимальное количество элементов (-1 = без лимита) queue_callback_fn callback; // Коллбэк автозабора void* callback_arg; // Аргумент коллбэка int callback_suspended; // 1 = коллбэки временно приостановлены void* resume_timeout_id; // ID таймера uasync для отложенного resume struct UASYNC* ua; // Экземпляр uasync (обязателен для таймеров) struct queue_waiter waiter; // Встроенный waiter (только один) struct ll_entry** hash_table; // Хеш-таблица для поиска по id (если hash_size > 0) size_t hash_size; // Размер хеш-таблицы #ifdef QUEUE_THREAD_CHECK #ifdef _WIN32 DWORD owner_thread; #else pthread_t owner_thread; #endif #endif }; /* ==================== Создание / уничтожение ==================== */ /** * @brief Создаёт новую пустую очередь. * @param ua экземпляр uasync (обязателен для отложенного resume коллбэков) * @param hash_size размер хеш-таблицы (0 = поиск по id отключён) * @return указатель на очередь или NULL при ошибке выделения памяти */ struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size, char* name); /** * @brief Освобождает структуру очереди. * @param q очередь * * @warning НЕ ОСВОБОЖДАЕТ элементы в очереди! * Их нужно извлечь через queue_data_get() и освободить через queue_entry_free(). */ void queue_free(struct ll_queue* q); /* ==================== Конфигурация ==================== */ /** * @brief Устанавливает максимальное количество элементов в очереди. * @param q очередь * @param lim лимит (-1 = без ограничения) * * При превышении лимита новые элементы автоматически освобождаются. */ void queue_set_size_limit(struct ll_queue* q, int lim); /* ==================== Автозабор элементов ==================== */ /** * @brief Устанавливает коллбэк для автоматического извлечения элементов. * @param q очередь * @param cbk_fn функция-коллбэк * @param arg пользовательский аргумент * * Коллбэк вызывается автоматически при наличии элементов и разрешённом состоянии. */ void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg); /** * @brief Возобновляет работу коллбэка после обработки элемента в следующем event loop. * @param q очередь * * @warning ОБЯЗАТЕЛЬНО вызывать после завершения обработки элемента в коллбэке. * Иначе автозабор элементов навсегда остановится. */ void queue_resume_callback(struct ll_queue* q); /* ==================== Пороговое ожидание ==================== */ /** * @brief Регистрирует одноразовый коллбэк, который сработает, когда очередь освободится до заданного порога. * @param q очередь * @param max_packets максимальное количество элементов * @param max_bytes максимальный объём данных (0 = не проверять) * @param callback функция, вызываемая при достижении порога * @param arg аргумент коллбэка * @return указатель на waiter (для отмены) или NULL, если условие уже выполнено (коллбэк вызван сразу) */ struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes, queue_threshold_callback_fn callback, void* arg); /** * @brief Отменяет ранее установленное ожидание порога. * @param q очередь * @param waiter указатель, возвращённый queue_wait_threshold() */ void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter); /* ==================== Работа с данными ==================== */ /** * @brief Добавляет элемент в конец очереди (FIFO). * @param q очередь * @param entry элемент * @param id идентификатор для поиска * @return 0 — успех, -1 — превышен лимит (элемент освобождён) */ int queue_data_put(struct ll_queue* q, struct ll_entry* entry); /** * @brief Добавляет элемент в начало очереди (LIFO, высокий приоритет). * @param q очередь * @param entry элемент * @param id идентификатор * @return 0 — успех, -1 — превышен лимит (элемент освобождён) */ int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry); /** * @brief Добавляет элемент в конец очереди с явным указанием смещения индекса для hash. * @param q очередь * @param entry элемент * @param index_offset смещение индекса (node_id) в data[] * @param index_size размер индекса (8 для uint64_t node_id) * @return 0 — успех, -1 — ошибка */ int queue_data_put_with_index(struct ll_queue* q, struct ll_entry* entry, uint16_t index_offset, uint16_t index_size); /** * @brief Добавляет элемент в начало очереди с явным указанием смещения индекса для hash. * @param q очередь * @param entry элемент * @param index_offset смещение индекса в data[] * @param index_size размер индекса * @return 0 — успех, -1 — ошибка */ int queue_data_put_first_with_index(struct ll_queue* q, struct ll_entry* entry, uint16_t index_offset, uint16_t index_size); /** * @brief Извлекает элемент из начала очереди. * @param q очередь * @return элемент или NULL, если очередь пуста * * @note Автоматически приостанавливает коллбэки (callback_suspended = 1) * После обработки элемента необходимо вызвать queue_resume_callback(). */ struct ll_entry* queue_data_get(struct ll_queue* q); /** * @brief Возвращает текущее количество элементов в очереди. * @param q очередь * @return количество элементов */ int queue_entry_count(struct ll_queue* q); /* ==================== Управление памятью элементов ==================== */ // проверить правильность очереди (корректность цепочки, счетчики элементов и байтов) int queue_check_consistency(struct ll_queue* q); /** * @brief Выделяет элемент с отдельным буфером dgram (malloc). * @param len требуемый размер dgram * @return элемент или NULL при ошибке */ struct ll_entry* ll_alloc_lldgram(uint16_t len); /** * @brief Создаёт элемент с пользовательским буфером data[] (malloc одним блоком). * @param data_size размер data[] * @return элемент или NULL при ошибке */ struct ll_entry* queue_entry_new(size_t data_size); /** * @brief Создаёт элемент из пула памяти. * @param pool пул * @return элемент или NULL, если пул пуст */ struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool); /** * @brief Освобождает структуру элемента (возвращает в пул или free). * @param entry элемент * * @note НЕ освобождает dgram! Используйте queue_dgram_free() отдельно. */ void queue_entry_free(struct ll_entry* entry); /** * @brief Освобождает буфер dgram элемента. * @param entry элемент */ void queue_dgram_free(struct ll_entry* entry); /* ==================== Поиск и удаление ==================== */ /** * @brief Находит элемент по идентификатору (требуется hash_size > 0 при создании). * @param q очередь * @param id искомый идентификатор * @return элемент или NULL */ struct ll_entry* queue_find_data_by_index(struct ll_queue* q, const void* index_key, uint16_t index_size); /** * @brief Удаляет элемент из очереди (не освобождает память). * @param q очередь * @param entry элемент * @return 0 — успех, -1 — элемент не найден в очереди */ int queue_remove_data(struct ll_queue* q, struct ll_entry* entry); /* ==================== Утилиты ==================== */ /** * @brief Возвращает суммарный объём данных в очереди. * @param q очередь * @return количество байт */ static inline size_t queue_total_bytes(struct ll_queue* q) { return q ? q->total_bytes : 0; } #endif // LL_QUEUE_H