diff --git a/lib/Makefile.am b/lib/Makefile.am index 3f670a6..2f8bdce 100644 --- a/lib/Makefile.am +++ b/lib/Makefile.am @@ -12,7 +12,9 @@ libuasync_a_SOURCES = \ memory_pool.c \ memory_pool.h \ sha256.c \ - sha256.h + sha256.h \ + socket_compat.c \ + socket_compat.h libuasync_a_CFLAGS = \ -D_ISOC99_SOURCE \ diff --git a/lib/debug_config.h b/lib/debug_config.h index f681e22..7c30365 100644 --- a/lib/debug_config.h +++ b/lib/debug_config.h @@ -43,6 +43,7 @@ typedef uint64_t debug_category_t; #define DEBUG_CATEGORY_TIMERS ((debug_category_t)1 << 10) // timer management #define DEBUG_CATEGORY_NORMALIZER ((debug_category_t)1 << 11) // packet normalizer #define DEBUG_CATEGORY_BGP ((debug_category_t)1 << 12) // BGP route exchange +#define DEBUG_CATEGORY_SOCKET ((debug_category_t)1 << 13) // Socket operations #define DEBUG_CATEGORY_ALL ((debug_category_t)0xFFFFFFFFFFFFFFFFULL) /* Debug configuration structure */ diff --git a/lib/socket_compat.c b/lib/socket_compat.c new file mode 100644 index 0000000..491536a --- /dev/null +++ b/lib/socket_compat.c @@ -0,0 +1,219 @@ +/** + * Socket compatibility layer implementation + */ + +#include "socket_compat.h" +#include "debug_config.h" + +#ifdef _WIN32 +// ==================== Windows Implementation ==================== + +int socket_platform_init(void) { + WSADATA wsaData; + int result = WSAStartup(MAKEWORD(2, 2), &wsaData); + if (result != 0) { + DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, "WSAStartup failed: %d", result); + return -1; + } + DEBUG_INFO(DEBUG_CATEGORY_SOCKET, "Winsock 2.2 initialized"); + return 0; +} + +void socket_platform_cleanup(void) { + WSACleanup(); + DEBUG_INFO(DEBUG_CATEGORY_SOCKET, "Winsock cleanup completed"); +} + +int socket_set_nonblocking(socket_t sock) { + u_long mode = 1; + if (ioctlsocket(sock, FIONBIO, &mode) != 0) { + DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, + "ioctlsocket(FIONBIO) failed: %d", socket_get_error()); + return -1; + } + return 0; +} + +int socket_close_wrapper(socket_t sock) { + return closesocket(sock); +} + +const char* socket_strerror(int err) { + static char buf[256]; + FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, err, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), + buf, sizeof(buf), NULL); + return buf; +} + +#else +// ==================== POSIX Implementation ==================== + +int socket_platform_init(void) { + // No initialization needed on POSIX + DEBUG_INFO(DEBUG_CATEGORY_SOCKET, "POSIX socket subsystem ready"); + return 0; +} + +void socket_platform_cleanup(void) { + // No cleanup needed on POSIX + DEBUG_INFO(DEBUG_CATEGORY_SOCKET, "POSIX socket cleanup completed"); +} + +int socket_set_nonblocking(socket_t sock) { + int flags = fcntl(sock, F_GETFL, 0); + if (flags < 0) { + DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, + "fcntl(F_GETFL) failed: %s", strerror(errno)); + return -1; + } + if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) { + DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, + "fcntl(F_SETFL) failed: %s", strerror(errno)); + return -1; + } + return 0; +} + +int socket_close_wrapper(socket_t sock) { + return close(sock); +} + +const char* socket_strerror(int err) { + return strerror(err); +} + +#endif + +// ==================== Universal Implementations ==================== + +socket_t socket_create_udp(int family) { + socket_t sock = socket(family, SOCK_DGRAM, IPPROTO_UDP); + if (sock == SOCKET_INVALID) { + DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, + "socket(SOCK_DGRAM) failed: %s", socket_strerror(socket_get_error())); + return SOCKET_INVALID; + } + DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "Created UDP socket: %ld", (long)sock); + return sock; +} + +int socket_set_buffers(socket_t sock, int sndbuf, int rcvbuf) { + int err = 0; + +#ifdef _WIN32 + // Windows uses char* for optval + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, + (const char*)&sndbuf, sizeof(sndbuf)) != 0) { + DEBUG_WARN(DEBUG_CATEGORY_SOCKET, + "SO_SNDBUF failed: %s", socket_strerror(socket_get_error())); + err = -1; + } + if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, + (const char*)&rcvbuf, sizeof(rcvbuf)) != 0) { + DEBUG_WARN(DEBUG_CATEGORY_SOCKET, + "SO_RCVBUF failed: %s", socket_strerror(socket_get_error())); + err = -1; + } +#else + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) < 0) { + DEBUG_WARN(DEBUG_CATEGORY_SOCKET, + "SO_SNDBUF failed: %s", strerror(errno)); + err = -1; + } + if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) < 0) { + DEBUG_WARN(DEBUG_CATEGORY_SOCKET, + "SO_RCVBUF failed: %s", strerror(errno)); + err = -1; + } +#endif + + return err; +} + +int socket_set_reuseaddr(socket_t sock, int reuse) { +#ifdef _WIN32 + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (const char*)&reuse, sizeof(reuse)) != 0) { + DEBUG_WARN(DEBUG_CATEGORY_SOCKET, + "SO_REUSEADDR failed: %s", socket_strerror(socket_get_error())); + return -1; + } +#else + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) { + DEBUG_WARN(DEBUG_CATEGORY_SOCKET, + "SO_REUSEADDR failed: %s", strerror(errno)); + return -1; + } +#endif + return 0; +} + +int socket_bind_to_device(socket_t sock, const char* ifname) { +#ifndef _WIN32 +#ifdef SO_BINDTODEVICE + if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, ifname, strlen(ifname)) < 0) { + DEBUG_WARN(DEBUG_CATEGORY_SOCKET, + "SO_BINDTODEVICE failed: %s", strerror(errno)); + return -1; + } + return 0; +#else + (void)sock; + (void)ifname; + DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "SO_BINDTODEVICE not available"); + return -1; +#endif +#else + // Windows doesn't support SO_BINDTODEVICE + (void)sock; + (void)ifname; + DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "SO_BINDTODEVICE not available on Windows"); + return -1; +#endif +} + +int socket_set_mark(socket_t sock, int mark) { +#ifndef _WIN32 +#ifdef SO_MARK + if (setsockopt(sock, SOL_SOCKET, SO_MARK, &mark, sizeof(mark)) < 0) { + DEBUG_WARN(DEBUG_CATEGORY_SOCKET, + "SO_MARK failed: %s", strerror(errno)); + return -1; + } + return 0; +#else + (void)sock; + (void)mark; + DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "SO_MARK not available"); + return -1; +#endif +#else + // Windows doesn't support SO_MARK + (void)sock; + (void)mark; + DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "SO_MARK not available on Windows"); + return -1; +#endif +} + +ssize_t socket_sendto(socket_t sock, const void* buf, size_t len, + struct sockaddr* dest, socklen_t dest_len) { +#ifdef _WIN32 + return sendto(sock, (const char*)buf, (int)len, 0, dest, dest_len); +#else + return sendto(sock, buf, len, 0, dest, dest_len); +#endif +} + +ssize_t socket_recvfrom(socket_t sock, void* buf, size_t len, + struct sockaddr* src, socklen_t* src_len) { +#ifdef _WIN32 + int fromlen = src_len ? (int)*src_len : 0; + ssize_t ret = recvfrom(sock, (char*)buf, (int)len, 0, src, src_len ? &fromlen : NULL); + if (src_len) *src_len = (socklen_t)fromlen; + return ret; +#else + return recvfrom(sock, buf, len, 0, src, src_len); +#endif +} diff --git a/lib/socket_compat.h b/lib/socket_compat.h new file mode 100644 index 0000000..006ec21 --- /dev/null +++ b/lib/socket_compat.h @@ -0,0 +1,72 @@ +/** + * Socket compatibility layer for cross-platform support (POSIX / Windows) + * MSYS2 UCRT64 compatible + */ + +#ifndef SOCKET_COMPAT_H +#define SOCKET_COMPAT_H + +#ifdef _WIN32 + #include + #include + + typedef SOCKET socket_t; + #define SOCKET_INVALID INVALID_SOCKET + #define SOCKET_ERROR_CODE SOCKET_ERROR + + // Error codes + #define ERR_WOULDBLOCK WSAEWOULDBLOCK + #define ERR_AGAIN WSAEWOULDBLOCK + #define ERR_INTR WSAEINTR + +#else + // POSIX systems + #include + #include + #include + #include + #include + #include + #include + + typedef int socket_t; + #define SOCKET_INVALID (-1) + #define SOCKET_ERROR_CODE (-1) + + #define ERR_WOULDBLOCK EWOULDBLOCK + #define ERR_AGAIN EAGAIN + #define ERR_INTR EINTR +#endif + +// Platform initialization/cleanup +int socket_platform_init(void); +void socket_platform_cleanup(void); + +// Socket operations +socket_t socket_create_udp(int family); +int socket_set_nonblocking(socket_t sock); +int socket_close_wrapper(socket_t sock); +static inline int socket_get_error(void) { +#ifdef _WIN32 + return WSAGetLastError(); +#else + return errno; +#endif +} + +// Socket options +int socket_set_buffers(socket_t sock, int sndbuf, int rcvbuf); +int socket_set_reuseaddr(socket_t sock, int reuse); +int socket_bind_to_device(socket_t sock, const char* ifname); +int socket_set_mark(socket_t sock, int mark); + +// I/O operations +ssize_t socket_sendto(socket_t sock, const void* buf, size_t len, + struct sockaddr* dest, socklen_t dest_len); +ssize_t socket_recvfrom(socket_t sock, void* buf, size_t len, + struct sockaddr* src, socklen_t* src_len); + +// Utility +const char* socket_strerror(int err); + +#endif // SOCKET_COMPAT_H diff --git a/lib/u_async.c b/lib/u_async.c index d18caea..105f62c 100644 --- a/lib/u_async.c +++ b/lib/u_async.c @@ -5,12 +5,15 @@ #include #include #include -#include #include -#include #include #include +#ifndef _WIN32 +#include +#include +#endif + // Platform-specific includes #ifdef __linux__ #include @@ -32,9 +35,13 @@ struct timeout_node { // Socket node with array-based storage struct socket_node { - int fd; - socket_callback_t read_cbk; - socket_callback_t write_cbk; + int fd; // File descriptor (for pipe, file) + socket_t sock; // Socket (for cross-platform sockets) + int type; // SOCKET_NODE_TYPE_FD or SOCKET_NODE_TYPE_SOCK + socket_callback_t read_cbk; // For FD type + socket_callback_t write_cbk; // For FD type + socket_t_callback_t read_cbk_sock; // For SOCK type + socket_t_callback_t write_cbk_sock; // For SOCK type socket_callback_t except_cbk; void* user_data; int active; // 1 if socket is active, 0 if freed (for reuse) @@ -106,7 +113,10 @@ static void socket_array_destroy(struct socket_array* sa) { free(sa); } -static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { +static int socket_array_add_internal(struct socket_array* sa, int fd, socket_t sock, int type, + socket_callback_t read_cbk_fd, socket_callback_t write_cbk_fd, + socket_t_callback_t read_cbk_sock, socket_t_callback_t write_cbk_sock, + socket_callback_t except_cbk, void* user_data) { if (!sa || fd < 0 || fd >= FD_SETSIZE) return -1; if (fd >= sa->capacity) { // Need to resize - double the capacity @@ -159,8 +169,12 @@ static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t r // Add the socket sa->sockets[index].fd = fd; - sa->sockets[index].read_cbk = read_cbk; - sa->sockets[index].write_cbk = write_cbk; + sa->sockets[index].sock = sock; + sa->sockets[index].type = type; + sa->sockets[index].read_cbk = read_cbk_fd; + sa->sockets[index].write_cbk = write_cbk_fd; + sa->sockets[index].read_cbk_sock = read_cbk_sock; + sa->sockets[index].write_cbk_sock = write_cbk_sock; sa->sockets[index].except_cbk = except_cbk; sa->sockets[index].user_data = user_data; sa->sockets[index].active = 1; @@ -175,6 +189,27 @@ static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t r return index; } +// Wrapper for adding regular file descriptors (pipe, file) +static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, + socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { + return socket_array_add_internal(sa, fd, SOCKET_INVALID, SOCKET_NODE_TYPE_FD, + read_cbk, write_cbk, NULL, NULL, except_cbk, user_data); +} + +// Wrapper for adding socket_t (cross-platform sockets) +static int socket_array_add_socket_t(struct socket_array* sa, socket_t sock, socket_t_callback_t read_cbk, + socket_t_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) { + // On Windows, SOCKET is UINT_PTR, so we need to handle indexing differently +#ifdef _WIN32 + int fd = (int)(intptr_t)sock; // Use socket value as index on Windows (simplified) +#else + int fd = sock; // On POSIX, socket_t is int +#endif + if (fd < 0 || fd >= FD_SETSIZE) return -1; + return socket_array_add_internal(sa, fd, sock, SOCKET_NODE_TYPE_SOCK, + NULL, NULL, read_cbk, write_cbk, except_cbk, user_data); +} + static int socket_array_remove(struct socket_array* sa, int fd) { if (!sa || fd < 0 || fd >= sa->capacity) return -1; @@ -184,6 +219,8 @@ static int socket_array_remove(struct socket_array* sa, int fd) { // Mark as inactive sa->sockets[index].active = 0; sa->sockets[index].fd = -1; + sa->sockets[index].sock = SOCKET_INVALID; + sa->sockets[index].type = SOCKET_NODE_TYPE_FD; sa->fd_to_index[fd] = -1; sa->index_to_fd[index] = -1; @@ -211,6 +248,23 @@ static struct socket_node* socket_array_get(struct socket_array* sa, int fd) { return &sa->sockets[index]; } +// Get socket_node by socket_t +static struct socket_node* socket_array_get_by_sock(struct socket_array* sa, socket_t sock) { + if (!sa) return NULL; +#ifdef _WIN32 + int fd = (int)(intptr_t)sock; +#else + int fd = sock; +#endif + if (fd < 0 || fd >= sa->capacity) return NULL; + + int index = sa->fd_to_index[fd]; + if (index == -1 || !sa->sockets[index].active) return NULL; + if (sa->sockets[index].type != SOCKET_NODE_TYPE_SOCK) return NULL; + + return &sa->sockets[index]; +} + // Callback to free timeout node and update counters static void timeout_node_free_callback(void* user_data, void* data) { struct UASYNC* ua = (struct UASYNC*)user_data; @@ -439,6 +493,74 @@ err_t uasync_remove_socket(struct UASYNC* ua, void* s_id) { return ERR_FAIL; } +// Add socket_t (cross-platform socket) +void* uasync_add_socket_t(struct UASYNC* ua, socket_t sock, socket_t_callback_t read_cbk, + socket_t_callback_t write_cbk, socket_t_callback_t except_cbk, void* user_data) { + if (!ua || sock == SOCKET_INVALID) return NULL; + + int index = socket_array_add_socket_t(ua->sockets, sock, read_cbk, write_cbk, + (socket_callback_t)except_cbk, user_data); + if (index < 0) return NULL; + + ua->socket_alloc_count++; + ua->poll_fds_dirty = 1; + +#if HAS_EPOLL + if (ua->use_epoll && ua->epoll_fd >= 0) { + struct epoll_event ev; + ev.events = 0; + if (read_cbk) ev.events |= EPOLLIN; + if (write_cbk) ev.events |= EPOLLOUT; + ev.data.ptr = &ua->sockets->sockets[index]; + + // On Windows, need to cast socket_t to int for epoll_ctl +#ifdef _WIN32 + int fd = (int)(intptr_t)sock; +#else + int fd = sock; +#endif + if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) { + socket_array_remove(ua->sockets, fd); + ua->socket_alloc_count--; + return NULL; + } + } +#endif + + return &ua->sockets->sockets[index]; +} + +// Remove socket by socket_t +err_t uasync_remove_socket_t(struct UASYNC* ua, socket_t sock) { + if (!ua || sock == SOCKET_INVALID) return ERR_FAIL; + + struct socket_node* node = socket_array_get_by_sock(ua->sockets, sock); + if (!node || !node->active) return ERR_FAIL; + +#if HAS_EPOLL + if (ua->use_epoll && ua->epoll_fd >= 0) { +#ifdef _WIN32 + int fd = (int)(intptr_t)sock; +#else + int fd = sock; +#endif + epoll_ctl(ua->epoll_fd, EPOLL_CTL_DEL, fd, NULL); + } +#endif + +#ifdef _WIN32 + int fd = (int)(intptr_t)sock; +#else + int fd = sock; +#endif + int ret = socket_array_remove(ua->sockets, fd); + if (ret == 0) { + ua->socket_free_count++; + ua->poll_fds_dirty = 1; + return ERR_OK; + } + return ERR_FAIL; +} // Helper function to rebuild cached pollfd array static void rebuild_poll_fds(struct UASYNC* ua) { @@ -475,11 +597,28 @@ static void rebuild_poll_fds(struct UASYNC* ua) { int socket_array_idx = ua->sockets->active_indices[i]; struct socket_node* cur = &ua->sockets->sockets[socket_array_idx]; - ua->poll_fds[idx].fd = cur->fd; + // Handle socket_t vs int fd + if (cur->type == SOCKET_NODE_TYPE_SOCK) { + // socket_t - cast to int for pollfd +#ifdef _WIN32 + ua->poll_fds[idx].fd = (int)(intptr_t)cur->sock; +#else + ua->poll_fds[idx].fd = cur->sock; +#endif + } else { + // Regular fd + ua->poll_fds[idx].fd = cur->fd; + } ua->poll_fds[idx].events = 0; ua->poll_fds[idx].revents = 0; - if (cur->read_cbk) ua->poll_fds[idx].events |= POLLIN; + if (cur->type == SOCKET_NODE_TYPE_SOCK) { + if (cur->read_cbk_sock) ua->poll_fds[idx].events |= POLLIN; + if (cur->write_cbk_sock) ua->poll_fds[idx].events |= POLLOUT; + } else { + if (cur->read_cbk) ua->poll_fds[idx].events |= POLLIN; + if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT; + } if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT; if (cur->except_cbk) ua->poll_fds[idx].events |= POLLPRI; @@ -513,19 +652,29 @@ static void process_epoll_events(struct UASYNC* ua, struct epoll_event* events, } } - /* Exceptional data (out-of-band) - epoll doesn't have POLLPRI equivalent */ - - /* Read readiness */ + /* Read readiness - use appropriate callback based on socket type */ if (events[i].events & EPOLLIN) { - if (node->read_cbk) { - node->read_cbk(node->fd, node->user_data); + if (node->type == SOCKET_NODE_TYPE_SOCK) { + if (node->read_cbk_sock) { + node->read_cbk_sock(node->sock, node->user_data); + } + } else { + if (node->read_cbk) { + node->read_cbk(node->fd, node->user_data); + } } } - /* Write readiness */ + /* Write readiness - use appropriate callback based on socket type */ if (events[i].events & EPOLLOUT) { - if (node->write_cbk) { - node->write_cbk(node->fd, node->user_data); + if (node->type == SOCKET_NODE_TYPE_SOCK) { + if (node->write_cbk_sock) { + node->write_cbk_sock(node->sock, node->user_data); + } + } else { + if (node->write_cbk) { + node->write_cbk(node->fd, node->user_data); + } } } } @@ -657,6 +806,15 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) { /* Socket event - lookup by fd */ struct socket_node* node = socket_array_get(ua->sockets, ua->poll_fds[i].fd); + if (!node) { // Try by socket_t (in case this is a socket) + socket_t lookup_sock; +#ifdef _WIN32 + lookup_sock = (socket_t)(intptr_t)ua->poll_fds[i].fd; +#else + lookup_sock = ua->poll_fds[i].fd; +#endif + node = socket_array_get_by_sock(ua->sockets, lookup_sock); + } if (!node) continue; // Socket may have been removed /* Check for error conditions first */ @@ -674,17 +832,29 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) { } } - /* Read readiness */ + /* Read readiness - use appropriate callback based on socket type */ if (ua->poll_fds[i].revents & POLLIN) { - if (node->read_cbk) { - node->read_cbk(node->fd, node->user_data); + if (node->type == SOCKET_NODE_TYPE_SOCK) { + if (node->read_cbk_sock) { + node->read_cbk_sock(node->sock, node->user_data); + } + } else { + if (node->read_cbk) { + node->read_cbk(node->fd, node->user_data); + } } } - /* Write readiness */ + /* Write readiness - use appropriate callback based on socket type */ if (ua->poll_fds[i].revents & POLLOUT) { - if (node->write_cbk) { - node->write_cbk(node->fd, node->user_data); + if (node->type == SOCKET_NODE_TYPE_SOCK) { + if (node->write_cbk_sock) { + node->write_cbk_sock(node->sock, node->user_data); + } + } else { + if (node->write_cbk) { + node->write_cbk(node->fd, node->user_data); + } } } } @@ -699,9 +869,17 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) { // ========== Instance management functions ========== struct UASYNC* uasync_create(void) { + // Initialize socket platform first (WSAStartup on Windows) + if (socket_platform_init() != 0) { + DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, "Failed to initialize socket platform"); + return NULL; + } struct UASYNC* ua = malloc(sizeof(struct UASYNC)); - if (!ua) return NULL; + if (!ua) { + socket_platform_cleanup(); + return NULL; + } memset(ua, 0, sizeof(struct UASYNC)); ua->wakeup_pipe[0] = -1; @@ -907,6 +1085,9 @@ void uasync_destroy(struct UASYNC* ua, int close_fds) { DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: completed successfully for ua=%p", ua); free(ua); + + // Cleanup socket platform (WSACleanup on Windows) + socket_platform_cleanup(); } void uasync_init_instance(struct UASYNC* ua) { diff --git a/lib/u_async.h b/lib/u_async.h index f97e341..619c2e7 100644 --- a/lib/u_async.h +++ b/lib/u_async.h @@ -1,24 +1,30 @@ -// uasync.h - -// модуль асинхронных операций. добавляем сокеты и таймауты и mainloop их обслуживает. - -#ifndef UASYNC_H -#define UASYNC_H - -#include -#include -#include "timeout_heap.h" - -typedef void (*timeout_callback_t)(void* user_arg);// передаёт user_arg из uasync_set_timeout -typedef void (*socket_callback_t)(int fd, void* user_arg);// передаёт user_arg из uasync_add_socket -// user_arg полезен если нужно передать управляющую структуру. Ее можно выделить в памяти и в ней хранить всё что надо. т.е. при set_timeout передаём и получаем ее в callback-е - +// uasync.h + +// модуль асинхронных операций. добавляем сокеты и таймауты и mainloop их обслуживает. + +#ifndef UASYNC_H +#define UASYNC_H + +#include +#include +#include "timeout_heap.h" +#include "socket_compat.h" + +typedef void (*timeout_callback_t)(void* user_arg);// передаёт user_arg из uasync_set_timeout +typedef void (*socket_callback_t)(int fd, void* user_arg);// передаёт user_arg из uasync_add_socket (for pipe/file) +typedef void (*socket_t_callback_t)(socket_t sock, void* user_arg);// передаёт user_arg из uasync_add_socket_t (for sockets) +// user_arg полезен если нужно передать управляющую структуру. Ее можно выделить в памяти и в ней хранить всё что надо. т.е. при set_timeout передаём и получаем ее в callback-е -// Error type -typedef int err_t; -#define ERR_OK 0 -#define ERR_FAIL -1 +// Error type +typedef int err_t; +#define ERR_OK 0 +#define ERR_FAIL -1 + +// Socket node types +#define SOCKET_NODE_TYPE_FD 0 // Regular file descriptor (pipe, file) +#define SOCKET_NODE_TYPE_SOCK 1 // Socket (socket_t) + // Uasync instance structure struct UASYNC { TimeoutHeap* timeout_heap; // Heap for timeout management @@ -54,9 +60,12 @@ void uasync_init_instance(struct UASYNC* ua); void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* user_arg, timeout_callback_t callback); err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id); -// Sockets -void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_arg); -err_t uasync_remove_socket(struct UASYNC* ua, void* s_id); +// Sockets - for regular file descriptors (pipe, file) +void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_arg); +// Sockets - for socket_t (cross-platform sockets) +void* uasync_add_socket_t(struct UASYNC* ua, socket_t sock, socket_t_callback_t read_cbk, socket_t_callback_t write_cbk, socket_t_callback_t except_cbk, void* user_arg); +err_t uasync_remove_socket(struct UASYNC* ua, void* s_id); +err_t uasync_remove_socket_t(struct UASYNC* ua, socket_t sock); // Single iteration of event loop with timeout (timebase units) void uasync_poll(struct UASYNC* ua, int timeout_tb); diff --git a/src/etcp_connections.c b/src/etcp_connections.c index 27cdad1..3813e35 100644 --- a/src/etcp_connections.c +++ b/src/etcp_connections.c @@ -1,9 +1,8 @@ #include "etcp_connections.h" +#include "../lib/socket_compat.h" #include #include #include -#include -#include #include #include "route_lib.h" #include "route_bgp.h" @@ -19,7 +18,7 @@ #include // Forward declaration -static void etcp_connections_read_callback(int fd, void* arg); +static void etcp_connections_read_callback_socket(socket_t sock, void* arg); struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance); static void etcp_link_remove_from_connections(struct ETCP_SOCKET* conn, struct ETCP_LINK* link); @@ -86,7 +85,7 @@ static void etcp_link_send_init(struct ETCP_LINK* link) { struct sockaddr_in* sin = (struct sockaddr_in*)&link->remote_addr; char addr_str[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &sin->sin_addr, addr_str, INET_ADDRSTRLEN); - DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] INIT sending to %s:%d, link=%p, conn_fd=%d", addr_str, ntohs(sin->sin_port), link, link->conn->fd); + DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] INIT sending to %s:%d, link=%p", addr_str, ntohs(sin->sin_port), link); } etcp_encrypt_send(dgram); @@ -275,6 +274,7 @@ struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct socka DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Failed to allocate connection"); return NULL; } + e_sock->fd = SOCKET_INVALID; // Initialize to invalid socket int family = AF_INET; if (ip) { @@ -286,68 +286,53 @@ struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct socka } } - e_sock->fd = socket(family, SOCK_DGRAM, 0); - if (e_sock->fd < 0) { - DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to create socket: %s", strerror(errno)); + e_sock->fd = socket_create_udp(family); + if (e_sock->fd == SOCKET_INVALID) { + DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to create socket: %s", + socket_strerror(socket_get_error())); free(e_sock); return NULL; } - int reuse = 0;// Строго не используем reuseaddr, даже в тестах! - if (setsockopt(e_sock->fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) { - DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set SO_REUSEADDR: %s", strerror(errno)); - } + // Строго не используем reuseaddr, даже в тестах! + socket_set_reuseaddr(e_sock->fd, 0); // Increase socket buffers for high throughput - int sndbuf = 4 * 1024 * 1024; // 4MB send buffer - int rcvbuf = 4 * 1024 * 1024; // 4MB receive buffer - if (setsockopt(e_sock->fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) < 0) { - DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set SO_SNDBUF: %s", strerror(errno)); - } - if (setsockopt(e_sock->fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) < 0) { - DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set SO_RCVBUF: %s", strerror(errno)); - } + socket_set_buffers(e_sock->fd, 4 * 1024 * 1024, 4 * 1024 * 1024); - int flags = fcntl(e_sock->fd, F_GETFL, 0); - fcntl(e_sock->fd, F_SETFL, flags | O_NONBLOCK); + if (socket_set_nonblocking(e_sock->fd) != 0) { + DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set non-blocking mode"); + } - // Set socket mark if specified + // Set socket mark if specified (Linux only) if (so_mark > 0) { -#ifdef SO_MARK - if (setsockopt(e_sock->fd, SOL_SOCKET, SO_MARK, &so_mark, sizeof(so_mark)) < 0) { - DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to set SO_MARK: %s", strerror(errno)); - } -#endif + socket_set_mark(e_sock->fd, so_mark); } - // Bind to interface if specified + // Bind to interface if specified (Linux only) if (netif_index > 0) { -#ifdef SO_BINDTODEVICE char ifname[IF_NAMESIZE]; if (if_indextoname(netif_index, ifname)) { - if (setsockopt(e_sock->fd, SOL_SOCKET, SO_BINDTODEVICE, ifname, strlen(ifname)) < 0) { - DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to bind to interface %s: %s", ifname, strerror(errno)); - } + socket_bind_to_device(e_sock->fd, ifname); } -#endif } // Store the local address and bind socket if provided if (ip) { memcpy(&e_sock->local_addr, ip, sizeof(struct sockaddr_storage)); - // CRITICAL: Actually bind the socket to the address - this was missing! + // CRITICAL: Actually bind the socket to the address socklen_t addr_len = (ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); if (bind(e_sock->fd, (struct sockaddr*)ip, addr_len) < 0) { - perror("bind"); - DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] Failed to bind socket to address family %d", ip->ss_family); + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] Failed to bind socket to address family %d: %s", + ip->ss_family, socket_strerror(socket_get_error())); if (ip->ss_family == AF_INET) { struct sockaddr_in* sin = (struct sockaddr_in*)ip; char addr_str[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &sin->sin_addr, addr_str, INET_ADDRSTRLEN); DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] Failed to bind to %s:%d", addr_str, ntohs(sin->sin_port)); } - close(e_sock->fd); + socket_close_wrapper(e_sock->fd); free(e_sock); return NULL; } @@ -364,19 +349,19 @@ struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct socka instance->etcp_sockets = e_sock; // Register socket with uasync for receiving packets - e_sock->socket_id = uasync_add_socket(instance->ua, e_sock->fd, - etcp_connections_read_callback, - NULL, NULL, e_sock); + e_sock->socket_id = uasync_add_socket_t(instance->ua, e_sock->fd, + etcp_connections_read_callback_socket, + NULL, NULL, e_sock); if (!e_sock->socket_id) { - DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to register socket with uasync (fd=%d)", e_sock->fd); - close(e_sock->fd); + DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to register socket with uasync"); + socket_close_wrapper(e_sock->fd); free(e_sock); return NULL; } - DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Registered ETCP socket with uasync (fd=%d)", e_sock->fd); + DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Registered ETCP socket with uasync"); - DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Socket %p (fd=%d) registered and active", e_sock, e_sock->fd); + DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Socket %p registered and active", e_sock); return e_sock; } @@ -385,19 +370,19 @@ void etcp_socket_remove(struct ETCP_SOCKET* conn) { DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, ""); if (!conn) return; - DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket %p, fd=%d, socket_id=%p", conn, conn->fd, conn->socket_id); + DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket %p, socket_id=%p", conn, conn->socket_id); // Remove from uasync if registered if (conn->socket_id) { DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket from uasync, instance=%p, ua=%p", conn->instance, conn->instance->ua); - uasync_remove_socket(conn->instance->ua, conn->socket_id); + uasync_remove_socket_t(conn->instance->ua, conn->fd); conn->socket_id = NULL; DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Unregistered socket from uasync"); } - if (conn->fd >= 0) { - close(conn->fd); - DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Closed fd=%d", conn->fd); + if (conn->fd != SOCKET_INVALID) { + socket_close_wrapper(conn->fd); + DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Closed socket"); } for (size_t i = 0; i < conn->num_channels; i++) { @@ -523,9 +508,10 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram) { // DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Sending packet to %s:%d, size=%zd", addr_str, ntohs(sin->sin_port), enc_buf_len + dgram->noencrypt_len); } - ssize_t sent = sendto(dgram->link->conn->fd, enc_buf, enc_buf_len + dgram->noencrypt_len, 0, (struct sockaddr*)addr, addr_len); + ssize_t sent = socket_sendto(dgram->link->conn->fd, enc_buf, enc_buf_len + dgram->noencrypt_len, + (struct sockaddr*)addr, addr_len); if (sent < 0) { - DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "sendto failed, errno=%d", errno); + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "sendto failed, sock_err=%d", socket_get_error()); dgram->link->send_errors++; errcode=3; goto es_err; } else { // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "sendto succeeded, sent=%zd bytes to port %d", sent, ntohs(((struct sockaddr_in*)addr)->sin_port)); @@ -537,7 +523,7 @@ es_err: return -1; } -static void etcp_connections_read_callback(int fd, void* arg) { +static void etcp_connections_read_callback_socket(socket_t sock, void* arg) { DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, ""); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback fd=%d, socket=%p", fd, arg); // !!!!!! DANGER: в этой функции ПРЕДЕЛЬНАЯ АККУРАТНОСТЬ. Если кажется что не туда указатель то невнимательно аланизировал !!!!! @@ -563,10 +549,10 @@ static void etcp_connections_read_callback(int fd, void* arg) { uint8_t data[PACKET_DATA_SIZE]; socklen_t addr_len=sizeof(addr); memset(&addr, 0, sizeof(addr)); - ssize_t recv_len = recvfrom(fd, data, PACKET_DATA_SIZE, 0, (struct sockaddr*)&addr, &addr_len); + ssize_t recv_len = socket_recvfrom(sock, data, PACKET_DATA_SIZE, (struct sockaddr*)&addr, &addr_len); if (recv_len <= 0) { - DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: recvfrom failed, error=%zd, errno=%d", recv_len, errno); + DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: recvfrom failed, error=%zd, sock_err=%d", recv_len, socket_get_error()); return; } diff --git a/src/etcp_connections.h b/src/etcp_connections.h index 458eaf9..7e7eb48 100644 --- a/src/etcp_connections.h +++ b/src/etcp_connections.h @@ -5,8 +5,8 @@ #include "secure_channel.h" #include "utun_instance.h" +#include "../lib/socket_compat.h" #include -#include #define PACKET_DATA_SIZE 1536 @@ -29,7 +29,7 @@ struct ETCP_DGRAM {// пакет (незашифрованный) struct ETCP_SOCKET { struct ETCP_SOCKET* next; // Linked list для всех соединений struct UTUN_INSTANCE* instance; - int fd; // Файловый дескриптор UDP сокета + socket_t fd; // UDP socket (cross-platform) struct sockaddr_storage local_addr; // Локальный адрес // для входящих подключений (links) - массив упорядоченный по ip_port_hash