Browse Source

Control: Add monitoring server with ETCP protocol support

- Implement control_server.c/h for monitoring and management
- Add Windows-compatible uasync_poll using select() instead of WSAPoll
- Fix Wintun adapter creation to open existing adapters first
- Add debug category for control server operations
- Update build files to include control server in compilation
- Add test_control_server to test suite
nodeinfo-routing-update
jeka 1 month ago
parent
commit
734851329f
  1. 3
      build.bat
  2. 1
      lib/debug_config.h
  3. 118
      lib/u_async.c
  4. 3
      src/Makefile.am
  5. 856
      src/control_server.c
  6. 120
      src/control_server.h
  7. 23
      src/tun_windows.c
  8. 10
      src/utun.c
  9. 26
      src/utun_instance.c
  10. 4
      src/utun_instance.h
  11. 6
      tests/Makefile.am

3
build.bat

@ -24,4 +24,5 @@ if not exist "%MSYS2_PATH%\msys2_shell.cmd" (
REM Launch MSYS2 UCRT64 shell and run build script
echo Starting MSYS2 UCRT64 environment...
call "%MSYS2_PATH%\msys2_shell.cmd" -ucrt64 -here -c "bash build.sh > build_win.log 2>&1"
rem call "%MSYS2_PATH%\msys2_shell.cmd" -ucrt64 -here -c "bash build.sh > build_win.log 2>&1"
call "%MSYS2_PATH%\msys2_shell.cmd" -ucrt64 -here -defterm -no-start -c "bash build.sh 2>&1 | tee build_win.log"

1
lib/debug_config.h

@ -50,6 +50,7 @@ typedef uint64_t debug_category_t;
#define DEBUG_CATEGORY_NORMALIZER ((debug_category_t)1 << 11) // packet normalizer
#define DEBUG_CATEGORY_BGP ((debug_category_t)1 << 12) // BGP route exchange
#define DEBUG_CATEGORY_SOCKET ((debug_category_t)1 << 13) // Socket operations
#define DEBUG_CATEGORY_CONTROL ((debug_category_t)1 << 14) // Control/monitoring server
#define DEBUG_CATEGORY_ALL ((debug_category_t)0xFFFFFFFFFFFFFFFFULL)
/* Debug configuration structure */

118
lib/u_async.c

@ -788,6 +788,107 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) {
return;
}
#ifdef _WIN32
// On Windows, use select() instead of WSAPoll to avoid issues with accepted sockets
fd_set read_fds, write_fds, except_fds;
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
FD_ZERO(&except_fds);
SOCKET max_fd = 0;
// Add all active sockets to fd_sets
for (int i = 0; i < ua->sockets->count; i++) {
int idx = ua->sockets->active_indices[i];
struct socket_node* node = &ua->sockets->sockets[idx];
if (!node->active) continue;
SOCKET s;
if (node->type == SOCKET_NODE_TYPE_SOCK) {
s = node->sock;
} else {
s = (SOCKET)node->fd;
}
if (node->type == SOCKET_NODE_TYPE_SOCK) {
if (node->read_cbk_sock) FD_SET(s, &read_fds);
if (node->write_cbk_sock) FD_SET(s, &write_fds);
} else {
if (node->read_cbk) FD_SET(s, &read_fds);
if (node->write_cbk) FD_SET(s, &write_fds);
}
if (node->except_cbk) FD_SET(s, &except_fds);
if (s > max_fd) max_fd = s;
}
struct timeval tv;
tv.tv_sec = timeout_ms / 1000;
tv.tv_usec = (timeout_ms % 1000) * 1000;
int ret = select((int)max_fd + 1, &read_fds, &write_fds, &except_fds, &tv);
if (ret < 0) {
int err = WSAGetLastError();
if (err != WSAEINTR) {
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "select failed: %d", err);
}
return;
}
if (ret > 0) {
for (int i = 0; i < ua->sockets->count; i++) {
int idx = ua->sockets->active_indices[i];
struct socket_node* node = &ua->sockets->sockets[idx];
if (!node->active) continue;
SOCKET s;
if (node->type == SOCKET_NODE_TYPE_SOCK) {
s = node->sock;
} else {
s = (SOCKET)node->fd;
}
int has_read = FD_ISSET(s, &read_fds);
int has_write = FD_ISSET(s, &write_fds);
int has_except = FD_ISSET(s, &except_fds);
if (!has_read && !has_write && !has_except) continue;
if (has_except) {
if (node->except_cbk) {
node->except_cbk(node->fd, node->user_data);
}
}
if (has_read) {
if (node->type == SOCKET_NODE_TYPE_SOCK) {
if (node->read_cbk_sock) {
node->read_cbk_sock(node->sock, node->user_data);
}
} else {
if (node->read_cbk) {
node->read_cbk(node->fd, node->user_data);
}
}
}
if (has_write) {
if (node->type == SOCKET_NODE_TYPE_SOCK) {
if (node->write_cbk_sock) {
node->write_cbk_sock(node->sock, node->user_data);
}
} else {
if (node->write_cbk) {
node->write_cbk(node->fd, node->user_data);
}
}
}
}
}
#else
// On non-Windows, use poll()
// Rebuild poll_fds if dirty or not allocated
if (ua->poll_fds_dirty || !ua->poll_fds) {
rebuild_poll_fds(ua);
@ -798,21 +899,12 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) {
rebuild_poll_fds(ua);
}
/* Call poll with cached fds */
int ret = poll(ua->poll_fds, ua->poll_fds_count, timeout_ms);
if (ret < 0) {
#ifdef _WIN32
int err = WSAGetLastError();
if (err == WSAEINTR) {
return;
}
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "WSAPoll failed: %d", err);
#else
if (errno == EINTR) {
return;
}
perror("poll");
#endif
return;
}
@ -832,12 +924,7 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) {
/* Socket event - lookup by fd */
struct socket_node* node = socket_array_get(ua->sockets, ua->poll_fds[i].fd);
if (!node) { // Try by socket_t (in case this is a socket)
socket_t lookup_sock;
#ifdef _WIN32
lookup_sock = (socket_t)(intptr_t)ua->poll_fds[i].fd;
#else
lookup_sock = ua->poll_fds[i].fd;
#endif
socket_t lookup_sock = ua->poll_fds[i].fd;
node = socket_array_get_by_sock(ua->sockets, lookup_sock);
}
if (!node) continue; // Socket may have been removed
@ -884,6 +971,7 @@ void uasync_poll(struct UASYNC* ua, int timeout_tb) {
}
}
}
#endif
/* Process timeouts that may have expired during poll or socket processing */
process_timeouts(ua);

