You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

1048 lines
38 KiB

/*
* control_server.c - Control Socket Server Implementation
*
* Handles ETCP monitoring requests from clients
*/
#include "control_server.h"
#include "utun_instance.h"
#include "etcp.h"
#include "etcp_connections.h"
#include "tun_if.h"
#include "route_lib.h"
#include "route_bgp.h"
#include "pkt_normalizer.h"
#include "../lib/u_async.h"
#include "../lib/debug_config.h"
#include "../lib/mem.h"
#include "../lib/ll_queue.h"
#include <stdlib.h>
#include <string.h>
#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
#else
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#endif
#ifndef DEBUG_CATEGORY_CONTROL
#define DEBUG_CATEGORY_CONTROL 1
#endif
/* Log file name */
#define LOG_FILENAME "control_server.log"
/* ============================================================================
* Logging Helpers
* ============================================================================ */
#ifdef _WIN32
#include <windows.h>
/*
 * Wall-clock timestamp in milliseconds (Windows build).
 *
 * The two 32-bit halves of the FILETIME are joined into one 64-bit value,
 * divided by 10000 and shifted by 11644473600000.
 * NOTE(review): this relies on the Win32 definition of FILETIME as 100 ns
 * ticks since 1601-01-01, which makes the result Unix-epoch milliseconds.
 */
static uint64_t get_timestamp_ms(void) {
    FILETIME now;
    GetSystemTimeAsFileTime(&now);

    uint64_t ticks = (uint64_t)now.dwHighDateTime;
    ticks <<= 32;
    ticks |= now.dwLowDateTime;

    uint64_t ms = ticks / 10000;
    return ms - 11644473600000ULL;
}
#else
#include <time.h>
/*
 * Wall-clock timestamp in milliseconds (non-Windows build).
 *
 * Reads CLOCK_REALTIME and combines the seconds part (scaled to ms) with the
 * nanoseconds part (truncated to ms).
 */
static uint64_t get_timestamp_ms(void) {
    struct timespec now;
    clock_gettime(CLOCK_REALTIME, &now);

    uint64_t seconds_as_ms = (uint64_t)now.tv_sec * 1000;
    return seconds_as_ms + now.tv_nsec / 1000000;
}
#endif
/*
 * Write one timestamped hex-dump line to `log`:
 *     "<ms>: [<prefix>] XX XX XX ..."
 *
 * Only the first 256 bytes of `data` are printed; when `len` exceeds that, a
 * "... (N bytes total)" marker is appended. Nothing is written when `log` or
 * `data` is NULL or `len` is 0. The stream is flushed after the line.
 */
static void log_hex_data(FILE* log, const char* prefix, const uint8_t* data, size_t len) {
    if (log == NULL || data == NULL || len == 0) {
        return;
    }

    const size_t shown = (len < 256) ? len : 256;

    fprintf(log, "%llu: [%s] ", (unsigned long long)get_timestamp_ms(), prefix);

    const uint8_t* const stop = data + shown;
    for (const uint8_t* byte = data; byte != stop; byte++) {
        fprintf(log, "%02X ", *byte);
    }

    if (len > 256) {
        fprintf(log, "... (%llu bytes total)", (unsigned long long)len);
    }

    fprintf(log, "\n");
    fflush(log);
}
/* ============================================================================
* Forward Declarations
* ============================================================================ */
static void accept_callback(socket_t fd, void* arg);
static void client_read_callback(socket_t fd, void* arg);
static void client_write_callback(socket_t fd, void* arg);
static void client_except_callback(socket_t fd, void* arg);
static void handle_client_data(struct control_server* server, struct control_client* client);
static void close_client(struct control_server* server, struct control_client* client);
static void send_conn_list(struct control_server* server, struct control_client* client, uint8_t seq_id);
static void send_metrics(struct control_server* server, struct control_client* client, uint8_t seq_id);
static void send_error(struct control_client* client, uint8_t error_code, const char* msg, uint8_t seq_id);
static struct ETCP_CONN* find_connection_by_peer_id(struct UTUN_INSTANCE* instance, uint64_t peer_id);
/* ============================================================================
* Server Initialization
* ============================================================================ */
/*
 * Initialize the control server and start listening for monitoring clients.
 *
 * Zeroes `*server`, stores `instance` and `ua`, copies `bind_addr`, opens
 * LOG_FILENAME for writing (truncating it), creates a non-blocking TCP
 * listening socket bound to `bind_addr`, and registers it with `ua` so that
 * accept_callback() is invoked for incoming connections.
 *
 * Parameters:
 *   server      - caller-owned structure to initialize (must not be NULL)
 *   ua          - async loop the listening socket is registered with
 *   instance    - instance later queried for connections and metrics
 *   bind_addr   - listen address; AF_INET uses the sockaddr_in length, any
 *                 other family is treated as sockaddr_in6
 *   max_clients - client limit; 0 selects the default of 8
 *
 * Returns 0 on success and -1 on failure. Once past the parameter check, every
 * failure path releases the listening socket and the log file opened here, so
 * a failed init does not leak a FILE handle or a descriptor.
 */
