You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

746 lines
23 KiB

// uasync.c
#include "u_async.h"
#include "timeout_heap.h"
#include "debug_config.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <poll.h>
#include <limits.h>
#include <fcntl.h>
// One pending timer. uasync_set_timeout() allocates it and returns its
// address as the opaque timer id.
struct timeout_node {
    // Opaque value handed to the callback when the timer fires.
    void* arg;
    // Function to run on expiry; cleared to NULL when the timer is cancelled.
    timeout_callback_t callback;
    // Absolute expiration time, in milliseconds.
    uint64_t expiration_ms;
    // Owning uasync instance, used to update its timer counters.
    uasync_t* ua;
    // Non-zero once the timer has been cancelled; the callback is then skipped.
    int cancelled;
};
// One registered descriptor together with its event callbacks.
// Nodes live inside the socket_array's `sockets` storage.
struct socket_node {
    // Registered file descriptor, or -1 while the slot is unused.
    int fd;
    // Invoked when the descriptor is readable (may be NULL).
    socket_callback_t read_cbk;
    // Invoked when the descriptor is writable (may be NULL).
    socket_callback_t write_cbk;
    // Invoked on error / hang-up / priority data (may be NULL).
    socket_callback_t except_cbk;
    // Opaque pointer passed back to every callback.
    void* user_data;
    // 1 while the slot holds a live registration, 0 when it is free for reuse.
    int active;
};
// Table of registered sockets with fd <-> slot lookup arrays.
// All three arrays are allocated with `capacity` elements.
struct socket_array {
    // Slot storage (dynamic array of socket nodes).
    struct socket_node* sockets;
    // Indexed by fd: slot number holding that fd, or -1.
    int* fd_to_index;
    // Indexed by slot: fd stored in that slot, or -1.
    int* index_to_fd;
    // Number of elements allocated in each array.
    int capacity;
    // Number of currently active sockets.
    int count;
    // Largest fd ever added (-1 when none has been added yet).
    int max_fd;
};
// Forward declarations of the file-local socket table helpers, so they can be
// used before their definitions further down in this file.
// create/destroy manage the table itself; add/remove/get operate on a single
// fd; build_pollfd fills a caller-supplied pollfd array from the active slots.
static struct socket_array* socket_array_create(int initial_capacity);
static void socket_array_destroy(struct socket_array* sa);
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data);
static int socket_array_remove(struct socket_array* sa, int fd);
static struct socket_node* socket_array_get(struct socket_array* sa, int fd);
static int socket_array_build_pollfd(struct socket_array* sa, struct pollfd* fds, int max_fds);
// State of one uasync event-loop instance.
struct uasync_s {
    // Heap of pending timers.
    TimeoutHeap* timeout_heap;
    // Table of registered sockets.
    struct socket_array* sockets;

    // Debug counters; uasync_destroy() compares alloc against free counts.
    size_t timer_alloc_count;
    size_t timer_free_count;
    size_t socket_alloc_count;
    size_t socket_free_count;

    // Pipe used to interrupt a blocking poll: [0] is the read end, [1] the write end.
    int wakeup_pipe[2];
    // Non-zero when wakeup_pipe holds valid descriptors.
    int wakeup_initialized;
};
// No global instance - each module must use its own uasync_t instance
// Array-based socket management implementation
// Allocate an empty socket table.
//
// initial_capacity: requested number of slots; values below 4 are raised to 4.
// Returns the new table, or NULL if any allocation fails (nothing is leaked).
static struct socket_array* socket_array_create(int initial_capacity) {
    const int slots = (initial_capacity < 4) ? 4 : initial_capacity;

    struct socket_array* table = malloc(sizeof(struct socket_array));
    if (table == NULL) {
        return NULL;
    }

    table->sockets = calloc(slots, sizeof(struct socket_node));
    table->fd_to_index = calloc(slots, sizeof(int));
    table->index_to_fd = calloc(slots, sizeof(int));

