/*
 * NOTE: removed stray repository-UI text ("You can not select more than 25
 * topics...", "1632 lines", "54 KiB") that was accidentally captured with
 * this source during extraction. The file proper starts below.
 */
// uasync.c
#include "u_async.h"
#include "platform_compat.h"
#include "debug_config.h"
#include "mem.h"
#include "memory_pool.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <unistd.h>
#include "../lib/platform_compat.h"
//#ifdef _WIN32
//#include <windows.h>
//#else
//#include <sys/time.h>
//#endif
// Platform-specific includes
#ifdef __linux__
#include <sys/epoll.h>
#define HAS_EPOLL 1
#else
#define HAS_EPOLL 0
#endif
// Timeout node with safe cancellation
// One pending timer (heap-based) or immediate ("call soon") callback.
struct timeout_node {
    void* arg;                   // Opaque user argument handed back to callback
    timeout_callback_t callback; // Set to NULL on cancellation; firing then becomes a no-op
    uint64_t expiration_ms; // absolute expiration time in milliseconds
    struct UASYNC* ua; // Pointer back to uasync instance for counter updates
    struct timeout_node* next; // For immediate queue (FIFO); unused for heap timers
};
// Socket node with array-based storage
// One monitored descriptor. A node is either an FD entry (pipe/file, int fd
// callbacks) or a SOCK entry (cross-platform socket_t callbacks), selected by
// `type`; the callback set for the other flavor is left NULL.
struct socket_node {
    int fd; // File descriptor (for pipe, file)
    socket_t sock; // Socket (for cross-platform sockets)
    int type; // SOCKET_NODE_TYPE_FD or SOCKET_NODE_TYPE_SOCK
    socket_callback_t read_cbk; // For FD type
    socket_callback_t write_cbk; // For FD type
    socket_t_callback_t read_cbk_sock; // For SOCK type
    socket_t_callback_t write_cbk_sock; // For SOCK type
    socket_callback_t except_cbk; // Error/OOB callback (both types; receives fd)
    void* user_data; // Opaque pointer handed back to every callback
    int active; // 1 if socket is active, 0 if freed (slot available for reuse)
};
// Array-based socket management for O(1) operations
// Array-based socket registry: fd-indexed lookup tables give O(1) add/get/
// remove, and `active_indices` allows iterating only live entries.
// Invariant: fd_to_index[fd] == i  <=>  index_to_fd[i] == fd for active slots.
struct socket_array {
    struct socket_node* sockets; // Dynamic array of socket nodes
    int* fd_to_index; // FD to array index mapping (-1 = unused)
    int* index_to_fd; // Array index to FD mapping (-1 = unused)
    int* active_indices; // Indices of active sockets (dense, first `count` entries)
    int capacity; // Total allocated capacity of all four arrays
    int count; // Number of active sockets
    int max_fd; // Maximum FD seen, for select()-style bounds checking
};
static struct socket_array* socket_array_create(int initial_capacity);
static void socket_array_destroy(struct socket_array* sa);
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data);
static int socket_array_remove(struct socket_array* sa, int fd);
static struct socket_node* socket_array_get(struct socket_array* sa, int fd);
// No global instance - each module must use its own struct UASYNC instance
// Array-based socket management implementation
/* Allocate a socket_array able to hold at least initial_capacity entries.
 * Every mapping slot starts at -1 (invalid) and every node starts inactive.
 * Returns NULL on any allocation failure; partially allocated members are
 * released before returning, so nothing leaks. */
static struct socket_array* socket_array_create(int initial_capacity) {
    if (initial_capacity < 4)
        initial_capacity = 4; /* enforce a sane minimum */
    struct socket_array* sa = u_malloc(sizeof(struct socket_array));
    if (!sa)
        return NULL;
    sa->sockets        = u_calloc(initial_capacity, sizeof(struct socket_node));
    sa->fd_to_index    = u_calloc(initial_capacity, sizeof(int));
    sa->index_to_fd    = u_calloc(initial_capacity, sizeof(int));
    sa->active_indices = u_calloc(initial_capacity, sizeof(int));
    if (!sa->sockets || !sa->fd_to_index || !sa->index_to_fd || !sa->active_indices) {
        /* u_free(NULL) is assumed to be a no-op, mirroring free() */
        u_free(sa->sockets);
        u_free(sa->fd_to_index);
        u_free(sa->index_to_fd);
        u_free(sa->active_indices);
        u_free(sa);
        return NULL;
    }
    /* Mark every slot unused: -1 in the maps, inactive node with fd = -1 */
    for (int slot = 0; slot < initial_capacity; slot++) {
        sa->fd_to_index[slot]    = -1;
        sa->index_to_fd[slot]    = -1;
        sa->active_indices[slot] = -1;
        sa->sockets[slot].fd     = -1;
        sa->sockets[slot].active = 0;
    }
    sa->capacity = initial_capacity;
    sa->count    = 0;
    sa->max_fd   = -1;
    return sa;
}
/* Release a socket_array and all of its internal arrays. NULL is a no-op. */
static void socket_array_destroy(struct socket_array* sa) {
    if (!sa)
        return;
    u_free(sa->sockets);
    u_free(sa->fd_to_index);
    u_free(sa->index_to_fd);
    u_free(sa->active_indices);
    u_free(sa);
}
/* Register a descriptor in the array, growing storage on demand.
 * Either the FD-flavor callbacks (read_cbk_fd/write_cbk_fd) or the
 * SOCK-flavor callbacks (read_cbk_sock/write_cbk_sock) are used, per `type`.
 * Returns the slot index on success, -1 on failure or duplicate fd.
 *
 * BUG FIX: the previous grow path freed the results of successful u_realloc
 * calls when any one of the four failed, while sa->* still pointed at the
 * old blocks that those successful reallocs had already released — a
 * use-after-free / double-free on the next access or destroy.  Each
 * successful realloc is now committed back to `sa` immediately (realloc
 * preserves contents), and capacity grows only when all four succeed, so
 * the structure stays consistent on partial failure. */
static int socket_array_add_internal(struct socket_array* sa, int fd, socket_t sock, int type,
                                     socket_callback_t read_cbk_fd, socket_callback_t write_cbk_fd,
                                     socket_t_callback_t read_cbk_sock, socket_t_callback_t write_cbk_sock,
                                     socket_callback_t except_cbk, void* user_data) {
    if (!sa || fd < 0) return -1;
    // FD_SETSIZE check only for POSIX systems - Windows sockets can have any value
#ifndef _WIN32
    if (fd >= FD_SETSIZE) return -1;
#endif
    if (fd >= sa->capacity) {
        // Need to resize - double the capacity, but always cover `fd`
        int new_capacity = sa->capacity * 2;
        if (fd >= new_capacity) new_capacity = fd + 16;
        struct socket_node* new_sockets = u_realloc(sa->sockets, new_capacity * sizeof(struct socket_node));
        if (new_sockets) sa->sockets = new_sockets;
        int* new_fd_to_index = u_realloc(sa->fd_to_index, new_capacity * sizeof(int));
        if (new_fd_to_index) sa->fd_to_index = new_fd_to_index;
        int* new_index_to_fd = u_realloc(sa->index_to_fd, new_capacity * sizeof(int));
        if (new_index_to_fd) sa->index_to_fd = new_index_to_fd;
        int* new_active_indices = u_realloc(sa->active_indices, new_capacity * sizeof(int));
        if (new_active_indices) sa->active_indices = new_active_indices;
        if (!new_sockets || !new_fd_to_index || !new_index_to_fd || !new_active_indices) {
            // Partial failure: sa is still valid at the old capacity
            return -1;
        }
        // Initialize the newly added tail elements
        for (int i = sa->capacity; i < new_capacity; i++) {
            sa->fd_to_index[i] = -1;
            sa->index_to_fd[i] = -1;
            sa->active_indices[i] = -1;
            sa->sockets[i].fd = -1;
            sa->sockets[i].active = 0;
        }
        sa->capacity = new_capacity;
    }
    // Reject duplicate registration of the same fd
    if (sa->fd_to_index[fd] != -1) return -1;
    // Find first free slot
    int index = -1;
    for (int i = 0; i < sa->capacity; i++) {
        if (!sa->sockets[i].active) {
            index = i;
            break;
        }
    }
    if (index == -1) return -1; // No free slots (shouldn't happen: capacity > count)
    // Populate the slot
    sa->sockets[index].fd = fd;
    sa->sockets[index].sock = sock;
    sa->sockets[index].type = type;
    sa->sockets[index].read_cbk = read_cbk_fd;
    sa->sockets[index].write_cbk = write_cbk_fd;
    sa->sockets[index].read_cbk_sock = read_cbk_sock;
    sa->sockets[index].write_cbk_sock = write_cbk_sock;
    sa->sockets[index].except_cbk = except_cbk;
    sa->sockets[index].user_data = user_data;
    sa->sockets[index].active = 1;
    sa->fd_to_index[fd] = index;
    sa->index_to_fd[index] = fd;
    sa->active_indices[sa->count] = index; // Add to dense active list
    sa->count++;
    if (fd > sa->max_fd) sa->max_fd = fd;
    return index;
}
/* Register a regular file descriptor (pipe, file).
 * Thin wrapper over socket_array_add_internal with the FD flavor selected:
 * no socket_t value and no socket_t callbacks. */
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk,
                            socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
    return socket_array_add_internal(sa, fd, SOCKET_INVALID, SOCKET_NODE_TYPE_FD,
                                     read_cbk, write_cbk, NULL, NULL,
                                     except_cbk, user_data);
}
/* Register a cross-platform socket_t. Selects the SOCK flavor of
 * socket_array_add_internal; returns slot index or -1. */
