You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

433 lines
12 KiB

// uasync.c
#include "u_async.h"
#include "timeout_heap.h"

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/select.h> // select(), fd_set, FD_SET/FD_CLR/FD_ISSET
#include <sys/time.h>   // gettimeofday(), struct timeval
#include <unistd.h>

#ifndef FD_SETSIZE
#define FD_SETSIZE 1024 // Assume standard size; adjust if needed for your platform
#endif
// Timeout node
//
// One pending timer created by uasync_set_timeout(). The node is heap-allocated
// and handed to the TimeoutHeap as the opaque entry data; it is freed when it
// fires (process_timeouts), at teardown (uasync_destroy), or by the heap's
// registered free callback (timeout_node_free_callback).
struct timeout_node {
void* arg; // opaque user argument passed back to `callback`
timeout_callback_t callback; // invoked on expiry; cleared to NULL by uasync_cancel_timeout()
uint64_t expiration_ms; // absolute expiration time in milliseconds
uasync_t* ua; // Pointer back to uasync instance for counter updates
};
// Socket node
//
// One registered file descriptor with its per-event callbacks. Nodes live on
// the instance's singly linked `socket_head` list and are also indexed by fd
// in `fd_to_node` for O(1) dispatch in uasync_poll().
struct socket_node {
int fd; // the registered descriptor, 0 <= fd < FD_SETSIZE
socket_callback_t read_cbk; // NULL means "no read interest"
socket_callback_t write_cbk; // NULL means "no write interest"
socket_callback_t except_cbk; // NULL means "no exception interest"
void* user_data; // opaque pointer passed back to each callback
struct socket_node* next; // next node in the instance's socket list
};
// Uasync instance structure
//
// One independent event loop: a min-heap of pending timeouts plus a set of
// registered sockets. The `master_*` fd_sets persistently mirror the union of
// all socket interests; uasync_poll() copies them before each select() call.
struct uasync_s {
TimeoutHeap* timeout_heap; // Heap for timeout management
struct socket_node* socket_head; // singly linked list of all registered sockets
int max_fd; // highest registered fd, -1 when none (select() nfds = max_fd + 1)
fd_set master_readfds; // persistent read-interest set
fd_set master_writefds; // persistent write-interest set
fd_set master_exceptfds; // persistent exception-interest set
struct socket_node* fd_to_node[FD_SETSIZE]; // fd -> node map for O(1) dispatch
// Debug counters for memory allocation tracking
size_t timer_alloc_count; // timeout nodes malloc'd
size_t timer_free_count; // timeout nodes freed (must match at destroy)
size_t socket_alloc_count; // socket nodes malloc'd
size_t socket_free_count; // socket nodes freed (must match at destroy)
};
// No global instance - each module must use its own uasync_t instance
// Free callback installed on the TimeoutHeap: releases a timeout node the heap
// is discarding and bumps the owning instance's free counter so the destroy-time
// leak check stays balanced.
static void timeout_node_free_callback(void* user_data, void* data) {
    uasync_t* owner = (uasync_t*)user_data;
    owner->timer_free_count += 1;
    free(data); // `data` is a malloc'd struct timeout_node
}
// Fetch the current wall-clock time into *tv.
static void get_current_time(struct timeval* tv) {
    (void)gettimeofday(tv, NULL);
}
// Advance *tv by `dt` timebase ticks (1 tick == 100 us, 10000 ticks == 1 s),
// normalizing tv_usec back into [0, 1000000).
static void timeval_add_tb(struct timeval* tv, int dt) {
    long usec = (long)tv->tv_usec + (long)(dt % 10000) * 100;
    tv->tv_sec += dt / 10000 + usec / 1000000; // whole seconds plus microsecond carry
    tv->tv_usec = usec % 1000000;
}
// Collapse a timeval into whole milliseconds (fractional ms truncated).
static uint64_t timeval_to_ms(const struct timeval* tv) {
    uint64_t ms = (uint64_t)tv->tv_sec * 1000ULL;
    ms += (uint64_t)tv->tv_usec / 1000ULL;
    return ms;
}
// Fire every timeout whose deadline is at or before "now", popping each from
// the heap and freeing its node. Entries whose callback was cleared by
// uasync_cancel_timeout() are popped and freed without firing.
static void process_timeouts(struct uasync_s* ua) {
    if (!ua || !ua->timeout_heap) return;

    struct timeval now_tv;
    get_current_time(&now_tv);
    const uint64_t now_ms = timeval_to_ms(&now_tv);

    TimeoutEntry entry;
    while (timeout_heap_peek(ua->timeout_heap, &entry) == 0 &&
           entry.expiration <= now_ms) {
        timeout_heap_pop(ua->timeout_heap, &entry);
        struct timeout_node* node = (struct timeout_node*)entry.data;
        if (node && node->callback) {
            node->callback(node->arg);
        }
        ua->timer_free_count++;
        free(node);
    }
}
// Compute the delay until the earliest pending timeout, clamped to one day.
// Writes {0,0} when there is no heap, no pending entry, or the head entry has
// already expired.
static void get_next_timeout(struct uasync_s* ua, struct timeval* tv) {
    tv->tv_sec = 0;
    tv->tv_usec = 0;
    if (!ua || !ua->timeout_heap) return;

    TimeoutEntry entry;
    if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) return;

    struct timeval now_tv;
    get_current_time(&now_tv);
    const uint64_t now_ms = timeval_to_ms(&now_tv);
    if (entry.expiration <= now_ms) return; // head already due -> zero wait

    uint64_t delta_ms = entry.expiration - now_ms;
    const uint64_t one_day_ms = 86400000ULL; // cap to avoid tv_sec overflow
    if (delta_ms > one_day_ms) {
        delta_ms = one_day_ms;
    }
    tv->tv_sec = delta_ms / 1000;
    tv->tv_usec = (delta_ms % 1000) * 1000;
}
// Register a one-shot timer on this instance.
//
// timeout_tb is in timebase ticks (100 us each) and must be >= 0. Returns an
// opaque id usable with uasync_cancel_timeout(), or NULL on bad arguments or
// allocation/push failure. The node's ownership passes to the timeout heap.
void* uasync_set_timeout(uasync_t* ua, int timeout_tb, void* arg, timeout_callback_t callback) {
    if (!ua || !ua->timeout_heap || !callback || timeout_tb < 0) return NULL;

    struct timeout_node* node = malloc(sizeof *node);
    if (!node) return NULL;
    ua->timer_alloc_count++;

    // Absolute deadline = now + timeout_tb ticks, expressed in milliseconds.
    struct timeval deadline;
    get_current_time(&deadline);
    timeval_add_tb(&deadline, timeout_tb);

    node->arg = arg;
    node->callback = callback;
    node->ua = ua;
    node->expiration_ms = timeval_to_ms(&deadline);

    if (timeout_heap_push(ua->timeout_heap, node->expiration_ms, node) != 0) {
        ua->timer_free_count++;
        free(node);
        return NULL;
    }
    return node;
}
// Instance version
//
// Cancel a pending timeout previously returned by uasync_set_timeout().
// Returns ERR_OK when timeout_heap_cancel() reports success, ERR_FAIL when
// the entry is unknown (e.g. it already fired and was freed in
// process_timeouts()).
//
// NOTE(review): correctness here depends on timeout_heap_cancel()'s contract,
// which is not visible in this file. If cancel merely flags the entry and
// leaves it in the heap (lazy cancel), clearing `callback` below is right and
// the node is freed later when popped. If cancel instead removes the entry
// and invokes the registered free callback, `node` is already freed and the
// write below is a use-after-free; if it removes without freeing, the node
// leaks and the destroy-time leak check aborts. Verify against timeout_heap.c.
err_t uasync_cancel_timeout(uasync_t* ua, void* t_id) {
if (!ua || !t_id || !ua->timeout_heap) return ERR_FAIL;
struct timeout_node* node = (struct timeout_node*)t_id;
// Try to cancel from heap
if (timeout_heap_cancel(ua->timeout_heap, node->expiration_ms, node) == 0) {
// Mark as cancelled by clearing callback - memory will be freed later
node->callback = NULL;
return ERR_OK;
}
// If not found in heap (maybe already expired and removed), do NOT free
// because node was already freed in process_timeouts
return ERR_FAIL;
}
// Register a file descriptor with optional read/write/exception callbacks.
//
// Returns an opaque id for uasync_remove_socket(), or NULL when ua is NULL or
// fd is outside [0, FD_SETSIZE). The node is prepended to the socket list,
// mirrored into the master fd_sets, and indexed in fd_to_node for O(1)
// dispatch in uasync_poll().
void* uasync_add_socket(uasync_t* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
    if (!ua) return NULL;
    if (fd < 0 || fd >= FD_SETSIZE) return NULL; // fd must index fd_to_node[]

    struct socket_node* sn = malloc(sizeof *sn);
    if (!sn) return NULL;
    ua->socket_alloc_count++;

    sn->fd = fd;
    sn->read_cbk = read_cbk;
    sn->write_cbk = write_cbk;
    sn->except_cbk = except_cbk;
    sn->user_data = user_data;

    // Push onto the head of the singly linked list.
    sn->next = ua->socket_head;
    ua->socket_head = sn;

    // Mirror the interest set into the persistent master fd_sets.
    if (read_cbk)   FD_SET(fd, &ua->master_readfds);
    if (write_cbk)  FD_SET(fd, &ua->master_writefds);
    if (except_cbk) FD_SET(fd, &ua->master_exceptfds);

    ua->fd_to_node[fd] = sn;
    if (fd > ua->max_fd) ua->max_fd = fd;
    return sn;
}
// Instance version
//
// Unregister a socket previously returned by uasync_add_socket(): unlink it
// from the list, clear its bits in the master fd_sets, drop the fd_to_node
// mapping, and free the node. Returns ERR_OK on success, ERR_FAIL when s_id
// is not currently registered.
err_t uasync_remove_socket(uasync_t* ua, void* s_id) {
    if (!ua || !s_id) return ERR_FAIL;
    struct socket_node* node = (struct socket_node*)s_id;

    struct socket_node* prev = NULL;
    for (struct socket_node* cur = ua->socket_head; cur; prev = cur, cur = cur->next) {
        if (cur != node) continue;

        // Unlink from the singly linked list.
        if (prev) {
            prev->next = cur->next;
        } else {
            ua->socket_head = cur->next;
        }

        // Clear interest bits and the O(1) dispatch slot.
        if (node->read_cbk) FD_CLR(node->fd, &ua->master_readfds);
        if (node->write_cbk) FD_CLR(node->fd, &ua->master_writefds);
        if (node->except_cbk) FD_CLR(node->fd, &ua->master_exceptfds);
        ua->fd_to_node[node->fd] = NULL;

        int removed_fd = node->fd;
        ua->socket_free_count++;
        free(cur);

        // Rescan for the new maximum only when the removed fd WAS the maximum;
        // otherwise max_fd is unchanged. (Previously every removal paid an
        // unconditional O(n) rescan.)
        if (removed_fd == ua->max_fd) {
            ua->max_fd = -1;
            for (struct socket_node* it = ua->socket_head; it; it = it->next) {
                if (it->fd > ua->max_fd) ua->max_fd = it->fd;
            }
        }
        return ERR_OK;
    }
    return ERR_FAIL;
}
// Run the event loop forever: each iteration blocks in uasync_poll() until a
// socket event or the next internal timer fires. Never returns.
void uasync_mainloop(uasync_t* ua) {
    for (;;) {
        uasync_poll(ua, -1); /* infinite timeout */
    }
}
// Instance version
//
// Run one event-loop iteration: fire expired timers, select() on the
// registered sockets, then dispatch ready socket callbacks.
//
//   timeout_tb < 0  -> block until a socket event or the next internal timer
//                      (indefinitely when neither exists).
//   timeout_tb >= 0 -> wait at most timeout_tb timebase ticks (100 us each),
//                      or less if an internal timer is due sooner.
void uasync_poll(uasync_t* ua, int timeout_tb) {
    if (!ua) return;

    /* Fire everything already due before computing the wait. */
    process_timeouts(ua);

    /* select() mutates its sets, so work on copies of the masters. */
    fd_set readfds = ua->master_readfds;
    fd_set writefds = ua->master_writefds;
    fd_set exceptfds = ua->master_exceptfds;

    struct timeval tv;
    get_next_timeout(ua, &tv); /* {0,0} when no internal timer is pending */
    int have_deadline = (ua->timeout_heap && ua->timeout_heap->size > 0);

    if (timeout_tb >= 0) {
        struct timeval user_tv;
        user_tv.tv_sec = timeout_tb / 10000;
        user_tv.tv_usec = (timeout_tb % 10000) * 100;
        /* Use the caller's deadline when there is no internal timer or when
         * it is sooner than the internal one. */
        if (!have_deadline ||
            user_tv.tv_sec < tv.tv_sec ||
            (user_tv.tv_sec == tv.tv_sec && user_tv.tv_usec < tv.tv_usec)) {
            tv = user_tv;
        }
        have_deadline = 1; /* a caller-supplied timeout must never block forever */
    }

    /* BUGFIX: previously timeout_tb == 0 with an empty timer heap produced a
     * NULL timeout and select() blocked indefinitely; NULL now strictly means
     * "no deadline at all" (timeout_tb < 0 and no pending timer). */
    struct timeval* ptv = have_deadline ? &tv : NULL;
    int nfds = select(ua->max_fd + 1, &readfds, &writefds, &exceptfds, ptv);
    if (nfds < 0) {
        if (errno != EINTR) perror("select"); /* EINTR is a benign wakeup */
        return;
    }

    /* Timers may have come due while we were blocked. */
    process_timeouts(ua);

    /* Dispatch via the fd_to_node map. Re-fetch the node between callbacks:
     * a callback may call uasync_remove_socket() on its own fd, and touching
     * the freed node afterwards would be a use-after-free. */
    for (int fd = 0; nfds > 0 && fd <= ua->max_fd; fd++) {
        struct socket_node* node = ua->fd_to_node[fd];
        if (!node) continue;
        if (node->except_cbk && FD_ISSET(fd, &exceptfds)) {
            node->except_cbk(fd, node->user_data);
            nfds--;
            node = ua->fd_to_node[fd]; /* callback may have removed the socket */
            if (!node) continue;
        }
        if (node->read_cbk && FD_ISSET(fd, &readfds)) {
            node->read_cbk(fd, node->user_data);
            nfds--;
            node = ua->fd_to_node[fd]; /* callback may have removed the socket */
            if (!node) continue;
        }
        if (node->write_cbk && FD_ISSET(fd, &writefds)) {
            node->write_cbk(fd, node->user_data);
            nfds--;
        }
    }
}
// ========== Instance management functions ==========
// Allocate and initialize a new uasync instance: zeroed counters and map,
// empty socket list, cleared fd_sets, and a 16-slot timeout heap whose free
// callback keeps the timer leak counters balanced. Returns NULL on failure.
uasync_t* uasync_create(void) {
    uasync_t* ua = calloc(1, sizeof *ua); // zero-fills counters, list, fd map
    if (!ua) return NULL;

    ua->max_fd = -1;
    FD_ZERO(&ua->master_readfds);
    FD_ZERO(&ua->master_writefds);
    FD_ZERO(&ua->master_exceptfds);

    ua->timeout_heap = timeout_heap_create(16);
    if (!ua->timeout_heap) {
        free(ua);
        return NULL;
    }
    // Nodes the heap discards are released through this callback so the
    // alloc/free accounting stays accurate.
    timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
    return ua;
}
// Tear down an instance: drain and free all pending timers and socket nodes,
// destroy the heap, and verify that the alloc/free counters balance. Aborts
// if an imbalance remains after cleanup; a NULL instance is a no-op.
void uasync_destroy(uasync_t* ua) {
    if (!ua) return;

    // Pre-cleanup report (non-fatal: pending entries are drained just below).
    if (ua->timer_alloc_count != ua->timer_free_count ||
        ua->socket_alloc_count != ua->socket_free_count) {
        fprintf(stderr, "[UASYNC FATAL] Memory leaks detected before cleanup: timers %zu/%zu, sockets %zu/%zu\n",
                ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count);
    }

    // Drain and free every timer node still queued in the heap.
    if (ua->timeout_heap) {
        size_t drained = 0;
        TimeoutEntry entry;
        while (timeout_heap_pop(ua->timeout_heap, &entry) == 0) {
            free((struct timeout_node*)entry.data);
            ua->timer_free_count++;
            drained++;
        }
        printf("[UASYNC_DEBUG] Freed %zu timer nodes in destroy, heap freed_count = %zu\n",
               drained, ua->timeout_heap->freed_count);
        timeout_heap_destroy(ua->timeout_heap);
    }

    // Walk and free the socket list.
    struct socket_node* cur = ua->socket_head;
    while (cur) {
        struct socket_node* next = cur->next;
        ua->socket_free_count++;
        free(cur);
        cur = next;
    }

    // Post-cleanup check: any remaining imbalance is a genuine leak -> abort.
    if (ua->timer_alloc_count != ua->timer_free_count ||
        ua->socket_alloc_count != ua->socket_free_count) {
        fprintf(stderr, "[UASYNC FATAL] Memory leaks detected after cleanup: timers %zu/%zu, sockets %zu/%zu\n",
                ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count);
        abort();
    }
    free(ua);
}
// Re-initialize a caller-provided instance in place: reset fd bookkeeping and
// lazily create the timeout heap if one is not already present. Existing heap
// contents, the socket list, and the debug counters are left untouched.
// A NULL instance is a no-op.
void uasync_init_instance(uasync_t* ua) {
    if (!ua) return;

    ua->max_fd = -1;
    FD_ZERO(&ua->master_readfds);
    FD_ZERO(&ua->master_writefds);
    FD_ZERO(&ua->master_exceptfds);
    memset(ua->fd_to_node, 0, sizeof ua->fd_to_node);

    if (ua->timeout_heap) return; // keep an existing heap as-is

    ua->timeout_heap = timeout_heap_create(16);
    if (ua->timeout_heap) {
        timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback);
    }
}
// Debug statistics
//
// Copy the allocation/free counters into whichever out parameters are
// non-NULL. Does nothing when ua is NULL.
void uasync_get_stats(uasync_t* ua, size_t* timer_alloc, size_t* timer_free, size_t* socket_alloc, size_t* socket_free) {
    if (ua == NULL) return;
    if (timer_alloc != NULL) *timer_alloc = ua->timer_alloc_count;
    if (timer_free != NULL) *timer_free = ua->timer_free_count;
    if (socket_alloc != NULL) *socket_alloc = ua->socket_alloc_count;
    if (socket_free != NULL) *socket_free = ua->socket_free_count;
}
// Get global instance for backward compatibility