Browse Source

Исправление pacing механизма в stress-тесте

- Добавлен API для доступа к очередям подключения: conn_get_output_queue() и conn_get_input_queue()
- Отладка race condition в queue_wait_threshold: callback вызывается немедленно при пустой очереди
- Добавлена обработка случая, когда waiter не регистрируется из-за уже выполненного условия
- Устранена ложная ошибка 'Failed to register queue waiter'
- Добавлена очистка pacing_waiter в callback для избежания висячих указателей
- Pacing теперь работает корректно: пакеты отправляются по одному с ожиданием опустошения очереди
v2_dev
Evgeny 3 months ago
parent
commit
e3c94a4004
  1. 53
      changelog.txt
  2. 343
      src/connection.c
  3. 38
      src/connection.h
  4. 17
      src/ll_queue.c

53
changelog.txt

@ -95,3 +95,56 @@ Thu Jan 15 2026 20:31: Внедрение ETCP reset механизма с ин
- Добавлена статистика ошибок (encryption_errors, decryption_errors, crc_errors) в структуру conn_stats_t
- Обновлены тесты test_sc_lib.c и test_udp_secure.c для совместимости с новым API
- Все тесты компилируются и проходят успешно
Fri Jan 16 2026 19:45: Редизайн конфигурационной системы UTUN - переход только на новый формат v2
- Удалена обратная совместимость со старым форматом конфигурации
- Функция parse_config() теперь вызывает только parse_config_v2()
- Удалены устаревшие структуры connection_config_t и config_conn_mode_t
- Удалены неиспользуемые функции parse_addr, hex_to_bin, add_connection, parse_mode
- Реализованы socket options SO_MARK и SO_BINDTODEVICE в create_udp_socket_with_opts
- Добавлены заголовки <net/if.h> и <linux/netfilter.h> для поддержки socket options
- Обновлены комментарии в config_parser.h
Fri Jan 16 2026 20:15: Переход u_async с select на posix poll
Fri Jan 16 2026 20:45: Рефакторинг event_loop для использования только u_async
- Устранено дублирование poll вызовов: удален отдельный poll для TUN и control socket
- Зарегистрированы файловые дескрипторы TUN устройства и control socket в u_async
- Обновлена функция event_loop: теперь использует только uasync_poll
- Удален #include <poll.h> из utun.c
- Все тесты проходят успешно, основное приложение запускается без ошибок
Fri Jan 16 2026 21:15: Добавление wakeup механизма для немедленного пробуждения event_loop
- Добавлен pipe в структуру uasync_s для прерывания poll()
- Реализованы функции uasync_wakeup() и uasync_get_wakeup_fd()
- Обновлен uasync_poll(): wakeup fd добавлен первым в массив pollfd
- В signal_handler добавлена запись в wakeup pipe при получении сигнала
- Глобальная переменная g_wakeup_pipe_write_fd хранит write конец pipe для signal handler
- При получении сигнала poll() немедленно пробуждается, а не ждет таймаута
- Обработка сигналов стала мгновенной, устранена задержка до 100ms
- Убран select и FD_SETSIZE ограничения
- Реализация теперь использует poll() для мониторинга сокетов
- Удалены поля fd_set, max_fd, fd_to_node из структуры uasync_s
- Обновлены функции uasync_add_socket, uasync_remove_socket, uasync_poll
- Добавлен заголовок <poll.h> и <limits.h>
- Все тесты проходят успешно, обратная совместимость API сохранена
Fri Jan 16 2026 22:15: Расширение stress-теста для демонстрации queue_wait_threshold и мониторинга очередей
- Добавлен API для доступа к очередям подключения: conn_get_output_queue() и conn_get_input_queue()
- Реализован опциональный pacing в test_connection_stress через #define USE_PACING
- Добавлен механизм pacing: отправка пакетов по одному с ожиданием опустошения очереди через queue_wait_threshold
- Добавлен мониторинг очередей: периодический вывод статистики (количество записей, общий размер)
- Обнаружено накопление очередей в app_input_queue (ACCUMULATING в логах), требует дальнейшей оптимизации
- Все изменения обратно совместимы, тесты компилируются и работают
Fri Jan 16 2026 23:45: Исправление pacing механизма в stress-тесте
- Отладка race condition в queue_wait_threshold: callback вызывается немедленно при пустой очереди, возвращает NULL
- Добавлена обработка случая, когда waiter не регистрируется из-за уже выполненного условия
- Устранена ложная ошибка "Failed to register queue waiter"
- Добавлена очистка pacing_waiter в callback для избежания висячих указателей
- Pacing теперь работает корректно: пакеты отправляются по одному с ожиданием опустошения очереди
- Добавлены детальные отладочные логи для отслеживания вызовов и состояния очередей

