You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

617 lines
20 KiB

// uasync.c
#include "u_async.h"
#include "debug_config.h"
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <poll.h>
#include <limits.h>
#include <fcntl.h>
#include <sys/time.h>
// Socket node with array-based storage.
// One registered file descriptor plus the callbacks uasync_poll() invokes for it.
struct socket_node {
int fd;                          // monitored file descriptor
socket_callback_t read_cbk;      // invoked on readability (POLLIN); NULL = don't poll for read
socket_callback_t write_cbk;     // invoked on writability (POLLOUT); NULL = don't poll for write
socket_callback_t except_cbk;    // invoked on POLLPRI and on POLLERR/POLLHUP/POLLNVAL
void* user_data;                 // opaque pointer passed back to every callback
int active; // 1 if socket is active, 0 if freed (for reuse)
};
// Array-based socket management for O(1) operations.
// Invariant: active nodes occupy indices [0, count) densely (removal compacts
// by moving the last node into the hole); fd_to_index is sized FD_SETSIZE and
// holds -1 for unregistered fds.
struct socket_array {
struct socket_node* sockets; // Dynamic array of socket nodes
int* fd_to_index; // FD to array index mapping (-1 = not registered)
int* index_to_fd; // Array index to FD mapping (valid for [0, count))
int capacity; // Total allocated capacity
int count; // Number of active sockets
int max_fd; // Maximum FD for bounds checking
};
// Internal socket-registry operations (definitions at end of file).
static struct socket_array* socket_array_create(int initial_capacity);
static void socket_array_destroy(struct socket_array* sa);
// Returns 0 on success, -1 on bad fd / duplicate / allocation failure.
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data);
// Returns 0 on success, -1 if fd is out of range or not registered.
static int socket_array_remove(struct socket_array* sa, int fd);
// Returns the live node for fd, or NULL if not registered.
static struct socket_node* socket_array_get(struct socket_array* sa, int fd);
/* Fetch the current wall-clock time into *tv (microsecond resolution). */
static void get_current_time(struct timeval* tv) {
    (void)gettimeofday(tv, NULL);
}
/*
 * Advance *tv by dt ticks, where one tick is 0.1 ms (100 us).
 * Non-positive dt is a no-op. The result is normalized so that
 * 0 <= tv_usec < 1000000 whenever the input was normalized.
 */
static void timeval_add_tb(struct timeval* tv, int dt) {
    if (dt <= 0) {
        return;
    }
    long long total_us = (long long)tv->tv_usec + (long long)dt * 100LL;
    tv->tv_sec += total_us / 1000000LL;
    tv->tv_usec = total_us % 1000000LL;
}
/* Convert *tv to whole milliseconds since the epoch (truncating sub-ms). */
static uint64_t timeval_to_ms(const struct timeval* tv) {
    uint64_t ms = (uint64_t)tv->tv_sec * 1000ULL;
    ms += (uint64_t)tv->tv_usec / 1000ULL;
    return ms;
}
/* Discard everything queued in the wakeup pipe's read end (it is O_NONBLOCK,
 * so the loop terminates when the pipe is empty). */