static int socket_array_add_socket_t(struct socket_array* sa, socket_t sock, socket_t_callback_t read_cbk,
                                     socket_t_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
#ifdef _WIN32
    /* Windows SOCKET is UINT_PTR; its numeric value is used directly as the
     * array index (simplified scheme). Only reject negative values. */
    int fd = (int)(intptr_t)sock;
    if (fd < 0)
        return -1;
#else
    /* On POSIX, socket_t is an int file descriptor */
    int fd = sock;
    if (fd < 0 || fd >= FD_SETSIZE)
        return -1;
#endif
    return socket_array_add_internal(sa, fd, sock, SOCKET_NODE_TYPE_SOCK,
                                     NULL, NULL, read_cbk, write_cbk,
                                     except_cbk, user_data);
}
static int socket_array_remove(struct socket_array* sa, int fd) {
if (!sa || fd < 0 || fd >= sa->capacity) return -1;
int index = sa->fd_to_index[fd];
if (index == -1 || !sa->sockets[index].active) return -1; // FD not found
// Mark as inactive
sa->sockets[index].active = 0;
sa->sockets[index].fd = -1;
sa->sockets[index].sock = SOCKET_INVALID;
sa->sockets[index].type = SOCKET_NODE_TYPE_FD;
sa->fd_to_index[fd] = -1;
sa->index_to_fd[index] = -1;
// Remove from active_indices by swapping with last element
// Find position in active_indices
for (int i = 0; i < sa->count; i++) {
if (sa->active_indices[i] == index) {
// Swap with last element
sa->active_indices[i] = sa->active_indices[sa->count - 1];
sa->active_indices[sa->count - 1] = -1;
break;
}
}
sa->count--;
return 0;
}
/* Look up the active node registered for `fd`; NULL if absent or inactive. */
static struct socket_node* socket_array_get(struct socket_array* sa, int fd) {
    if (!sa || fd < 0 || fd >= sa->capacity)
        return NULL;
    int slot = sa->fd_to_index[fd];
    if (slot == -1 || !sa->sockets[slot].active)
        return NULL;
    return &sa->sockets[slot];
}
/* Look up the active SOCK-type node registered for `sock`.
 * Returns NULL if absent, inactive, or registered as a plain FD. */
static struct socket_node* socket_array_get_by_sock(struct socket_array* sa, socket_t sock) {
    if (!sa)
        return NULL;
#ifdef _WIN32
    int fd = (int)(intptr_t)sock; /* SOCKET value doubles as index on Windows */
#else
    int fd = sock;
#endif
    if (fd < 0 || fd >= sa->capacity)
        return NULL;
    int slot = sa->fd_to_index[fd];
    if (slot == -1 || !sa->sockets[slot].active)
        return NULL;
    if (sa->sockets[slot].type != SOCKET_NODE_TYPE_SOCK)
        return NULL;
    return &sa->sockets[slot];
}
/* Invoked by the timeout heap when it discards a (lazily cancelled) entry:
 * keeps the alloc/free counters balanced and returns the node to the pool. */
static void timeout_node_free_callback(void* user_data, void* data) {
    struct UASYNC* ua = (struct UASYNC*)user_data;
    ua->timer_free_count++;
    memory_pool_free(ua->timeout_pool, data);
}
/* Fill *tv with the current wall-clock time (UNIX epoch). */
static void get_current_time(struct timeval* tv) {
#ifdef _WIN32
    /* Windows: derive a UNIX-epoch timeval from the system FILETIME. */
    FILETIME ft;
    GetSystemTimeAsFileTime(&ft);
    ULARGE_INTEGER ticks;
    ticks.LowPart = ft.dwLowDateTime;
    ticks.HighPart = ft.dwHighDateTime;
    /* FILETIME counts 100 ns ticks since 1601-01-01; 116444736000000000
     * shifts to the 1970 epoch, then split into seconds / microseconds. */
    tv->tv_sec = (long)((ticks.QuadPart - 116444736000000000ULL) / 10000000ULL);
    tv->tv_usec = (long)((ticks.QuadPart % 10000000ULL) / 10);
#else
    /* POSIX: gettimeofday already yields a UNIX-epoch timeval. */
    gettimeofday(tv, NULL);
#endif
}
#ifdef _WIN32
/* Monotonic timestamp in timebase units (1 unit = 0.1 ms), derived from
 * the high-resolution performance counter. */
uint64_t get_time_tb(void) {
    LARGE_INTEGER freq, count;
    QueryPerformanceFrequency(&freq);
    QueryPerformanceCounter(&count);
    double t = (double)count.QuadPart * 10000.0 / (double)freq.QuadPart;
    return (uint64_t)t;
}
#else
/* Monotonic timestamp in timebase units (1 unit = 0.1 ms = 100 us),
 * from CLOCK_MONOTONIC so it is immune to wall-clock adjustments. */
uint64_t get_time_tb(void) {
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    uint64_t tb = (uint64_t)ts.tv_sec * 10000ULL;
    tb += (uint64_t)ts.tv_nsec / 100000ULL;
    return tb;
}
#endif
/* Drain the wakeup pipe: read until empty so it stops polling readable. */
static void drain_wakeup_pipe(struct UASYNC* ua) {
    if (!ua || !ua->wakeup_initialized)
        return;
    char scratch[64];
    for (;;) {
        ssize_t got = read(ua->wakeup_pipe[0], scratch, sizeof(scratch));
        if (got <= 0)
            break;
    }
}
/* Execute callbacks posted from other threads.
 * The whole queue is detached under the lock, then each callback runs with
 * the lock released, so callbacks may post further tasks without deadlock. */
static void process_posted_tasks(struct UASYNC* ua) {
    if (!ua)
        return;
    struct posted_task* pending = NULL;
#ifdef _WIN32
    EnterCriticalSection(&ua->posted_lock);
#else
    pthread_mutex_lock(&ua->posted_lock);
#endif
    pending = ua->posted_tasks_head;
    ua->posted_tasks_head = ua->posted_tasks_tail = NULL;
#ifdef _WIN32
    LeaveCriticalSection(&ua->posted_lock);
#else
    pthread_mutex_unlock(&ua->posted_lock);
#endif
    while (pending) {
        DEBUG_DEBUG(DEBUG_CATEGORY_TUN, "POSTed task get");
        struct posted_task* task = pending;
        pending = pending->next;
        if (task->callback) {
            task->callback(task->arg);
        }
        u_free(task);
    }
}
/* Unified wakeup handler: drain the wakeup pipe/socket, then run all
 * cross-thread posted callbacks on the loop thread. */
