// ll_queue.c - Упрощенная архитектура: разделение создания элементов и работы с очередью #include #include #include #include #include "ll_queue.h" #include "u_async.h" #include "debug_config.h" // Предварительные объявления внутренних функций static void queue_resume_timeout_cb(void* arg); static void check_waiters(struct ll_queue* q); static void add_to_hash(struct ll_queue* q, struct ll_entry* entry); static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry); #define xxx 0 // Вспомогательная функция для преобразования данных в запись static inline struct ll_entry* data_to_entry(void* data) { if (!data) return NULL; return ((struct ll_entry*)data) - xxx; } // ==================== Управление очередью ==================== struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) { if (!ua) return NULL; struct ll_queue* q = calloc(1, sizeof(struct ll_queue)); if (!q) return NULL; q->ua = ua; q->size_limit = -1; // Без ограничения по умолчанию q->hash_size = hash_size; // Создать хеш-таблицу если нужно if (hash_size > 0) { q->hash_table = calloc(hash_size, sizeof(struct ll_entry*)); if (!q->hash_table) { free(q); return NULL; } } DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_new: created queue %p, hash_size=%zu", q, hash_size); return q; } void queue_free(struct ll_queue* q) { if (!q) return; DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: freeing queue %p, head=%p, tail=%p, count=%d", q, q->head, q->tail, q->count); // ВАЖНО: Не освобождаем элементы в очереди - они должны быть извлечены отдельно // Это упрощает архитектуру и предотвращает double-free // Освободить хеш-таблицу if (q->hash_table) { free(q->hash_table); } // Отменить отложенное возобновление if (q->resume_timeout_id) { uasync_cancel_timeout(q->ua, q->resume_timeout_id); q->resume_timeout_id = NULL; } free(q); } // ==================== Конфигурация очереди ==================== void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg) { if (!q) return; q->callback = cbk_fn; q->callback_arg = arg; } static void queue_resume_timeout_cb(void* arg) { struct ll_queue* q = (struct ll_queue*)arg; if (!q) return; q->resume_timeout_id = NULL; // Вызвать коллбэк если есть элементы и коллбэки разрешены if (q->head && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } } void queue_resume_callback(struct ll_queue* q) { if (!q) return; q->callback_suspended = 0; // Если есть элементы, запланировать вызов коллбэка if (q->head && q->callback && !q->resume_timeout_id) { q->resume_timeout_id = uasync_set_timeout(q->ua, 0, q, queue_resume_timeout_cb); } } void queue_set_size_limit(struct ll_queue* q, int lim) { if (!q) return; q->size_limit = lim; } // ==================== Управление элементами ==================== void* queue_data_new(size_t data_size) { struct ll_entry* entry = malloc(sizeof(struct ll_entry) + data_size); if (!entry) return NULL; memset(entry, 0, sizeof(struct ll_entry) + data_size); entry->size = data_size; entry->ref_count = 1; // Единственная ссылка - на сам элемент entry->pool = NULL; // Выделено через malloc // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_new: created entry %p, size=%zu", entry, data_size); return (void*)(entry + xxx); } void* queue_data_new_from_pool(struct memory_pool* pool) { if (!pool) return NULL; struct ll_entry* entry = memory_pool_alloc(pool); if (!entry) return NULL; memset(entry, 0, pool->object_size); entry->size = pool->object_size - sizeof(struct ll_entry); entry->ref_count = 1; // Единственная ссылка - на сам элемент entry->pool = pool; // Выделено из пула // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_new_from_pool: created entry %p from pool %p", entry, pool); return (void*)(entry + xxx); } void queue_data_free(void* data) { if (!data) return; struct ll_entry* entry = data_to_entry(data); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_free: freeing entry %p, ref_count=%d", entry, entry->ref_count); if (--entry->ref_count <= 0) { if (entry->pool) { memory_pool_free(entry->pool, entry); } else { free(entry); } } } // ==================== Операции с очередью ==================== // Внутренняя функция добавления в хеш-таблицу static void add_to_hash(struct ll_queue* q, struct ll_entry* entry) { if (!q || q->hash_size == 0 || !entry) return; size_t slot = entry->id % q->hash_size; entry->hash_next = q->hash_table[slot]; q->hash_table[slot] = entry; } // Внутренняя функция удаления из хеш-таблицы static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry) { if (!q || q->hash_size == 0 || !entry) return; size_t slot = entry->id % q->hash_size; struct ll_entry** ptr = &q->hash_table[slot]; while (*ptr) { if (*ptr == entry) { *ptr = entry->hash_next; entry->hash_next = NULL; return; } ptr = &(*ptr)->hash_next; } } // Проверить и запустить ожидающие коллбэки static void check_waiters(struct ll_queue* q) { if (!q) return; // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: checking waiters, count=%d, bytes=%zu", q->count, q->total_bytes); struct queue_waiter* waiter = &q->waiter; if (waiter->callback) { // Проверить условие: не больше max_packets и не больше max_bytes // max_bytes = 0 означает "не проверять байты" if (q->count <= waiter->max_packets && (waiter->max_bytes == 0 || q->total_bytes <= waiter->max_bytes)) { DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: condition met, calling callback, count=%d<=%d, bytes=%zu<=%zu (max_bytes_check=%s)", q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes, waiter->max_bytes == 0 ? "disabled" : "enabled"); waiter->callback(q, waiter->callback_arg); memset(waiter, 0, sizeof(*waiter)); } } } int queue_data_put(struct ll_queue* q, void* data, uint32_t id) { if (!q || !data) return -1; struct ll_entry* entry = data_to_entry(data); entry->id = id; // Проверить лимит размера if (q->size_limit >= 0 && q->count >= q->size_limit) { queue_data_free(data); // Освободить элемент если превышен лимит return -1; } // Добавить в конец entry->next = NULL; entry->prev = q->tail; if (q->tail) { q->tail->next = entry; } else { q->head = entry; } q->tail = entry; q->count++; q->total_bytes += entry->size; // ВАЖНО: НЕ увеличиваем ref_count - элемент просто находится в очереди size_t send_q_bytes = queue_total_bytes(q); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check total bytes: new_q_len=%d element_size:%d", send_q_bytes, entry->size); add_to_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put: added entry %p (id=%u), count=%d", entry, id, q->count); // Если очередь была пуста и коллбэки разрешены - вызвать коллбэк if (q->count == 1 && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } // Проверить ожидающие коллбэки check_waiters(q); return 0; } int queue_data_put_first(struct ll_queue* q, void* data, uint32_t id) { if (!q || !data) return -1; struct ll_entry* entry = data_to_entry(data); entry->id = id; // Проверить лимит размера if (q->size_limit >= 0 && q->count >= q->size_limit) { queue_data_free(data); // Освободить элемент если превышен лимит return -1; } // Добавить в начало entry->next = q->head; entry->prev = NULL; if (q->head) { q->head->prev = entry; } else { q->tail = entry; } q->head = entry; q->count++; q->total_bytes += entry->size; // ВАЖНО: НЕ увеличиваем ref_count - элемент просто находится в очереди add_to_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put_first: added entry %p (id=%u), count=%d", entry, id, q->count); // Если очередь была пуста и коллбэки разрешены - вызвать коллбэк if (q->count == 1 && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } // Проверить ожидающие коллбэки check_waiters(q); return 0; } void* queue_data_get(struct ll_queue* q) { if (!q || !q->head) return NULL; struct ll_entry* entry = q->head; q->head = entry->next; if (q->head) q->head->prev = NULL; if (!q->head) q->tail = NULL; q->count--; q->total_bytes -= entry->size; entry->next = NULL; entry->prev = NULL; remove_from_hash(q, entry); // ВАЖНО: НЕ уменьшаем ref_count - просто удаляем из очереди // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_get: got entry %p (id=%u), count=%d", entry, entry->id, q->count); // Приостановить коллбэки для предотвращения рекурсии q->callback_suspended = 1; // Проверить ожидающие коллбэки check_waiters(q); return (void*)(entry + xxx); } int queue_entry_count(struct ll_queue* q) { return q ? q->count : 0; } // ==================== Асинхронное ожидание ==================== struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes, queue_threshold_callback_fn callback, void* arg) { if (!q || !callback) return NULL; struct queue_waiter* waiter = &q->waiter; // Проверить условие немедленно if (q->count <= max_packets && (max_bytes == 0 || q->total_bytes <= max_bytes)) { // Условие уже выполнено - вызвать коллбэк немедленно callback(q, arg); return NULL; } // Установить waiter для отложенного вызова waiter->max_packets = max_packets; waiter->max_bytes = max_bytes; waiter->callback = callback; waiter->callback_arg = arg; return waiter; } void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter) { if (!q || !waiter || waiter != &q->waiter) return; memset(waiter, 0, sizeof(*waiter)); } // ==================== Поиск и удаление по ID ==================== void* queue_find_data_by_id(struct ll_queue* q, uint32_t id) { if (!q || q->hash_size == 0 || !q->hash_table) return NULL; size_t slot = id % q->hash_size; struct ll_entry* entry = q->hash_table[slot]; while (entry) { if (entry->id == id) { return (void*)(entry + xxx); } entry = entry->hash_next; } return NULL; } int queue_remove_data(struct ll_queue* q, void* data) { if (!q || !data) return -1; struct ll_entry* entry = data_to_entry(data); // Удалить из двусвязного списка if (entry->prev) { entry->prev->next = entry->next; } else { q->head = entry->next; } if (entry->next) { entry->next->prev = entry->prev; } else { q->tail = entry->prev; } q->count--; q->total_bytes -= entry->size; // ВАЖНО: НЕ уменьшаем ref_count - просто удаляем из очереди entry->next = NULL; entry->prev = NULL; remove_from_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_remove_data: removed entry %p (id=%u), count=%d", entry, entry->id, q->count); return 0; }