Browse Source

Add cross-platform socket compatibility layer for Windows support

- Add socket_compat.h/c with platform abstraction for POSIX/Windows
- socket_t typedef: int on POSIX, SOCKET on Windows
- Add socket_platform_init/cleanup for WSAStartup on Windows
- Add socket operations: create_udp, sendto, recvfrom, set_nonblocking
- Add socket options: set_buffers, set_reuseaddr, set_mark (Linux), bind_to_device (Linux)
- Update u_async: add socket_t support with uasync_add_socket_t/remove_socket_t
- Update ETCP: use socket_t and socket_compat functions
- Add DEBUG_CATEGORY_SOCKET for socket debugging
- All 22 tests pass on Linux
- MSYS2 UCRT64 compatible
nodeinfo-routing-update
Evgeny 2 months ago
parent
commit
3f1d4077e9
  1. 4
      lib/Makefile.am
  2. 1
      lib/debug_config.h
  3. 219
      lib/socket_compat.c
  4. 72
      lib/socket_compat.h
  5. 231
      lib/u_async.c
  6. 13
      lib/u_async.h
  7. 94
      src/etcp_connections.c
  8. 4
      src/etcp_connections.h

4
lib/Makefile.am

@ -12,7 +12,9 @@ libuasync_a_SOURCES = \
memory_pool.c \
memory_pool.h \
sha256.c \
sha256.h
sha256.h \
socket_compat.c \
socket_compat.h
libuasync_a_CFLAGS = \
-D_ISOC99_SOURCE \

1
lib/debug_config.h

@ -43,6 +43,7 @@ typedef uint64_t debug_category_t;
#define DEBUG_CATEGORY_TIMERS ((debug_category_t)1 << 10) // timer management
#define DEBUG_CATEGORY_NORMALIZER ((debug_category_t)1 << 11) // packet normalizer
#define DEBUG_CATEGORY_BGP ((debug_category_t)1 << 12) // BGP route exchange
#define DEBUG_CATEGORY_SOCKET ((debug_category_t)1 << 13) // Socket operations
#define DEBUG_CATEGORY_ALL ((debug_category_t)0xFFFFFFFFFFFFFFFFULL)
/* Debug configuration structure */

219
lib/socket_compat.c

