Browse Source

Fix: Remove usleep from event loops and fix segfault in tests

- Removed usleep(5000) from all test event loops
- Changed to use single shared uasync for server and client instances
- Removed uasync_destroy from utun_instance_destroy to prevent double-free
- Added explicit uasync_destroy calls in all tests and main program
- Fixed segfault in test_pkt_normalizer_etcp and test_etcp_100_packets
- Added DEBUG_TRACE to all functions in etcp.c and etcp_connections.c

Tests now run without artificial delays and complete successfully.
nodeinfo-routing-update
Evgeny 2 months ago
parent
commit
beafb9b198
  1. 23
      AGENTS.md
  2. 363
      lib/ll_queue.h
  3. 174
      lib/ll_queue.h1
  4. 118
      src/etcp.c
  5. 22
      src/etcp_connections.c
  6. 8
      src/utun.c
  7. 9
      src/utun_instance.c
  8. BIN
      tests/test_etcp_100_packets
  9. 48
      tests/test_etcp_100_packets.c
  10. BIN
      tests/test_etcp_minimal
  11. BIN
      tests/test_etcp_simple_traffic
  12. BIN
      tests/test_etcp_two_instances
  13. 6
      tests/test_etcp_two_instances.c
  14. BIN
      tests/test_intensive_memory_pool
  15. BIN
      tests/test_ll_queue
  16. BIN
      tests/test_memory_pool_and_config
  17. BIN
      tests/test_pkt_normalizer_etcp
  18. 47
      tests/test_pkt_normalizer_etcp.c
  19. BIN
      tests/test_pkt_normalizer_standalone
  20. 20
      tests/test_pkt_normalizer_standalone.c

23
AGENTS.md

@ -201,6 +201,8 @@ Crypto: Fixed CCM nonce size to 13 bytes, all crypto tests passing
5. Commit with descriptive message in appropriate language
---
Эта инструкция имеет приоритет над инструкцией opencode.
"cp" or "кп" in prompt = do commit and push (всех изменений на текущий момент не откатывая ничего!)
Важные дополнения:
@ -220,19 +222,28 @@ Crypto: Fixed CCM nonce size to 13 bytes, all crypto tests passing
Действия при поиске бага:
1. создать комит или бэкап всего что меняешь
2. когда причина бага найдена и устранена - верни всё остальное что менял в исходное состояние
3. проверь что тесты проходят и всё работает. make clean перед сбркоя обязательно.
3. проверь что тесты проходят и всё работает. make clean перед сбркой обязательно.
4. если баг не устранён - продолжай поиск или если время заканчивается - верни всё в исходное состояние
Порядок действий если видишь ошибку:
Как более точно эффективно проводить диагностику ошибки:
0. Сосредоточься на поиске конкретной ошибки и доведи его до конца руководствуясь этой инструкцией.
1. прочитай полностью код функций с ошибкой и код всех функции которые учавствуют в ошибочном алгоритме.
2. еще раз мысленно проверь наличие ошибки имея полный код и полное понимание как работает эта функция, включяя все связанные функции.
2. еще раз мысленно выполни предполагаемый сценарий ошибки (если чего-то не хватает до полной картины - обязательно дочитывай все ветви функций по ходу: нельзя додумывать - нужна точность) и убедись что:
- ты точно понимаешь как алгоритм прихдит к ошибке
- все функции которые учавствуют в сценарии ошибки проанализированы и их поведение проверено и понятно
- последовательно отсекай логически законченные блоки которые проверены и точно правильно работают
- если ты полностью проанализировал функцию и к ней нет описания или описание неточное (неполное), и функция работает логически корректно - поправь описание. Если функция работает логически неверно - добавь запись об этом в todo.txt
3. Если исправление как либо меняет поведение функции:
- просмотри где используется эта функция
- убедись что изменение поведения не повлияет на остальные теста где функция используется
- убедись что изменение поведения не повлияет на остальные места где функция используется. особенно важный пункт для библиотек и модулей коммуникаций
Работа с очередями.
- Буфера забивать нельзя! всегда контролируй заполненность буфера. Для этого есть инструмент queue_wait_threshold
- Для получения из очереди также есть callback. их нужно использовать в тестах.
- Запись в очередь: очереди забивать нельзя. Добавляй следующий элемент только когда очередь стала пустой. Пользуйся наблюдателем queue_wait_threshold, он специально сделан для добавления в очередь.
- Чтение из очереди: пользуйся queue_set_callback: при вызове callback , обработай один или несколько элементов, потом вызови resume_callback.
Работа с u_async
- нельзя использовать в одном потоке несколько u_async. один поток = всегда 1 uasync instance
- нельзя использовать sleep/usleep если есть uasync. Нужно использовать uasync_set_timeout.
тех задания для реализации в каталоге /doc.

363
lib/ll_queue.h

