You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1631 lines
54 KiB
1631 lines
54 KiB
// uasync.c |
|
|
|
#include "u_async.h" |
|
#include "platform_compat.h" |
|
#include "debug_config.h" |
|
#include "mem.h" |
|
#include "memory_pool.h" |
|
#include <stdio.h> |
|
#include <string.h> |
|
#include <stdlib.h> |
|
#include <errno.h> |
|
#include <limits.h> |
|
#include <pthread.h> |
|
#include <unistd.h> |
|
|
|
#include "../lib/platform_compat.h" |
|
//#ifdef _WIN32 |
|
//#include <windows.h> |
|
//#else |
|
//#include <sys/time.h> |
|
//#endif |
|
|
|
// Platform-specific includes |
|
#ifdef __linux__ |
|
#include <sys/epoll.h> |
|
#define HAS_EPOLL 1 |
|
#else |
|
#define HAS_EPOLL 0 |
|
#endif |
|
|
|
|
|
|
|
// Timeout node with safe cancellation |
|
struct timeout_node { |
|
void* arg; |
|
timeout_callback_t callback; |
|
uint64_t expiration_ms; // absolute expiration time in milliseconds |
|
struct UASYNC* ua; // Pointer back to uasync instance for counter updates |
|
struct timeout_node* next; // For immediate queue (FIFO) |
|
}; |
|
|
|
// Socket node with array-based storage |
|
struct socket_node { |
|
int fd; // File descriptor (for pipe, file) |
|
socket_t sock; // Socket (for cross-platform sockets) |
|
int type; // SOCKET_NODE_TYPE_FD or SOCKET_NODE_TYPE_SOCK |
|
socket_callback_t read_cbk; // For FD type |
|
socket_callback_t write_cbk; // For FD type |
|
socket_t_callback_t read_cbk_sock; // For SOCK type |
|
socket_t_callback_t write_cbk_sock; // For SOCK type |
|
socket_callback_t except_cbk; |
|
void* user_data; |
|
int active; // 1 if socket is active, 0 if u_freed (for reuse) |
|
}; |
|
|
|
/*
 * Slot table of registered sockets together with the lookup tables that map
 * a descriptor to its slot and back. All four arrays share `capacity`.
 */