static void handle_wakeup(struct UASYNC* ua) {
    if (!ua || !ua->wakeup_initialized)
        return;
    /* Drain every pending wakeup byte */
#ifdef _WIN32
    char scratch[64];
    SOCKET s = (SOCKET)(intptr_t)ua->wakeup_pipe[0];
    while (recv(s, scratch, sizeof(scratch), 0) > 0) {
    }
#else
    char scratch[64];
    while (read(ua->wakeup_pipe[0], scratch, sizeof(scratch)) > 0) {
    }
#endif
    /* Execute callbacks posted from other threads */
    process_posted_tasks(ua);
}
/* Advance *tv by dt timebase units (1 unit = 0.1 ms = 100 us),
 * normalizing tv_usec back into [0, 1000000). */
static void timeval_add_tb(struct timeval* tv, int dt) {
    int whole_sec = dt / 10000;
    int frac_tb = dt % 10000;
    tv->tv_usec += frac_tb * 100;
    tv->tv_sec += whole_sec + tv->tv_usec / 1000000;
    tv->tv_usec %= 1000000;
}
/* Convert a timeval to whole milliseconds (truncating sub-ms remainder). */
static uint64_t timeval_to_ms(const struct timeval* tv) {
    uint64_t ms = (uint64_t)tv->tv_sec * 1000ULL;
    ms += (uint64_t)tv->tv_usec / 1000ULL;
    return ms;
}
// Simplified timeout handling without reference counting.
// Fire all due callbacks: first the immediate ("call soon") queue in FIFO
// order, then every heap timer whose expiration is at or before the current
// wall-clock time. Cancelled entries have callback == NULL and are freed
// without being invoked; each freed node bumps timer_free_count to balance
// the allocation counter.
static void process_timeouts(struct UASYNC* ua) {
    if (!ua) return;
    // First drain the immediate queue (FIFO) — these run before any heap timer
    while (ua->immediate_queue_head) {
        struct timeout_node* node = ua->immediate_queue_head;
        ua->immediate_queue_head = node->next;
        if (!ua->immediate_queue_head) {
            ua->immediate_queue_tail = NULL; // queue became empty
        }
        if (node && node->callback) {
            node->callback(node->arg);
        }
        if (node && node->ua) {
            node->ua->timer_free_count++;
        }
        memory_pool_free(ua->timeout_pool, node);
    }
    if (!ua->timeout_heap) return;
    // Snapshot "now" once; timers created by callbacks during this pass that
    // expire immediately will be handled on the next iteration of the loop.
    struct timeval now_tv;
    get_current_time(&now_tv);
    uint64_t now_ms = timeval_to_ms(&now_tv);
    while (1) {
        TimeoutEntry entry;
        if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) break; // heap empty
        if (entry.expiration > now_ms) break; // earliest timer not yet due
        // Pop the expired timeout
        timeout_heap_pop(ua->timeout_heap, &entry);
        struct timeout_node* node = (struct timeout_node*)entry.data;
        if (node && node->callback) {
            // Execute callback only if not cancelled (cancel sets callback = NULL)
            node->callback(node->arg);
        }
        // Always free the node after processing
        if (node && node->ua) {
            node->ua->timer_free_count++;
        }
        memory_pool_free(ua->timeout_pool, node);
        continue; // Process next expired timeout
    }
}
/* Compute the delay until the earliest heap timer and store it in *tv.
 * Yields {0,0} when there is no heap, no pending timer, or the earliest
 * timer has already expired — callers cannot distinguish those cases. */
static void get_next_timeout(struct UASYNC* ua, struct timeval* tv) {
    /* Default: fire immediately / nothing pending */
    tv->tv_sec = 0;
    tv->tv_usec = 0;
    if (!ua || !ua->timeout_heap)
        return;
    TimeoutEntry entry;
    if (timeout_heap_peek(ua->timeout_heap, &entry) != 0)
        return; /* heap empty */
    struct timeval now_tv;
    get_current_time(&now_tv);
    uint64_t now_ms = timeval_to_ms(&now_tv);
    if (entry.expiration <= now_ms)
        return; /* already due */
    uint64_t delta_ms = entry.expiration - now_ms;
    tv->tv_sec = delta_ms / 1000;
    tv->tv_usec = (delta_ms % 1000) * 1000;
}
/* Schedule `callback(arg)` to fire after timeout_tb timebase units
 * (1 unit = 0.1 ms). Returns an opaque timer id for
 * uasync_cancel_timeout(), or NULL on invalid arguments / allocation or
 * heap failure. */
void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* arg, timeout_callback_t callback) {
    if (!ua || timeout_tb < 0 || !callback)
        return NULL;
    if (!ua->timeout_heap)
        return NULL;
    struct timeout_node* node = memory_pool_alloc(ua->timeout_pool);
    if (!node) {
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to allocate node");
        return NULL;
    }
    ua->timer_alloc_count++;
    node->arg = arg;
    node->callback = callback;
    node->ua = ua;
    /* Absolute expiration: now + timeout, expressed in milliseconds */
    struct timeval expires_at;
    get_current_time(&expires_at);
    timeval_add_tb(&expires_at, timeout_tb);
    node->expiration_ms = timeval_to_ms(&expires_at);
    if (timeout_heap_push(ua->timeout_heap, node->expiration_ms, node) != 0) {
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to push to heap");
        memory_pool_free(ua->timeout_pool, node);
        ua->timer_free_count++; /* keep alloc/free counters balanced */
        return NULL;
    }
    return node;
}
/* Schedule `callback(user_arg)` to run once during the next mainloop
 * iteration, before any heap timers, preserving FIFO submission order.
 * Returns an opaque id usable with uasync_call_soon_cancel(), or NULL. */
void* uasync_call_soon(struct UASYNC* ua, void* user_arg, timeout_callback_t callback) {
    if (!ua || !callback)
        return NULL;
    if (!ua->timeout_pool)
        return NULL;
    struct timeout_node* node = memory_pool_alloc(ua->timeout_pool);
    if (!node) {
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_call_soon: failed to allocate node");
        return NULL;
    }
    ua->timer_alloc_count++;
    node->arg = user_arg;
    node->callback = callback;
    node->ua = ua;
    node->expiration_ms = 0; /* unused for immediate entries */
    node->next = NULL;
    /* Append at the tail to keep FIFO order */
    if (!ua->immediate_queue_tail) {
        ua->immediate_queue_head = ua->immediate_queue_tail = node;
    } else {
        ua->immediate_queue_tail->next = node;
        ua->immediate_queue_tail = node;
    }
    return node;
}
/* Cancel an immediate callback in O(1): the node stays queued, but its
 * callback is nulled so process_timeouts() frees it without invoking it. */
err_t uasync_call_soon_cancel(struct UASYNC* ua, void* t_id) {
    if (!ua || !t_id)
        return ERR_FAIL;
    struct timeout_node* node = (struct timeout_node*)t_id;
    if (node->ua != ua)
        return ERR_FAIL; /* id belongs to a different loop instance */
    node->callback = NULL;
    return ERR_OK;
}
/* Cancel a pending heap timer by the id returned from uasync_set_timeout().
 * Returns ERR_OK when the heap entry was found and marked deleted (it is
 * freed lazily by the heap); ERR_FAIL when the timer already fired or the
 * id is invalid. */