3
src/Makefile.am

@ -17,7 +17,8 @@ utun_CORE_SOURCES = \
secure_channel.c \
crc32.c \
pkt_normalizer.c \
etcp_api.c
etcp_api.c \
control_server.c
# Platform-specific TUN sources
if OS_WINDOWS

856
src/control_server.c

@ -0,0 +1,856 @@
/*
* control_server.c - Control Socket Server Implementation
*
* Handles ETCP monitoring requests from clients
*/
#include "control_server.h"
#include "utun_instance.h"
#include "etcp.h"
#include "etcp_connections.h"
#include "tun_if.h"
#include "../lib/u_async.h"
#include "../lib/debug_config.h"
#include <stdlib.h>
#include <string.h>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#endif
#ifndef DEBUG_CATEGORY_CONTROL
#define DEBUG_CATEGORY_CONTROL 1
#endif
/* Log file name */
#define LOG_FILENAME "control_server.log"
/* ============================================================================
* Logging Helpers
* ============================================================================ */
#ifdef _WIN32
#include <windows.h>
static uint64_t get_timestamp_ms(void) {
FILETIME ft;
GetSystemTimeAsFileTime(&ft);
uint64_t time = ((uint64_t)ft.dwHighDateTime << 32) | ft.dwLowDateTime;
return (time / 10000) - 11644473600000ULL;
}
#else
#include <time.h>
static uint64_t get_timestamp_ms(void) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (uint64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}
#endif
static void log_hex_data(FILE* log, const char* prefix, const uint8_t* data, size_t len) {
if (!log || !data || len == 0) return;
fprintf(log, "%llu: [%s] ", (unsigned long long)get_timestamp_ms(), prefix);
for (size_t i = 0; i < len && i < 256; i++) {
fprintf(log, "%02X ", data[i]);
}
if (len > 256) {
fprintf(log, "... (%llu bytes total)", (unsigned long long)len);
}
fprintf(log, "\n");
fflush(log);
}
/* ============================================================================
* Forward Declarations
* ============================================================================ */
static void accept_callback(socket_t fd, void* arg);
static void client_read_callback(socket_t fd, void* arg);
static void client_write_callback(socket_t fd, void* arg);
static void client_except_callback(socket_t fd, void* arg);
static void handle_client_data(struct control_server* server, struct control_client* client);
static void close_client(struct control_server* server, struct control_client* client);
static void send_conn_list(struct control_server* server, struct control_client* client);
static void send_metrics(struct control_server* server, struct control_client* client);
static void send_error(struct control_client* client, uint8_t error_code, const char* msg);
static struct ETCP_CONN* find_connection_by_peer_id(struct UTUN_INSTANCE* instance, uint64_t peer_id);
/* ============================================================================
* Server Initialization
* ============================================================================ */
int control_server_init(struct control_server* server,
struct UASYNC* ua,
struct UTUN_INSTANCE* instance,
struct sockaddr_storage* bind_addr,
uint32_t max_clients) {
if (!server || !ua || !instance || !bind_addr) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Invalid parameters for control_server_init");
return -1;
}
return 0;
memset(server, 0, sizeof(*server));
server->instance = instance;
server->ua = ua;
server->max_clients = max_clients ? max_clients : 8;
memcpy(&server->bind_addr, bind_addr, sizeof(*bind_addr));
/* Open log file (truncate on start) */
server->log_file = fopen(LOG_FILENAME, "w");
if (server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Control server log started\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
/* Create listening socket */
int family = bind_addr->ss_family;
server->listen_fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
#ifdef _WIN32
if (server->listen_fd == INVALID_SOCKET) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to create listening socket: %d", WSAGetLastError());
return -1;
}
/* Set non-blocking mode */
u_long nonblock = 1;
ioctlsocket(server->listen_fd, FIONBIO, &nonblock);
/* Enable address reuse */
int reuse = 1;
if (setsockopt(server->listen_fd, SOL_SOCKET, SO_REUSEADDR,
(const char*)&reuse, sizeof(reuse)) == SOCKET_ERROR) {
DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed to set SO_REUSEADDR: %d", WSAGetLastError());
}
#else
if (server->listen_fd < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to create listening socket: %s", strerror(errno));
return -1;
}
int reuse = 1;
if (setsockopt(server->listen_fd, SOL_SOCKET, SO_REUSEADDR,
&reuse, sizeof(reuse)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed to set SO_REUSEADDR: %s", strerror(errno));
}
/* Set non-blocking */
int flags = fcntl(server->listen_fd, F_GETFL, 0);
if (flags >= 0) {
fcntl(server->listen_fd, F_SETFL, flags | O_NONBLOCK);
}
#endif
/* Bind to address */
socklen_t addr_len = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
#ifdef _WIN32
if (bind(server->listen_fd, (struct sockaddr*)bind_addr, addr_len) == SOCKET_ERROR) {
int err = WSAGetLastError();
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to bind control socket: %d", err);
closesocket(server->listen_fd);
server->listen_fd = INVALID_SOCKET;
return -1;
}
/* Listen */
if (listen(server->listen_fd, 5) == SOCKET_ERROR) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to listen on control socket: %d", WSAGetLastError());
closesocket(server->listen_fd);
server->listen_fd = INVALID_SOCKET;
return -1;
}
#else
if (bind(server->listen_fd, (struct sockaddr*)bind_addr, addr_len) < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to bind control socket: %s", strerror(errno));
close(server->listen_fd);
server->listen_fd = -1;
return -1;
}
if (listen(server->listen_fd, 5) < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to listen on control socket: %s", strerror(errno));
close(server->listen_fd);
server->listen_fd = -1;
return -1;
}
#endif
/* Register with uasync */
server->listen_socket_id = uasync_add_socket_t(ua, server->listen_fd,
accept_callback,
NULL, /* write callback */
NULL, /* except callback */
server);
if (!server->listen_socket_id) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to register control socket with uasync");
#ifdef _WIN32
closesocket(server->listen_fd);
server->listen_fd = INVALID_SOCKET;
#else
close(server->listen_fd);
server->listen_fd = -1;
#endif
return -1;
}
char addr_str[INET6_ADDRSTRLEN];
if (family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)bind_addr;
inet_ntop(AF_INET, &sin->sin_addr, addr_str, sizeof(addr_str));
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server listening on %s:%d",
addr_str, ntohs(sin->sin_port));
} else {
struct sockaddr_in6* sin6 = (struct sockaddr_in6*)bind_addr;
inet_ntop(AF_INET6, &sin6->sin6_addr, addr_str, sizeof(addr_str));
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server listening on [%s]:%d",
addr_str, ntohs(sin6->sin6_port));
}
return 0;
}
/* ============================================================================
* Server Shutdown
* ============================================================================ */
void control_server_shutdown(struct control_server* server) {
if (!server) return;
/* Close all client connections */
while (server->clients) {
close_client(server, server->clients);
}
/* Close listening socket */
if (server->listen_fd != INVALID_SOCKET) {
uasync_remove_socket_t(server->ua, server->listen_fd);
server->listen_socket_id = NULL;
}
#ifdef _WIN32
if (server->listen_fd != INVALID_SOCKET) {
closesocket(server->listen_fd);
server->listen_fd = INVALID_SOCKET;
}
#else
if (server->listen_fd >= 0) {
close(server->listen_fd);
server->listen_fd = -1;
}
#endif
/* Close log file */
if (server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Control server shutdown complete\n",
(unsigned long long)get_timestamp_ms());
fclose(server->log_file);
server->log_file = NULL;
}
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server shutdown complete");
}
/* ============================================================================
* Client Connection Handling
* ============================================================================ */
static void accept_callback(socket_t fd, void* arg) {
struct control_server* server = (struct control_server*)arg;
struct sockaddr_storage client_addr;
socklen_t addr_len = sizeof(client_addr);
#ifdef _WIN32
socket_t client_fd = accept(fd, (struct sockaddr*)&client_addr, &addr_len);
if (client_fd == INVALID_SOCKET) {
int err = WSAGetLastError();
if (err != WSAEWOULDBLOCK) {
if (server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Accept failed: %d\n",
(unsigned long long)get_timestamp_ms(), err);
fflush(server->log_file);
}
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Accept failed: %d", err);
}
return;
}
/* Set non-blocking */
u_long nonblock = 1;
ioctlsocket(client_fd, FIONBIO, &nonblock);
/* Small delay to let socket stabilize (Windows-specific workaround) */
// Sleep(10);
#else
socket_t client_fd = accept(fd, (struct sockaddr*)&client_addr, &addr_len);
if (client_fd < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
if (server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Accept failed: %s\n",
(unsigned long long)get_timestamp_ms(), strerror(errno));
fflush(server->log_file);
}
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Accept failed: %s", strerror(errno));
}
return;
}
/* Set non-blocking */
int flags = fcntl(client_fd, F_GETFL, 0);
if (flags >= 0) {
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
}
#endif
DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Accept...");
/* Check max clients */
if (server->client_count >= server->max_clients) {
if (server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Max clients reached, rejecting connection\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Max clients reached, rejecting connection");
#ifdef _WIN32
closesocket(client_fd);
#else
close(client_fd);
#endif
return;
}
/* Allocate client structure */
struct control_client* client = (struct control_client*)calloc(1, sizeof(*client));
if (!client) {
if (server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Failed to allocate client structure\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate client structure");
#ifdef _WIN32
closesocket(client_fd);
#else
close(client_fd);
#endif
return;
}
client->fd = client_fd;
client->server = server; /* Store back pointer to server */
client->connected = 1;
client->selected_peer_id = 0;
/* Register with uasync */
client->socket_id = uasync_add_socket_t(server->ua, client_fd,
client_read_callback,
NULL, /* write callback - not needed for now */
client_except_callback,
client);
if (!client->socket_id) {
if (server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Failed to register client socket with uasync\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to register client socket with uasync");
free(client);
#ifdef _WIN32
closesocket(client_fd);
#else
close(client_fd);
#endif
return;
}
/* Add to list */
client->next = server->clients;
server->clients = client;
server->client_count++;
char addr_str[INET6_ADDRSTRLEN];
if (client_addr.ss_family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)&client_addr;
inet_ntop(AF_INET, &sin->sin_addr, addr_str, sizeof(addr_str));
} else {
struct sockaddr_in6* sin6 = (struct sockaddr_in6*)&client_addr;
inet_ntop(AF_INET6, &sin6->sin6_addr, addr_str, sizeof(addr_str));
}
if (server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Client connected from %s (total: %u)\n",
(unsigned long long)get_timestamp_ms(), addr_str, server->client_count);
fflush(server->log_file);
}
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control client connected from %s (total: %u)",
addr_str, server->client_count);
}
static void client_read_callback(socket_t fd, void* arg) {
struct control_client* client = (struct control_client*)arg;
struct control_server* server = client->server;
/* Read available data */
uint8_t* buf = client->recv_buffer + client->recv_len;
size_t buf_space = ETCPMON_MAX_MSG_SIZE - client->recv_len;
#ifdef _WIN32
int received = recv(fd, (char*)buf, (int)buf_space, 0);
if (received == SOCKET_ERROR) {
int err = WSAGetLastError();
if (err != WSAEWOULDBLOCK) {
if (server && server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Client recv error: %d\n",
(unsigned long long)get_timestamp_ms(), err);
fflush(server->log_file);
}
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client recv error: %d", err);
client->connected = 0;
}
return;
}
if (received == 0) {
/* Connection closed */
if (server && server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Client disconnected (recv returned 0)\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
client->connected = 0;
return;
}
#else
ssize_t received = recv(fd, buf, buf_space, 0);
if (received < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
if (server && server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Client recv error: %s\n",
(unsigned long long)get_timestamp_ms(), strerror(errno));
fflush(server->log_file);
}
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client recv error: %s", strerror(errno));
client->connected = 0;
}
return;
}
if (received == 0) {
/* Connection closed */
if (server && server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Client disconnected (recv returned 0)\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
client->connected = 0;
return;
}
#endif
/* Log received data */
if (server && server->log_file) {
log_hex_data(server->log_file, "RX", buf, received);
}
client->recv_len += received;
/* Process messages */
if (server) {
handle_client_data(server, client);
}
}
static void client_write_callback(socket_t fd, void* arg) {
/* Not used for now - writes are immediate */
(void)fd;
(void)arg;
}
static void client_except_callback(socket_t fd, void* arg) {
struct control_client* client = (struct control_client*)arg;
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client socket exception");
/* Mark for cleanup */
client->connected = 0;
(void)fd;
}
static void close_client(struct control_server* server, struct control_client* client) {
if (!server || !client) return;
/* Log disconnection */
if (server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Client connection closed\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
/* Remove from uasync */
if (client->fd != INVALID_SOCKET) {
uasync_remove_socket_t(server->ua, client->fd);
}
/* Close socket */
#ifdef _WIN32
if (client->fd != INVALID_SOCKET) {
closesocket(client->fd);
}
#else
if (client->fd >= 0) {
close(client->fd);
}
#endif
/* Remove from list */
struct control_client** curr = &server->clients;
while (*curr) {
if (*curr == client) {
*curr = client->next;
break;
}
curr = &(*curr)->next;
}
server->client_count--;
free(client);
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control client disconnected (total: %u)",
server->client_count);
}
/* ============================================================================
* Message Handling
* ============================================================================ */
static void handle_client_data(struct control_server* server, struct control_client* client) {
while (client->recv_len >= sizeof(struct etcpmon_msg_header)) {
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)client->recv_buffer;
/* Validate header */
if (etcpmon_validate_header(hdr) != 0) {
if (server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Invalid message header from client\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Invalid message header from client");
close_client(server, client);
return;
}
/* Check if full message received */
if (client->recv_len < hdr->size) {
break; /* Wait for more data */
}
/* Process message */
uint8_t* payload = client->recv_buffer + sizeof(struct etcpmon_msg_header);
uint16_t payload_size = hdr->size - sizeof(struct etcpmon_msg_header);
if (server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Received command type=0x%02X size=%d\n",
(unsigned long long)get_timestamp_ms(), hdr->type, hdr->size);
fflush(server->log_file);
}
switch (hdr->type) {
case ETCPMON_CMD_LIST_CONN:
send_conn_list(server, client);
break;
case ETCPMON_CMD_SELECT_CONN:
if (payload_size >= sizeof(struct etcpmon_cmd_select)) {
struct etcpmon_cmd_select* cmd = (struct etcpmon_cmd_select*)payload;
client->selected_peer_id = cmd->peer_node_id;
if (server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Client selected connection: %016llX\n",
(unsigned long long)get_timestamp_ms(), (unsigned long long)cmd->peer_node_id);
fflush(server->log_file);
}
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Client selected connection: %016llX",
(unsigned long long)cmd->peer_node_id);
}
break;
case ETCPMON_CMD_GET_METRICS:
if (client->selected_peer_id == 0) {
send_error(client, ETCPMON_ERR_NO_CONN_SELECTED,
"No connection selected");
} else {
send_metrics(server, client);
}
break;
case ETCPMON_CMD_DISCONNECT:
if (server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Client requested disconnect\n",
(unsigned long long)get_timestamp_ms());
fflush(server->log_file);
}
close_client(server, client);
return;
default:
if (server->log_file) {
fprintf(server->log_file, "%llu: [ERROR] Unknown command from client: 0x%02X\n",
(unsigned long long)get_timestamp_ms(), hdr->type);
fflush(server->log_file);
}
DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Unknown command from client: 0x%02X", hdr->type);
send_error(client, ETCPMON_ERR_INVALID_CMD, "Unknown command");
break;
}
/* Remove processed message from buffer */
uint16_t msg_size = hdr->size;
if (client->recv_len > msg_size) {
memmove(client->recv_buffer, client->recv_buffer + msg_size,
client->recv_len - msg_size);
}
client->recv_len -= msg_size;
}
}
/* ============================================================================
* Response Builders
* ============================================================================ */
static void send_conn_list(struct control_server* server, struct control_client* client) {
struct UTUN_INSTANCE* instance = server->instance;
/* Count connections */
uint8_t count = 0;
struct ETCP_CONN* conn = instance->connections;
while (conn && count < ETCPMON_MAX_CONNECTIONS) {
count++;
conn = conn->next;
}
/* Build response */
uint16_t rsp_size = ETCPMON_CONN_LIST_SIZE(count);
uint8_t* buffer = (uint8_t*)malloc(rsp_size);
if (!buffer) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate connection list buffer");
return;
}
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer;
etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_conn_list) +
count * sizeof(struct etcpmon_conn_info),
ETCPMON_RSP_CONN_LIST);
struct etcpmon_rsp_conn_list* rsp = (struct etcpmon_rsp_conn_list*)(buffer + sizeof(*hdr));
rsp->count = count;
struct etcpmon_conn_info* info = (struct etcpmon_conn_info*)(buffer + sizeof(*hdr) + sizeof(*rsp));
conn = instance->connections;
for (uint8_t i = 0; i < count && conn; i++) {
info[i].peer_node_id = conn->peer_node_id;
strncpy(info[i].name, conn->log_name, ETCPMON_MAX_CONN_NAME - 1);
info[i].name[ETCPMON_MAX_CONN_NAME - 1] = '\0';
conn = conn->next;
}
/* Log and send response */
if (server->log_file) {
log_hex_data(server->log_file, "TX", buffer, rsp_size);
fprintf(server->log_file, "%llu: [LOG] Sent RSP_CONN_LIST count=%d\n",
(unsigned long long)get_timestamp_ms(), count);
fflush(server->log_file);
}
#ifdef _WIN32
send(client->fd, (const char*)buffer, rsp_size, 0);
#else
send(client->fd, buffer, rsp_size, 0);
#endif
free(buffer);
}
static void send_metrics(struct control_server* server, struct control_client* client) {
struct UTUN_INSTANCE* instance = server->instance;
/* Find selected connection */
struct ETCP_CONN* conn = find_connection_by_peer_id(instance, client->selected_peer_id);
if (!conn) {
send_error(client, ETCPMON_ERR_INVALID_CONN, "Connection not found");
return;
}
/* Count links */
uint8_t links_count = 0;
struct ETCP_LINK* link = conn->links;
while (link && links_count < ETCPMON_MAX_LINKS) {
links_count++;
link = link->next;
}
/* Build response */
uint16_t rsp_size = ETCPMON_METRICS_SIZE(links_count);
uint8_t* buffer = (uint8_t*)malloc(rsp_size);
if (!buffer) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate metrics buffer");
return;
}
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer;
etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_metrics) +
links_count * sizeof(struct etcpmon_link_metrics),
ETCPMON_RSP_METRICS);
struct etcpmon_rsp_metrics* rsp = (struct etcpmon_rsp_metrics*)(buffer + sizeof(*hdr));
/* Fill ETCP metrics */
rsp->etcp.rtt_last = conn->rtt_last;
rsp->etcp.rtt_avg_10 = conn->rtt_avg_10;
rsp->etcp.rtt_avg_100 = conn->rtt_avg_100;
rsp->etcp.jitter = conn->jitter;
rsp->etcp.bytes_sent_total = conn->bytes_sent_total;
rsp->etcp.retrans_count = conn->retransmissions_count;
rsp->etcp.ack_count = conn->ack_packets_count;
rsp->etcp.unacked_bytes = conn->unacked_bytes;
rsp->etcp.optimal_inflight = conn->optimal_inflight;
rsp->etcp.links_count = links_count;
/* Fill TUN metrics */
if (instance->tun) {
rsp->tun.bytes_read = instance->tun->bytes_read;
rsp->tun.bytes_written = instance->tun->bytes_written;
rsp->tun.packets_read = instance->tun->packets_read;
rsp->tun.packets_written = instance->tun->packets_written;
rsp->tun.read_errors = instance->tun->read_errors;
rsp->tun.write_errors = instance->tun->write_errors;
} else {
memset(&rsp->tun, 0, sizeof(rsp->tun));
}
/* Fill link metrics */
struct etcpmon_link_metrics* link_info = (struct etcpmon_link_metrics*)(buffer + sizeof(*hdr) + sizeof(*rsp));
link = conn->links;
for (uint8_t i = 0; i < links_count && link; i++) {
link_info[i].local_link_id = link->local_link_id;
link_info[i].status = link->link_status;
link_info[i].encrypt_errors = (uint32_t)link->encrypt_errors;
link_info[i].decrypt_errors = (uint32_t)link->decrypt_errors;
link_info[i].send_errors = (uint32_t)link->send_errors;
link_info[i].recv_errors = (uint32_t)link->recv_errors;
link_info[i].total_encrypted = link->total_encrypted;
link_info[i].total_decrypted = link->total_decrypted;
link_info[i].bandwidth = link->bandwidth;
link_info[i].nat_changes_count = link->nat_changes_count;
link = link->next;
}
/* Log and send response */
if (server->log_file) {
log_hex_data(server->log_file, "TX", buffer, rsp_size);
fprintf(server->log_file, "%llu: [LOG] Sent RSP_METRICS rtt=%u bytes=%llu links=%d\n",
(unsigned long long)get_timestamp_ms(),
rsp->etcp.rtt_last,
(unsigned long long)rsp->etcp.bytes_sent_total,
links_count);
fflush(server->log_file);
}
#ifdef _WIN32
send(client->fd, (const char*)buffer, rsp_size, 0);
#else
send(client->fd, buffer, rsp_size, 0);
#endif
free(buffer);
}
static void send_error(struct control_client* client, uint8_t error_code, const char* msg) {
struct control_server* server = client->server;
uint16_t msg_len = (uint16_t)strlen(msg);
uint16_t rsp_size = ETCPMON_ERROR_SIZE(msg_len);
uint8_t* buffer = (uint8_t*)malloc(rsp_size);
if (!buffer) return;
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer;
etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_error) + msg_len + 1,
ETCPMON_RSP_ERROR);
struct etcpmon_rsp_error* rsp = (struct etcpmon_rsp_error*)(buffer + sizeof(*hdr));
rsp->error_code = error_code;
memcpy(buffer + sizeof(*hdr) + sizeof(*rsp), msg, msg_len + 1);
/* Log error response */
if (server && server->log_file) {
log_hex_data(server->log_file, "TX", buffer, rsp_size);
fprintf(server->log_file, "%llu: [LOG] Sent RSP_ERROR code=%d msg='%s'\n",
(unsigned long long)get_timestamp_ms(), error_code, msg);
fflush(server->log_file);
}
#ifdef _WIN32
send(client->fd, (const char*)buffer, rsp_size, 0);
#else
send(client->fd, buffer, rsp_size, 0);
#endif
free(buffer);
}
/* ============================================================================
* Helper Functions
* ============================================================================ */
static struct ETCP_CONN* find_connection_by_peer_id(struct UTUN_INSTANCE* instance,
uint64_t peer_id) {
struct ETCP_CONN* conn = instance->connections;
while (conn) {
if (conn->peer_node_id == peer_id) {
return conn;
}
conn = conn->next;
}
return NULL;
}
/* ============================================================================
* Public API Implementation
* ============================================================================ */
void control_server_process_updates(struct control_server* server) {
if (!server) return;
/* Process any pending data for all clients */
struct control_client* client = server->clients;
while (client) {
struct control_client* next = client->next;
if (!client->connected) {
close_client(server, client);
} else if (client->recv_len > 0) {
handle_client_data(server, client);
}
client = next;
}
}
uint32_t control_server_get_client_count(const struct control_server* server) {
return server ? server->client_count : 0;
}