static void drain_wakeup_pipe(struct UASYNC* ua) {
    char discard[1024];
    for (;;) {
        ssize_t n = read(ua->wakeup_pipe[0], discard, sizeof(discard));
        if (n <= 0) {
            break;
        }
    }
}
// Process expired timeouts with safe cancellation.
// Pops every heap entry whose expiration is at or before "now", runs its
// callback unless the timer was cancelled, and frees the node. Cancelled
// nodes that were left in the heap (see uasync_cancel_timeout's else branch)
// are reclaimed here.
static void process_timeouts(struct UASYNC* ua) {
    if (!ua || !ua->timeout_heap) return;
    struct timeval now_tv;
    get_current_time(&now_tv);
    uint64_t now_ms = timeval_to_ms(&now_tv);
    while (1) {
        TimeoutEntry entry;
        if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) break; // heap empty
        if (entry.expiration > now_ms) break; // earliest timer not yet due
        // Pop the expired timeout
        timeout_heap_pop(ua->timeout_heap, &entry);
        struct timeout_node* node = (struct timeout_node*)entry.data;
        if (node && node->callback && !node->cancelled) {
            // Execute callback only if not cancelled
            node->callback(node->arg);
        }
        // Always free the node after processing
        if (node && node->ua) {
            // Keep the alloc/free leak counters (checked in uasync_destroy) balanced.
            node->ua->timer_free_count++;
        }
        free(node);
    }
}
// Instance version.
// Schedule a one-shot timer that fires after timeout_tb ticks (1 tick = 0.1 ms,
// matching timeval_add_tb). Returns an opaque timer id for
// uasync_cancel_timeout(), or NULL on invalid arguments / allocation failure.
// Ownership: the node is freed by process_timeouts() after it fires, by
// uasync_cancel_timeout() while still queued, or by uasync_destroy().
void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* arg, timeout_callback_t callback) {
    if (!ua || timeout_tb < 0 || !callback) return NULL;
    if (!ua->timeout_heap) return NULL;
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: ua=%p, timeout=%d tb, arg=%p, callback=%p",
    ua, timeout_tb, arg, callback);
    struct timeout_node* node = malloc(sizeof(struct timeout_node));
    if (!node) {
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to allocate node");
        return NULL;
    }
    ua->timer_alloc_count++;
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: allocated node %p (alloc_count=%zu)",
    node, ua->timer_alloc_count);
    node->arg = arg;
    node->callback = callback;
    node->ua = ua;
    node->cancelled = 0;
    node->heap_index = SIZE_MAX; // Initialize (heap assigns the real index on push)
    // Calculate expiration time in milliseconds
    struct timeval now;
    get_current_time(&now);
    timeval_add_tb(&now, timeout_tb);
    node->expiration_ms = timeval_to_ms(&now);
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: node %p expires at %llu ms",
    node, (unsigned long long)node->expiration_ms);
    // Add to heap
    if (timeout_heap_push(ua->timeout_heap, node->expiration_ms, node) != 0) {
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to push to heap");
        free(node);
        ua->timer_free_count++; // Balance the alloc counter
        return NULL;
    }
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: successfully created timer %p", node);
    return node;
}
// Instance version.
// Cancel a timer previously returned by uasync_set_timeout(). If the timer is
// still queued in the heap it is removed and freed immediately; otherwise it
// is only marked cancelled and left for process_timeouts() to free.
//
// NOTE(review): if the timer has ALREADY fired, process_timeouts() freed the
// node, so the dereference of t_id below is a use-after-free — this relies on
// callers never cancelling an expired timer id. TODO: confirm against callers.
err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id) {
    if (!ua || !t_id || !ua->timeout_heap) {
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: invalid parameters ua=%p, t_id=%p, heap=%p",
        ua, t_id, ua ? ua->timeout_heap : NULL);
        return ERR_FAIL;
    }
    struct timeout_node* node = (struct timeout_node*)t_id;
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: ua=%p, t_id=%p, node=%p, expires=%llu ms",
    ua, t_id, node, (unsigned long long)node->expiration_ms);
    // Try to remove from heap
    if (timeout_heap_remove(ua->timeout_heap, node) == 0) {
        // Successfully removed - free it
        node->cancelled = 1;
        node->callback = NULL;
        ua->timer_free_count++;
        free(node);
        return ERR_OK;
    } else {
        // Not found in heap (may be already popped) - just mark cancelled to skip callback if pending
        node->cancelled = 1;
        node->callback = NULL;
        // Do NOT free here to avoid double-free if already processed
        return ERR_OK;
    }
}
/*
 * Register fd with this event loop. Returns an opaque socket id (for
 * uasync_remove_socket) or NULL on invalid arguments / duplicate fd /
 * allocation failure. The callbacks fire from uasync_poll().
 */