int control_server_init(struct control_server* server,
                        struct UASYNC* ua,
                        struct UTUN_INSTANCE* instance,
                        struct sockaddr_storage* bind_addr,
                        uint32_t max_clients) {
    if (!server || !ua || !instance || !bind_addr) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Invalid parameters for control_server_init");
        return -1;
    }

    memset(server, 0, sizeof(*server));
    server->instance = instance;
    server->ua = ua;
    server->max_clients = max_clients ? max_clients : 8;
    memcpy(&server->bind_addr, bind_addr, sizeof(*bind_addr));

    /* Open log file (truncate on start). Logging is optional: a failed
     * fopen() just leaves log_file NULL and every user checks for that. */
    server->log_file = fopen(LOG_FILENAME, "w");
    if (server->log_file) {
        fprintf(server->log_file, "%llu: [LOG] Control server log started\n",
                (unsigned long long)get_timestamp_ms());
        fflush(server->log_file);
    }

    /* Declared before the first `goto fail` so no jump skips an initializer. */
    int family = bind_addr->ss_family;
    socklen_t addr_len = (family == AF_INET) ? sizeof(struct sockaddr_in)
                                             : sizeof(struct sockaddr_in6);
    int reuse = 1;

    /* Create listening socket */
    server->listen_fd = socket(family, SOCK_STREAM, IPPROTO_TCP);

#ifdef _WIN32
    if (server->listen_fd == INVALID_SOCKET) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to create listening socket: %d", WSAGetLastError());
        goto fail;
    }

    /* Set non-blocking mode */
    {
        u_long nonblock = 1;
        ioctlsocket(server->listen_fd, FIONBIO, &nonblock);
    }

    /* Enable address reuse (failure is only a warning) */
    if (setsockopt(server->listen_fd, SOL_SOCKET, SO_REUSEADDR,
                   (const char*)&reuse, sizeof(reuse)) == SOCKET_ERROR) {
        DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed to set SO_REUSEADDR: %d", WSAGetLastError());
    }

    /* Bind to address */
    if (bind(server->listen_fd, (struct sockaddr*)bind_addr, addr_len) == SOCKET_ERROR) {
        int err = WSAGetLastError();
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to bind control socket: %d", err);
        goto fail;
    }

    /* Listen */
    if (listen(server->listen_fd, 5) == SOCKET_ERROR) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to listen on control socket: %d", WSAGetLastError());
        goto fail;
    }
#else
    if (server->listen_fd < 0) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to create listening socket: %s", strerror(errno));
        goto fail;
    }

    /* Enable address reuse (failure is only a warning) */
    if (setsockopt(server->listen_fd, SOL_SOCKET, SO_REUSEADDR,
                   &reuse, sizeof(reuse)) < 0) {
        DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed to set SO_REUSEADDR: %s", strerror(errno));
    }

    /* Set non-blocking */
    {
        int flags = fcntl(server->listen_fd, F_GETFL, 0);
        if (flags >= 0) {
            fcntl(server->listen_fd, F_SETFL, flags | O_NONBLOCK);
        }
    }

    /* Bind to address */
    if (bind(server->listen_fd, (struct sockaddr*)bind_addr, addr_len) < 0) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to bind control socket: %s", strerror(errno));
        goto fail;
    }

    /* Listen */
    if (listen(server->listen_fd, 5) < 0) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to listen on control socket: %s", strerror(errno));
        goto fail;
    }
#endif

    /* Register with uasync: only a read callback is needed for accepting. */
    server->listen_socket_id = uasync_add_socket_t(ua, server->listen_fd,
                                                   accept_callback,
                                                   NULL, /* write callback */
                                                   NULL, /* except callback */
                                                   server);
    if (!server->listen_socket_id) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to register control socket with uasync");
        goto fail;
    }

    if (family == AF_INET) {
        struct sockaddr_in* sin = (struct sockaddr_in*)bind_addr;
        DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server listening on %s:%d",
                   ip_to_str(&sin->sin_addr, AF_INET).str, ntohs(sin->sin_port));
    } else {
        struct sockaddr_in6* sin6 = (struct sockaddr_in6*)bind_addr;
        DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server listening on [%s]:%d",
                   ip_to_str(&sin6->sin6_addr, AF_INET6).str, ntohs(sin6->sin6_port));
    }
    return 0;

fail:
    /* Undo everything acquired above: the socket (if it was created) and the
     * log file, which the original error paths left open. */
#ifdef _WIN32
    if (server->listen_fd != INVALID_SOCKET) {
        closesocket(server->listen_fd);
        server->listen_fd = INVALID_SOCKET;
    }
#else
    if (server->listen_fd >= 0) {
        close(server->listen_fd);
        server->listen_fd = -1;
    }
#endif
    if (server->log_file) {
        fclose(server->log_file);
        server->log_file = NULL;
    }
    return -1;
}
/* ============================================================================
* Server Shutdown
* ============================================================================ */
void control_server_shutdown(struct control_server* server) {
if (!server) return;
/* Close all client connections */
while (server->clients) {
close_client(server, server->clients);
}
#ifdef _WIN32
if (server->listen_fd != INVALID_SOCKET) {
uasync_remove_socket_t(server->ua, server->listen_fd);
server->listen_socket_id = NULL;
closesocket(server->listen_fd);
server->listen_fd = INVALID_SOCKET;
}
#else
if (server->listen_fd >= 0) {
uasync_remove_socket_t(server->ua, server->listen_fd);
server->listen_socket_id = NULL;
close(server->listen_fd);
server->listen_fd = -1;
}
#endif
/* Close log file */
if (server->log_file) {
fprintf(server->log_file, "%llu: [LOG] Control server shutdown complete\n",
(unsigned long long)get_timestamp_ms());
fclose(server->log_file);
server->log_file = NULL;
}
DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control server shutdown complete");
}
/* ============================================================================
* Client Connection Handling
* ============================================================================ */
/*
 * Decide whether a client IPv4 address may use the control socket.
 *
 * Returns 1 when `client_ip` masked with a rule's netmask equals that rule's
 * network for at least one configured allow rule, otherwise 0. The policy is
 * default-deny: a missing config or an empty allow list rejects everyone.
 * `client_ip` is compared as given; accept_callback() passes it in host byte
 * order.
 */