err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id) {
    if (!ua || !t_id || !ua->timeout_heap) {
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: invalid parameters ua=%p, t_id=%p, heap=%p",
                    ua, t_id, ua ? ua->timeout_heap : NULL);
        return ERR_FAIL;
    }
    struct timeout_node* node = (struct timeout_node*)t_id;
    if (timeout_heap_cancel(ua->timeout_heap, node->expiration_ms, node) == 0) {
        /* Clearing the callback makes any stray firing a no-op */
        node->callback = NULL;
        return ERR_OK;
    }
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: not found in heap: ua=%p, t_id=%p, node=%p, expires=%llu ms",
                ua, t_id, node, (unsigned long long)node->expiration_ms);
    /* Not in the heap: it already expired, or the id was never valid */
    return ERR_FAIL;
}
// Instance version: register a plain file descriptor (pipe/file) with the
// loop. Returns an opaque id (pointer to the internal socket_node) for use
// with uasync_remove_socket(), or NULL on failure.
void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
    if (!ua || fd < 0) return NULL;
    // FD_SETSIZE check only for POSIX systems - Windows sockets can have any value
#ifndef _WIN32
    if (fd >= FD_SETSIZE) return NULL;
#endif
    int index = socket_array_add(ua->sockets, fd, read_cbk, write_cbk, except_cbk, user_data);
    if (index < 0) return NULL;
    ua->socket_alloc_count++;
    ua->poll_fds_dirty = 1; // Mark poll_fds as needing rebuild
#if HAS_EPOLL
    // Add to epoll if using epoll
    if (ua->use_epoll && ua->epoll_fd >= 0) {
        struct epoll_event ev;
        ev.events = 0;
        if (read_cbk) ev.events |= EPOLLIN;
        if (write_cbk) ev.events |= EPOLLOUT;
        // Use level-triggered mode (default) for compatibility with UDP sockets.
        // NOTE(review): ev.data.ptr points into sa->sockets, which
        // socket_array_add_internal() may u_realloc() when it grows; a grow
        // would leave previously registered epoll entries dangling — confirm
        // the array never grows after epoll registration, or store the fd in
        // ev.data instead.
        ev.data.ptr = &ua->sockets->sockets[index];
        if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            // Failed to add to epoll - roll back the array registration
            socket_array_remove(ua->sockets, fd);
            ua->socket_alloc_count--;
            return NULL;
        }
    }
#endif
    // Return pointer to the socket node as ID
    return &ua->sockets->sockets[index];
}
/* Unregister a descriptor previously returned by uasync_add_socket().
 * Deregisters from epoll (best effort) before removing from the array. */
err_t uasync_remove_socket(struct UASYNC* ua, void* s_id) {
    if (!ua || !s_id)
        return ERR_FAIL;
    struct socket_node* node = (struct socket_node*)s_id;
    if (!node->active || node->fd < 0)
        return ERR_FAIL;
    int fd = node->fd;
#if HAS_EPOLL
    if (ua->use_epoll && ua->epoll_fd >= 0) {
        epoll_ctl(ua->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
    }
#endif
    if (socket_array_remove(ua->sockets, fd) != 0)
        return ERR_FAIL;
    ua->socket_free_count++;
    ua->poll_fds_dirty = 1; /* cached pollfd array must be rebuilt */
    return ERR_OK;
}
// Register a cross-platform socket_t with the loop. Returns an opaque id
// (pointer to the internal socket_node) or NULL on failure.
// Note: except_cbk is cast to the int-fd callback type for storage; it is
// invoked with node->fd.
void* uasync_add_socket_t(struct UASYNC* ua, socket_t sock, socket_t_callback_t read_cbk,
socket_t_callback_t write_cbk, socket_t_callback_t except_cbk, void* user_data) {
    if (!ua || sock == SOCKET_INVALID) return NULL;
    int index = socket_array_add_socket_t(ua->sockets, sock, read_cbk, write_cbk,
    (socket_callback_t)except_cbk, user_data);
    if (index < 0) return NULL;
    ua->socket_alloc_count++;
    ua->poll_fds_dirty = 1;
#if HAS_EPOLL
    if (ua->use_epoll && ua->epoll_fd >= 0) {
        struct epoll_event ev;
        ev.events = 0;
        if (read_cbk) ev.events |= EPOLLIN;
        if (write_cbk) ev.events |= EPOLLOUT;
        // NOTE(review): same concern as uasync_add_socket() — ev.data.ptr
        // points into a u_realloc()-able array and may dangle after a grow;
        // confirm this cannot happen once epoll entries exist.
        ev.data.ptr = &ua->sockets->sockets[index];
        // On Windows, need to cast socket_t to int for epoll_ctl
#ifdef _WIN32
        int fd = (int)(intptr_t)sock;
#else
        int fd = sock;
#endif
        if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
            // Roll back the array registration on epoll failure
            socket_array_remove(ua->sockets, fd);
            ua->socket_alloc_count--;
            return NULL;
        }
    }
#endif
    return &ua->sockets->sockets[index];
}
/* Unregister a socket previously added via uasync_add_socket_t().
 * Looks the node up by its socket value, deregisters from epoll (best
 * effort), then removes it from the array. */