@ -1,4 +1,3 @@
// ll_queue.h - Упрощенная архитектура: разделение создания элементов и работы с очередью
#ifndef LL_QUEUE_H
#define LL_QUEUE_H
@ -6,169 +5,285 @@
#include <stdint.h> // для uint64_t
#include "memory_pool.h" // для struct memory_pool
// Предварительные объявления
struct ll_queue;
struct ll_entry;
struct queue_waiter;
struct memory_pool;
struct UASYNC;
/**
* @file ll_queue.h
* @brief Упрощённая двусвязная очередь с поддержкой автозабора элементов через callback,
* порогового ожидания освобождения места, поиска по ID и гибкого управления памятью.
*
* Основные особенности:
* - FIFO-очередь (с возможностью добавления в начало для приоритета)
* - Автоматический вызов callback при появлении элементов (один элемент за раз)
* - Обязательный вызов queue_resume_callback после обработки элемента
* - Поддержка пулов памяти для entry и отдельно для dgram
* - Хеш-таблица для быстрого поиска по ID (опционально)
* - Встроенный одиночный waiter для ожидания освобождения очереди до заданного порога
* - Использует uasync для отложенного возобновления callback'ов (без рекурсии в стеке)
*
* @note Важные правила использования автозабора (queue_set_callback):
* 1. Коллбэк получает управление только если в очереди есть элементы И !callback_suspended
* 2. Внутри коллбэка ОБЯЗАТЕЛЬНО: queue_data_get() обработать элемент queue_resume_callback()
* 3. Без вызова queue_resume_callback очередь навсегда застрянет
* 4. Не извлекайте элементы вручную вне коллбэка это нарушит логику
* 5. queue_data_get() автоматически suspend'ит коллбэки для предотвращения рекурсии
*
* @note Память:
* - queue_free() освобождает ТОЛЬКО структуру очереди и хеш-таблицу
* - Все элементы должны быть извлечены через queue_data_get() и освобождены через queue_entry_free()
* - dgram освобождается отдельно через queue_dgram_free() или кастомную dgram_free_fn
*/
/**
* @struct ll_entry
* @brief Элемент очереди (переменного размера).
*
* Память выделяется одним блоком: [struct ll_entry + data[size]].
* Поле dgram отдельный блок (malloc / пул / внешняя память).
*/
struct ll_entry {
struct ll_entry* next; ///< Следующий элемент в очереди
struct ll_entry* prev; ///< Предыдущий элемент в очереди
uint16_t size; ///< Размер пользовательского буфера data[] (байт)
uint16_t len; ///< Актуальная длина данных в dgram
uint16_t memlen; ///< Выделенный размер под dgram
uint16_t int_len; ///< Внутреннее (не использовать)
uint8_t* dgram; ///< Указатель на данные пакета
void (*dgram_free_fn)(uint8_t* data, void* arg); ///< Кастомная функция освобождения dgram
struct memory_pool* dgram_pool; ///< Пул для dgram (если используется)
uint32_t id; ///< Идентификатор для поиска (задаётся при добавлении)
struct ll_entry* hash_next; ///< Следующий в хеш-цепочке
struct memory_pool* pool; ///< Пул, из которого выделен сам entry (NULL = malloc)
uint8_t data[0]; ///< Гибкий массив пользовательских данных (размер = size)
};
// Автозабор элемента из очереди. вызывается когда в очереди что-то есть и коллбэк не занят обработкой.
// Параметры: указатель на очередь, указатель на структуру элемента (struct ll_entry*), пользовательский аргумент
// Когда коллбэк закончит обрабатывать элемент он должен вызвать queue_resume_callback - сообщить о готовности получить следующий элемент (приём очередным вызовом коллбэка в следующем цикле mainloop).
/**
* @typedef queue_callback_fn
* @brief Коллбэк автозабора элементов из очереди.
* @param q указатель на очередь
* @param arg пользовательский аргумент (из queue_set_callback)
*
* @note Внутри функции:
* - Вызвать queue_data_get(q) для получения элемента
* - Обработать элемент (можно асинхронно)
* - После завершения обработки вызвать queue_resume_callback(q)
*/
typedef void (*queue_callback_fn)(struct ll_queue* q, void* arg);
// Структура элемента - переменный размер, данные расположены сразу после структуры
struct ll_entry {
struct ll_entry* next; // Указатель на следующий элемент в очереди
struct ll_entry* prev; // Указатель на предыдущий элемент в очереди
uint16_t size; // Размер доступной памяти после блока ll_entry - т.е. data[size]. используется для добавления доп. параметров
uint16_t len; // размер пакета (dgram)
uint16_t memlen; // размер выделенной памяти (dgram)
uint16_t int_len; // размер (private, not use!)
uint8_t* dgram; // данные пакета
void (*dgram_free_fn)(uint8_t* data, void* arg); // функция освобождения блока
struct memory_pool* dgram_pool; // Пул, из которого выделен этот элемент (NULL, если выделен через malloc)
uint32_t id; // Идентификатор для хеш-поиска
struct ll_entry* hash_next; // Следующий в хеш-цепочке
struct memory_pool* pool; // Пул, из которого выделен этот элемент (NULL, если выделен через malloc)
uint8_t data[0];
};
/**
* @typedef queue_threshold_callback_fn
* @brief Одноразовый коллбэк при достижении порога очереди (queue_wait_threshold).
* @param q указатель на очередь
* @param arg пользовательский аргумент
*/
typedef void (*queue_threshold_callback_fn)(struct ll_queue* q, void* arg);
// Ожидающий добавления элементов в очередь (пока очередь не освободться до нужного размера чтобы не забивать)
/**
* @struct queue_waiter
* @brief Описание ожидания освобождения места в очереди (встроен в ll_queue только один).
*/
struct queue_waiter {
int max_packets; // Максимальное количество пакетов
size_t max_bytes; // Максимальное количество байт (0 = не проверять байты)
void (*callback)(struct ll_queue* q, void* arg); // Коллбэк для вызова
void* callback_arg; // Аргумент коллбэка
int max_packets; ///< Максимально допустимое количество элементов
size_t max_bytes; ///< Максимально допустимый объём данных (0 = не проверять)
queue_threshold_callback_fn callback; ///< Коллбэк (вызывается один раз)
void* callback_arg; ///< Аргумент коллбэка
};
typedef void (*queue_threshold_callback_fn)(struct ll_queue* q, void* arg);
// Структура очереди
/**
* @struct ll_queue
* @brief Структура очереди.
*/
struct ll_queue {
struct ll_entry* head; // Первый элемент (извлекается отсюда)
struct ll_entry* tail; // Последний элемент (добавляется сюда)
int count; // Текущее количество элементов
size_t total_bytes; // Общий размер данных всех элементов (байт)
int size_limit; // Максимальное количество (-1 = без ограничения)
queue_callback_fn callback; // Функция коллбэка
void* callback_arg; // Пользовательский аргумент для коллбэка
int callback_suspended;
void* resume_timeout_id; // ID таймаута uasync для отложенного возобновления
struct UASYNC* ua; // Экземпляр uasync для таймеров
struct queue_waiter waiter; // Встроенный одиночный waiter
// Хеш-таблица для быстрого поиска по ID
struct ll_entry** hash_table;
size_t hash_size;
struct ll_entry* head; ///< Голова очереди (отсюда извлекаем)
struct ll_entry* tail; ///< Хвост очереди (сюда добавляем)
int count; ///< Текущее количество элементов
size_t total_bytes; ///< Суммарный объём данных (сумма int_len)
int size_limit; ///< Максимальное количество элементов (-1 = без лимита)
queue_callback_fn callback; ///< Коллбэк автозабора
void* callback_arg; ///< Аргумент коллбэка
int callback_suspended; ///< 1 = коллбэки временно приостановлены
void* resume_timeout_id; ///< ID таймера uasync для отложенного resume
struct UASYNC* ua; ///< Экземпляр uasync (обязателен для таймеров)
struct queue_waiter waiter; ///< Встроенный waiter (только один)
struct ll_entry** hash_table; ///< Хеш-таблица для поиска по id (если hash_size > 0)
size_t hash_size; ///< Размер хеш-таблицы
};
// ==================== Управление очередью ====================
/* ==================== Создание / уничтожение ==================== */
// Создать новую пустую очередь
// ua - экземпляр uasync для таймеров (обязательный параметр)
// hash_size - размер хеш-таблицы для быстрого поиска (0 = без хеш-таблицы)
// Возвращает: указатель на очередь или NULL при ошибке выделения памяти
/**
* @brief Создаёт новую пустую очередь.
* @param ua экземпляр uasync (обязателен для отложенного resume коллбэков)
* @param hash_size размер хеш-таблицы (0 = поиск по id отключён)
* @return указатель на очередь или NULL при ошибке выделения памяти
*/
struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size);
// Освободить очередь и все её элементы
// ВАЖНО: освобождает только структуру очереди, элементы в очереди НЕ освобождаются!
// Элементы должны быть предварительно извлечены через queue_data_get() и освобождены через queue_data_free()
/**
* @brief Освобождает структуру очереди.
* @param q очередь
*
* @warning НЕ ОСВОБОЖДАЕТ элементы в очереди!
* Их нужно извлечь через queue_data_get() и освободить через queue_entry_free().
*/
void queue_free(struct ll_queue* q);
// Установить максимальное количество элементов в очереди
// При превышении лимита новый элемент автоматически освобождается
/* ==================== Конфигурация ==================== */
/**
* @brief Устанавливает максимальное количество элементов в очереди.
* @param q очередь
* @param lim лимит (-1 = без ограничения)
*
* При превышении лимита новые элементы автоматически освобождаются.
*/
void queue_set_size_limit(struct ll_queue* q, int lim);
// ==================== Асинхронное ожидание передачи ====================
/* ==================== Автозабор элементов ==================== */
// Зарегистрировать одноразовый коллбэк, который будет вызван когда очередь будет иметь
// не более max_packets пакетов и не более max_bytes байт (если max_bytes != 0).
// Если условие уже выполнено, коллбэк вызывается немедленно.
// Возвращает указатель на waiter для возможной отмены через queue_cancel_wait
/**
* @brief Устанавливает коллбэк для автоматического извлечения элементов.
* @param q очередь
* @param cbk_fn функция-коллбэк
* @param arg пользовательский аргумент
*
* Коллбэк вызывается автоматически при наличии элементов и разрешённом состоянии.
*/
void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg);
/**
* @brief Возобновляет работу коллбэка после обработки элемента.
* @param q очередь
*
* @warning ОБЯЗАТЕЛЬНО вызывать после завершения обработки элемента в коллбэке.
* Иначе автозабор элементов навсегда остановится.
*/
void queue_resume_callback(struct ll_queue* q);
/* ==================== Пороговое ожидание ==================== */
/**
* @brief Регистрирует одноразовый коллбэк, который сработает, когда очередь освободится до заданного порога.
* @param q очередь
* @param max_packets максимальное количество элементов
* @param max_bytes максимальный объём данных (0 = не проверять)
* @param callback функция, вызываемая при достижении порога
* @param arg аргумент коллбэка
* @return указатель на waiter (для отмены) или NULL, если условие уже выполнено (коллбэк вызван сразу)
*/
struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes,
queue_threshold_callback_fn callback, void* arg);
queue_threshold_callback_fn callback, void* arg);
// Отменить ожидание (удалить waiter из списка)
/**
* @brief Отменяет ранее установленное ожидание порога.
* @param q очередь
* @param waiter указатель, возвращённый queue_wait_threshold()
*/
void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter);
// Получить общий размер данных в очереди (байт)
static inline size_t queue_total_bytes(struct ll_queue* q) {
if (!q) return 0;
return q->total_bytes;
}
/* ==================== Работа с данными ==================== */
/**
* @brief Добавляет элемент в конец очереди (FIFO).
* @param q очередь
* @param entry элемент
* @param id идентификатор для поиска
* @return 0 успех, -1 превышен лимит (элемент освобождён)
*/
int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id);
// ==================== Асинхронное ожидание приёма ====================
/**
* @brief Добавляет элемент в начало очереди (LIFO, высокий приоритет).
* @param q очередь
* @param entry элемент
* @param id идентификатор
* @return 0 успех, -1 превышен лимит (элемент освобождён)
*/
int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id);
// Установить функцию и аргумент коллбэка для автозабора из очереди
// Коллбэк вызывается когда в очереди есть элемент и разрешен коллбэк
// обработчик должен обработать этот пакет (может использовать асинхронное ожидание). Когда будет готов к приёму следующего - должен вызвать resume_callback. обработка строго по одному пакету.
void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg);
/**
* @brief Извлекает элемент из начала очереди.
* @param q очередь
* @return элемент или NULL, если очередь пуста
*
* @note Автоматически приостанавливает коллбэки (callback_suspended = 1)
* После обработки элемента необходимо вызвать queue_resume_callback().
*/
struct ll_entry* queue_data_get(struct ll_queue* q);
// обрабатывается строго одина пакет за вызов
// Возобновить коллбэки после обработки элемента переданного в коллбэке (тянуть дополнительные элементы из очереди не предусмотернные api нельзя).
// эта функция должна вызываться всегда после того как cbk_fn обработала пакет (можно с ожиданием через async), иначе очередь застрянет.
// Если в очереди остались элементы, запланирует вызов коллбэка через uasync_set_timeout(0)
// Это предотвращает накопление рекурсии в стеке вызовов
void queue_resume_callback(struct ll_queue* q);
/**
* @brief Возвращает текущее количество элементов в очереди.
* @param q очередь
* @return количество элементов
*/
int queue_entry_count(struct ll_queue* q);
// ==================== Управление элементами ====================
/* ==================== Управление памятью элементов ==================== */
// выделить ll_entry и память под кодограмму
/**
* @brief Выделяет элемент с отдельным буфером dgram (malloc).
* @param len требуемый размер dgram
* @return элемент или NULL при ошибке
*/
struct ll_entry* ll_alloc_lldgram(uint16_t len);
// Создать новый элемент с областью данных указанного размера
// Память выделяется одним блоком: [struct ll_entry][область данных data_size байт]
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти
/**
* @brief Создаёт элемент с пользовательским буфером data[] (malloc одним блоком).
* @param data_size размер data[]
* @return элемент или NULL при ошибке
*/
struct ll_entry* queue_entry_new(size_t data_size);
// Создать новый элемент из пула (размер был определен при создании пула)
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти
/**
* @brief Создаёт элемент из пула памяти.
* @param pool пул
* @return элемент или NULL, если пул пуст
*/
struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool);
// Освободить только entry (не влияет на очереди, dgram не освобождает)
/**
* @brief Освобождает структуру элемента (возвращает в пул или free).
* @param entry элемент
*
* @note НЕ освобождает dgram! Используйте queue_dgram_free() отдельно.
*/
void queue_entry_free(struct ll_entry* entry);
// Освободить только entry->dgram (не влияет на очереди)
/**
* @brief Освобождает буфер dgram элемента.
* @param entry элемент
*/
void queue_dgram_free(struct ll_entry* entry);
//void queue_data_free(void* data);
// ==================== Операции с очередью ====================
// Добавить элемент в конец очереди (FIFO)
// Если очередь была пустой и коллбэки разрешены - вызывает коллбэк
// Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден)
int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id);
// Добавить элемент в начало очереди (LIFO, высокий приоритет)
// Если очередь была пустой и коллбэки разрешены - вызывает коллбэк
// Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден)
int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id);
/* ==================== Поиск и удаление ==================== */
// Извлечь элемент из начала очереди
// При извлечении приостанавливает коллбэки (callback_suspended = 1) чтобы предотвратить рекурсию
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если очередь пуста
// ПРИМЕЧАНИЕ: не освобождает память элемента
struct ll_entry* queue_data_get(struct ll_queue* q);
// Получить текущее количество элементов в очереди
int queue_entry_count(struct ll_queue* q);
// ==================== Поиск и удаление по ID ====================
// Найти элемент по ID
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если не найден
/**
* @brief Находит элемент по идентификатору (требуется hash_size > 0 при создании).
* @param q очередь
* @param id искомый идентификатор
* @return элемент или NULL
*/
struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id);
// Удалить элемент из очереди по указателю на структуру элемента
// Возвращает: 0 при успехе, -1 если элемент не найден
// ПРИМЕЧАНИЕ: НЕ изменяет ref_count элемента, просто удаляет из очереди
/**
* @brief Удаляет элемент из очереди (не освобождает память).
* @param q очередь
* @param entry элемент
* @return 0 успех, -1 элемент не найден в очереди
*/
int queue_remove_data(struct ll_queue* q, struct ll_entry* entry);
/* ==================== Утилиты ==================== */
/**
* @brief Возвращает суммарный объём данных в очереди.
* @param q очередь
* @return количество байт
*/
static inline size_t queue_total_bytes(struct ll_queue* q) {
return q ? q->total_bytes : 0;
}
#endif // LL_QUEUE_H