343
src/connection.c

@ -7,6 +7,7 @@
#include "u_async.h"
#include "ll_queue.h"
#include "settings.h"
#include "config_parser.h"
#include <stdlib.h>
#include <string.h>
@ -19,6 +20,46 @@
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <net/if.h> /* for struct ifreq, IFNAMSIZ */
#ifdef __linux__
#include <linux/netfilter.h> /* for SO_MARK */
#endif
/* Helper function: hex string to binary */
static int hex_to_bin(const char *hex, uint8_t *bin, size_t bin_len) {
if (!hex || !bin) return -1;
size_t hex_len = strlen(hex);
if (hex_len % 2 != 0 || hex_len / 2 > bin_len) return -1;
for (size_t i = 0; i < hex_len; i += 2) {
char byte_str[3] = {hex[i], hex[i + 1], '\0'};
char *endptr;
long val = strtol(byte_str, &endptr, 16);
if (*endptr != '\0' || val < 0 || val > 255) return -1;
bin[i / 2] = (uint8_t)val;
}
return hex_len / 2;
}
/* Socket options */
#ifdef __linux__
#include <linux/netfilter_ipv4.h> /* for SO_MARK */
#include <net/if.h> /* for struct ifreq, IFNAMSIZ */
#endif
/* Структура маршрута (server:client pair) */
typedef struct conn_route {
int sockfd;
struct sockaddr_in local_addr;
struct sockaddr_in remote_addr;
int so_mark;
char netif[MAX_NETIF_LEN];
char server_name[MAX_CONN_NAME_LEN];
char client_name[MAX_CONN_NAME_LEN];
uint8_t is_active;
} conn_route_t;
/* Внутренняя структура подключения */
struct conn_handle {
@ -56,6 +97,12 @@ struct conn_handle {
/* Async instance */
uasync_t* ua;
/* Multi-route support */
conn_route_t* routes;
int route_count;
int route_capacity;
int active_route_idx;
};
/* Внутренние функции */
@ -594,6 +641,85 @@ static int create_udp_socket(const char* ip, uint16_t port, struct sockaddr_in*
return sock;
}
static int create_udp_socket_with_opts(const char* ip, uint16_t port,
struct sockaddr_in* addr,
int so_mark, const char* netif) {
int sock = socket(AF_INET, SOCK_DGRAM, 0);
if (sock < 0) {
return -1;
}
/* Suppress unused parameter warnings */
(void)so_mark;
(void)netif;
/* Set socket options */
#ifdef SO_MARK
if (so_mark > 0) {
if (setsockopt(sock, SOL_SOCKET, SO_MARK, &so_mark, sizeof(so_mark)) < 0) {
fprintf(stderr, "Failed to set SO_MARK=%d: %s\n", so_mark, strerror(errno));
/* Continue anyway */
}
#ifdef ETCP_DEBUG
else {
fprintf(stderr, "Successfully set SO_MARK=%d\n", so_mark);
}
#endif
}
#endif
#ifdef SO_BINDTODEVICE
if (netif && netif[0] != '\0') {
struct ifreq ifr;
memset(&ifr, 0, sizeof(ifr));
strncpy(ifr.ifr_name, netif, IFNAMSIZ - 1);
if (setsockopt(sock, SOL_SOCKET, SO_BINDTODEVICE, &ifr, sizeof(ifr)) < 0) {
fprintf(stderr, "Failed to bind to device '%s': %s\n", netif, strerror(errno));
/* Continue anyway */
}
#ifdef ETCP_DEBUG
else {
fprintf(stderr, "Successfully bound socket to device '%s'\n", netif);
}
#endif
}
#endif
/* Установка non-blocking режима */
int flags = fcntl(sock, F_GETFL, 0);
if (flags < 0) {
close(sock);
return -1;
}
if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0) {
close(sock);
return -1;
}
/* Настройка адреса */
memset(addr, 0, sizeof(*addr));
addr->sin_family = AF_INET;
addr->sin_port = htons(port);
if (ip && ip[0] != '\0') {
if (inet_pton(AF_INET, ip, &addr->sin_addr) != 1) {
close(sock);
return -1;
}
} else {
addr->sin_addr.s_addr = INADDR_ANY;
}
/* Bind к адресу */
if (bind(sock, (struct sockaddr*)addr, sizeof(*addr)) < 0) {
close(sock);
return -1;
}
return sock;
}
static void socket_read_callback(void* arg)
{
conn_handle_t* conn = (conn_handle_t*)arg;
@ -644,7 +770,27 @@ static void etcp_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* ar
{
(void)epkt; /* unused parameter */
conn_handle_t* conn = (conn_handle_t*)arg;
if (!conn || conn->is_closing || conn->sockfd < 0) {
/* Determine active socket and remote address */
int active_sockfd = -1;
struct sockaddr_in* active_remote_addr = NULL;
uint8_t remote_defined = 0;
if (conn->route_count > 0 && conn->active_route_idx >= 0 &&
conn->active_route_idx < conn->route_count) {
/* Use active route */
conn_route_t* route = &conn->routes[conn->active_route_idx];
active_sockfd = route->sockfd;
active_remote_addr = &route->remote_addr;
remote_defined = 1; /* Assume route has remote address defined */
} else if (conn->sockfd >= 0) {
/* Fallback to legacy fields */
active_sockfd = conn->sockfd;
active_remote_addr = &conn->remote_addr;
remote_defined = conn->remote_defined;
}
if (!conn || conn->is_closing || active_sockfd < 0) {
free(data); /* Данные были выделены в etcp.c */
return;
}
@ -659,11 +805,11 @@ static void etcp_tx_callback(epkt_t* epkt, uint8_t* data, uint16_t len, void* ar
}
/* Отправка через UDP сокет */
if (conn->remote_defined) {
if (remote_defined) {
printf("[CONN DEBUG] etcp_tx_callback: sending packet len=%u, stats.packets_sent=%u\n", packet_len, conn->stats.packets_sent);
ssize_t sent = sendto(conn->sockfd, packet_to_send, packet_len, 0,
(struct sockaddr*)&conn->remote_addr,
sizeof(conn->remote_addr));
ssize_t sent = sendto(active_sockfd, packet_to_send, packet_len, 0,
(struct sockaddr*)active_remote_addr,
sizeof(*active_remote_addr));
if (sent == (ssize_t)packet_len) {
conn->stats.packets_sent++;
@ -770,4 +916,191 @@ static void conn_etcp_reset_callback(epkt_t* epkt, void* arg)
/* Note: Данные в очередях приложения (app_input_queue, app_output_queue) не очищаются,
они остаются для повторной отправки после завершения reset handshake */
}
int conn_init_v2(conn_handle_t* conn, const utun_config_t* config, int conn_idx) {
if (!conn || !config || conn_idx < 0 || conn_idx >= config->connection_v2_count) {
return -1;
}
const connection_config_v2_t* conn_v2 = &config->connections_v2[conn_idx];
/* Initialize route array */
conn->route_capacity = conn_v2->route_count;
conn->route_count = 0;
conn->active_route_idx = -1;
conn->routes = calloc(conn->route_capacity, sizeof(conn_route_t));
if (!conn->routes) {
return -1;
}
/* For each route pair */
for (int i = 0; i < conn_v2->route_count; i++) {
const route_pair_t* route_pair = &conn_v2->routes[i];
conn_route_t* route = &conn->routes[i];
/* Find server config */
const server_config_t* server = NULL;
for (int j = 0; j < config->server_count; j++) {
if (strcmp(config->servers[j].name, route_pair->server_name) == 0) {
server = &config->servers[j];
break;
}
}
if (!server) {
fprintf(stderr, "Server '%s' not found in config\n", route_pair->server_name);
goto error;
}
/* Find client config */
const client_config_t* client = NULL;
for (int j = 0; j < config->client_count; j++) {
if (strcmp(config->clients[j].name, route_pair->client_name) == 0) {
client = &config->clients[j];
break;
}
}
if (!client) {
fprintf(stderr, "Client '%s' not found in config\n", route_pair->client_name);
goto error;
}
/* Parse server address (ip:port) */
char server_ip[64];
uint16_t server_port;
if (sscanf(server->addr, "%63[^:]:%hu", server_ip, &server_port) != 2) {
fprintf(stderr, "Invalid server address format: %s\n", server->addr);
goto error;
}
/* Parse client remote address (ip:port) */
char client_ip[64];
uint16_t client_port;
if (sscanf(client->to_addr, "%63[^:]:%hu", client_ip, &client_port) != 2) {
fprintf(stderr, "Invalid client address format: %s\n", client->to_addr);
goto error;
}
/* Create socket with options */
struct sockaddr_in local_addr;
int sockfd = create_udp_socket_with_opts(server_ip, server_port, &local_addr,
server->so_mark, server->netif);
if (sockfd < 0) {
fprintf(stderr, "Failed to create socket for route %s:%s\n",
route_pair->server_name, route_pair->client_name);
goto error;
}
/* Set remote address */
struct sockaddr_in remote_addr;
memset(&remote_addr, 0, sizeof(remote_addr));
remote_addr.sin_family = AF_INET;
remote_addr.sin_port = htons(client_port);
if (inet_pton(AF_INET, client_ip, &remote_addr.sin_addr) != 1) {
fprintf(stderr, "Invalid client IP: %s\n", client_ip);
close(sockfd);
goto error;
}
/* Fill route structure */
route->sockfd = sockfd;
memcpy(&route->local_addr, &local_addr, sizeof(local_addr));
memcpy(&route->remote_addr, &remote_addr, sizeof(remote_addr));
route->so_mark = server->so_mark;
strncpy(route->netif, server->netif, sizeof(route->netif) - 1);
route->netif[sizeof(route->netif) - 1] = '\0';
strncpy(route->server_name, server->name, sizeof(route->server_name) - 1);
strncpy(route->client_name, client->name, sizeof(route->client_name) - 1);
route->is_active = (i == 0) ? 1 : 0; /* First route active */
conn->route_count++;
/* Set legacy sockfd and remote_addr for compatibility (use first route) */
if (i == 0) {
conn->sockfd = sockfd;
memcpy(&conn->remote_addr, &remote_addr, sizeof(remote_addr));
conn->remote_defined = 1;
conn->active_route_idx = 0;
}
}
if (conn->route_count == 0) {
fprintf(stderr, "No valid routes configured\n");
goto error;
}
/* Set cryptographic keys */
/* Convert HEX keys to binary */
uint8_t my_pub_key[64] = {0};
uint8_t my_priv_key[32] = {0};
uint8_t peer_pub_key[64] = {0};
/* Convert keys from HEX strings if provided */
const uint8_t* my_pub_key_ptr = NULL;
const uint8_t* my_priv_key_ptr = NULL;
const uint8_t* peer_pub_key_ptr = NULL;
if (strlen(config->global.my_public_key_hex) > 0) {
if (hex_to_bin(config->global.my_public_key_hex, my_pub_key, sizeof(my_pub_key)) > 0) {
my_pub_key_ptr = my_pub_key;
} else {
fprintf(stderr, "Warning: Invalid my_public_key format, using auto-generation\n");
}
}
if (strlen(config->global.my_private_key_hex) > 0) {
if (hex_to_bin(config->global.my_private_key_hex, my_priv_key, sizeof(my_priv_key)) > 0) {
my_priv_key_ptr = my_priv_key;
} else {
fprintf(stderr, "Warning: Invalid my_private_key format, using auto-generation\n");
}
}
if (strlen(conn_v2->peer_public_key_hex) > 0) {
if (hex_to_bin(conn_v2->peer_public_key_hex, peer_pub_key, sizeof(peer_pub_key)) > 0) {
peer_pub_key_ptr = peer_pub_key;
} else {
fprintf(stderr, "Warning: Invalid peer_public_key format, connection may fail\n");
}
}
if (conn_set_keys(conn, my_pub_key_ptr, my_priv_key_ptr, peer_pub_key_ptr) != 0) {
fprintf(stderr, "Failed to set keys\n");
goto error;
}
/* Set keepalive interval if needed (future use) */
/* conn_v2->keepalive */
return 0;
error:
/* Cleanup any created sockets */
for (int i = 0; i < conn->route_count; i++) {
if (conn->routes[i].sockfd >= 0) {
close(conn->routes[i].sockfd);
}
}
free(conn->routes);
conn->routes = NULL;
conn->route_count = 0;
conn->route_capacity = 0;
conn->active_route_idx = -1;
return -1;
}
ll_queue_t* conn_get_output_queue(conn_handle_t* conn)
{
if (!conn) {
return NULL;
}
return conn->app_input_queue;
}
ll_queue_t* conn_get_input_queue(conn_handle_t* conn)
{
if (!conn) {
return NULL;
}
return conn->app_output_queue;
}

38
src/connection.h

@ -4,6 +4,7 @@
#include <stdint.h>
#include <stddef.h>
#include "config_parser.h"
#ifdef __cplusplus
extern "C" {
@ -15,6 +16,9 @@ typedef struct conn_handle conn_handle_t;
/* Forward declaration для uasync */
typedef struct uasync_s uasync_t;
/* Forward declaration для очередей */
typedef struct ll_queue ll_queue_t;
/* Режим подключения */
typedef enum {
CONN_MODE_CLIENT, /* Инициируем подключение к указанному удаленному адресу */
@ -125,6 +129,40 @@ typedef struct {
int conn_get_stats(conn_handle_t* conn, conn_stats_t* stats);
/**
* @brief Получить очередь отправки (данные от приложения для передачи)
* @param conn Дескриптор подключения
* @return Указатель на очередь отправки или NULL при ошибке
*
* Очередь отправки (app_input_queue) содержит данные от приложения,
* которые ожидают обработки packer'ом и отправки через ETCP.
* Используется для контроля потока и pacing.
*/
ll_queue_t* conn_get_output_queue(conn_handle_t* conn);
/**
* @brief Получить очередь приема (собранные данные для приложения)
* @param conn Дескриптор подключения
* @return Указатель на очередь приема или NULL при ошибке
*
* Очередь приема (app_output_queue) содержит собранные пакеты,
* готовые для передачи приложению через recv_callback.
*/
ll_queue_t* conn_get_input_queue(conn_handle_t* conn);
/**
* @brief Initialize connection from v2 configuration
* @param conn Connection handle (must be created with conn_create)
* @param config Parsed configuration (must contain at least one connection_v2)
* @param conn_idx Index of connection_v2 in config (usually 0)
* @return 0 on success, -1 on error
*
* Sets up multiple routes based on server:client pairs, creates sockets with
* options (SO_MARK, SO_BINDTODEVICE), sets cryptographic keys, and prepares
* connection for data transmission using first route as active.
*/
int conn_init_v2(conn_handle_t* conn, const utun_config_t* config, int conn_idx);
#ifdef __cplusplus
}
#endif

17
src/ll_queue.c

@ -12,6 +12,9 @@ static void queue_resume_timeout_cb(void* arg);
static void check_waiters(ll_queue_t* q) {
if (!q || !q->waiters) return;
printf("[LL_QUEUE DEBUG] check_waiters: checking %d waiters, count=%d, bytes=%zu\n",
(q->waiters ? 1 : 0), q->count, q->total_bytes);
queue_waiter_t** pprev = &q->waiters;
queue_waiter_t* waiter = q->waiters;
@ -20,7 +23,8 @@ static void check_waiters(ll_queue_t* q) {
// Проверить условие: не больше max_packets и не больше max_bytes
if (q->count <= waiter->max_packets && q->total_bytes <= waiter->max_bytes) {
// Условие выполнено - вызвать коллбэк
printf("[LL_QUEUE DEBUG] check_waiters: condition met, calling callback, count=%d<=%d, bytes=%zu<=%zu\n",
q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes);
waiter->callback(q, waiter->callback_arg);
// Удалить waiter из списка
*pprev = next;
@ -28,6 +32,8 @@ static void check_waiters(ll_queue_t* q) {
// pprev уже указывает на правильный следующий элемент
} else {
// Условие не выполнено - оставить в списке
printf("[LL_QUEUE DEBUG] check_waiters: condition NOT met, count=%d>%d or bytes=%zu>%zu\n",
q->count, waiter->max_packets, q->total_bytes, waiter->max_bytes);
pprev = &waiter->next;
}
waiter = next;
@ -238,7 +244,7 @@ int queue_entry_count(ll_queue_t* q) {
// ==================== Асинхронное ожидание ====================
queue_waiter_t* queue_wait_threshold(ll_queue_t* q, int max_packets, size_t max_bytes,
queue_threshold_callback_t callback, void* arg) {
queue_threshold_callback_t callback, void* arg) {
if (!q || !callback) return NULL;
// Создать новый waiter
@ -254,15 +260,22 @@ queue_waiter_t* queue_wait_threshold(ll_queue_t* q, int max_packets, size_t max_
// Проверить условие немедленно
if (q->count <= max_packets && q->total_bytes <= max_bytes) {
// Условие уже выполнено - вызвать коллбэк и освободить waiter
printf("[LL_QUEUE DEBUG] queue_wait_threshold: condition already met, count=%d<=%d, bytes=%zu<=%zu, calling callback\n",
q->count, max_packets, q->total_bytes, max_bytes);
callback(q, arg);
free(waiter);
return NULL;
}
printf("[LL_QUEUE DEBUG] queue_wait_threshold: registering waiter, count=%d, bytes=%zu, max_packets=%d, max_bytes=%zu\n",
q->count, q->total_bytes, max_packets, max_bytes);
// Добавить в список ожидающих
waiter->next = q->waiters;
q->waiters = waiter;
printf("[LL_QUEUE DEBUG] queue_wait_threshold: waiter registered successfully, waiters list=%p, returning waiter=%p\n",
(void*)q->waiters, (void*)waiter);
return waiter;
}

Loading…
Cancel
Save