err_t uasync_remove_socket_t(struct UASYNC* ua, socket_t sock) {
    if (!ua || sock == SOCKET_INVALID)
        return ERR_FAIL;
    struct socket_node* node = socket_array_get_by_sock(ua->sockets, sock);
    if (!node || !node->active)
        return ERR_FAIL;
    /* Same fd derivation used at registration time */
#ifdef _WIN32
    int fd = (int)(intptr_t)sock;
#else
    int fd = sock;
#endif
#if HAS_EPOLL
    if (ua->use_epoll && ua->epoll_fd >= 0) {
        epoll_ctl(ua->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
    }
#endif
    if (socket_array_remove(ua->sockets, fd) != 0)
        return ERR_FAIL;
    ua->socket_free_count++;
    ua->poll_fds_dirty = 1;
    return ERR_OK;
}
// Rebuild the cached pollfd array from the active socket set.
// Layout: the wakeup fd (if initialized) occupies slot 0, followed by one
// entry per active socket. Events are derived from which callbacks are set.
// FIX: removed a leftover duplicate `if (cur->write_cbk) ... POLLOUT` that
// re-applied the FD-flavor write check outside the type dispatch (redundant:
// SOCK-type nodes always have write_cbk == NULL).
static void rebuild_poll_fds(struct UASYNC* ua) {
    if (!ua || !ua->sockets) return;
    int socket_count = ua->sockets->count;
    int wakeup_fd_present = ua->wakeup_initialized && ua->wakeup_pipe[0] >= 0;
    int total_fds = socket_count + wakeup_fd_present;
    // Ensure poll_fds capacity is sufficient
    if (total_fds > ua->poll_fds_capacity) {
        int new_capacity = total_fds * 2;
        if (new_capacity < 16) new_capacity = 16;
        struct pollfd* new_poll_fds = u_realloc(ua->poll_fds, sizeof(struct pollfd) * new_capacity);
        if (!new_poll_fds) return; // Keep old allocation on failure
        ua->poll_fds = new_poll_fds;
        ua->poll_fds_capacity = new_capacity;
    }
    int idx = 0;
    // Wakeup fd always goes first so uasync_poll can special-case index 0
    if (wakeup_fd_present) {
        ua->poll_fds[idx].fd = ua->wakeup_pipe[0];
        ua->poll_fds[idx].events = POLLIN;
        ua->poll_fds[idx].revents = 0;
        idx++;
    }
    // Walk only the active sockets via the dense index list
    for (int i = 0; i < socket_count; i++) {
        int socket_array_idx = ua->sockets->active_indices[i];
        struct socket_node* cur = &ua->sockets->sockets[socket_array_idx];
        if (cur->type == SOCKET_NODE_TYPE_SOCK) {
            // socket_t - cast to int for pollfd
#ifdef _WIN32
            ua->poll_fds[idx].fd = (int)(intptr_t)cur->sock;
#else
            ua->poll_fds[idx].fd = cur->sock;
#endif
        } else {
            // Regular fd
            ua->poll_fds[idx].fd = cur->fd;
        }
        ua->poll_fds[idx].events = 0;
        ua->poll_fds[idx].revents = 0;
        // Select events from whichever callback flavor this node uses
        if (cur->type == SOCKET_NODE_TYPE_SOCK) {
            if (cur->read_cbk_sock) ua->poll_fds[idx].events |= POLLIN;
            if (cur->write_cbk_sock) ua->poll_fds[idx].events |= POLLOUT;
        } else {
            if (cur->read_cbk) ua->poll_fds[idx].events |= POLLIN;
            if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT;
        }
        if (cur->except_cbk) ua->poll_fds[idx].events |= POLLPRI;
        idx++;
    }
    ua->poll_fds_count = total_fds;
    ua->poll_fds_dirty = 0;
}
// Process events from epoll (Linux only)
#if HAS_EPOLL
// Dispatch events returned by epoll_wait(). data.ptr == NULL marks the
// wakeup fd; anything else points at a socket_node.
// ROBUSTNESS FIX: a callback may remove sockets via uasync_remove_socket(),
// which clears node->active — previously `active` was only checked once per
// event, so a read/write callback could still fire on a node the except
// callback had just removed. The flag is now re-checked before each
// subsequent dispatch.
static void process_epoll_events(struct UASYNC* ua, struct epoll_event* events, int n_events) {
    for (int i = 0; i < n_events; i++) {
        // Wakeup fd is registered with a NULL user pointer
        if (events[i].data.ptr == NULL) {
            if (events[i].events & EPOLLIN) {
                drain_wakeup_pipe(ua);
            }
            continue;
        }
        // Socket event
        struct socket_node* node = (struct socket_node*)events[i].data.ptr;
        if (!node || !node->active) continue;
        /* Check for error conditions first */
        if (events[i].events & (EPOLLERR | EPOLLHUP)) {
            if (node->except_cbk) {
                node->except_cbk(node->fd, node->user_data);
            }
        }
        /* The except callback may have removed this socket */
        if (!node->active) continue;
        /* Read readiness - use appropriate callback based on socket type */
        if (events[i].events & EPOLLIN) {
            if (node->type == SOCKET_NODE_TYPE_SOCK) {
                if (node->read_cbk_sock) {
                    node->read_cbk_sock(node->sock, node->user_data);
                }
            } else {
                if (node->read_cbk) {
                    node->read_cbk(node->fd, node->user_data);
                }
            }
        }
        /* The read callback may also have removed this socket */
        if (!node->active) continue;
        /* Write readiness - use appropriate callback based on socket type */
        if (events[i].events & EPOLLOUT) {
            if (node->type == SOCKET_NODE_TYPE_SOCK) {
                if (node->write_cbk_sock) {
                    node->write_cbk_sock(node->sock, node->user_data);
                }
            } else {
                if (node->write_cbk) {
                    node->write_cbk(node->fd, node->user_data);
                }
            }
        }
    }
}
#endif
// Run one iteration of the event loop.
// Waits up to timeout_tb timebase units (1 unit = 0.1 ms; negative = wait
// for the next timer, or forever if none) using epoll (Linux), select()
// (Windows) or poll() (other POSIX), dispatches ready socket callbacks,
// then fires expired timers via process_timeouts().
void uasync_poll(struct UASYNC* ua, int timeout_tb) {
    if (!ua) return;
    if (!ua->sockets || !ua->timeout_heap) return;
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "poll");
    // Normalize the requested timeout
    if (timeout_tb < 0) timeout_tb = -1; // Infinite wait
    else if (timeout_tb == 0) timeout_tb = 0; // No wait (no-op assignment, kept as-is)
    // Delay until the earliest heap timer ({0,0} if none or already due)
    struct timeval next_timeout;
    get_next_timeout(ua, &next_timeout);
    // Convert requested timeout (timebase units) to a timeval
    struct timeval req_timeout = {0};
    if (timeout_tb >= 0) {
        req_timeout.tv_sec = timeout_tb / 10000;
        req_timeout.tv_usec = (timeout_tb % 10000) * 100;
    }
    // Use minimum of requested and next timer if both finite
    struct timeval poll_timeout;
    if (timeout_tb < 0) {
        // Infinite requested - bound the wait by the next timer if any
        poll_timeout = next_timeout;
    } else {
        // Finite requested - min of requested and next
        if (next_timeout.tv_sec < req_timeout.tv_sec ||
        (next_timeout.tv_sec == req_timeout.tv_sec && next_timeout.tv_usec < req_timeout.tv_usec)) {
            poll_timeout = next_timeout;
        } else {
            poll_timeout = req_timeout;
        }
    }
    int timeout_ms;
    // NOTE(review): get_next_timeout() yields {0,0} both when no timer exists
    // and when the earliest timer is already due; with timeout_tb < 0 the
    // already-due case falls into the -1 (infinite) branch below, delaying
    // the expired timer until the next wakeup/socket event — confirm intended.
    if (timeout_tb < 0 && (next_timeout.tv_sec > 0 || next_timeout.tv_usec > 0)) {
        timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000);
    } else if (timeout_tb < 0) {
        timeout_ms = -1; // Infinite
    } else {
        timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000);
    }
    // Count active sockets
    int socket_count = ua->sockets->count;
    if (socket_count == 0 && timeout_ms == -1) {
        // No sockets and infinite wait requested: wait for the next timer
        if (ua->timeout_heap->size > 0) {
            timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000);
        } else {
            // Nothing to wait on at all - return immediately
            return;
        }
    }
#if HAS_EPOLL
    // Preferred backend on Linux
    if (ua->use_epoll && ua->epoll_fd >= 0) {
        struct epoll_event events[64]; // Stack-allocated array for events
        int max_events = 64;
        int ret = epoll_wait(ua->epoll_fd, events, max_events, timeout_ms);
        if (ret < 0) {
            if (errno == EINTR) {
                return; // interrupted by signal - caller loops again
            }
            perror("epoll_wait");
            return;
        }
        /* Process socket events */
        if (ret > 0) {
            process_epoll_events(ua, events, ret);
        }
        /* Process timeouts that may have expired during poll or socket processing */
        process_timeouts(ua);
        return;
    }
#endif
    // Fallback to poll()/select() for non-Linux or if epoll failed
    // Include the wakeup pipe (slot 0) if initialized
    int wakeup_fd_present = ua->wakeup_initialized && ua->wakeup_pipe[0] >= 0;
    int total_fds = socket_count + wakeup_fd_present;
    // If there is nothing to poll, just sleep out the timeout and fire timers
    if (total_fds == 0) {
        if (timeout_ms > 0) {
#ifdef _WIN32
            Sleep(timeout_ms);
#else
            struct timespec ts = { timeout_ms / 1000, (timeout_ms % 1000) * 1000000 };
            nanosleep(&ts, NULL);
#endif
        }
        process_timeouts(ua);
        return;
    }
