You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

1416 lines
59 KiB

#include "etcp_connections.h"
#include "../lib/socket_compat.h"
#include "../lib/platform_compat.h"
#ifndef _WIN32
#include <net/if.h>
#endif
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include "utun_instance.h"
#include "config_parser.h"
#include "crc32.h"
#include "etcp.h"
#include "../lib/memory_pool.h"
#include "../lib/u_async.h"
#include "../lib/debug_config.h"
#include "etcp_loadbalancer.h"
#include <stdlib.h>
#include <time.h>
#include "../lib/mem.h"
#include "etcp.h"
// Forward declaration
static void etcp_connections_read_callback_socket(socket_t sock, void* arg);
static void etcp_link_remove_from_connections(struct ETCP_SOCKET* conn, struct ETCP_LINK* link);
static void etcp_link_send_init(struct ETCP_LINK* link, uint8_t reset);
//static int etcp_link_send_reset(struct ETCP_LINK* link);
static void etcp_link_init_timer_cbk(void* arg);
static void etcp_link_send_keepalive(struct ETCP_LINK* link);
static void keepalive_timer_cb(void* arg);
static void link_stats_timer_cb(void* arg);
void etcp_link_update_inflight_lim(struct ETCP_LINK* link, uint32_t new_lim) {
if (!link) return;
uint32_t old = link->inflight_lim_bytes;
link->inflight_lim_bytes = new_lim;
if (old != new_lim && link->inflight_bytes < new_lim && link->send_blocked_inflight) {
link->send_blocked_inflight = 0;
loadbalancer_link_ready(link);
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_update_inflight_lim: unblocked link (lim %u->%u)", old, new_lim);
}
}
#define INIT_TIMEOUT_INITIAL 500
#define INIT_TIMEOUT_MAX 50000
static void etcp_link_send_init(struct ETCP_LINK* link, uint8_t reset) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "link=%p, is_server=%d, reset=%d", link, link ? link->is_server : -1, reset);
if (!link || !link->etcp || !link->etcp->instance) return;
struct ETCP_DGRAM* dgram = u_malloc(PACKET_DATA_SIZE);
if (!dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "malloc failed");
return;
}
dgram->link = link;
dgram->noencrypt_len = SC_PUBKEY_ENC_SIZE;
size_t offset = 0;
// reset=1: ETCP_INIT_REQUEST (0x02), reset=0: ETCP_INIT_REQUEST_NOINIT (0x04)
dgram->data[offset++] = reset ? ETCP_INIT_REQUEST : ETCP_INIT_REQUEST_NOINIT;
uint64_t node_id = link->etcp->instance->node_id;
dgram->data[offset++] = (node_id >> 56) & 0xFF;
dgram->data[offset++] = (node_id >> 48) & 0xFF;
dgram->data[offset++] = (node_id >> 40) & 0xFF;
dgram->data[offset++] = (node_id >> 32) & 0xFF;
dgram->data[offset++] = (node_id >> 24) & 0xFF;
dgram->data[offset++] = (node_id >> 16) & 0xFF;
dgram->data[offset++] = (node_id >> 8) & 0xFF;
dgram->data[offset++] = node_id & 0xFF;
dgram->data[offset++] = (link->mtu_local >> 8) & 0xFF;
dgram->data[offset++] = link->mtu_local & 0xFF;
dgram->data[offset++] = (link->keepalive_interval >> 8) & 0xFF;
dgram->data[offset++] = link->keepalive_interval & 0xFF;
dgram->data[offset++] = ((link->recovery_interval/100) >> 8) & 0xFF;
dgram->data[offset++] = (link->recovery_interval/100) & 0xFF;
dgram->data[offset++] = link->local_link_id;
// padding
int s = rand() % (link->handshake_maxsize - link->handshake_minsize) + link->handshake_minsize;
if (s > link->mtu - dgram->noencrypt_len) s = link->mtu - dgram->noencrypt_len;
int to_add=s-offset-UDP_HDR_SIZE - UDP_SC_HDR_SIZE;
if (to_add<0) to_add=0;
for (int i=0; i<to_add; i++) dgram->data[offset++]=rand();// fill pad
// padding end
uint8_t salt[SC_PUBKEY_ENC_SALT_SIZE];
random_bytes(salt, sizeof(salt));
memcpy(dgram->data + offset, salt, SC_PUBKEY_ENC_SALT_SIZE);
offset += SC_PUBKEY_ENC_SALT_SIZE;
uint8_t obfuscated_pubkey[SC_PUBKEY_SIZE];
sc_obfuscate_pubkey(salt, link->etcp->crypto_ctx.peer_public_key,
link->etcp->instance->my_keys.public_key, obfuscated_pubkey);
memcpy(dgram->data + offset, obfuscated_pubkey, SC_PUBKEY_SIZE);
offset += SC_PUBKEY_SIZE;
dgram->data_len = offset;
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Sending INIT request to link, node_id=%016llx, retry=%d", (unsigned long long)node_id, link->init_retry_count);
// Debug: print remote address before sending
if (link->remote_addr.ss_family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)&link->remote_addr;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] INIT sending to %s:%d, link=%p, rst_req=%d", ip_to_str(&sin->sin_addr, AF_INET).str, ntohs(sin->sin_port), link, reset);
}
etcp_encrypt_send(dgram);
u_free(dgram);
link->init_retry_count++;
}
static void etcp_link_init_timer_cbk(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
struct ETCP_LINK* link = (struct ETCP_LINK*)arg;
if (!link || !link->etcp || !link->etcp->instance) return;
if ((link->init_retry_count % 10) == 0 && link->init_timeout < INIT_TIMEOUT_MAX) {
link->init_timeout += link->init_timeout/4 +1;
if (link->init_timeout > INIT_TIMEOUT_MAX) link->init_timeout = INIT_TIMEOUT_MAX;
}
link->init_timer = uasync_set_timeout(link->etcp->instance->ua, link->init_timeout, link, etcp_link_init_timer_cbk);
if (link->link_state == 1) etcp_link_send_init(link,1);// init (with etcp reset)
else etcp_link_send_init(link,0);// no etcp reset (reinit)
}
void etcp_link_restart_init_timer(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (link->init_timer) uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timeout = INIT_TIMEOUT_INITIAL;
link->init_timer = uasync_set_timeout(link->etcp->instance->ua, link->init_timeout, link, etcp_link_init_timer_cbk);
}
void etcp_link_enter_init(struct ETCP_LINK* link) {//
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!link) return;
link->link_state = 1; // handshake
if (link->is_server != 0) return;
etcp_link_send_init(link,1);// init with reset
etcp_link_restart_init_timer(link);
}
void etcp_link_enter_reinit(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!link) return;
link->link_state = 2; // reconnect
etcp_on_link_down(link->etcp);
if (link->is_server != 0) return;
etcp_link_send_init(link,0);// init without reset
if (link->keepalive_timer) {// keepalive заменяяется reinit запросами
uasync_cancel_timeout(link->etcp->instance->ua, link->keepalive_timer);
link->keepalive_timer = NULL;
}
etcp_link_restart_init_timer(link);
}
// Send empty keepalive packet (only timestamp, no sections)
static void etcp_link_send_keepalive(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!link || !link->etcp || !link->etcp->instance) return;
struct ETCP_DGRAM* dgram = u_malloc(sizeof(struct ETCP_DGRAM) + 4);
if (!dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_link_send_keepalive: malloc failed");
return;
}
dgram->link = link;
dgram->data_len = 0; // Empty packet - only timestamp in header
dgram->noencrypt_len = 0;
dgram->timestamp = get_current_timestamp();
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Sending keepalive on link %p (local_id=%d)",
link->etcp->log_name, link, link->local_link_id);
link->keepalive_sent_count++;
etcp_encrypt_send(dgram);
u_free(dgram);
}
// Check if all links for an ETCP_CONN are down
// Returns 1 if all links are down or no links exist, 0 otherwise
static int etcp_all_links_down(struct ETCP_CONN* etcp) {
if (!etcp || !etcp->links) return 1;
struct ETCP_LINK* l = etcp->links;
while (l) {
if (l->link_status == 1) {
return 0; // At least one link is up
}
l = l->next;
}
return 1; // All links are down
}
static void start_keepalive_timer(struct ETCP_LINK* link) {
// Start keepalive timer
if (link->init_timer) {// cancel init timer
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
}
if (link->keepalive_timer == NULL) {
link->keepalive_timer = uasync_set_timeout(link->etcp->instance->ua, link->keepalive_interval * 10, link, keepalive_timer_cb);
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Keepalive timer started on link %p (interval=%d ms)", link->etcp->log_name, link, link->keepalive_interval);
}
}
// Keepalive timer callback
static void keepalive_timer_cb(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
struct ETCP_LINK* link = (struct ETCP_LINK*)arg;
if (!link || !link->etcp || !link->etcp->instance) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "KEEPALIVE NULL !!!!!!!!");
return;
}
link->keepalive_timer = NULL;
// Check if all links are down and start recovery if needed (client only)
if (link->is_server == 0 && etcp_all_links_down(link->etcp)) {
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "[%s] All links are down, starting recovery", link->etcp->log_name);
etcp_link_enter_reinit(link);// keepalive timr после reinit не нужен
return;
}
// Skip if link is not initialized
if (!link->initialized) {
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Keepalive skipped - link not initialized",
link->etcp->log_name);
goto restart_timer;
}
// Check keepalive timeout
uint64_t now = get_time_tb();
uint64_t timeout_units = (uint64_t)link->keepalive_timeout * 10; // ms -> 0.1ms units
uint64_t elapsed = now - link->last_recv_local_time;
if (elapsed > timeout_units) {
if (link->recv_keepalive != 0) {
link->recv_keepalive = 0;
link->link_status = 0;
etcp_on_link_down(link->etcp);
DEBUG_INFO(DEBUG_CATEGORY_GENERAL, "Link down: log_name=%s socket=%s link_id=%d status=DOWN", link->etcp->log_name, link->conn?link->conn->name:"unknown", link->local_link_id);
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "[%s] Link %p (local_id=%d) recv status changed to DOWN - no packets for %llu ms", link->etcp->log_name, link, link->local_link_id, (unsigned long long)(elapsed/10));
}
}
// Send keepalive only if no packets were sent since last tick
if (!link->pkt_sent_since_keepalive) {
if (link->is_server) {
if (link->recv_keepalive) etcp_link_send_keepalive(link);// сервер прекращает слать keepalive если линк потерян (ждём keepalive клиента)
}
else etcp_link_send_keepalive(link);
}
link->pkt_sent_since_keepalive = 0;
restart_timer:
link->keepalive_timer = uasync_set_timeout(link->etcp->instance->ua, link->keepalive_interval * 10, link, keepalive_timer_cb);
}
static uint32_t sockaddr_hash(struct sockaddr_storage* addr) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
socklen_t addr_len = (addr->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
return crc32_calc((void*)addr, addr_len);
}
// Бинарный поиск линка по ip_port_hash
static int find_link_index(struct ETCP_SOCKET* e_sock, uint32_t hash) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!e_sock || e_sock->num_channels == 0) return -1;
int left = 0;
int right = e_sock->num_channels - 1;
while (left <= right) {
int mid = left + (right - left) / 2;
if (e_sock->links[mid]->ip_port_hash == hash) {
return mid;
} else if (e_sock->links[mid]->ip_port_hash < hash) {
left = mid + 1;
} else {
right = mid - 1;
}
}
return -(left + 1);
}
// Реалокация массива линков с увеличением в 2 раза
static int realloc_links(struct ETCP_SOCKET* e_sock) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
size_t new_max = e_sock->max_channels == 0 ? 8 : e_sock->max_channels * 2;
struct ETCP_LINK** new_links = u_realloc(e_sock->links, new_max * sizeof(struct ETCP_LINK*));
if (!new_links) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "realloc_links: realloc failed");
return -1;
}
e_sock->links = new_links;
e_sock->max_channels = new_max;
return 0;
}
// Вставка линка в отсортированный массив
static int insert_link(struct ETCP_SOCKET* e_sock, struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!e_sock || !link) return -1;
if (e_sock->num_channels >= e_sock->max_channels) {
if (realloc_links(e_sock) < 0) return -1;
}
int idx = find_link_index(e_sock, link->ip_port_hash);
if (idx >= 0) return -1;
idx = -(idx + 1);
if (idx < (int)e_sock->num_channels) {
memmove(&e_sock->links[idx + 1], &e_sock->links[idx],
(e_sock->num_channels - idx) * sizeof(struct ETCP_LINK*));
}
e_sock->links[idx] = link;
e_sock->num_channels++;
return 0;
}
// Удаление линка из массива
static void remove_link(struct ETCP_SOCKET* e_sock, uint32_t hash) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!e_sock || e_sock->num_channels == 0) return;
int idx = find_link_index(e_sock, hash);
if (idx < 0) return;
if (idx < (int)e_sock->num_channels - 1) {
memmove(&e_sock->links[idx], &e_sock->links[idx + 1],
(e_sock->num_channels - idx - 1) * sizeof(struct ETCP_LINK*));
}
e_sock->num_channels--;
}
static int sockaddr_equal(const struct sockaddr_storage* a, const struct sockaddr_storage* b) {
if (!a || !b || a->ss_family != b->ss_family) return 0;
if (a->ss_family == AF_INET) {
const struct sockaddr_in *sa = (const struct sockaddr_in*)a;
const struct sockaddr_in *sb = (const struct sockaddr_in*)b;
return (sa->sin_addr.s_addr == sb->sin_addr.s_addr && sa->sin_port == sb->sin_port);
}
return 0; // IPv6 not fully supported yet
}
// надо править, используй sockaddr_hash
struct ETCP_LINK* etcp_link_find_by_addr(struct ETCP_SOCKET* e_sock, struct sockaddr_storage* addr) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!e_sock || !addr) return NULL;
int idx = find_link_index(e_sock, sockaddr_hash(addr));
if (idx < 0) return NULL;
return e_sock->links[idx];
}
struct ETCP_LINK* etcp_link_find_by_remote_id(struct ETCP_CONN* conn, uint8_t remote_link_id) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!conn || remote_link_id == 0) return NULL;
struct ETCP_LINK* l = conn->links;
while (l) {
if (l->remote_link_id == remote_link_id) return l;
l = l->next;
}
return NULL;
}
int etcp_find_free_local_link_id(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!etcp) return -1;
// Битовый массив для 256 id (32 байта * 8 бит = 256)
uint8_t used_ids[32] = {1};// индекс 0 всегда занят
// Помечаем занятые id
struct ETCP_LINK* link = etcp->links;
while (link) {
if (link->local_link_id < 256) {
used_ids[link->local_link_id >> 3] |= (1 << (link->local_link_id & 7));
}
link = link->next;
}
// Ищем первый свободный id
for (int i = 0; i < 32; i++) {
if (used_ids[i] != 0xFF) {
// Есть свободные биты в этом байте
for (int bit = 0; bit < 8; bit++) {
if (!(used_ids[i] & (1 << bit))) {
return (i << 3) + bit;
}
}
}
}
// Все id заняты
return -1;
}
// ===============================
struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct CFG_SERVER* server) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!instance || !server) return NULL;
struct sockaddr_storage* ip = &server->ip;
uint32_t netif_index = server->netif_index;
int so_mark = server->so_mark;
int fib = server->fib;
uint8_t type = server->type;
int mtu = server->mtu ? server->mtu : instance->config->global.mtu;
int loss_rate = server->loss_rate;
char* name = server->name;
struct ETCP_SOCKET* e_sock = u_calloc(1, sizeof(struct ETCP_SOCKET));
if (!e_sock) {
DEBUG_ERROR(DEBUG_CATEGORY_MEMORY, "Failed to allocate connection");
return NULL;
}
e_sock->fd = SOCKET_INVALID; // Initialize to invalid socket
if (name && name[0]) {
strncpy(e_sock->name, name, MAX_CONN_NAME_LEN - 1);
e_sock->name[MAX_CONN_NAME_LEN - 1] = '\0';
} else {
e_sock->name[0] = '\0';
}
int family = AF_INET;
if (ip) {
family = ip->ss_family;
if (family != AF_INET && family != AF_INET6) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Unsupported address family: %d", family);
u_free(e_sock);
return NULL;
}
}
e_sock->fd = socket_create_udp(family);
if (e_sock->fd == SOCKET_INVALID) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to create socket: %s",
socket_strerror(socket_get_error()));
u_free(e_sock);
return NULL;
}
// Строго не используем reuseaddr, даже в тестах!
socket_set_reuseaddr(e_sock->fd, 0);
// Increase socket buffers for high throughput
socket_set_buffers(e_sock->fd, 4 * 1024 * 1024, 4 * 1024 * 1024);
if (socket_set_nonblocking(e_sock->fd) != 0) {
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set non-blocking mode");
}
// Set socket mark if specified (Linux only)
if (so_mark > 0) {
socket_set_mark(e_sock->fd, so_mark);
}
// Set FIB for FreeBSD
#ifdef __FreeBSD__
if (fib > 0) {
if (setsockopt(e_sock->fd, SOL_SOCKET, SO_SETFIB, &fib, sizeof(fib)) < 0) {
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "Failed to set FIB %d: %s", fib, strerror(errno));
}
}
#endif
// Bind to interface if specified (Linux only)
#ifndef _WIN32
if (netif_index > 0) {
char ifname[IF_NAMESIZE];
if (if_indextoname(netif_index, ifname)) {
socket_bind_to_device(e_sock->fd, ifname);
}
}
#endif
// Store the local address and bind socket if provided
if (ip) {
memcpy(&e_sock->local_addr, ip, sizeof(struct sockaddr_storage));
// CRITICAL: Actually bind the socket to the address
socklen_t addr_len = (ip->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
if (bind(e_sock->fd, (struct sockaddr*)ip, addr_len) < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] Failed to bind socket to address family %d: %s",
ip->ss_family, socket_strerror(socket_get_error()));
if (ip->ss_family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)ip;
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] Failed to bind to %s:%d", ip_to_str(&sin->sin_addr, AF_INET).str, ntohs(sin->sin_port));
}
socket_close_wrapper(e_sock->fd);
u_free(e_sock);
return NULL;
}
struct sockaddr_in* sin = (struct sockaddr_in*)ip;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Successfully bound socket to local address, family=%d %s:%d", ip->ss_family, ip_to_str(&sin->sin_addr, AF_INET).str, ntohs(sin->sin_port));
DEBUG_INFO(DEBUG_CATEGORY_GENERAL, "Listen socket initialized: name=%s fd=%d addr=%s:%d", e_sock->name, e_sock->fd, ip_to_str(&sin->sin_addr, AF_INET).str, ntohs(sin->sin_port));
}
e_sock->instance = instance;
e_sock->errorcode = 0;
e_sock->pkt_format_errors = 0;
e_sock->type = type;
e_sock->mtu = mtu;
e_sock->loss_rate = loss_rate;
DEBUG_INFO(DEBUG_CATEGORY_BGP, "Add Socket type=%d", type);
e_sock->next = instance->etcp_sockets;
instance->etcp_sockets = e_sock;
e_sock->socket_id = uasync_add_socket_t(instance->ua, e_sock->fd, etcp_connections_read_callback_socket, NULL, NULL, e_sock);
if (!e_sock->socket_id) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to register socket with uasync");
socket_close_wrapper(e_sock->fd);
u_free(e_sock);
return NULL;
}
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Registered ETCP socket with uasync");
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Socket %p registered and active", e_sock);
return e_sock;
}
void etcp_socket_remove(struct ETCP_SOCKET* conn) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!conn) return;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket %p, socket_id=%p", conn, conn->socket_id);
// Remove from uasync if registered
if (conn->socket_id) {
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket from uasync, instance=%p, ua=%p", conn->instance, conn->instance->ua);
uasync_remove_socket_t(conn->instance->ua, conn->fd);
conn->socket_id = NULL;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Unregistered socket from uasync");
}
if (conn->fd != SOCKET_INVALID) {
socket_close_wrapper(conn->fd);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Closed socket");
}
size_t i = 0;
while (i < conn->num_channels) {
struct ETCP_LINK* l = conn->links[i];
etcp_link_close(l); // теперь безопасно — num_channels уменьшится, но i не растёт
// i НЕ инкрементируем — сдвиг уже сделал remove_link
}
u_free(conn->links);
u_free(conn);
}
struct ETCP_LINK* etcp_link_new(struct ETCP_CONN* etcp, struct ETCP_SOCKET* conn, struct sockaddr_storage* remote_addr, uint8_t is_server) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!remote_addr) return NULL;
struct ETCP_LINK* link = u_calloc(1, sizeof(struct ETCP_LINK));
if (!link) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_link_new: calloc failed - out of memory or pool exhausted");
return NULL;
}
link->conn = conn;
link->etcp = etcp;
link->is_server = is_server;
int mtu = conn->mtu;
if (mtu == 0) mtu = 1500;
link->mtu_local = mtu;
link->mtu = mtu;
link->initialized = 0;
link->init_timer = NULL;
link->init_timeout = 0;
link->init_retry_count = 0;
link->link_status = 0; // down initially
link->handshake_minsize = 100;
link->handshake_maxsize = mtu;// 28 = udp header size
// Initialize keepalive timeout from global config
if (etcp->instance && etcp->instance->config) {
link->keepalive_timeout = etcp->instance->config->global.keepalive_timeout;
link->keepalive_interval = etcp->instance->config->global.keepalive_interval;
} else {
link->keepalive_timeout = 2000; // Default 2 seconds
link->keepalive_interval = 200; // Default 0.2 s
}
if (link->keepalive_interval < 10) link->keepalive_interval = 10;
link->keepalive_sent_count = 0;
link->keepalive_recv_count = 0;
link->inflight_lim_bytes = 30000;
// Выделяем свободный local_link_id
int free_id = etcp_find_free_local_link_id(etcp);
if (free_id <= 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_link_new: no free local_link_id available");
u_free(link);
return NULL;
}
link->local_link_id = (uint8_t)free_id;
memcpy(&link->remote_addr, remote_addr, sizeof(struct sockaddr_storage));
link->ip_port_hash = sockaddr_hash(remote_addr);
link->last_recv_local_time = get_time_tb(); // Initialize to prevent immediate timeout
// RTT sliding window initialization
link->rtt_history_index = 0;
link->rtt_history_count = 0;
link->rtt_max_val = 0;
link->rtt_max_idx = 0;
// rtt_history[] is already zeroed by calloc
// Инициализация статистики
link->win_timebase = 50000; // 50 ms в микросекундах
link->win_ptr = 0;
link->window_pkt_transmitted = 0;
link->window_retransmissions = 0;
link->total_retransmissions = 0;
memset(link->stat_win, 0, sizeof(link->stat_win));
start_stats_timer(link);
// insert_link(conn, link);
if (insert_link(conn, link) < 0) {
// откатываем то, что успели
// (пока список ещё не добавлен — просто free)
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Can not insert link to socket");
u_free(link);
return NULL;
}
struct ETCP_LINK* l=etcp->links;
while (l && l->next) l=l->next;
if (l) l->next = link; else etcp->links = link;
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "NEW link initialized on etcp=[%s] link=%p socket=%s id=%d is_server=%d mtu=%d", etcp->log_name, link, conn->name, link->local_link_id, link->is_server, link->mtu);
if (is_server == 0) {
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "etcp_link_new: client link, calling etcp_link_send_init");
etcp_link_enter_init(link);
}
return link;
}
void etcp_link_close(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!link) return;
if (!link->conn) {
// только удаляем из списка и free
struct ETCP_LINK **pp = &link->etcp->links;
while (*pp && *pp != link) pp = &(*pp)->next;
if (*pp) *pp = link->next;
u_free(link);
return;
}
if (link->stats_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->stats_timer);
link->stats_timer = NULL;
}
// Cancel init timer if active
if (link->init_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
}
// Cancel shaper timer if active
if (link->shaper_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->shaper_timer);
link->shaper_timer = NULL;
}
// Cancel keepalive timer if active
if (link->keepalive_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->keepalive_timer);
link->keepalive_timer = NULL;
}
// универсальное удаление из односвязного списка
struct ETCP_LINK **pp = &link->etcp->links;
while (*pp) {
if (*pp == link) {
*pp = link->next;
break;
}
pp = &(*pp)->next;
}
remove_link(link->conn, link->ip_port_hash);
u_free(link);
}
void start_stats_timer(struct ETCP_LINK* link) {
if (!link || !link->etcp || !link->etcp->instance) return;
if (link->stats_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->stats_timer);
}
uint32_t tb = link->win_timebase / 100; // us → 0.1 ms units
if (tb < 50) tb = 50; // минимум 5 ms
if (tb > 5000) tb = 5000; // max 500 ms
link->stats_timer = uasync_set_timeout(
link->etcp->instance->ua,
tb,
link,
link_stats_timer_cb
);
}
// === Новый callback таймера ===
static void link_stats_timer_cb(void* arg) {
struct ETCP_LINK* link = (struct ETCP_LINK*)arg;
if (!link || !link->etcp || !link->etcp->instance) return;
// 1. Сохраняем снимок текущего окна
link->stat_win[link->win_ptr].rtt = link->rtt_avg10;
link->stat_win[link->win_ptr].pkt_loss = (uint16_t)link->window_retransmissions;
link->stat_win[link->win_ptr].pkt_transmitted = link->window_pkt_transmitted;
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] stats window updated (win_timebase=%u us, rtt=%u, retrans=%u, transmitted=%u)",
// link->etcp->log_name, link->win_timebase, link->rtt_avg10, link->window_retransmissions, link->window_pkt_transmitted);
// 2. Переходим к следующему слоту
link->win_ptr = (link->win_ptr + 1) % 32;
// 3. Обнуляем накопители для нового окна
link->window_pkt_transmitted = 0;
link->window_retransmissions = 0;
// 4. Плавная подстройка win_timebase под rtt/2 (в микросекундах)
uint32_t target_us = (uint32_t)link->rtt_avg10 * 50ULL; // rtt_avg10 (0.1 ms) → rtt/2 в us
if (target_us < 10000) target_us = 10000; // минимум 10 ms
if (target_us > 500000) target_us = 500000; // максимум 0.5 s
link->win_timebase = (link->win_timebase * 7 + target_us) / 8;
// 5. Перезапускаем таймер с новым интервалом
start_stats_timer(link);
}
int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!dgram || !dgram->link) return -1;
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Send rk=%d lk=%d up=%d",
dgram->link->etcp->log_name, dgram->link->recv_keepalive, dgram->link->remote_keepalive, dgram->link->link_status);
// Mark that packet was sent (for keepalive logic)
dgram->link->pkt_sent_since_keepalive = 1;
dgram->flag_up=dgram->link->recv_keepalive;
// 28 байт = udp headers. MTU=UDP payload+28 (1472 bytes max)
int errcode=0;
sc_context_t* sc = &dgram->link->etcp->crypto_ctx;
int len=dgram->data_len-dgram->noencrypt_len;// не забываем добавить timestamp (2 bytes)
if (len<0 || len>1472) { dgram->link->send_errors++; errcode=1; goto es_err; }
uint8_t enc_buf[1600];
size_t enc_buf_len=0;
dgram->timestamp=get_current_timestamp();
dgram->link->total_encrypted += dgram->data_len;
// DUMP: Show packet before encryption
if (debug_should_output(DEBUG_LEVEL_DEBUG, DEBUG_CATEGORY_CRYPTO)) log_dump("ECTP_ENCRYPT_SEND", dgram->data, dgram->data_len);
// DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Encrypt start");
sc_encrypt(sc, (uint8_t*)&dgram->timestamp/*не править это, тут верно!*/, 3 + len, enc_buf, &enc_buf_len);
// DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Encrypt end");
if (enc_buf_len == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "etcp_encrypt_send: encryption failed for node %016llx", (unsigned long long)dgram->link->etcp->instance->node_id);
dgram->link->send_errors++;
errcode=2;
goto es_err;
}
if (enc_buf_len + dgram->noencrypt_len > 1472) { dgram->link->send_errors++;
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "packet too long len=%d ne_len=%d", enc_buf_len, dgram->noencrypt_len);
errcode=3; goto es_err; }
memcpy(enc_buf+enc_buf_len, dgram->data+len, dgram->noencrypt_len);
// DUMP: Show complete packet before sending
if (debug_should_output(DEBUG_LEVEL_DEBUG, DEBUG_CATEGORY_CRYPTO)) log_dump("ENCRYPTED, READY TO SEND", enc_buf, enc_buf_len + dgram->noencrypt_len);
struct sockaddr_storage* addr=&dgram->link->remote_addr;
socklen_t addr_len = (addr->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
// Debug: print where we're sending the packet
if (addr->ss_family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)addr;
// inet_ntop removed - use ip_to_str
}
ssize_t sent=enc_buf_len + dgram->noencrypt_len;
int loss_rate = dgram->link->conn->loss_rate;
int rnd = rand() % 100;
if (loss_rate == 0 || rnd >= loss_rate) {
sent = socket_sendto(dgram->link->conn->fd, enc_buf, enc_buf_len + dgram->noencrypt_len,
(struct sockaddr*)addr, addr_len);
} else {
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] Packet dropped by loss_rate (rnd=%d, loss_rate=%d%%)",
dgram->link->etcp->log_name, rnd, loss_rate);
}
if (sent < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "sendto failed, sock_err=%d", socket_get_error());
dgram->link->send_errors++; errcode=4; goto es_err;
} else {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "sendto succeeded, sent=%zd bytes to port %d", sent, ntohs(((struct sockaddr_in*)addr)->sin_port));
}
return (int)sent;
es_err:
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[ETCP] encrypt_send error %d", errcode);
return -1;
}
static void etcp_connections_read_callback_socket(socket_t sock, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback fd=%d, socket=%p", fd, arg);
// !!!!!! DANGER: в этой функции ПРЕДЕЛЬНАЯ АККУРАТНОСТЬ. Если кажется что не туда указатель то невнимательно аланизировал !!!!!
// НЕ РУИНИТЬ (uint8_t*)&pkt->timestamp - это правильно !!!!
//
// Ошибки функции (errorcode):
// 1 - пакет слишком маленький для init (< SC_PUBKEY_SIZE)
// 2 - не удалось установить peer public key при init
// 3 - не удалось расшифровать init пакет
// 4 - не init пакет (неверный код)
// 5 - коллизия peer ID и ключей
// 6 - не удалось расшифровать обычный пакет
// 7 - слишком короткий пакет
// 13 - переполнение при парсинге пакета
// 46 - расшифрованный пакет слишком маленький (< 3 байта)
// 55 - не удалось создать подключение
// 66 - не удалось создать линк
struct ETCP_SOCKET* e_sock = (struct ETCP_SOCKET*)arg;
if (!e_sock) return;
struct sockaddr_storage addr;
uint8_t data[PACKET_DATA_SIZE];
socklen_t addr_len=sizeof(addr);
memset(&addr, 0, sizeof(addr));
ssize_t recv_len = socket_recvfrom(sock, data, PACKET_DATA_SIZE, (struct sockaddr*)&addr, &addr_len);
if (recv_len <= 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: recvfrom failed, error=%zd, sock_err=%d", recv_len, socket_get_error());
return;
}
// DUMP: Show received packet content
if (debug_should_output(DEBUG_LEVEL_DEBUG, DEBUG_CATEGORY_CRYPTO)) log_dump("RECV in:", data, recv_len); // link unknown at this point
struct ETCP_DGRAM* pkt = memory_pool_alloc(e_sock->instance->pkt_pool);
if (!pkt) return;
size_t pkt_len=0;
int errorcode=0;
struct ETCP_LINK* link=etcp_link_find_by_addr(e_sock, &addr);
// Try normal decryption first if we have an established link with session keys
// This is the common case for data packets and responses
// if (link) {
// link->recv_keepalive = 1; // Link is up after successful initialization - не ставим ап от неизвестных пакетов
// }
if (link!=NULL && link->etcp!=NULL && link->etcp->crypto_ctx.session_ready) {
// DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Decrypt start (normal)");
if (!sc_decrypt(&link->etcp->crypto_ctx, data, recv_len, (uint8_t*)&pkt->timestamp, &pkt_len)) {
// Normal decryption succeeded - process packet normally
goto process_decrypted;
}
// Normal decryption failed - might be INIT packet, fall through to INIT handling
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Normal decryption failed, trying INIT decryption");
}
// Try INIT decryption (for incoming connection requests)
// This handles: no link found, or link without session, or normal decrypt failed
if (recv_len <= SC_PUBKEY_ENC_SIZE + UDP_SC_HDR_SIZE) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: packet too small for init, size=%zd", recv_len);
errorcode=1;
goto ec_fr;
}
struct secure_channel sc;
sc_init_ctx(&sc, &e_sock->instance->my_keys);
const uint8_t* salt = data + recv_len - SC_PUBKEY_ENC_SIZE;
const uint8_t* encrypted_pubkey = salt + SC_PUBKEY_ENC_SALT_SIZE;
uint8_t decrypted_pubkey[SC_PUBKEY_SIZE];
sc_obfuscate_pubkey(salt, e_sock->instance->my_keys.public_key, encrypted_pubkey, decrypted_pubkey);
if (sc_set_peer_public_key(&sc, decrypted_pubkey, SC_PEER_PUBKEY_BIN)!=SC_OK) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "etcp_connections_read_callback: failed to set peer public key during init");
errorcode=2;
goto ec_fr;
}
if (sc_decrypt(&sc, data, recv_len - SC_PUBKEY_ENC_SIZE, (uint8_t*)&pkt->timestamp, &pkt_len)) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "etcp_connections_read_callback: failed to decrypt init packet");
errorcode=3;
goto ec_fr;
}
// INIT decryption succeeded - process as new incoming connection
if (pkt_len<3) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "etcp_connections_read_callback: too short packet");
errorcode=7;
goto ec_fr;
}
pkt->data_len=pkt_len-3;
pkt->noencrypt_len=0;
struct {
uint8_t code;
uint8_t id[8];
uint8_t mtu[2];
uint8_t keepalive[2];
uint8_t recovery[2];
uint8_t link_id;
uint8_t pubkey[SC_PUBKEY_SIZE];
} *ack_hdr=(void*)&pkt->data[0];
uint64_t peer_id = be64toh(*(uint64_t*)ack_hdr->id);
if (ack_hdr->code!=ETCP_INIT_REQUEST && ack_hdr->code!=ETCP_INIT_REQUEST_NOINIT) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: not an init packet, code=%02x", ack_hdr->code);
errorcode=4;
goto ec_fr;
}// не init
struct ETCP_CONN* conn=e_sock->instance->connections;
while (conn) {// ищем есть ли подключение к этому пиру
if (conn->peer_node_id==peer_id) break;
conn=conn->next;
}
int new_conn=0;
if (!conn || conn->peer_node_id!=peer_id) {// создаём новое подключение [new etcp]
new_conn=1;
conn=etcp_connection_create(e_sock->instance,"");
if (!conn) { errorcode=55; DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connections_read_callback: failed to create connection"); goto ec_fr; }
memcpy(&conn->crypto_ctx, &sc, sizeof(sc));
conn->peer_node_id=peer_id;
etcp_update_log_name(conn);
DEBUG_INFO(DEBUG_CATEGORY_GENERAL, "New connection received on socket %s: log_name=%s peer_id=%lu", e_sock->name, conn->log_name, (unsigned long)peer_id);
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "New connection from %s peer_id=%ld etcp=%p", ip_to_str(&((struct sockaddr_in *)&addr)->sin_addr.s_addr, addr.ss_family).str, peer_id, conn);
conn->next = e_sock->instance->connections;
e_sock->instance->connections = conn;
e_sock->instance->connections_count++;
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Added incoming connection %p to instance, total count: %d", conn, e_sock->instance->connections_count);
}
else {// check keys если существующее подключение
if (memcmp(conn->crypto_ctx.peer_public_key, sc.peer_public_key, SC_PUBKEY_SIZE)) { errorcode=5; DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "etcp_connections_read_callback: peer key mismatch for node %016llx", (unsigned long long)peer_id); goto ec_fr; }// коллизия - peer id совпал а ключи разные.
}
// Check if link already exists (for CHANNEL_INIT recovery)
struct ETCP_LINK* existing_link = etcp_link_find_by_remote_id(conn, ack_hdr->link_id);
uint8_t send_reset = 0;
if (existing_link && existing_link->etcp == conn) {// существующий линк
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] found existing link for id=%d, socket=[%s]", conn->log_name, ack_hdr->link_id, e_sock->name);
link = existing_link;
if (!sockaddr_equal(&link->remote_addr, &addr)) {
DEBUG_WARN(DEBUG_CATEGORY_CONNECTION, "[%s] IP:port changed for remote_link_id=%d socket:[%s]", conn->log_name, ack_hdr->link_id, e_sock->name);
if (link->conn) remove_link(link->conn, link->ip_port_hash); // remove old connection from old socket
link->conn=e_sock;
memcpy(&link->remote_addr, &addr, sizeof(addr));
link->ip_port_hash = sockaddr_hash(&addr);
if (insert_link(link->conn, link) < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to reinsert link after addr change");
goto ec_fr;
}
}
// Link exists - reuse it for recovery
link->remote_link_id = ack_hdr->link_id;
// For CHANNEL_INIT (0x04): if link already initialized - no reset, otherwise reset
// For INIT_REQUEST (0x02): always reset
if (ack_hdr->code == ETCP_INIT_REQUEST_NOINIT && conn->initialized) {
send_reset = 0; // Link is up, respond without reset
} else {
send_reset = 1; // INIT_REQUEST (0x02) or uninitialized link - send reset
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "do reinit");
etcp_conn_reinit(conn);
}
// Cancel existing timers
if (link->init_timer) {
uasync_cancel_timeout(link->etcp->instance->ua, link->init_timer);
link->init_timer = NULL;
}
} else {
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] NO existing link for id=%d, socket=[%s]", conn->log_name, ack_hdr->link_id, e_sock->name);
// Create new link
link = etcp_link_new(conn, e_sock, &addr, 1);
if (!link) { if (new_conn) etcp_connection_close(conn); errorcode=66; DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_connections_read_callback: failed to create link for connection"); goto ec_fr; }// облом
link->remote_link_id = ack_hdr->link_id;
// For new links: INIT_REQUEST (0x02) causes reset, CHANNEL_INIT (0x04) does not
if (ack_hdr->code == ETCP_INIT_REQUEST || new_conn) {
send_reset = 1; // INIT_REQUEST (0x02) or uninitialized link - send reset
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "do reinit 2");
etcp_conn_reinit(conn);
}
link->keepalive_interval=(ack_hdr->keepalive[0]<<8) | ack_hdr->keepalive[1];
link->recovery_interval=((ack_hdr->recovery[0]<<8) | ack_hdr->recovery[1])*100;// timebase в link, timebase/100 в кодограмме
if (link->keepalive_interval < 10) link->keepalive_interval = 10;
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "set keepalive for link=%d", link->keepalive_interval);
}
link->mtu_remote = (ack_hdr->mtu[0] << 8) | ack_hdr->mtu[1];
link->mtu = link->mtu_local < link->mtu_remote ? link->mtu_local : link->mtu_remote;
struct {
uint8_t code;
uint8_t id[8];
uint8_t mtu[2];
uint8_t link_id;
uint8_t peer_ipv4[4];
uint8_t peer_port[2];
} *ack_repl_hdr=(void*)&pkt->data[0];
// Set response code: 0x03 (with reset) or 0x05 (without reset)
if (send_reset != 0 || new_conn != 0 || ack_hdr->code == ETCP_INIT_REQUEST) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "send init_response with reset");
ack_repl_hdr->code = ETCP_INIT_RESPONSE; // 0x03 - with reset
} else {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "send init_response without reset");
ack_repl_hdr->code = ETCP_INIT_RESPONSE_NOINIT; // 0x05 - without reset
}
*(uint64_t*)ack_repl_hdr->id = htobe64(e_sock->instance->node_id);
ack_repl_hdr->mtu[0]=link->mtu_local>>8;
ack_repl_hdr->mtu[1]=link->mtu_local;
ack_repl_hdr->link_id = link->local_link_id;
// Add client's IP:port (so client behind NAT can know its external address)
if (addr.ss_family == AF_INET) {
struct sockaddr_in *sin = (struct sockaddr_in*)&addr;
memcpy(ack_repl_hdr->peer_ipv4, &sin->sin_addr.s_addr, 4);
uint16_t port = ntohs(sin->sin_port);
ack_repl_hdr->peer_port[0] = port >> 8;
ack_repl_hdr->peer_port[1] = port & 0xFF;
} else {
// For IPv6, set to 0 (not supported for NAT traversal)
memset(ack_repl_hdr->peer_ipv4, 0, 4);
memset(ack_repl_hdr->peer_port, 0, 2);
}
pkt->noencrypt_len=0;
pkt->link=link;
link->recv_keepalive = 1;
int xoffset=sizeof(*ack_repl_hdr);
// padding
int s = rand() % (link->handshake_maxsize - link->handshake_minsize) + link->handshake_minsize;
if (s > link->mtu) s = link->mtu;
int to_add=s - xoffset - UDP_HDR_SIZE - UDP_SC_HDR_SIZE;
if (to_add<0) to_add=0;
for (int i=0; i<to_add; i++) pkt->data[xoffset++]=rand();// fill pad
// padding end
pkt->data_len=xoffset;
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "Sending INIT RESPONSE, link=%p, local_link_id=%d, remote_link_id=%d", link, link->local_link_id, link->remote_link_id);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP DEBUG] Send INIT RESPONSE");
etcp_encrypt_send(pkt);
memory_pool_free(e_sock->instance->pkt_pool, pkt);
link->initialized = 1;
link->link_state = 3;
if (link->etcp->initialized == 0) {
etcp_conn_ready(link->etcp);
DEBUG_INFO(DEBUG_CATEGORY_GENERAL, "Connection established: log_name=%s socket=%s link_id=%d status=UP", link->etcp->log_name, e_sock->name, link->local_link_id);
}
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "[%s] Link %p (local_id=%d) initialized and marked as UP (server)", link->etcp->log_name, link, link->local_link_id);
start_keepalive_timer(link);
loadbalancer_link_ready(link);
return;
process_decrypted:
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "Decrypt ok - normal pkt");
if (pkt_len<3) { errorcode=46; DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: decrypted packet too small, size=%zu", pkt_len); goto ec_fr; }
pkt->data_len=pkt_len-3;
pkt->noencrypt_len=0;
pkt->link=link;
link->remote_keepalive=pkt->flag_up;
if ( link->remote_keepalive && !link->link_status && link->initialized) loadbalancer_link_ready(link);// up выставляем только если и remote=up и local=up
link->link_status=link->remote_keepalive;
link->last_recv_local_time=get_time_tb();
link->last_recv_timestamp=pkt->timestamp;
link->last_recv_updated=1;
// Mark link as up when receiving packets
if (link->recv_keepalive != 1) {
link->recv_keepalive = 1;
// loadbalancer_link_ready(link);
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "[%s] Link %p (local_id=%d) status changed to UP - packet received",
link->etcp->log_name, link, link->local_link_id);
}
// Count decrypted bytes
link->total_decrypted += pkt->data_len;
// Count received keepalive packets (empty packets with no payload)
if (pkt->data_len == 0) {
link->keepalive_recv_count++;
}
size_t offset = 0;
uint8_t code = pkt->data[offset++];
if (code == ETCP_INIT_RESPONSE || code == ETCP_INIT_RESPONSE_NOINIT) {
// Parse response
// ETCP_INIT_RESPONSE (0x03) - reset entire ETCP_CONN
// ETCP_INIT_RESPONSE_NOINIT (0x05) - no reset
uint64_t server_node_id = 0;
for (int i = 0; i < 8; i++) {
server_node_id = (server_node_id << 8) | pkt->data[offset++];
}
link->mtu_remote = (pkt->data[offset++] << 8) | pkt->data[offset++];
link->mtu = link->mtu_local < link->mtu_remote ? link->mtu_local : link->mtu_remote;
link->remote_link_id = pkt->data[offset++];
// Parse NAT IP:port from response (new format includes 4+2 bytes)
if (pkt_len >= 18) {
uint32_t new_nat_ip = (pkt->data[offset] << 24) | (pkt->data[offset+1] << 16) |
(pkt->data[offset+2] << 8) | pkt->data[offset+3];
offset += 4;
uint16_t new_nat_port = (pkt->data[offset] << 8) | pkt->data[offset+1];
offset += 2;
// Check if NAT address changed
if (link->nat_ip == 0 && link->nat_port == 0) {
// First time receiving NAT info
link->nat_ip = new_nat_ip;
link->nat_port = new_nat_port;
struct in_addr addr;
addr.s_addr = htonl(new_nat_ip);
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "[%s] NAT address initialized: %s:%u",
link->etcp->log_name, ip_to_str(&addr, AF_INET).str, new_nat_port);
} else if (link->nat_ip != new_nat_ip || link->nat_port != new_nat_port) {
// NAT address changed
struct in_addr old_addr, new_addr;
old_addr.s_addr = htonl(link->nat_ip);
new_addr.s_addr = htonl(new_nat_ip);
link->nat_ip = new_nat_ip;
link->nat_port = new_nat_port;
link->nat_changes_count++;
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "[%s] NAT address changed: %s:%u -> %s:%u (change #%u)",
link->etcp->log_name, ip_to_str(&old_addr, AF_INET).str, link->nat_port, ip_to_str(&new_addr, AF_INET).str, new_nat_port,
link->nat_changes_count);
}
} else {
// Legacy format without NAT info
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "[%s] Received legacy INIT_RESPONSE without NAT info",
link->etcp->log_name);
}
if (offset > pkt_len) { errorcode=13; DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: packet parsing overflow, offset=%zu, pkt_len=%zu", offset, pkt_len); goto ec_fr; }
// DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Received INIT_RESPONSE from server_node_id=%llu, mtu=%d", (unsigned long long)server_node_id, link->mtu);
link->etcp->peer_node_id = server_node_id; // If not set
etcp_update_log_name(link->etcp); // Update log_name with peer_node_id
link->initialized = 1;// получен init response (client)
link->link_state = 3; // connected
if (code == ETCP_INIT_RESPONSE) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "do reinit 3 %p", link->etcp);
etcp_conn_reinit(link->etcp);
}
if (link->etcp->initialized == 0) {
etcp_conn_ready(link->etcp);
}
loadbalancer_link_ready(link);
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "[%s] Link %p (local_id=%d) initialized and marked as UP (client): rk=%d lk=%d up=%d ki=%d",
link->etcp->log_name, link, link->local_link_id, link->recv_keepalive, link->remote_keepalive, link->link_status, link->keepalive_interval);
// Start keepalive timer
etcp_link_send_keepalive(link);
start_keepalive_timer(link);
loadbalancer_link_ready(link);
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "etcp client: Link initialized successfully! Server node_id=%016llx, mtu=%d, local_link_id=%d, remote_link_id=%d", (unsigned long long)server_node_id, link->mtu, link->local_link_id, link->remote_link_id);
memory_pool_free(e_sock->instance->pkt_pool, pkt);
return; // INIT_RESPONSE is handled, no further processing needed
}
if (link->link_state == 2) {// из recovery получен нормальный пакет - восстанавливаем линк в нормальный режим
start_keepalive_timer(link);
etcp_link_send_keepalive(link); // Start keepalive timer
link->link_state = 3; // connected
}
// log_dump("RECV decrypted:", pkt->data, pkt->data_len, link);
if (link->link_state == 3) etcp_conn_input(pkt);
else memory_pool_free(e_sock->instance->pkt_pool, pkt);
return;
ec_fr:
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: error %d", errorcode);
e_sock->pkt_format_errors++;
e_sock->errorcode=errorcode;
memory_pool_free(e_sock->instance->pkt_pool, pkt);
return;
}
int init_connections(struct UTUN_INSTANCE* instance) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!instance || !instance->config) return -1;
if (instance->etcp_sockets) {
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Connections already initialized, skipping");
return 0;
}
struct utun_config* config = instance->config;
// Initialize servers first - create sockets for incoming connections
struct CFG_SERVER* server = config->servers;
while (server) {
// Create socket for this server
// Auto-detect local IP for public servers with 0.0.0.0
uint32_t default_ip = 0;
if (server->type == CFG_SERVER_TYPE_PUBLIC) {
struct sockaddr_in* sin = (struct sockaddr_in*)&server->ip;
if (sin->sin_addr.s_addr == 0) {
default_ip = get_default_route_ip();
if (default_ip == 0) {
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "Failed to detect default route IP for server %s", server->name);
}
}
}
struct ETCP_SOCKET* e_sock = etcp_socket_add(instance, server);
if (e_sock && default_ip != 0) {
struct in_addr addr;
addr.s_addr = default_ip;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Server %s type %d ip=%s", server->name, server->type, ip_to_str(&addr, AF_INET).str);
e_sock->local_defaultroute_ip = default_ip;
}
if (!e_sock) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "Failed to create socket for server %s", server->name);
server = server->next;
continue;
}
// Convert IP to string for logging
char addr_str[INET6_ADDRSTRLEN + 10];
if (server->ip.ss_family == AF_INET) {
struct sockaddr_in* sin = (struct sockaddr_in*)&server->ip;
snprintf(addr_str, sizeof(addr_str), "%s:%d", ip_to_str(&sin->sin_addr, AF_INET).str, ntohs(sin->sin_port));
} else {
struct sockaddr_in6* sin6 = (struct sockaddr_in6*)&server->ip;
snprintf(addr_str, sizeof(addr_str), "%s:%d", ip_to_str(&sin6->sin6_addr, AF_INET6).str, ntohs(sin6->sin6_port));
}
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Initialized server %s on %s (links: %zu)",
server->name, addr_str, e_sock->num_channels);
server = server->next;
}
// Initialize clients - create outgoing connections
struct CFG_CLIENT* client = config->clients;
DEBUG_DEBUG(DEBUG_CATEGORY_CONNECTION, "init_connections called, instance=%p, config=%p, clients=%p, connections_count=%d",
instance, config, config ? config->clients : NULL, instance ? instance->connections_count : -1);
while (client) {
// Check if client has required configuration
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Client %s - keepalive=%d, links=%p, peer_key_len=%zu",
client->name, client->keepalive, client->links,
strlen(client->peer_public_key_hex));
// Create ETCP connection for this client
struct ETCP_CONN* etcp_conn = etcp_connection_create(instance, client->name);
if (!etcp_conn) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "Failed to create ETCP connection for client %s", client->name);
client = client->next;
continue;
}
// Initialize crypto context for this connection
if (sc_init_ctx(&etcp_conn->crypto_ctx, &instance->my_keys) != SC_OK) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "init_connections: failed to initialize crypto context for client %s", client->name);
etcp_connection_close(etcp_conn);
client = client->next;
continue;
}
// If client has peer public key configured, set it
if (strlen(client->peer_public_key_hex) > 0) {
// For now, set peer node ID to indicate we have peer key
// The actual peer key will be exchanged during connection establishment
etcp_conn->peer_node_id = 1; // Simple indicator
etcp_update_log_name(etcp_conn); // Update log_name with peer_node_id
DEBUG_INFO(DEBUG_CATEGORY_CRYPTO, "init_connections: setting peer public key for client %s", client->name);
// Set peer public key (assuming hex format)
if (sc_set_peer_public_key(&etcp_conn->crypto_ctx, client->peer_public_key_hex, 1) != SC_OK) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "init_connections: failed to set peer public key for client %s", client->name);
} else {
DEBUG_INFO(DEBUG_CATEGORY_CRYPTO, "init_connections: successfully set peer public key for client %s", client->name);
}
} else {
DEBUG_WARN(DEBUG_CATEGORY_CONFIG, "init_connections: no peer public key configured for client %s", client->name);
}
etcp_conn->routing_exchange_active=1;// инициируем обмен маршрутами
// Create links for this client
struct CFG_CLIENT_LINK* client_link = client->links;
while (client_link) {
// Find the local server for this link
struct CFG_SERVER* local_server = client_link->local_srv;
if (!local_server) {
client_link = client_link->next;
continue;
}
// Find the socket for this server
struct ETCP_SOCKET* e_sock = NULL;
struct ETCP_SOCKET* sock = instance->etcp_sockets;
while (sock) {
if (sock->local_addr.ss_family == local_server->ip.ss_family) {
if (sock->local_addr.ss_family == AF_INET) {
struct sockaddr_in* sock_addr = (struct sockaddr_in*)&sock->local_addr;
struct sockaddr_in* srv_addr = (struct sockaddr_in*)&local_server->ip;
if (sock_addr->sin_addr.s_addr == srv_addr->sin_addr.s_addr &&
sock_addr->sin_port == srv_addr->sin_port) {
e_sock = sock;
break;
}
}
}
sock = sock->next;
}
if (!e_sock) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "No socket found for client %s link", client->name);
client_link = client_link->next;
continue;
}
// Create link for this client connection
struct ETCP_LINK* link = etcp_link_new(etcp_conn, e_sock, &client_link->remote_addr, 0); // 0 = client initiates
if (!link) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "Failed to create link for client %s", client->name);
client_link = client_link->next;
continue;
}
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Created link %p for client %s, socket=%p",
link, client->name, e_sock);
client_link = client_link->next;
}
etcp_conn->next = instance->connections;
instance->connections = etcp_conn;
instance->connections_count++;
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Added connection %p to instance, total count: %d", etcp_conn, instance->connections_count);
client = client->next;
}
// If there are clients configured but no connections created, that's an error
// If there are no clients (server-only mode), 0 connections is OK (server will accept incoming)
if (instance->connections_count == 0 && config->clients != NULL) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "Clients configured but no connections initialized");
return -1;
}
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "Initialized %d connections", instance->connections_count);
return 0;
}