void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
    if (ua == NULL || ua->sockets == NULL) {
        return NULL;
    }
    int rc = socket_array_add(ua->sockets, fd, read_cbk, write_cbk, except_cbk, user_data);
    if (rc != 0) {
        return NULL;
    }
    ua->socket_alloc_count++;
    return socket_array_get(ua->sockets, fd);
}
/* Deregister a socket previously returned by uasync_add_socket(). */
err_t uasync_remove_socket(struct UASYNC* ua, void* s_id) {
    if (ua == NULL || ua->sockets == NULL || s_id == NULL) {
        return ERR_FAIL;
    }
    struct socket_node* node = s_id;
    if (socket_array_remove(ua->sockets, node->fd) != 0) {
        return ERR_FAIL;
    }
    ua->socket_free_count++;
    return ERR_OK;
}
/*
 * Run one iteration of the event loop: wait (at most timeout_tb ticks,
 * 1 tick = 0.1 ms; negative = wait indefinitely) for socket readiness or the
 * next timer deadline, then dispatch expired timers and socket callbacks.
 *
 * Fixes vs. previous version:
 *  - guard the fill loop against ua->sockets == NULL (the count computation
 *    tolerated NULL but the loop condition dereferenced it unconditionally);
 *  - when there is nothing to poll, call poll(NULL, 0, ...) as a pure sleep
 *    instead of malloc(0), whose legal NULL return caused an early exit that
 *    skipped timer processing entirely;
 *  - clamp the timer delta to INT_MAX to avoid int overflow for far-future
 *    deadlines.
 */
void uasync_poll(struct UASYNC* ua, int timeout_tb) {
    if (!ua) return;
    /* Poll timeout = min(next timer deadline, caller's budget), in ms. */
    int timeout_ms = -1;
    if (ua->timeout_heap && ua->timeout_heap->size > 0) {
        struct timeval now_tv;
        get_current_time(&now_tv);
        uint64_t now_ms = timeval_to_ms(&now_tv);
        TimeoutEntry entry;
        if (timeout_heap_peek(ua->timeout_heap, &entry) == 0) {
            if (entry.expiration > now_ms) {
                uint64_t delta = entry.expiration - now_ms;
                /* Clamp: a far-future deadline must not overflow int. */
                timeout_ms = (delta > (uint64_t)INT_MAX) ? INT_MAX : (int)delta;
            } else {
                timeout_ms = 0; /* already expired: poll must not block */
            }
        }
    }
    if (timeout_tb >= 0) {
        int tb_ms = timeout_tb / 10; /* 1 tick = 0.1 ms */
        if (timeout_ms < 0 || tb_ms < timeout_ms) timeout_ms = tb_ms;
    }
    /* Prepare poll fds: optional wakeup pipe first, then active sockets. */
    int wakeup_fd_present = ua->wakeup_initialized && ua->wakeup_pipe[0] >= 0;
    int socket_count = ua->sockets ? ua->sockets->count : 0;
    int total_fds = socket_count + wakeup_fd_present;
    struct pollfd* fds = NULL;
    struct socket_node** nodes = NULL;
    if (total_fds > 0) {
        fds = malloc((size_t)total_fds * sizeof(struct pollfd));
        if (!fds) return; /* out of memory */
    }
    if (socket_count > 0) {
        nodes = malloc((size_t)socket_count * sizeof(struct socket_node*));
        if (!nodes) {
            free(fds);
            return; /* out of memory */
        }
    }
    int idx = 0;
    if (wakeup_fd_present) {
        fds[idx].fd = ua->wakeup_pipe[0];
        fds[idx].events = POLLIN;
        fds[idx].revents = 0;
        idx++;
    }
    /* Active nodes are dense in [0, count), but scan by the active flag so
     * this stays correct regardless of internal layout. */
    if (ua->sockets) {
        int node_idx = 0;
        for (int i = 0; i < ua->sockets->capacity && node_idx < socket_count; i++) {
            if (!ua->sockets->sockets[i].active) continue;
            struct socket_node* cur = &ua->sockets->sockets[i];
            fds[idx].fd = cur->fd;
            fds[idx].events = 0;
            fds[idx].revents = 0;
            if (cur->read_cbk) fds[idx].events |= POLLIN;
            if (cur->write_cbk) fds[idx].events |= POLLOUT;
            if (cur->except_cbk) fds[idx].events |= POLLPRI;
            nodes[node_idx] = cur;
            idx++;
            node_idx++;
        }
    }
    /* With total_fds == 0 this is a pure, interruptible sleep. */
    int ret = poll(fds, (nfds_t)total_fds, timeout_ms);
    if (ret < 0) {
        if (errno != EINTR) perror("poll"); /* EINTR is routine, not an error */
        free(fds);
        free(nodes);
        return;
    }
    /* Dispatch timers that expired while we were blocked. */
    process_timeouts(ua);
    /* Dispatch socket events. */
    if (ret > 0) {
        for (int i = 0; i < total_fds; i++) {
            if (fds[i].revents == 0) continue;
            /* Slot 0 is the wakeup pipe when present: just drain it. */
            if (wakeup_fd_present && i == 0) {
                if (fds[i].revents & POLLIN) {
                    drain_wakeup_pipe(ua);
                }
                continue;
            }
            /* NOTE(review): callbacks may remove sockets, which rewrites the
             * backing array in place; node pointers here can then refer to a
             * different socket — confirm callbacks only remove themselves. */
            struct socket_node* node = nodes[i - wakeup_fd_present];
            /* Error/hangup conditions are routed to the exception callback. */
            if (fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) {
                if (node->except_cbk) {
                    node->except_cbk(node->fd, node->user_data);
                }
            }
            /* Out-of-band data. */
            if (fds[i].revents & POLLPRI) {
                if (node->except_cbk) {
                    node->except_cbk(node->fd, node->user_data);
                }
            }
            /* Read readiness. */
            if (fds[i].revents & POLLIN) {
                if (node->read_cbk) {
                    node->read_cbk(node->fd, node->user_data);
                }
            }
            /* Write readiness. */
            if (fds[i].revents & POLLOUT) {
                if (node->write_cbk) {
                    node->write_cbk(node->fd, node->user_data);
                }
            }
        }
    }
    free(fds);
    free(nodes);
}
/* Drive the event loop forever; each iteration blocks until the next event. */
void uasync_mainloop(struct UASYNC* ua) {
    for (;;) {
        uasync_poll(ua, -1);
    }
}
/*
 * Allocate and initialize a UASYNC instance: zeroed counters, a self-pipe for
 * cross-thread/signal wakeup, a socket registry, and a timer heap.
 * Returns NULL on allocation failure. Caller owns the result and must release
 * it with uasync_destroy().
 *
 * Fix vs. previous version: O_NONBLOCK is now set on BOTH pipe ends, not just
 * the read end. uasync_wakeup() writes the write end and its comment says it
 * may run from a signal handler; a blocking write end could hang the process
 * whenever the pipe fills up.
 */