    const int all_allocated = table->sockets != NULL &&
                              table->fd_to_index != NULL &&
                              table->index_to_fd != NULL;
    if (!all_allocated) {
        // free(NULL) is a no-op, so no need to check which one failed.
        free(table->sockets);
        free(table->fd_to_index);
        free(table->index_to_fd);
        free(table);
        return NULL;
    }

    // Every slot starts out free and every mapping starts out invalid (-1).
    for (int slot = 0; slot < slots; ++slot) {
        table->sockets[slot].fd = -1;
        table->sockets[slot].active = 0;
        table->fd_to_index[slot] = -1;
        table->index_to_fd[slot] = -1;
    }

    table->capacity = slots;
    table->count = 0;
    table->max_fd = -1;
    return table;
}
// Release a socket table and its three backing arrays.
// Passing NULL is allowed and does nothing.
static void socket_array_destroy(struct socket_array* sa) {
    if (sa != NULL) {
        free(sa->index_to_fd);
        free(sa->fd_to_index);
        free(sa->sockets);
        free(sa);
    }
}
// Register `fd` in the table, growing the arrays when fd >= capacity.
//
// sa:         table to modify.
// fd:         descriptor to register; must satisfy 0 <= fd < FD_SETSIZE.
// read_cbk / write_cbk / except_cbk: event callbacks, any of which may be NULL.
// user_data:  opaque pointer stored with the registration.
//
// Returns the slot index on success, or -1 when the arguments are invalid,
// the fd is already registered, no free slot exists, or memory runs out.
//
// NOTE: growing the table goes through realloc(), which may move the node
// storage; addresses of nodes obtained before a growing call must not be
// relied upon afterwards.
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
    if (!sa || fd < 0 || fd >= FD_SETSIZE) return -1;

    if (fd >= sa->capacity) {
        // Double the capacity, or jump straight past fd if doubling is not enough.
        int new_capacity = sa->capacity * 2;
        if (fd >= new_capacity) new_capacity = fd + 16;

        // Grow the arrays one at a time and store each result immediately.
        // After a successful realloc the old pointer is invalid, so it must
        // never be left in `sa`. If a later step fails, the arrays that were
        // already grown are merely larger than `capacity`, which is harmless,
        // and the table stays fully usable at its old capacity.
        struct socket_node* new_sockets = realloc(sa->sockets, (size_t)new_capacity * sizeof(struct socket_node));
        if (!new_sockets) return -1;
        sa->sockets = new_sockets;

        int* new_fd_to_index = realloc(sa->fd_to_index, (size_t)new_capacity * sizeof(int));
        if (!new_fd_to_index) return -1;
        sa->fd_to_index = new_fd_to_index;

        int* new_index_to_fd = realloc(sa->index_to_fd, (size_t)new_capacity * sizeof(int));
        if (!new_index_to_fd) return -1;
        sa->index_to_fd = new_index_to_fd;

        // Initialize the newly added tail: free slots, invalid mappings.
        for (int i = sa->capacity; i < new_capacity; i++) {
            sa->fd_to_index[i] = -1;
            sa->index_to_fd[i] = -1;
            sa->sockets[i].fd = -1;
            sa->sockets[i].read_cbk = NULL;
            sa->sockets[i].write_cbk = NULL;
            sa->sockets[i].except_cbk = NULL;
            sa->sockets[i].user_data = NULL;
            sa->sockets[i].active = 0;
        }
        sa->capacity = new_capacity;
    }

    // Refuse duplicate registrations of the same fd.
    if (sa->fd_to_index[fd] != -1) return -1;

    // Find the first free slot (linear scan over the slot array).
    int index = -1;
    for (int i = 0; i < sa->capacity; i++) {
        if (!sa->sockets[i].active) {
            index = i;
            break;
        }
    }
    if (index == -1) return -1; // No free slots (shouldn't happen)

    // Fill in the slot and both lookup directions.
    sa->sockets[index].fd = fd;
    sa->sockets[index].read_cbk = read_cbk;
    sa->sockets[index].write_cbk = write_cbk;
    sa->sockets[index].except_cbk = except_cbk;
    sa->sockets[index].user_data = user_data;
    sa->sockets[index].active = 1;
    sa->fd_to_index[fd] = index;
    sa->index_to_fd[index] = fd;
    sa->count++;
    if (fd > sa->max_fd) sa->max_fd = fd;
    return index;
}
static int socket_array_remove(struct socket_array* sa, int fd) {
if (!sa || fd < 0 || fd >= sa->capacity) return -1;
int index = sa->fd_to_index[fd];
if (index == -1 || !sa->sockets[index].active) return -1; // FD not found
// Mark as inactive
sa->sockets[index].active = 0;
sa->sockets[index].fd = -1;
sa->fd_to_index[fd] = -1;
sa->index_to_fd[index] = -1;
sa->count--;
return 0;
}
// Look up the active node registered for `fd`.
// Returns NULL when the table is NULL, the fd is out of range, or no active
// registration exists for it.
static struct socket_node* socket_array_get(struct socket_array* sa, int fd) {
    if (sa == NULL || fd < 0 || fd >= sa->capacity) {
        return NULL;
    }

