Browse Source

1

nodeinfo-routing-update
Evgeny 4 weeks ago
parent
commit
4352041416
  1. 41
      src/etcp.c
  2. 5
      src/etcp.h
  3. 9
      src/pkt_normalizer.c
  4. BIN
      tests/bench_timeout_heap
  5. BIN
      tests/bench_uasync_timeouts
  6. 44
      tools/bping/bping.c

41
src/etcp.c

@ -255,6 +255,9 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) {
etcp->last_delivered_id = 0;
etcp->rx_ack_till = 0;
// Устанавливаем флаг ожидания первого пакета
etcp->got_initial_pkt = 0;
// Reset metrics
etcp->unacked_bytes = 0;
etcp->rtt_last = 0;
@ -288,6 +291,23 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "end");
}
void etcp_conn_reinit(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
DEBUG_INFO(DEBUG_CATEGORY_ETCP, "Reinitializing ETCP connection [%s]", etcp->log_name);
// Сбрасываем initialized во всех линках
struct ETCP_LINK* link = etcp->links;
while (link) {
link->initialized = 0;
link = link->next;
}
// Вызываем etcp_conn_reset для сброса состояния
etcp_conn_reset(etcp);
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "end");
}
// Update log_name when peer_node_id becomes known
void etcp_update_log_name(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
@ -903,8 +923,24 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) {
if (len>=5) {
// формируем ACK
struct ACK_PACKET* p = (struct ACK_PACKET*)queue_entry_new_from_pool(etcp->instance->ack_pool);
uint32_t seq=data[1] | (data[2]<<8) | (data[3]<<16) | (data[4]<<24);
if (etcp->got_initial_pkt == 0) {
if (seq==1) etcp->got_initial_pkt = 1;
else {
DEBUG_WARN(DEBUG_CATEGORY_ETCP, "[%s] Waiting for initial packet but recv seq=%d, ignoring", etcp->log_name, seq);
len=0;
break;
}
}
else {
uint32_t d=seq - etcp->last_delivered_id;
if (d>MAX_INFLIGHT_SIZE) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] Received packet out of inflight bounds: seq=%d last delivered=%d", etcp->log_name, seq, etcp->last_delivered_id);
len=0;
break;
}
}
struct ACK_PACKET* p = (struct ACK_PACKET*)queue_entry_new_from_pool(etcp->instance->ack_pool);
p->seq=seq;
p->pkt_timestamp=pkt->timestamp;
p->recv_timestamp=get_current_timestamp();
@ -920,12 +956,14 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) {
uint8_t* payload_data = memory_pool_alloc(etcp->instance->data_pool);
if (!payload_data) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate payload_data from data_pool", etcp->log_name);
len=0;
break;
}
struct ETCP_FRAGMENT* rx_pkt = (struct ETCP_FRAGMENT*)queue_entry_new_from_pool(etcp->io_pool);
if (!rx_pkt) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate rx_pkt from io_pool", etcp->log_name);
memory_pool_free(etcp->instance->data_pool, payload_data);
len=0;
break;
}
rx_pkt->seq=seq;
@ -955,4 +993,3 @@ void etcp_conn_input(struct ETCP_DGRAM* pkt) {
memory_pool_free(etcp->instance->pkt_pool, pkt); // Free the incoming dgram
}

5
src/etcp.h