174
lib/ll_queue.h1

@ -0,0 +1,174 @@
// ll_queue.h - Упрощенная архитектура: разделение создания элементов и работы с очередью
#ifndef LL_QUEUE_H
#define LL_QUEUE_H
#include <stddef.h> // для size_t
#include <stdint.h> // для uint64_t
#include "memory_pool.h" // для struct memory_pool
// Предварительные объявления
struct ll_queue;
struct ll_entry;
struct queue_waiter;
struct memory_pool;
struct UASYNC;
// Автозабор элемента из очереди. вызывается когда в очереди что-то есть и коллбэк не занят обработкой.
// Параметры: указатель на очередь, указатель на структуру элемента (struct ll_entry*), пользовательский аргумент
// Когда коллбэк закончит обрабатывать элемент он должен вызвать queue_resume_callback - сообщить о готовности получить следующий элемент (приём очередным вызовом коллбэка в следующем цикле mainloop).
typedef void (*queue_callback_fn)(struct ll_queue* q, void* arg);
// Структура элемента - переменный размер, данные расположены сразу после структуры
struct ll_entry {
struct ll_entry* next; // Указатель на следующий элемент в очереди
struct ll_entry* prev; // Указатель на предыдущий элемент в очереди
uint16_t size; // Размер доступной памяти после блока ll_entry - т.е. data[size]. используется для добавления доп. параметров
uint16_t len; // размер пакета (dgram)
uint16_t memlen; // размер выделенной памяти (dgram)
uint16_t int_len; // размер (private, not use!)
uint8_t* dgram; // данные пакета
void (*dgram_free_fn)(uint8_t* data, void* arg); // функция освобождения блока
struct memory_pool* dgram_pool; // Пул, из которого выделен этот элемент (NULL, если выделен через malloc)
uint32_t id; // Идентификатор для хеш-поиска
struct ll_entry* hash_next; // Следующий в хеш-цепочке
struct memory_pool* pool; // Пул, из которого выделен этот элемент (NULL, если выделен через malloc)
uint8_t data[0];
};
// Ожидающий добавления элементов в очередь (пока очередь не освободться до нужного размера чтобы не забивать)
struct queue_waiter {
int max_packets; // Максимальное количество пакетов
size_t max_bytes; // Максимальное количество байт (0 = не проверять байты)
void (*callback)(struct ll_queue* q, void* arg); // Коллбэк для вызова
void* callback_arg; // Аргумент коллбэка
};
typedef void (*queue_threshold_callback_fn)(struct ll_queue* q, void* arg);
// Структура очереди
struct ll_queue {
struct ll_entry* head; // Первый элемент (извлекается отсюда)
struct ll_entry* tail; // Последний элемент (добавляется сюда)
int count; // Текущее количество элементов
size_t total_bytes; // Общий размер данных всех элементов (байт)
int size_limit; // Максимальное количество (-1 = без ограничения)
queue_callback_fn callback; // Функция коллбэка
void* callback_arg; // Пользовательский аргумент для коллбэка
int callback_suspended;
void* resume_timeout_id; // ID таймаута uasync для отложенного возобновления
struct UASYNC* ua; // Экземпляр uasync для таймеров
struct queue_waiter waiter; // Встроенный одиночный waiter
// Хеш-таблица для быстрого поиска по ID
struct ll_entry** hash_table;
size_t hash_size;
};
// ==================== Управление очередью ====================
// Создать новую пустую очередь
// ua - экземпляр uasync для таймеров (обязательный параметр)
// hash_size - размер хеш-таблицы для быстрого поиска (0 = без хеш-таблицы)
// Возвращает: указатель на очередь или NULL при ошибке выделения памяти
struct ll_queue* queue_new(struct UASYNC* ua, size_t hash_size);
// Освободить очередь и все её элементы
// ВАЖНО: освобождает только структуру очереди, элементы в очереди НЕ освобождаются!
// Элементы должны быть предварительно извлечены через queue_data_get() и освобождены через queue_data_free()
void queue_free(struct ll_queue* q);
// Установить максимальное количество элементов в очереди
// При превышении лимита новый элемент автоматически освобождается
void queue_set_size_limit(struct ll_queue* q, int lim);
// ==================== Асинхронное ожидание передачи ====================
// Зарегистрировать одноразовый коллбэк, который будет вызван когда очередь будет иметь
// не более max_packets пакетов и не более max_bytes байт (если max_bytes != 0).
// Если условие уже выполнено, коллбэк вызывается немедленно.
// Возвращает указатель на waiter для возможной отмены через queue_cancel_wait
struct queue_waiter* queue_wait_threshold(struct ll_queue* q, int max_packets, size_t max_bytes,
queue_threshold_callback_fn callback, void* arg);
// Отменить ожидание (удалить waiter из списка)
void queue_cancel_wait(struct ll_queue* q, struct queue_waiter* waiter);
// Получить общий размер данных в очереди (байт)
static inline size_t queue_total_bytes(struct ll_queue* q) {
if (!q) return 0;
return q->total_bytes;
}
// ==================== Асинхронное ожидание приёма ====================
// Установить функцию и аргумент коллбэка для автозабора из очереди
// Коллбэк вызывается когда в очереди есть элемент и разрешен коллбэк
// обработчик должен обработать этот пакет (может использовать асинхронное ожидание). Когда будет готов к приёму следующего - должен вызвать resume_callback. обработка строго по одному пакету.
void queue_set_callback(struct ll_queue* q, queue_callback_fn cbk_fn, void* arg);
// обрабатывается строго одина пакет за вызов
// Возобновить коллбэки после обработки элемента переданного в коллбэке (тянуть дополнительные элементы из очереди не предусмотернные api нельзя).
// эта функция должна вызываться всегда после того как cbk_fn обработала пакет (можно с ожиданием через async), иначе очередь застрянет.
// Если в очереди остались элементы, запланирует вызов коллбэка через uasync_set_timeout(0)
// Это предотвращает накопление рекурсии в стеке вызовов
void queue_resume_callback(struct ll_queue* q);
// ==================== Управление элементами ====================
// выделить ll_entry и память под кодограмму
struct ll_entry* ll_alloc_lldgram(uint16_t len);
// Создать новый элемент с областью данных указанного размера
// Память выделяется одним блоком: [struct ll_entry][область данных data_size байт]
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти
struct ll_entry* queue_entry_new(size_t data_size);
// Создать новый элемент из пула (размер был определен при создании пула)
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL при ошибке выделения памяти
struct ll_entry* queue_entry_new_from_pool(struct memory_pool* pool);
// Освободить только entry (не влияет на очереди, dgram не освобождает)
void queue_entry_free(struct ll_entry* entry);
// Освободить только entry->dgram (не влияет на очереди)
void queue_dgram_free(struct ll_entry* entry);
//void queue_data_free(void* data);
// ==================== Операции с очередью ====================
// Добавить элемент в конец очереди (FIFO)
// Если очередь была пустой и коллбэки разрешены - вызывает коллбэк
// Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден)
int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id);
// Добавить элемент в начало очереди (LIFO, высокий приоритет)
// Если очередь была пустой и коллбэки разрешены - вызывает коллбэк
// Возвращает: 0 при успехе, -1 если превышен лимит размера (элемент освобожден)
int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id);
// Извлечь элемент из начала очереди
// При извлечении приостанавливает коллбэки (callback_suspended = 1) чтобы предотвратить рекурсию
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если очередь пуста
// ПРИМЕЧАНИЕ: не освобождает память элемента
struct ll_entry* queue_data_get(struct ll_queue* q);
// Получить текущее количество элементов в очереди
int queue_entry_count(struct ll_queue* q);
// ==================== Поиск и удаление по ID ====================
// Найти элемент по ID
// Возвращает: указатель на структуру элемента (struct ll_entry*) или NULL если не найден
struct ll_entry* queue_find_data_by_id(struct ll_queue* q, uint32_t id);
// Удалить элемент из очереди по указателю на структуру элемента
// Возвращает: 0 при успехе, -1 если элемент не найден
// ПРИМЕЧАНИЕ: НЕ изменяет ref_count элемента, просто удаляет из очереди
int queue_remove_data(struct ll_queue* q, struct ll_entry* entry);
#endif // LL_QUEUE_H