struct socket_array {
    struct socket_node* sockets;  /* dynamic array of socket slots */
    int* fd_to_index;             /* fd -> slot index, -1 when the fd is not registered */
    int* index_to_fd;             /* slot index -> fd, -1 when the slot is free */
    int* active_indices;          /* dense list of the slot indices currently in use */
    int capacity;                 /* allocated length of each array above */
    int count;                    /* number of active sockets */
    int max_fd;                   /* largest fd registered so far */
};
|
|
|
static struct socket_array* socket_array_create(int initial_capacity); |
|
static void socket_array_destroy(struct socket_array* sa); |
|
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data); |
|
static int socket_array_remove(struct socket_array* sa, int fd); |
|
static struct socket_node* socket_array_get(struct socket_array* sa, int fd); |
|
|
|
// No global instance - each module must use its own struct UASYNC instance |
|
|
|
// Array-based socket management implementation |
|
static struct socket_array* socket_array_create(int initial_capacity) { |
|
if (initial_capacity < 4) initial_capacity = 4; // Minimum capacity |
|
|
|
struct socket_array* sa = u_malloc(sizeof(struct socket_array)); |
|
if (!sa) return NULL; |
|
|
|
sa->sockets = u_calloc(initial_capacity, sizeof(struct socket_node)); |
|
sa->fd_to_index = u_calloc(initial_capacity, sizeof(int)); |
|
sa->index_to_fd = u_calloc(initial_capacity, sizeof(int)); |
|
sa->active_indices = u_calloc(initial_capacity, sizeof(int)); |
|
|
|
if (!sa->sockets || !sa->fd_to_index || !sa->index_to_fd || !sa->active_indices) { |
|
u_free(sa->sockets); |
|
u_free(sa->fd_to_index); |
|
u_free(sa->index_to_fd); |
|
u_free(sa->active_indices); |
|
u_free(sa); |
|
return NULL; |
|
} |
|
|
|
// Initialize mapping arrays to -1 (invalid) |
|
for (int i = 0; i < initial_capacity; i++) { |
|
sa->fd_to_index[i] = -1; |
|
sa->index_to_fd[i] = -1; |
|
sa->active_indices[i] = -1; |
|
sa->sockets[i].fd = -1; |
|
sa->sockets[i].active = 0; |
|
} |
|
|
|
sa->capacity = initial_capacity; |
|
sa->count = 0; |
|
sa->max_fd = -1; |
|
|
|
return sa; |
|
} |
|
|
|
static void socket_array_destroy(struct socket_array* sa) { |
|
if (!sa) return; |
|
|
|
u_free(sa->sockets); |
|
u_free(sa->fd_to_index); |
|
u_free(sa->index_to_fd); |
|
u_free(sa->active_indices); |
|
u_free(sa); |
|
} |
|
|
|
static int socket_array_add_internal(struct socket_array* sa, int fd, socket_t sock, int type, |
|
socket_callback_t read_cbk_fd, socket_callback_t write_cbk_fd, |
|
socket_t_callback_t read_cbk_sock, socket_t_callback_t write_cbk_sock, |
|
socket_callback_t except_cbk, void* user_data) { |
|
if (!sa || fd < 0) return -1; |
|
// FD_SETSIZE check only for POSIX systems - Windows sockets can have any value |
|
#ifndef _WIN32 |
|
if (fd >= FD_SETSIZE) return -1; |
|
#endif |
|
if (fd >= sa->capacity) { |
|
// Need to resize - double the capacity |
|
int new_capacity = sa->capacity * 2; |
|
if (fd >= new_capacity) new_capacity = fd + 16; // Ensure enough space |
|
|
|
struct socket_node* new_sockets = u_realloc(sa->sockets, new_capacity * sizeof(struct socket_node)); |
|
int* new_fd_to_index = u_realloc(sa->fd_to_index, new_capacity * sizeof(int)); |
|
int* new_index_to_fd = u_realloc(sa->index_to_fd, new_capacity * sizeof(int)); |
|
int* new_active_indices = u_realloc(sa->active_indices, new_capacity * sizeof(int)); |
|
|
|
if (!new_sockets || !new_fd_to_index || !new_index_to_fd || !new_active_indices) { |
|
// Allocation failed |
|
u_free(new_sockets); |
|
u_free(new_fd_to_index); |
|
u_free(new_index_to_fd); |
|
u_free(new_active_indices); |
|
return -1; |
|
} |
|
|
|
// Initialize new elements |
|
for (int i = sa->capacity; i < new_capacity; i++) { |
|
new_fd_to_index[i] = -1; |
|
new_index_to_fd[i] = -1; |
|
new_active_indices[i] = -1; |
|
new_sockets[i].fd = -1; |
|
new_sockets[i].active = 0; |
|
} |
|
|
|
sa->sockets = new_sockets; |
|
sa->fd_to_index = new_fd_to_index; |
|
sa->index_to_fd = new_index_to_fd; |
|
sa->active_indices = new_active_indices; |
|
sa->capacity = new_capacity; |
|
} |
|
|
|
// Check if FD already exists |
|
if (sa->fd_to_index[fd] != -1) return -1; // FD already exists |
|
|
|
// Find first u_free slot |
|
int index = -1; |
|
for (int i = 0; i < sa->capacity; i++) { |
|
if (!sa->sockets[i].active) { |
|
index = i; |
|
break; |
|
} |
|
} |
|
|
|
if (index == -1) return -1; // No u_free slots (shouldn't happen) |
|
|
|
// Add the socket |
|
sa->sockets[index].fd = fd; |
|
sa->sockets[index].sock = sock; |
|
sa->sockets[index].type = type; |
|
sa->sockets[index].read_cbk = read_cbk_fd; |
|
sa->sockets[index].write_cbk = write_cbk_fd; |
|
sa->sockets[index].read_cbk_sock = read_cbk_sock; |
|
sa->sockets[index].write_cbk_sock = write_cbk_sock; |
|
sa->sockets[index].except_cbk = except_cbk; |
|
sa->sockets[index].user_data = user_data; |
|
sa->sockets[index].active = 1; |
|
|
|
sa->fd_to_index[fd] = index; |
|
sa->index_to_fd[index] = fd; |
|
sa->active_indices[sa->count] = index; // Add to active list |
|
sa->count++; |
|
|
|
if (fd > sa->max_fd) sa->max_fd = fd; |
|
|
|
return index; |
|
} |
|
|
|
// Wrapper for adding regular file descriptors (pipe, file) |
|
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, |
|
socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { |
|
return socket_array_add_internal(sa, fd, SOCKET_INVALID, SOCKET_NODE_TYPE_FD, |
|
read_cbk, write_cbk, NULL, NULL, except_cbk, user_data); |
|
} |
|
|
|
// Wrapper for adding socket_t (cross-platform sockets) |
|
static int socket_array_add_socket_t(struct socket_array* sa, socket_t sock, socket_t_callback_t read_cbk, |
|
socket_t_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { |
|
// On Windows, SOCKET is UINT_PTR, so we need to handle indexing differently |
|
#ifdef _WIN32 |
|
int fd = (int)(intptr_t)sock; // Use socket value as index on Windows (simplified) |
|
if (fd < 0) return -1; // Windows sockets can have any value, only check negative |
|
#else |
|
int fd = sock; // On POSIX, socket_t is int |
|
if (fd < 0 || fd >= FD_SETSIZE) return -1; |
|
#endif |
|
return socket_array_add_internal(sa, fd, sock, SOCKET_NODE_TYPE_SOCK, |
|
NULL, NULL, read_cbk, write_cbk, except_cbk, user_data); |
|
} |
|
|
|
static int socket_array_remove(struct socket_array* sa, int fd) { |
|
if (!sa || fd < 0 || fd >= sa->capacity) return -1; |
|
|
|
int index = sa->fd_to_index[fd]; |
|
if (index == -1 || !sa->sockets[index].active) return -1; // FD not found |
|
|
|
// Mark as inactive |
|
sa->sockets[index].active = 0; |
|
sa->sockets[index].fd = -1; |
|
sa->sockets[index].sock = SOCKET_INVALID; |
|
sa->sockets[index].type = SOCKET_NODE_TYPE_FD; |
|
sa->fd_to_index[fd] = -1; |
|
sa->index_to_fd[index] = -1; |
|
|
|
// Remove from active_indices by swapping with last element |
|
// Find position in active_indices |
|
for (int i = 0; i < sa->count; i++) { |
|
if (sa->active_indices[i] == index) { |
|
// Swap with last element |
|
sa->active_indices[i] = sa->active_indices[sa->count - 1]; |
|
sa->active_indices[sa->count - 1] = -1; |
|
break; |
|
} |
|
} |
|
sa->count--; |
|
|
|
return 0; |
|
} |
|
|
|
static struct socket_node* socket_array_get(struct socket_array* sa, int fd) { |
|
if (!sa || fd < 0 || fd >= sa->capacity) return NULL; |
|
|
|
int index = sa->fd_to_index[fd]; |
|
if (index == -1 || !sa->sockets[index].active) return NULL; |
|
|
|
return &sa->sockets[index]; |
|
} |
|
|
|
// Get socket_node by socket_t |
|
static struct socket_node* socket_array_get_by_sock(struct socket_array* sa, socket_t sock) { |
|
if (!sa) return NULL; |
|
#ifdef _WIN32 |
|
int fd = (int)(intptr_t)sock; |
|
#else |
|
int fd = sock; |
|
#endif |
|
if (fd < 0 || fd >= sa->capacity) return NULL; |
|
|
|
int index = sa->fd_to_index[fd]; |
|
if (index == -1 || !sa->sockets[index].active) return NULL; |
|
if (sa->sockets[index].type != SOCKET_NODE_TYPE_SOCK) return NULL; |
|
|
|
return &sa->sockets[index]; |
|
} |
|
|
|
// Callback to u_free timeout node and update counters |
|
static void timeout_node_free_callback(void* user_data, void* data) { |
|
struct UASYNC* ua = (struct UASYNC*)user_data; |
|
struct timeout_node* node = (struct timeout_node*)data; |
|
(void)node; // Not used directly, but keep for consistency |
|
ua->timer_free_count++; |
|
memory_pool_free(ua->timeout_pool, data); |
|
} |
|
|
|
/*
 * Store the current wall-clock time in *tv.
 *
 * Windows: read through GetSystemTimeAsFileTime(), whose value counts
 * 100-nanosecond intervals since 1601, and rebase it to the Unix epoch.
 * Everywhere else: gettimeofday().
 */
static void get_current_time(struct timeval* tv) {
#ifdef _WIN32
    FILETIME file_time;
    GetSystemTimeAsFileTime(&file_time);

    ULARGE_INTEGER ticks;
    ticks.LowPart = file_time.dwLowDateTime;
    ticks.HighPart = file_time.dwHighDateTime;

    /* Convert 100-nanosecond intervals into seconds and microseconds. */
    tv->tv_sec = (long)((ticks.QuadPart - 116444736000000000ULL) / 10000000ULL);
    tv->tv_usec = (long)((ticks.QuadPart % 10000000ULL) / 10);
#else
    /* Linux and other Unix-like systems. */
    gettimeofday(tv, NULL);
#endif
}
|
|
|
/*
 * Current time in timebase units of 0.1 ms (10000 units per second).
 *
 * Windows: scaled from the performance counter.
 * Elsewhere: derived from CLOCK_MONOTONIC.
 */
#ifdef _WIN32
uint64_t get_time_tb(void) {
    LARGE_INTEGER frequency;
    LARGE_INTEGER counter;

    QueryPerformanceFrequency(&frequency);
    QueryPerformanceCounter(&counter);

    const double units = (double)counter.QuadPart * 10000.0 / (double)frequency.QuadPart;
    return (uint64_t)units;
}
#else
uint64_t get_time_tb(void) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);

    /* 1 s = 10000 units, 100000 ns = 1 unit. */
    const uint64_t from_seconds = (uint64_t)now.tv_sec * 10000ULL;
    const uint64_t from_nanoseconds = (uint64_t)now.tv_nsec / 100000ULL;
    return from_seconds + from_nanoseconds;
}
#endif
|
|
|
|
|
|
|
// Drain wakeup pipe - read all available bytes |
|
static void drain_wakeup_pipe(struct UASYNC* ua) { |
|
if (!ua || !ua->wakeup_initialized) return; |
|
|
|
char buf[64]; |
|
while (1) { |
|
ssize_t n = read(ua->wakeup_pipe[0], buf, sizeof(buf)); |
|
if (n <= 0) break; |
|
} |
|
} |
|
|
|
// Process posted tasks (lock-u_free during execution) |
|
static void process_posted_tasks(struct UASYNC* ua) { |
|
if (!ua) return; |
|
|
|
struct posted_task* list = NULL; |
|
|
|
|
|
#ifdef _WIN32 |
|
EnterCriticalSection(&ua->posted_lock); |
|
#else |
|
pthread_mutex_lock(&ua->posted_lock); |
|
#endif |
|
list = ua->posted_tasks_head; |
|
ua->posted_tasks_head = ua->posted_tasks_tail = NULL; |
|
#ifdef _WIN32 |
|
LeaveCriticalSection(&ua->posted_lock); |
|
#else |
|
pthread_mutex_unlock(&ua->posted_lock); |
|
#endif |
|
|
|
while (list) { |
|
DEBUG_DEBUG(DEBUG_CATEGORY_TUN, "POSTed task get"); |
|
struct posted_task* t = list; |
|
list = list->next; |
|
|
|
if (t->callback) { |
|
t->callback(t->arg); |
|
} |
|
u_free(t); |
|
} |
|
} |
|
|
|
// Unified wakeup handler (drain + execute posted callbacks) |
|
static void handle_wakeup(struct UASYNC* ua) { |
|
if (!ua || !ua->wakeup_initialized) return; |
|
|
|
// Drain the wakeup pipe/socket |
|
#ifdef _WIN32 |
|
char buf[64]; |
|
SOCKET s = (SOCKET)(intptr_t)ua->wakeup_pipe[0]; |
|
while (recv(s, buf, sizeof(buf), 0) > 0) {} |
|
#else |
|
char buf[64]; |
|
while (read(ua->wakeup_pipe[0], buf, sizeof(buf)) > 0) {} |
|
#endif |
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "POST: wakeup process"); |
|
// Execute all posted callbacks (in main thread) |
|
process_posted_tasks(ua); |
|
} |
|
|
|
|
|
/*
 * Advance *tv by `dt` timebase units (1 unit = 100 microseconds,
 * 10000 units = 1 second), carrying microsecond overflow into seconds.
 */