@ -34,6 +34,8 @@ uint16_t get_current_timestamp(void);
#define INFLIGHT_STATE_WAIT_SEND 1
#define INFLIGHT_INITIAL_HASH_SIZE 1024
#define MAX_INFLIGHT_SIZE 16384 // максимальное число элементов в inflight приёмной очереди (для предотвращения атак)
// в этот список пакет добавляется когда перемещается из input_queue в input_send_q, при этом к пакету добавляется struct INFLIGHT_PACKET из inflight_pool.
// пакет полностью удаляется когда приходит ACK (либо conn_reset/close)
struct INFLIGHT_PACKET {// выделяется из etcp->inflight_pool
@ -96,6 +98,7 @@ struct ETCP_CONN {
// IDs and state
uint8_t got_initial_pkt; //
uint32_t next_tx_id; // Next TX ID
uint32_t last_rx_id; // Last received ID
uint32_t last_delivered_id; // Last delivered to output_queue
@ -147,6 +150,8 @@ void etcp_connection_close(struct ETCP_CONN* etcp);
void etcp_conn_reset(struct ETCP_CONN* etcp);
void etcp_conn_reinit(struct ETCP_CONN* etcp);
// Отправка: используем api ll_queue для очереди ETCP_CONN.input_queue
// Прием: используем api ll_queue для очереди ETCP_CONN.output_queue
// для очередей используется формат

9
src/pkt_normalizer.c

@ -351,14 +351,19 @@ static void pn_unpacker_cb(struct ll_queue* q, void* arg) {
if (len - ptr < 2) {
// Incomplete header, reset
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "reset state");
pn_unpacker_reset_state(pn);
// pn_unpacker_reset_state(pn);
etcp_conn_reinit(pn->etcp);
break;
}
uint16_t part_size = payload[ptr] | (payload[ptr + 1] << 8);
// DEBUG_INFO(DEBUG_CATEGORY_NORMALIZER, "pn_unpacker: new fragment pkt_len=%d (at %d)", part_size, ptr);
ptr += 2;
if (part_size<1 || part_size>1500) DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "PART_SIZE ERROR!!! %d", part_size);
if (part_size<1 || part_size>16384) {
DEBUG_ERROR(DEBUG_CATEGORY_NORMALIZER, "PART_SIZE ERROR!!! %d", part_size);
etcp_conn_reinit(pn->etcp);
break;
}
pn->recvpart = ll_alloc_lldgram(part_size);
if (!pn->recvpart) {

BIN
tests/bench_timeout_heap

Binary file not shown.

BIN
tests/bench_uasync_timeouts

Binary file not shown.

44
tools/bping/bping.c

@ -19,7 +19,8 @@
#define RECV_TIMEOUT_SEC 5
static char *host = NULL;
static int data_size = 56;
static int data_size_min = 56;
static int data_size_max = 56;
static int pings_per_burst = 10;
static int bursts = 0; // 0 = бесконечно
static double interval_sec = 1.0;
@ -61,9 +62,29 @@ void send_one_ping(int sock, const struct sockaddr_in *to, int datalen, uint16_t
(struct sockaddr *)to, sizeof(*to));
}
static int parse_size_arg(const char *arg, int *min_val, int *max_val) {
const char *colon = strchr(arg, ':');
if (colon) {
char *end;
long min = strtol(arg, &end, 10);
if (end != colon || min < 0 || min > MAX_DATA_SIZE) return -1;
long max = strtol(colon + 1, &end, 10);
if (*end != '\0' || max < 0 || max > MAX_DATA_SIZE || max < min) return -1;
*min_val = (int)min;
*max_val = (int)max;
} else {
char *end;
long val = strtol(arg, &end, 10);
if (*end != '\0' || val < 0 || val > MAX_DATA_SIZE) return -1;
*min_val = *max_val = (int)val;
}
return 0;
}
void usage() {
printf("Использование: bping [-s размер] [-p посылок] [-b пачек] [-i интервал] host\n\n");
printf(" -s размер данные в байтах (по умолчанию 56, макс ~65k)\n");
printf(" формат: -s 1472 (фиксированный) или -s 100:600 (диапазон)\n");
printf(" -p посылок пингов в одной пачке (по умолчанию 10, макс 10000)\n");
printf(" -b пачек количество пачек (0 = ∞, по умолчанию 0)\n");
printf(" -i интервал между пачками в секундах (0.001, 0.05 и т.д., по умолчанию 1.0)\n\n");
@ -71,7 +92,7 @@ void usage() {
printf("Примеры:\n");
printf(" sudo bping 8.8.8.8\n");
printf(" bping -s 1472 -p 200 -i 0.05 1.1.1.1\n");
printf(" bping -p 500 -b 0 -i 0.01 192.168.1.1\n");
printf(" bping -s 100:600 -p 500 -i 0.01 192.168.1.1\n");
exit(1);
}
@ -84,7 +105,12 @@ int main(int argc, char **argv) {
int opt;
while ((opt = getopt(argc, argv, "s:p:b:i:h")) != -1) {
switch (opt) {
case 's': data_size = atoi(optarg); break;
case 's':
if (parse_size_arg(optarg, &data_size_min, &data_size_max) != 0) {
fprintf(stderr, "Ошибка: неверный формат размера. Используйте -s число или -s мин:макс\n");
usage();
}
break;
case 'p': pings_per_burst = atoi(optarg); break;
case 'b': bursts = atoi(optarg); break;
case 'i': interval_sec = strtod(optarg, NULL); break;
@ -95,7 +121,7 @@ int main(int argc, char **argv) {
if (optind >= argc) usage();
host = argv[optind];
if (data_size < 0 || data_size > MAX_DATA_SIZE || pings_per_burst < 1 || pings_per_burst > MAX_PINGS_PER_BURST) {
if (data_size_min < 0 || data_size_max > MAX_DATA_SIZE || pings_per_burst < 1 || pings_per_burst > MAX_PINGS_PER_BURST) {
usage();
}
@ -130,8 +156,13 @@ int main(int argc, char **argv) {
uint16_t global_seq = 0;
printf("🚀 bping → %s (raw ICMP)\n", host);
if (data_size_min == data_size_max) {
printf("Данные: %d байт | В пачке: %d | Пачек: %s | Интервал: %.3f сек\n\n",
data_size, pings_per_burst, (bursts == 0 ? "" : "ограничено"), interval_sec);
data_size_min, pings_per_burst, (bursts == 0 ? "" : "ограничено"), interval_sec);
} else {
printf("Данные: %d:%d байт (случайно) | В пачке: %d | Пачек: %s | Интервал: %.3f сек\n\n",
data_size_min, data_size_max, pings_per_burst, (bursts == 0 ? "" : "ограничено"), interval_sec);
}
// Выделяем память один раз
struct timeval *send_times = malloc(pings_per_burst * sizeof(struct timeval));
@ -150,10 +181,11 @@ int main(int argc, char **argv) {
// === Отправляем пачку максимально быстро ===
int sent = 0;
for (int i = 0; i < pings_per_burst; i++) {
int pkt_size = data_size_min + (rand() % (data_size_max - data_size_min + 1));
uint16_t seq = ++global_seq;
seq_list[i] = seq;
gettimeofday(&send_times[i], NULL);
send_one_ping(sock, &dest, data_size, ident, seq);
send_one_ping(sock, &dest, pkt_size, ident, seq);
sent++;
}

Loading…
Cancel
Save