@ -0,0 +1,219 @@
/**
* Socket compatibility layer implementation
*/
#include "socket_compat.h"
#include "debug_config.h"
#ifdef _WIN32
// ==================== Windows Implementation ====================
int socket_platform_init(void) {
WSADATA wsaData;
int result = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (result != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, "WSAStartup failed: %d", result);
return -1;
}
DEBUG_INFO(DEBUG_CATEGORY_SOCKET, "Winsock 2.2 initialized");
return 0;
}
void socket_platform_cleanup(void) {
WSACleanup();
DEBUG_INFO(DEBUG_CATEGORY_SOCKET, "Winsock cleanup completed");
}
int socket_set_nonblocking(socket_t sock) {
u_long mode = 1;
if (ioctlsocket(sock, FIONBIO, &mode) != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_SOCKET,
"ioctlsocket(FIONBIO) failed: %d", socket_get_error());
return -1;
}
return 0;
}
int socket_close_wrapper(socket_t sock) {
return closesocket(sock);
}
const char* socket_strerror(int err) {
static char buf[256];
FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL, err, MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
buf, sizeof(buf), NULL);
return buf;
}
#else
// ==================== POSIX Implementation ====================
int socket_platform_init(void) {
// No initialization needed on POSIX
DEBUG_INFO(DEBUG_CATEGORY_SOCKET, "POSIX socket subsystem ready");
return 0;
}
void socket_platform_cleanup(void) {
// No cleanup needed on POSIX
DEBUG_INFO(DEBUG_CATEGORY_SOCKET, "POSIX socket cleanup completed");
}
int socket_set_nonblocking(socket_t sock) {
int flags = fcntl(sock, F_GETFL, 0);
if (flags < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_SOCKET,
"fcntl(F_GETFL) failed: %s", strerror(errno));
return -1;
}
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_SOCKET,
"fcntl(F_SETFL) failed: %s", strerror(errno));
return -1;
}
return 0;
}
int socket_close_wrapper(socket_t sock) {
return close(sock);
}
const char* socket_strerror(int err) {
return strerror(err);
}
#endif
// ==================== Universal Implementations ====================
socket_t socket_create_udp(int family) {
socket_t sock = socket(family, SOCK_DGRAM, IPPROTO_UDP);
if (sock == SOCKET_INVALID) {
DEBUG_ERROR(DEBUG_CATEGORY_SOCKET,
"socket(SOCK_DGRAM) failed: %s", socket_strerror(socket_get_error()));
return SOCKET_INVALID;
}
DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "Created UDP socket: %ld", (long)sock);
return sock;
}
int socket_set_buffers(socket_t sock, int sndbuf, int rcvbuf) {
int err = 0;
#ifdef _WIN32
// Windows uses char* for optval
if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
(const char*)&sndbuf, sizeof(sndbuf)) != 0) {
DEBUG_WARN(DEBUG_CATEGORY_SOCKET,
"SO_SNDBUF failed: %s", socket_strerror(socket_get_error()));
err = -1;
}
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
(const char*)&rcvbuf, sizeof(rcvbuf)) != 0) {
DEBUG_WARN(DEBUG_CATEGORY_SOCKET,
"SO_RCVBUF failed: %s", socket_strerror(socket_get_error()));
err = -1;
}
#else
if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_SOCKET,
"SO_SNDBUF failed: %s", strerror(errno));
err = -1;
}
if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_SOCKET,
"SO_RCVBUF failed: %s", strerror(errno));
err = -1;
}
#endif
return err;
}
int socket_set_reuseaddr(socket_t sock, int reuse) {
#ifdef _WIN32
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
(const char*)&reuse, sizeof(reuse)) != 0) {
DEBUG_WARN(DEBUG_CATEGORY_SOCKET,
"SO_REUSEADDR failed: %s", socket_strerror(socket_get_error()));
return -1;
}
#else
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_SOCKET,
"SO_REUSEADDR failed: %s", strerror(errno));
return -1;
}
#endif
return 0;
}
int socket_bind_to_device(socket_t sock, const char* ifname) {
#ifndef _WIN32
#ifdef SO_BINDTODEVICE
if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, ifname, strlen(ifname)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_SOCKET,
"SO_BINDTODEVICE failed: %s", strerror(errno));
return -1;
}
return 0;
#else
(void)sock;
(void)ifname;
DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "SO_BINDTODEVICE not available");
return -1;
#endif
#else
// Windows doesn't support SO_BINDTODEVICE
(void)sock;
(void)ifname;
DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "SO_BINDTODEVICE not available on Windows");
return -1;
#endif
}
int socket_set_mark(socket_t sock, int mark) {
#ifndef _WIN32
#ifdef SO_MARK
if (setsockopt(sock, SOL_SOCKET, SO_MARK, &mark, sizeof(mark)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_SOCKET,
"SO_MARK failed: %s", strerror(errno));
return -1;
}
return 0;
#else
(void)sock;
(void)mark;
DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "SO_MARK not available");
return -1;
#endif
#else
// Windows doesn't support SO_MARK
(void)sock;
(void)mark;
DEBUG_DEBUG(DEBUG_CATEGORY_SOCKET, "SO_MARK not available on Windows");
return -1;
#endif
}
ssize_t socket_sendto(socket_t sock, const void* buf, size_t len,
struct sockaddr* dest, socklen_t dest_len) {
#ifdef _WIN32
return sendto(sock, (const char*)buf, (int)len, 0, dest, dest_len);
#else
return sendto(sock, buf, len, 0, dest, dest_len);
#endif
}
ssize_t socket_recvfrom(socket_t sock, void* buf, size_t len,
struct sockaddr* src, socklen_t* src_len) {
#ifdef _WIN32
int fromlen = src_len ? (int)*src_len : 0;
ssize_t ret = recvfrom(sock, (char*)buf, (int)len, 0, src, src_len ? &fromlen : NULL);
if (src_len) *src_len = (socklen_t)fromlen;
return ret;
#else
return recvfrom(sock, buf, len, 0, src, src_len);
#endif
}

72
lib/socket_compat.h