120
src/control_server.h

@ -0,0 +1,120 @@
/*
* control_server.h - Control Socket Server for ETCP Monitoring
*
* Provides TCP control interface for ETCP connection monitoring
*/
#ifndef CONTROL_SERVER_H
#define CONTROL_SERVER_H
#include "../tools/etcpmon/etcpmon_protocol.h"
#include "../lib/socket_compat.h"
#include <stdint.h>
#include <stdio.h>
#ifdef __cplusplus
extern "C" {
#endif
/* Forward declarations */
struct UTUN_INSTANCE;
struct UASYNC;
/* Forward declaration */
struct control_server;
/* Client connection state */
struct control_client {
struct control_client* next; /* Linked list */
struct control_server* server; /* Back pointer to server */
socket_t fd; /* Client socket */
void* socket_id; /* uasync socket ID */
/* Receive buffer */
uint8_t recv_buffer[ETCPMON_MAX_MSG_SIZE];
uint16_t recv_len; /* Bytes received so far */
/* Selected connection */
uint64_t selected_peer_id; /* 0 = none selected */
/* Client state */
uint8_t connected;
};
/* Control server state */
struct control_server {
struct UTUN_INSTANCE* instance; /* Parent instance */
struct UASYNC* ua; /* Async context */
/* Listening socket */
socket_t listen_fd;
void* listen_socket_id; /* uasync socket ID */
struct sockaddr_storage bind_addr; /* Bound address */
/* Client connections */
struct control_client* clients; /* Linked list of clients */
uint32_t client_count; /* Number of connected clients */
uint32_t max_clients; /* Maximum allowed clients */
/* Log file */
FILE* log_file;
};
/* ============================================================================
* Public API
* ============================================================================ */
/* Initialize control server
*
* Creates listening socket and registers with uasync
*
* Parameters:
* server - Server state structure to initialize
* ua - uasync context for event handling
* instance - Parent utun instance
* bind_addr - Address to bind (from config control_sock)
* max_clients - Maximum concurrent client connections
*
* Returns:
* 0 on success, -1 on error
*/
int control_server_init(struct control_server* server,
struct UASYNC* ua,
struct UTUN_INSTANCE* instance,
struct sockaddr_storage* bind_addr,
uint32_t max_clients);
/* Shutdown control server
*
* Closes all client connections and listening socket
*
* Parameters:
* server - Server state to shutdown
*/
void control_server_shutdown(struct control_server* server);
/* Process periodic updates
*
* Should be called periodically (e.g., from main loop) to send
* metrics updates to clients that have selected connections
*
* Parameters:
* server - Server state
*/
void control_server_process_updates(struct control_server* server);
/* Get number of connected clients
*
* Parameters:
* server - Server state
*
* Returns:
* Number of connected clients
*/
uint32_t control_server_get_client_count(const struct control_server* server);
#ifdef __cplusplus
}
#endif
#endif /* CONTROL_SERVER_H */