#ifdef _WIN32
    // On Windows, use select() instead of WSAPoll to avoid issues with accepted sockets
    fd_set read_fds, write_fds, except_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&write_fds);
    FD_ZERO(&except_fds);
    SOCKET max_fd = 0;
    // Populate the fd_sets from the active socket list
    for (int i = 0; i < ua->sockets->count; i++) {
        int idx = ua->sockets->active_indices[i];
        struct socket_node* node = &ua->sockets->sockets[idx];
        if (!node->active) continue;
        SOCKET s;
        if (node->type == SOCKET_NODE_TYPE_SOCK) {
            s = node->sock;
        } else {
            s = (SOCKET)node->fd;
        }
        // Monitor only the directions that have a callback registered
        if (node->type == SOCKET_NODE_TYPE_SOCK) {
            if (node->read_cbk_sock) FD_SET(s, &read_fds);
            if (node->write_cbk_sock) FD_SET(s, &write_fds);
        } else {
            if (node->read_cbk) FD_SET(s, &read_fds);
            if (node->write_cbk) FD_SET(s, &write_fds);
        }
        if (node->except_cbk) FD_SET(s, &except_fds);
        if (s > max_fd) max_fd = s;
    }
    struct timeval tv;
    tv.tv_sec = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;
    int ret = select((int)max_fd + 1, &read_fds, &write_fds, &except_fds, &tv);
    if (ret < 0) {
        int err = WSAGetLastError();
        if (err != WSAEINTR) {
            DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "select failed: %d", err);
        }
        return;
    }
    if (ret > 0) {
        // Dispatch: except first, then read, then write (mirrors poll path)
        for (int i = 0; i < ua->sockets->count; i++) {
            int idx = ua->sockets->active_indices[i];
            struct socket_node* node = &ua->sockets->sockets[idx];
            if (!node->active) continue;
            SOCKET s;
            if (node->type == SOCKET_NODE_TYPE_SOCK) {
                s = node->sock;
            } else {
                s = (SOCKET)node->fd;
            }
            int has_read = FD_ISSET(s, &read_fds);
            int has_write = FD_ISSET(s, &write_fds);
            int has_except = FD_ISSET(s, &except_fds);
            if (!has_read && !has_write && !has_except) continue;
            if (has_except) {
                if (node->except_cbk) {
                    node->except_cbk(node->fd, node->user_data);
                }
            }
            if (has_read) {
                if (node->type == SOCKET_NODE_TYPE_SOCK) {
                    if (node->read_cbk_sock) {
                        node->read_cbk_sock(node->sock, node->user_data);
                    }
                } else {
                    if (node->read_cbk) {
                        node->read_cbk(node->fd, node->user_data);
                    }
                }
            }
            if (has_write) {
                if (node->type == SOCKET_NODE_TYPE_SOCK) {
                    if (node->write_cbk_sock) {
                        node->write_cbk_sock(node->sock, node->user_data);
                    }
                } else {
                    if (node->write_cbk) {
                        node->write_cbk(node->fd, node->user_data);
                    }
                }
            }
        }
    }
#else
    // On non-Windows, use poll()
    // Rebuild poll_fds if dirty or not allocated
    if (ua->poll_fds_dirty || !ua->poll_fds) {
        rebuild_poll_fds(ua);
    }
    // Ensure poll_fds_count matches current state (in case sockets changed without dirty flag)
    if (ua->poll_fds_count != total_fds) {
        rebuild_poll_fds(ua);
    }
    int ret = poll(ua->poll_fds, ua->poll_fds_count, timeout_ms);
    if (ret < 0) {
        if (errno == EINTR) {
            return; // interrupted by signal - caller loops again
        }
        perror("poll");
        return;
    }
    /* Process socket events first to give sockets higher priority */
    if (ret > 0) {
        for (int i = 0; i < ua->poll_fds_count; i++) {
            if (ua->poll_fds[i].revents == 0) continue;
            /* Handle wakeup fd separately (always slot 0 when present) */
            if (wakeup_fd_present && i == 0) {
                if (ua->poll_fds[i].revents & POLLIN) {
                    drain_wakeup_pipe(ua);
                }
                continue;
            }
            /* Socket event - lookup by fd */
            struct socket_node* node = socket_array_get(ua->sockets, ua->poll_fds[i].fd);
            if (!node) { // Try by socket_t (in case this is a socket)
                socket_t lookup_sock = ua->poll_fds[i].fd;
                node = socket_array_get_by_sock(ua->sockets, lookup_sock);
            }
            if (!node) continue; // Socket may have been removed by an earlier callback
            /* Check for error conditions first */
            if (ua->poll_fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) {
                /* Treat as exceptional condition */
                if (node->except_cbk) {
                    node->except_cbk(node->fd, node->user_data);
                }
            }
            /* Exceptional data (out-of-band) */
            if (ua->poll_fds[i].revents & POLLPRI) {
                if (node->except_cbk) {
                    node->except_cbk(node->fd, node->user_data);
                }
            }
            /* Read readiness - use appropriate callback based on socket type */
            if (ua->poll_fds[i].revents & POLLIN) {
                if (node->type == SOCKET_NODE_TYPE_SOCK) {
                    if (node->read_cbk_sock) {
                        node->read_cbk_sock(node->sock, node->user_data);
                    }
                } else {
                    if (node->read_cbk) {
                        node->read_cbk(node->fd, node->user_data);
                    }
                }
            }
            /* Write readiness - use appropriate callback based on socket type */
            if (ua->poll_fds[i].revents & POLLOUT) {
                if (node->type == SOCKET_NODE_TYPE_SOCK) {
                    if (node->write_cbk_sock) {
                        node->write_cbk_sock(node->sock, node->user_data);
                    }
                } else {
                    if (node->write_cbk) {
                        node->write_cbk(node->fd, node->user_data);
                    }
                }
            }
        }
    }