@ -0,0 +1,72 @@
/**
* Socket compatibility layer for cross-platform support (POSIX / Windows)
* MSYS2 UCRT64 compatible
*/
#ifndef SOCKET_COMPAT_H
#define SOCKET_COMPAT_H
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
typedef SOCKET socket_t;
#define SOCKET_INVALID INVALID_SOCKET
#define SOCKET_ERROR_CODE SOCKET_ERROR
// Error codes
#define ERR_WOULDBLOCK WSAEWOULDBLOCK
#define ERR_AGAIN WSAEWOULDBLOCK
#define ERR_INTR WSAEINTR
#else
// POSIX systems
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
typedef int socket_t;
#define SOCKET_INVALID (-1)
#define SOCKET_ERROR_CODE (-1)
#define ERR_WOULDBLOCK EWOULDBLOCK
#define ERR_AGAIN EAGAIN
#define ERR_INTR EINTR
#endif
// Platform initialization/cleanup
int socket_platform_init(void);
void socket_platform_cleanup(void);
// Socket operations
socket_t socket_create_udp(int family);
int socket_set_nonblocking(socket_t sock);
int socket_close_wrapper(socket_t sock);
static inline int socket_get_error(void) {
#ifdef _WIN32
return WSAGetLastError();
#else
return errno;
#endif
}
// Socket options
int socket_set_buffers(socket_t sock, int sndbuf, int rcvbuf);
int socket_set_reuseaddr(socket_t sock, int reuse);
int socket_bind_to_device(socket_t sock, const char* ifname);
int socket_set_mark(socket_t sock, int mark);
// I/O operations
ssize_t socket_sendto(socket_t sock, const void* buf, size_t len,
struct sockaddr* dest, socklen_t dest_len);
ssize_t socket_recvfrom(socket_t sock, void* buf, size_t len,
struct sockaddr* src, socklen_t* src_len);
// Utility
const char* socket_strerror(int err);
#endif // SOCKET_COMPAT_H

231
lib/u_async.c

