Browse Source

1

nodeinfo-routing-update
jeka 3 weeks ago
parent
commit
57f3a9e37b
  1. 44
      src/etcp.c
  2. 2
      src/etcp_connections.c

44
src/etcp.c

@ -39,6 +39,7 @@ static void etcp_link_ready_callback(struct ETCP_CONN* etcp);
static void input_send_q_cb(struct ll_queue* q, void* arg); static void input_send_q_cb(struct ll_queue* q, void* arg);
static void wait_ack_cb(struct ll_queue* q, void* arg); static void wait_ack_cb(struct ll_queue* q, void* arg);
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp); static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp);
struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp);
// Helper function to drain and free ETCP_FRAGMENT queue // Helper function to drain and free ETCP_FRAGMENT queue
static void drain_and_free_fragment_queue(struct ETCP_CONN* etcp, struct ll_queue** q) { static void drain_and_free_fragment_queue(struct ETCP_CONN* etcp, struct ll_queue** q) {
@ -184,7 +185,6 @@ void etcp_connection_close(struct ETCP_CONN* etcp) {
// Cancel active timers to prevent memory leaks // Cancel active timers to prevent memory leaks
if (etcp->retrans_timer) { if (etcp->retrans_timer) {
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] RETRANS TMR CANCEL", etcp->log_name);
uasync_cancel_timeout(etcp->instance->ua, etcp->retrans_timer); uasync_cancel_timeout(etcp->instance->ua, etcp->retrans_timer);
etcp->retrans_timer = NULL; etcp->retrans_timer = NULL;
} }
@ -350,7 +350,7 @@ void etcp_update_log_name(struct ETCP_CONN* etcp) {
} }
// ====================================================================== Отправка данных // ====================================================================== Отправка данных в etcp после нормализации
// Send data through ETCP connection // Send data through ETCP connection
// Allocates memory from data_pool and places in input queue // Allocates memory from data_pool and places in input queue
@ -532,22 +532,10 @@ static void input_queue_cb(struct ll_queue* q, void* arg) {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q"); // DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_queue_cb: successfully moved from input_queue to input_send_q");
etcp_conn_process_send_queue(etcp);// сразу обработаем этот пакет // etcp_conn_process_send_queue(etcp);// сразу обработаем этот пакет
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] nextloop, input_queue size=%d ", etcp->log_name, q->count); DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] nextloop, input_queue size=%d ", etcp->log_name, q->count);
} }
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg;
// size_t send_q_bytes = queue_total_bytes(etcp->input_send_q);
// size_t send_q_pkts = queue_entry_count(etcp->input_send_q);
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "input_send_q_cb: input_send_q status: %d pkt %d bytes", send_q_pkts, send_q_bytes);
etcp_conn_process_send_queue(etcp);
}
static void ack_timeout_cb(void* arg); static void ack_timeout_cb(void* arg);
static void ack_timeout_check(struct ETCP_CONN* etcp) { static void ack_timeout_check(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
@ -608,6 +596,22 @@ static void wait_ack_cb(struct ll_queue* q, void* arg) {// добавили па
if (!etcp->retrans_timer) ack_timeout_check(etcp);// если таймер ретрансмиссий взведен - дожидаемся таймера. if (!etcp->retrans_timer) ack_timeout_check(etcp);// если таймер ретрансмиссий взведен - дожидаемся таймера.
} }
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg;
etcp_conn_process_send_queue(etcp);
}
// Process packets in send queue and transmit them
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) {// вызываем когда есть элемент в send_q или надо отправить ack
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_DGRAM* dgram;
while(dgram = etcp_request_pkt(etcp)) {
etcp_loadbalancer_send(dgram);
}
}
// Подготовить и отправить кодограмму // Подготовить и отправить кодограмму
// вызывается линком когда освобождается или очередью если появляются данные на передачу // вызывается линком когда освобождается или очередью если появляются данные на передачу
struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) { struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
@ -755,16 +759,6 @@ static void etcp_link_ready_callback(struct ETCP_CONN* etcp) {
// etcp_conn_process_send_queue(etcp); // etcp_conn_process_send_queue(etcp);
} }
// Process packets in send queue and transmit them
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) {
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_DGRAM* dgram;
while(dgram = etcp_request_pkt(etcp)) {
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_conn_process_send_queue: sending packet");
etcp_loadbalancer_send(dgram);
}
}
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо. static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо.
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "");
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg;

2
src/etcp_connections.c

@ -658,7 +658,7 @@ int etcp_encrypt_send(struct ETCP_DGRAM* dgram) {
// inet_ntop removed - use ip_to_str // inet_ntop removed - use ip_to_str
} }
ssize_t sent=-1; ssize_t sent=enc_buf_len + dgram->noencrypt_len;
int loss_rate = dgram->link->conn->loss_rate; int loss_rate = dgram->link->conn->loss_rate;
int rnd = rand() % 100; int rnd = rand() % 100;
if (loss_rate == 0 || rnd >= loss_rate) { if (loss_rate == 0 || rnd >= loss_rate) {

Loading…
Cancel
Save