struct UASYNC* uasync_create(void) {
    // One-shot debug-system init. NOTE(review): not thread-safe; assumes the
    // first uasync_create() happens before any concurrent use — same as before.
    static int debug_initialized = 0;
    if (!debug_initialized) {
        debug_config_init();
        debug_initialized = 1;
    }
    struct UASYNC* ua = malloc(sizeof(struct UASYNC));
    if (!ua) return NULL;
    memset(ua, 0, sizeof(struct UASYNC));
    ua->wakeup_pipe[0] = -1;
    ua->wakeup_pipe[1] = -1;
    ua->wakeup_initialized = 0;
    // Create wakeup pipe (best effort: the loop works without it).
    if (pipe(ua->wakeup_pipe) < 0) {
        DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to create wakeup pipe: %s", strerror(errno));
        // Continue without wakeup mechanism
        ua->wakeup_pipe[0] = -1;
        ua->wakeup_pipe[1] = -1;
    } else {
        ua->wakeup_initialized = 1;
        // Non-blocking on both ends: the read end so draining never blocks,
        // the write end so uasync_wakeup() can never block on a full pipe.
        for (int i = 0; i < 2; i++) {
            int flags = fcntl(ua->wakeup_pipe[i], F_GETFL, 0);
            if (flags >= 0) {
                fcntl(ua->wakeup_pipe[i], F_SETFL, flags | O_NONBLOCK);
            }
        }
    }
    ua->sockets = socket_array_create(16);
    if (!ua->sockets) goto fail_sockets;
    ua->timeout_heap = timeout_heap_create(16);
    if (!ua->timeout_heap) goto fail_heap;
    return ua;
fail_heap:
    socket_array_destroy(ua->sockets);
fail_sockets:
    if (ua->wakeup_initialized) {
        close(ua->wakeup_pipe[0]);
        close(ua->wakeup_pipe[1]);
    }
    free(ua);
    return NULL;
}
// Print all resources for debugging.
// Dumps the alloc/free counters plus every timer currently in the heap and
// every active socket slot. "prefix" labels the report (e.g. a call-site tag).
void uasync_print_resources(struct UASYNC* ua, const char* prefix) {
    if (!ua) {
        printf("%s: NULL uasync instance\n", prefix);
        return;
    }
    printf("\n🔍 %s: UASYNC Resource Report for %p\n", prefix, ua);
    printf(" Timer Statistics: allocated=%zu, freed=%zu, active=%zd\n",
    ua->timer_alloc_count, ua->timer_free_count,
    (ssize_t)(ua->timer_alloc_count - ua->timer_free_count));
    printf(" Socket Statistics: allocated=%zu, freed=%zu, active=%zd\n",
    ua->socket_alloc_count, ua->socket_free_count,
    (ssize_t)(ua->socket_alloc_count - ua->socket_free_count));
    // Show active timers (removed check for .deleted; heap entries are valid up to .size)
    if (ua->timeout_heap) {
        size_t active_timers = ua->timeout_heap->size;
        for (size_t i = 0; i < ua->timeout_heap->size; i++) {
            struct timeout_node* node = (struct timeout_node*)ua->timeout_heap->heap[i].data;
            printf(" Timer: node=%p, expires=%llu ms, cancelled=%d\n",
            node, (unsigned long long)ua->timeout_heap->heap[i].expiration, node->cancelled);
        }
        printf(" Active timers in heap: %zu\n", active_timers);
    }
    // Show active sockets by scanning the whole capacity for the active flag.
    if (ua->sockets) {
        int active_sockets = 0;
        printf(" Socket array capacity: %d, active: %d\n",
        ua->sockets->capacity, ua->sockets->count);
        for (int i = 0; i < ua->sockets->capacity; i++) {
            if (ua->sockets->sockets[i].active) {
                active_sockets++;
                printf(" Socket: fd=%d, active=%d\n",
                ua->sockets->sockets[i].fd,
                ua->sockets->sockets[i].active);
            }
        }
        printf(" Total active sockets: %d\n", active_sockets);
    }
    printf("🔚 %s: End of resource report\n\n", prefix);
}
// Tear down a UASYNC instance: free all queued timers, release the socket
// registry (closing fds only when close_fds is non-zero), close the wakeup
// pipe, verify the alloc/free counters balance, and free the instance.
// NOTE: a counter imbalance is treated as a fatal invariant violation and
// calls abort().
void uasync_destroy(struct UASYNC* ua, int close_fds) {
    if (!ua) return;
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: starting cleanup for ua=%p", ua);
    // Free all remaining timeouts by popping and freeing data (fixes leak without double-free)
    if (ua->timeout_heap) {
        TimeoutEntry entry;
        while (timeout_heap_pop(ua->timeout_heap, &entry) == 0) {
            struct timeout_node* node = (struct timeout_node*)entry.data;
            if (node) {
                ua->timer_free_count++;
                free(node);
            }
        }
        timeout_heap_destroy(ua->timeout_heap);
        ua->timeout_heap = NULL;
    }
    // Free all socket nodes using array approach. Scans the full capacity for
    // the active flag; every slot found counts toward socket_free_count.
    if (ua->sockets) {
        // Count and free all active sockets
        int freed_count = 0;
        for (int i = 0; i < ua->sockets->capacity; i++) {
            if (ua->sockets->sockets[i].active) {
                if (close_fds && ua->sockets->sockets[i].fd >= 0) {
                    close(ua->sockets->sockets[i].fd);
                }
                ua->socket_free_count++;
                freed_count++;
            }
        }
        DEBUG_DEBUG(DEBUG_CATEGORY_MEMORY, "Freed %d socket nodes in destroy", freed_count);
        socket_array_destroy(ua->sockets);
    }
    // Close wakeup pipe
    if (ua->wakeup_initialized) {
        close(ua->wakeup_pipe[0]);
        close(ua->wakeup_pipe[1]);
    }
    // Final leak check: any imbalance means a bookkeeping bug somewhere above.
    if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) {
        DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected after cleanup: timers %zu/%zu, sockets %zu/%zu",
        ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count);
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Timer leak: allocated=%zu, freed=%zu, diff=%zd",
        ua->timer_alloc_count, ua->timer_free_count,
        (ssize_t)(ua->timer_alloc_count - ua->timer_free_count));
        DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Socket leak: allocated=%zu, freed=%zu, diff=%zd",
        ua->socket_alloc_count, ua->socket_free_count,
        (ssize_t)(ua->socket_alloc_count - ua->socket_free_count));
        abort();
    }
    DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: completed successfully for ua=%p", ua);
    free(ua);
}
/*
 * Lazily create the socket registry and timer heap on a caller-allocated
 * instance. Safe to call repeatedly; existing sub-structures are kept.
 * Allocation failures leave the corresponding pointer NULL.
 */