    const int slot = sa->fd_to_index[fd];
    if (slot != -1 && sa->sockets[slot].active) {
        return &sa->sockets[slot];
    }
    return NULL;
}
static int socket_array_build_pollfd(struct socket_array* sa, struct pollfd* fds, int max_fds) {
if (!sa || !fds || max_fds <= 0) return 0;
int count = 0;
for (int i = 0; i < sa->capacity && count < max_fds; i++) {
if (sa->sockets[i].active) {
fds[count].fd = sa->sockets[i].fd;
fds[count].events = 0;
if (sa->sockets[i].read_cbk) fds[count].events |= POLLIN;
if (sa->sockets[i].write_cbk) fds[count].events |= POLLOUT;
if (sa->sockets[i].except_cbk) fds[count].events |= POLLERR;
fds[count].revents = 0;
count++;
}
}
return count;
}
// Free-callback installed on the timeout heap.
// user_data is the owning uasync instance, data is the timer node to release.
// Bumps the instance's timer_free_count and frees the node.
static void timeout_node_free_callback(void* user_data, void* data) {
    uasync_t* owner = (uasync_t*)user_data;
    owner->timer_free_count += 1;
    free(data);
}
// Store the current time, as reported by gettimeofday(), in *tv.
static void get_current_time(struct timeval* tv) {
    (void)gettimeofday(tv, NULL);
}
// Discard everything currently queued on the read end of the wakeup pipe.
// Does nothing when ua is NULL or the pipe was never set up. Reading stops
// at the first read() that returns 0 or a negative value.
static void drain_wakeup_pipe(uasync_t* ua) {
    if (ua == NULL || !ua->wakeup_initialized) {
        return;
    }

    char scratch[64];
    ssize_t got;
    do {
        got = read(ua->wakeup_pipe[0], scratch, sizeof(scratch));
    } while (got > 0);
}
// Advance *tv by dt timebase units. One unit is 100 microseconds, so 10000
// units make one second. The microsecond field is normalized afterwards.
static void timeval_add_tb(struct timeval* tv, int dt) {
    const int whole_seconds = dt / 10000;
    const int extra_usec = (dt % 10000) * 100;

    tv->tv_usec += extra_usec;
    // Carry any microsecond overflow into the seconds field.
    tv->tv_sec += whole_seconds + tv->tv_usec / 1000000;
    tv->tv_usec %= 1000000;
}
// Convert a timeval to whole milliseconds; sub-millisecond microseconds are
// truncated.
static uint64_t timeval_to_ms(const struct timeval* tv) {
    const uint64_t from_seconds = (uint64_t)tv->tv_sec * 1000ULL;
    const uint64_t from_usec = (uint64_t)tv->tv_usec / 1000ULL;
    return from_seconds + from_usec;
}
// Fire and release every timer whose expiration is not later than "now".
// The current time is sampled once, before the loop. A timer's callback is
// run only when it is still set and the timer has not been cancelled; the
// node is freed either way and the owner's timer_free_count is incremented.
static void process_timeouts(struct uasync_s* ua) {
    if (ua == NULL || ua->timeout_heap == NULL) {
        return;
    }

    struct timeval tv_now;
    get_current_time(&tv_now);
    const uint64_t cutoff_ms = timeval_to_ms(&tv_now);

    for (;;) {
        TimeoutEntry head;
        if (timeout_heap_peek(ua->timeout_heap, &head) != 0) {
            return; // heap is empty
        }
        if (head.expiration > cutoff_ms) {
            return; // earliest timer is still in the future
        }

        // Take the expired entry off the heap before running it.
        timeout_heap_pop(ua->timeout_heap, &head);
        struct timeout_node* timer = (struct timeout_node*)head.data;

        if (timer != NULL) {
            if (timer->callback != NULL && !timer->cancelled) {
                timer->callback(timer->arg);
            }
            if (timer->ua != NULL) {
                timer->ua->timer_free_count++;
            }
        }
        free(timer);
    }
}
// Write into *tv the time remaining until the earliest pending timer.
// *tv is set to zero when there is no instance, no heap, no pending timer,
// or the earliest timer has already expired. The result is capped at one day.
static void get_next_timeout(struct uasync_s* ua, struct timeval* tv) {
    // Start from "nothing to wait for"; overwritten below if a timer is pending.
    tv->tv_sec = 0;
    tv->tv_usec = 0;

    if (ua == NULL || ua->timeout_heap == NULL) {
        return;
    }

    TimeoutEntry head;
    if (timeout_heap_peek(ua->timeout_heap, &head) != 0) {
        return;
    }

    struct timeval tv_now;
    get_current_time(&tv_now);
    const uint64_t now_ms = timeval_to_ms(&tv_now);
    if (head.expiration <= now_ms) {
        return;
    }

    uint64_t remaining_ms = head.expiration - now_ms;
    if (remaining_ms > 86400000) {
        remaining_ms = 86400000; // cap at 1 day to avoid overflow
    }

    tv->tv_sec = remaining_ms / 1000;
    tv->tv_usec = (remaining_ms % 1000) * 1000;
}
// Schedule `callback(arg)` to run once, timeout_tb timebase units from now.
//
// ua:         event-loop instance that owns the timer.
// timeout_tb: delay in timebase units; must not be negative.
// arg:        opaque value passed to the callback.
// callback:   function to call on expiry; must not be NULL.
//
// Returns an opaque timer id (usable with uasync_cancel_timeout), or NULL on
// invalid arguments, a missing heap, or allocation / heap-insert failure.
void* uasync_set_timeout(uasync_t* ua, int timeout_tb, void* arg, timeout_callback_t callback) {
    if (ua == NULL || callback == NULL || timeout_tb < 0) {
        return NULL;
    }
    if (ua->timeout_heap == NULL) {
        return NULL;
    }

    struct timeout_node* timer = malloc(sizeof(struct timeout_node));
    if (timer == NULL) {
        return NULL;
    }
    ua->timer_alloc_count++;

    // Absolute expiration = now + requested delay, expressed in milliseconds.
    struct timeval when;
    get_current_time(&when);
    timeval_add_tb(&when, timeout_tb);

    timer->arg = arg;
    timer->callback = callback;
    timer->ua = ua;
    timer->cancelled = 0;
    timer->expiration_ms = timeval_to_ms(&when);

    if (timeout_heap_push(ua->timeout_heap, timer->expiration_ms, timer) == 0) {
        return timer;
    }

    // Insert failed: release the node and keep the counters balanced.
    free(timer);
    ua->timer_free_count++;
    return NULL;
}
// Cancel a timer previously returned by uasync_set_timeout().
//
// Returns ERR_FAIL when ua, t_id or the instance's heap is NULL; otherwise
// ERR_OK. The heap is asked to cancel the entry, and regardless of whether it
// found it, the node is flagged as cancelled and its callback cleared so that
// it will not be executed.
err_t uasync_cancel_timeout(uasync_t* ua, void* t_id) {
    if (ua == NULL || t_id == NULL || ua->timeout_heap == NULL) {
        return ERR_FAIL;
    }

    struct timeout_node* timer = (struct timeout_node*)t_id;

    // The outcome of the heap removal does not change what happens next:
    // the node is marked cancelled in both cases.
    (void)timeout_heap_cancel(ua->timeout_heap, timer->expiration_ms, timer);

    timer->cancelled = 1;
    timer->callback = NULL;
    return ERR_OK;
}
// Register a descriptor and its callbacks with the event loop.
//
// fd must satisfy 0 <= fd < FD_SETSIZE. Any callback may be NULL.
// Returns an opaque socket id (the address of the node inside the socket
// table) for use with uasync_remove_socket(), or NULL on failure.
void* uasync_add_socket(uasync_t* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
    if (ua == NULL || fd < 0 || fd >= FD_SETSIZE) {
        return NULL;
    }

    const int slot = socket_array_add(ua->sockets, fd, read_cbk, write_cbk, except_cbk, user_data);
    if (slot < 0) {
        return NULL;
    }

    ua->socket_alloc_count++;
    return &ua->sockets->sockets[slot];
}
// Unregister a socket previously returned by uasync_add_socket().
//
// Returns ERR_OK on success. Returns ERR_FAIL when ua or s_id is NULL, the
// node carries a negative fd, or the fd is not registered in the table.
err_t uasync_remove_socket(uasync_t* ua, void* s_id) {
    if (ua == NULL || s_id == NULL) {
        return ERR_FAIL;
    }

    const struct socket_node* entry = (const struct socket_node*)s_id;
    if (entry->fd < 0) {
        return ERR_FAIL; // slot is not in use
    }

    if (socket_array_remove(ua->sockets, entry->fd) != 0) {
        return ERR_FAIL;
    }

    ua->socket_free_count++;
    return ERR_OK;
}
// Run the event loop forever: repeatedly call uasync_poll() with no
// caller-imposed timeout (-1). This function never returns.
void uasync_mainloop(uasync_t* ua) {
    for (;;) {
        uasync_poll(ua, -1);
    }
}
// Work out how long poll() may block, in milliseconds (-1 = indefinitely).
// The result is the smaller of the time until the next pending timer and the
// caller's timeout_tb (timebase units of 0.1 ms, rounded up to whole ms);
// a negative timeout_tb means the caller imposes no limit.
static int uasync_compute_poll_timeout_ms(uasync_t* ua, int timeout_tb) {
    int timeout_ms = -1; // infinite by default

    struct timeval tv;
    get_next_timeout(ua, &tv);
    // A zero tv is ambiguous (no timer vs. timer already due), so the heap
    // size is consulted to tell the two apart.
    if (tv.tv_sec > 0 || tv.tv_usec > 0 || (ua->timeout_heap && ua->timeout_heap->size > 0)) {
        uint64_t ms = (uint64_t)tv.tv_sec * 1000ULL + (uint64_t)tv.tv_usec / 1000ULL;
        if (ms > INT_MAX) ms = INT_MAX;
        timeout_ms = (int)ms;
    }

    if (timeout_tb >= 0) {
        int user_timeout_ms = timeout_tb / 10;
        if (timeout_tb % 10 != 0) user_timeout_ms++; // round up
        if (timeout_ms < 0 || user_timeout_ms < timeout_ms) {
            timeout_ms = user_timeout_ms;
        }
    }
    return timeout_ms;
}