#endif
    /* Process timeouts that may have expired during poll or socket processing */
    process_timeouts(ua);
}
// Wakeup read callbacks: both simply forward to handle_wakeup(), which
// drains the wakeup channel and runs cross-thread posted tasks.
#ifdef _WIN32
// Windows flavor: the wakeup channel is a socket, so the callback takes socket_t.
static void wakeup_read_callback_win(socket_t sock, void* arg) {
    (void)sock; // unused
    handle_wakeup((struct UASYNC*)arg);
}
#else
// POSIX flavor: the wakeup channel is a pipe, so the callback takes an int fd.
static void wakeup_read_callback_posix(int fd, void* arg) {
    (void)fd; // unused
    handle_wakeup((struct UASYNC*)arg);
}
#endif
// ========== Instance management functions ==========
// Modified function in u_async.c: uasync_create
// Changes: Use self-connected UDP socket for wakeup on Windows instead of pipe.
// This ensures the wakeup is a selectable SOCKET.
struct UASYNC* uasync_create(void) {
// Initialize socket platform (Winsock on Windows)
socket_platform_init();
struct UASYNC* ua = u_calloc(1, sizeof(struct UASYNC));
if (!ua) return NULL;
ua->timer_alloc_count = 0;
ua->timer_free_count = 0;
ua->socket_alloc_count = 0;
ua->socket_free_count = 0;
ua->poll_fds = NULL;
ua->poll_fds_capacity = 0;
ua->poll_fds_count = 0;
ua->poll_fds_dirty = 1;
ua->wakeup_pipe[0] = -1;
ua->wakeup_pipe[1] = -1;
ua->wakeup_initialized = 0;
ua->posted_tasks_head = NULL;
ua->immediate_queue_head = NULL;
ua->immediate_queue_tail = NULL;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating SA...");
ua->sockets = socket_array_create(16);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating SA1...");
if (!ua->sockets) {
if (ua->wakeup_initialized) {
#ifdef _WIN32
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]);
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]);
#else
close(ua->wakeup_pipe[0]);
close(ua->wakeup_pipe[1]);
#endif
}
u_free(ua);
return NULL;
}
ua->timeout_heap = timeout_heap_create(16);
if (!ua->timeout_heap) {
socket_array_destroy(ua->sockets);
if (ua->wakeup_initialized) {
#ifdef _WIN32
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]);
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]);
#else
close(ua->wakeup_pipe[0]);
close(ua->wakeup_pipe[1]);
#endif
}
u_free(ua);
return NULL;
}
// Initialize timeout pool
ua->timeout_pool = memory_pool_init(sizeof(struct timeout_node));
if (!ua->timeout_pool) {
timeout_heap_destroy(ua->timeout_heap);
socket_array_destroy(ua->sockets);
if (ua->wakeup_initialized) {
#ifdef _WIN32
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]);
closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]);
#else
close(ua->wakeup_pipe[0]);
close(ua->wakeup_pipe[1]);
#endif
}
u_free(ua);
return NULL;
}
// Set callback to u_free timeout nodes and update counters
timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating TH1...");
// Initialize epoll on Linux
ua->epoll_fd = -1;
ua->use_epoll = 0;
#if HAS_EPOLL
ua->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
if (ua->epoll_fd >= 0) {
ua->use_epoll = 1;
DEBUG_INFO(DEBUG_CATEGORY_UASYNC, "Using epoll for socket monitoring");
// Add wakeup pipe to epoll
if (ua->wakeup_initialized) {
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.ptr = NULL; // NULL ptr indicates wakeup fd
if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, ua->wakeup_pipe[0], &ev) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to add wakeup pipe to epoll: %s", strerror(errno));
}
}
} else {
DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to create epoll fd, falling back to poll: %s", strerror(errno));
}
#endif
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating TH2...");
#ifdef _WIN32
// Windows: self-connected UDP socket for wakeup
SOCKET r = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
SOCKET w = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (r == INVALID_SOCKET || w == INVALID_SOCKET) {
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "Failed to create wakeup sockets");
u_free(ua);
return NULL;
}
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = 0;
if (bind(r, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR ||
getsockname(r, (struct sockaddr*)&addr, &(int){sizeof(addr)}) == SOCKET_ERROR ||
connect(w, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) {
closesocket(r);
closesocket(w);
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "Wakeup socket setup failed: %d", WSAGetLastError());
u_free(ua);
return NULL;
}
ua->wakeup_pipe[0] = (int)(intptr_t)r;
ua->wakeup_pipe[1] = (int)(intptr_t)w;
ua->wakeup_initialized = 1;
u_long mode = 1;
ioctlsocket(r, FIONBIO, &mode);
// Register the read socket with uasync
uasync_add_socket_t(ua, r, wakeup_read_callback_win, NULL, NULL, ua); // ← ua как user_data
// uasync_add_socket_t(ua, r, wakeup_read_callback_win, NULL, NULL, NULL);
InitializeCriticalSection(&ua->posted_lock);
#else
// POSIX pipe
if (pipe(ua->wakeup_pipe) != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "pipe() failed: %s", strerror(errno));
u_free(ua);
return NULL;
}
ua->wakeup_initialized = 1;
fcntl(ua->wakeup_pipe[0], F_SETFL, fcntl(ua->wakeup_pipe[0], F_GETFL, 0) | O_NONBLOCK);
fcntl(ua->wakeup_pipe[1], F_SETFL, fcntl(ua->wakeup_pipe[1], F_GETFL, 0) | O_NONBLOCK);
if (!ua->use_epoll) {
uasync_add_socket(ua, ua->wakeup_pipe[0], wakeup_read_callback_posix, NULL, NULL, ua); // ← ua
}
pthread_mutex_init(&ua->posted_lock, NULL);
#endif
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Creating TH3...");
return ua;
}
// Print all resources for debugging
// Print a human-readable report of all live timers and sockets owned by ua.
// prefix is prepended to the header/footer lines so call sites can be told
// apart in the log. Read-only: iterates the heap array without popping.
// Fix: %p requires a void* argument (passing any other pointer type is
// undefined behavior) - pointers are now cast explicitly.
void uasync_print_resources(struct UASYNC* ua, const char* prefix) {
    if (!ua) {
        printf("%s: NULL uasync instance\n", prefix);
        return;
    }
    printf("\n🔍 %s: UASYNC Resource Report for %p\n", prefix, (void*)ua);
    printf(" Timer Statistics: allocated=%zu, u_freed=%zu, active=%zd\n",
    ua->timer_alloc_count, ua->timer_free_count,
    (ssize_t)(ua->timer_alloc_count - ua->timer_free_count));
    printf(" Socket Statistics: allocated=%zu, u_freed=%zu, active=%zd\n",
    ua->socket_alloc_count, ua->socket_free_count,
    (ssize_t)(ua->socket_alloc_count - ua->socket_free_count));
    // List the active timers
    if (ua->timeout_heap) {
        size_t active_timers = 0;
        // Safe read-only pass: iterate the heap array without extraction
        for (size_t i = 0; i < ua->timeout_heap->size; i++) {
            if (!ua->timeout_heap->heap[i].deleted) {
                active_timers++;
                struct timeout_node* node = (struct timeout_node*)ua->timeout_heap->heap[i].data;
                printf(" Timer: node=%p, expires=%llu ms\n",
                (void*)node, (unsigned long long)ua->timeout_heap->heap[i].expiration);
            }
        }
        printf(" Active timers in heap: %zu\n", active_timers);
    }
    // List the active sockets
    if (ua->sockets) {
        int active_sockets = 0;
        printf(" Socket array capacity: %d, active: %d\n",
        ua->sockets->capacity, ua->sockets->count);
        for (int i = 0; i < ua->sockets->capacity; i++) {
            if (ua->sockets->sockets[i].active) {
                active_sockets++;
                printf(" Socket: fd=%d, active=%d\n",
                ua->sockets->sockets[i].fd,
                ua->sockets->sockets[i].active);
            }
        }
        printf(" Total active sockets: %d\n", active_sockets);
    }
    printf("🔚 %s: End of resource report\n\n", prefix);
}
// Modified function in u_async.c: uasync_destroy
// Changes: Close wakeup sockets properly on Windows.
//
// Tear down a UASYNC instance: drains the immediate timeout queue and the
// timeout heap (returning nodes to the pool), frees socket nodes, closes
// the wakeup channel and epoll fd, frees posted tasks, and finally frees
// the instance itself. close_fds: when non-zero, also close()s every
// registered fd/socket. Aborts the process if the alloc/free counters
// still disagree after cleanup (hard leak detector).
void uasync_destroy(struct UASYNC* ua, int close_fds) {
    if (!ua) return;
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: starting cleanup for ua=%p", ua);
    // Diagnostic dump of resources before cleanup
    uasync_print_resources(ua, "BEFORE_DESTROY");
    // Check for potential memory leaks
    if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) {
        DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected before cleanup: timers %zu/%zu, sockets %zu/%zu",
        ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count);
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "Timer leak: allocated=%zu, u_freed=%zu, diff=%zd",
        ua->timer_alloc_count, ua->timer_free_count,
        (ssize_t)(ua->timer_alloc_count - ua->timer_free_count));
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "Socket leak: allocated=%zu, u_freed=%zu, diff=%zd",
        ua->socket_alloc_count, ua->socket_free_count,
        (ssize_t)(ua->socket_alloc_count - ua->socket_free_count));
        // Continue cleanup, will abort after if leaks remain
    }
    // Free all remaining timeouts
    // Drain the immediate (zero-delay) timeout queue
    while (ua->immediate_queue_head) {
        struct timeout_node* node = ua->immediate_queue_head;
        ua->immediate_queue_head = node->next;
        if (node) { // always non-NULL here (loop condition); defensive check
            node->ua->timer_free_count++;
            memory_pool_free(ua->timeout_pool, node);
        }
    }
    ua->immediate_queue_tail = NULL;
    // Drain the timeout heap, returning every node to the pool
    if (ua->timeout_heap) {
        size_t u_freed_count = 0;
        while (1) {
            TimeoutEntry entry;
            if (timeout_heap_pop(ua->timeout_heap, &entry) != 0) break;
            struct timeout_node* node = (struct timeout_node*)entry.data;
            // Free all timer nodes (avoid double-u_free bug)
            if (node) {
                ua->timer_free_count++;
                memory_pool_free(ua->timeout_pool, node);
            }
        }
        timeout_heap_destroy(ua->timeout_heap);
        ua->timeout_heap = NULL;
    }
    // Destroy timeout pool
    if (ua->timeout_pool) {
        memory_pool_destroy(ua->timeout_pool);
        ua->timeout_pool = NULL;
    }
    // Free all socket nodes using array approach
    if (ua->sockets) {
        // Count and u_free all active sockets
        int u_freed_count = 0;
        for (int i = 0; i < ua->sockets->capacity; i++) {
            if (ua->sockets->sockets[i].active) {
                if (close_fds && ua->sockets->sockets[i].fd >= 0) {
#ifdef _WIN32
                    if (ua->sockets->sockets[i].type == SOCKET_NODE_TYPE_SOCK) {
                        closesocket(ua->sockets->sockets[i].sock);
                    } else {
                        close(ua->sockets->sockets[i].fd); // For pipes/FDs
                    }
#else
                    close(ua->sockets->sockets[i].fd);
#endif
                }
                ua->socket_free_count++;
                u_freed_count++;
            }
        }
        DEBUG_DEBUG(DEBUG_CATEGORY_MEMORY, "Freed %d socket nodes in destroy", u_freed_count);
        socket_array_destroy(ua->sockets);
    }
    // Close wakeup pipe/sockets
    // NOTE(review): the wakeup read end is also registered as a socket node,
    // so with close_fds != 0 the loop above may have closed it already,
    // making this a potential double close - confirm against how
    // uasync_add_socket records the wakeup fd.
    if (ua->wakeup_initialized) {
#ifdef _WIN32
        closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[0]);
        closesocket((SOCKET)(intptr_t)ua->wakeup_pipe[1]);
