Browse Source

race condition fix (win tun)

nodeinfo-routing-update
jeka 4 weeks ago
parent
commit
5d4cef0c3e
  1. 16
      lib/ll_queue.c
  2. 2
      lib/ll_queue.h
  3. 13
      lib/u_async.c
  4. 3
      lib/u_async.h
  5. 2
      src/etcp_api.c
  6. 3
      src/tun_windows.c

16
lib/ll_queue.c

@ -270,10 +270,12 @@ static void check_waiters(struct ll_queue* q) {
int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
if (!q || !entry) return -1;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
#ifdef QUEUE_DEBUG
queue_check_consistency(q);// !!!! for debug - BEFORE callback
// queue_check_consistency(q);// !!!! for debug - BEFORE callback
#endif
entry->id = id;
@ -327,8 +329,10 @@ int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
if (!q || !entry) return -1;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
entry->id = id;
// Проверить лимит размера
@ -381,7 +385,9 @@ int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id
struct ll_entry* queue_data_get(struct ll_queue* q) {
if (!q || !q->head) return NULL;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
struct ll_entry* entry = q->head;
@ -437,8 +443,8 @@ int queue_check_consistency(struct ll_queue* q) {
if (current->next) {
if (current->next->prev != current) {
// Несоответствие в связях prev/next
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': prev/next error at entry %p",
q->name ? q->name : "unknown", (void*)current);
DEBUG_ERROR(DEBUG_CATEGORY_LL_QUEUE, "Queue '%s': prev/next error at entry %p != %p entries: %d!=%d bytes: %zu!=%zu",
q->name ? q->name : "unknown", (void*)current, (void*)current->next->prev, actual_count, q->count, actual_bytes, q->total_bytes);
return -1;
}
}
@ -512,7 +518,9 @@ struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id) {
int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) {
if (!q || !entry) return -1;
#ifdef QUEUE_THREAD_CHECK
queue_check_thread(q);
#endif
// Удалить из двусвязного списка
if (entry->prev) {

2
lib/ll_queue.h

@ -12,7 +12,7 @@
#endif
#define QUEUE_DEBUG 1
#define QUEUE_THREAD_CHECK 1 // 0 to disable
//#define QUEUE_THREAD_CHECK 1 // 0 to disable
/**
* @file ll_queue.h

13
lib/u_async.c

@ -1426,6 +1426,19 @@ void uasync_get_stats(struct UASYNC* ua, size_t* timer_alloc, size_t* timer_u_fr
if (socket_u_free) *socket_u_free = ua->socket_free_count;
}
void uasync_memsync(struct UASYNC* ua) {
#ifdef _WIN32
EnterCriticalSection(&ua->posted_lock);
#else
pthread_mutex_lock(&ua->posted_lock);
#endif
#ifdef _WIN32
LeaveCriticalSection(&ua->posted_lock);
#else
pthread_mutex_unlock(&ua->posted_lock);
#endif
}
void uasync_post(struct UASYNC* ua, uasync_post_callback_t callback, void* arg) {
if (!ua || !callback) return;

3
lib/u_async.h

@ -108,4 +108,7 @@ int uasync_get_wakeup_fd(struct UASYNC* ua); // returns write fd for wakeup pipe
// сообщить async (из другого thread) чтобы он вызвал callback с аргументом
void uasync_post(struct UASYNC* ua, uasync_post_callback_t callback, void* user_arg);
// синхронизация памяти (для доступности из других потоков)
void uasync_memsync(struct UASYNC* ua);
#endif // UASYNC_H

2
src/etcp_api.c

@ -88,7 +88,7 @@ int etcp_send(struct ETCP_CONN* conn, struct ll_entry* entry) {
// Помещаем entry в очередь input normalizer
// queue_data_put забирает ownership entry
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "Before put to input");
// DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "Before put to input");
int result = queue_data_put(pn->input, entry, 0);
DEBUG_TRACE(DEBUG_CATEGORY_NORMALIZER, "After put to input");

3
src/tun_windows.c

@ -273,7 +273,8 @@ DWORD WINAPI tun_read_thread_proc(LPVOID arg)
memcpy(data + 1, wintun_pkt, size);
WintunReleaseReceivePacket(session, wintun_pkt); /* сразу отдаём кольцо */
struct ll_entry* pkt = queue_entry_new_from_pool(tun->pool);
// struct ll_entry* pkt = queue_entry_new_from_pool(tun->pool);// from pool нельзя - нет thred safe (memory consistency)
struct ll_entry* pkt = queue_entry_new(0);
if (!pkt) {
u_free(data);
tun->read_errors++;

Loading…
Cancel
Save