// Run the callbacks matching `revents` for the socket registered under `fd`.
// The node is looked up again before every callback: an earlier callback may
// have removed the socket (then no further callbacks run for it) or added
// sockets (which can move the node storage), so a cached node pointer must
// not be reused across callbacks.
static void uasync_dispatch_socket_events(uasync_t* ua, int fd, short revents) {
    struct socket_node* node;

    /* Error conditions are reported through the exception callback */
    if (revents & (POLLERR | POLLHUP | POLLNVAL)) {
        node = socket_array_get(ua->sockets, fd);
        if (node && node->except_cbk) {
            node->except_cbk(fd, node->user_data);
        }
    }
    /* Exceptional data (out-of-band) */
    if (revents & POLLPRI) {
        node = socket_array_get(ua->sockets, fd);
        if (node && node->except_cbk) {
            node->except_cbk(fd, node->user_data);
        }
    }
    /* Read readiness */
    if (revents & POLLIN) {
        node = socket_array_get(ua->sockets, fd);
        if (node && node->read_cbk) {
            node->read_cbk(fd, node->user_data);
        }
    }
    /* Write readiness */
    if (revents & POLLOUT) {
        node = socket_array_get(ua->sockets, fd);
        if (node && node->write_cbk) {
            node->write_cbk(fd, node->user_data);
        }
    }
}