static void timeval_add_tb(struct timeval* tv, int dt) {
    const int whole_seconds = dt / 10000;
    const int leftover_units = dt % 10000;

    tv->tv_usec += leftover_units * 100;
    tv->tv_sec += whole_seconds + tv->tv_usec / 1000000;
    tv->tv_usec %= 1000000;
}
|
|
|
/*
 * Express *tv as a millisecond count; sub-millisecond microseconds are
 * truncated.
 */
static uint64_t timeval_to_ms(const struct timeval* tv) {
    const uint64_t ms_from_seconds = (uint64_t)tv->tv_sec * 1000ULL;
    const uint64_t ms_from_micros = (uint64_t)tv->tv_usec / 1000ULL;
    return ms_from_seconds + ms_from_micros;
}
|
|
|
|
|
|
|
// Simplified timeout handling without reference counting |
|
|
|
// Process expired timeouts with safe cancellation |
|
static void process_timeouts(struct UASYNC* ua) { |
|
if (!ua) return; |
|
|
|
// Сначала обрабатываем immediate_queue (FIFO) |
|
while (ua->immediate_queue_head) { |
|
struct timeout_node* node = ua->immediate_queue_head; |
|
ua->immediate_queue_head = node->next; |
|
if (!ua->immediate_queue_head) { |
|
ua->immediate_queue_tail = NULL; |
|
} |
|
|
|
if (node && node->callback) { |
|
node->callback(node->arg); |
|
} |
|
|
|
if (node && node->ua) { |
|
node->ua->timer_free_count++; |
|
} |
|
memory_pool_free(ua->timeout_pool, node); |
|
} |
|
|
|
if (!ua->timeout_heap) return; |
|
|
|
struct timeval now_tv; |
|
get_current_time(&now_tv); |
|
uint64_t now_ms = timeval_to_ms(&now_tv); |
|
|
|
while (1) { |
|
TimeoutEntry entry; |
|
if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) break; |
|
if (entry.expiration > now_ms) break; |
|
|
|
// Pop the expired timeout |
|
timeout_heap_pop(ua->timeout_heap, &entry); |
|
struct timeout_node* node = (struct timeout_node*)entry.data; |
|
|
|
if (node && node->callback) { |
|
// Execute callback only if not cancelled |
|
node->callback(node->arg); |
|
} |
|
|
|
// Always u_free the node after processing |
|
if (node && node->ua) { |
|
node->ua->timer_free_count++; |
|
} |
|
memory_pool_free(ua->timeout_pool, node); |
|
continue; // Process next expired timeout |
|
} |
|
} |
|
|
|
// Compute time to next timeout |
|
static void get_next_timeout(struct UASYNC* ua, struct timeval* tv) { |
|
if (!ua || !ua->timeout_heap) { |
|
tv->tv_sec = 0; |
|
tv->tv_usec = 0; |
|
return; |
|
} |
|
|
|
TimeoutEntry entry; |
|
if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) { |
|
tv->tv_sec = 0; |
|
tv->tv_usec = 0; |
|
return; |
|
} |
|
|
|
struct timeval now_tv; |
|
get_current_time(&now_tv); |
|
uint64_t now_ms = timeval_to_ms(&now_tv); |
|
|
|
if (entry.expiration <= now_ms) { |
|
tv->tv_sec = 0; |
|
tv->tv_usec = 0; |
|
return; |
|
} |
|
|
|
uint64_t delta_ms = entry.expiration - now_ms; |
|
tv->tv_sec = delta_ms / 1000; |
|
tv->tv_usec = (delta_ms % 1000) * 1000; |
|
} |
|
|
|
|
|
|
|
// Instance version |
|
void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* arg, timeout_callback_t callback) { |
|
if (!ua || timeout_tb < 0 || !callback) return NULL; |
|
if (!ua->timeout_heap) return NULL; |
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: timeout=%d.%d ms, arg=%p, callback=%p", timeout_tb/10, timeout_tb%10, arg, callback); |
|
|
|
struct timeout_node* node = memory_pool_alloc(ua->timeout_pool); |
|
if (!node) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to allocate node"); |
|
return NULL; |
|
} |
|
ua->timer_alloc_count++; |
|
|
|
node->arg = arg; |
|
node->callback = callback; |
|
node->ua = ua; |
|
|
|
// Calculate expiration time in milliseconds |
|
struct timeval now; |
|
get_current_time(&now); |
|
timeval_add_tb(&now, timeout_tb); |
|
node->expiration_ms = timeval_to_ms(&now); |
|
|
|
// Add to heap |
|
if (timeout_heap_push(ua->timeout_heap, node->expiration_ms, node) != 0) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to push to heap"); |
|
memory_pool_free(ua->timeout_pool, node); |
|
ua->timer_free_count++; // Balance the alloc counter |
|
return NULL; |
|
} |
|
|
|
return node; |
|
} |
|
|
|
// Immediate execution in next mainloop (FIFO order) |
|
void* uasync_call_soon(struct UASYNC* ua, void* user_arg, timeout_callback_t callback) { |
|
if (!ua || !callback) return NULL; |
|
if (!ua->timeout_pool) return NULL; |
|
|
|
struct timeout_node* node = memory_pool_alloc(ua->timeout_pool); |
|
if (!node) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_call_soon: failed to allocate node"); |
|
return NULL; |
|
} |
|
ua->timer_alloc_count++; |
|
|
|
node->arg = user_arg; |
|
node->callback = callback; |
|
node->ua = ua; |
|
node->expiration_ms = 0; |
|
node->next = NULL; |
|
|
|
// FIFO: добавляем в конец очереди |
|
if (ua->immediate_queue_tail) { |
|
ua->immediate_queue_tail->next = node; |
|
ua->immediate_queue_tail = node; |
|
} else { |
|
ua->immediate_queue_head = ua->immediate_queue_tail = node; |
|
} |
|
|
|
return node; |
|
} |
|
|
|
// Cancel immediate callback by setting callback to NULL - O(1) |
|
err_t uasync_call_soon_cancel(struct UASYNC* ua, void* t_id) { |
|
if (!ua || !t_id) return ERR_FAIL; |
|
|
|
struct timeout_node* node = (struct timeout_node*)t_id; |
|
if (node->ua != ua) return ERR_FAIL; |
|
|
|
// Simply nullify callback - will be skipped in process_timeouts |
|
node->callback = NULL; |
|
|
|
return ERR_OK; |
|
} |
|
|
|
|
|
|
|
// Instance version |
|
err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id) { |
|
if (!ua || !t_id || !ua->timeout_heap) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: invalid parameters ua=%p, t_id=%p, heap=%p", |
|
ua, t_id, ua ? ua->timeout_heap : NULL); |
|
return ERR_FAIL; |
|
} |
|
|
|
struct timeout_node* node = (struct timeout_node*)t_id; |
|
|
|
// Try to cancel from heap first |
|
if (timeout_heap_cancel(ua->timeout_heap, node->expiration_ms, node) == 0) { |
|
// Successfully marked as deleted - u_free will happen lazily in heap |
|
node->callback = NULL; |
|
// DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: successfully cancelled timer %p from heap", node); |
|
return ERR_OK; |
|
} |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: not found in heap: ua=%p, t_id=%p, node=%p, expires=%llu ms", |
|
ua, t_id, node, (unsigned long long)node->expiration_ms); |
|
|
|
// If not found in heap, it may have already expired or been invalid |
|
return ERR_FAIL; |
|
} |
|
|
|
|
|
// Instance version |
|
void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { |
|
if (!ua || fd < 0) return NULL; |
|
// FD_SETSIZE check only for POSIX systems - Windows sockets can have any value |
|
#ifndef _WIN32 |
|
if (fd >= FD_SETSIZE) return NULL; |
|
#endif |
|
|
|
int index = socket_array_add(ua->sockets, fd, read_cbk, write_cbk, except_cbk, user_data); |
|
if (index < 0) return NULL; |
|
|
|
ua->socket_alloc_count++; |
|
ua->poll_fds_dirty = 1; // Mark poll_fds as needing rebuild |
|
|
|
#if HAS_EPOLL |
|
// Add to epoll if using epoll |
|
if (ua->use_epoll && ua->epoll_fd >= 0) { |
|
struct epoll_event ev; |
|
ev.events = 0; |
|
if (read_cbk) ev.events |= EPOLLIN; |
|
if (write_cbk) ev.events |= EPOLLOUT; |
|
// Use level-triggered mode (default) for compatibility with UDP sockets |
|
ev.data.ptr = &ua->sockets->sockets[index]; |
|
|
|
if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { |
|
// Failed to add to epoll - remove from socket array and return error |
|
socket_array_remove(ua->sockets, fd); |
|
ua->socket_alloc_count--; |
|
return NULL; |
|
} |
|
} |
|
#endif |
|
|
|
// Return pointer to the socket node as ID |
|
return &ua->sockets->sockets[index]; |
|
} |
|
|
|
err_t uasync_remove_socket(struct UASYNC* ua, void* s_id) { |
|
if (!ua || !s_id) return ERR_FAIL; |
|
|
|
struct socket_node* node = (struct socket_node*)s_id; |
|
if (!node->active || node->fd < 0) return ERR_FAIL; |
|
|
|
int fd = node->fd; |
|
|
|
#if HAS_EPOLL |
|
// Remove from epoll if using epoll |
|
if (ua->use_epoll && ua->epoll_fd >= 0) { |
|
epoll_ctl(ua->epoll_fd, EPOLL_CTL_DEL, fd, NULL); |
|
} |
|
#endif |
|
|
|
int ret = socket_array_remove(ua->sockets, fd); |
|
if (ret == 0) { |
|
ua->socket_free_count++; |
|
ua->poll_fds_dirty = 1; // Mark poll_fds as needing rebuild |
|
return ERR_OK; |
|
} |
|
return ERR_FAIL; |
|
} |
|
|
|
// Add socket_t (cross-platform socket) |
|
void* uasync_add_socket_t(struct UASYNC* ua, socket_t sock, socket_t_callback_t read_cbk, |
|
socket_t_callback_t write_cbk, socket_t_callback_t except_cbk, void* user_data) { |
|
if (!ua || sock == SOCKET_INVALID) return NULL; |
|
|
|
int index = socket_array_add_socket_t(ua->sockets, sock, read_cbk, write_cbk, |
|
(socket_callback_t)except_cbk, user_data); |
|
if (index < 0) return NULL; |
|
|
|
ua->socket_alloc_count++; |
|
ua->poll_fds_dirty = 1; |
|
|
|
#if HAS_EPOLL |
|
if (ua->use_epoll && ua->epoll_fd >= 0) { |
|
struct epoll_event ev; |
|
ev.events = 0; |
|
if (read_cbk) ev.events |= EPOLLIN; |
|
if (write_cbk) ev.events |= EPOLLOUT; |
|
ev.data.ptr = &ua->sockets->sockets[index]; |
|
|
|
// On Windows, need to cast socket_t to int for epoll_ctl |
|
#ifdef _WIN32 |
|
int fd = (int)(intptr_t)sock; |
|
#else |
|
int fd = sock; |
|
#endif |
|
if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { |
|
socket_array_remove(ua->sockets, fd); |
|
ua->socket_alloc_count--; |
|
return NULL; |
|
} |
|
} |
|
#endif |
|
|
|
return &ua->sockets->sockets[index]; |
|
} |
|
|
|
// Remove socket by socket_t |
|
err_t uasync_remove_socket_t(struct UASYNC* ua, socket_t sock) { |
|
if (!ua || sock == SOCKET_INVALID) return ERR_FAIL; |
|
|
|
struct socket_node* node = socket_array_get_by_sock(ua->sockets, sock); |
|
if (!node || !node->active) return ERR_FAIL; |
|
|
|
#if HAS_EPOLL |
|
if (ua->use_epoll && ua->epoll_fd >= 0) { |
|
#ifdef _WIN32 |
|
int fd = (int)(intptr_t)sock; |
|
#else |
|
int fd = sock; |
|
#endif |
|
epoll_ctl(ua->epoll_fd, EPOLL_CTL_DEL, fd, NULL); |
|
} |
|
#endif |
|
|
|
#ifdef _WIN32 |
|
int fd = (int)(intptr_t)sock; |
|
#else |
|
int fd = sock; |
|
#endif |
|
int ret = socket_array_remove(ua->sockets, fd); |
|
if (ret == 0) { |
|
ua->socket_free_count++; |
|
ua->poll_fds_dirty = 1; |
|
return ERR_OK; |
|
} |
|
return ERR_FAIL; |
|
} |
|
|
|
// Helper function to rebuild cached pollfd array |
|
static void rebuild_poll_fds(struct UASYNC* ua) { |
|
if (!ua || !ua->sockets) return; |
|
|
|
int socket_count = ua->sockets->count; |
|
int wakeup_fd_present = ua->wakeup_initialized && ua->wakeup_pipe[0] >= 0; |
|
int total_fds = socket_count + wakeup_fd_present; |
|
|
|
// Ensure poll_fds capacity is sufficient |
|
if (total_fds > ua->poll_fds_capacity) { |
|
int new_capacity = total_fds * 2; |
|
if (new_capacity < 16) new_capacity = 16; |
|
|
|
struct pollfd* new_poll_fds = u_realloc(ua->poll_fds, sizeof(struct pollfd) * new_capacity); |
|
if (!new_poll_fds) return; // Keep old allocation on failure |
|
|
|
ua->poll_fds = new_poll_fds; |
|
ua->poll_fds_capacity = new_capacity; |
|
} |
|
|
|
int idx = 0; |
|
|
|
// Add wakeup fd first if present |
|
if (wakeup_fd_present) { |
|
ua->poll_fds[idx].fd = ua->wakeup_pipe[0]; |
|
ua->poll_fds[idx].events = POLLIN; |
|
ua->poll_fds[idx].revents = 0; |
|
idx++; |
|
} |
|
|
|
// Add socket fds using active_indices for O(1) traversal |
|
for (int i = 0; i < socket_count; i++) { |
|
int socket_array_idx = ua->sockets->active_indices[i]; |
|
struct socket_node* cur = &ua->sockets->sockets[socket_array_idx]; |
|
|
|
// Handle socket_t vs int fd |
|
if (cur->type == SOCKET_NODE_TYPE_SOCK) { |
|
// socket_t - cast to int for pollfd |
|
#ifdef _WIN32 |
|
ua->poll_fds[idx].fd = (int)(intptr_t)cur->sock; |
|
#else |
|
ua->poll_fds[idx].fd = cur->sock; |
|
#endif |
|
} else { |
|
// Regular fd |
|
ua->poll_fds[idx].fd = cur->fd; |
|
} |
|
ua->poll_fds[idx].events = 0; |
|
ua->poll_fds[idx].revents = 0; |
|
|
|
if (cur->type == SOCKET_NODE_TYPE_SOCK) { |
|
if (cur->read_cbk_sock) ua->poll_fds[idx].events |= POLLIN; |
|
if (cur->write_cbk_sock) ua->poll_fds[idx].events |= POLLOUT; |
|
} else { |
|
if (cur->read_cbk) ua->poll_fds[idx].events |= POLLIN; |
|
if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT; |
|
} |
|
if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT; |
|
if (cur->except_cbk) ua->poll_fds[idx].events |= POLLPRI; |
|
|
|
idx++; |
|
} |
|
|
|
ua->poll_fds_count = total_fds; |
|
ua->poll_fds_dirty = 0; |
|
} |
|
|
|
/* Dispatch of events returned by epoll_wait (Linux only). */
#if HAS_EPOLL
/*
 * Tell whether `node` still represents the registration for `fd`.
 * Used after each user callback, which may have removed the socket (and
 * possibly re-used the slot for another descriptor).
 */
