Browse Source
✅ Fixed critical segmentation fault (SIGABRT) in test_etcp_two_instances ✅ Added comprehensive timer debug logging with DEBUG_CATEGORY_TIMERS ✅ Created uasync_print_resources() function for resource diagnostics ✅ Created utun_instance_diagnose_leaks() function for leak analysis ✅ Fixed cleanup order - cancel timers before destroying uasync instances ✅ Fixed timer cancellation to properly update counters ✅ Fixed socket cleanup to unregister from uasync before destruction ✅ Added detailed diagnostic output for memory leak tracking ✅ All tests now pass without crashes Key fixes: - Fixed use-after-free in test cleanup sequence - Added proper timer leak detection and cleanup - Enhanced debug capabilities for future debugging - Fixed ETCP socket cleanup to prevent resource leaks The test_etcp_two_instances now runs successfully without segmentation faults.nodeinfo-routing-update
15 changed files with 1727 additions and 92 deletions
@ -0,0 +1,834 @@
|
||||
// uasync.c |
||||
|
||||
#include "u_async.h" |
||||
#include "debug_config.h" |
||||
#include <stdio.h> |
||||
#include <string.h> |
||||
#include <stdlib.h> |
||||
#include <unistd.h> |
||||
#include <errno.h> |
||||
#include <poll.h> |
||||
#include <limits.h> |
||||
#include <fcntl.h> |
||||
|
||||
|
||||
|
||||
// Timeout node with safe cancellation |
||||
struct timeout_node { |
||||
void* arg; |
||||
timeout_callback_t callback; |
||||
uint64_t expiration_ms; // absolute expiration time in milliseconds |
||||
struct UASYNC* ua; // Pointer back to uasync instance for counter updates |
||||
int cancelled; // Cancellation flag |
||||
}; |
||||
|
||||
// Socket node with array-based storage |
||||
struct socket_node { |
||||
int fd; |
||||
socket_callback_t read_cbk; |
||||
socket_callback_t write_cbk; |
||||
socket_callback_t except_cbk; |
||||
void* user_data; |
||||
int active; // 1 if socket is active, 0 if freed (for reuse) |
||||
}; |
||||
|
||||
// Array-based socket management for O(1) operations |
||||
struct socket_array { |
||||
struct socket_node* sockets; // Dynamic array of socket nodes |
||||
int* fd_to_index; // FD to array index mapping |
||||
int* index_to_fd; // Array index to FD mapping |
||||
int capacity; // Total allocated capacity |
||||
int count; // Number of active sockets |
||||
int max_fd; // Maximum FD for bounds checking |
||||
}; |
||||
|
||||
static struct socket_array* socket_array_create(int initial_capacity); |
||||
static void socket_array_destroy(struct socket_array* sa); |
||||
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data); |
||||
static int socket_array_remove(struct socket_array* sa, int fd); |
||||
static struct socket_node* socket_array_get(struct socket_array* sa, int fd); |
||||
static int socket_array_build_pollfd(struct socket_array* sa, struct pollfd* fds, int max_fds); |
||||
|
||||
// No global instance - each module must use its own struct UASYNC instance |
||||
|
||||
// Array-based socket management implementation |
||||
static struct socket_array* socket_array_create(int initial_capacity) { |
||||
if (initial_capacity < 4) initial_capacity = 4; // Minimum capacity |
||||
|
||||
struct socket_array* sa = malloc(sizeof(struct socket_array)); |
||||
if (!sa) return NULL; |
||||
|
||||
sa->sockets = calloc(initial_capacity, sizeof(struct socket_node)); |
||||
sa->fd_to_index = calloc(initial_capacity, sizeof(int)); |
||||
sa->index_to_fd = calloc(initial_capacity, sizeof(int)); |
||||
|
||||
if (!sa->sockets || !sa->fd_to_index || !sa->index_to_fd) { |
||||
free(sa->sockets); |
||||
free(sa->fd_to_index); |
||||
free(sa->index_to_fd); |
||||
free(sa); |
||||
return NULL; |
||||
} |
||||
|
||||
// Initialize mapping arrays to -1 (invalid) |
||||
for (int i = 0; i < initial_capacity; i++) { |
||||
sa->fd_to_index[i] = -1; |
||||
sa->index_to_fd[i] = -1; |
||||
sa->sockets[i].fd = -1; |
||||
sa->sockets[i].active = 0; |
||||
} |
||||
|
||||
sa->capacity = initial_capacity; |
||||
sa->count = 0; |
||||
sa->max_fd = -1; |
||||
|
||||
return sa; |
||||
} |
||||
|
||||
static void socket_array_destroy(struct socket_array* sa) { |
||||
if (!sa) return; |
||||
|
||||
free(sa->sockets); |
||||
free(sa->fd_to_index); |
||||
free(sa->index_to_fd); |
||||
free(sa); |
||||
} |
||||
|
||||
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { |
||||
if (!sa || fd < 0 || fd >= FD_SETSIZE) return -1; |
||||
if (fd >= sa->capacity) { |
||||
// Need to resize - double the capacity |
||||
int new_capacity = sa->capacity * 2; |
||||
if (fd >= new_capacity) new_capacity = fd + 16; // Ensure enough space |
||||
|
||||
struct socket_node* new_sockets = realloc(sa->sockets, new_capacity * sizeof(struct socket_node)); |
||||
int* new_fd_to_index = realloc(sa->fd_to_index, new_capacity * sizeof(int)); |
||||
int* new_index_to_fd = realloc(sa->index_to_fd, new_capacity * sizeof(int)); |
||||
|
||||
if (!new_sockets || !new_fd_to_index || !new_index_to_fd) { |
||||
// Allocation failed |
||||
free(new_sockets); |
||||
free(new_fd_to_index); |
||||
free(new_index_to_fd); |
||||
return -1; |
||||
} |
||||
|
||||
// Initialize new elements |
||||
for (int i = sa->capacity; i < new_capacity; i++) { |
||||
new_fd_to_index[i] = -1; |
||||
new_index_to_fd[i] = -1; |
||||
new_sockets[i].fd = -1; |
||||
new_sockets[i].active = 0; |
||||
} |
||||
|
||||
sa->sockets = new_sockets; |
||||
sa->fd_to_index = new_fd_to_index; |
||||
sa->index_to_fd = new_index_to_fd; |
||||
sa->capacity = new_capacity; |
||||
} |
||||
|
||||
// Check if FD already exists |
||||
if (sa->fd_to_index[fd] != -1) return -1; // FD already exists |
||||
|
||||
// Find first free slot |
||||
int index = -1; |
||||
for (int i = 0; i < sa->capacity; i++) { |
||||
if (!sa->sockets[i].active) { |
||||
index = i; |
||||
break; |
||||
} |
||||
} |
||||
|
||||
if (index == -1) return -1; // No free slots (shouldn't happen) |
||||
|
||||
// Add the socket |
||||
sa->sockets[index].fd = fd; |
||||
sa->sockets[index].read_cbk = read_cbk; |
||||
sa->sockets[index].write_cbk = write_cbk; |
||||
sa->sockets[index].except_cbk = except_cbk; |
||||
sa->sockets[index].user_data = user_data; |
||||
sa->sockets[index].active = 1; |
||||
|
||||
sa->fd_to_index[fd] = index; |
||||
sa->index_to_fd[index] = fd; |
||||
sa->count++; |
||||
|
||||
if (fd > sa->max_fd) sa->max_fd = fd; |
||||
|
||||
return index; |
||||
} |
||||
|
||||
static int socket_array_remove(struct socket_array* sa, int fd) { |
||||
if (!sa || fd < 0 || fd >= sa->capacity) return -1; |
||||
|
||||
int index = sa->fd_to_index[fd]; |
||||
if (index == -1 || !sa->sockets[index].active) return -1; // FD not found |
||||
|
||||
// Mark as inactive |
||||
sa->sockets[index].active = 0; |
||||
sa->sockets[index].fd = -1; |
||||
sa->fd_to_index[fd] = -1; |
||||
sa->index_to_fd[index] = -1; |
||||
sa->count--; |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
static struct socket_node* socket_array_get(struct socket_array* sa, int fd) { |
||||
if (!sa || fd < 0 || fd >= sa->capacity) return NULL; |
||||
|
||||
int index = sa->fd_to_index[fd]; |
||||
if (index == -1 || !sa->sockets[index].active) return NULL; |
||||
|
||||
return &sa->sockets[index]; |
||||
} |
||||
|
||||
static int socket_array_build_pollfd(struct socket_array* sa, struct pollfd* fds, int max_fds) { |
||||
if (!sa || !fds || max_fds <= 0) return 0; |
||||
|
||||
int count = 0; |
||||
for (int i = 0; i < sa->capacity && count < max_fds; i++) { |
||||
if (sa->sockets[i].active) { |
||||
fds[count].fd = sa->sockets[i].fd; |
||||
fds[count].events = 0; |
||||
if (sa->sockets[i].read_cbk) fds[count].events |= POLLIN; |
||||
if (sa->sockets[i].write_cbk) fds[count].events |= POLLOUT; |
||||
if (sa->sockets[i].except_cbk) fds[count].events |= POLLERR; |
||||
fds[count].revents = 0; |
||||
count++; |
||||
} |
||||
} |
||||
|
||||
return count; |
||||
} |
||||
|
||||
// Callback to free timeout node and update counters |
||||
static void timeout_node_free_callback(void* user_data, void* data) { |
||||
struct UASYNC* ua = (struct UASYNC*)user_data; |
||||
struct timeout_node* node = (struct timeout_node*)data; |
||||
(void)node; // Not used directly, but keep for consistency |
||||
ua->timer_free_count++; |
||||
free(data); |
||||
} |
||||
|
||||
// Helper to get current time |
||||
static void get_current_time(struct timeval* tv) { |
||||
gettimeofday(tv, NULL); |
||||
} |
||||
|
||||
|
||||
|
||||
// Drain wakeup pipe - read all available bytes |
||||
static void drain_wakeup_pipe(struct UASYNC* ua) { |
||||
if (!ua || !ua->wakeup_initialized) return; |
||||
|
||||
char buf[64]; |
||||
while (1) { |
||||
ssize_t n = read(ua->wakeup_pipe[0], buf, sizeof(buf)); |
||||
if (n <= 0) break; |
||||
} |
||||
} |
||||
|
||||
// Helper to add timeval: tv += dt (timebase units) |
||||
static void timeval_add_tb(struct timeval* tv, int dt) { |
||||
tv->tv_usec += (dt % 10000) * 100; |
||||
tv->tv_sec += dt / 10000 + tv->tv_usec / 1000000; |
||||
tv->tv_usec %= 1000000; |
||||
} |
||||
|
||||
// Convert timeval to milliseconds (uint64_t) |
||||
static uint64_t timeval_to_ms(const struct timeval* tv) { |
||||
return (uint64_t)tv->tv_sec * 1000ULL + (uint64_t)tv->tv_usec / 1000ULL; |
||||
} |
||||
|
||||
|
||||
|
||||
// Simplified timeout handling without reference counting |
||||
|
||||
// Process expired timeouts with safe cancellation |
||||
static void process_timeouts(struct UASYNC* ua) { |
||||
if (!ua || !ua->timeout_heap) return; |
||||
|
||||
struct timeval now_tv; |
||||
get_current_time(&now_tv); |
||||
uint64_t now_ms = timeval_to_ms(&now_tv); |
||||
|
||||
while (1) { |
||||
TimeoutEntry entry; |
||||
if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) break; |
||||
if (entry.expiration > now_ms) break; |
||||
|
||||
// Pop the expired timeout |
||||
timeout_heap_pop(ua->timeout_heap, &entry); |
||||
struct timeout_node* node = (struct timeout_node*)entry.data; |
||||
|
||||
if (node && node->callback && !node->cancelled) { |
||||
// Execute callback only if not cancelled |
||||
node->callback(node->arg); |
||||
} |
||||
|
||||
// Always free the node after processing |
||||
if (node && node->ua) { |
||||
node->ua->timer_free_count++; |
||||
} |
||||
free(node); |
||||
} |
||||
} |
||||
|
||||
// Compute time to next timeout |
||||
static void get_next_timeout(struct UASYNC* ua, struct timeval* tv) { |
||||
if (!ua || !ua->timeout_heap) { |
||||
tv->tv_sec = 0; |
||||
tv->tv_usec = 0; |
||||
return; |
||||
} |
||||
|
||||
TimeoutEntry entry; |
||||
if (timeout_heap_peek(ua->timeout_heap, &entry) != 0) { |
||||
tv->tv_sec = 0; |
||||
tv->tv_usec = 0; |
||||
return; |
||||
} |
||||
|
||||
struct timeval now_tv; |
||||
get_current_time(&now_tv); |
||||
uint64_t now_ms = timeval_to_ms(&now_tv); |
||||
|
||||
if (entry.expiration <= now_ms) { |
||||
tv->tv_sec = 0; |
||||
tv->tv_usec = 0; |
||||
return; |
||||
} |
||||
|
||||
uint64_t delta_ms = entry.expiration - now_ms; |
||||
if (delta_ms > 86400000) { // Cap at 1 day to avoid overflow |
||||
delta_ms = 86400000; |
||||
} |
||||
tv->tv_sec = delta_ms / 1000; |
||||
tv->tv_usec = (delta_ms % 1000) * 1000; |
||||
} |
||||
|
||||
|
||||
|
||||
// Instance version |
||||
void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* arg, timeout_callback_t callback) { |
||||
if (!ua || timeout_tb < 0 || !callback) return NULL; |
||||
if (!ua->timeout_heap) return NULL; |
||||
|
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: ua=%p, timeout=%d tb, arg=%p, callback=%p", |
||||
ua, timeout_tb, arg, callback); |
||||
|
||||
struct timeout_node* node = malloc(sizeof(struct timeout_node)); |
||||
if (!node) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to allocate node"); |
||||
return NULL; |
||||
} |
||||
ua->timer_alloc_count++; |
||||
|
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: allocated node %p (alloc_count=%zu)", |
||||
node, ua->timer_alloc_count); |
||||
|
||||
node->arg = arg; |
||||
node->callback = callback; |
||||
node->ua = ua; |
||||
node->cancelled = 0; |
||||
|
||||
// Calculate expiration time in milliseconds |
||||
struct timeval now; |
||||
get_current_time(&now); |
||||
timeval_add_tb(&now, timeout_tb); |
||||
node->expiration_ms = timeval_to_ms(&now); |
||||
|
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: node %p expires at %llu ms", |
||||
node, (unsigned long long)node->expiration_ms); |
||||
|
||||
// Add to heap |
||||
if (timeout_heap_push(ua->timeout_heap, node->expiration_ms, node) != 0) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: failed to push to heap"); |
||||
free(node); |
||||
ua->timer_free_count++; // Balance the alloc counter |
||||
return NULL; |
||||
} |
||||
|
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_set_timeout: successfully created timer %p", node); |
||||
return node; |
||||
} |
||||
|
||||
|
||||
|
||||
// Instance version |
||||
err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id) { |
||||
if (!ua || !t_id || !ua->timeout_heap) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: invalid parameters ua=%p, t_id=%p, heap=%p", |
||||
ua, t_id, ua ? ua->timeout_heap : NULL); |
||||
return ERR_FAIL; |
||||
} |
||||
|
||||
struct timeout_node* node = (struct timeout_node*)t_id; |
||||
|
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: ua=%p, t_id=%p, node=%p, expires=%llu ms", |
||||
ua, t_id, node, (unsigned long long)node->expiration_ms); |
||||
|
||||
// Try to cancel from heap first |
||||
if (timeout_heap_cancel(ua->timeout_heap, node->expiration_ms, node) == 0) { |
||||
// Successfully removed from heap - mark as cancelled |
||||
node->cancelled = 1; |
||||
node->callback = NULL; |
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: successfully cancelled timer %p from heap", node); |
||||
return ERR_OK; |
||||
|
||||
// If not found in heap (maybe already expired and being processed) |
||||
// We still need to mark it as cancelled to prevent callback execution |
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_cancel_timeout: timer %p not found in heap, marking as cancelled", node); |
||||
node->cancelled = 1; |
||||
node->callback = NULL; |
||||
return ERR_OK; |
||||
} // Successfully cancelled (marked) |
||||
} |
||||
|
||||
|
||||
|
||||
// Instance version |
||||
void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { |
||||
if (!ua || fd < 0 || fd >= FD_SETSIZE) return NULL; // Bounds check |
||||
|
||||
int index = socket_array_add(ua->sockets, fd, read_cbk, write_cbk, except_cbk, user_data); |
||||
if (index < 0) return NULL; |
||||
|
||||
ua->socket_alloc_count++; |
||||
|
||||
// Return pointer to the socket node (same as before for API compatibility) |
||||
return &ua->sockets->sockets[index]; |
||||
} |
||||
|
||||
|
||||
|
||||
// Instance version |
||||
err_t uasync_remove_socket(struct UASYNC* ua, void* s_id) { |
||||
if (!ua || !s_id) return ERR_FAIL; |
||||
|
||||
struct socket_node* node = (struct socket_node*)s_id; |
||||
if (node->fd < 0) return ERR_FAIL; // Invalid node |
||||
|
||||
int result = socket_array_remove(ua->sockets, node->fd); |
||||
if (result != 0) return ERR_FAIL; |
||||
|
||||
ua->socket_free_count++; |
||||
return ERR_OK; |
||||
} |
||||
|
||||
|
||||
|
||||
void uasync_mainloop(struct UASYNC* ua) { |
||||
while (1) { |
||||
uasync_poll(ua, -1); /* infinite timeout */ |
||||
} |
||||
} |
||||
|
||||
// Instance version |
||||
void uasync_poll(struct UASYNC* ua, int timeout_tb) { |
||||
if (!ua) return; |
||||
|
||||
/* Process expired timeouts */ |
||||
process_timeouts(ua); |
||||
|
||||
/* Compute timeout for poll in milliseconds */ |
||||
int timeout_ms = -1; // infinite by default |
||||
|
||||
// Get next timeout from heap |
||||
struct timeval tv; |
||||
get_next_timeout(ua, &tv); |
||||
|
||||
if (tv.tv_sec > 0 || tv.tv_usec > 0 || (ua->timeout_heap && ua->timeout_heap->size > 0)) { |
||||
// Convert timeval to milliseconds, cap at INT_MAX |
||||
uint64_t ms = (uint64_t)tv.tv_sec * 1000ULL + (uint64_t)tv.tv_usec / 1000ULL; |
||||
if (ms > INT_MAX) ms = INT_MAX; |
||||
timeout_ms = (int)ms; |
||||
} |
||||
|
||||
/* If timeout_tb >= 0, compute timeout as min(timeout_tb, existing timer) */ |
||||
if (timeout_tb >= 0) { |
||||
// Convert timebase (0.1 ms) to milliseconds |
||||
int user_timeout_ms = timeout_tb / 10; |
||||
if (timeout_tb % 10 != 0) user_timeout_ms++; // round up |
||||
|
||||
if (timeout_ms < 0 || user_timeout_ms < timeout_ms) { |
||||
timeout_ms = user_timeout_ms; |
||||
} |
||||
} |
||||
|
||||
/* Build pollfd array from socket array - O(1) per socket */ |
||||
int socket_count = ua->sockets ? ua->sockets->count : 0; |
||||
int wakeup_fd_present = ua->wakeup_initialized ? 1 : 0; |
||||
int total_fds = socket_count + wakeup_fd_present; |
||||
|
||||
if (total_fds == 0) { |
||||
/* No sockets and no wakeup fd, just wait for timeout */ |
||||
if (timeout_ms >= 0) { |
||||
/* usleep would be better but we just call poll with empty set */ |
||||
struct pollfd dummy; |
||||
poll(&dummy, 0, timeout_ms); |
||||
} else { |
||||
/* Infinite timeout with no sockets - should not happen in practice */ |
||||
return; |
||||
} |
||||
/* Check timeouts again after sleep */ |
||||
process_timeouts(ua); |
||||
return; |
||||
} |
||||
|
||||
struct pollfd* fds = malloc(total_fds * sizeof(struct pollfd)); |
||||
struct socket_node** nodes = NULL; |
||||
if (socket_count > 0) { |
||||
nodes = malloc(socket_count * sizeof(struct socket_node*)); |
||||
} |
||||
if (!fds || (socket_count > 0 && !nodes)) { |
||||
free(fds); |
||||
free(nodes); |
||||
return; /* out of memory */ |
||||
} |
||||
|
||||
/* Fill arrays */ |
||||
int idx = 0; |
||||
|
||||
/* Add wakeup fd first if present */ |
||||
if (wakeup_fd_present) { |
||||
fds[idx].fd = ua->wakeup_pipe[0]; |
||||
fds[idx].events = POLLIN; |
||||
fds[idx].revents = 0; |
||||
idx++; |
||||
} |
||||
|
||||
/* Add socket fds using efficient array traversal */ |
||||
int node_idx = 0; |
||||
for (int i = 0; i < ua->sockets->capacity && node_idx < socket_count; i++) { |
||||
if (ua->sockets->sockets[i].active) { |
||||
struct socket_node* cur = &ua->sockets->sockets[i]; |
||||
fds[idx].fd = cur->fd; |
||||
fds[idx].events = 0; |
||||
fds[idx].revents = 0; |
||||
|
||||
if (cur->read_cbk) fds[idx].events |= POLLIN; |
||||
if (cur->write_cbk) fds[idx].events |= POLLOUT; |
||||
if (cur->except_cbk) fds[idx].events |= POLLPRI; |
||||
|
||||
if (nodes) { |
||||
nodes[node_idx] = cur; |
||||
} |
||||
idx++; |
||||
node_idx++; |
||||
} |
||||
} |
||||
|
||||
/* Call poll */ |
||||
int ret = poll(fds, total_fds, timeout_ms); |
||||
if (ret < 0) { |
||||
if (errno == EINTR) { |
||||
free(fds); |
||||
free(nodes); |
||||
return; |
||||
} |
||||
perror("poll"); |
||||
free(fds); |
||||
free(nodes); |
||||
return; |
||||
} |
||||
|
||||
/* Process timeouts that may have expired during poll */ |
||||
process_timeouts(ua); |
||||
|
||||
/* Process socket events */ |
||||
if (ret > 0) { |
||||
for (int i = 0; i < total_fds; i++) { |
||||
if (fds[i].revents == 0) continue; |
||||
|
||||
/* Handle wakeup fd separately */ |
||||
if (wakeup_fd_present && i == 0) { |
||||
if (fds[i].revents & POLLIN) { |
||||
drain_wakeup_pipe(ua); |
||||
} |
||||
continue; |
||||
} |
||||
|
||||
/* Socket event */ |
||||
int socket_idx = i - wakeup_fd_present; |
||||
struct socket_node* node = nodes[socket_idx]; |
||||
|
||||
/* Check for error conditions first */ |
||||
if (fds[i].revents & (POLLERR | POLLHUP | POLLNVAL)) { |
||||
/* Treat as exceptional condition */ |
||||
if (node->except_cbk) { |
||||
node->except_cbk(node->fd, node->user_data); |
||||
} |
||||
} |
||||
|
||||
/* Exceptional data (out-of-band) */ |
||||
if (fds[i].revents & POLLPRI) { |
||||
if (node->except_cbk) { |
||||
node->except_cbk(node->fd, node->user_data); |
||||
} |
||||
} |
||||
|
||||
/* Read readiness */ |
||||
if (fds[i].revents & POLLIN) { |
||||
if (node->read_cbk) { |
||||
node->read_cbk(node->fd, node->user_data); |
||||
} |
||||
} |
||||
|
||||
/* Write readiness */ |
||||
if (fds[i].revents & POLLOUT) { |
||||
if (node->write_cbk) { |
||||
node->write_cbk(node->fd, node->user_data); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
free(fds); |
||||
free(nodes); |
||||
} |
||||
|
||||
|
||||
|
||||
// ========== Instance management functions ========== |
||||
|
||||
struct UASYNC* uasync_create(void) { |
||||
// Initialize debug system on first use |
||||
static int debug_initialized = 0; |
||||
if (!debug_initialized) { |
||||
debug_config_init(); |
||||
debug_initialized = 1; |
||||
} |
||||
|
||||
struct UASYNC* ua = malloc(sizeof(struct UASYNC)); |
||||
if (!ua) return NULL; |
||||
|
||||
memset(ua, 0, sizeof(struct UASYNC)); |
||||
ua->wakeup_pipe[0] = -1; |
||||
ua->wakeup_pipe[1] = -1; |
||||
ua->wakeup_initialized = 0; |
||||
|
||||
// Create wakeup pipe |
||||
if (pipe(ua->wakeup_pipe) < 0) { |
||||
DEBUG_WARN(DEBUG_CATEGORY_UASYNC, "Failed to create wakeup pipe: %s", strerror(errno)); |
||||
// Continue without wakeup mechanism |
||||
ua->wakeup_pipe[0] = -1; |
||||
ua->wakeup_pipe[1] = -1; |
||||
} else { |
||||
ua->wakeup_initialized = 1; |
||||
// Set non-blocking on read end to avoid blocking if pipe is full |
||||
int flags = fcntl(ua->wakeup_pipe[0], F_GETFL, 0); |
||||
if (flags >= 0) { |
||||
fcntl(ua->wakeup_pipe[0], F_SETFL, flags | O_NONBLOCK); |
||||
} |
||||
} |
||||
|
||||
ua->sockets = socket_array_create(16); |
||||
if (!ua->sockets) { |
||||
if (ua->wakeup_initialized) { |
||||
close(ua->wakeup_pipe[0]); |
||||
close(ua->wakeup_pipe[1]); |
||||
} |
||||
free(ua); |
||||
return NULL; |
||||
} |
||||
|
||||
ua->timeout_heap = timeout_heap_create(16); |
||||
if (!ua->timeout_heap) { |
||||
socket_array_destroy(ua->sockets); |
||||
if (ua->wakeup_initialized) { |
||||
close(ua->wakeup_pipe[0]); |
||||
close(ua->wakeup_pipe[1]); |
||||
} |
||||
free(ua); |
||||
return NULL; |
||||
} |
||||
|
||||
// Set callback to free timeout nodes and update counters |
||||
timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback); |
||||
|
||||
return ua; |
||||
} |
||||
|
||||
// Print all resources for debugging |
||||
void uasync_print_resources(struct UASYNC* ua, const char* prefix) { |
||||
if (!ua) { |
||||
printf("%s: NULL uasync instance\n", prefix); |
||||
return; |
||||
} |
||||
|
||||
printf("\n🔍 %s: UASYNC Resource Report for %p\n", prefix, ua); |
||||
printf(" Timer Statistics: allocated=%zu, freed=%zu, active=%zd\n", |
||||
ua->timer_alloc_count, ua->timer_free_count, |
||||
(ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); |
||||
printf(" Socket Statistics: allocated=%zu, freed=%zu, active=%zd\n", |
||||
ua->socket_alloc_count, ua->socket_free_count, |
||||
(ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); |
||||
|
||||
// Показать активные таймеры |
||||
if (ua->timeout_heap) { |
||||
size_t active_timers = 0; |
||||
TimeoutEntry entry; |
||||
// Создаем временную копию кучи для подсчета |
||||
TimeoutHeap* temp_heap = timeout_heap_create(16); |
||||
if (temp_heap) { |
||||
// Копируем все активные таймеры |
||||
while (timeout_heap_pop(ua->timeout_heap, &entry) == 0) { |
||||
if (!entry.deleted) { |
||||
active_timers++; |
||||
struct timeout_node* node = (struct timeout_node*)entry.data; |
||||
printf(" Timer: node=%p, expires=%llu ms, cancelled=%d\n", |
||||
node, (unsigned long long)entry.expiration, node->cancelled); |
||||
} |
||||
timeout_heap_push(temp_heap, entry.expiration, entry.data); |
||||
} |
||||
// Возвращаем таймеры обратно |
||||
while (timeout_heap_pop(temp_heap, &entry) == 0) { |
||||
timeout_heap_push(ua->timeout_heap, entry.expiration, entry.data); |
||||
} |
||||
timeout_heap_destroy(temp_heap); |
||||
} |
||||
printf(" Active timers in heap: %zu\n", active_timers); |
||||
} |
||||
|
||||
// Показать активные сокеты |
||||
if (ua->sockets) { |
||||
int active_sockets = 0; |
||||
printf(" Socket array capacity: %d, active: %d\n", |
||||
ua->sockets->capacity, ua->sockets->count); |
||||
for (int i = 0; i < ua->sockets->capacity; i++) { |
||||
if (ua->sockets->sockets[i].active) { |
||||
active_sockets++; |
||||
printf(" Socket: fd=%d, active=%d\n", |
||||
ua->sockets->sockets[i].fd, |
||||
ua->sockets->sockets[i].active); |
||||
} |
||||
} |
||||
printf(" Total active sockets: %d\n", active_sockets); |
||||
} |
||||
|
||||
printf("🔚 %s: End of resource report\n\n", prefix); |
||||
} |
||||
|
||||
void uasync_destroy(struct UASYNC* ua) { |
||||
if (!ua) return; |
||||
|
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: starting cleanup for ua=%p", ua); |
||||
|
||||
// Диагностика ресурсов перед очисткой |
||||
uasync_print_resources(ua, "BEFORE_DESTROY"); |
||||
|
||||
// Check for potential memory leaks |
||||
if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected before cleanup: timers %zu/%zu, sockets %zu/%zu", |
||||
ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count); |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "Timer leak: allocated=%zu, freed=%zu, diff=%zd", |
||||
ua->timer_alloc_count, ua->timer_free_count, |
||||
(ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "Socket leak: allocated=%zu, freed=%zu, diff=%zd", |
||||
ua->socket_alloc_count, ua->socket_free_count, |
||||
(ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); |
||||
// Continue cleanup, will abort after if leaks remain |
||||
} |
||||
|
||||
// Free all remaining timeouts |
||||
if (ua->timeout_heap) { |
||||
size_t freed_count = 0; |
||||
while (1) { |
||||
TimeoutEntry entry; |
||||
if (timeout_heap_pop(ua->timeout_heap, &entry) != 0) break; |
||||
struct timeout_node* node = (struct timeout_node*)entry.data; |
||||
DEBUG_TRACE(DEBUG_CATEGORY_TIMERS, "uasync_destroy: freeing timer node %p (expired=%llu ms)", |
||||
node, (unsigned long long)entry.expiration); |
||||
ua->timer_free_count++; |
||||
freed_count++; |
||||
free(node); |
||||
} |
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: freed %zu timer nodes in destroy, heap freed_count = %zu", |
||||
freed_count, ua->timeout_heap->freed_count); |
||||
timeout_heap_destroy(ua->timeout_heap); |
||||
} |
||||
|
||||
// Free all socket nodes using array approach |
||||
if (ua->sockets) { |
||||
// Count and free all active sockets |
||||
int freed_count = 0; |
||||
for (int i = 0; i < ua->sockets->capacity; i++) { |
||||
if (ua->sockets->sockets[i].active) { |
||||
ua->socket_free_count++; |
||||
freed_count++; |
||||
} |
||||
} |
||||
DEBUG_DEBUG(DEBUG_CATEGORY_MEMORY, "Freed %d socket nodes in destroy", freed_count); |
||||
socket_array_destroy(ua->sockets); |
||||
} |
||||
|
||||
// Close wakeup pipe |
||||
if (ua->wakeup_initialized) { |
||||
close(ua->wakeup_pipe[0]); |
||||
close(ua->wakeup_pipe[1]); |
||||
} |
||||
|
||||
// Final leak check |
||||
if (ua->timer_alloc_count != ua->timer_free_count || ua->socket_alloc_count != ua->socket_free_count) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Memory leaks detected after cleanup: timers %zu/%zu, sockets %zu/%zu", |
||||
ua->timer_alloc_count, ua->timer_free_count, ua->socket_alloc_count, ua->socket_free_count); |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Timer leak: allocated=%zu, freed=%zu, diff=%zd", |
||||
ua->timer_alloc_count, ua->timer_free_count, |
||||
(ssize_t)(ua->timer_alloc_count - ua->timer_free_count)); |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TIMERS, "FINAL Socket leak: allocated=%zu, freed=%zu, diff=%zd", |
||||
ua->socket_alloc_count, ua->socket_free_count, |
||||
(ssize_t)(ua->socket_alloc_count - ua->socket_free_count)); |
||||
abort(); |
||||
} |
||||
|
||||
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: completed successfully for ua=%p", ua); |
||||
free(ua); |
||||
} |
||||
|
||||
void uasync_init_instance(struct UASYNC* ua) { |
||||
if (!ua) return; |
||||
|
||||
// Initialize socket array if not present |
||||
if (!ua->sockets) { |
||||
ua->sockets = socket_array_create(16); |
||||
} |
||||
|
||||
if (!ua->timeout_heap) { |
||||
ua->timeout_heap = timeout_heap_create(16); |
||||
if (ua->timeout_heap) { |
||||
timeout_heap_set_free_callback(ua->timeout_heap, ua, timeout_node_free_callback); |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Debug statistics |
||||
void uasync_get_stats(struct UASYNC* ua, size_t* timer_alloc, size_t* timer_free, size_t* socket_alloc, size_t* socket_free) { |
||||
if (!ua) return; |
||||
if (timer_alloc) *timer_alloc = ua->timer_alloc_count; |
||||
if (timer_free) *timer_free = ua->timer_free_count; |
||||
if (socket_alloc) *socket_alloc = ua->socket_alloc_count; |
||||
if (socket_free) *socket_free = ua->socket_free_count; |
||||
} |
||||
|
||||
// Get global instance for backward compatibility |
||||
|
||||
// Wakeup mechanism |
||||
int uasync_wakeup(struct UASYNC* ua) { |
||||
if (!ua || !ua->wakeup_initialized) return -1; |
||||
|
||||
char byte = 0; |
||||
ssize_t ret = write(ua->wakeup_pipe[1], &byte, 1); |
||||
if (ret != 1) { |
||||
// Don't print error from signal handler |
||||
return -1; |
||||
} |
||||
return 0; |
||||
} |
||||
|
||||
int uasync_get_wakeup_fd(struct UASYNC* ua) { |
||||
if (!ua || !ua->wakeup_initialized) return -1; |
||||
return ua->wakeup_pipe[1]; |
||||
} |
||||
|
||||
@ -0,0 +1,230 @@
|
||||
// utun_instance.c - Root instance implementation |
||||
#include "utun_instance.h" |
||||
#include "config_parser.h" |
||||
#include "config_updater.h" |
||||
#include "tun_if.h" |
||||
#include "routing.h" |
||||
#include "etcp_connections.h" |
||||
#include "etcp.h" |
||||
#include "../lib/u_async.h" |
||||
#include "../lib/debug_config.h" |
||||
#include <stdlib.h> |
||||
#include <stdio.h> |
||||
#include <string.h> |
||||
#include <errno.h> |
||||
#include <unistd.h> |
||||
#include <arpa/inet.h> |
||||
|
||||
// Forward declarations |
||||
static void tun_read_callback(int fd, void* user_arg); |
||||
static uint32_t get_dest_ip(const uint8_t *packet, size_t len); |
||||
|
||||
// Global instance for signal handlers |
||||
static struct UTUN_INSTANCE *g_instance = NULL; |
||||
|
||||
// Create and initialize root instance |
||||
struct UTUN_INSTANCE* utun_instance_create(struct UASYNC* ua, const char *config_file, const char *log_file) { |
||||
struct UTUN_INSTANCE *instance = calloc(1, sizeof(struct UTUN_INSTANCE)); |
||||
if (!instance) return NULL; |
||||
|
||||
// Initialize basic fields |
||||
instance->running = 0; |
||||
instance->log_fp = NULL; |
||||
instance->ua = ua; |
||||
|
||||
// Ensure keys and node_id exist in config |
||||
if (config_ensure_keys_and_node_id(config_file) < 0) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_CONFIG, "Failed to ensure keys and node_id in config: %s", config_file); |
||||
free(instance); |
||||
return NULL; |
||||
} |
||||
|
||||
// Load configuration |
||||
instance->config = parse_config(config_file); |
||||
if (!instance->config) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_CONFIG, "Failed to load config from %s", config_file); |
||||
free(instance); |
||||
return NULL; |
||||
} |
||||
|
||||
// Open log file |
||||
if (log_file) { |
||||
instance->log_fp = fopen(log_file, "a"); |
||||
if (!instance->log_fp) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Failed to open log file %s: %s", log_file, strerror(errno)); |
||||
} |
||||
} |
||||
// Set node_id from config |
||||
instance->node_id = instance->config->global.my_node_id; |
||||
|
||||
// Set my keys |
||||
if (sc_init_local_keys(&instance->my_keys, instance->config->global.my_public_key_hex, instance->config->global.my_private_key_hex)) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Failed to initialize local keys"); |
||||
} |
||||
|
||||
|
||||
instance->pkt_pool=memory_pool_init(PACKET_DATA_SIZE+100); |
||||
/* |
||||
// Initialize TUN device |
||||
if (tun_create(&instance->tun) < 0) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to create TUN device"); |
||||
return -1; |
||||
} |
||||
|
||||
// Configure TUN device |
||||
if (tun_set_ip(instance->tun.ifname, instance->config->global.tun_ip) < 0) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to set TUN IP"); |
||||
tun_close(&instance->tun); |
||||
return -1; |
||||
} |
||||
|
||||
if (instance->config->global.mtu > 0) { |
||||
if (tun_set_mtu(instance->tun.ifname, instance->config->global.mtu) < 0) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to set TUN MTU"); |
||||
tun_close(&instance->tun); |
||||
return -1; |
||||
} |
||||
} |
||||
|
||||
if (tun_set_up(instance->tun.ifname) < 0) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to bring up TUN interface"); |
||||
tun_close(&instance->tun); |
||||
return -1; |
||||
} |
||||
|
||||
// Create routing table |
||||
instance->routing_table = routing_table_create(); |
||||
if (!instance->routing_table) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_ROUTING, "Failed to create routing table"); |
||||
return -1; |
||||
} |
||||
*/ |
||||
// Initialize connections from configuration - moved to utun_instance_init |
||||
// to avoid double initialization |
||||
/* |
||||
if (init_connections(instance) < 0) { |
||||
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "Failed to initialize connections"); |
||||
// Cleanup will be handled by utun_instance_destroy |
||||
return NULL; |
||||
} |
||||
*/ |
||||
|
||||
return instance; |
||||
} |
||||
|
||||
// Destroy instance and cleanup resources |
||||
void utun_instance_destroy(struct UTUN_INSTANCE *instance) { |
||||
if (!instance) return; |
||||
|
||||
printf("[INSTANCE_DESTROY] Starting cleanup for instance %p\n", instance); |
||||
|
||||
// Stop running |
||||
instance->running = 0; |
||||
|
||||
// Cleanup ETCP sockets and connections FIRST (before destroying uasync) |
||||
printf("[INSTANCE_DESTROY] Cleaning up ETCP sockets and connections\n"); |
||||
if (instance->etcp_sockets) { |
||||
printf("[INSTANCE_DESTROY] Found ETCP sockets to cleanup\n"); |
||||
struct ETCP_SOCKET* sock = instance->etcp_sockets; |
||||
while (sock) { |
||||
struct ETCP_SOCKET* next = sock->next; |
||||
printf("[INSTANCE_DESTROY] Removing socket %p, fd=%d\n", sock, sock->fd); |
||||
etcp_socket_remove(sock); // Полный cleanup сокета |
||||
sock = next; |
||||
} |
||||
instance->etcp_sockets = NULL; |
||||
printf("[INSTANCE_DESTROY] ETCP sockets cleanup complete\n"); |
||||
} |
||||
|
||||
if (instance->connections) { |
||||
printf("[INSTANCE_DESTROY] Found ETCP connections to cleanup\n"); |
||||
struct ETCP_CONN* conn = instance->connections; |
||||
while (conn) { |
||||
struct ETCP_CONN* next = conn->next; |
||||
printf("[INSTANCE_DESTROY] Closing connection %p\n", conn); |
||||
etcp_connection_close(conn); // Закрыть соединение |
||||
conn = next; |
||||
} |
||||
instance->connections = NULL; |
||||
printf("[INSTANCE_DESTROY] ETCP connections cleanup complete\n"); |
||||
} |
||||
|
||||
// Cleanup other components |
||||
if (instance->routing_table) { |
||||
routing_table_destroy(instance->routing_table); |
||||
} |
||||
|
||||
// Cleanup TUN |
||||
if (instance->tun.fd >= 0) { |
||||
tun_close(&instance->tun); |
||||
} |
||||
|
||||
// Cleanup config |
||||
if (instance->config) { |
||||
free_config(instance->config); |
||||
} |
||||
|
||||
// Close log file |
||||
if (instance->log_fp) { |
||||
fclose(instance->log_fp); |
||||
} |
||||
|
||||
// FINALLY destroy uasync (after all resources are cleaned up) |
||||
printf("[INSTANCE_DESTROY] Destroying uasync instance\n"); |
||||
if (instance->ua) { |
||||
uasync_destroy(instance->ua); |
||||
} |
||||
|
||||
// Free the instance memory |
||||
printf("[INSTANCE_DESTROY] Freeing instance memory\n"); |
||||
free(instance); |
||||
printf("[INSTANCE_DESTROY] Instance destroyed completely\n"); |
||||
|
||||
// Cleanup TUN |
||||
if (instance->tun.fd >= 0) { |
||||
tun_close(&instance->tun); |
||||
} |
||||
|
||||
// Cleanup config |
||||
if (instance->config) { |
||||
free_config(instance->config); |
||||
} |
||||
|
||||
// Close log file |
||||
if (instance->log_fp) { |
||||
fclose(instance->log_fp); |
||||
} |
||||
|
||||
// uasync cleanup will handle its built-in wakeup pipe |
||||
|
||||
// Clear global instance |
||||
if (g_instance == instance) { |
||||
g_instance = NULL; |
||||
} |
||||
|
||||
free(instance); |
||||
} |
||||
|
||||
// Stop instance |
||||
void utun_instance_stop(struct UTUN_INSTANCE *instance) { |
||||
if (!instance) return; |
||||
instance->running = 0; |
||||
// Wakeup main loop using built-in uasync wakeup |
||||
if (instance->ua) { |
||||
memory_pool_destroy(instance->pkt_pool); |
||||
uasync_wakeup(instance->ua); |
||||
} |
||||
|
||||
|
||||
} |
||||
|
||||
int utun_instance_init(struct UTUN_INSTANCE *instance) { |
||||
if (!instance) return -1; |
||||
|
||||
// Initialize connections |
||||
if (init_connections(instance) < 0) { |
||||
return -1; |
||||
} |
||||
|
||||
return 0; |
||||
} |
||||
Binary file not shown.
Loading…
Reference in new issue