#else
        close(ua->wakeup_pipe[0]);
        close(ua->wakeup_pipe[1]);
#endif
    }
    // Free cached poll_fds
    u_free(ua->poll_fds);
    // Close epoll fd on Linux
#if HAS_EPOLL
    if (ua->epoll_fd >= 0) {
        close(ua->epoll_fd);
    }
#endif
    // Final leak check
    if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) {
        DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected after cleanup: timers %zu/%zu, sockets %zu/%zu",
        ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count);
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Timer leak: allocated=%zu, u_freed=%zu, diff=%zd",
        ua->timer_alloc_count, ua->timer_free_count,
        (ssize_t)(ua->timer_alloc_count - ua->timer_free_count));
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Socket leak: allocated=%zu, u_freed=%zu, diff=%zd",
        ua->socket_alloc_count, ua->socket_free_count,
        (ssize_t)(ua->socket_alloc_count - ua->socket_free_count));
        abort();
    }
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: completed successfully for ua=%p", ua);
    // Free any posted tasks that were never executed
    while (ua->posted_tasks_head) {
        struct posted_task* t = ua->posted_tasks_head;
        ua->posted_tasks_head = t->next;
        u_free(t);
    }
#ifdef _WIN32
    DeleteCriticalSection(&ua->posted_lock);
#else
    pthread_mutex_destroy(&ua->posted_lock);
#endif
    u_free(ua);
    // Cleanup socket platform (WSACleanup on Windows)
    socket_platform_cleanup();
}
// Lazily create any sub-structures that are still missing on an existing
// UASYNC instance. Safe to call repeatedly; existing structures are kept.
void uasync_init_instance(struct UASYNC* ua) {
    if (ua == NULL) {
        return;
    }
    // Socket array
    if (ua->sockets == NULL) {
        ua->sockets = socket_array_create(16);
    }
    // Timeout-node pool
    if (ua->timeout_pool == NULL) {
        ua->timeout_pool = memory_pool_init(sizeof(struct timeout_node));
    }
    // Empty immediate queue: keep head and tail consistent
    if (ua->immediate_queue_head == NULL) {
        ua->immediate_queue_head = NULL;
        ua->immediate_queue_tail = NULL;
    }
    // Timeout heap, wired to the free callback that maintains the counters
    if (ua->timeout_heap == NULL) {
        ua->timeout_heap = timeout_heap_create(16);
        if (ua->timeout_heap != NULL) {
            timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
        }
    }
}
// Debug statistics
// Copy the allocation/free counters into whichever out-parameters are
// non-NULL. A NULL ua leaves all outputs untouched.
void uasync_get_stats(struct UASYNC* ua, size_t* timer_alloc, size_t* timer_u_free, size_t* socket_alloc, size_t* socket_u_free) {
    if (ua == NULL) {
        return;
    }
    if (timer_alloc != NULL) {
        *timer_alloc = ua->timer_alloc_count;
    }
    if (timer_u_free != NULL) {
        *timer_u_free = ua->timer_free_count;
    }
    if (socket_alloc != NULL) {
        *socket_alloc = ua->socket_alloc_count;
    }
    if (socket_u_free != NULL) {
        *socket_u_free = ua->socket_free_count;
    }
}
// Full memory barrier relative to posting threads: acquiring and releasing
// the posted-task lock synchronizes this thread's view of memory with any
// thread that holds the same lock in uasync_post().
// Fix: added the NULL guard every other public entry point already has;
// the old version dereferenced ua unconditionally.
void uasync_memsync(struct UASYNC* ua) {
    if (!ua) return;
#ifdef _WIN32
    EnterCriticalSection(&ua->posted_lock);
    LeaveCriticalSection(&ua->posted_lock);
#else
    pthread_mutex_lock(&ua->posted_lock);
    pthread_mutex_unlock(&ua->posted_lock);
#endif
}
// Queue a callback for execution on the event-loop thread and wake the
// loop. Thread-safe (FIFO append under posted_lock). Best-effort: a failed
// allocation silently drops the request.
void uasync_post(struct UASYNC* ua, uasync_post_callback_t callback, void* arg) {
    if (ua == NULL || callback == NULL) {
        return;
    }
    struct posted_task* node = u_malloc(sizeof(struct posted_task));
    if (node == NULL) {
        return;
    }
    node->callback = callback;
    node->arg = arg;
    node->next = NULL;
#ifdef _WIN32
    EnterCriticalSection(&ua->posted_lock);
#else
    pthread_mutex_lock(&ua->posted_lock);
#endif
    // Append at the tail to preserve FIFO order
    if (ua->posted_tasks_tail == NULL) {
        // Empty list: the node becomes both head and tail
        ua->posted_tasks_head = node;
        ua->posted_tasks_tail = node;
    } else {
        ua->posted_tasks_tail->next = node;
        ua->posted_tasks_tail = node;
    }
#ifdef _WIN32
    LeaveCriticalSection(&ua->posted_lock);
#else
    pthread_mutex_unlock(&ua->posted_lock);
#endif
    uasync_wakeup(ua); // interrupt the mainloop so it picks up the task
}
// Wakeup mechanism: write one byte into the wakeup channel so a blocked
// poll/epoll wait returns. Async-signal-conscious: no logging on failure.
// Returns 0 on success, -1 on error.
// Fix: a full non-blocking pipe/socket (EAGAIN / EWOULDBLOCK /
// WSAEWOULDBLOCK) now counts as success - it means a wakeup byte is
// already pending, so the loop will wake anyway.
int uasync_wakeup(struct UASYNC* ua) {
    if (!ua || !ua->wakeup_initialized) return -1;
    char byte = 0;
#ifdef _WIN32
    int ret = send((SOCKET)(intptr_t)ua->wakeup_pipe[1], &byte, 1, 0);
    if (ret != 1) {
        if (WSAGetLastError() == WSAEWOULDBLOCK) return 0; // wakeup already pending
        return -1;
    }
#else
    ssize_t ret = write(ua->wakeup_pipe[1], &byte, 1);
    if (ret != 1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) return 0; // pipe full => already pending
        // Don't print error from signal handler
        return -1;
    }
#endif
    return 0;
}
int uasync_get_wakeup_fd(struct UASYNC* ua) {
if (!ua || !ua->wakeup_initialized) return -1;
return ua->wakeup_pipe[1];
}
/* Look up the socket node registered for fd.
 * On success stores the node pointer in *socket_id and returns 0; returns
 * -1 on bad arguments or when fd has no registered node. The returned
 * pointer remains the current one even after internal u_realloc. */
int uasync_lookup_socket(struct UASYNC* ua, int fd, void** socket_id) {
    int bad_args = (ua == NULL) || (ua->sockets == NULL) || (socket_id == NULL)
                   || (fd < 0) || (fd >= FD_SETSIZE);
    if (bad_args) {
        return -1;
    }
    void* node = socket_array_get(ua->sockets, fd);
    *socket_id = node;
    return (node == NULL) ? -1 : 0;
}
// Run the event loop until the process exits: repeatedly blocks in
// uasync_poll() with an infinite timeout. This function never returns.
void uasync_mainloop(struct UASYNC* ua) {
    for (;;) {
        uasync_poll(ua, -1); /* block until timer or socket activity */
    }
}