// Run one iteration of the event loop.
//
// ua:         event-loop instance; NULL makes the call a no-op.
// timeout_tb: upper bound on how long to block, in timebase units (0.1 ms);
//             a negative value means "no caller-imposed limit".
//
// Sequence: fire expired timers, poll() the wakeup pipe and all registered
// sockets, fire timers that expired meanwhile, then dispatch socket events.
// With nothing to poll and no finite timeout the function returns at once.
void uasync_poll(uasync_t* ua, int timeout_tb) {
    if (!ua) return;

    /* Process expired timeouts */
    process_timeouts(ua);

    int timeout_ms = uasync_compute_poll_timeout_ms(ua, timeout_tb);

    int socket_count = ua->sockets ? ua->sockets->count : 0;
    int wakeup_fd_present = ua->wakeup_initialized ? 1 : 0;
    int total_fds = socket_count + wakeup_fd_present;

    if (total_fds == 0) {
        /* No sockets and no wakeup fd: just sleep until the timeout, if any */
        if (timeout_ms < 0) {
            return;
        }
        struct pollfd dummy = {0};
        poll(&dummy, 0, timeout_ms);
        /* Check timeouts again after sleep */
        process_timeouts(ua);
        return;
    }

    struct pollfd* fds = malloc((size_t)total_fds * sizeof(struct pollfd));
    if (!fds) {
        return; /* out of memory */
    }

    int nfds = 0;

    /* Wakeup fd always goes first so it can be recognized by position */
    if (wakeup_fd_present) {
        fds[nfds].fd = ua->wakeup_pipe[0];
        fds[nfds].events = POLLIN;
        fds[nfds].revents = 0;
        nfds++;
    }

    /* One entry per active socket; skipped entirely when there is no table */
    if (socket_count > 0) {
        for (int i = 0; i < ua->sockets->capacity && nfds < total_fds; i++) {
            const struct socket_node* cur = &ua->sockets->sockets[i];
            if (!cur->active) continue;

            fds[nfds].fd = cur->fd;
            fds[nfds].events = 0;
            fds[nfds].revents = 0;
            if (cur->read_cbk) fds[nfds].events |= POLLIN;
            if (cur->write_cbk) fds[nfds].events |= POLLOUT;
            if (cur->except_cbk) fds[nfds].events |= POLLPRI;
            nfds++;
        }
    }

    /* Only the entries actually filled in are handed to poll() */
    int ret = poll(fds, (nfds_t)nfds, timeout_ms);
    if (ret < 0) {
        if (errno != EINTR) {
            perror("poll");
        }
        free(fds);
        return;
    }

    /* Process timeouts that may have expired during poll */
    process_timeouts(ua);

    /* Process socket events */
    if (ret > 0) {
        for (int i = 0; i < nfds; i++) {
            if (fds[i].revents == 0) continue;

            /* Handle wakeup fd separately */
            if (wakeup_fd_present && i == 0) {
                if (fds[i].revents & POLLIN) {
                    drain_wakeup_pipe(ua);
                }
                continue;
            }

            uasync_dispatch_socket_events(ua, fds[i].fd, fds[i].revents);
        }
    }

    free(fds);
}
// ========== Instance management functions ==========
// Switch `fd` to non-blocking mode. Returns 0 on success, -1 on failure.
static int uasync_set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return -1;
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) return -1;
    return 0;
}