118
src/etcp.c

@ -36,9 +36,10 @@ static void input_send_q_cb(struct ll_queue* q, void* arg);
static void wait_ack_cb(struct ll_queue* q, void* arg);
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp);
// Get current time in 0.1ms units
uint64_t get_current_time_units() {
struct timeval tv;
// Get current time in 0.1ms units
uint64_t get_current_time_units() {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct timeval tv;
gettimeofday(&tv, NULL);
uint64_t time_units = ((uint64_t)tv.tv_sec * 10000ULL) + (tv.tv_usec / 100);
// DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "get_current_time_units: tv_sec=%ld, tv_usec=%ld, result=%llu",
@ -46,24 +47,26 @@ uint64_t get_current_time_units() {
return time_units;
}
uint16_t get_current_timestamp() {
uint16_t timestamp = (uint16_t)(get_current_time_units() & 0xFFFF);
uint16_t get_current_timestamp() {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
uint16_t timestamp = (uint16_t)(get_current_time_units() & 0xFFFF);
// DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "get_current_timestamp: result=%u", timestamp);
return timestamp;
}
// Timestamp diff (with wrap-around)
static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) {
// DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "timestamp_diff: t1=%u, t2=%u", t1, t2);
if (t1 >= t2) {
// Timestamp diff (with wrap-around)
static uint16_t timestamp_diff(uint16_t t1, uint16_t t2) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (t1 >= t2) {
return t1 - t2;
}
return (UINT16_MAX - t2) + t1 + 1;
}
// Create new ETCP connection
struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) {
if (!instance) return NULL;
// Create new ETCP connection
struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!instance) return NULL;
struct ETCP_CONN* etcp = calloc(1, sizeof(struct ETCP_CONN));
@ -122,9 +125,10 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance) {
return etcp;
}
// Close connection with NULL pointer safety (prevents double free)
void etcp_connection_close(struct ETCP_CONN* etcp) {
if (!etcp) return;
// Close connection with NULL pointer safety (prevents double free)
void etcp_connection_close(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp) return;
// Drain and free input_queue (contains ETCP_FRAGMENT with pkt_data from data_pool)
if (etcp->input_queue) {
@ -232,6 +236,7 @@ void etcp_connection_close(struct ETCP_CONN* etcp) {
// Reset connection (stub)
void etcp_conn_reset(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
// Reset IDs, queues, etc. as per protocol.txt
etcp->next_tx_id = 1;
etcp->last_rx_id = 0;
@ -241,6 +246,7 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) {
// Update log_name when peer_node_id becomes known
void etcp_update_log_name(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp || !etcp->instance) return;
uint64_t local_id = etcp->instance->node_id % 10000;
uint64_t peer_id = etcp->peer_node_id % 10000;
@ -251,11 +257,11 @@ void etcp_update_log_name(struct ETCP_CONN* etcp) {
// ====================================================================== Отправка данных
// Send data through ETCP connection
// Allocates memory from data_pool and places in input queue
// Returns: 0 on success, -1 on failure
int etcp_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) {
// DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "etcp_send: ENTER etcp=%p, data=%p, len=%zu", etcp, data, len);
// Send data through ETCP connection
// Allocates memory from data_pool and places in input queue
// Returns: 0 on success, -1 on failure
int etcp_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp || !data || len == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] invalid parameters (etcp=%p, data=%p, len=%zu)", etcp->log_name, etcp, data, len);
@ -314,7 +320,7 @@ int etcp_send(struct ETCP_CONN* etcp, const void* data, uint16_t len) {
static void input_queue_try_resume(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "[%s] ENTER etcp=%p", etcp->log_name, etcp);
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
// если размер input_wait_ack+input_send_q в байтах < optimal_inflight то resume сейчас.
size_t wait_ack_bytes = queue_total_bytes(etcp->input_wait_ack);
@ -330,6 +336,7 @@ static void input_queue_try_resume(struct ETCP_CONN* etcp) {
}
void etcp_stats(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp) return;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] stats for conn=%p:", etcp->log_name, etcp);
@ -371,10 +378,10 @@ void etcp_stats(struct ETCP_CONN* etcp) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] rx_ack_till: %u", etcp->log_name, etcp->rx_ack_till);
}
// Input callback for input_queue (добавление новых кодограмм в стек)
// input_queue -> input_send_q
static void input_queue_cb(struct ll_queue* q, void* arg) {
// Input callback for input_queue (добавление новых кодограмм в стек)
// input_queue -> input_send_q
static void input_queue_cb(struct ll_queue* q, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
struct ETCP_FRAGMENT* in_pkt = (struct ETCP_FRAGMENT*)queue_data_get(q);
@ -423,8 +430,9 @@ static void input_queue_cb(struct ll_queue* q, void* arg) {
}
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg;
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg;
// size_t send_q_bytes = queue_total_bytes(etcp->input_send_q);
// size_t send_q_pkts = queue_entry_count(etcp->input_send_q);
@ -433,9 +441,10 @@ static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_
}
static void ack_timeout_check(void* arg) {
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
uint64_t now = get_current_time_units();
static void ack_timeout_check(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
uint64_t now = get_current_time_units();
uint64_t timeout = 1000;//(uint64_t)(etcp->rtt_avg_10 * RETRANS_K1) + (uint64_t)(etcp->jitter * RETRANS_K2);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "ack_timeout_check: starting check, now=%llu, timeout=%llu, rtt_avg_10=%u, jitter=%u",
@ -495,15 +504,16 @@ static void ack_timeout_check(void* arg) {
queue_resume_callback(etcp->input_wait_ack);
}
static void wait_ack_cb(struct ll_queue* q, void* arg) {
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
ack_timeout_check(etcp);// ack_cb срабатывает когда init (таймер не инициализирован) или когда empty (таймер не активен)
static void wait_ack_cb(struct ll_queue* q, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
ack_timeout_check(etcp);// ack_cb срабатывает когда init (таймер не инициализирован) или когда empty (таймер не активен)
}
// Подготовить и отправить кодограмму
// вызывается линком когда освобождается или очередью если появляются данные на передачу
struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
// Подготовить и отправить кодограмму
// вызывается линком когда освобождается или очередью если появляются данные на передачу
struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_LINK* link = etcp_loadbalancer_select_link(etcp);
if (!link) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no link available", etcp->log_name);
@ -603,24 +613,27 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
return dgram;
}
// Callback for when a link is ready to send data
static void etcp_link_ready_callback(struct ETCP_CONN* etcp) {
if (!etcp) return;
// Callback for when a link is ready to send data
static void etcp_link_ready_callback(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp) return;
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp);
etcp_conn_process_send_queue(etcp);
}
// Process packets in send queue and transmit them
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) {
struct ETCP_DGRAM* dgram;
// Process packets in send queue and transmit them
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_DGRAM* dgram;
while(dgram = etcp_request_pkt(etcp)) {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_process_send_queue: sending packet");
etcp_loadbalancer_send(dgram);
}
}
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо.
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо.
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет)
// если ack все еще заняты - обновляем таймаут
if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb);
@ -631,6 +644,7 @@ static void ack_response_timer_cb(void* arg) {// проверяем неотпр
void etcp_output_try_assembly(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
// пробуем собрать выходную очередь из фрагментов
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] etcp=%p, last_delivered_id=%u, recv_q_count=%d",
// etcp->log_name, etcp, etcp->last_delivered_id, queue_entry_count(etcp->recv_q));
@ -678,9 +692,10 @@ void etcp_output_try_assembly(struct ETCP_CONN* etcp) {
etcp->log_name, delivered_count, delivered_bytes, etcp->last_delivered_id, queue_entry_count(etcp->output_queue));
}
// Process ACK receipt - remove acknowledged packet from inflight queues
void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t dts) {
if (!etcp) return;
// Process ACK receipt - remove acknowledged packet from inflight queues
void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t dts) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp) return;
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] processing ACK for seq=%u, ts=%u, dts=%u", etcp->log_name, seq, ts, dts);
@ -747,9 +762,10 @@ void etcp_ack_recv(struct ETCP_CONN* etcp, uint32_t seq, uint16_t ts, uint16_t d
}
// Process incoming decrypted packet
void etcp_conn_input(struct ETCP_DGRAM* pkt) {
if (!pkt || !pkt->data_len) return;
// Process incoming decrypted packet
void etcp_conn_input(struct ETCP_DGRAM* pkt) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!pkt || !pkt->data_len) return;
struct ETCP_CONN* etcp = pkt->link->etcp;
uint8_t* data = pkt->data;