static int is_control_ip_allowed(const struct control_server* server, uint32_t client_ip) {
    const int have_config = server && server->instance && server->instance->config;
    if (!have_config) {
        DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Control IP check: no config available");
        return 0;
    }

    const struct global_config *global = &server->instance->config->global;

    if (global->control_allow_count == 0) {
        DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Control connection denied (no allow rules) - add control_allow=IP/mask to [control] in config");
        return 0;
    }

    int idx = 0;
    while (idx < global->control_allow_count) {
        const struct CFG_CONTROL_ALLOW *rule = &global->control_allows[idx];
        if (rule->network == (client_ip & rule->netmask)) {
            return 1;
        }
        idx++;
    }

    DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Control connection denied from IP (not in allow list) - add control_allow=IP/mask to [control]");
    return 0;
}
/* Close a freshly accepted socket that will not be tracked as a client. */
static void accept_discard_socket(socket_t fd) {
#ifdef _WIN32
    closesocket(fd);
#else
    close(fd);
#endif
}

/*
 * Read-ready callback for the listening socket: accept one connection.
 *
 * `fd` is the listening socket and `arg` the owning control_server. The new
 * socket is made non-blocking, then rejected (closed) when:
 *   - the peer is not IPv4 (the allow list only handles IPv4),
 *   - the peer address fails is_control_ip_allowed(),
 *   - max_clients is already reached,
 *   - allocating or registering the client fails.
 * Otherwise a control_client is allocated, registered with uasync (read and
 * except callbacks) and pushed onto the head of server->clients.
 */
static void accept_callback(socket_t fd, void* arg) {
    struct control_server* server = (struct control_server*)arg;
    struct sockaddr_storage client_addr;
    socklen_t addr_len = sizeof(client_addr);

    socket_t client_fd = accept(fd, (struct sockaddr*)&client_addr, &addr_len);

#ifdef _WIN32
    if (client_fd == INVALID_SOCKET) {
        int err = WSAGetLastError();
        if (err != WSAEWOULDBLOCK) {
            if (server->log_file) {
                fprintf(server->log_file, "%llu: [ERROR] Accept failed: %d\n",
                        (unsigned long long)get_timestamp_ms(), err);
                fflush(server->log_file);
            }
            DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Accept failed: %d", err);
        }
        return;
    }

    /* Set non-blocking */
    u_long nonblock = 1;
    ioctlsocket(client_fd, FIONBIO, &nonblock);
#else
    if (client_fd < 0) {
        /* Saved immediately: fprintf()/fflush() below may overwrite errno
         * before the second message is formatted. */
        int err = errno;
        if (err != EAGAIN && err != EWOULDBLOCK) {
            if (server->log_file) {
                fprintf(server->log_file, "%llu: [ERROR] Accept failed: %s\n",
                        (unsigned long long)get_timestamp_ms(), strerror(err));
                fflush(server->log_file);
            }
            DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Accept failed: %s", strerror(err));
        }
        return;
    }

    /* Set non-blocking */
    int flags = fcntl(client_fd, F_GETFL, 0);
    if (flags >= 0) {
        fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
    }
#endif

    /* Check allowed IP (default deny all) */
    uint32_t client_ip = 0;
    if (client_addr.ss_family == AF_INET) {
        struct sockaddr_in* sin = (struct sockaddr_in*)&client_addr;
        client_ip = ntohl(sin->sin_addr.s_addr);
    } else {
        DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "IPv6 not supported for control allow list");
        accept_discard_socket(client_fd);
        return;
    }

    if (!is_control_ip_allowed(server, client_ip)) {
        accept_discard_socket(client_fd);
        return;
    }

    DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Accept...");

    /* Check max clients */
    if (server->client_count >= server->max_clients) {
        if (server->log_file) {
            fprintf(server->log_file, "%llu: [ERROR] Max clients reached, rejecting connection\n",
                    (unsigned long long)get_timestamp_ms());
            fflush(server->log_file);
        }
        DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Max clients reached, rejecting connection");
        accept_discard_socket(client_fd);
        return;
    }

    /* Allocate client structure (zeroed, so recv_len starts at 0) */
    struct control_client* client = (struct control_client*)u_calloc(1, sizeof(*client));
    if (!client) {
        if (server->log_file) {
            fprintf(server->log_file, "%llu: [ERROR] Failed to allocate client structure\n",
                    (unsigned long long)get_timestamp_ms());
            fflush(server->log_file);
        }
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate client structure");
        accept_discard_socket(client_fd);
        return;
    }

    client->fd = client_fd;
    client->server = server; /* back pointer used by the client callbacks */
    client->selected_peer_id = 0;

    /* Register with uasync */
    client->socket_id = uasync_add_socket_t(server->ua, client_fd,
                                            client_read_callback,
                                            NULL, /* write callback - not needed for now */
                                            client_except_callback,
                                            client);
    if (!client->socket_id) {
        if (server->log_file) {
            fprintf(server->log_file, "%llu: [ERROR] Failed to register client socket with uasync\n",
                    (unsigned long long)get_timestamp_ms());
            fflush(server->log_file);
        }
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to register client socket with uasync");
        u_free(client);
        accept_discard_socket(client_fd);
        return;
    }

    /* Add to list */
    client->next = server->clients;
    server->clients = client;
    server->client_count++;

    /*
     * Copy the printable address into a local buffer inside the same
     * expression that calls ip_to_str(). The previous code kept a pointer to
     * the `.str` member of the call's result and used it in later statements.
     * NOTE(review): ip_to_str() is declared elsewhere; if it returns a struct
     * by value, that pointer referred to a temporary whose lifetime had
     * already ended.
     */
    char addr_str[64];
    if (client_addr.ss_family == AF_INET) {
        struct sockaddr_in* sin = (struct sockaddr_in*)&client_addr;
        snprintf(addr_str, sizeof(addr_str), "%s", ip_to_str(&sin->sin_addr, AF_INET).str);
    } else {
        struct sockaddr_in6* sin6 = (struct sockaddr_in6*)&client_addr;
        snprintf(addr_str, sizeof(addr_str), "%s", ip_to_str(&sin6->sin6_addr, AF_INET6).str);
    }

    if (server->log_file) {
        fprintf(server->log_file, "%llu: [LOG] Client connected from %s (total: %u)\n",
                (unsigned long long)get_timestamp_ms(), addr_str, server->client_count);
        fflush(server->log_file);
    }
    DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control client connected from %s (total: %u)",
               addr_str, server->client_count);
}
/*
 * Read-ready callback for a client socket.
 *
 * `arg` is the control_client. Appends whatever recv() delivers to the
 * client's receive buffer (bounded by ETCPMON_MAX_MSG_SIZE - recv_len), logs
 * it as hex, and passes the accumulated bytes to handle_client_data().
 * A would-block result is ignored. Any other recv error, or an orderly
 * shutdown (recv returns 0), closes the client.
 */
