// uasync.c #include "u_async.h" #include "debug_config.h" #include #include #include #include #include #include #ifndef _WIN32 #include #include #endif // Platform-specific includes #ifdef __linux__ #include #define HAS_EPOLL 1 #else #define HAS_EPOLL 0 #endif // Timeout node with safe cancellation struct timeout_node { void* arg; timeout_callback_t callback; uint64_t expiration_ms; // absolute expiration time in milliseconds struct UASYNC* ua; // Pointer back to uasync instance for counter updates int cancelled; // Cancellation flag }; // Socket node with array-based storage struct socket_node { int fd; // File descriptor (for pipe, file) socket_t sock; // Socket (for cross-platform sockets) int type; // SOCKET_NODE_TYPE_FD or SOCKET_NODE_TYPE_SOCK socket_callback_t read_cbk; // For FD type socket_callback_t write_cbk; // For FD type socket_t_callback_t read_cbk_sock; // For SOCK type socket_t_callback_t write_cbk_sock; // For SOCK type socket_callback_t except_cbk; void* user_data; int active; // 1 if socket is active, 0 if freed (for reuse) }; // Array-based socket management for O(1) operations struct socket_array { struct socket_node* sockets; // Dynamic array of socket nodes int* fd_to_index; // FD to array index mapping int* index_to_fd; // Array index to FD mapping int* active_indices; // Array of indices of active sockets (for O(1) traversal) int capacity; // Total allocated capacity int count; // Number of active sockets int max_fd; // Maximum FD for bounds checking }; static struct socket_array* socket_array_create(int initial_capacity); static void socket_array_destroy(struct socket_array* sa); static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data); static int socket_array_remove(struct socket_array* sa, int fd); static struct socket_node* socket_array_get(struct socket_array* sa, int fd); // No global instance - each module must use its own struct UASYNC instance // Array-based socket management implementation static struct socket_array* socket_array_create(int initial_capacity) { if (initial_capacity < 4) initial_capacity = 4; // Minimum capacity struct socket_array* sa = malloc(sizeof(struct socket_array)); if (!sa) return NULL; sa->sockets = calloc(initial_capacity, sizeof(struct socket_node)); sa->fd_to_index = calloc(initial_capacity, sizeof(int)); sa->index_to_fd = calloc(initial_capacity, sizeof(int)); sa->active_indices = calloc(initial_capacity, sizeof(int)); if (!sa->sockets || !sa->fd_to_index || !sa->index_to_fd || !sa->active_indices) { free(sa->sockets); free(sa->fd_to_index); free(sa->index_to_fd); free(sa->active_indices); free(sa); return NULL; } // Initialize mapping arrays to -1 (invalid) for (int i = 0; i < initial_capacity; i++) { sa->fd_to_index[i] = -1; sa->index_to_fd[i] = -1; sa->active_indices[i] = -1; sa->sockets[i].fd = -1; sa->sockets[i].active = 0; } sa->capacity = initial_capacity; sa->count = 0; sa->max_fd = -1; return sa; } static void socket_array_destroy(struct socket_array* sa) { if (!sa) return; free(sa->sockets); free(sa->fd_to_index); free(sa->index_to_fd); free(sa->active_indices); free(sa); } static int socket_array_add_internal(struct socket_array* sa, int fd, socket_t sock, int type, socket_callback_t read_cbk_fd, socket_callback_t write_cbk_fd, socket_t_callback_t read_cbk_sock, socket_t_callback_t write_cbk_sock, socket_callback_t except_cbk, void* user_data) { if (!sa || fd < 0 || fd >= FD_SETSIZE) return -1; if (fd >= sa->capacity) { // Need to resize - double the capacity int new_capacity = sa->capacity * 2; if (fd >= new_capacity) new_capacity = fd + 16; // Ensure enough space struct socket_node* new_sockets = realloc(sa->sockets, new_capacity * sizeof(struct socket_node)); int* new_fd_to_index = realloc(sa->fd_to_index, new_capacity * sizeof(int)); int* new_index_to_fd = realloc(sa->index_to_fd, new_capacity * sizeof(int)); int* new_active_indices = realloc(sa->active_indices, new_capacity * sizeof(int)); if (!new_sockets || !new_fd_to_index || !new_index_to_fd || !new_active_indices) { // Allocation failed free(new_sockets); free(new_fd_to_index); free(new_index_to_fd); free(new_active_indices); return -1; } // Initialize new elements for (int i = sa->capacity; i < new_capacity; i++) { new_fd_to_index[i] = -1; new_index_to_fd[i] = -1; new_active_indices[i] = -1; new_sockets[i].fd = -1; new_sockets[i].active = 0; } sa->sockets = new_sockets; sa->fd_to_index = new_fd_to_index; sa->index_to_fd = new_index_to_fd; sa->active_indices = new_active_indices; sa->capacity = new_capacity; } // Check if FD already exists if (sa->fd_to_index[fd] != -1) return -1; // FD already exists // Find first free slot int index = -1; for (int i = 0; i < sa->capacity; i++) { if (!sa->sockets[i].active) { index = i; break; } } if (index == -1) return -1; // No free slots (shouldn't happen) // Add the socket sa->sockets[index].fd = fd; sa->sockets[index].sock = sock; sa->sockets[index].type = type; sa->sockets[index].read_cbk = read_cbk_fd; sa->sockets[index].write_cbk = write_cbk_fd; sa->sockets[index].read_cbk_sock = read_cbk_sock; sa->sockets[index].write_cbk_sock = write_cbk_sock; sa->sockets[index].except_cbk = except_cbk; sa->sockets[index].user_data = user_data; sa->sockets[index].active = 1; sa->fd_to_index[fd] = index; sa->index_to_fd[index] = fd; sa->active_indices[sa->count] = index; // Add to active list sa->count++; if (fd > sa->max_fd) sa->max_fd = fd; return index; } // Wrapper for adding regular file descriptors (pipe, file) static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { return socket_array_add_internal(sa, fd, SOCKET_INVALID, SOCKET_NODE_TYPE_FD, read_cbk, write_cbk, NULL, NULL, except_cbk, user_data); } // Wrapper for adding socket_t (cross-platform sockets) static int socket_array_add_socket_t(struct socket_array* sa, socket_t sock, socket_t_callback_t read_cbk, socket_t_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { // On Windows, SOCKET is UINT_PTR, so we need to handle indexing differently #ifdef _WIN32 int fd = (int)(intptr_t)sock; // Use socket value as index on Windows (simplified) #else int fd = sock; // On POSIX, socket_t is int #endif if (fd < 0 || fd >= FD_SETSIZE) return -1; return socket_array_add_internal(sa, fd, sock, SOCKET_NODE_TYPE_SOCK, NULL, NULL, read_cbk, write_cbk, except_cbk, user_data); } static int socket_array_remove(struct socket_array* sa, int fd) { if (!sa || fd < 0 || fd >= sa->capacity) return -1; int index = sa->fd_to_index[fd]; if (index == -1 || !sa->sockets[index].active) return -1; // FD not found // Mark as inactive sa->sockets[index].active = 0; sa->sockets[index].fd = -1; sa->sockets[index].sock = SOCKET_INVALID; sa->sockets[index].type = SOCKET_NODE_TYPE_FD; sa->fd_to_index[fd] = -1; sa->index_to_fd[index] = -1; // Remove from active_indices by swapping with last element // Find position in active_indices for (int i = 0; i < sa->count; i++) { if (sa->active_indices[i] == index) { // Swap with last element sa->active_indices[i] = sa->active_indices[sa->count - 1]; sa->active_indices[sa->count - 1] = -1; break; } } sa->count--; return 0; } static struct socket_node* socket_array_get(struct socket_array* sa, int fd) { if (!sa || fd < 0 || fd >= sa->capacity) return NULL; int index = sa->fd_to_index[fd]; if (index == -1 || !sa->sockets[index].active) return NULL; return &sa->sockets[index]; } // Get socket_node by socket_t static struct socket_node* socket_array_get_by_sock(struct socket_array* sa, socket_t sock) { if (!sa) return NULL; #ifdef _WIN32 int fd = (int)(intptr_t)sock; #else int fd = sock; #endif if (fd < 0 || fd >= sa->capacity) return NULL; int index = sa->fd_to_index[fd]; if (index == -1 || !sa->sockets[index].active) return NULL; if (sa->sockets[index].type != SOCKET_NODE_TYPE_SOCK) return NULL; return &sa->sockets[index]; } // Callback to free timeout node and update counters static void timeout_node_free_callback(void* user_data, void* data) { struct UASYNC* ua = (struct UASYNC*)user_data; struct timeout_node* node = (struct timeout_node*)data; (void)node; // Not used directly, but keep for consistency ua->timer_free_count++; free(data); } // Helper to get current time static void get_current_time(struct timeval* tv) { gettimeofday(tv, NULL); } // Drain wakeup pipe - read all available bytes static void drain_wakeup_pipe(struct UASYNC* ua) { if (!ua || !ua->wakeup_initialized) return; char buf[64]; while (1) { ssize_t n = read(ua->wakeup_pipe[0], buf, sizeof(buf)); if (n <= 0) break; } } // Helper to add timeval: tv += dt (timebase units) static void timeval_add_tb(struct timeval* tv, int dt) { tv->tv_usec += (dt % 10000) * 100; tv->tv_sec += dt / 10000 + tv->tv_usec / 1000000; tv->tv_usec %= 1000000; } // Convert timeval to milliseconds (uint64_t) static uint64_t timeval_to_ms(const struct timeval* tv) { return (uint64_t)tv->tv_sec * 1000ULL + (uint64_t)tv->tv_usec / 1000ULL; } // Simplified timeout handling without reference counting // Process expired timeouts with safe cancellation static void process_timeouts(struct UASYNC* ua) { if (!ua || !ua->timeout_heap) return; struct timeval now_tv; get_current_time(&now_tv); uint64_t now_ms = timeval_to_ms(&now_tv); while (1) { TimeoutEntry entry; if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) break; if (entry.expiration > now_ms) break; // Pop the expired timeout timeout_heap_pop(ua->timeout_heap, &entry); struct timeout_node* node = (struct timeout_node*)entry.data; if (node && node->callback && !node->cancelled) { // Execute callback only if not cancelled node->callback(node->arg); } // Always free the node after processing if (node && node->ua) { node->ua->timer_free_count++; } free(node); break; } } // Compute time to next timeout static void get_next_timeout(struct UASYNC* ua, struct timeval* tv) { if (!ua || !ua->timeout_heap) { tv->tv_sec = 0; tv->tv_usec = 0; return; } TimeoutEntry entry; if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) { tv->tv_sec = 0; tv->tv_usec = 0; return; } struct timeval now_tv; get_current_time(&now_tv); uint64_t now_ms = timeval_to_ms(&now_tv); if (entry.expiration <= now_ms) { tv->tv_sec = 0; tv->tv_usec = 0; return; } uint64_t delta_ms = entry.expiration - now_ms; tv->tv_sec = delta_ms / 1000; tv->tv_usec = (delta_ms % 1000) * 1000; } // Instance version void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* arg, timeout_callback_t callback) { if (!ua || timeout_tb < 0 || !callback) return NULL; if (!ua->timeout_heap) return NULL; // DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: timeout=%d.%d ms, arg=%p, callback=%p", timeout_tb/10, timeout_tb%10, arg, callback); struct timeout_node* node = malloc(sizeof(struct timeout_node)); if (!node) { DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to allocate node"); return NULL; } ua->timer_alloc_count++; node->arg = arg; node->callback = callback; node->ua = ua; node->cancelled = 0; // Calculate expiration time in milliseconds struct timeval now; get_current_time(&now); timeval_add_tb(&now, timeout_tb); node->expiration_ms = timeval_to_ms(&now); // Add to heap if (timeout_heap_push(ua->timeout_heap, node->expiration_ms, node) != 0) { DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to push to heap"); free(node); ua->timer_free_count++; // Balance the alloc counter return NULL; } return node; } // Instance version err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id) { if (!ua || !t_id || !ua->timeout_heap) { DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: invalid parameters ua=%p, t_id=%p, heap=%p", ua, t_id, ua ? ua->timeout_heap : NULL); return ERR_FAIL; } struct timeout_node* node = (struct timeout_node*)t_id; // Try to cancel from heap first if (timeout_heap_cancel(ua->timeout_heap, node->expiration_ms, node) == 0) { // Successfully marked as deleted - free will happen lazily in heap node->cancelled = 1; node->callback = NULL; // DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: successfully cancelled timer %p from heap", node); return ERR_OK; } DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: not found in heap: ua=%p, t_id=%p, node=%p, expires=%llu ms", ua, t_id, node, (unsigned long long)node->expiration_ms); // If not found in heap, it may have already expired or been invalid return ERR_FAIL; } // Instance version void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { if (!ua || fd < 0 || fd >= FD_SETSIZE) return NULL; // Bounds check int index = socket_array_add(ua->sockets, fd, read_cbk, write_cbk, except_cbk, user_data); if (index < 0) return NULL; ua->socket_alloc_count++; ua->poll_fds_dirty = 1; // Mark poll_fds as needing rebuild #if HAS_EPOLL // Add to epoll if using epoll if (ua->use_epoll && ua->epoll_fd >= 0) { struct epoll_event ev; ev.events = 0; if (read_cbk) ev.events |= EPOLLIN; if (write_cbk) ev.events |= EPOLLOUT; // Use level-triggered mode (default) for compatibility with UDP sockets ev.data.ptr = &ua->sockets->sockets[index]; if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { // Failed to add to epoll - remove from socket array and return error socket_array_remove(ua->sockets, fd); ua->socket_alloc_count--; return NULL; } } #endif // Return pointer to the socket node as ID return &ua->sockets->sockets[index]; } err_t uasync_remove_socket(struct UASYNC* ua, void* s_id) { if (!ua || !s_id) return ERR_FAIL; struct socket_node* node = (struct socket_node*)s_id; if (!node->active || node->fd < 0) return ERR_FAIL; int fd = node->fd; #if HAS_EPOLL // Remove from epoll if using epoll if (ua->use_epoll && ua->epoll_fd >= 0) { epoll_ctl(ua->epoll_fd, EPOLL_CTL_DEL, fd, NULL); } #endif int ret = socket_array_remove(ua->sockets, fd); if (ret == 0) { ua->socket_free_count++; ua->poll_fds_dirty = 1; // Mark poll_fds as needing rebuild return ERR_OK; } return ERR_FAIL; } // Add socket_t (cross-platform socket) void* uasync_add_socket_t(struct UASYNC* ua, socket_t sock, socket_t_callback_t read_cbk, socket_t_callback_t write_cbk, socket_t_callback_t except_cbk, void* user_data) { if (!ua || sock == SOCKET_INVALID) return NULL; int index = socket_array_add_socket_t(ua->sockets, sock, read_cbk, write_cbk, (socket_callback_t)except_cbk, user_data); if (index < 0) return NULL; ua->socket_alloc_count++; ua->poll_fds_dirty = 1; #if HAS_EPOLL if (ua->use_epoll && ua->epoll_fd >= 0) { struct epoll_event ev; ev.events = 0; if (read_cbk) ev.events |= EPOLLIN; if (write_cbk) ev.events |= EPOLLOUT; ev.data.ptr = &ua->sockets->sockets[index]; // On Windows, need to cast socket_t to int for epoll_ctl #ifdef _WIN32 int fd = (int)(intptr_t)sock; #else int fd = sock; #endif if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { socket_array_remove(ua->sockets, fd); ua->socket_alloc_count--; return NULL; } } #endif return &ua->sockets->sockets[index]; } // Remove socket by socket_t err_t uasync_remove_socket_t(struct UASYNC* ua, socket_t sock) { if (!ua || sock == SOCKET_INVALID) return ERR_FAIL; struct socket_node* node = socket_array_get_by_sock(ua->sockets, sock); if (!node || !node->active) return ERR_FAIL; #if HAS_EPOLL if (ua->use_epoll && ua->epoll_fd >= 0) { #ifdef _WIN32 int fd = (int)(intptr_t)sock; #else int fd = sock; #endif epoll_ctl(ua->epoll_fd, EPOLL_CTL_DEL, fd, NULL); } #endif #ifdef _WIN32 int fd = (int)(intptr_t)sock; #else int fd = sock; #endif int ret = socket_array_remove(ua->sockets, fd); if (ret == 0) { ua->socket_free_count++; ua->poll_fds_dirty = 1; return ERR_OK; } return ERR_FAIL; } // Helper function to rebuild cached pollfd array static void rebuild_poll_fds(struct UASYNC* ua) { if (!ua || !ua->sockets) return; int socket_count = ua->sockets->count; int wakeup_fd_present = ua->wakeup_initialized && ua->wakeup_pipe[0] >= 0; int total_fds = socket_count + wakeup_fd_present; // Ensure poll_fds capacity is sufficient if (total_fds > ua->poll_fds_capacity) { int new_capacity = total_fds * 2; if (new_capacity < 16) new_capacity = 16; struct pollfd* new_poll_fds = realloc(ua->poll_fds, sizeof(struct pollfd) * new_capacity); if (!new_poll_fds) return; // Keep old allocation on failure ua->poll_fds = new_poll_fds; ua->poll_fds_capacity = new_capacity; } int idx = 0; // Add wakeup fd first if present if (wakeup_fd_present) { ua->poll_fds[idx].fd = ua->wakeup_pipe[0]; ua->poll_fds[idx].events = POLLIN; ua->poll_fds[idx].revents = 0; idx++; } // Add socket fds using active_indices for O(1) traversal for (int i = 0; i < socket_count; i++) { int socket_array_idx = ua->sockets->active_indices[i]; struct socket_node* cur = &ua->sockets->sockets[socket_array_idx]; // Handle socket_t vs int fd if (cur->type == SOCKET_NODE_TYPE_SOCK) { // socket_t - cast to int for pollfd #ifdef _WIN32 ua->poll_fds[idx].fd = (int)(intptr_t)cur->sock; #else ua->poll_fds[idx].fd = cur->sock; #endif } else { // Regular fd ua->poll_fds[idx].fd = cur->fd; } ua->poll_fds[idx].events = 0; ua->poll_fds[idx].revents = 0; if (cur->type == SOCKET_NODE_TYPE_SOCK) { if (cur->read_cbk_sock) ua->poll_fds[idx].events |= POLLIN; if (cur->write_cbk_sock) ua->poll_fds[idx].events |= POLLOUT; } else { if (cur->read_cbk) ua->poll_fds[idx].events |= POLLIN; if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT; } if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT; if (cur->except_cbk) ua->poll_fds[idx].events |= POLLPRI; idx++; } ua->poll_fds_count = total_fds; ua->poll_fds_dirty = 0; } // Process events from epoll (Linux only) #if HAS_EPOLL static void process_epoll_events(struct UASYNC* ua, struct epoll_event* events, int n_events) { for (int i = 0; i < n_events; i++) { // Check if this is the wakeup fd (data.ptr is NULL) if (events[i].data.ptr == NULL) { if (events[i].events & EPOLLIN) { drain_wakeup_pipe(ua); } continue; } // Socket event struct socket_node* node = (struct socket_node*)events[i].data.ptr; if (!node || !node->active) continue; /* Check for error conditions first */ if (events[i].events & (EPOLLERR | EPOLLHUP)) { if (node->except_cbk) { node->except_cbk(node->fd, node->user_data); } } /* Read readiness - use appropriate callback based on socket type */ if (events[i].events & EPOLLIN) { if (node->type == SOCKET_NODE_TYPE_SOCK) { if (node->read_cbk_sock) { node->read_cbk_sock(node->sock, node->user_data); } } else { if (node->read_cbk) { node->read_cbk(node->fd, node->user_data); } } } /* Write readiness - use appropriate callback based on socket type */ if (events[i].events & EPOLLOUT) { if (node->type == SOCKET_NODE_TYPE_SOCK) { if (node->write_cbk_sock) { node->write_cbk_sock(node->sock, node->user_data); } } else { if (node->write_cbk) { node->write_cbk(node->fd, node->user_data); } } } } } #endif // Instance version void uasync_poll(struct UASYNC* ua, int timeout_tb) { if (!ua) return; if (!ua->sockets || !ua->timeout_heap) return; DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "poll"); // Handle negative or zero timeout if (timeout_tb < 0) timeout_tb = -1; // Infinite wait else if (timeout_tb == 0) timeout_tb = 0; // No wait // Get next timeout struct timeval next_timeout; get_next_timeout(ua, &next_timeout); // Convert requested timeout to timeval struct timeval req_timeout = {0}; if (timeout_tb >= 0) { req_timeout.tv_sec = timeout_tb / 10000; req_timeout.tv_usec = (timeout_tb % 10000) * 100; } // Use minimum of requested and next timer if both finite struct timeval poll_timeout; if (timeout_tb < 0) { // Infinite requested - use next timer if any poll_timeout = next_timeout; } else { // Finite requested - min of requested and next if (next_timeout.tv_sec < req_timeout.tv_sec || (next_timeout.tv_sec == req_timeout.tv_sec && next_timeout.tv_usec < req_timeout.tv_usec)) { poll_timeout = next_timeout; } else { poll_timeout = req_timeout; } } int timeout_ms; if (timeout_tb < 0 && (next_timeout.tv_sec > 0 || next_timeout.tv_usec > 0)) { timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000); } else if (timeout_tb < 0) { timeout_ms = -1; // Infinite } else { timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000); } // Count active sockets int socket_count = ua->sockets->count; if (socket_count == 0 && timeout_ms == -1) { // No sockets and infinite wait - but we have timers? Wait for timer if (ua->timeout_heap->size > 0) { timeout_ms = (poll_timeout.tv_sec * 1000) + (poll_timeout.tv_usec / 1000); } else { // Nothing to do - return immediately return; } } #if HAS_EPOLL // Use epoll on Linux if available if (ua->use_epoll && ua->epoll_fd >= 0) { struct epoll_event events[64]; // Stack-allocated array for events int max_events = 64; int ret = epoll_wait(ua->epoll_fd, events, max_events, timeout_ms); if (ret < 0) { if (errno == EINTR) { return; } perror("epoll_wait"); return; } /* Process socket events */ if (ret > 0) { process_epoll_events(ua, events, ret); } /* Process timeouts that may have expired during poll or socket processing */ process_timeouts(ua); return; } #endif // Fallback to poll() for non-Linux or if epoll failed // Include wakeup pipe if initialized int wakeup_fd_present = ua->wakeup_initialized && ua->wakeup_pipe[0] >= 0; int total_fds = socket_count + wakeup_fd_present; // Rebuild poll_fds if dirty or not allocated if (ua->poll_fds_dirty || !ua->poll_fds) { rebuild_poll_fds(ua); } // Ensure poll_fds_count matches current state (in case sockets changed without dirty flag) if (ua->poll_fds_count != total_fds) { rebuild_poll_fds(ua); } /* Call poll with cached fds */ int ret = poll(ua->poll_fds, ua->poll_fds_count, timeout_ms); if (ret < 0) { if (errno == EINTR) { return; } perror("poll"); return; } /* Process socket events first to give sockets higher priority */ if (ret > 0) { for (int i = 0; i < ua->poll_fds_count; i++) { if (ua->poll_fds[i].revents == 0) continue; /* Handle wakeup fd separately */ if (wakeup_fd_present && i == 0) { if (ua->poll_fds[i].revents & POLLIN) { drain_wakeup_pipe(ua); } continue; } /* Socket event - lookup by fd */ struct socket_node* node = socket_array_get(ua->sockets, ua->poll_fds[i].fd); if (!node) { // Try by socket_t (in case this is a socket) socket_t lookup_sock; #ifdef _WIN32 lookup_sock = (socket_t)(intptr_t)ua->poll_fds[i].fd; #else lookup_sock = ua->poll_fds[i].fd; #endif node = socket_array_get_by_sock(ua->sockets, lookup_sock); } if (!node) continue; // Socket may have been removed /* Check for error conditions first */ if (ua->poll_fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { /* Treat as exceptional condition */ if (node->except_cbk) { node->except_cbk(node->fd, node->user_data); } } /* Exceptional data (out-of-band) */ if (ua->poll_fds[i].revents & POLLPRI) { if (node->except_cbk) { node->except_cbk(node->fd, node->user_data); } } /* Read readiness - use appropriate callback based on socket type */ if (ua->poll_fds[i].revents & POLLIN) { if (node->type == SOCKET_NODE_TYPE_SOCK) { if (node->read_cbk_sock) { node->read_cbk_sock(node->sock, node->user_data); } } else { if (node->read_cbk) { node->read_cbk(node->fd, node->user_data); } } } /* Write readiness - use appropriate callback based on socket type */ if (ua->poll_fds[i].revents & POLLOUT) { if (node->type == SOCKET_NODE_TYPE_SOCK) { if (node->write_cbk_sock) { node->write_cbk_sock(node->sock, node->user_data); } } else { if (node->write_cbk) { node->write_cbk(node->fd, node->user_data); } } } } } /* Process timeouts that may have expired during poll or socket processing */ process_timeouts(ua); } // ========== Instance management functions ========== struct UASYNC* uasync_create(void) { // Initialize socket platform first (WSAStartup on Windows) if (socket_platform_init() != 0) { DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, "Failed to initialize socket platform"); return NULL; } struct UASYNC* ua = malloc(sizeof(struct UASYNC)); if (!ua) { socket_platform_cleanup(); return NULL; } memset(ua, 0, sizeof(struct UASYNC)); ua->wakeup_pipe[0] = -1; ua->wakeup_pipe[1] = -1; ua->wakeup_initialized = 0; // Create wakeup pipe if (pipe(ua->wakeup_pipe) < 0) { DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to create wakeup pipe: %s", strerror(errno)); // Continue without wakeup mechanism ua->wakeup_pipe[0] = -1; ua->wakeup_pipe[1] = -1; } else { ua->wakeup_initialized = 1; // Set non-blocking on read end to avoid blocking if pipe is full int flags = fcntl(ua->wakeup_pipe[0], F_GETFL, 0); if (flags >= 0) { fcntl(ua->wakeup_pipe[0], F_SETFL, flags | O_NONBLOCK); } } ua->sockets = socket_array_create(16); if (!ua->sockets) { if (ua->wakeup_initialized) { close(ua->wakeup_pipe[0]); close(ua->wakeup_pipe[1]); } free(ua); return NULL; } ua->timeout_heap = timeout_heap_create(16); if (!ua->timeout_heap) { socket_array_destroy(ua->sockets); if (ua->wakeup_initialized) { close(ua->wakeup_pipe[0]); close(ua->wakeup_pipe[1]); } free(ua); return NULL; } // Set callback to free timeout nodes and update counters timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback); // Initialize epoll on Linux ua->epoll_fd = -1; ua->use_epoll = 0; #if HAS_EPOLL ua->epoll_fd = epoll_create1(EPOLL_CLOEXEC); if (ua->epoll_fd >= 0) { ua->use_epoll = 1; DEBUG_INFO(DEBUG_CATEGORY_UASYNC, "Using epoll for socket monitoring"); // Add wakeup pipe to epoll if (ua->wakeup_initialized) { struct epoll_event ev; ev.events = EPOLLIN; ev.data.ptr = NULL; // NULL ptr indicates wakeup fd if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, ua->wakeup_pipe[0], &ev) < 0) { DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to add wakeup pipe to epoll: %s", strerror(errno)); } } } else { DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to create epoll fd, falling back to poll: %s", strerror(errno)); } #endif return ua; } // Print all resources for debugging void uasync_print_resources(struct UASYNC* ua, const char* prefix) { if (!ua) { printf("%s: NULL uasync instance\n", prefix); return; } printf("\nπŸ” %s: UASYNC Resource Report for %p\n", prefix, ua); printf(" Timer Statistics: allocated=%zu, freed=%zu, active=%zd\n", ua->timer_alloc_count, ua->timer_free_count, (ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); printf(" Socket Statistics: allocated=%zu, freed=%zu, active=%zd\n", ua->socket_alloc_count, ua->socket_free_count, (ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); // ΠŸΠΎΠΊΠ°Π·Π°Ρ‚ΡŒ Π°ΠΊΡ‚ΠΈΠ²Π½Ρ‹Π΅ Ρ‚Π°ΠΉΠΌΠ΅Ρ€Ρ‹ if (ua->timeout_heap) { size_t active_timers = 0; // БСзопасноС Ρ‡Ρ‚Π΅Π½ΠΈΠ΅ Π±Π΅Π· извлСчСния - просто ΠΈΡ‚Π΅Ρ€ΠΈΡ€ΡƒΠ΅ΠΌ ΠΏΠΎ массиву for (size_t i = 0; i < ua->timeout_heap->size; i++) { if (!ua->timeout_heap->heap[i].deleted) { active_timers++; struct timeout_node* node = (struct timeout_node*)ua->timeout_heap->heap[i].data; printf(" Timer: node=%p, expires=%llu ms, cancelled=%d\n", node, (unsigned long long)ua->timeout_heap->heap[i].expiration, node->cancelled); } } printf(" Active timers in heap: %zu\n", active_timers); } // ΠŸΠΎΠΊΠ°Π·Π°Ρ‚ΡŒ Π°ΠΊΡ‚ΠΈΠ²Π½Ρ‹Π΅ сокСты if (ua->sockets) { int active_sockets = 0; printf(" Socket array capacity: %d, active: %d\n", ua->sockets->capacity, ua->sockets->count); for (int i = 0; i < ua->sockets->capacity; i++) { if (ua->sockets->sockets[i].active) { active_sockets++; printf(" Socket: fd=%d, active=%d\n", ua->sockets->sockets[i].fd, ua->sockets->sockets[i].active); } } printf(" Total active sockets: %d\n", active_sockets); } printf("πŸ”š %s: End of resource report\n\n", prefix); } void uasync_destroy(struct UASYNC* ua, int close_fds) { if (!ua) return; DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: starting cleanup for ua=%p", ua); // Диагностика рСсурсов ΠΏΠ΅Ρ€Π΅Π΄ очисткой uasync_print_resources(ua, "BEFORE_DESTROY"); // Check for potential memory leaks if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) { DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected before cleanup: timers %zu/%zu, sockets %zu/%zu", ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count); DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "Timer leak: allocated=%zu, freed=%zu, diff=%zd", ua->timer_alloc_count, ua->timer_free_count, (ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "Socket leak: allocated=%zu, freed=%zu, diff=%zd", ua->socket_alloc_count, ua->socket_free_count, (ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); // Continue cleanup, will abort after if leaks remain } // Free all remaining timeouts if (ua->timeout_heap) { size_t freed_count = 0; while (1) { TimeoutEntry entry; if (timeout_heap_pop(ua->timeout_heap, &entry) != 0) break; struct timeout_node* node = (struct timeout_node*)entry.data; // Free all timer nodes (avoid double-free bug) if (node) { ua->timer_free_count++; free(node); } } timeout_heap_destroy(ua->timeout_heap); } // Free all socket nodes using array approach if (ua->sockets) { // Count and free all active sockets int freed_count = 0; for (int i = 0; i < ua->sockets->capacity; i++) { if (ua->sockets->sockets[i].active) { if (close_fds && ua->sockets->sockets[i].fd >= 0) { close(ua->sockets->sockets[i].fd); } ua->socket_free_count++; freed_count++; } } DEBUG_DEBUG(DEBUG_CATEGORY_MEMORY, "Freed %d socket nodes in destroy", freed_count); socket_array_destroy(ua->sockets); } // Close wakeup pipe if (ua->wakeup_initialized) { close(ua->wakeup_pipe[0]); close(ua->wakeup_pipe[1]); } // Free cached poll_fds free(ua->poll_fds); // Close epoll fd on Linux #if HAS_EPOLL if (ua->epoll_fd >= 0) { close(ua->epoll_fd); } #endif // Final leak check if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) { DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected after cleanup: timers %zu/%zu, sockets %zu/%zu", ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count); DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Timer leak: allocated=%zu, freed=%zu, diff=%zd", ua->timer_alloc_count, ua->timer_free_count, (ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Socket leak: allocated=%zu, freed=%zu, diff=%zd", ua->socket_alloc_count, ua->socket_free_count, (ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); abort(); } DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: completed successfully for ua=%p", ua); free(ua); // Cleanup socket platform (WSACleanup on Windows) socket_platform_cleanup(); } void uasync_init_instance(struct UASYNC* ua) { if (!ua) return; // Initialize socket array if not present if (!ua->sockets) { ua->sockets = socket_array_create(16); } if (!ua->timeout_heap) { ua->timeout_heap = timeout_heap_create(16); if (ua->timeout_heap) { timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback); } } } // Debug statistics void uasync_get_stats(struct UASYNC* ua, size_t* timer_alloc, size_t* timer_free, size_t* socket_alloc, size_t* socket_free) { if (!ua) return; if (timer_alloc) *timer_alloc = ua->timer_alloc_count; if (timer_free) *timer_free = ua->timer_free_count; if (socket_alloc) *socket_alloc = ua->socket_alloc_count; if (socket_free) *socket_free = ua->socket_free_count; } // Get global instance for backward compatibility // Wakeup mechanism int uasync_wakeup(struct UASYNC* ua) { if (!ua || !ua->wakeup_initialized) return -1; char byte = 0; ssize_t ret = write(ua->wakeup_pipe[1], &byte, 1); if (ret != 1) { // Don't print error from signal handler return -1; } return 0; } int uasync_get_wakeup_fd(struct UASYNC* ua) { if (!ua || !ua->wakeup_initialized) return -1; return ua->wakeup_pipe[1]; } /* Lookup socket by file descriptor - returns current pointer even after realloc */ int uasync_lookup_socket(struct UASYNC* ua, int fd, void** socket_id) { if (!ua || !ua->sockets || !socket_id || fd < 0 || fd >= FD_SETSIZE) { return -1; } *socket_id = socket_array_get(ua->sockets, fd); return (*socket_id != NULL) ? 0 : -1; } void uasync_mainloop(struct UASYNC* ua) { while (1) { uasync_poll(ua, -1); // Infinite wait } }