void uasync_init_instance(struct UASYNC* ua) {
    if (ua == NULL) {
        return;
    }
    if (ua->sockets == NULL) {
        ua->sockets = socket_array_create(16);
    }
    if (ua->timeout_heap == NULL) {
        ua->timeout_heap = timeout_heap_create(16);
    }
}
// Debug statistics.
// Copy the alloc/free counters into any non-NULL out-parameters.
// Fix vs. previous version: when ua is NULL the out-parameters are now zeroed
// instead of being left untouched, so callers that pass uninitialized locals
// never read garbage.
void uasync_get_stats(struct UASYNC* ua, size_t* timer_alloc, size_t* timer_free, size_t* socket_alloc, size_t* socket_free) {
    if (timer_alloc) *timer_alloc = ua ? ua->timer_alloc_count : 0;
    if (timer_free) *timer_free = ua ? ua->timer_free_count : 0;
    if (socket_alloc) *socket_alloc = ua ? ua->socket_alloc_count : 0;
    if (socket_free) *socket_free = ua ? ua->socket_free_count : 0;
}
// Wakeup mechanism.
// Write one byte into the self-pipe so a blocked uasync_poll() returns.
// Returns 0 on success, -1 otherwise. Deliberately avoids any stdio so it
// stays usable from restricted contexts such as signal handlers.
int uasync_wakeup(struct UASYNC* ua) {
    if (ua == NULL || !ua->wakeup_initialized) {
        return -1;
    }
    const char token = 0;
    ssize_t written = write(ua->wakeup_pipe[1], &token, 1);
    return (written == 1) ? 0 : -1;
}
// Expose the wakeup pipe's write end (the fd external code writes to in order
// to wake the loop), or -1 when the wakeup mechanism is unavailable.
int uasync_get_wakeup_fd(struct UASYNC* ua) {
    if (ua == NULL || !ua->wakeup_initialized) {
        return -1;
    }
    return ua->wakeup_pipe[1];
}
/*
 * Allocate a socket registry with room for initial_capacity nodes.
 * Returns NULL on allocation failure.
 * Fixes vs. previous version: a non-positive initial_capacity (which would
 * have produced malloc(0)/negative-sized allocations) now falls back to a
 * sane default, and the error paths use a single goto cleanup chain.
 */