static void client_read_callback(socket_t fd, void* arg) {
    struct control_client* client = (struct control_client*)arg;
    struct control_server* server = client->server;

    /* New data lands directly after the bytes already buffered. */
    uint8_t* buf = client->recv_buffer + client->recv_len;
    size_t buf_space = ETCPMON_MAX_MSG_SIZE - client->recv_len;

#ifdef _WIN32
    int received = recv(fd, (char*)buf, (int)buf_space, 0);
    if (received == SOCKET_ERROR) {
        int err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
            return;
        }
        /* 10054: connection reset by peer - an ordinary disconnect, so it is
         * not reported as an error. */
        if (err != 10054) {
            if (server && server->log_file) {
                fprintf(server->log_file, "%llu: [ERROR] Client recv error: %d\n",
                        (unsigned long long)get_timestamp_ms(), err);
                fflush(server->log_file);
            }
            DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client recv error: %d", err);
        }
        if (server) {
            close_client(server, client);
        }
        return;
    }
#else
    ssize_t received = recv(fd, buf, buf_space, 0);
    if (received < 0) {
        if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return;
        }
        if (server && server->log_file) {
            fprintf(server->log_file, "%llu: [ERROR] Client recv error: %s\n",
                    (unsigned long long)get_timestamp_ms(), strerror(errno));
            fflush(server->log_file);
        }
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client recv error: %s", strerror(errno));
        if (server) {
            close_client(server, client);
        }
        return;
    }
#endif

    if (received == 0) {
        /* Connection closed gracefully */
        if (server && server->log_file) {
            fprintf(server->log_file, "%llu: [LOG] Client disconnected (recv returned 0)\n",
                    (unsigned long long)get_timestamp_ms());
            fflush(server->log_file);
        }
        if (server) {
            close_client(server, client);
        }
        return;
    }

    /* Log received data */
    if (server && server->log_file) {
        log_hex_data(server->log_file, "RX", buf, received);
    }

    client->recv_len += received;
    if (client->recv_len > ETCPMON_MAX_MSG_SIZE) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Control recv buffer overflow");
        if (server) {
            close_client(server, client);
        }
        return;
    }

    if (!server) {
        return;
    }
    handle_client_data(server, client);
}
/*
 * Write-ready callback for a client socket.
 *
 * Deliberately a no-op: responses are written with send() at the point they
 * are built, so nothing is queued for later. Both arguments are unused.
 */
static void client_write_callback(socket_t fd, void* arg) {
    (void)arg;
    (void)fd;
}
/*
 * Exception callback for a client socket.
 *
 * `arg` is the control_client. When both the client and its server back
 * pointer are set, the error is logged and the client is closed right away,
 * which also removes it from uasync. `fd` is unused.
 */
static void client_except_callback(socket_t fd, void* arg) {
    (void)fd;

    struct control_client* client = (struct control_client*)arg;
    if (client == NULL) {
        return;
    }

    struct control_server* server = client->server;
    if (server == NULL) {
        return;
    }

    DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Client socket exception");
    close_client(server, client); /* removes the socket from uasync immediately */
}
/*
 * Close one client connection and release its resources.
 *
 * Logs the closure, unregisters the socket from uasync and closes it, unlinks
 * `client` from server->clients and frees it. `client` must not be used after
 * this returns. NULL arguments are ignored.
 *
 * client_count is only decremented when the client was actually found in the
 * list (and the counter is non-zero), so the unsigned counter cannot wrap
 * around if this is ever called for a client that is not linked.
 */
