Browse Source

Add epoll support, fix ETCP timestamp bug, optimize uasync with cached pollfds

nodeinfo-routing-update
Evgeny 2 months ago
parent
commit
8a3f01fbf8
  1. 2
      AGENTS.md
  2. 10
      lib/ll_queue.c
  3. 1
      lib/ll_queue.h
  4. 6
      src/etcp.c
  5. 24
      src/etcp_connections.c
  6. 45
      src/pkt_normalizer.c
  7. BIN
      tests/bench_timeout_heap
  8. BIN
      tests/bench_uasync_timeouts
  9. BIN
      tests/test_etcp_100_packets
  10. BIN
      tests/test_etcp_minimal
  11. BIN
      tests/test_etcp_simple_traffic
  12. BIN
      tests/test_etcp_two_instances
  13. BIN
      tests/test_intensive_memory_pool
  14. BIN
      tests/test_ll_queue
  15. BIN
      tests/test_memory_pool_and_config
  16. BIN
      tests/test_pkt_normalizer_etcp
  17. 2
      tests/test_pkt_normalizer_etcp.c
  18. BIN
      tests/test_pkt_normalizer_standalone
  19. BIN
      tests/test_u_async_comprehensive
  20. BIN
      tests/test_u_async_performance

2
AGENTS.md

@ -201,7 +201,7 @@ Crypto: Fixed CCM nonce size to 13 bytes, all crypto tests passing
5. Commit with descriptive message in appropriate language
---
"cp" or "кп" in prompt = do commit and push
"cp" or "кп" in prompt = do commit and push (всех изменений на текущий момент не откатывая ничего!)
Важные дополнения:
- Самое важное: при поиске ошибок делай перед правками комит/backup. Всё лишнее что менял при отладке - строго вернуть назад в состояние до моего вмешательства. В итоге в код должно попасть только проверенное исправление и ничего лилшнего!

10
lib/ll_queue.c

