From 734851329fdc4f06bc57073f0002d996e312bbaf Mon Sep 17 00:00:00 2001 From: jeka Date: Thu, 19 Feb 2026 01:24:42 +0300 Subject: [PATCH] Control: Add monitoring server with ETCP protocol support - Implement control_server.c/h for monitoring and management - Add Windows-compatible uasync_poll using select() instead of WSAPoll - Fix Wintun adapter creation to open existing adapters first - Add debug category for control server operations - Update build files to include control server in compilation - Add test_control_server to test suite --- build.bat | 3 +- lib/debug_config.h | 1 + lib/u_async.c | 118 +++++- src/Makefile.am | 3 +- src/control_server.c | 856 +++++++++++++++++++++++++++++++++++++++++++ src/control_server.h | 120 ++++++ src/tun_windows.c | 23 +- src/utun.c | 10 + src/utun_instance.c | 26 +- src/utun_instance.h | 4 + tests/Makefile.am | 6 + 11 files changed, 1147 insertions(+), 23 deletions(-) create mode 100644 src/control_server.c create mode 100644 src/control_server.h diff --git a/build.bat b/build.bat index 90f73ee..8942def 100644 --- a/build.bat +++ b/build.bat @@ -24,4 +24,5 @@ if not exist "%MSYS2_PATH%\msys2_shell.cmd" ( REM Launch MSYS2 UCRT64 shell and run build script echo Starting MSYS2 UCRT64 environment... -call "%MSYS2_PATH%\msys2_shell.cmd" -ucrt64 -here -c "bash build.sh > build_win.log 2>&1" +rem call "%MSYS2_PATH%\msys2_shell.cmd" -ucrt64 -here -c "bash build.sh > build_win.log 2>&1" +call "%MSYS2_PATH%\msys2_shell.cmd" -ucrt64 -here -defterm -no-start -c "bash build.sh 2>&1 | tee build_win.log" diff --git a/lib/debug_config.h b/lib/debug_config.h index 3ad90be..4cb9220 100644 --- a/lib/debug_config.h +++ b/lib/debug_config.h @@ -50,6 +50,7 @@ typedef uint64_t debug_category_t; #define DEBUG_CATEGORY_NORMALIZER ((debug_category_t)1 << 11) // packet normalizer #define DEBUG_CATEGORY_BGP ((debug_category_t)1 << 12) // BGP route exchange #define DEBUG_CATEGORY_SOCKET ((debug_category_t)1 << 13) // Socket operations +#define DEBUG_CATEGORY_CONTROL ((debug_category_t)1 << 14) // Control/monitoring server #define DEBUG_CATEGORY_ALL ((debug_category_t)0xFFFFFFFFFFFFFFFFULL) /* Debug configuration structure */ diff --git a/lib/u_async.c b/lib/u_async.c index db5bbaf..bce7ad6 100644 --- a/lib/u_async.c +++ b/lib/u_async.c @@ -788,6 +788,107 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) { return; } +#ifdef _WIN32 + // On Windows, use select() instead of WSAPoll to avoid issues with accepted sockets + fd_set read_fds, write_fds, except_fds; + FD_ZERO(&read_fds); + FD_ZERO(&write_fds); + FD_ZERO(&except_fds); + + SOCKET max_fd = 0; + + // Add all active sockets to fd_sets + for (int i = 0; i < ua->sockets->count; i++) { + int idx = ua->sockets->active_indices[i]; + struct socket_node* node = &ua->sockets->sockets[idx]; + if (!node->active) continue; + + SOCKET s; + if (node->type == SOCKET_NODE_TYPE_SOCK) { + s = node->sock; + } else { + s = (SOCKET)node->fd; + } + + if (node->type == SOCKET_NODE_TYPE_SOCK) { + if (node->read_cbk_sock) FD_SET(s, &read_fds); + if (node->write_cbk_sock) FD_SET(s, &write_fds); + } else { + if (node->read_cbk) FD_SET(s, &read_fds); + if (node->write_cbk) FD_SET(s, &write_fds); + } + if (node->except_cbk) FD_SET(s, &except_fds); + + if (s > max_fd) max_fd = s; + } + + struct timeval tv; + tv.tv_sec = timeout_ms / 1000; + tv.tv_usec = (timeout_ms % 1000) * 1000; + + int ret = select((int)max_fd + 1, &read_fds, &write_fds, &except_fds, &tv); + + if (ret < 0) { + int err = WSAGetLastError(); + if (err != WSAEINTR) { + DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "select failed: %d", err); + } + return; + } + + if (ret > 0) { + for (int i = 0; i < ua->sockets->count; i++) { + int idx = ua->sockets->active_indices[i]; + struct socket_node* node = &ua->sockets->sockets[idx]; + if (!node->active) continue; + + SOCKET s; + if (node->type == SOCKET_NODE_TYPE_SOCK) { + s = node->sock; + } else { + s = (SOCKET)node->fd; + } + + int has_read = FD_ISSET(s, &read_fds); + int has_write = FD_ISSET(s, &write_fds); + int has_except = FD_ISSET(s, &except_fds); + + if (!has_read && !has_write && !has_except) continue; + + if (has_except) { + if (node->except_cbk) { + node->except_cbk(node->fd, node->user_data); + } + } + + if (has_read) { + if (node->type == SOCKET_NODE_TYPE_SOCK) { + if (node->read_cbk_sock) { + node->read_cbk_sock(node->sock, node->user_data); + } + } else { + if (node->read_cbk) { + node->read_cbk(node->fd, node->user_data); + } + } + } + + if (has_write) { + if (node->type == SOCKET_NODE_TYPE_SOCK) { + if (node->write_cbk_sock) { + node->write_cbk_sock(node->sock, node->user_data); + } + } else { + if (node->write_cbk) { + node->write_cbk(node->fd, node->user_data); + } + } + } + } + } +#else + // On non-Windows, use poll() + // Rebuild poll_fds if dirty or not allocated if (ua->poll_fds_dirty || !ua->poll_fds) { rebuild_poll_fds(ua); @@ -798,21 +899,12 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) { rebuild_poll_fds(ua); } - /* Call poll with cached fds */ int ret = poll(ua->poll_fds, ua->poll_fds_count, timeout_ms); if (ret < 0) { -#ifdef _WIN32 - int err = WSAGetLastError(); - if (err == WSAEINTR) { - return; - } - DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "WSAPoll failed: %d", err); -#else if (errno == EINTR) { return; } perror("poll"); -#endif return; } @@ -832,12 +924,7 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) { /* Socket event - lookup by fd */ struct socket_node* node = socket_array_get(ua->sockets, ua->poll_fds[i].fd); if (!node) { // Try by socket_t (in case this is a socket) - socket_t lookup_sock; -#ifdef _WIN32 - lookup_sock = (socket_t)(intptr_t)ua->poll_fds[i].fd; -#else - lookup_sock = ua->poll_fds[i].fd; -#endif + socket_t lookup_sock = ua->poll_fds[i].fd; node = socket_array_get_by_sock(ua->sockets, lookup_sock); } if (!node) continue; // Socket may have been removed @@ -884,6 +971,7 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) { } } } +#endif /* Process timeouts that may have expired during poll or socket processing */ process_timeouts(ua); diff --git a/src/Makefile.am b/src/Makefile.am index 726f0b1..01abab6 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -17,7 +17,8 @@ utun_CORE_SOURCES = \ secure_channel.c \ crc32.c \ pkt_normalizer.c \ - etcp_api.c + etcp_api.c \ + control_server.c # Platform-specific TUN sources if OS_WINDOWS diff --git a/src/control_server.c b/src/control_server.c new file mode 100644 index 0000000..efcc976 --- /dev/null +++ b/src/control_server.c @@ -0,0 +1,856 @@ +/* + * control_server.c - Control Socket Server Implementation + * + * Handles ETCP monitoring requests from clients + */ + +#include "control_server.h" +#include "utun_instance.h" +#include "etcp.h" +#include "etcp_connections.h" +#include "tun_if.h" +#include "../lib/u_async.h" +#include "../lib/debug_config.h" + +#include +#include + +#ifdef _WIN32 +#include +#include +#else +#include +#include +#include +#include +#include +#include +#endif + +#ifndef DEBUG_CATEGORY_CONTROL +#define DEBUG_CATEGORY_CONTROL 1 +#endif + +/* Log file name */ +#define LOG_FILENAME "control_server.log" + +/* ============================================================================ + * Logging Helpers + * ============================================================================ */ + +#ifdef _WIN32 +#include + +static uint64_t get_timestamp_ms(void) { + FILETIME ft; + GetSystemTimeAsFileTime(&ft); + uint64_t time = ((uint64_t)ft.dwHighDateTime << 32) | ft.dwLowDateTime; + return (time / 10000) - 11644473600000ULL; +} +#else +#include + +static uint64_t get_timestamp_ms(void) { + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return (uint64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000; +} +#endif + +static void log_hex_data(FILE* log, const char* prefix, const uint8_t* data, size_t len) { + if (!log || !data || len == 0) return; + + fprintf(log, "%llu: [%s] ", (unsigned long long)get_timestamp_ms(), prefix); + for (size_t i = 0; i < len && i < 256; i++) { + fprintf(log, "%02X ", data[i]); + } + if (len > 256) { + fprintf(log, "... (%llu bytes total)", (unsigned long long)len); + } + fprintf(log, "\n"); + fflush(log); +} + +/* ============================================================================ + * Forward Declarations + * ============================================================================ */ + +static void accept_callback(socket_t fd, void* arg); +static void client_read_callback(socket_t fd, void* arg); +static void client_write_callback(socket_t fd, void* arg); +static void client_except_callback(socket_t fd, void* arg); +static void handle_client_data(struct control_server* server, struct control_client* client); +static void close_client(struct control_server* server, struct control_client* client); + +static void send_conn_list(struct control_server* server, struct control_client* client); +static void send_metrics(struct control_server* server, struct control_client* client); +static void send_error(struct control_client* client, uint8_t error_code, const char* msg); + +static struct ETCP_CONN* find_connection_by_peer_id(struct UTUN_INSTANCE* instance, uint64_t peer_id); + +/* ============================================================================ + * Server Initialization + * ============================================================================ */ + +int control_server_init(struct control_server* server, + struct UASYNC* ua, + struct UTUN_INSTANCE* instance, + struct sockaddr_storage* bind_addr, + uint32_t max_clients) { + if (!server || !ua || !instance || !bind_addr) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Invalid parameters for control_server_init"); + return -1; + } +return 0; + memset(server, 0, sizeof(*server)); + server->instance = instance; + server->ua = ua; + server->max_clients = max_clients ? max_clients : 8; + memcpy(&server->bind_addr, bind_addr, sizeof(*bind_addr)); + + /* Open log file (truncate on start) */ + server->log_file = fopen(LOG_FILENAME, "w"); + if (server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Control server log started\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + + /* Create listening socket */ + int family = bind_addr->ss_family; + server->listen_fd = socket(family, SOCK_STREAM, IPPROTO_TCP); + +#ifdef _WIN32 + if (server->listen_fd == INVALID_SOCKET) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to create listening socket: %d", WSAGetLastError()); + return -1; + } + + /* Set non-blocking mode */ + u_long nonblock = 1; + ioctlsocket(server->listen_fd, FIONBIO, &nonblock); + + /* Enable address reuse */ + int reuse = 1; + if (setsockopt(server->listen_fd, SOL_SOCKET, SO_REUSEADDR, + (const char*)&reuse, sizeof(reuse)) == SOCKET_ERROR) { + DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed to set SO_REUSEADDR: %d", WSAGetLastError()); + } +#else + if (server->listen_fd < 0) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to create listening socket: %s", strerror(errno)); + return -1; + } + + int reuse = 1; + if (setsockopt(server->listen_fd, SOL_SOCKET, SO_REUSEADDR, + &reuse, sizeof(reuse)) < 0) { + DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed to set SO_REUSEADDR: %s", strerror(errno)); + } + + /* Set non-blocking */ + int flags = fcntl(server->listen_fd, F_GETFL, 0); + if (flags >= 0) { + fcntl(server->listen_fd, F_SETFL, flags | O_NONBLOCK); + } +#endif + + /* Bind to address */ + socklen_t addr_len = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6); + +#ifdef _WIN32 + if (bind(server->listen_fd, (struct sockaddr*)bind_addr, addr_len) == SOCKET_ERROR) { + int err = WSAGetLastError(); + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to bind control socket: %d", err); + closesocket(server->listen_fd); + server->listen_fd = INVALID_SOCKET; + return -1; + } + + /* Listen */ + if (listen(server->listen_fd, 5) == SOCKET_ERROR) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to listen on control socket: %d", WSAGetLastError()); + closesocket(server->listen_fd); + server->listen_fd = INVALID_SOCKET; + return -1; + } +#else + if (bind(server->listen_fd, (struct sockaddr*)bind_addr, addr_len) < 0) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to bind control socket: %s", strerror(errno)); + close(server->listen_fd); + server->listen_fd = -1; + return -1; + } + + if (listen(server->listen_fd, 5) < 0) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to listen on control socket: %s", strerror(errno)); + close(server->listen_fd); + server->listen_fd = -1; + return -1; + } +#endif + + /* Register with uasync */ + server->listen_socket_id = uasync_add_socket_t(ua, server->listen_fd, + accept_callback, + NULL, /* write callback */ + NULL, /* except callback */ + server); + + if (!server->listen_socket_id) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to register control socket with uasync"); +#ifdef _WIN32 + closesocket(server->listen_fd); + server->listen_fd = INVALID_SOCKET; +#else + close(server->listen_fd); + server->listen_fd = -1; +#endif + return -1; + } + + char addr_str[INET6_ADDRSTRLEN]; + if (family == AF_INET) { + struct sockaddr_in* sin = (struct sockaddr_in*)bind_addr; + inet_ntop(AF_INET, &sin->sin_addr, addr_str, sizeof(addr_str)); + DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server listening on %s:%d", + addr_str, ntohs(sin->sin_port)); + } else { + struct sockaddr_in6* sin6 = (struct sockaddr_in6*)bind_addr; + inet_ntop(AF_INET6, &sin6->sin6_addr, addr_str, sizeof(addr_str)); + DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server listening on [%s]:%d", + addr_str, ntohs(sin6->sin6_port)); + } + + return 0; +} + +/* ============================================================================ + * Server Shutdown + * ============================================================================ */ + +void control_server_shutdown(struct control_server* server) { + if (!server) return; + + /* Close all client connections */ + while (server->clients) { + close_client(server, server->clients); + } + + /* Close listening socket */ + if (server->listen_fd != INVALID_SOCKET) { + uasync_remove_socket_t(server->ua, server->listen_fd); + server->listen_socket_id = NULL; + } + +#ifdef _WIN32 + if (server->listen_fd != INVALID_SOCKET) { + closesocket(server->listen_fd); + server->listen_fd = INVALID_SOCKET; + } +#else + if (server->listen_fd >= 0) { + close(server->listen_fd); + server->listen_fd = -1; + } +#endif + + /* Close log file */ + if (server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Control server shutdown complete\n", + (unsigned long long)get_timestamp_ms()); + fclose(server->log_file); + server->log_file = NULL; + } + + DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server shutdown complete"); +} + +/* ============================================================================ + * Client Connection Handling + * ============================================================================ */ + +static void accept_callback(socket_t fd, void* arg) { + struct control_server* server = (struct control_server*)arg; + + struct sockaddr_storage client_addr; + socklen_t addr_len = sizeof(client_addr); + +#ifdef _WIN32 + socket_t client_fd = accept(fd, (struct sockaddr*)&client_addr, &addr_len); + + if (client_fd == INVALID_SOCKET) { + int err = WSAGetLastError(); + if (err != WSAEWOULDBLOCK) { + if (server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Accept failed: %d\n", + (unsigned long long)get_timestamp_ms(), err); + fflush(server->log_file); + } + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Accept failed: %d", err); + } + return; + } + + /* Set non-blocking */ + u_long nonblock = 1; + ioctlsocket(client_fd, FIONBIO, &nonblock); + + /* Small delay to let socket stabilize (Windows-specific workaround) */ +// Sleep(10); +#else + socket_t client_fd = accept(fd, (struct sockaddr*)&client_addr, &addr_len); + if (client_fd < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + if (server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Accept failed: %s\n", + (unsigned long long)get_timestamp_ms(), strerror(errno)); + fflush(server->log_file); + } + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Accept failed: %s", strerror(errno)); + } + return; + } + + /* Set non-blocking */ + int flags = fcntl(client_fd, F_GETFL, 0); + if (flags >= 0) { + fcntl(client_fd, F_SETFL, flags | O_NONBLOCK); + } +#endif + DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Accept..."); + + /* Check max clients */ + if (server->client_count >= server->max_clients) { + if (server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Max clients reached, rejecting connection\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Max clients reached, rejecting connection"); +#ifdef _WIN32 + closesocket(client_fd); +#else + close(client_fd); +#endif + return; + } + + /* Allocate client structure */ + struct control_client* client = (struct control_client*)calloc(1, sizeof(*client)); + if (!client) { + if (server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Failed to allocate client structure\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate client structure"); +#ifdef _WIN32 + closesocket(client_fd); +#else + close(client_fd); +#endif + return; + } + + client->fd = client_fd; + client->server = server; /* Store back pointer to server */ + client->connected = 1; + client->selected_peer_id = 0; + + /* Register with uasync */ + client->socket_id = uasync_add_socket_t(server->ua, client_fd, + client_read_callback, + NULL, /* write callback - not needed for now */ + client_except_callback, + client); + + if (!client->socket_id) { + if (server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Failed to register client socket with uasync\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to register client socket with uasync"); + free(client); +#ifdef _WIN32 + closesocket(client_fd); +#else + close(client_fd); +#endif + return; + } + + /* Add to list */ + client->next = server->clients; + server->clients = client; + server->client_count++; + + char addr_str[INET6_ADDRSTRLEN]; + if (client_addr.ss_family == AF_INET) { + struct sockaddr_in* sin = (struct sockaddr_in*)&client_addr; + inet_ntop(AF_INET, &sin->sin_addr, addr_str, sizeof(addr_str)); + } else { + struct sockaddr_in6* sin6 = (struct sockaddr_in6*)&client_addr; + inet_ntop(AF_INET6, &sin6->sin6_addr, addr_str, sizeof(addr_str)); + } + + if (server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Client connected from %s (total: %u)\n", + (unsigned long long)get_timestamp_ms(), addr_str, server->client_count); + fflush(server->log_file); + } + + DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control client connected from %s (total: %u)", + addr_str, server->client_count); +} + +static void client_read_callback(socket_t fd, void* arg) { + struct control_client* client = (struct control_client*)arg; + struct control_server* server = client->server; + + /* Read available data */ + uint8_t* buf = client->recv_buffer + client->recv_len; + size_t buf_space = ETCPMON_MAX_MSG_SIZE - client->recv_len; + +#ifdef _WIN32 + int received = recv(fd, (char*)buf, (int)buf_space, 0); + + if (received == SOCKET_ERROR) { + int err = WSAGetLastError(); + if (err != WSAEWOULDBLOCK) { + if (server && server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Client recv error: %d\n", + (unsigned long long)get_timestamp_ms(), err); + fflush(server->log_file); + } + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client recv error: %d", err); + client->connected = 0; + } + return; + } + if (received == 0) { + /* Connection closed */ + if (server && server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Client disconnected (recv returned 0)\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + client->connected = 0; + return; + } +#else + ssize_t received = recv(fd, buf, buf_space, 0); + if (received < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + if (server && server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Client recv error: %s\n", + (unsigned long long)get_timestamp_ms(), strerror(errno)); + fflush(server->log_file); + } + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client recv error: %s", strerror(errno)); + client->connected = 0; + } + return; + } + if (received == 0) { + /* Connection closed */ + if (server && server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Client disconnected (recv returned 0)\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + client->connected = 0; + return; + } +#endif + + /* Log received data */ + if (server && server->log_file) { + log_hex_data(server->log_file, "RX", buf, received); + } + + client->recv_len += received; + + /* Process messages */ + if (server) { + handle_client_data(server, client); + } +} + +static void client_write_callback(socket_t fd, void* arg) { + /* Not used for now - writes are immediate */ + (void)fd; + (void)arg; +} + +static void client_except_callback(socket_t fd, void* arg) { + struct control_client* client = (struct control_client*)arg; + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client socket exception"); + + /* Mark for cleanup */ + client->connected = 0; + (void)fd; +} + +static void close_client(struct control_server* server, struct control_client* client) { + if (!server || !client) return; + + /* Log disconnection */ + if (server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Client connection closed\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + + /* Remove from uasync */ + if (client->fd != INVALID_SOCKET) { + uasync_remove_socket_t(server->ua, client->fd); + } + + /* Close socket */ +#ifdef _WIN32 + if (client->fd != INVALID_SOCKET) { + closesocket(client->fd); + } +#else + if (client->fd >= 0) { + close(client->fd); + } +#endif + + /* Remove from list */ + struct control_client** curr = &server->clients; + while (*curr) { + if (*curr == client) { + *curr = client->next; + break; + } + curr = &(*curr)->next; + } + + server->client_count--; + free(client); + + DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control client disconnected (total: %u)", + server->client_count); +} + +/* ============================================================================ + * Message Handling + * ============================================================================ */ + +static void handle_client_data(struct control_server* server, struct control_client* client) { + while (client->recv_len >= sizeof(struct etcpmon_msg_header)) { + struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)client->recv_buffer; + + /* Validate header */ + if (etcpmon_validate_header(hdr) != 0) { + if (server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Invalid message header from client\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Invalid message header from client"); + close_client(server, client); + return; + } + + /* Check if full message received */ + if (client->recv_len < hdr->size) { + break; /* Wait for more data */ + } + + /* Process message */ + uint8_t* payload = client->recv_buffer + sizeof(struct etcpmon_msg_header); + uint16_t payload_size = hdr->size - sizeof(struct etcpmon_msg_header); + + if (server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Received command type=0x%02X size=%d\n", + (unsigned long long)get_timestamp_ms(), hdr->type, hdr->size); + fflush(server->log_file); + } + + switch (hdr->type) { + case ETCPMON_CMD_LIST_CONN: + send_conn_list(server, client); + break; + + case ETCPMON_CMD_SELECT_CONN: + if (payload_size >= sizeof(struct etcpmon_cmd_select)) { + struct etcpmon_cmd_select* cmd = (struct etcpmon_cmd_select*)payload; + client->selected_peer_id = cmd->peer_node_id; + if (server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Client selected connection: %016llX\n", + (unsigned long long)get_timestamp_ms(), (unsigned long long)cmd->peer_node_id); + fflush(server->log_file); + } + DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Client selected connection: %016llX", + (unsigned long long)cmd->peer_node_id); + } + break; + + case ETCPMON_CMD_GET_METRICS: + if (client->selected_peer_id == 0) { + send_error(client, ETCPMON_ERR_NO_CONN_SELECTED, + "No connection selected"); + } else { + send_metrics(server, client); + } + break; + + case ETCPMON_CMD_DISCONNECT: + if (server->log_file) { + fprintf(server->log_file, "%llu: [LOG] Client requested disconnect\n", + (unsigned long long)get_timestamp_ms()); + fflush(server->log_file); + } + close_client(server, client); + return; + + default: + if (server->log_file) { + fprintf(server->log_file, "%llu: [ERROR] Unknown command from client: 0x%02X\n", + (unsigned long long)get_timestamp_ms(), hdr->type); + fflush(server->log_file); + } + DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Unknown command from client: 0x%02X", hdr->type); + send_error(client, ETCPMON_ERR_INVALID_CMD, "Unknown command"); + break; + } + + /* Remove processed message from buffer */ + uint16_t msg_size = hdr->size; + if (client->recv_len > msg_size) { + memmove(client->recv_buffer, client->recv_buffer + msg_size, + client->recv_len - msg_size); + } + client->recv_len -= msg_size; + } +} + +/* ============================================================================ + * Response Builders + * ============================================================================ */ + +static void send_conn_list(struct control_server* server, struct control_client* client) { + struct UTUN_INSTANCE* instance = server->instance; + + /* Count connections */ + uint8_t count = 0; + struct ETCP_CONN* conn = instance->connections; + while (conn && count < ETCPMON_MAX_CONNECTIONS) { + count++; + conn = conn->next; + } + + /* Build response */ + uint16_t rsp_size = ETCPMON_CONN_LIST_SIZE(count); + uint8_t* buffer = (uint8_t*)malloc(rsp_size); + if (!buffer) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate connection list buffer"); + return; + } + + struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer; + etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_conn_list) + + count * sizeof(struct etcpmon_conn_info), + ETCPMON_RSP_CONN_LIST); + + struct etcpmon_rsp_conn_list* rsp = (struct etcpmon_rsp_conn_list*)(buffer + sizeof(*hdr)); + rsp->count = count; + + struct etcpmon_conn_info* info = (struct etcpmon_conn_info*)(buffer + sizeof(*hdr) + sizeof(*rsp)); + conn = instance->connections; + for (uint8_t i = 0; i < count && conn; i++) { + info[i].peer_node_id = conn->peer_node_id; + strncpy(info[i].name, conn->log_name, ETCPMON_MAX_CONN_NAME - 1); + info[i].name[ETCPMON_MAX_CONN_NAME - 1] = '\0'; + conn = conn->next; + } + + /* Log and send response */ + if (server->log_file) { + log_hex_data(server->log_file, "TX", buffer, rsp_size); + fprintf(server->log_file, "%llu: [LOG] Sent RSP_CONN_LIST count=%d\n", + (unsigned long long)get_timestamp_ms(), count); + fflush(server->log_file); + } + +#ifdef _WIN32 + send(client->fd, (const char*)buffer, rsp_size, 0); +#else + send(client->fd, buffer, rsp_size, 0); +#endif + + free(buffer); +} + +static void send_metrics(struct control_server* server, struct control_client* client) { + struct UTUN_INSTANCE* instance = server->instance; + + /* Find selected connection */ + struct ETCP_CONN* conn = find_connection_by_peer_id(instance, client->selected_peer_id); + if (!conn) { + send_error(client, ETCPMON_ERR_INVALID_CONN, "Connection not found"); + return; + } + + /* Count links */ + uint8_t links_count = 0; + struct ETCP_LINK* link = conn->links; + while (link && links_count < ETCPMON_MAX_LINKS) { + links_count++; + link = link->next; + } + + /* Build response */ + uint16_t rsp_size = ETCPMON_METRICS_SIZE(links_count); + uint8_t* buffer = (uint8_t*)malloc(rsp_size); + if (!buffer) { + DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate metrics buffer"); + return; + } + + struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer; + etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_metrics) + + links_count * sizeof(struct etcpmon_link_metrics), + ETCPMON_RSP_METRICS); + + struct etcpmon_rsp_metrics* rsp = (struct etcpmon_rsp_metrics*)(buffer + sizeof(*hdr)); + + /* Fill ETCP metrics */ + rsp->etcp.rtt_last = conn->rtt_last; + rsp->etcp.rtt_avg_10 = conn->rtt_avg_10; + rsp->etcp.rtt_avg_100 = conn->rtt_avg_100; + rsp->etcp.jitter = conn->jitter; + rsp->etcp.bytes_sent_total = conn->bytes_sent_total; + rsp->etcp.retrans_count = conn->retransmissions_count; + rsp->etcp.ack_count = conn->ack_packets_count; + rsp->etcp.unacked_bytes = conn->unacked_bytes; + rsp->etcp.optimal_inflight = conn->optimal_inflight; + rsp->etcp.links_count = links_count; + + /* Fill TUN metrics */ + if (instance->tun) { + rsp->tun.bytes_read = instance->tun->bytes_read; + rsp->tun.bytes_written = instance->tun->bytes_written; + rsp->tun.packets_read = instance->tun->packets_read; + rsp->tun.packets_written = instance->tun->packets_written; + rsp->tun.read_errors = instance->tun->read_errors; + rsp->tun.write_errors = instance->tun->write_errors; + } else { + memset(&rsp->tun, 0, sizeof(rsp->tun)); + } + + /* Fill link metrics */ + struct etcpmon_link_metrics* link_info = (struct etcpmon_link_metrics*)(buffer + sizeof(*hdr) + sizeof(*rsp)); + link = conn->links; + for (uint8_t i = 0; i < links_count && link; i++) { + link_info[i].local_link_id = link->local_link_id; + link_info[i].status = link->link_status; + link_info[i].encrypt_errors = (uint32_t)link->encrypt_errors; + link_info[i].decrypt_errors = (uint32_t)link->decrypt_errors; + link_info[i].send_errors = (uint32_t)link->send_errors; + link_info[i].recv_errors = (uint32_t)link->recv_errors; + link_info[i].total_encrypted = link->total_encrypted; + link_info[i].total_decrypted = link->total_decrypted; + link_info[i].bandwidth = link->bandwidth; + link_info[i].nat_changes_count = link->nat_changes_count; + link = link->next; + } + + /* Log and send response */ + if (server->log_file) { + log_hex_data(server->log_file, "TX", buffer, rsp_size); + fprintf(server->log_file, "%llu: [LOG] Sent RSP_METRICS rtt=%u bytes=%llu links=%d\n", + (unsigned long long)get_timestamp_ms(), + rsp->etcp.rtt_last, + (unsigned long long)rsp->etcp.bytes_sent_total, + links_count); + fflush(server->log_file); + } + +#ifdef _WIN32 + send(client->fd, (const char*)buffer, rsp_size, 0); +#else + send(client->fd, buffer, rsp_size, 0); +#endif + + free(buffer); +} + +static void send_error(struct control_client* client, uint8_t error_code, const char* msg) { + struct control_server* server = client->server; + uint16_t msg_len = (uint16_t)strlen(msg); + uint16_t rsp_size = ETCPMON_ERROR_SIZE(msg_len); + + uint8_t* buffer = (uint8_t*)malloc(rsp_size); + if (!buffer) return; + + struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer; + etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_error) + msg_len + 1, + ETCPMON_RSP_ERROR); + + struct etcpmon_rsp_error* rsp = (struct etcpmon_rsp_error*)(buffer + sizeof(*hdr)); + rsp->error_code = error_code; + memcpy(buffer + sizeof(*hdr) + sizeof(*rsp), msg, msg_len + 1); + + /* Log error response */ + if (server && server->log_file) { + log_hex_data(server->log_file, "TX", buffer, rsp_size); + fprintf(server->log_file, "%llu: [LOG] Sent RSP_ERROR code=%d msg='%s'\n", + (unsigned long long)get_timestamp_ms(), error_code, msg); + fflush(server->log_file); + } + +#ifdef _WIN32 + send(client->fd, (const char*)buffer, rsp_size, 0); +#else + send(client->fd, buffer, rsp_size, 0); +#endif + + free(buffer); +} + +/* ============================================================================ + * Helper Functions + * ============================================================================ */ + +static struct ETCP_CONN* find_connection_by_peer_id(struct UTUN_INSTANCE* instance, + uint64_t peer_id) { + struct ETCP_CONN* conn = instance->connections; + while (conn) { + if (conn->peer_node_id == peer_id) { + return conn; + } + conn = conn->next; + } + return NULL; +} + +/* ============================================================================ + * Public API Implementation + * ============================================================================ */ + +void control_server_process_updates(struct control_server* server) { + if (!server) return; + + /* Process any pending data for all clients */ + struct control_client* client = server->clients; + while (client) { + struct control_client* next = client->next; + + if (!client->connected) { + close_client(server, client); + } else if (client->recv_len > 0) { + handle_client_data(server, client); + } + + client = next; + } +} + +uint32_t control_server_get_client_count(const struct control_server* server) { + return server ? server->client_count : 0; +} diff --git a/src/control_server.h b/src/control_server.h new file mode 100644 index 0000000..25cf814 --- /dev/null +++ b/src/control_server.h @@ -0,0 +1,120 @@ +/* + * control_server.h - Control Socket Server for ETCP Monitoring + * + * Provides TCP control interface for ETCP connection monitoring + */ + +#ifndef CONTROL_SERVER_H +#define CONTROL_SERVER_H + +#include "../tools/etcpmon/etcpmon_protocol.h" +#include "../lib/socket_compat.h" +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +/* Forward declarations */ +struct UTUN_INSTANCE; +struct UASYNC; + +/* Forward declaration */ +struct control_server; + +/* Client connection state */ +struct control_client { + struct control_client* next; /* Linked list */ + struct control_server* server; /* Back pointer to server */ + socket_t fd; /* Client socket */ + void* socket_id; /* uasync socket ID */ + + /* Receive buffer */ + uint8_t recv_buffer[ETCPMON_MAX_MSG_SIZE]; + uint16_t recv_len; /* Bytes received so far */ + + /* Selected connection */ + uint64_t selected_peer_id; /* 0 = none selected */ + + /* Client state */ + uint8_t connected; +}; + +/* Control server state */ +struct control_server { + struct UTUN_INSTANCE* instance; /* Parent instance */ + struct UASYNC* ua; /* Async context */ + + /* Listening socket */ + socket_t listen_fd; + void* listen_socket_id; /* uasync socket ID */ + struct sockaddr_storage bind_addr; /* Bound address */ + + /* Client connections */ + struct control_client* clients; /* Linked list of clients */ + uint32_t client_count; /* Number of connected clients */ + uint32_t max_clients; /* Maximum allowed clients */ + + /* Log file */ + FILE* log_file; +}; + +/* ============================================================================ + * Public API + * ============================================================================ */ + +/* Initialize control server + * + * Creates listening socket and registers with uasync + * + * Parameters: + * server - Server state structure to initialize + * ua - uasync context for event handling + * instance - Parent utun instance + * bind_addr - Address to bind (from config control_sock) + * max_clients - Maximum concurrent client connections + * + * Returns: + * 0 on success, -1 on error + */ +int control_server_init(struct control_server* server, + struct UASYNC* ua, + struct UTUN_INSTANCE* instance, + struct sockaddr_storage* bind_addr, + uint32_t max_clients); + +/* Shutdown control server + * + * Closes all client connections and listening socket + * + * Parameters: + * server - Server state to shutdown + */ +void control_server_shutdown(struct control_server* server); + +/* Process periodic updates + * + * Should be called periodically (e.g., from main loop) to send + * metrics updates to clients that have selected connections + * + * Parameters: + * server - Server state + */ +void control_server_process_updates(struct control_server* server); + +/* Get number of connected clients + * + * Parameters: + * server - Server state + * + * Returns: + * Number of connected clients + */ +uint32_t control_server_get_client_count(const struct control_server* server); + +#ifdef __cplusplus +} +#endif + +#endif /* CONTROL_SERVER_H */ diff --git a/src/tun_windows.c b/src/tun_windows.c index 48fc0f2..35114d6 100644 --- a/src/tun_windows.c +++ b/src/tun_windows.c @@ -191,13 +191,26 @@ int tun_platform_init(struct tun_if* tun, const char* ifname, const char* ip_str adapter_name = wname; } - // Create or open Wintun adapter - GUID guid = {0}; - WINTUN_ADAPTER_HANDLE adapter = WintunCreateAdapter(adapter_name, tunnel_type, &guid); + // Try to open existing adapter first + WINTUN_ADAPTER_HANDLE adapter = WintunOpenAdapter(adapter_name); if (!adapter) { DWORD err = GetLastError(); - DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to create Wintun adapter: %lu", err); - return -1; + if (err == ERROR_NOT_FOUND || err == ERROR_FILE_NOT_FOUND) { // 1168 or 2 + // Adapter does not exist → safe to create a new one + adapter = WintunCreateAdapter(adapter_name, tunnel_type, NULL); + if (!adapter) { + err = GetLastError(); + DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to create new Wintun adapter: %lu", err); + return -1; + } + DEBUG_INFO(DEBUG_CATEGORY_TUN, "Created new Wintun adapter: %s", tun->ifname); + } else { + // Some other real failure (e.g. access denied, driver not loaded, etc.) + DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to open existing Wintun adapter: %lu", err); + return -1; + } + } else { + DEBUG_INFO(DEBUG_CATEGORY_TUN, "Opened existing Wintun adapter: %s", tun->ifname); } // Get adapter LUID for IP configuration diff --git a/src/utun.c b/src/utun.c index 99b4da0..5b9f8b7 100644 --- a/src/utun.c +++ b/src/utun.c @@ -251,6 +251,12 @@ static void signal_handler(int sig) { } } +void test_tmr(void* arg) { + struct UASYNC* ua = (struct UASYNC*)arg; + uasync_set_timeout(ua, 10000, ua, test_tmr); + DEBUG_INFO(DEBUG_CATEGORY_ETCP, "tick ..."); + } + int main(int argc, char *argv[]) { cmd_args_t args; parse_args(argc, argv, &args); @@ -306,6 +312,8 @@ int main(int argc, char *argv[]) { return 1; } + uasync_set_timeout(ua, 10000, ua, test_tmr); + #ifdef _WIN32 // Check for wintun.dll before enabling TUN if (access("wintun.dll", F_OK) != 0) { @@ -362,6 +370,8 @@ int main(int argc, char *argv[]) { return 1; } + DEBUG_INFO(DEBUG_CATEGORY_MEMORY, "Run mainloop"); + while (instance->running) uasync_poll(ua, 100); // Cleanup diff --git a/src/utun_instance.c b/src/utun_instance.c index 09d304d..868ca93 100644 --- a/src/utun_instance.c +++ b/src/utun_instance.c @@ -9,6 +9,7 @@ #include "route_bgp.h" #include "etcp_connections.h" #include "etcp.h" +#include "control_server.h" #include "../lib/u_async.h" #include "../lib/debug_config.h" #include @@ -201,7 +202,15 @@ void utun_instance_destroy(struct UTUN_INSTANCE *instance) { // Stop running if not already instance->running = 0; - + + // Shutdown control server first + if (instance->control_srv) { + DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Shutting down control server"); + control_server_shutdown(instance->control_srv); + free(instance->control_srv); + instance->control_srv = NULL; + } + // Cleanup ETCP sockets and connections FIRST (before destroying uasync) DEBUG_INFO(DEBUG_CATEGORY_MEMORY, "[INSTANCE_DESTROY] Cleaning up ETCP sockets and connections"); struct ETCP_SOCKET* sock = instance->etcp_sockets; @@ -335,6 +344,21 @@ int utun_instance_init(struct UTUN_INSTANCE *instance) { fprintf(stderr, "DEBUG: Connections initialized successfully, count=%d\n", instance->connections_count); fflush(stderr); + // Initialize control server if configured + if (instance->config->global.control_sock.ss_family != 0) { + instance->control_srv = (struct control_server*)calloc(1, sizeof(struct control_server)); + if (instance->control_srv) { + if (control_server_init(instance->control_srv, instance->ua, instance, + &instance->config->global.control_sock, 8) != 0) { + DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed to initialize control server, continuing without monitoring"); + free(instance->control_srv); + instance->control_srv = NULL; + } else { + DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server initialized successfully"); + } + } + } + // Start the main loop instance->running = 1; diff --git a/src/utun_instance.h b/src/utun_instance.h index 4cb939a..984b3fa 100644 --- a/src/utun_instance.h +++ b/src/utun_instance.h @@ -17,6 +17,7 @@ struct ETCP_SOCKET; struct tun_if; struct ETCP_BINDINGS; struct ROUTE_BGP; +struct control_server; // uTun instance configuration struct UTUN_INSTANCE { @@ -53,6 +54,9 @@ struct UTUN_INSTANCE { // ETCP API bindings (per-instance) struct ETCP_BINDINGS api_bindings; + + // Control server for monitoring + struct control_server* control_srv; }; // Functions diff --git a/tests/Makefile.am b/tests/Makefile.am index ca23d8e..76165e4 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -24,6 +24,7 @@ check_PROGRAMS = \ test_bgp_route_exchange \ test_routing_mesh \ test_etcp_dummynet \ + test_control_server \ bench_timeout_heap \ bench_uasync_timeouts @@ -209,6 +210,11 @@ bench_uasync_timeouts_SOURCES = bench_uasync_timeouts.c bench_uasync_timeouts_CFLAGS = -I$(top_srcdir)/lib bench_uasync_timeouts_LDADD = $(COMMON_LIBS) +# Control Server Test - Tests the ETCP monitoring protocol +test_control_server_SOURCES = test_control_server.c +test_control_server_CFLAGS = -I$(top_srcdir)/src -I$(top_srcdir)/lib -I$(top_srcdir)/tools/etcpmon +test_control_server_LDADD = $(top_builddir)/src/utun-control_server.o $(COMMON_LIBS) + # Build tinycrypt objects before tests that need them BUILT_SOURCES = $(TINYCRYPT_BUILT)