static void close_client(struct control_server* server, struct control_client* client) {
    if (!server || !client) return;

    /* Log disconnection */
    if (server->log_file) {
        fprintf(server->log_file, "%llu: [LOG] Client connection closed\n",
                (unsigned long long)get_timestamp_ms());
        fflush(server->log_file);
    }

    /* Unregister from the event loop before closing the descriptor. */
#ifdef _WIN32
    if (client->fd != INVALID_SOCKET) {
        uasync_remove_socket_t(server->ua, client->fd);
        closesocket(client->fd);
    }
#else
    if (client->fd >= 0) {
        uasync_remove_socket_t(server->ua, client->fd);
        close(client->fd);
    }
#endif

    /* Remove from list: walk the chain of `next` links so the head and the
     * interior nodes are handled by the same code. */
    int was_listed = 0;
    for (struct control_client** link = &server->clients; *link; link = &(*link)->next) {
        if (*link == client) {
            *link = client->next;
            was_listed = 1;
            break;
        }
    }

    if (was_listed && server->client_count > 0) {
        server->client_count--;
    }

    u_free(client);

    DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Control client disconnected (total: %u)",
               server->client_count);
}
/* ============================================================================
* Message Handling
* ============================================================================ */
/*
 * Parse and execute every complete message in the client's receive buffer.
 *
 * Each message starts with a struct etcpmon_msg_header whose `size` covers
 * header plus payload. The loop runs while at least a header is buffered:
 * it validates the header, stops if the message is still incomplete,
 * dispatches on the command type, then removes the message from the front of
 * the buffer.
 *
 * A malformed header, a bad SELECT_CONN payload, an unknown command or a
 * DISCONNECT request closes the client. In those cases the function returns
 * immediately because `client` has been freed.
 *
 * The size check rejects any `size` smaller than the header itself (this
 * includes 0). Previously such a value made `payload_size` wrap around and
 * consumed only part of the header, desynchronising the stream.
 */
static void handle_client_data(struct control_server* server, struct control_client* client) {
    while (client->recv_len >= sizeof(struct etcpmon_msg_header)) {
        struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)client->recv_buffer;

        if (hdr->size < sizeof(struct etcpmon_msg_header) || hdr->size > ETCPMON_MAX_MSG_SIZE) {
            DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Invalid message size from client: %u", hdr->size);
            close_client(server, client);
            return;
        }

        if (hdr->type < ETCPMON_CMD_LIST_CONN || hdr->type > ETCPMON_CMD_DISCONNECT) {
            DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Invalid command type from client: 0x%02X", hdr->type);
            close_client(server, client);
            return;
        }

        /* Validate header */
        if (etcpmon_validate_header(hdr) != 0) {
            if (server->log_file) {
                fprintf(server->log_file, "%llu: [ERROR] Invalid message header from client\n",
                        (unsigned long long)get_timestamp_ms());
                fflush(server->log_file);
            }
            DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Invalid message header from client");
            close_client(server, client);
            return;
        }

        /* Check if full message received */
        if (client->recv_len < hdr->size) {
            break; /* Wait for more data */
        }

        /* Process message. payload_size cannot underflow: size >= header. */
        uint8_t* payload = client->recv_buffer + sizeof(struct etcpmon_msg_header);
        uint16_t payload_size = hdr->size - sizeof(struct etcpmon_msg_header);
        uint8_t req_seq = hdr->seq_id;

        if (server->log_file) {
            fprintf(server->log_file, "%llu: [LOG] Received command type=0x%02X seq=%d size=%d\n",
                    (unsigned long long)get_timestamp_ms(), hdr->type, req_seq, hdr->size);
            fflush(server->log_file);
        }

        switch (hdr->type) {
            case ETCPMON_CMD_LIST_CONN:
                send_conn_list(server, client, req_seq);
                break;

            case ETCPMON_CMD_SELECT_CONN:
                if (payload_size >= sizeof(struct etcpmon_cmd_select)) {
                    /* Copied out rather than cast in place: the payload sits
                     * at an arbitrary offset in a byte buffer, so it may not
                     * be suitably aligned for the struct. */
                    struct etcpmon_cmd_select cmd;
                    memcpy(&cmd, payload, sizeof(cmd));
                    client->selected_peer_id = cmd.peer_node_id;
                    if (server->log_file) {
                        fprintf(server->log_file, "%llu: [LOG] Client selected connection: %016llX\n",
                                (unsigned long long)get_timestamp_ms(), (unsigned long long)cmd.peer_node_id);
                        fflush(server->log_file);
                    }
                    DEBUG_INFO(DEBUG_CATEGORY_CONTROL, "Client selected connection: %016llX",
                               (unsigned long long)cmd.peer_node_id);
                } else {
                    DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Bad SELECT_CONN payload size %u", payload_size);
                    close_client(server, client);
                    return;
                }
                break;

            case ETCPMON_CMD_GET_METRICS:
                if (client->selected_peer_id == 0) {
                    send_error(client, ETCPMON_ERR_NO_CONN_SELECTED,
                               "No connection selected", req_seq);
                } else {
                    send_metrics(server, client, req_seq);
                }
                break;

            case ETCPMON_CMD_DISCONNECT:
                if (server->log_file) {
                    fprintf(server->log_file, "%llu: [LOG] Client requested disconnect\n",
                            (unsigned long long)get_timestamp_ms());
                    fflush(server->log_file);
                }
                close_client(server, client);
                return;

            default:
                DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Unknown command from client: 0x%02X", hdr->type);
                close_client(server, client);
                return;
        }

        /* Remove processed message from buffer; any following bytes slide to
         * the front so the next header again starts at offset 0. */
        uint16_t msg_size = hdr->size;
        if (client->recv_len > msg_size) {
            memmove(client->recv_buffer, client->recv_buffer + msg_size,
                    client->recv_len - msg_size);
        }
        client->recv_len -= msg_size;
    }
}
/* ============================================================================
* Response Builders
* ============================================================================ */
/*
 * Send an ETCPMON_RSP_CONN_LIST response to `client`.
 *
 * Walks instance->connections, reporting at most ETCPMON_MAX_CONNECTIONS
 * entries (peer node id plus a NUL-terminated, possibly truncated name).
 * `seq_id` is echoed in the response header.
 *
 * The response buffer is zeroed before it is filled, so struct padding and
 * any field not assigned here never carries stale heap contents to the peer.
 * A failed or partial send() is logged, not acted on: this runs from inside
 * handle_client_data(), which still uses `client` afterwards, so the client
 * must not be closed here.
 */