static struct socket_array* socket_array_create(int initial_capacity) {
    if (initial_capacity < 1) {
        initial_capacity = 16; /* defensive default */
    }
    struct socket_array* sa = malloc(sizeof *sa);
    if (!sa) return NULL;
    sa->sockets = malloc((size_t)initial_capacity * sizeof *sa->sockets);
    if (!sa->sockets) goto fail_sockets;
    sa->fd_to_index = malloc(FD_SETSIZE * sizeof *sa->fd_to_index);
    if (!sa->fd_to_index) goto fail_fd_map;
    /* -1 in every slot = "fd not registered". memset of 0xFF bytes yields -1
     * for int on two's-complement targets (universal in practice). */
    memset(sa->fd_to_index, -1, FD_SETSIZE * sizeof(int));
    sa->index_to_fd = malloc((size_t)initial_capacity * sizeof *sa->index_to_fd);
    if (!sa->index_to_fd) goto fail_index_map;
    sa->capacity = initial_capacity;
    sa->count = 0;
    sa->max_fd = 0;
    return sa;
fail_index_map:
    free(sa->fd_to_index);
fail_fd_map:
    free(sa->sockets);
fail_sockets:
    free(sa);
    return NULL;
}
/* Release a registry created by socket_array_create (NULL is a no-op). */
static void socket_array_destroy(struct socket_array* sa) {
    if (sa == NULL) {
        return;
    }
    free(sa->sockets);
    free(sa->fd_to_index);
    free(sa->index_to_fd);
    free(sa);
}
/*
 * Register fd with its callbacks. O(1); the node is appended at the end of
 * the dense active region, doubling capacity when full.
 * Returns 0 on success, -1 on out-of-range fd, duplicate fd, or OOM.
 */
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
    /* Reject out-of-range descriptors and duplicates. */
    if (fd < 0 || fd >= FD_SETSIZE) {
        return -1;
    }
    if (sa->fd_to_index[fd] != -1) {
        return -1;
    }
    /* Grow the dense arrays when full. */
    if (sa->count == sa->capacity) {
        int grown = sa->capacity * 2;
        struct socket_node* bigger_nodes = realloc(sa->sockets, grown * sizeof(struct socket_node));
        if (bigger_nodes == NULL) {
            return -1;
        }
        sa->sockets = bigger_nodes;
        int* bigger_map = realloc(sa->index_to_fd, grown * sizeof(int));
        if (bigger_map == NULL) {
            return -1;
        }
        sa->index_to_fd = bigger_map;
        sa->capacity = grown;
    }
    /* Append at the end of the active region and wire both index maps. */
    int slot = sa->count++;
    struct socket_node* node = &sa->sockets[slot];
    node->fd = fd;
    node->read_cbk = read_cbk;
    node->write_cbk = write_cbk;
    node->except_cbk = except_cbk;
    node->user_data = user_data;
    node->active = 1;
    sa->fd_to_index[fd] = slot;
    sa->index_to_fd[slot] = fd;
    if (fd > sa->max_fd) {
        sa->max_fd = fd;
    }
    return 0;
}
/*
 * Deregister fd. O(1): the last active node is moved into the vacated slot so
 * active nodes stay dense in [0, count).
 * Returns 0 on success, -1 if fd is out of range or not registered.
 *
 * BUG FIX: the vacated last slot previously kept active == 1 (its contents
 * were copied into the hole but the source slot was never cleared). Any scan
 * of the full capacity by the active flag — uasync_destroy() and
 * uasync_print_resources() both do this — then counted stale slots, inflating
 * socket_free_count and tripping uasync_destroy()'s leak-check abort().
 */
static int socket_array_remove(struct socket_array* sa, int fd) {
    if (fd < 0 || fd >= FD_SETSIZE) return -1;
    int index = sa->fd_to_index[fd];
    if (index == -1) return -1;
    sa->fd_to_index[fd] = -1;
    /* Compact: move the last active node into the hole (if it isn't the hole). */
    int last_index = --sa->count;
    if (index != last_index) {
        sa->sockets[index] = sa->sockets[last_index];
        int last_fd = sa->index_to_fd[last_index];
        sa->fd_to_index[last_fd] = index;
        sa->index_to_fd[index] = last_fd;
    }
    /* Clear the vacated slot so capacity-wide active scans don't see it. */
    sa->sockets[last_index].active = 0;
    return 0;
}
/* Look up the live node for fd; NULL if fd is out of range or unregistered. */
static struct socket_node* socket_array_get(struct socket_array* sa, int fd) {
    if (fd >= 0 && fd < FD_SETSIZE) {
        int slot = sa->fd_to_index[fd];
        if (slot != -1) {
            return &sa->sockets[slot];
        }
    }
    return NULL;
}