@ -5,12 +5,15 @@
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <poll.h>
#include <limits.h>
#include <fcntl.h>
#ifndef _WIN32
#include <unistd.h>
#include <poll.h>
#endif
// Platform-specific includes
#ifdef __linux__
#include <sys/epoll.h>
@ -32,9 +35,13 @@ struct timeout_node {
// Socket node with array-based storage
struct socket_node {
int fd;
socket_callback_t read_cbk;
socket_callback_t write_cbk;
int fd; // File descriptor (for pipe, file)
socket_t sock; // Socket (for cross-platform sockets)
int type; // SOCKET_NODE_TYPE_FD or SOCKET_NODE_TYPE_SOCK
socket_callback_t read_cbk; // For FD type
socket_callback_t write_cbk; // For FD type
socket_t_callback_t read_cbk_sock; // For SOCK type
socket_t_callback_t write_cbk_sock; // For SOCK type
socket_callback_t except_cbk;
void* user_data;
int active; // 1 if socket is active, 0 if freed (for reuse)
@ -106,7 +113,10 @@ static void socket_array_destroy(struct socket_array* sa) {
free(sa);
}
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
static int socket_array_add_internal(struct socket_array* sa, int fd, socket_t sock, int type,
socket_callback_t read_cbk_fd, socket_callback_t write_cbk_fd,
socket_t_callback_t read_cbk_sock, socket_t_callback_t write_cbk_sock,
socket_callback_t except_cbk, void* user_data) {
if (!sa || fd < 0 || fd >= FD_SETSIZE) return -1;
if (fd >= sa->capacity) {
// Need to resize - double the capacity
@ -159,8 +169,12 @@ static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t r
// Add the socket
sa->sockets[index].fd = fd;
sa->sockets[index].read_cbk = read_cbk;
sa->sockets[index].write_cbk = write_cbk;
sa->sockets[index].sock = sock;
sa->sockets[index].type = type;
sa->sockets[index].read_cbk = read_cbk_fd;
sa->sockets[index].write_cbk = write_cbk_fd;
sa->sockets[index].read_cbk_sock = read_cbk_sock;
sa->sockets[index].write_cbk_sock = write_cbk_sock;
sa->sockets[index].except_cbk = except_cbk;
sa->sockets[index].user_data = user_data;
sa->sockets[index].active = 1;
@ -175,6 +189,27 @@ static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t r
return index;
}
// Wrapper for adding regular file descriptors (pipe, file)
static int socket_array_add(struct socket_array* sa, int fd, socket_callback_t read_cbk,
socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
return socket_array_add_internal(sa, fd, SOCKET_INVALID, SOCKET_NODE_TYPE_FD,
read_cbk, write_cbk, NULL, NULL, except_cbk, user_data);
}
// Wrapper for adding socket_t (cross-platform sockets)
static int socket_array_add_socket_t(struct socket_array* sa, socket_t sock, socket_t_callback_t read_cbk,
socket_t_callback_t write_cbk, socket_callback_t except_cbk, void* user_data) {
// On Windows, SOCKET is UINT_PTR, so we need to handle indexing differently
#ifdef _WIN32
int fd = (int)(intptr_t)sock; // Use socket value as index on Windows (simplified)
#else
int fd = sock; // On POSIX, socket_t is int
#endif
if (fd < 0 || fd >= FD_SETSIZE) return -1;
return socket_array_add_internal(sa, fd, sock, SOCKET_NODE_TYPE_SOCK,
NULL, NULL, read_cbk, write_cbk, except_cbk, user_data);
}
static int socket_array_remove(struct socket_array* sa, int fd) {
if (!sa || fd < 0 || fd >= sa->capacity) return -1;
@ -184,6 +219,8 @@ static int socket_array_remove(struct socket_array* sa, int fd) {
// Mark as inactive
sa->sockets[index].active = 0;
sa->sockets[index].fd = -1;
sa->sockets[index].sock = SOCKET_INVALID;
sa->sockets[index].type = SOCKET_NODE_TYPE_FD;
sa->fd_to_index[fd] = -1;
sa->index_to_fd[index] = -1;
@ -211,6 +248,23 @@ static struct socket_node* socket_array_get(struct socket_array* sa, int fd) {
return &sa->sockets[index];
}
// Get socket_node by socket_t
static struct socket_node* socket_array_get_by_sock(struct socket_array* sa, socket_t sock) {
if (!sa) return NULL;
#ifdef _WIN32
int fd = (int)(intptr_t)sock;
#else
int fd = sock;
#endif
if (fd < 0 || fd >= sa->capacity) return NULL;
int index = sa->fd_to_index[fd];
if (index == -1 || !sa->sockets[index].active) return NULL;
if (sa->sockets[index].type != SOCKET_NODE_TYPE_SOCK) return NULL;
return &sa->sockets[index];
}
// Callback to free timeout node and update counters
static void timeout_node_free_callback(void* user_data, void* data) {
struct UASYNC* ua = (struct UASYNC*)user_data;
@ -439,6 +493,74 @@ err_t uasync_remove_socket(struct UASYNC* ua, void* s_id) {
return ERR_FAIL;
}
// Add socket_t (cross-platform socket)
void* uasync_add_socket_t(struct UASYNC* ua, socket_t sock, socket_t_callback_t read_cbk,
socket_t_callback_t write_cbk, socket_t_callback_t except_cbk, void* user_data) {
if (!ua || sock == SOCKET_INVALID) return NULL;
int index = socket_array_add_socket_t(ua->sockets, sock, read_cbk, write_cbk,
(socket_callback_t)except_cbk, user_data);
if (index < 0) return NULL;
ua->socket_alloc_count++;
ua->poll_fds_dirty = 1;
#if HAS_EPOLL
if (ua->use_epoll && ua->epoll_fd >= 0) {
struct epoll_event ev;
ev.events = 0;
if (read_cbk) ev.events |= EPOLLIN;
if (write_cbk) ev.events |= EPOLLOUT;
ev.data.ptr = &ua->sockets->sockets[index];
// On Windows, need to cast socket_t to int for epoll_ctl
#ifdef _WIN32
int fd = (int)(intptr_t)sock;
#else
int fd = sock;
#endif
if (epoll_ctl(ua->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
socket_array_remove(ua->sockets, fd);
ua->socket_alloc_count--;
return NULL;
}
}
#endif
return &ua->sockets->sockets[index];
}
// Remove socket by socket_t
err_t uasync_remove_socket_t(struct UASYNC* ua, socket_t sock) {
if (!ua || sock == SOCKET_INVALID) return ERR_FAIL;
struct socket_node* node = socket_array_get_by_sock(ua->sockets, sock);
if (!node || !node->active) return ERR_FAIL;
#if HAS_EPOLL
if (ua->use_epoll && ua->epoll_fd >= 0) {
#ifdef _WIN32
int fd = (int)(intptr_t)sock;
#else
int fd = sock;
#endif
epoll_ctl(ua->epoll_fd, EPOLL_CTL_DEL, fd, NULL);
}
#endif
#ifdef _WIN32
int fd = (int)(intptr_t)sock;
#else
int fd = sock;
#endif
int ret = socket_array_remove(ua->sockets, fd);
if (ret == 0) {
ua->socket_free_count++;
ua->poll_fds_dirty = 1;
return ERR_OK;
}
return ERR_FAIL;
}
// Helper function to rebuild cached pollfd array
static void rebuild_poll_fds(struct UASYNC* ua) {
@ -475,11 +597,28 @@ static void rebuild_poll_fds(struct UASYNC* ua) {
int socket_array_idx = ua->sockets->active_indices[i];
struct socket_node* cur = &ua->sockets->sockets[socket_array_idx];
ua->poll_fds[idx].fd = cur->fd;
// Handle socket_t vs int fd
if (cur->type == SOCKET_NODE_TYPE_SOCK) {
// socket_t - cast to int for pollfd
#ifdef _WIN32
ua->poll_fds[idx].fd = (int)(intptr_t)cur->sock;
#else
ua->poll_fds[idx].fd = cur->sock;
#endif
} else {
// Regular fd
ua->poll_fds[idx].fd = cur->fd;
}
ua->poll_fds[idx].events = 0;
ua->poll_fds[idx].revents = 0;
if (cur->read_cbk) ua->poll_fds[idx].events |= POLLIN;
if (cur->type == SOCKET_NODE_TYPE_SOCK) {
if (cur->read_cbk_sock) ua->poll_fds[idx].events |= POLLIN;
if (cur->write_cbk_sock) ua->poll_fds[idx].events |= POLLOUT;
} else {
if (cur->read_cbk) ua->poll_fds[idx].events |= POLLIN;
if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT;
}
if (cur->write_cbk) ua->poll_fds[idx].events |= POLLOUT;
if (cur->except_cbk) ua->poll_fds[idx].events |= POLLPRI;
@ -513,19 +652,29 @@ static void process_epoll_events(struct UASYNC* ua, struct epoll_event* events,
}
}
/* Exceptional data (out-of-band) - epoll doesn't have POLLPRI equivalent */
/* Read readiness */
/* Read readiness - use appropriate callback based on socket type */
if (events[i].events & EPOLLIN) {
if (node->read_cbk) {
node->read_cbk(node->fd, node->user_data);
if (node->type == SOCKET_NODE_TYPE_SOCK) {
if (node->read_cbk_sock) {
node->read_cbk_sock(node->sock, node->user_data);
}
} else {
if (node->read_cbk) {
node->read_cbk(node->fd, node->user_data);
}
}
}
/* Write readiness */
/* Write readiness - use appropriate callback based on socket type */
if (events[i].events & EPOLLOUT) {
if (node->write_cbk) {
node->write_cbk(node->fd, node->user_data);
if (node->type == SOCKET_NODE_TYPE_SOCK) {
if (node->write_cbk_sock) {
node->write_cbk_sock(node->sock, node->user_data);
}
} else {
if (node->write_cbk) {
node->write_cbk(node->fd, node->user_data);
}
}
}
}
@ -657,6 +806,15 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) {
/* Socket event - lookup by fd */
struct socket_node* node = socket_array_get(ua->sockets, ua->poll_fds[i].fd);
if (!node) { // Try by socket_t (in case this is a socket)
socket_t lookup_sock;
#ifdef _WIN32
lookup_sock = (socket_t)(intptr_t)ua->poll_fds[i].fd;
#else
lookup_sock = ua->poll_fds[i].fd;
#endif
node = socket_array_get_by_sock(ua->sockets, lookup_sock);
}
if (!node) continue; // Socket may have been removed
/* Check for error conditions first */
@ -674,17 +832,29 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) {
}
}
/* Read readiness */
/* Read readiness - use appropriate callback based on socket type */
if (ua->poll_fds[i].revents & POLLIN) {
if (node->read_cbk) {
node->read_cbk(node->fd, node->user_data);
if (node->type == SOCKET_NODE_TYPE_SOCK) {
if (node->read_cbk_sock) {
node->read_cbk_sock(node->sock, node->user_data);
}
} else {
if (node->read_cbk) {
node->read_cbk(node->fd, node->user_data);
}
}
}
/* Write readiness */
/* Write readiness - use appropriate callback based on socket type */
if (ua->poll_fds[i].revents & POLLOUT) {
if (node->write_cbk) {
node->write_cbk(node->fd, node->user_data);
if (node->type == SOCKET_NODE_TYPE_SOCK) {
if (node->write_cbk_sock) {
node->write_cbk_sock(node->sock, node->user_data);
}
} else {
if (node->write_cbk) {
node->write_cbk(node->fd, node->user_data);
}
}
}
}
@ -699,9 +869,17 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) {
// ========== Instance management functions ==========
struct UASYNC* uasync_create(void) {
// Initialize socket platform first (WSAStartup on Windows)
if (socket_platform_init() != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_SOCKET, "Failed to initialize socket platform");
return NULL;
}
struct UASYNC* ua = malloc(sizeof(struct UASYNC));
if (!ua) return NULL;
if (!ua) {
socket_platform_cleanup();
return NULL;
}
memset(ua, 0, sizeof(struct UASYNC));
ua->wakeup_pipe[0] = -1;
@ -907,6 +1085,9 @@ void uasync_destroy(struct UASYNC* ua, int close_fds) {
DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "uasync_destroy: completed successfully for ua=%p", ua);
free(ua);
// Cleanup socket platform (WSACleanup on Windows)
socket_platform_cleanup();
}
void uasync_init_instance(struct UASYNC* ua) {

13
lib/u_async.h

@ -8,9 +8,11 @@
#include <sys/time.h>
#include <stddef.h>
#include "timeout_heap.h"
#include "socket_compat.h"
typedef void (*timeout_callback_t)(void* user_arg);// передаёт user_arg из uasync_set_timeout
typedef void (*socket_callback_t)(int fd, void* user_arg);// передаёт user_arg из uasync_add_socket
typedef void (*socket_callback_t)(int fd, void* user_arg);// передаёт user_arg из uasync_add_socket (for pipe/file)
typedef void (*socket_t_callback_t)(socket_t sock, void* user_arg);// передаёт user_arg из uasync_add_socket_t (for sockets)
// user_arg полезен если нужно передать управляющую структуру. Ее можно выделить в памяти и в ней хранить всё что надо. т.е. при set_timeout передаём и получаем ее в callback-е
@ -19,6 +21,10 @@ typedef int err_t;
#define ERR_OK 0
#define ERR_FAIL -1
// Socket node types
#define SOCKET_NODE_TYPE_FD 0 // Regular file descriptor (pipe, file)
#define SOCKET_NODE_TYPE_SOCK 1 // Socket (socket_t)
// Uasync instance structure
struct UASYNC {
TimeoutHeap* timeout_heap; // Heap for timeout management
@ -54,9 +60,12 @@ void uasync_init_instance(struct UASYNC* ua);
void* uasync_set_timeout(struct UASYNC* ua, int timeout_tb, void* user_arg, timeout_callback_t callback);
err_t uasync_cancel_timeout(struct UASYNC* ua, void* t_id);
// Sockets
// Sockets - for regular file descriptors (pipe, file)
void* uasync_add_socket(struct UASYNC* ua, int fd, socket_callback_t read_cbk, socket_callback_t write_cbk, socket_callback_t except_cbk, void* user_arg);
// Sockets - for socket_t (cross-platform sockets)
void* uasync_add_socket_t(struct UASYNC* ua, socket_t sock, socket_t_callback_t read_cbk, socket_t_callback_t write_cbk, socket_t_callback_t except_cbk, void* user_arg);
err_t uasync_remove_socket(struct UASYNC* ua, void* s_id);
err_t uasync_remove_socket_t(struct UASYNC* ua, socket_t sock);
// Single iteration of event loop with timeout (timebase units)
void uasync_poll(struct UASYNC* ua, int timeout_tb);

94
src/etcp_connections.c

@ -1,9 +1,8 @@
#include "etcp_connections.h"
#include "../lib/socket_compat.h"
#include <arpa/inet.h>
#include <net/if.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include "route_lib.h"
#include "route_bgp.h"
@ -19,7 +18,7 @@
#include <time.h>
// Forward declaration
static void etcp_connections_read_callback(int fd, void* arg);
static void etcp_connections_read_callback_socket(socket_t sock, void* arg);
struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance);
static void etcp_link_remove_from_connections(struct ETCP_SOCKET* conn, struct ETCP_LINK* link);
@ -86,7 +85,7 @@ static void etcp_link_send_init(struct ETCP_LINK* link) {
struct sockaddr_in* sin = (struct sockaddr_in*)&link->remote_addr;
char addr_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &sin->sin_addr, addr_str, INET_ADDRSTRLEN);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] INIT sending to %s:%d, link=%p, conn_fd=%d", addr_str, ntohs(sin->sin_port), link, link->conn->fd);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] INIT sending to %s:%d, link=%p", addr_str, ntohs(sin->sin_port), link);
}
etcp_encrypt_send(dgram);
@ -275,6 +274,7 @@ struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct socka
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Failed to allocate connection");
return NULL;
}
e_sock->fd = SOCKET_INVALID; // Initialize to invalid socket
int family = AF_INET;
if (ip) {
@ -286,68 +286,53 @@ struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct socka
}
}
e_sock->fd = socket(family, SOCK_DGRAM, 0);
if (e_sock->fd < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to create socket: %s", strerror(errno));
e_sock->fd = socket_create_udp(family);
if (e_sock->fd == SOCKET_INVALID) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to create socket: %s",
socket_strerror(socket_get_error()));
free(e_sock);
return NULL;
}
int reuse = 0;// Строго не используем reuseaddr, даже в тестах!
if (setsockopt(e_sock->fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set SO_REUSEADDR: %s", strerror(errno));
}
// Строго не используем reuseaddr, даже в тестах!
socket_set_reuseaddr(e_sock->fd, 0);
// Increase socket buffers for high throughput
int sndbuf = 4 * 1024 * 1024; // 4MB send buffer
int rcvbuf = 4 * 1024 * 1024; // 4MB receive buffer
if (setsockopt(e_sock->fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set SO_SNDBUF: %s", strerror(errno));
}
if (setsockopt(e_sock->fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set SO_RCVBUF: %s", strerror(errno));
}
socket_set_buffers(e_sock->fd, 4 * 1024 * 1024, 4 * 1024 * 1024);
int flags = fcntl(e_sock->fd, F_GETFL, 0);
fcntl(e_sock->fd, F_SETFL, flags | O_NONBLOCK);
if (socket_set_nonblocking(e_sock->fd) != 0) {
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set non-blocking mode");
}
// Set socket mark if specified
// Set socket mark if specified (Linux only)
if (so_mark > 0) {
#ifdef SO_MARK
if (setsockopt(e_sock->fd, SOL_SOCKET, SO_MARK, &so_mark, sizeof(so_mark)) < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to set SO_MARK: %s", strerror(errno));
}
#endif
socket_set_mark(e_sock->fd, so_mark);
}
// Bind to interface if specified
// Bind to interface if specified (Linux only)
if (netif_index > 0) {
#ifdef SO_BINDTODEVICE
char ifname[IF_NAMESIZE];
if (if_indextoname(netif_index, ifname)) {
if (setsockopt(e_sock->fd, SOL_SOCKET, SO_BINDTODEVICE, ifname, strlen(ifname)) < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to bind to interface %s: %s", ifname, strerror(errno));
}
socket_bind_to_device(e_sock->fd, ifname);
}
#endif
}
// Store the local address and bind socket if provided
if (ip) {
memcpy(&e_sock->local_addr, ip, sizeof(struct sockaddr_storage));
// CRITICAL: Actually bind the socket to the address - this was missing!
// CRITICAL: Actually bind the socket to the address
socklen_t addr_len = (ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
if (bind(e_sock->fd, (struct sockaddr*)ip, addr_len) < 0) {
perror("bind");
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] Failed to bind socket to address family %d", ip->ss_family);
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] Failed to bind socket to address family %d: %s",
ip->ss_family, socket_strerror(socket_get_error()));
if (ip->ss_family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)ip;
char addr_str[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &sin->sin_addr, addr_str, INET_ADDRSTRLEN);
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] Failed to bind to %s:%d", addr_str, ntohs(sin->sin_port));
}
close(e_sock->fd);
socket_close_wrapper(e_sock->fd);
free(e_sock);
return NULL;
}
@ -364,19 +349,19 @@ struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct socka
instance->etcp_sockets = e_sock;
// Register socket with uasync for receiving packets
e_sock->socket_id = uasync_add_socket(instance->ua, e_sock->fd,
etcp_connections_read_callback,
NULL, NULL, e_sock);
e_sock->socket_id = uasync_add_socket_t(instance->ua, e_sock->fd,
etcp_connections_read_callback_socket,
NULL, NULL, e_sock);
if (!e_sock->socket_id) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to register socket with uasync (fd=%d)", e_sock->fd);
close(e_sock->fd);
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to register socket with uasync");
socket_close_wrapper(e_sock->fd);
free(e_sock);
return NULL;
}
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Registered ETCP socket with uasync (fd=%d)", e_sock->fd);
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Registered ETCP socket with uasync");
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Socket %p (fd=%d) registered and active", e_sock, e_sock->fd);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Socket %p registered and active", e_sock);
return e_sock;
}
@ -385,19 +370,19 @@ void etcp_socket_remove(struct ETCP_SOCKET* conn) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!conn) return;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket %p, fd=%d, socket_id=%p", conn, conn->fd, conn->socket_id);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket %p, socket_id=%p", conn, conn->socket_id);
// Remove from uasync if registered
if (conn->socket_id) {
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket from uasync, instance=%p, ua=%p", conn->instance, conn->instance->ua);
uasync_remove_socket(conn->instance->ua, conn->socket_id);
uasync_remove_socket_t(conn->instance->ua, conn->fd);
conn->socket_id = NULL;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Unregistered socket from uasync");
}
if (conn->fd >= 0) {
close(conn->fd);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Closed fd=%d", conn->fd);
if (conn->fd != SOCKET_INVALID) {
socket_close_wrapper(conn->fd);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Closed socket");
}
for (size_t i = 0; i < conn->num_channels; i++) {
@ -523,9 +508,10 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
// DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Sending packet to %s:%d, size=%zd", addr_str, ntohs(sin->sin_port), enc_buf_len + dgram->noencrypt_len);
}
ssize_t sent = sendto(dgram->link->conn->fd, enc_buf, enc_buf_len + dgram->noencrypt_len, 0, (struct sockaddr*)addr, addr_len);
ssize_t sent = socket_sendto(dgram->link->conn->fd, enc_buf, enc_buf_len + dgram->noencrypt_len,
(struct sockaddr*)addr, addr_len);
if (sent < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "sendto failed, errno=%d", errno);
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "sendto failed, sock_err=%d", socket_get_error());
dgram->link->send_errors++; errcode=3; goto es_err;
} else {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "sendto succeeded, sent=%zd bytes to port %d", sent, ntohs(((struct sockaddr_in*)addr)->sin_port));
@ -537,7 +523,7 @@ es_err:
return -1;
}
static void etcp_connections_read_callback(int fd, void* arg) {
static void etcp_connections_read_callback_socket(socket_t sock, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback fd=%d, socket=%p", fd, arg);
// !!!!!! DANGER: в этой функции ПРЕДЕЛЬНАЯ АККУРАТНОСТЬ. Если кажется что не туда указатель то невнимательно аланизировал !!!!!
@ -563,10 +549,10 @@ static void etcp_connections_read_callback(int fd, void* arg) {
uint8_t data[PACKET_DATA_SIZE];
socklen_t addr_len=sizeof(addr);
memset(&addr, 0, sizeof(addr));
ssize_t recv_len = recvfrom(fd, data, PACKET_DATA_SIZE, 0, (struct sockaddr*)&addr, &addr_len);
ssize_t recv_len = socket_recvfrom(sock, data, PACKET_DATA_SIZE, (struct sockaddr*)&addr, &addr_len);
if (recv_len <= 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: recvfrom failed, error=%zd, errno=%d", recv_len, errno);
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: recvfrom failed, error=%zd, sock_err=%d", recv_len, socket_get_error());
return;
}

4
src/etcp_connections.h

@ -5,8 +5,8 @@
#include "secure_channel.h"
#include "utun_instance.h"
#include "../lib/socket_compat.h"
#include <stdint.h>
#include <sys/socket.h>
#define PACKET_DATA_SIZE 1536
@ -29,7 +29,7 @@ struct ETCP_DGRAM {// пакет (незашифрованный)
struct ETCP_SOCKET {
struct ETCP_SOCKET* next; // Linked list для всех соединений
struct UTUN_INSTANCE* instance;
int fd; // Файловый дескриптор UDP сокета
socket_t fd; // UDP socket (cross-platform)
struct sockaddr_storage local_addr; // Локальный адрес
// для входящих подключений (links) - массив упорядоченный по ip_port_hash

Loading…
Cancel
Save