static int socket_node_still_registered(const struct socket_node* node, int fd) {
    return node->active && node->fd == fd;
}

/*
 * Dispatch `n_events` epoll events.
 *
 * An event whose data pointer is NULL is treated as the wakeup fd and only
 * drains the wakeup pipe. Any other event carries a socket node: its
 * exception handler runs first on EPOLLERR/EPOLLHUP, then the read handler
 * on EPOLLIN, then the write handler on EPOLLOUT, choosing the handler set
 * that matches the node type. After each handler the registration is
 * re-checked, so a socket removed from inside a handler receives no further
 * callbacks for this event.
 *
 * NOTE(review): the wakeup branch drains the pipe but does not run posted
 * tasks here - confirm they are processed elsewhere.
 * NOTE(review): `node` points into the socket array; if a handler registers
 * a socket and the array is reallocated, the pointer would dangle - confirm.
 */
static void process_epoll_events(struct UASYNC* ua, struct epoll_event* events, int n_events) {
    for (int i = 0; i < n_events; i++) {
        const uint32_t ready = events[i].events;
        struct socket_node* node = (struct socket_node*)events[i].data.ptr;

        /* Wakeup fd: registered with a NULL data pointer. */
        if (node == NULL) {
            if (ready & EPOLLIN) {
                drain_wakeup_pipe(ua);
            }
            continue;
        }

        if (!node->active) continue;

        /* Remember which descriptor this event belongs to. */
        const int fd = node->fd;

        /* Error conditions first. */
        if ((ready & (EPOLLERR | EPOLLHUP)) && node->except_cbk) {
            node->except_cbk(node->fd, node->user_data);
            if (!socket_node_still_registered(node, fd)) continue;
        }

        /* Read readiness. */
        if (ready & EPOLLIN) {
            if (node->type == SOCKET_NODE_TYPE_SOCK) {
                if (node->read_cbk_sock) {
                    node->read_cbk_sock(node->sock, node->user_data);
                }
            } else {
                if (node->read_cbk) {
                    node->read_cbk(node->fd, node->user_data);
                }
            }
            if (!socket_node_still_registered(node, fd)) continue;
        }

        /* Write readiness. */
        if (ready & EPOLLOUT) {
            if (node->type == SOCKET_NODE_TYPE_SOCK) {
                if (node->write_cbk_sock) {
                    node->write_cbk_sock(node->sock, node->user_data);
                }
            } else {
                if (node->write_cbk) {
                    node->write_cbk(node->fd, node->user_data);
                }
            }
        }
    }
}
#endif
|
|
|
// Instance version |
|
void uasync_poll(struct UASYNC* ua, int timeout_tb) { |
|
if (!ua) return; |
|
if (!ua->sockets || !ua->timeout_heap) return; |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "poll"); |
|
|
|
// Handle negative or zero timeout |
|
if (timeout_tb < 0) timeout_tb = -1; // Infinite wait |
|
else if (timeout_tb == 0) timeout_tb = 0; // No wait |
|
|
|
// Get next timeout |
|
struct timeval next_timeout; |
|
get_next_timeout(ua, &next_timeout); |
|
|
|
// Convert requested timeout to timeval |
|
struct timeval req_timeout = {0}; |
|
if (timeout_tb >= 0) { |
|
req_timeout.tv_sec = timeout_tb / 10000; |
|
req_timeout.tv_usec = (timeout_tb % 10000) * 100; |
|
} |
|
|
|
struct timeval poll_timeout; |
|
if (timeout_tb < 0) { |
|
poll_timeout = next_timeout; |
|
} else { |
|
if (next_timeout.tv_sec < req_timeout.tv_sec || |
|
(next_timeout.tv_sec == req_timeout.tv_sec && next_timeout.tv_usec < req_timeout.tv_usec)) { |
|
poll_timeout = next_timeout; |
|
} else { |
|
poll_timeout = req_timeout; |
|
} |
|
} |
|
if (poll_timeout.tv_sec == 0 && poll_timeout.tv_usec == 0 && timeout_tb > 0) poll_timeout = req_timeout; |
|
|
|
|
|
int timeout_ms; |
|
if (timeout_tb < 0 && (next_timeout.tv_sec > 0 || next_timeout.tv_usec > 0)) { |
|
timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000); |
|
} else if (timeout_tb < 0) { |
|
timeout_ms = -1; // Infinite |
|
} else { |
|
timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000); |
|
} |
|
|
|
// Count active sockets |
|
int socket_count = ua->sockets->count; |
|
if (socket_count == 0 && timeout_ms == -1) { |
|
// No sockets and infinite wait - but we have timers? Wait for timer |
|
if (ua->timeout_heap->size > 0) { |
|
timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000); |
|
} else { |
|
// Nothing to do - return immediately |
|
return; |
|
} |
|
} |
|
|
|
#if HAS_EPOLL |
|
// Use epoll on Linux if available |
|
if (ua->use_epoll && ua->epoll_fd >= 0) { |
|
struct epoll_event events[64]; // Stack-allocated array for events |
|
int max_events = 64; |
|
|
|
int ret = epoll_wait(ua->epoll_fd, events, max_events, timeout_ms); |
|
if (ret < 0) { |
|
if (errno == EINTR) { |
|
return; |
|
} |
|
perror("epoll_wait"); |
|
return; |
|
} |
|
|
|
/* Process socket events */ |
|
if (ret > 0) { |
|
process_epoll_events(ua, events, ret); |
|
} |
|
|
|
/* Process timeouts that may have expired during poll or socket processing */ |
|
process_timeouts(ua); |
|
return; |
|
} |
|
#endif |
|
|
|
// Fallback to poll() for non-Linux or if epoll failed |
|
|
|
// Include wakeup pipe if initialized |
|
int wakeup_fd_present = ua->wakeup_initialized && ua->wakeup_pipe[0] >= 0; |
|
int total_fds = socket_count + wakeup_fd_present; |
|
|
|
// If no sockets to poll, just wait and process timeouts |
|
if (total_fds == 0) { |
|
if (timeout_ms > 0) { |
|
#ifdef _WIN32 |
|
Sleep(timeout_ms); |
|
#else |
|
struct timespec ts = { timeout_ms / 1000, (timeout_ms % 1000) * 1000000 }; |
|
nanosleep(&ts, NULL); |
|
#endif |
|
} |
|
process_timeouts(ua); |
|
return; |
|
} |
|
|
|
#ifdef _WIN32 |
|
// On Windows, use select() instead of WSAPoll to avoid issues with accepted sockets |
|
fd_set read_fds, write_fds, except_fds; |
|
FD_ZERO(&read_fds); |
|
FD_ZERO(&write_fds); |
|
FD_ZERO(&except_fds); |
|
|
|
SOCKET max_fd = 0; |
|
|
|
// Add all active sockets to fd_sets |
|
for (int i = 0; i < ua->sockets->count; i++) { |
|
int idx = ua->sockets->active_indices[i]; |
|
struct socket_node* node = &ua->sockets->sockets[idx]; |
|
if (!node->active) continue; |
|
|
|
SOCKET s; |
|
if (node->type == SOCKET_NODE_TYPE_SOCK) { |
|
s = node->sock; |
|
} else { |
|
s = (SOCKET)node->fd; |
|
} |
|
|
|
if (node->type == SOCKET_NODE_TYPE_SOCK) { |
|
if (node->read_cbk_sock) FD_SET(s, &read_fds); |
|
if (node->write_cbk_sock) FD_SET(s, &write_fds); |
|
} else { |
|
if (node->read_cbk) FD_SET(s, &read_fds); |
|
if (node->write_cbk) FD_SET(s, &write_fds); |
|
} |
|
if (node->except_cbk) FD_SET(s, &except_fds); |
|
|
|
if (s > max_fd) max_fd = s; |
|
} |
|
|
|
struct timeval tv; |
|
tv.tv_sec = timeout_ms / 1000; |
|
tv.tv_usec = (timeout_ms % 1000) * 1000; |
|
|
|
int ret = select((int)max_fd + 1, &read_fds, &write_fds, &except_fds, &tv); |
|
|
|
if (ret < 0) { |
|
int err = WSAGetLastError(); |
|
if (err != WSAEINTR) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "select failed: %d", err); |
|
} |
|
return; |
|
} |
|
|
|
if (ret > 0) { |
|
for (int i = 0; i < ua->sockets->count; i++) { |
|
int idx = ua->sockets->active_indices[i]; |
|
struct socket_node* node = &ua->sockets->sockets[idx]; |
|
if (!node->active) continue; |
|
|
|
SOCKET s; |
|
if (node->type == SOCKET_NODE_TYPE_SOCK) { |
|
s = node->sock; |
|
} else { |
|
s = (SOCKET)node->fd; |
|
} |
|
|
|
int has_read = FD_ISSET(s, &read_fds); |
|
int has_write = FD_ISSET(s, &write_fds); |
|
int has_except = FD_ISSET(s, &except_fds); |
|
|
|
if (!has_read && !has_write && !has_except) continue; |
|
|
|
if (has_except) { |
|
if (node->except_cbk) { |
|
node->except_cbk(node->fd, node->user_data); |
|
} |
|
} |
|
|
|
if (has_read) { |
|
if (node->type == SOCKET_NODE_TYPE_SOCK) { |
|
if (node->read_cbk_sock) { |
|
node->read_cbk_sock(node->sock, node->user_data); |
|
} |
|
} else { |
|
if (node->read_cbk) { |
|
node->read_cbk(node->fd, node->user_data); |
|
} |
|
} |
|
} |
|
|
|
if (has_write) { |
|
if (node->type == SOCKET_NODE_TYPE_SOCK) { |
|
if (node->write_cbk_sock) { |
|
node->write_cbk_sock(node->sock, node->user_data); |
|
} |
|
} else { |
|
if (node->write_cbk) { |
|
node->write_cbk(node->fd, node->user_data); |
|
} |
|
} |
|
} |
|
} |
|
} |
|
#else |
|
// On non-Windows, use poll() |
|
|
|
// Rebuild poll_fds if dirty or not allocated |
|
if (ua->poll_fds_dirty || !ua->poll_fds) { |
|
rebuild_poll_fds(ua); |
|
} |
|
|
|
// Ensure poll_fds_count matches current state (in case sockets changed without dirty flag) |
|
if (ua->poll_fds_count != total_fds) { |
|
rebuild_poll_fds(ua); |
|
} |
|
|
|
int ret = poll(ua->poll_fds, ua->poll_fds_count, timeout_ms); |
|
if (ret < 0) { |
|
if (errno == EINTR) { |
|
return; |
|
} |
|
perror("poll"); |
|
return; |
|
} |
|
|
|
/* Process socket events first to give sockets higher priority */ |
|
if (ret > 0) { |
|
for (int i = 0; i < ua->poll_fds_count; i++) { |
|
if (ua->poll_fds[i].revents == 0) continue; |
|
|
|
/* Handle wakeup fd separately */ |
|
if (wakeup_fd_present && i == 0) { |
|
if (ua->poll_fds[i].revents & POLLIN) { |
|
drain_wakeup_pipe(ua); |
|
} |
|
continue; |
|
} |
|
|
|
/* Socket event - lookup by fd */ |
|
struct socket_node* node = socket_array_get(ua->sockets, ua->poll_fds[i].fd); |
|
if (!node) { // Try by socket_t (in case this is a socket) |
|
socket_t lookup_sock = ua->poll_fds[i].fd; |
|
node = socket_array_get_by_sock(ua->sockets, lookup_sock); |
|
} |
|
if (!node) continue; // Socket may have been removed |
|
|
|
/* Check for error conditions first */ |
|
if (ua->poll_fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { |
|
/* Treat as exceptional condition */ |
|
if (node->except_cbk) { |
|
node->except_cbk(node->fd, node->user_data); |
|
} |
|
} |
|
|
|
/* Exceptional data (out-of-band) */ |
|
if (ua->poll_fds[i].revents & POLLPRI) { |
|
if (node->except_cbk) { |
|
node->except_cbk(node->fd, node->user_data); |
|
} |
|
} |
|
|
|
/* Read readiness - use appropriate callback based on socket type */ |
|
if (ua->poll_fds[i].revents & POLLIN) { |
|
if (node->type == SOCKET_NODE_TYPE_SOCK) { |
|
if (node->read_cbk_sock) { |
|
node->read_cbk_sock(node->sock, node->user_data); |
|
} |
|
} else { |
|
if (node->read_cbk) { |
|
node->read_cbk(node->fd, node->user_data); |
|
} |
|
} |
|
} |
|
|
|
/* Write readiness - use appropriate callback based on socket type */ |
|
if (ua->poll_fds[i].revents & POLLOUT) { |
|
if (node->type == SOCKET_NODE_TYPE_SOCK) { |
|
if (node->write_cbk_sock) { |
|
node->write_cbk_sock(node->sock, node->user_data); |
|
} |
|
} else { |
|
if (node->write_cbk) { |
|
node->write_cbk(node->fd, node->user_data); |
|
} |
|
} |
|
} |
|
} |
|
} |
|
#endif |
|
|
|
/* Process timeouts that may have expired during poll or socket processing */ |
|
process_timeouts(ua); |
|
} |
|
|
|
|
|
// Read-side callbacks for the wakeup channel. uasync_create() registers the
// variant that matches the platform; both simply hand the loop instance
// (received as the callback argument) to handle_wakeup().

