// ll_queue.c - Упрощенная архитектура: разделение создания элементов и работы с очередью #include #include #include #include #include "ll_queue.h" #include "u_async.h" #include "debug_config.h" // Предварительные объявления внутренних функций static void queue_resume_timeout_cb(void* arg); static void check_waiters(struct ll_queue* q); static void add_to_hash(struct ll_queue* q, struct ll_entry* entry); static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry); // ==================== Управление очередью ==================== struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size) { if (!ua) return NULL; struct ll_queue* q = calloc(1, sizeof(struct ll_queue)); if (!q) return NULL; q->ua = ua; q->size_limit = -1; // Без ограничения по умолчанию q->hash_size = hash_size; // Создать хеш-таблицу если нужно if (hash_size > 0) { q->hash_table = calloc(hash_size, sizeof(struct ll_entry*)); if (!q->hash_table) { free(q); return NULL; } } DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_new: created queue %p, hash_size=%zu", q, hash_size); return q; } struct ll_entry* ll_alloc_lldgram(uint16_t len) { struct ll_entry* entry = queue_entry_new(0); if (!entry) return NULL; entry->len=0; entry->memlen=len; entry->dgram = malloc(len); // entry->dgram_pool = NULL; - уже null (memset) // entry->dgram_free_fn = NULL; if (!entry->dgram) { queue_entry_free(entry); return NULL; } return entry; } void queue_free(struct ll_queue* q) { if (!q) return; DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_free: freeing queue %p, head=%p, tail=%p, count=%d", q, q->head, q->tail, q->count); // ВАЖНО: Не освобождаем элементы в очереди - они должны быть извлечены отдельно // Это упрощает архитектуру и предотвращает double-free // Освободить хеш-таблицу if (q->hash_table) { free(q->hash_table); } // Отменить отложенное возобновление if (q->resume_timeout_id) { uasync_cancel_timeout(q->ua, q->resume_timeout_id); q->resume_timeout_id = NULL; } free(q); } // ==================== Конфигурация очереди ==================== void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg) { if (!q) return; q->callback = cbk_fn; q->callback_arg = arg; } static void queue_resume_timeout_cb(void* arg) { struct ll_queue* q = (struct ll_queue*)arg; if (!q) return; q->resume_timeout_id = NULL; // Вызвать коллбэк если есть элементы и коллбэки разрешены if (q->head && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } } void queue_resume_callback(struct ll_queue* q) { if (!q) return; q->callback_suspended = 0; // Если есть элементы, запланировать вызов коллбэка if (q->head && q->callback && !q->resume_timeout_id) { q->resume_timeout_id = uasync_set_timeout(q->ua, 0, q, queue_resume_timeout_cb); } } void queue_set_size_limit(struct ll_queue* q, int lim) { if (!q) return; q->size_limit = lim; } // ==================== Управление элементами ==================== struct ll_entry* queue_entry_new(size_t data_size) { struct ll_entry* entry = malloc(sizeof(struct ll_entry) + data_size); if (!entry) return NULL; memset(entry, 0, sizeof(struct ll_entry) + data_size); entry->size = data_size; entry->len = 0; entry->pool = NULL; // Выделено через malloc // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new: created entry %p, size=%zu", entry, data_size); return entry; } struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool) { if (!pool) return NULL; struct ll_entry* entry = memory_pool_alloc(pool); if (!entry) return NULL; memset(entry, 0, pool->object_size); entry->size = pool->object_size - sizeof(struct ll_entry); entry->len = 0; entry->pool = pool; // Выделено из пула // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_entry_new_from_pool: created entry %p from pool %p", entry, pool); return entry; } //void ll_free_dgram(struct ll_entry* entry) { void queue_dgram_free(struct ll_entry* entry) { if (!entry) return; if (entry->dgram) { if (entry->dgram_free_fn) { entry->dgram_free_fn(entry->dgram, NULL); // arg=NULL, если не задан } else if (entry->dgram_pool) { memory_pool_free(entry->dgram_pool, entry->dgram); } else { free(entry->dgram); } entry->dgram = NULL; entry->len = 0; // Опционально сброс len } } void queue_entry_free(struct ll_entry* entry) { if (!entry) return; if (entry->pool) { memory_pool_free(entry->pool, entry); } else { free(entry); } } // ==================== Операции с очередью ==================== // Внутренняя функция добавления в хеш-таблицу static void add_to_hash(struct ll_queue* q, struct ll_entry* entry) { if (!q || q->hash_size == 0 || !entry) return; size_t slot = entry->id % q->hash_size; entry->hash_next = q->hash_table[slot]; q->hash_table[slot] = entry; } // Внутренняя функция удаления из хеш-таблицы static void remove_from_hash(struct ll_queue* q, struct ll_entry* entry) { if (!q || q->hash_size == 0 || !entry) return; size_t slot = entry->id % q->hash_size; struct ll_entry** ptr = &q->hash_table[slot]; while (*ptr) { if (*ptr == entry) { *ptr = entry->hash_next; entry->hash_next = NULL; return; } ptr = &(*ptr)->hash_next; } } // Проверить и запустить ожидающие коллбэки static void check_waiters(struct ll_queue* q) { if (!q) return; // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: checking waiters, count=%d, bytes=%zu", q->count, q->total_bytes); struct queue_waiter* waiter = &q->waiter; if (waiter->callback) { // Проверить условие: не больше max_packets и не больше max_bytes // max_bytes = 0 означает "не проверять байты" if (q->count <= waiter->max_packets && (waiter->max_bytes == 0 || q->total_bytes <= waiter->max_bytes)) { DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "check_waiters: condition met, calling callback, count=%d<=%d, bytes=%zu<=%zu (max_bytes_check=%s)", q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes, waiter->max_bytes == 0 ? "disabled" : "enabled"); waiter->callback(q, waiter->callback_arg); memset(waiter, 0, sizeof(*waiter)); } } } int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) { if (!q || !entry) return -1; entry->id = id; // Проверить лимит размера if (q->size_limit >= 0 && q->count >= q->size_limit) { queue_entry_free(entry); // Освободить элемент если превышен лимит return -1; } // Добавить в конец entry->next = NULL; entry->prev = q->tail; if (q->tail) { q->tail->next = entry; } else { q->head = entry; } q->tail = entry; q->count++; entry->int_len=entry->len; q->total_bytes += entry->int_len; size_t send_q_bytes = queue_total_bytes(q); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check total bytes: new_q_len=%d element_size:%d", send_q_bytes, entry->size); add_to_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put: added entry %p (id=%u), count=%d", entry, id, q->count); // Если очередь была пуста и коллбэки разрешены - вызвать коллбэк if (q->count == 1 && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } // Проверить ожидающие коллбэки check_waiters(q); return 0; } int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id) { if (!q || !entry) return -1; entry->id = id; // Проверить лимит размера if (q->size_limit >= 0 && q->count >= q->size_limit) { queue_entry_free(entry); // Освободить элемент если превышен лимит return -1; } // Добавить в начало entry->next = q->head; entry->prev = NULL; if (q->head) { q->head->prev = entry; } else { q->tail = entry; } q->head = entry; q->count++; entry->int_len=entry->len; q->total_bytes += entry->int_len; add_to_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_put_first: added entry %p (id=%u), count=%d", entry, id, q->count); // Если очередь была пуста и коллбэки разрешены - вызвать коллбэк if (q->count == 1 && !q->callback_suspended && q->callback) { q->callback(q, q->callback_arg); } // Проверить ожидающие коллбэки check_waiters(q); return 0; } struct ll_entry* queue_data_get(struct ll_queue* q) { if (!q || !q->head) return NULL; struct ll_entry* entry = q->head; q->head = entry->next; if (q->head) q->head->prev = NULL; if (!q->head) q->tail = NULL; q->count--; q->total_bytes -= entry->int_len; entry->next = NULL; entry->prev = NULL; remove_from_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_data_get: got entry %p (id=%u), count=%d", entry, entry->id, q->count); // Приостановить коллбэки для предотвращения рекурсии q->callback_suspended = 1; // Проверить ожидающие коллбэки check_waiters(q); return entry; } int queue_entry_count(struct ll_queue* q) { return q ? q->count : 0; } // ==================== Асинхронное ожидание ==================== struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes, queue_threshold_callback_fn callback, void* arg) { if (!q || !callback) return NULL; struct queue_waiter* waiter = &q->waiter; // Проверить условие немедленно if (q->count <= max_packets && (max_bytes == 0 || q->total_bytes <= max_bytes)) { // Условие уже выполнено - вызвать коллбэк немедленно callback(q, arg); return NULL; } // Установить waiter для отложенного вызова waiter->max_packets = max_packets; waiter->max_bytes = max_bytes; waiter->callback = callback; waiter->callback_arg = arg; return waiter; } void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter) { if (!q || !waiter || waiter != &q->waiter) return; memset(waiter, 0, sizeof(*waiter)); } // ==================== Поиск и удаление по ID ==================== struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id) { if (!q || q->hash_size == 0 || !q->hash_table) return NULL; size_t slot = id % q->hash_size; struct ll_entry* entry = q->hash_table[slot]; while (entry) { if (entry->id == id) { return entry; } entry = entry->hash_next; } return NULL; } int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) { if (!q || !entry) return -1; // Удалить из двусвязного списка if (entry->prev) { entry->prev->next = entry->next; } else { q->head = entry->next; } if (entry->next) { entry->next->prev = entry->prev; } else { q->tail = entry->prev; } q->count--; q->total_bytes -= entry->int_len; entry->next = NULL; entry->prev = NULL; remove_from_hash(q, entry); // DEBUG_DEBUG(DEBUG_CATEGORY_LL_QUEUE, "queue_remove_data: removed entry %p (id=%u), count=%d", entry, entry->id, q->count); return 0; }