// Allocate and initialize a new uasync instance.
//
// Sets up the wakeup pipe, the socket table and the timeout heap. Failure to
// set up the wakeup pipe is not fatal: the instance is still created, just
// without a wakeup mechanism. Returns NULL when the instance, the socket
// table or the timeout heap cannot be allocated.
uasync_t* uasync_create(void) {
    // Initialize debug system on first use
    static int debug_initialized = 0;
    if (!debug_initialized) {
        debug_config_init();
        debug_initialized = 1;
    }

    // calloc leaves all counters and pointers zeroed.
    uasync_t* ua = calloc(1, sizeof(struct uasync_s));
    if (!ua) return NULL;
    ua->wakeup_pipe[0] = -1;
    ua->wakeup_pipe[1] = -1;
    ua->wakeup_initialized = 0;

    // Create wakeup pipe
    if (pipe(ua->wakeup_pipe) < 0) {
        DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to create wakeup pipe: %s", strerror(errno));
        // Continue without wakeup mechanism
        ua->wakeup_pipe[0] = -1;
        ua->wakeup_pipe[1] = -1;
    } else if (uasync_set_nonblocking(ua->wakeup_pipe[0]) != 0 ||
               uasync_set_nonblocking(ua->wakeup_pipe[1]) != 0) {
        // Both ends must be non-blocking: the read end so that draining the
        // pipe stops once it is empty, the write end so that uasync_wakeup()
        // fails instead of stalling when the pipe is full. A pipe that cannot
        // be configured this way is not safe to use, so drop it.
        DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to make wakeup pipe non-blocking: %s", strerror(errno));
        close(ua->wakeup_pipe[0]);
        close(ua->wakeup_pipe[1]);
        ua->wakeup_pipe[0] = -1;
        ua->wakeup_pipe[1] = -1;
    } else {
        ua->wakeup_initialized = 1;
    }

    ua->sockets = socket_array_create(16);
    if (!ua->sockets) goto fail;

    ua->timeout_heap = timeout_heap_create(16);
    if (!ua->timeout_heap) goto fail;

    // Set callback to free timeout nodes and update counters
    timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
    return ua;

fail:
    // socket_array_destroy() accepts NULL, so this covers both failure points.
    socket_array_destroy(ua->sockets);
    if (ua->wakeup_initialized) {
        close(ua->wakeup_pipe[0]);
        close(ua->wakeup_pipe[1]);
    }
    free(ua);
    return NULL;
}
// Tear down a uasync instance and release everything it owns.
//
// Logs an error if the alloc/free counters disagree before cleanup, frees all
// timers still in the heap, accounts for all sockets still registered, closes
// the wakeup pipe, and finally abort()s if the counters still disagree.
// Passing NULL is a no-op.
void uasync_destroy(uasync_t* ua) {
    if (ua == NULL) {
        return;
    }

    // Outstanding timers/sockets at this point are reported but still cleaned up.
    if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) {
        DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected before cleanup: timers %zu/%zu, sockets %zu/%zu",
                    ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count);
    }

    // Pop and free every timer that is still queued.
    if (ua->timeout_heap != NULL) {
        size_t timers_released = 0;
        TimeoutEntry pending;
        while (timeout_heap_pop(ua->timeout_heap, &pending) == 0) {
            struct timeout_node* timer = (struct timeout_node*)pending.data;
            ua->timer_free_count++;
            timers_released++;
            free(timer);
        }
        DEBUG_DEBUG(DEBUG_CATEGORY_MEMORY, "Freed %zu timer nodes in destroy, heap freed_count = %zu",
                    timers_released, ua->timeout_heap->freed_count);
        timeout_heap_destroy(ua->timeout_heap);
    }

    // Socket nodes live inside the table, so they are only counted here;
    // the storage itself goes away with the table.
    if (ua->sockets != NULL) {
        int sockets_released = 0;
        const int slots = ua->sockets->capacity;
        for (int slot = 0; slot < slots; ++slot) {
            if (!ua->sockets->sockets[slot].active) {
                continue;
            }
            ua->socket_free_count++;
            sockets_released++;
        }
        DEBUG_DEBUG(DEBUG_CATEGORY_MEMORY, "Freed %d socket nodes in destroy", sockets_released);
        socket_array_destroy(ua->sockets);
    }

    if (ua->wakeup_initialized) {
        close(ua->wakeup_pipe[0]);
        close(ua->wakeup_pipe[1]);
    }

    // Anything still unbalanced now is treated as fatal.
    if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) {
        DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected after cleanup: timers %zu/%zu, sockets %zu/%zu",
                    ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count);
        abort();
    }

    free(ua);
}
// Fill in whichever of the socket table and timeout heap the instance is
// still missing. Existing members are left alone. Allocation failures are not
// reported: the corresponding member simply stays NULL.
void uasync_init_instance(uasync_t* ua) {
    if (ua == NULL) {
        return;
    }

    if (ua->sockets == NULL) {
        ua->sockets = socket_array_create(16);
    }

    if (ua->timeout_heap != NULL) {
        return;
    }
    ua->timeout_heap = timeout_heap_create(16);
    if (ua->timeout_heap == NULL) {
        return;
    }
    // A freshly created heap needs the callback that frees timer nodes.
    timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
}
// Copy the instance's debug counters into the supplied output locations.
// Any output pointer may be NULL to skip that counter; a NULL ua leaves all
// outputs untouched.
void uasync_get_stats(uasync_t* ua, size_t* timer_alloc, size_t* timer_free, size_t* socket_alloc, size_t* socket_free) {
    if (ua == NULL) {
        return;
    }

    if (timer_alloc != NULL) {
        *timer_alloc = ua->timer_alloc_count;
    }
    if (timer_free != NULL) {
        *timer_free = ua->timer_free_count;
    }
    if (socket_alloc != NULL) {
        *socket_alloc = ua->socket_alloc_count;
    }
    if (socket_free != NULL) {
        *socket_free = ua->socket_free_count;
    }
}
// Get global instance for backward compatibility
// Wakeup mechanism
// Wake up a poll in progress by writing one byte to the wakeup pipe.
// Returns 0 when the byte was written, -1 when ua is NULL, the pipe is not
// available, or the write did not transfer exactly one byte. Nothing is
// logged on failure because this may be called from a signal handler.
int uasync_wakeup(uasync_t* ua) {
    if (ua == NULL || !ua->wakeup_initialized) {
        return -1;
    }

    char token = 0;
    const ssize_t written = write(ua->wakeup_pipe[1], &token, 1);
    return (written == 1) ? 0 : -1;
}
// Return the write end of the wakeup pipe, or -1 when ua is NULL or the
// pipe is not available.
int uasync_get_wakeup_fd(uasync_t* ua) {
    const int usable = (ua != NULL) && ua->wakeup_initialized;
    return usable ? ua->wakeup_pipe[1] : -1;
}