#ifdef _WIN32
// Windows variant: the wakeup channel is a socket, so the socket_t callback
// signature is used.
static void wakeup_read_callback_win(socket_t sock, void* arg) {
    struct UASYNC* loop = arg;

    (void)sock;  // the handle itself is not needed
    handle_wakeup(loop);
}
#else
// POSIX variant: the wakeup channel is the read end of a pipe.
static void wakeup_read_callback_posix(int fd, void* arg) {
    struct UASYNC* loop = arg;

    (void)fd;  // the descriptor itself is not needed
    handle_wakeup(loop);
}
#endif
|
|
|
|
|
// ========== Instance management functions ========== |
|
|
|
// Modified function in u_async.c: uasync_create |
|
// Changes: Use self-connected UDP socket for wakeup on Windows instead of pipe. |
|
// This ensures the wakeup is a selectable SOCKET. |
|
|
|
struct UASYNC* uasync_create(void) { |
|
// Initialize socket platform (Winsock on Windows) |
|
socket_platform_init(); |
|
|
|
struct UASYNC* ua = u_calloc(1, sizeof(struct UASYNC)); |
|
if (!ua) return NULL; |
|
|
|
ua->timer_alloc_count = 0; |
|
ua->timer_free_count = 0; |
|
ua->socket_alloc_count = 0; |
|
ua->socket_free_count = 0; |
|
|
|
ua->poll_fds = NULL; |
|
ua->poll_fds_capacity = 0; |
|
ua->poll_fds_count = 0; |
|
ua->poll_fds_dirty = 1; |
|
|
|
ua->wakeup_pipe[0] = -1; |
|
ua->wakeup_pipe[1] = -1; |
|
ua->wakeup_initialized = 0; |
|
ua->posted_tasks_head = NULL; |
|
ua->immediate_queue_head = NULL; |
|
ua->immediate_queue_tail = NULL; |
|
|
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating SA..."); |
|
ua->sockets = socket_array_create(16); |
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating SA1..."); |
|
if (!ua->sockets) { |
|
if (ua->wakeup_initialized) { |
|
#ifdef _WIN32 |
|
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]); |
|
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]); |
|
#else |
|
close(ua->wakeup_pipe[0]); |
|
close(ua->wakeup_pipe[1]); |
|
#endif |
|
} |
|
u_free(ua); |
|
return NULL; |
|
} |
|
|
|
ua->timeout_heap = timeout_heap_create(16); |
|
if (!ua->timeout_heap) { |
|
socket_array_destroy(ua->sockets); |
|
if (ua->wakeup_initialized) { |
|
#ifdef _WIN32 |
|
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]); |
|
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]); |
|
#else |
|
close(ua->wakeup_pipe[0]); |
|
close(ua->wakeup_pipe[1]); |
|
#endif |
|
} |
|
u_free(ua); |
|
return NULL; |
|
} |
|
|
|
// Initialize timeout pool |
|
ua->timeout_pool = memory_pool_init(sizeof(struct timeout_node)); |
|
if (!ua->timeout_pool) { |
|
timeout_heap_destroy(ua->timeout_heap); |
|
socket_array_destroy(ua->sockets); |
|
if (ua->wakeup_initialized) { |
|
#ifdef _WIN32 |
|
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]); |
|
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]); |
|
#else |
|
close(ua->wakeup_pipe[0]); |
|
close(ua->wakeup_pipe[1]); |
|
#endif |
|
} |
|
u_free(ua); |
|
return NULL; |
|
} |
|
|
|
// Set callback to u_free timeout nodes and update counters |
|
timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback); |
|
|
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating TH1..."); |
|
// Initialize epoll on Linux |
|
ua->epoll_fd = -1; |
|
ua->use_epoll = 0; |
|
#if HAS_EPOLL |
|
ua->epoll_fd = epoll_create1(EPOLL_CLOEXEC); |
|
if (ua->epoll_fd >= 0) { |
|
ua->use_epoll = 1; |
|
DEBUG_INFO(DEBUG_CATEGORY_UASYNC, "Using epoll for socket monitoring"); |
|
// Add wakeup pipe to epoll |
|
if (ua->wakeup_initialized) { |
|
struct epoll_event ev; |
|
ev.events = EPOLLIN; |
|
ev.data.ptr = NULL; // NULL ptr indicates wakeup fd |
|
if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, ua->wakeup_pipe[0], &ev) < 0) { |
|
DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to add wakeup pipe to epoll: %s", strerror(errno)); |
|
} |
|
} |
|
} else { |
|
DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to create epoll fd, falling back to poll: %s", strerror(errno)); |
|
} |
|
#endif |
|
|
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating TH2..."); |
|
|
|
#ifdef _WIN32 |
|
// Windows: self-connected UDP socket for wakeup |
|
SOCKET r = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); |
|
SOCKET w = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); |
|
if (r == INVALID_SOCKET || w == INVALID_SOCKET) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "Failed to create wakeup sockets"); |
|
u_free(ua); |
|
return NULL; |
|
} |
|
|
|
struct sockaddr_in addr = {0}; |
|
addr.sin_family = AF_INET; |
|
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK); |
|
addr.sin_port = 0; |
|
|
|
if (bind(r, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR || |
|
getsockname(r, (struct sockaddr*)&addr, &(int){sizeof(addr)}) == SOCKET_ERROR || |
|
connect(w, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) { |
|
closesocket(r); |
|
closesocket(w); |
|
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "Wakeup socket setup failed: %d", WSAGetLastError()); |
|
u_free(ua); |
|
return NULL; |
|
} |
|
|
|
ua->wakeup_pipe[0] = (int)(intptr_t)r; |
|
ua->wakeup_pipe[1] = (int)(intptr_t)w; |
|
ua->wakeup_initialized = 1; |
|
|
|
u_long mode = 1; |
|
ioctlsocket(r, FIONBIO, &mode); |
|
|
|
// Register the read socket with uasync |
|
uasync_add_socket_t(ua, r, wakeup_read_callback_win, NULL, NULL, ua); // ← ua как user_data |
|
// uasync_add_socket_t(ua, r, wakeup_read_callback_win, NULL, NULL, NULL); |
|
InitializeCriticalSection(&ua->posted_lock); |
|
|
|
#else |
|
// POSIX pipe |
|
if (pipe(ua->wakeup_pipe) != 0) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "pipe() failed: %s", strerror(errno)); |
|
u_free(ua); |
|
return NULL; |
|
} |
|
ua->wakeup_initialized = 1; |
|
|
|
fcntl(ua->wakeup_pipe[0], F_SETFL, fcntl(ua->wakeup_pipe[0], F_GETFL, 0) | O_NONBLOCK); |
|
fcntl(ua->wakeup_pipe[1], F_SETFL, fcntl(ua->wakeup_pipe[1], F_GETFL, 0) | O_NONBLOCK); |
|
|
|
if (!ua->use_epoll) { |
|
uasync_add_socket(ua, ua->wakeup_pipe[0], wakeup_read_callback_posix, NULL, NULL, ua); // ← ua |
|
} |
|
pthread_mutex_init(&ua->posted_lock, NULL); |
|
|
|
#endif |
|
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating TH3..."); |
|
|
|
return ua; |
|
} |
|
|
|
// Print all resources for debugging |
|
void uasync_print_resources(struct UASYNC* ua, const char* prefix) { |
|
if (!ua) { |
|
printf("%s: NULL uasync instance\n", prefix); |
|
return; |
|
} |
|
|
|
printf("\n🔍 %s: UASYNC Resource Report for %p\n", prefix, ua); |
|
printf(" Timer Statistics: allocated=%zu, u_freed=%zu, active=%zd\n", |
|
ua->timer_alloc_count, ua->timer_free_count, |
|
(ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); |
|
printf(" Socket Statistics: allocated=%zu, u_freed=%zu, active=%zd\n", |
|
ua->socket_alloc_count, ua->socket_free_count, |
|
(ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); |
|
|
|
// Показать активные таймеры |
|
if (ua->timeout_heap) { |
|
size_t active_timers = 0; |
|
// Безопасное чтение без извлечения - просто итерируем по массиву |
|
for (size_t i = 0; i < ua->timeout_heap->size; i++) { |
|
if (!ua->timeout_heap->heap[i].deleted) { |
|
active_timers++; |
|
struct timeout_node* node = (struct timeout_node*)ua->timeout_heap->heap[i].data; |
|
printf(" Timer: node=%p, expires=%llu ms\n", |
|
node, (unsigned long long)ua->timeout_heap->heap[i].expiration); |
|
} |
|
} |
|
printf(" Active timers in heap: %zu\n", active_timers); |
|
} |
|
|
|
// Показать активные сокеты |
|
if (ua->sockets) { |
|
int active_sockets = 0; |
|
printf(" Socket array capacity: %d, active: %d\n", |
|
ua->sockets->capacity, ua->sockets->count); |
|
for (int i = 0; i < ua->sockets->capacity; i++) { |
|
if (ua->sockets->sockets[i].active) { |
|
active_sockets++; |
|
printf(" Socket: fd=%d, active=%d\n", |
|
ua->sockets->sockets[i].fd, |
|
ua->sockets->sockets[i].active); |
|
} |
|
} |
|
printf(" Total active sockets: %d\n", active_sockets); |
|
} |
|
|
|
printf("🔚 %s: End of resource report\n\n", prefix); |
|
} |
|
|
|
// Modified function in u_async.c: uasync_destroy |
|
// Changes: Close wakeup sockets properly on Windows. |
|
|
|
void uasync_destroy(struct UASYNC* ua, int close_fds) { |
|
if (!ua) return; |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: starting cleanup for ua=%p", ua); |
|
|
|
// Диагностика ресурсов перед очисткой |
|
uasync_print_resources(ua, "BEFORE_DESTROY"); |
|
|
|
// Check for potential memory leaks |
|
if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected before cleanup: timers %zu/%zu, sockets %zu/%zu", |
|
ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count); |
|
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "Timer leak: allocated=%zu, u_freed=%zu, diff=%zd", |
|
ua->timer_alloc_count, ua->timer_free_count, |
|
(ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); |
|
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "Socket leak: allocated=%zu, u_freed=%zu, diff=%zd", |
|
ua->socket_alloc_count, ua->socket_free_count, |
|
(ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); |
|
// Continue cleanup, will abort after if leaks remain |
|
} |
|
|
|
// Free all remaining timeouts |
|
|
|
// Очистить immediate_queue |
|
while (ua->immediate_queue_head) { |
|
struct timeout_node* node = ua->immediate_queue_head; |
|
ua->immediate_queue_head = node->next; |
|
if (node) { |
|
node->ua->timer_free_count++; |
|
memory_pool_free(ua->timeout_pool, node); |
|
} |
|
} |
|
ua->immediate_queue_tail = NULL; |
|
|
|
// Очистить heap |
|
if (ua->timeout_heap) { |
|
size_t u_freed_count = 0; |
|
while (1) { |
|
TimeoutEntry entry; |
|
if (timeout_heap_pop(ua->timeout_heap, &entry) != 0) break; |
|
struct timeout_node* node = (struct timeout_node*)entry.data; |
|
|
|
// Free all timer nodes (avoid double-u_free bug) |
|
if (node) { |
|
ua->timer_free_count++; |
|
memory_pool_free(ua->timeout_pool, node); |
|
} |
|
} |
|
timeout_heap_destroy(ua->timeout_heap); |
|
ua->timeout_heap = NULL; |
|
} |
|
|
|
// Destroy timeout pool |
|
if (ua->timeout_pool) { |
|
memory_pool_destroy(ua->timeout_pool); |
|
ua->timeout_pool = NULL; |
|
} |
|
|
|
// Free all socket nodes using array approach |
|
if (ua->sockets) { |
|
// Count and u_free all active sockets |
|
int u_freed_count = 0; |
|
for (int i = 0; i < ua->sockets->capacity; i++) { |
|
if (ua->sockets->sockets[i].active) { |
|
if (close_fds && ua->sockets->sockets[i].fd >= 0) { |
|
#ifdef _WIN32 |
|
if (ua->sockets->sockets[i].type == SOCKET_NODE_TYPE_SOCK) { |
|
closesocket(ua->sockets->sockets[i].sock); |
|
} else { |
|
close(ua->sockets->sockets[i].fd); // For pipes/FDs |
|
} |
|
#else |
|
close(ua->sockets->sockets[i].fd); |
|
#endif |
|
} |
|
ua->socket_free_count++; |
|
u_freed_count++; |
|
} |
|
} |
|
DEBUG_DEBUG(DEBUG_CATEGORY_MEMORY, "Freed %d socket nodes in destroy", u_freed_count); |
|
socket_array_destroy(ua->sockets); |
|
} |
|
|
|
// Close wakeup pipe/sockets |
|
if (ua->wakeup_initialized) { |
|
#ifdef _WIN32 |
|
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]); |
|
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]); |
|
#else |
|
close(ua->wakeup_pipe[0]); |
|
close(ua->wakeup_pipe[1]); |
|
#endif |
|
} |
|
|
|
// Free cached poll_fds |
|
u_free(ua->poll_fds); |
|
|
|
// Close epoll fd on Linux |
|
#if HAS_EPOLL |
|
if (ua->epoll_fd >= 0) { |
|
close(ua->epoll_fd); |
|
} |
|
#endif |
|
|
|
// Final leak check |
|
if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) { |
|
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected after cleanup: timers %zu/%zu, sockets %zu/%zu", |
|
ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count); |
|
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Timer leak: allocated=%zu, u_freed=%zu, diff=%zd", |
|
ua->timer_alloc_count, ua->timer_free_count, |
|
(ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); |
|
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Socket leak: allocated=%zu, u_freed=%zu, diff=%zd", |
|
ua->socket_alloc_count, ua->socket_free_count, |
|
(ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); |
|
abort(); |
|
} |
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: completed successfully for ua=%p", ua); |
|
|
|
while (ua->posted_tasks_head) { |
|
struct posted_task* t = ua->posted_tasks_head; |
|
ua->posted_tasks_head = t->next; |
|
u_free(t); |
|
} |
|
#ifdef _WIN32 |
|
DeleteCriticalSection(&ua->posted_lock); |
|
#else |
|
pthread_mutex_destroy(&ua->posted_lock); |
|
#endif |
|
|
|
u_free(ua); |
|
|
|
// Cleanup socket platform (WSACleanup on Windows) |
|
socket_platform_cleanup(); |
|
} |
|
|
|
void uasync_init_instance(struct UASYNC* ua) { |
|
if (!ua) return; |
|
|
|
// Initialize socket array if not present |
|
if (!ua->sockets) { |
|
ua->sockets = socket_array_create(16); |
|
} |
|
|
|
if (!ua->timeout_pool) { |
|
ua->timeout_pool = memory_pool_init(sizeof(struct timeout_node)); |
|
} |
|
|
|
if (!ua->immediate_queue_head) { |
|
ua->immediate_queue_head = NULL; |
|
ua->immediate_queue_tail = NULL; |
|
} |
|
|
|
if (!ua->timeout_heap) { |
|
ua->timeout_heap = timeout_heap_create(16); |
|
if (ua->timeout_heap) { |
|
timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback); |
|
} |
|
} |
|
} |
|
|
|
// Debug statistics |
|
void uasync_get_stats(struct UASYNC* ua, size_t* timer_alloc, size_t* timer_u_free, size_t* socket_alloc, size_t* socket_u_free) { |
|
if (!ua) return; |
|
if (timer_alloc) *timer_alloc = ua->timer_alloc_count; |
|
if (timer_u_free) *timer_u_free = ua->timer_free_count; |
|
if (socket_alloc) *socket_alloc = ua->socket_alloc_count; |
|
if (socket_u_free) *socket_u_free = ua->socket_free_count; |
|
} |
|
|
|
void uasync_memsync(struct UASYNC* ua) { |
|
#ifdef _WIN32 |
|
EnterCriticalSection(&ua->posted_lock); |
|
#else |
|
pthread_mutex_lock(&ua->posted_lock); |
|
#endif |
|
#ifdef _WIN32 |
|
LeaveCriticalSection(&ua->posted_lock); |
|
#else |
|
pthread_mutex_unlock(&ua->posted_lock); |
|
#endif |
|
} |
|
|
|
/**
 * Queue a callback for later execution and wake the main loop.
 *
 * A task record is allocated, appended to the tail of the posted-task list
 * while holding posted_lock, and uasync_wakeup() is then called. If either
 * argument is NULL or the allocation fails, the call silently does nothing.
 *
 * @param ua       target instance.
 * @param callback function stored in the task.
 * @param arg      opaque pointer stored alongside the callback.
 */
