Browse Source

проверки thread в ll_queue

nodeinfo-routing-update
jeka 4 weeks ago
parent
commit
5dfd1ea5fe
  1. 55
      lib/ll_queue.c
  2. 16
      lib/ll_queue.h
  3. 2
      src/etcp_api.c
  4. 16
      src/pkt_normalizer.c
  5. 2
      src/pkt_normalizer.h

55
lib/ll_queue.c

@ -8,6 +8,41 @@
#include "debug_config.h"
#include "mem.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
// ==================== Thread safety check ====================
#ifdef QUEUE_THREAD_CHECK
static inline void queue_check_thread(struct ll_queue* q) {
if (!q) return;
#ifdef _WIN32
DWORD current = GetCurrentThreadId();
if (q->owner_thread != current) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': thread mismatch! owner=%lu current=%lu",
q->name ? q->name : "unknown",
(unsigned long)q->owner_thread,
(unsigned long)current);
printf("ERROR: Queue '%s' accessed from wrong thread!\n", q->name ? q->name : "unknown");
abort();
}
#else
pthread_t current = pthread_self();
if (!pthread_equal(q->owner_thread, current)) {
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': thread mismatch! owner=%lu current=%lu",
q->name ? q->name : "unknown",
(unsigned long)q->owner_thread,
(unsigned long)current);
printf("ERROR: Queue '%s' accessed from wrong thread!\n", q->name ? q->name : "unknown");
abort();
}
#endif
}
#endif
// Предварительные объявления внутренних функций
static void queue_resume_timeout_cb(void* arg);
static void check_waiters(struct ll_queue* q);
@ -27,6 +62,14 @@ struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size, char* name) {
q->size_limit = -1; // Без ограничения по умолчанию
q->hash_size = hash_size;
#ifdef QUEUE_THREAD_CHECK
#ifdef _WIN32
q->owner_thread = GetCurrentThreadId();
#else
q->owner_thread = pthread_self();
#endif
#endif
// Создать хеш-таблицу если нужно
if (hash_size > 0) {
q->hash_table = u_calloc(hash_size, sizeof(struct ll_entry*));
@ -227,6 +270,12 @@ static void check_waiters(struct ll_queue* q) {
int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
if (!q || !entry) return -1;
queue_check_thread(q);
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug - BEFORE callback
#endif
entry->id = id;
// Проверить лимит размера
@ -278,6 +327,8 @@ int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
if (!q || !entry) return -1;
queue_check_thread(q);
entry->id = id;
// Проверить лимит размера
@ -330,6 +381,8 @@ int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id
struct ll_entry* queue_data_get(struct ll_queue* q) {
if (!q || !q->head) return NULL;
queue_check_thread(q);
struct ll_entry* entry = q->head;
q->head = entry->next;
@ -459,6 +512,8 @@ struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id) {
int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return -1;
queue_check_thread(q);
// Удалить из двусвязного списка
if (entry->prev) {
entry->prev->next = entry->next;

16
lib/ll_queue.h

@ -5,7 +5,14 @@
#include <stdint.h> // для uint64_t
#include "memory_pool.h" // для struct memory_pool
#ifdef _WIN32
#include <windows.h>
#else
#include <pthread.h>
#endif
#define QUEUE_DEBUG 1
#define QUEUE_THREAD_CHECK 1 // 0 to disable
/**
* @file ll_queue.h
@ -45,6 +52,7 @@ struct ll_queue;
* Поле dgram отдельный блок (malloc / пул / внешняя память).
*/
struct ll_entry {
char* name;
struct ll_entry* next; ///< Следующий элемент в очереди
struct ll_entry* prev; ///< Предыдущий элемент в очереди
uint16_t size; ///< Размер пользовательского буфера data[] (байт)
@ -115,6 +123,14 @@ struct ll_queue {
struct ll_entry** hash_table; ///< Хеш-таблица для поиска по id (если hash_size > 0)
size_t hash_size; ///< Размер хеш-таблицы
#ifdef QUEUE_THREAD_CHECK
#ifdef _WIN32
DWORD owner_thread;
#else
pthread_t owner_thread;
#endif
#endif
};
/* ==================== Создание / уничтожение ==================== */

2
src/etcp_api.c

@ -88,7 +88,9 @@ int etcp_send(struct ETCP_CONN* conn, struct ll_entry* entry) {
// Помещаем entry в очередь input normalizer
// queue_data_put забирает ownership entry
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "Before put to input");
int result = queue_data_put(pn->input, entry, 0);
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "After put to input");
if (result != 0) {
DEBUG_WARN(DEBUG_CATEGORY_ETCP_API, "etcp_send: queue_data_put failed (queue full?)");

16
src/pkt_normalizer.c

@ -178,7 +178,7 @@ void pn_reset(struct PKTNORM* pn) {
}
}
// Send data to packer (copies and adds to input queue or pending, triggering callback)
// Send data to packer (copies and adds to input queue or pending, triggering callback) используется только в юниттесте
void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
if (!pn || !data || len == 0) return;
@ -198,10 +198,13 @@ void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
pn->flush_timer = NULL;
}
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "PUT to input");
int ret = queue_data_put(pn->input, entry, 0);
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "PUT to input end");
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer_send: queue_data_put returned %d, input count=%d", ret, queue_entry_count(pn->input));
}
// Internal: Packer callback
static void packer_cb(struct ll_queue* q, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
@ -263,14 +266,17 @@ static void pn_buf_renew(struct PKTNORM* pn) {
// Internal: Process input when etcp->input_queue is ready (empty)
static void etcp_input_ready_cb(struct ll_queue* q, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return;
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "");
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "before get");
struct ll_entry* in_dgram = queue_data_get(pn->input);
if (!in_dgram) { queue_resume_callback(pn->input); return; }
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "after get");
if (!in_dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "Q EMPTY");
queue_resume_callback(pn->input);
return;
}
if (debug_should_output(DEBUG_LEVEL_DEBUG, DEBUG_CATEGORY_DUMP)) log_dump("->NORM", in_dgram->dgram, in_dgram->len);

2
src/pkt_normalizer.h

@ -58,6 +58,6 @@ void pn_reset(struct PKTNORM* pn);
void pn_unpacker_reset_state(struct PKTNORM* pn);
// создаёт malloc data, копирует, помещает в input.
void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len);
//void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len);
#endif // PKT_NORMALIZER_H
Loading…
Cancel
Save