23
src/tun_windows.c

@ -191,13 +191,26 @@ int tun_platform_init(struct tun_if* tun, const char* ifname, const char* ip_str
adapter_name = wname;
}
// Create or open Wintun adapter
GUID guid = {0};
WINTUN_ADAPTER_HANDLE adapter = WintunCreateAdapter(adapter_name, tunnel_type, &guid);
// Try to open existing adapter first
WINTUN_ADAPTER_HANDLE adapter = WintunOpenAdapter(adapter_name);
if (!adapter) {
DWORD err = GetLastError();
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to create Wintun adapter: %lu", err);
return -1;
if (err == ERROR_NOT_FOUND || err == ERROR_FILE_NOT_FOUND) { // 1168 or 2
// Adapter does not exist → safe to create a new one
adapter = WintunCreateAdapter(adapter_name, tunnel_type, NULL);
if (!adapter) {
err = GetLastError();
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to create new Wintun adapter: %lu", err);
return -1;
}
DEBUG_INFO(DEBUG_CATEGORY_TUN, "Created new Wintun adapter: %s", tun->ifname);
} else {
// Some other real failure (e.g. access denied, driver not loaded, etc.)
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Failed to open existing Wintun adapter: %lu", err);
return -1;
}
} else {
DEBUG_INFO(DEBUG_CATEGORY_TUN, "Opened existing Wintun adapter: %s", tun->ifname);
}
// Get adapter LUID for IP configuration