static void send_conn_list(struct control_server* server, struct control_client* client, uint8_t seq_id) {
    struct UTUN_INSTANCE* instance = server->instance;

    /* Count connections */
    uint8_t count = 0;
    struct ETCP_CONN* conn = instance->connections;
    while (conn && count < ETCPMON_MAX_CONNECTIONS) {
        count++;
        conn = conn->next;
    }

    /* Build response */
    uint16_t rsp_size = ETCPMON_CONN_LIST_SIZE(count);
    uint8_t* buffer = (uint8_t*)u_malloc(rsp_size);
    if (!buffer) {
        DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate connection list buffer");
        return;
    }
    memset(buffer, 0, rsp_size);

    struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer;
    etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_conn_list) +
                              count * sizeof(struct etcpmon_conn_info),
                         ETCPMON_RSP_CONN_LIST,
                         seq_id);

    struct etcpmon_rsp_conn_list* rsp = (struct etcpmon_rsp_conn_list*)(buffer + sizeof(*hdr));
    rsp->count = count;

    struct etcpmon_conn_info* info = (struct etcpmon_conn_info*)(buffer + sizeof(*hdr) + sizeof(*rsp));
    conn = instance->connections;
    for (uint8_t i = 0; i < count && conn; i++) {
        info[i].peer_node_id = conn->peer_node_id;
        /* Bounded copy; the explicit terminator covers over-long names. */
        strncpy(info[i].name, conn->log_name, ETCPMON_MAX_CONN_NAME - 1);
        info[i].name[ETCPMON_MAX_CONN_NAME - 1] = '\0';
        conn = conn->next;
    }

    /* Log and send response */
    if (server->log_file) {
        log_hex_data(server->log_file, "TX", buffer, rsp_size);
        fprintf(server->log_file, "%llu: [LOG] Sent RSP_CONN_LIST count=%d\n",
                (unsigned long long)get_timestamp_ms(), count);
        fflush(server->log_file);
    }

#ifdef _WIN32
    int sent = send(client->fd, (const char*)buffer, rsp_size, 0);
#else
    ssize_t sent = send(client->fd, buffer, rsp_size, 0);
