Browse Source

1

nodeinfo-routing-update
Evgeny 3 weeks ago
parent
commit
06323dbad1
  1. 64
      src/etcp.c
  2. 7
      src/etcp.h
  3. 1
      src/etcp_debug.c

64
src/etcp.c

@ -38,6 +38,7 @@ static void input_queue_cb(struct ll_queue* q, void* arg);
static void etcp_link_ready_callback(struct ETCP_CONN* etcp);
static void input_send_q_cb(struct ll_queue* q, void* arg);
static void wait_ack_cb(struct ll_queue* q, void* arg);
static void send_ack_req_cb(struct ll_queue* q, void* arg);
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp);
struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp);
@ -163,6 +164,7 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance, char* n
queue_set_callback(etcp->input_queue, input_queue_cb, etcp);
queue_set_callback(etcp->input_send_q, input_send_q_cb, etcp);
queue_set_callback(etcp->input_wait_ack, wait_ack_cb, etcp);
queue_set_callback(etcp->ack_q, send_ack_req_cb, etcp);
etcp->link_ready_for_send_fn = etcp_link_ready_callback;
@ -303,6 +305,9 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) {
queue_resume_callback(etcp->input_send_q);
queue_resume_callback(etcp->input_wait_ack);
queue_resume_callback(etcp->recv_q);
// queue_resume_callback(etcp->ack_q);
etcp->tx_state=ETCP_TX_STATE_DATA_WAIT;
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "end");
}
@ -430,7 +435,7 @@ static void input_queue_try_resume(struct ETCP_CONN* etcp) {// при ACK
// Сперва отправим всё из очереди отправки
size_t send_q_bytes = queue_total_bytes(etcp->input_send_q);
queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно.
// queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно.
if (send_q_bytes>0) return;
// когда очередь отправки пуста - пробуем взять новый пакет на обработку
@ -596,17 +601,42 @@ static void wait_ack_cb(struct ll_queue* q, void* arg) {// добавили па
if (!etcp->retrans_timer) ack_timeout_check(etcp);// если таймер ретрансмиссий взведен - дожидаемся таймера.
}
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q data ready
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg;
etcp_conn_process_send_queue(etcp);
if (etcp->tx_state==ETCP_TX_STATE_DATA_WAIT) queue_resume_callback(etcp->input_send_q);
}
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing
static void send_ack_req_cb(struct ll_queue* q, void* arg) {// etcp->ack_q data ready
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg;
etcp_conn_process_send_queue(etcp);
if (etcp->tx_state==ETCP_TX_STATE_DATA_WAIT) queue_resume_callback(etcp->ack_q);
}
static void etcp_link_ready_callback(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp) return;
if (etcp->tx_state!=ETCP_TX_STATE_LINK_WAIT) return;
queue_resume_callback(etcp->input_send_q);
queue_resume_callback(etcp->ack_q);
}
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо.
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет)
// если ack все еще заняты - обновляем таймаут
if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb);
else etcp->ack_resp_timer=NULL;
}
// Process packets in send queue and transmit them
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) {// вызываем когда есть элемент в send_q или надо отправить ack
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_DGRAM* dgram;
if (etcp->tx_state!=ETCP_TX_STATE_DATA_WAIT) return;
while(dgram = etcp_request_pkt(etcp)) {
etcp_loadbalancer_send(dgram);
}
@ -618,11 +648,12 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_LINK* link = etcp_loadbalancer_select_link(etcp);
if (!link) {
etcp->tx_state=ETCP_TX_STATE_WAIT_LINKS;
etcp->tx_state=ETCP_TX_STATE_LINK_WAIT;
etcp->cnt_link_wait++;
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no link available", etcp->log_name);
return NULL;// если линков нет - ждём появления свободного
}
etcp->tx_state=ETCP_TX_STATE_DATA_WAIT;
size_t send_q_size = queue_entry_count(etcp->input_send_q);
@ -635,28 +666,21 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q");
struct INFLIGHT_PACKET* inf_pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q);
if (inf_pkt) {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] prepare udp dgram for send packet %p (seq=%u, len=%u)", etcp->log_name, inf_pkt, inf_pkt->seq, inf_pkt->ll.len);
inf_pkt->last_timestamp=get_time_tb();
inf_pkt->send_count++;
inf_pkt->state=INFLIGHT_STATE_WAIT_ACK;
queue_data_put(etcp->input_wait_ack, (struct ll_entry*)inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue
} else {
queue_resume_callback(etcp->input_send_q);
}
}
size_t ack_q_size = queue_entry_count(etcp->ack_q);
if (!inf_pkt && ack_q_size == 0) {
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name);
etcp->tx_state=ETCP_TX_STATE_NO_DATA;
return NULL;
}
struct ETCP_DGRAM* dgram = memory_pool_alloc(etcp->instance->pkt_pool);
if (!dgram) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate ETCP_DGRAM", etcp->log_name);
etcp->tx_state=ETCP_TX_STATE_ERR_MEM;
// TODO: КОРРЕКТНО ОСВОБОДИТЬ ПАМЯТЬ
return NULL;
}
@ -752,24 +776,6 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
return dgram;
}
// Callback for when a link is ready to send data
static void etcp_link_ready_callback(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
if (!etcp) return;
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp);
queue_resume_callback(etcp->input_send_q);
// etcp_conn_process_send_queue(etcp);
}
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо.
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;
etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет)
// если ack все еще заняты - обновляем таймаут
if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb);
else etcp->ack_resp_timer=NULL;
}
// ====================================================================== Прием данных

7
src/etcp.h

@ -64,9 +64,8 @@ struct ACK_PACKET {
uint32_t recv_timestamp;// время приема (локальное)
};
#define ETCP_TX_STATE_WAIT_LINKS 1
#define ETCP_TX_STATE_NO_DATA 2
#define ETCP_TX_STATE_ERR_MEM 100
#define ETCP_TX_STATE_DATA_WAIT 1
#define ETCP_TX_STATE_LINK_WAIT 2
// ETCP connection structure (refactored)
struct ETCP_CONN {
@ -148,6 +147,7 @@ struct ETCP_CONN {
uint8_t routing_exchange_active; // 0 - не активен, 1 - надо инициировать обмен маршрутами (клиент), 2 - обмен маршрутами активен
uint8_t got_initial_pkt; //
uint8_t initialized; // 0 - только созданный ETCP, 1 - хотя бы один линк проинициалзирован (обмен ключами произведен)
uint8_t tx_state; // 0 - n/a, 1 - data_wait (queues empty), 2 - link_wait (link busy)
// Callback for ready notification
etcp_on_conn_ready ready_cbk; // callback при готовности соединения
@ -157,7 +157,6 @@ struct ETCP_CONN {
uint32_t cnt_ack_hit_sndq; // счетчик удалений inflight пакетов из sndq
uint32_t cnt_ack_miss; // счетчик не найденных ack
uint32_t cnt_link_wait; // счетчик переходов в ожидание когда link busy
uint32_t tx_state; // 1 - wait link ready, 2
uint32_t debug[8]; // 8 значений для дебага (live watch)
// Logging identifier (format: "XXXX→XXXX [name]" - last 4 digits of local and peer node_id + optional name)

1
src/etcp_debug.c

@ -1,4 +1,3 @@
#define DEBUG_CATEGORY_ETCP 1
#include "etcp_debug.h"
#include "etcp.h"
#include "../lib/debug_config.h"

Loading…
Cancel
Save