10
src/utun.c

@ -251,6 +251,12 @@ static void signal_handler(int sig) {
}
}
void test_tmr(void* arg) {
struct UASYNC* ua = (struct UASYNC*)arg;
uasync_set_timeout(ua, 10000, ua, test_tmr);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "tick ...");
}
int main(int argc, char *argv[]) {
cmd_args_t args;
parse_args(argc, argv, &args);
@ -306,6 +312,8 @@ int main(int argc, char *argv[]) {
return 1;
}
uasync_set_timeout(ua, 10000, ua, test_tmr);
#ifdef _WIN32
// Check for wintun.dll before enabling TUN
if (access("wintun.dll", F_OK) != 0) {
@ -362,6 +370,8 @@ int main(int argc, char *argv[]) {
return 1;
}
DEBUG_INFO(DEBUG_CATEGORY_MEMORY, "Run mainloop");
while (instance->running) uasync_poll(ua, 100);
// Cleanup

26
src/utun_instance.c

@ -9,6 +9,7 @@
#include "route_bgp.h"
#include "etcp_connections.h"
#include "etcp.h"
#include "control_server.h"
#include "../lib/u_async.h"
#include "../lib/debug_config.h"
#include <stdlib.h>
@ -201,7 +202,15 @@ void utun_instance_destroy(struct UTUN_INSTANCE *instance) {
// Stop running if not already
instance->running = 0;
// Shutdown control server first
if (instance->control_srv) {
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Shutting down control server");
control_server_shutdown(instance->control_srv);
free(instance->control_srv);
instance->control_srv = NULL;
}
// Cleanup ETCP sockets and connections FIRST (before destroying uasync)
DEBUG_INFO(DEBUG_CATEGORY_MEMORY, "[INSTANCE_DESTROY] Cleaning up ETCP sockets and connections");
struct ETCP_SOCKET* sock = instance->etcp_sockets;
@ -335,6 +344,21 @@ int utun_instance_init(struct UTUN_INSTANCE *instance) {
fprintf(stderr, "DEBUG: Connections initialized successfully, count=%d\n", instance->connections_count);
fflush(stderr);
// Initialize control server if configured
if (instance->config->global.control_sock.ss_family != 0) {
instance->control_srv = (struct control_server*)calloc(1, sizeof(struct control_server));
if (instance->control_srv) {
if (control_server_init(instance->control_srv, instance->ua, instance,
&instance->config->global.control_sock, 8) != 0) {
DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed to initialize control server, continuing without monitoring");
free(instance->control_srv);
instance->control_srv = NULL;
} else {
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server initialized successfully");
}
}
}
// Start the main loop
instance->running = 1;

4
src/utun_instance.h

@ -17,6 +17,7 @@ struct ETCP_SOCKET;
struct tun_if;
struct ETCP_BINDINGS;
struct ROUTE_BGP;
struct control_server;
// uTun instance configuration
struct UTUN_INSTANCE {
@ -53,6 +54,9 @@ struct UTUN_INSTANCE {
// ETCP API bindings (per-instance)
struct ETCP_BINDINGS api_bindings;
// Control server for monitoring
struct control_server* control_srv;
};
// Functions

6
tests/Makefile.am

@ -24,6 +24,7 @@ check_PROGRAMS = \
test_bgp_route_exchange \
test_routing_mesh \
test_etcp_dummynet \
test_control_server \
bench_timeout_heap \
bench_uasync_timeouts
@ -209,6 +210,11 @@ bench_uasync_timeouts_SOURCES = bench_uasync_timeouts.c
bench_uasync_timeouts_CFLAGS = -I$(top_srcdir)/lib
bench_uasync_timeouts_LDADD = $(COMMON_LIBS)
# Control Server Test - Tests the ETCP monitoring protocol
test_control_server_SOURCES = test_control_server.c
test_control_server_CFLAGS = -I$(top_srcdir)/src -I$(top_srcdir)/lib -I$(top_srcdir)/tools/etcpmon
test_control_server_LDADD = $(top_builddir)/src/utun-control_server.o $(COMMON_LIBS)
# Build tinycrypt objects before tests that need them
BUILT_SOURCES = $(TINYCRYPT_BUILT)

Loading…
Cancel
Save