#endif
    if (sent < 0 || (size_t)sent != (size_t)rsp_size) {
        DEBUG_WARN(DEBUG_CATEGORY_CONTROL, "Failed or partial send of RSP_CONN_LIST to control client");
    }

    u_free(buffer);
}
static void send_metrics(struct control_server* server, struct control_client* client, uint8_t seq_id) {
struct UTUN_INSTANCE* instance = server->instance;
/* Find selected connection */
struct ETCP_CONN* conn = find_connection_by_peer_id(instance, client->selected_peer_id);
if (!conn) {
send_error(client, ETCPMON_ERR_INVALID_CONN, "Connection not found", seq_id);
return;
}
/* Count links */
uint8_t links_count = 0;
struct ETCP_LINK* link = conn->links;
while (link && links_count < ETCPMON_MAX_LINKS) {
links_count++;
link = link->next;
}
/* Build response */
uint16_t rsp_size = ETCPMON_METRICS_SIZE(links_count);
uint8_t* buffer = (uint8_t*)u_malloc(rsp_size);
if (!buffer) {
DEBUG_ERROR(DEBUG_CATEGORY_CONTROL, "Failed to allocate metrics buffer");
return;
}
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer;
etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_metrics) +
links_count * sizeof(struct etcpmon_link_metrics),
ETCPMON_RSP_METRICS,
seq_id);
struct etcpmon_rsp_metrics* rsp = (struct etcpmon_rsp_metrics*)(buffer + sizeof(*hdr));
/* Fill ETCP metrics */
rsp->etcp.peer_node_id = conn->peer_node_id;
rsp->etcp.rtt_last = conn->rtt_last;
rsp->etcp.rtt_avg_10 = conn->rtt_avg_10;
rsp->etcp.rtt_avg_100 = conn->rtt_avg_100;
rsp->etcp.jitter = conn->jitter;
rsp->etcp.bytes_sent_total = conn->bytes_sent_total;
rsp->etcp.retrans_count = conn->retransmissions_count;
rsp->etcp.ack_count = conn->ack_packets_count;
rsp->etcp.unacked_bytes = conn->unacked_bytes;
rsp->etcp.optimal_inflight = conn->optimal_inflight;
rsp->etcp.links_count = links_count;
/* Queue metrics */
rsp->etcp.input_queue_bytes = (uint32_t)queue_total_bytes(conn->input_queue);
rsp->etcp.input_queue_packets = (uint32_t)queue_entry_count(conn->input_queue);
rsp->etcp.input_send_q_bytes = (uint32_t)queue_total_bytes(conn->input_send_q);
rsp->etcp.input_send_q_packets = (uint32_t)queue_entry_count(conn->input_send_q);
rsp->etcp.input_wait_ack_bytes = (uint32_t)queue_total_bytes(conn->input_wait_ack);
rsp->etcp.input_wait_ack_packets = (uint32_t)queue_entry_count(conn->input_wait_ack);
rsp->etcp.ack_q_bytes = (uint32_t)queue_total_bytes(conn->ack_q);
rsp->etcp.ack_q_packets = (uint32_t)queue_entry_count(conn->ack_q);
rsp->etcp.recv_q_bytes = (uint32_t)queue_total_bytes(conn->recv_q);
rsp->etcp.recv_q_packets = (uint32_t)queue_entry_count(conn->recv_q);
rsp->etcp.output_queue_bytes = (uint32_t)queue_total_bytes(conn->output_queue);
rsp->etcp.output_queue_packets = (uint32_t)queue_entry_count(conn->output_queue);
/* Error counters */
rsp->etcp.reinit_count = conn->reinit_count;
rsp->etcp.reset_count = conn->reset_count;
rsp->etcp.pkt_format_errors = (conn->links && conn->links->conn) ? (uint32_t)conn->links->conn->pkt_format_errors : 0;
/* Timer flags */
rsp->etcp.retrans_timer_active = (conn->retrans_timer != NULL) ? 1 : 0;
rsp->etcp.ack_resp_timer_active = (conn->ack_resp_timer != NULL) ? 1 : 0;
/* Connection IDs */
rsp->etcp.next_tx_id = conn->next_tx_id;
rsp->etcp.last_rx_id = conn->last_rx_id;
rsp->etcp.last_delivered_id = conn->last_delivered_id;
rsp->etcp.rx_ack_till = conn->rx_ack_till;
/* input_wait_ack queue state */
rsp->etcp.wait_ack_cb_suspended = (uint8_t)conn->input_wait_ack->callback_suspended;
rsp->etcp.wait_ack_cb_set = (conn->input_wait_ack->callback != NULL) ? 1 : 0;
rsp->etcp.wait_ack_resume_timeout = (conn->input_wait_ack->resume_timeout_id != NULL) ? 1 : 0;
/* Normalizer */
if (conn->normalizer) {
struct PKTNORM* norm = (struct PKTNORM*)conn->normalizer;
rsp->etcp.norm_input_pkts = (uint32_t)queue_entry_count(norm->input);
rsp->etcp.norm_input_bytes = (uint32_t)queue_total_bytes(norm->input);
rsp->etcp.norm_output_pkts = (uint32_t)queue_entry_count(norm->output);
rsp->etcp.norm_output_bytes = (uint32_t)queue_total_bytes(norm->output);
rsp->etcp.norm_alloc_errors = norm->alloc_errors;
rsp->etcp.norm_logic_errors = norm->logic_errors;
rsp->etcp.norm_frag_size = norm->frag_size;
rsp->etcp.norm_data_ptr = norm->data_ptr;
rsp->etcp.norm_data_size = norm->data_size;
rsp->etcp.norm_in_total_pkts = norm->in_total_pkts;
rsp->etcp.norm_in_total_bytes = norm->in_total_bytes;
rsp->etcp.norm_out_total_pkts = norm->out_total_pkts;
rsp->etcp.norm_out_total_bytes = norm->out_total_bytes;
} else {
rsp->etcp.norm_input_pkts = 0;
rsp->etcp.norm_input_bytes = 0;
rsp->etcp.norm_output_pkts = 0;
rsp->etcp.norm_output_bytes = 0;
rsp->etcp.norm_alloc_errors = 0;
rsp->etcp.norm_logic_errors = 0;
rsp->etcp.norm_frag_size = 0;
rsp->etcp.norm_data_ptr = 0;
rsp->etcp.norm_data_size = 0;
rsp->etcp.norm_in_total_pkts = 0;
rsp->etcp.norm_in_total_bytes = 0;
rsp->etcp.norm_out_total_pkts = 0;
rsp->etcp.norm_out_total_bytes = 0;
}
/* ACK debug counters */
rsp->etcp.cnt_ack_hit_inf = conn->cnt_ack_hit_inf;
rsp->etcp.cnt_ack_hit_sndq = conn->cnt_ack_hit_sndq;
rsp->etcp.cnt_ack_miss = conn->cnt_ack_miss;
rsp->etcp.cnt_link_wait = conn->cnt_link_wait;
rsp->etcp.tx_state = conn->tx_state;
for (int i = 0; i < 8; i++) {
rsp->etcp.debug[i] = conn->debug[i];
}
/* Fill TUN metrics */
if (instance->tun) {
rsp->tun.bytes_read = instance->tun->bytes_read;
rsp->tun.bytes_written = instance->tun->bytes_written;
rsp->tun.packets_read = instance->tun->packets_read;
rsp->tun.packets_written = instance->tun->packets_written;
rsp->tun.read_errors = instance->tun->read_errors;
rsp->tun.write_errors = instance->tun->write_errors;
/* Routing statistics */
rsp->tun.routed_packets = instance->routed_packets;
rsp->tun.dropped_packets = instance->dropped_packets;
/* TUN queues */
rsp->tun.tun_in_q_packets = (uint32_t)queue_entry_count(instance->tun->input_queue);
rsp->tun.tun_in_q_bytes = (uint32_t)queue_total_bytes(instance->tun->input_queue);
rsp->tun.tun_out_q_packets = (uint32_t)queue_entry_count(instance->tun->output_queue);
rsp->tun.tun_out_q_bytes = (uint32_t)queue_total_bytes(instance->tun->output_queue);
/* Routing table */
if (instance->rt) {
rsp->tun.rt_count = (uint32_t)instance->rt->count;
rsp->tun.rt_local = (uint32_t)instance->rt->stats.local_routes;
rsp->tun.rt_learned = (uint32_t)instance->rt->stats.learned_routes;
} else {
rsp->tun.rt_count = 0;
rsp->tun.rt_local = 0;
rsp->tun.rt_learned = 0;
}
/* BGP routing */
if (instance->bgp) {
rsp->tun.rt_bgp_senders = (uint32_t)queue_entry_count(instance->bgp->senders_list);
rsp->tun.rt_bgp_nodes = (uint32_t)queue_entry_count(instance->bgp->nodes);
} else {
rsp->tun.rt_bgp_senders = 0;
rsp->tun.rt_bgp_nodes = 0;
}
} else {
memset(&rsp->tun, 0, sizeof(rsp->tun));
}
/* Fill link metrics */
struct etcpmon_link_metrics* link_info = (struct etcpmon_link_metrics*)(buffer + sizeof(*hdr) + sizeof(*rsp));
link = conn->links;
for (uint8_t i = 0; i < links_count && link; i++) {
link_info[i].local_link_id = link->local_link_id;
link_info[i].status = link->link_status;
link_info[i].encrypt_errors = (uint32_t)link->encrypt_errors;
link_info[i].decrypt_errors = (uint32_t)link->decrypt_errors;
link_info[i].send_errors = (uint32_t)link->send_errors;
link_info[i].recv_errors = (uint32_t)link->recv_errors;
link_info[i].total_encrypted = link->total_encrypted;
link_info[i].total_decrypted = link->total_decrypted;
link_info[i].bandwidth = link->bandwidth;
link_info[i].nat_changes_count = link->nat_changes_count;
link_info[i].rtt_last = link->rtt_last;
link_info[i].rtt_avg10 = link->rtt_avg10;
link_info[i].tt_last = link->tt_last;
link_info[i].init_timer_active = (link->init_timer != NULL) ? 1 : 0;
link_info[i].keepalive_timer_active = (link->keepalive_timer != NULL) ? 1 : 0;
link_info[i].shaper_timer_active = (link->shaper_timer != NULL) ? 1 : 0;
link_info[i].keepalive_sent = (uint32_t)link->keepalive_sent_count;
link_info[i].keepalive_recv = (uint32_t)link->keepalive_recv_count;
link_info[i].inflight_bytes = link->inflight_bytes;
link_info[i].inflight_packets = link->inflight_packets;
link = link->next;
}
/* Log and send response */
if (server->log_file) {
log_hex_data(server->log_file, "TX", buffer, rsp_size);
fprintf(server->log_file, "%llu: [LOG] Sent RSP_METRICS rtt=%u bytes=%llu links=%d\n",
(unsigned long long)get_timestamp_ms(),
rsp->etcp.rtt_last,
(unsigned long long)rsp->etcp.bytes_sent_total,
links_count);
fflush(server->log_file);
}
#ifdef _WIN32
send(client->fd, (const char*)buffer, rsp_size, 0);
#else
send(client->fd, buffer, rsp_size, 0);
#endif
u_free(buffer);
}
static void send_error(struct control_client* client, uint8_t error_code, const char* msg, uint8_t seq_id) {
struct control_server* server = client->server;
uint16_t msg_len = (uint16_t)strlen(msg);
uint16_t rsp_size = ETCPMON_ERROR_SIZE(msg_len);
uint8_t* buffer = (uint8_t*)u_malloc(rsp_size);
if (!buffer) return;
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer;
etcpmon_build_header(hdr, sizeof(struct etcpmon_rsp_error) + msg_len + 1,
ETCPMON_RSP_ERROR,
seq_id);
struct etcpmon_rsp_error* rsp = (struct etcpmon_rsp_error*)(buffer + sizeof(*hdr));
rsp->error_code = error_code;
memcpy(buffer + sizeof(*hdr) + sizeof(*rsp), msg, msg_len + 1);
/* Log error response */
if (server && server->log_file) {
log_hex_data(server->log_file, "TX", buffer, rsp_size);
fprintf(server->log_file, "%llu: [LOG] Sent RSP_ERROR code=%d msg='%s'\n",
(unsigned long long)get_timestamp_ms(), error_code, msg);
fflush(server->log_file);
}
#ifdef _WIN32
send(client->fd, (const char*)buffer, rsp_size, 0);
#else
send(client->fd, buffer, rsp_size, 0);
#endif
u_free(buffer);
}
/* ============================================================================
* Helper Functions
* ============================================================================ */
/*
 * Linear search of instance->connections for the connection whose
 * peer_node_id equals `peer_id`. Returns the first match, or NULL when no
 * connection has that id.
 */
static struct ETCP_CONN* find_connection_by_peer_id(struct UTUN_INSTANCE* instance,
                                                    uint64_t peer_id) {
    struct ETCP_CONN* candidate;

    for (candidate = instance->connections; candidate != NULL; candidate = candidate->next) {
        if (peer_id == candidate->peer_node_id) {
            break;
        }
    }
    /* Either the matching node, or NULL when the list was exhausted. */
    return candidate;
}
/* ============================================================================
* Public API Implementation
* ============================================================================ */
/*
 * Run handle_client_data() once for every connected client, so that any
 * bytes already buffered for a client get processed. A NULL `server` is
 * ignored.
 */
void control_server_process_updates(struct control_server* server) {
    if (server == NULL) {
        return;
    }

    struct control_client* following = NULL;
    for (struct control_client* current = server->clients;
         current != NULL;
         current = following) {
        /* Fetched before the call: handle_client_data() may close and free
         * `current`. */
        following = current->next;
        handle_client_data(server, current);
    }
}
/* Number of currently connected control clients; 0 when `server` is NULL. */
uint32_t control_server_get_client_count(const struct control_server* server) {
    if (server == NULL) {
        return 0;
    }
    return server->client_count;
}