22
src/etcp_connections.c

@ -24,6 +24,7 @@ static void etcp_link_remove_from_connections(struct ETCP_SOCKET* conn, struct E
// Unified packet dump function - uses configured format
static void packet_dump(const char* prefix, const uint8_t* data, size_t len, struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!data || len == 0) return;
// Full multi-line format (legacy)
@ -39,6 +40,7 @@ static void etcp_link_init_timer_cbk(void* arg);
#define INIT_TIMEOUT_MAX 50000
static void etcp_link_send_init(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
DEBUG_INFO(DEBUG_CATEGORY_CONNECTION, "etcp_link_send_init link=%p, is_server=%d", link, link ? link->is_server : -1);
if (!link || !link->etcp || !link->etcp->instance) return;
@ -104,6 +106,7 @@ static void etcp_link_send_init(struct ETCP_LINK* link) {
}
static void etcp_link_init_timer_cbk(void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
struct ETCP_LINK* link = (struct ETCP_LINK*)arg;
if (!link || link->initialized || link->is_server != 0) return;
@ -112,6 +115,7 @@ static void etcp_link_init_timer_cbk(void* arg) {
}
static int etcp_link_send_reset(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!link) return -1;
struct ETCP_DGRAM* dgram = malloc(sizeof(struct ETCP_DGRAM) + 1);
@ -132,12 +136,14 @@ static int etcp_link_send_reset(struct ETCP_LINK* link) {
}
static uint32_t sockaddr_hash(struct sockaddr_storage* addr) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
socklen_t addr_len = (addr->ss_family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
return crc32_calc((void*)addr, addr_len);
}
// Бинарный поиск линка по ip_port_hash
static int find_link_index(struct ETCP_SOCKET* e_sock, uint32_t hash) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!e_sock || e_sock->num_channels == 0) return -1;
int left = 0;
@ -159,6 +165,7 @@ static int find_link_index(struct ETCP_SOCKET* e_sock, uint32_t hash) {
// Реалокация массива линков с увеличением в 2 раза
static int realloc_links(struct ETCP_SOCKET* e_sock) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
size_t new_max = e_sock->max_channels == 0 ? 8 : e_sock->max_channels * 2;
struct ETCP_LINK** new_links = realloc(e_sock->links, new_max * sizeof(struct ETCP_LINK*));
if (!new_links) {
@ -173,6 +180,7 @@ static int realloc_links(struct ETCP_SOCKET* e_sock) {
// Вставка линка в отсортированный массив
static int insert_link(struct ETCP_SOCKET* e_sock, struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!e_sock || !link) return -1;
if (e_sock->num_channels >= e_sock->max_channels) {
@ -196,6 +204,7 @@ static int insert_link(struct ETCP_SOCKET* e_sock, struct ETCP_LINK* link) {
// Удаление линка из массива
static void remove_link(struct ETCP_SOCKET* e_sock, uint32_t hash) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!e_sock || e_sock->num_channels == 0) return;
int idx = find_link_index(e_sock, hash);
@ -211,6 +220,7 @@ static void remove_link(struct ETCP_SOCKET* e_sock, uint32_t hash) {
// надо править, используй sockaddr_hash
struct ETCP_LINK* etcp_link_find_by_addr(struct ETCP_SOCKET* e_sock, struct sockaddr_storage* addr) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!e_sock || !addr) return NULL;
int idx = find_link_index(e_sock, sockaddr_hash(addr));
@ -220,6 +230,7 @@ struct ETCP_LINK* etcp_link_find_by_addr(struct ETCP_SOCKET* e_sock, struct sock
}
int etcp_find_free_local_link_id(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!etcp) return -1;
// Битовый массив для 256 id (32 байта * 8 бит = 256)
@ -254,6 +265,7 @@ int etcp_find_free_local_link_id(struct ETCP_CONN* etcp) {
// ===============================
struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct sockaddr_storage* ip, uint32_t netif_index, int so_mark, uint8_t type) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!instance) return NULL;
struct ETCP_SOCKET* e_sock = calloc(1, sizeof(struct ETCP_SOCKET));
@ -368,6 +380,7 @@ struct ETCP_SOCKET* etcp_socket_add(struct UTUN_INSTANCE* instance, struct socka
}
void etcp_socket_remove(struct ETCP_SOCKET* conn) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!conn) return;
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "[ETCP] Removing socket %p, fd=%d, socket_id=%p", conn, conn->fd, conn->socket_id);
@ -395,6 +408,7 @@ void etcp_socket_remove(struct ETCP_SOCKET* conn) {
struct ETCP_LINK* etcp_link_new(struct ETCP_CONN* etcp, struct ETCP_SOCKET* conn, struct sockaddr_storage* remote_addr, uint8_t is_server) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!remote_addr) return NULL;
struct ETCP_LINK* link = calloc(1, sizeof(struct ETCP_LINK));
@ -442,6 +456,7 @@ struct ETCP_LINK* etcp_link_new(struct ETCP_CONN* etcp, struct ETCP_SOCKET* conn
}
void etcp_link_close(struct ETCP_LINK* link) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!link || !link->etcp) return;
// Cancel init timer if active
@ -466,6 +481,7 @@ void etcp_link_close(struct ETCP_LINK* link) {
}
int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_encrypt_send called, link=%p", dgram ? dgram->link : NULL);
// printf("[ETCP DEBUG] etcp_encrypt_send: ENTERING FUNCTION\n");
@ -479,7 +495,9 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
// DUMP: Show packet before encryption
// log_dump("ECTP_ENCRYPT_SEND", dgram->data, dgram->data_len);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Encrypt start");
sc_encrypt(sc, (uint8_t*)&dgram->timestamp/*не править это, тут верно!*/, sizeof(uint16_t) + len, enc_buf, &enc_buf_len);
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Encrypt end");
if (enc_buf_len == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "etcp_encrypt_send: encryption failed for node %llu", (unsigned long long)dgram->link->etcp->instance->node_id);
dgram->link->send_errors++;
@ -518,6 +536,7 @@ es_err:
}
static void etcp_connections_read_callback(int fd, void* arg) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback fd=%d, socket=%p", fd, arg);
// !!!!!! DANGER: в этой функции ПРЕДЕЛЬНАЯ АККУРАТНОСТЬ. Если кажется что не туда указатель то невнимательно аланизировал !!!!!
// НЕ РУИНИТЬ (uint8_t*)&pkt->timestamp - это правильно !!!!
@ -652,6 +671,7 @@ static void etcp_connections_read_callback(int fd, void* arg) {
return;
}
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Decrypt start");
if (sc_decrypt(&link->etcp->crypto_ctx, data, recv_len, (uint8_t*)&pkt->timestamp, &pkt_len)) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "etcp_connections_read_callback: failed to decrypt packet from node %llu len=%d", (unsigned long long)link->etcp->instance->node_id, recv_len);
// log_dump("my_privkey",&link->etcp->crypto_ctx.pk->private_key, SC_PRIVKEY_SIZE);
@ -660,6 +680,7 @@ static void etcp_connections_read_callback(int fd, void* arg) {
errorcode=6;
goto ec_fr;
}
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Decrypt end");
if (pkt_len<3) { errorcode=46; DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "etcp_connections_read_callback: decrypted packet too small, size=%zu", pkt_len); goto ec_fr; }
pkt->data_len=pkt_len-2;
pkt->noencrypt_len=0;
@ -718,6 +739,7 @@ ec_fr:
}
int init_connections(struct UTUN_INSTANCE* instance) {
DEBUG_TRACE(DEBUG_CATEGORY_CONNECTION, "");
if (!instance || !instance->config) return -1;
struct utun_config* config = instance->config;

8
src/utun.c

@ -326,7 +326,13 @@ int main(int argc, char *argv[]) {
utun_instance_unregister_sockets(instance);
utun_instance_destroy(instance);
// Destroy uasync instance after instance is destroyed
if (ua) {
uasync_destroy(ua, 0);
}
remove_pidfile(args.pid_file);
return 0;
}

9
src/utun_instance.c

@ -227,12 +227,9 @@ void utun_instance_destroy(struct UTUN_INSTANCE *instance) {
instance->data_pool = NULL;
}
// FINALLY destroy uasync (after all resources are cleaned up)
if (instance->ua) {
DEBUG_INFO(DEBUG_CATEGORY_MEMORY, "[INSTANCE_DESTROY] Destroying uasync instance");
uasync_destroy(instance->ua, 0);
instance->ua = NULL;
}
// Note: uasync is NOT destroyed here - caller must destroy it separately
// This allows sharing uasync between multiple instances
instance->ua = NULL;
// Clear global instance
if (g_instance == instance) {

BIN
tests/test_etcp_100_packets

Binary file not shown.

48
tests/test_etcp_100_packets.c

@ -22,6 +22,7 @@
static struct UTUN_INSTANCE* server_instance = NULL;
static struct UTUN_INSTANCE* client_instance = NULL;
static struct UASYNC* ua = NULL;
static int test_completed = 0;
static void* packet_timeout_id = NULL;
@ -265,7 +266,7 @@ static void monitor_and_send(void* arg) {
printf("Total time: %.2f ms\n", duration_total);
if (packet_timeout_id) {
uasync_cancel_timeout(server_instance->ua, packet_timeout_id);
uasync_cancel_timeout(ua, packet_timeout_id);
packet_timeout_id = NULL;
}
return;
@ -273,7 +274,7 @@ static void monitor_and_send(void* arg) {
}
if (!test_completed) {
packet_timeout_id = uasync_set_timeout(server_instance->ua, 10, NULL, monitor_and_send);
packet_timeout_id = uasync_set_timeout(ua, 10, NULL, monitor_and_send);
}
}
@ -288,7 +289,7 @@ static void test_timeout(void* arg) {
packets_sent_back, TOTAL_PACKETS, packets_received_back, TOTAL_PACKETS);
test_completed = 2;
if (packet_timeout_id) {
uasync_cancel_timeout(server_instance->ua, packet_timeout_id);
uasync_cancel_timeout(ua, packet_timeout_id);
packet_timeout_id = NULL;
}
}
@ -307,39 +308,34 @@ int main() {
utun_instance_set_tun_init_enabled(0);
printf("Creating server...\n");
struct UASYNC* server_ua = uasync_create();
server_instance = utun_instance_create(server_ua, "test_server.conf");
ua = uasync_create();
server_instance = utun_instance_create(ua, "test_server.conf");
if (!server_instance || init_connections(server_instance) < 0) {
printf("Failed to create server\n");
return 1;
}
printf("✅ Server ready\n\n");
printf("Creating client...\n");
struct UASYNC* client_ua = uasync_create();
client_instance = utun_instance_create(client_ua, "test_client.conf");
client_instance = utun_instance_create(ua, "test_client.conf");
if (!client_instance || init_connections(client_instance) < 0) {
printf("Failed to create client\n");
return 1;
}
printf("✅ Client ready\n\n");
printf("Sending %d packets in each direction (max queue size: %d)...\n", TOTAL_PACKETS, MAX_QUEUE_SIZE);
packet_timeout_id = uasync_set_timeout(server_ua, 500, NULL, monitor_and_send);
void* global_timeout_id = uasync_set_timeout(server_ua, TEST_TIMEOUT_MS, NULL, test_timeout);
int elapsed = 0;
while (!test_completed && elapsed < TEST_TIMEOUT_MS + 1000) {
if (server_ua) uasync_poll(server_ua, 5);
if (client_ua) uasync_poll(client_ua, 5);
usleep(5000);
elapsed += 5;
packet_timeout_id = uasync_set_timeout(ua, 500, NULL, monitor_and_send);
void* global_timeout_id = uasync_set_timeout(ua, TEST_TIMEOUT_MS, NULL, test_timeout);
while (!test_completed) {
uasync_poll(ua, 100);
}
printf("\nCleaning up...\n");
if (packet_timeout_id) uasync_cancel_timeout(server_ua, packet_timeout_id);
if (global_timeout_id) uasync_cancel_timeout(server_ua, global_timeout_id);
if (packet_timeout_id) uasync_cancel_timeout(ua, packet_timeout_id);
if (global_timeout_id) uasync_cancel_timeout(ua, global_timeout_id);
if (server_instance) {
server_instance->running = 0;
@ -349,7 +345,13 @@ int main() {
client_instance->running = 0;
utun_instance_destroy(client_instance);
}
// Destroy shared uasync instance after both instances are destroyed
if (ua) {
uasync_destroy(ua, 0);
ua = NULL;
}
if (test_completed == 1) {
printf("\n=== TEST PASSED ===\n");
printf("✅ All %d packets transmitted in each direction\n", TOTAL_PACKETS);

BIN
tests/test_etcp_minimal

Binary file not shown.

BIN
tests/test_etcp_simple_traffic

Binary file not shown.

BIN
tests/test_etcp_two_instances

Binary file not shown.

6
tests/test_etcp_two_instances.c

@ -264,9 +264,9 @@ int main() {
utun_instance_destroy(client_instance);
}
// Cleanup uasync objects - НЕ НУЖНО, так как utun_instance_destroy уже вызывает uasync_destroy
// if (server_ua) uasync_destroy(server_ua);
// if (client_ua) uasync_destroy(client_ua);
// Cleanup uasync objects - now required since utun_instance_destroy no longer calls uasync_destroy
if (server_ua) uasync_destroy(server_ua, 0);
if (client_ua) uasync_destroy(client_ua, 0);
if (test_completed == 1) {
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "\n=== TEST PASSED ===\n");

BIN
tests/test_intensive_memory_pool

Binary file not shown.

BIN
tests/test_ll_queue

Binary file not shown.

BIN
tests/test_memory_pool_and_config

Binary file not shown.

BIN
tests/test_pkt_normalizer_etcp

Binary file not shown.

47
tests/test_pkt_normalizer_etcp.c

@ -16,7 +16,7 @@
#include "../lib/ll_queue.h"
#include "../lib/debug_config.h"
#define TEST_TIMEOUT_MS 5000 // 5 second timeout
#define TEST_TIMEOUT_MS 3000 // 3 second timeout
#define TOTAL_PACKETS 100 // Total packets to send
//#define MAX_QUEUE_SIZE 5 // Max packets in input queue
#define MIN_PACKET_SIZE 10 // Minimum packet size
@ -29,6 +29,7 @@ static struct UTUN_INSTANCE* server_instance = NULL;
static struct UTUN_INSTANCE* client_instance = NULL;
static struct PKTNORM* server_pn = NULL;
static struct PKTNORM* client_pn = NULL;
static struct UASYNC* ua = NULL;
static int test_completed = 0;
static void* packet_timeout_id = NULL;
@ -385,7 +386,7 @@ static void monitor_and_send(void* arg) {
printf("Total time: %.2f ms\n", duration_total);
if (packet_timeout_id) {
uasync_cancel_timeout(server_instance->ua, packet_timeout_id);
uasync_cancel_timeout(ua, packet_timeout_id);
packet_timeout_id = NULL;
}
return;
@ -393,7 +394,7 @@ static void monitor_and_send(void* arg) {
}
if (!test_completed) {
packet_timeout_id = uasync_set_timeout(server_instance->ua, 10, NULL, monitor_and_send);
packet_timeout_id = uasync_set_timeout(ua, 10, NULL, monitor_and_send);
}
}
@ -430,45 +431,41 @@ int main() {
total_bytes, (float)total_bytes / TOTAL_PACKETS / 1024);
debug_config_init();
debug_set_level(DEBUG_LEVEL_DEBUG);
// debug_set_level(DEBUG_LEVEL_DEBUG);
debug_set_level(DEBUG_LEVEL_TRACE);
debug_set_categories(DEBUG_CATEGORY_ALL);
utun_instance_set_tun_init_enabled(0);
printf("Creating server...\n");
struct UASYNC* server_ua = uasync_create();
server_instance = utun_instance_create(server_ua, "test_server.conf");
ua = uasync_create();
server_instance = utun_instance_create(ua, "test_server.conf");
if (!server_instance || init_connections(server_instance) < 0) {
printf("Failed to create server\n");
return 1;
}
printf("Server created, waiting for connection...\n\n");
printf("Creating client...\n");
struct UASYNC* client_ua = uasync_create();
client_instance = utun_instance_create(client_ua, "test_client.conf");
client_instance = utun_instance_create(ua, "test_client.conf");
if (!client_instance || init_connections(client_instance) < 0) {
printf("Failed to create client\n");
return 1;
}
printf("Client created\n\n");
printf("Sending %d packets in each direction via normalizer...\n", TOTAL_PACKETS);
packet_timeout_id = uasync_set_timeout(server_ua, 500, NULL, monitor_and_send);
void* global_timeout_id = uasync_set_timeout(server_ua, TEST_TIMEOUT_MS, NULL, test_timeout);
int elapsed = 0;
while (!test_completed && elapsed < TEST_TIMEOUT_MS + 1000) {
if (server_ua) uasync_poll(server_ua, 5);
if (client_ua) uasync_poll(client_ua, 5);
usleep(5000);
elapsed += 5;
packet_timeout_id = uasync_set_timeout(ua, 500, NULL, monitor_and_send);
void* global_timeout_id = uasync_set_timeout(ua, TEST_TIMEOUT_MS*10, NULL, test_timeout);
while (!test_completed) {
uasync_poll(ua, 100); // 100ms timeout to allow timer processing
}
printf("\nCleaning up...\n");
if (packet_timeout_id) uasync_cancel_timeout(server_ua, packet_timeout_id);
if (global_timeout_id) uasync_cancel_timeout(server_ua, global_timeout_id);
if (packet_timeout_id) uasync_cancel_timeout(ua, packet_timeout_id);
if (global_timeout_id) uasync_cancel_timeout(ua, global_timeout_id);
if (server_pn) {
pn_pair_deinit(server_pn);
@ -485,7 +482,13 @@ int main() {
client_instance->running = 0;
utun_instance_destroy(client_instance);
}
// Destroy shared uasync instance after both instances are destroyed
if (ua) {
uasync_destroy(ua, 0);
ua = NULL;
}
if (test_completed == 1) {
printf("\n=== TEST PASSED ===\n");
printf("All %d packets transmitted in each direction via normalizer\n", TOTAL_PACKETS);

BIN
tests/test_pkt_normalizer_standalone

Binary file not shown.

20
tests/test_pkt_normalizer_standalone.c

@ -299,20 +299,22 @@ int main() {
// Poll uasync to process the flush timer
for (int i = 0; i < 20; i++) {
uasync_poll(mock_instance.ua, 5);
usleep(5000);
}
// Main loop
int elapsed = 0;
while (!test_completed && elapsed < TEST_TIMEOUT_MS + 1000) {
uasync_poll(mock_instance.ua, 5);
usleep(5000);
elapsed += 5;
// Main loop - event driven without usleep
while (!test_completed) {
uasync_poll(mock_instance.ua, 100);
}
printf("\nCleaning up...\n");
cleanup();
// Destroy uasync instance after cleanup
if (mock_instance.ua) {
uasync_destroy(mock_instance.ua, 0);
mock_instance.ua = NULL;
}
if (test_completed == 1) {
printf("\n=== TEST PASSED ===\n");
return 0;

Loading…
Cancel
Save