@ -242,7 +242,8 @@ int queue_data_put(struct ll_queue* q, struct ll_entry* entry, uint32_t id) {
q->tail = entry;
q->count++;
q->total_bytes += entry->size;
entry->int_len=entry->len;
q->total_bytes += entry->int_len;
size_t send_q_bytes = queue_total_bytes(q);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "check total bytes: new_q_len=%d element_size:%d", send_q_bytes, entry->size);
@ -284,7 +285,8 @@ int queue_data_put_first(struct ll_queue* q, struct ll_entry* entry, uint32_t id
q->head = entry;
q->count++;
q->total_bytes += entry->size;
entry->int_len=entry->len;
q->total_bytes += entry->int_len;
add_to_hash(q, entry);
@ -311,7 +313,7 @@ struct ll_entry* queue_data_get(struct ll_queue* q) {
if (!q->head) q->tail = NULL;
q->count--;
q->total_bytes -= entry->size;
q->total_bytes -= entry->int_len;
entry->next = NULL;
entry->prev = NULL;
@ -398,7 +400,7 @@ int queue_remove_data(struct ll_queue* q, struct ll_entry* entry) {
}
q->count--;
q->total_bytes -= entry->size;
q->total_bytes -= entry->int_len;
entry->next = NULL;
entry->prev = NULL;

1
lib/ll_queue.h

@ -26,6 +26,7 @@ struct ll_entry {
uint16_t len; // размер пакета (dgram)
uint16_t memlen; // размер выделенной памяти (dgram)
uint16_t int_len; // размер (private, not use!)
uint8_t* dgram; // данные пакета
void (*dgram_free_fn)(uint8_t* data, void* arg); // функция освобождения блока
struct memory_pool* dgram_pool; // Пул, из которого выделен этот элемент (NULL, если выделен через malloc)

6
src/etcp.c

@ -445,14 +445,18 @@ static void ack_timeout_check(void* arg) {
while (current = etcp->input_wait_ack->head) {
struct INFLIGHT_PACKET* pkt = (struct INFLIGHT_PACKET*)current;
// Check for invalid timestamp - silent fix, no error output
// Check for invalid timestamp
if (pkt->last_timestamp == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=0, not sent yet!",
etcp->log_name, pkt->seq);
// Schedule timer to check again later
etcp->retrans_timer = uasync_set_timeout(etcp->instance->ua, timeout, etcp, ack_timeout_check);
return;
}
if (pkt->last_timestamp > now) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] ack_timeout_check: packet seq=%u has last_timestamp=%llu > now=%llu, clock went backwards!",
etcp->log_name, pkt->seq, (unsigned long long)pkt->last_timestamp, (unsigned long long)now);
// Fix the timestamp to prevent overflow
pkt->last_timestamp = now;
}

24
src/etcp_connections.c

@ -43,7 +43,10 @@ static void etcp_link_send_init(struct ETCP_LINK* link) {
if (!link || !link->etcp || !link->etcp->instance) return;
struct ETCP_DGRAM* dgram = malloc(sizeof(struct ETCP_DGRAM) + 100);
if (!dgram) return;
if (!dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_link_send_init: malloc failed");
return;
}
dgram->link = link;
dgram->noencrypt_len = SC_PUBKEY_SIZE;
@ -112,7 +115,10 @@ static int etcp_link_send_reset(struct ETCP_LINK* link) {
if (!link) return -1;
struct ETCP_DGRAM* dgram = malloc(sizeof(struct ETCP_DGRAM) + 1);
if (!dgram) return -1;
if (!dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_link_send_reset: malloc failed");
return -1;
}
dgram->link = link;
dgram->data_len = 1;
@ -155,7 +161,10 @@ static int find_link_index(struct ETCP_SOCKET* e_sock, uint32_t hash) {
static int realloc_links(struct ETCP_SOCKET* e_sock) {
size_t new_max = e_sock->max_channels == 0 ? 8 : e_sock->max_channels * 2;
struct ETCP_LINK** new_links = realloc(e_sock->links, new_max * sizeof(struct ETCP_LINK*));
if (!new_links) return -1;
if (!new_links) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "realloc_links: realloc failed");
return -1;
}
e_sock->links = new_links;
e_sock->max_channels = new_max;
@ -389,7 +398,10 @@ struct ETCP_LINK* etcp_link_new(struct ETCP_CONN* etcp, struct ETCP_SOCKET* conn
if (!remote_addr) return NULL;
struct ETCP_LINK* link = calloc(1, sizeof(struct ETCP_LINK));
if (!link) return NULL;
if (!link) {
DEBUG_ERROR(DEBUG_CATEGORY_CONNECTION, "etcp_link_new: calloc failed");
return NULL;
}
link->conn = conn;
link->etcp = etcp;
@ -466,7 +478,7 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
dgram->timestamp=get_current_timestamp();
// DUMP: Show packet before encryption
log_dump("ECTP_ENCRYPT_SEND", dgram->data, dgram->data_len);
// log_dump("ECTP_ENCRYPT_SEND", dgram->data, dgram->data_len);
sc_encrypt(sc, (uint8_t*)&dgram->timestamp/*не править это, тут верно!*/, sizeof(uint16_t) + len, enc_buf, &enc_buf_len);
if (enc_buf_len == 0) {
DEBUG_ERROR(DEBUG_CATEGORY_CRYPTO, "etcp_encrypt_send: encryption failed for node %llu", (unsigned long long)dgram->link->etcp->instance->node_id);
@ -692,7 +704,7 @@ static void etcp_connections_read_callback(int fd, void* arg) {
return; // INIT_RESPONSE is handled, no further processing needed
}
packet_dump("RECV decrypted:", pkt->data, pkt->data_len, link);
// packet_dump("RECV decrypted:", pkt->data, pkt->data_len, link);
etcp_conn_input(pkt);
return;

45
src/pkt_normalizer.c

@ -20,7 +20,10 @@ struct PKTNORM* pn_init(struct ETCP_CONN* etcp) {
if (!etcp) return NULL;
struct PKTNORM* pn = calloc(1, sizeof(struct PKTNORM));
if (!pn) return NULL;
if (!pn) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_init: calloc failed");
return NULL;
}
pn->etcp = etcp;
pn->ua = etcp->instance->ua;
@ -28,9 +31,15 @@ struct PKTNORM* pn_init(struct ETCP_CONN* etcp) {
pn->tx_wait_time = 10;
pn->input = queue_new(pn->ua, 0); // No hash needed
pn->output = queue_new(pn->ua, 0); // No hash needed
if (!pn->input) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_init: queue_new(input) failed");
pn_pair_deinit(pn);
return NULL;
}
if (!pn->input || !pn->output) {
pn->output = queue_new(pn->ua, 0); // No hash needed
if (!pn->output) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_init: queue_new(output) failed");
pn_pair_deinit(pn);
return NULL;
}
@ -100,10 +109,11 @@ void pn_unpacker_reset_state(struct PKTNORM* pn) {
void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
if (!pn || !data || len == 0) return;
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer_send: pn=%p, len=%d", pn, len);
struct ll_entry* entry = ll_alloc_lldgram(len);
if (!entry) return;
if (!entry) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_packer_send: ll_alloc_lldgram failed");
return;
}
memcpy(entry->dgram, data, len);
entry->len = len;
entry->dgram_pool = NULL;
@ -115,14 +125,14 @@ void pn_packer_send(struct PKTNORM* pn, uint8_t* data, uint16_t len) {
}
int ret = queue_data_put(pn->input, entry, 0);
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer_send: queue_data_put returned %d, input count=%d", ret, queue_entry_count(pn->input));
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer_send: queue_data_put returned %d, input count=%d", ret, queue_entry_count(pn->input));
}
// Internal: Packer callback
static void packer_cb(struct ll_queue* q, void* arg) {
struct PKTNORM* pn = (struct PKTNORM*)arg;
if (!pn) return;
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: packer_cb");
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: packer_cb");
queue_wait_threshold(pn->etcp->input_queue, 0, 0, etcp_input_ready_cb, pn);
}
@ -131,7 +141,7 @@ static void packer_cb(struct ll_queue* q, void* arg) {
static void pn_send_to_etcp(struct PKTNORM* pn) {
if (!pn || !pn->data || pn->data_ptr == 0) return;
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: pn_send_to_etcp");
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: pn_send_to_etcp");
// Allocate ETCP_FRAGMENT from io_pool
struct ETCP_FRAGMENT* frag = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(pn->etcp->io_pool);
if (!frag) {// drop data
@ -160,11 +170,15 @@ static void pn_buf_renew(struct PKTNORM* pn) {
}
if (!pn->data) {
pn->data = memory_pool_alloc(pn->etcp->instance->data_pool);
if (!pn->data) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_buf_renew: memory_pool_alloc failed");
return;
}
int size=pn->etcp->instance->data_pool->object_size;
if (size>pn->frag_size) size=pn->frag_size;
pn->data_size = size;
pn->data_ptr=0;
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: new bufer size=%d bytes",size);
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: new bufer size=%d bytes",size);
}
}
@ -179,7 +193,7 @@ static void etcp_input_ready_cb(struct ll_queue* q, void* arg) {
if (!in_dgram) { queue_resume_callback(pn->input); return; }
pn_buf_renew(pn);
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: new pkt hdrpos=%d",pn->data_ptr);
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: new pkt hdrpos=%d",pn->data_ptr);
if (!pn->data) goto exit; // Allocation failed
pn->data[pn->data_ptr++] = in_dgram->len & 0xFF;
pn->data[pn->data_ptr++] = (in_dgram->len >> 8) & 0xFF;
@ -189,7 +203,7 @@ static void etcp_input_ready_cb(struct ll_queue* q, void* arg) {
int remain = pn->data_size - pn->data_ptr;
int avail = in_dgram->len - in_ptr;
if (avail < remain) remain = avail;
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: copy %d bytes (in_ptr=%d, out_ptr=%d)",remain, in_ptr, pn->data_ptr);
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_packer: copy %d bytes (in_ptr=%d, out_ptr=%d)",remain, in_ptr, pn->data_ptr);
memcpy(pn->data + pn->data_ptr, in_dgram->dgram + in_ptr, remain);
pn->data_ptr += remain;
in_ptr += remain;
@ -237,7 +251,7 @@ static void pn_unpacker_cb(struct ll_queue* q, void* arg) {
uint8_t* payload = frag->ll.dgram;
uint16_t len = frag->ll.len;
uint16_t ptr = 0;
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker: unpacking fragment len=%d", len);
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker: unpacking fragment len=%d", len);
while (ptr < len) {
if (!pn->recvpart) {
@ -248,11 +262,12 @@ static void pn_unpacker_cb(struct ll_queue* q, void* arg) {
break;
}
uint16_t part_size = payload[ptr] | (payload[ptr + 1] << 8);
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker: new fragment pkt_len=%d (at %d)", part_size, ptr);
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker: new fragment pkt_len=%d (at %d)", part_size, ptr);
ptr += 2;
pn->recvpart = ll_alloc_lldgram(part_size);
if (!pn->recvpart) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker_cb: ll_alloc_lldgram failed");
break;
}
pn->recvpart->len = 0;
@ -261,7 +276,7 @@ static void pn_unpacker_cb(struct ll_queue* q, void* arg) {
uint16_t rem = pn->recvpart->memlen - pn->recvpart->len;// осталось собрать байт
uint16_t avail = len - ptr;// доступно байт сейчас
uint16_t cp = (rem < avail) ? rem : avail;
DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker: copy: remain=%d avail=%d in_ptr=%d out_ptr=%d", rem, avail, ptr, pn->recvpart->len);
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker: copy: remain=%d avail=%d in_ptr=%d out_ptr=%d", rem, avail, ptr, pn->recvpart->len);
memcpy(pn->recvpart->dgram + pn->recvpart->len, payload + ptr, cp);
pn->recvpart->len += cp;
ptr += cp;

BIN
tests/bench_timeout_heap

Binary file not shown.

BIN
tests/bench_uasync_timeouts

Binary file not shown.

BIN
tests/test_etcp_100_packets

Binary file not shown.

BIN
tests/test_etcp_minimal

Binary file not shown.

BIN
tests/test_etcp_simple_traffic

Binary file not shown.

BIN
tests/test_etcp_two_instances

Binary file not shown.

BIN
tests/test_intensive_memory_pool

Binary file not shown.

BIN
tests/test_ll_queue

Binary file not shown.

BIN
tests/test_memory_pool_and_config

Binary file not shown.

BIN
tests/test_pkt_normalizer_etcp

Binary file not shown.

2
tests/test_pkt_normalizer_etcp.c

@ -431,7 +431,7 @@ int main() {
debug_config_init();
debug_set_level(DEBUG_LEVEL_DEBUG);
debug_set_categories(DEBUG_CATEGORY_ETCP | DEBUG_CATEGORY_NORMALIZER);
debug_set_categories(DEBUG_CATEGORY_ALL);
utun_instance_set_tun_init_enabled(0);

BIN
tests/test_pkt_normalizer_standalone

Binary file not shown.

BIN
tests/test_u_async_comprehensive

Binary file not shown.

BIN
tests/test_u_async_performance

Binary file not shown.
Loading…
Cancel
Save