void uasync_post(struct UASYNC* ua, uasync_post_callback_t callback, void* arg) {
    if (ua == NULL || callback == NULL) {
        return;
    }

    struct posted_task* item = u_malloc(sizeof(struct posted_task));
    if (item == NULL) {
        return;
    }

    item->next = NULL;
    item->arg = arg;
    item->callback = callback;

#ifdef _WIN32
    EnterCriticalSection(&ua->posted_lock);
#else
    pthread_mutex_lock(&ua->posted_lock);
#endif

    if (ua->posted_tasks_tail == NULL) {
        // The list is empty: this is the first task.
        ua->posted_tasks_head = item;
    } else {
        // The list has a tail: link the new task after it.
        ua->posted_tasks_tail->next = item;
    }
    ua->posted_tasks_tail = item;

#ifdef _WIN32
    LeaveCriticalSection(&ua->posted_lock);
#else
    pthread_mutex_unlock(&ua->posted_lock);
#endif

    uasync_wakeup(ua);  // wake the main loop
}
|
|
|
// Wakeup mechanism |
|
int uasync_wakeup(struct UASYNC* ua) { |
|
if (!ua || !ua->wakeup_initialized) return -1; |
|
|
|
char byte = 0; |
|
#ifdef _WIN32 |
|
int ret = send((SOCKET)(intptr_t)ua->wakeup_pipe[1], &byte, 1, 0); |
|
#else |
|
ssize_t ret = write(ua->wakeup_pipe[1], &byte, 1); |
|
#endif |
|
if (ret != 1) { |
|
// Don't print error from signal handler |
|
return -1; |
|
} |
|
return 0; |
|
} |
|
|
|
int uasync_get_wakeup_fd(struct UASYNC* ua) { |
|
if (!ua || !ua->wakeup_initialized) return -1; |
|
return ua->wakeup_pipe[1]; |
|
} |
|
|
|
/* Lookup socket by file descriptor - returns current pointer even after u_realloc */ |
|
int uasync_lookup_socket(struct UASYNC* ua, int fd, void** socket_id) { |
|
if (!ua || !ua->sockets || !socket_id || fd < 0 || fd >= FD_SETSIZE) { |
|
return -1; |
|
} |
|
|
|
*socket_id = socket_array_get(ua->sockets, fd); |
|
return (*socket_id != NULL) ? 0 : -1; |
|
} |
|
|
|
/**
 * Run the event loop forever: repeatedly call uasync_poll() with a timeout
 * of -1 (infinite wait). This function never returns.
 */
void uasync_mainloop(struct UASYNC* ua) {
    for (;;) {
        uasync_poll(ua, -1);  // Infinite wait
    }
}
|
|
|