|
|
|
|
@ -38,6 +38,7 @@ static void input_queue_cb(struct ll_queue* q, void* arg);
|
|
|
|
|
static void etcp_link_ready_callback(struct ETCP_CONN* etcp); |
|
|
|
|
static void input_send_q_cb(struct ll_queue* q, void* arg); |
|
|
|
|
static void wait_ack_cb(struct ll_queue* q, void* arg); |
|
|
|
|
static void send_ack_req_cb(struct ll_queue* q, void* arg); |
|
|
|
|
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp); |
|
|
|
|
struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp); |
|
|
|
|
|
|
|
|
|
@ -163,6 +164,7 @@ struct ETCP_CONN* etcp_connection_create(struct UTUN_INSTANCE* instance, char* n
|
|
|
|
|
queue_set_callback(etcp->input_queue, input_queue_cb, etcp); |
|
|
|
|
queue_set_callback(etcp->input_send_q, input_send_q_cb, etcp); |
|
|
|
|
queue_set_callback(etcp->input_wait_ack, wait_ack_cb, etcp); |
|
|
|
|
queue_set_callback(etcp->ack_q, send_ack_req_cb, etcp); |
|
|
|
|
|
|
|
|
|
etcp->link_ready_for_send_fn = etcp_link_ready_callback; |
|
|
|
|
|
|
|
|
|
@ -303,6 +305,9 @@ void etcp_conn_reset(struct ETCP_CONN* etcp) {
|
|
|
|
|
queue_resume_callback(etcp->input_send_q); |
|
|
|
|
queue_resume_callback(etcp->input_wait_ack); |
|
|
|
|
queue_resume_callback(etcp->recv_q); |
|
|
|
|
// queue_resume_callback(etcp->ack_q);
|
|
|
|
|
|
|
|
|
|
etcp->tx_state=ETCP_TX_STATE_DATA_WAIT; |
|
|
|
|
|
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, "end"); |
|
|
|
|
} |
|
|
|
|
@ -430,7 +435,7 @@ static void input_queue_try_resume(struct ETCP_CONN* etcp) {// при ACK
|
|
|
|
|
|
|
|
|
|
// Сперва отправим всё из очереди отправки
|
|
|
|
|
size_t send_q_bytes = queue_total_bytes(etcp->input_send_q); |
|
|
|
|
queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно.
|
|
|
|
|
// queue_resume_callback(etcp->input_send_q);// вызвать лишний раз resume не страшно.
|
|
|
|
|
if (send_q_bytes>0) return; |
|
|
|
|
|
|
|
|
|
// когда очередь отправки пуста - пробуем взять новый пакет на обработку
|
|
|
|
|
@ -596,17 +601,42 @@ static void wait_ack_cb(struct ll_queue* q, void* arg) {// добавили па
|
|
|
|
|
if (!etcp->retrans_timer) ack_timeout_check(etcp);// если таймер ретрансмиссий взведен - дожидаемся таймера.
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q data ready
|
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
|
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg; |
|
|
|
|
etcp_conn_process_send_queue(etcp); |
|
|
|
|
if (etcp->tx_state==ETCP_TX_STATE_DATA_WAIT) queue_resume_callback(etcp->input_send_q); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void input_send_q_cb(struct ll_queue* q, void* arg) {// etcp->input_send_q processing
|
|
|
|
|
static void send_ack_req_cb(struct ll_queue* q, void* arg) {// etcp->ack_q data ready
|
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
|
struct ETCP_CONN* etcp=(struct ETCP_CONN*)arg; |
|
|
|
|
etcp_conn_process_send_queue(etcp); |
|
|
|
|
if (etcp->tx_state==ETCP_TX_STATE_DATA_WAIT) queue_resume_callback(etcp->ack_q); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void etcp_link_ready_callback(struct ETCP_CONN* etcp) { |
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
|
if (!etcp) return; |
|
|
|
|
if (etcp->tx_state!=ETCP_TX_STATE_LINK_WAIT) return; |
|
|
|
|
queue_resume_callback(etcp->input_send_q); |
|
|
|
|
queue_resume_callback(etcp->ack_q); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо.
|
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
|
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; |
|
|
|
|
etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет)
|
|
|
|
|
// если ack все еще заняты - обновляем таймаут
|
|
|
|
|
if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); |
|
|
|
|
else etcp->ack_resp_timer=NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Process packets in send queue and transmit them
|
|
|
|
|
static void etcp_conn_process_send_queue(struct ETCP_CONN* etcp) {// вызываем когда есть элемент в send_q или надо отправить ack
|
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
|
struct ETCP_DGRAM* dgram; |
|
|
|
|
if (etcp->tx_state!=ETCP_TX_STATE_DATA_WAIT) return; |
|
|
|
|
while(dgram = etcp_request_pkt(etcp)) { |
|
|
|
|
etcp_loadbalancer_send(dgram); |
|
|
|
|
} |
|
|
|
|
@ -618,11 +648,12 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
|
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
|
struct ETCP_LINK* link = etcp_loadbalancer_select_link(etcp); |
|
|
|
|
if (!link) { |
|
|
|
|
etcp->tx_state=ETCP_TX_STATE_WAIT_LINKS; |
|
|
|
|
etcp->tx_state=ETCP_TX_STATE_LINK_WAIT; |
|
|
|
|
etcp->cnt_link_wait++; |
|
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no link available", etcp->log_name); |
|
|
|
|
return NULL;// если линков нет - ждём появления свободного
|
|
|
|
|
} |
|
|
|
|
etcp->tx_state=ETCP_TX_STATE_DATA_WAIT; |
|
|
|
|
|
|
|
|
|
size_t send_q_size = queue_entry_count(etcp->input_send_q); |
|
|
|
|
|
|
|
|
|
@ -635,28 +666,21 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
|
|
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_request_pkt: getting packet from input_send_q");
|
|
|
|
|
struct INFLIGHT_PACKET* inf_pkt = (struct INFLIGHT_PACKET*)queue_data_get(etcp->input_send_q); |
|
|
|
|
if (inf_pkt) { |
|
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] prepare udp dgram for send packet %p (seq=%u, len=%u)", etcp->log_name, inf_pkt, inf_pkt->seq, inf_pkt->ll.len);
|
|
|
|
|
|
|
|
|
|
inf_pkt->last_timestamp=get_time_tb(); |
|
|
|
|
inf_pkt->send_count++; |
|
|
|
|
inf_pkt->state=INFLIGHT_STATE_WAIT_ACK; |
|
|
|
|
queue_data_put(etcp->input_wait_ack, (struct ll_entry*)inf_pkt, inf_pkt->seq);// move dgram to wait_ack queue
|
|
|
|
|
} else { |
|
|
|
|
queue_resume_callback(etcp->input_send_q); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
size_t ack_q_size = queue_entry_count(etcp->ack_q); |
|
|
|
|
|
|
|
|
|
if (!inf_pkt && ack_q_size == 0) { |
|
|
|
|
DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "[%s] no data/ack to send", etcp->log_name); |
|
|
|
|
etcp->tx_state=ETCP_TX_STATE_NO_DATA; |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct ETCP_DGRAM* dgram = memory_pool_alloc(etcp->instance->pkt_pool); |
|
|
|
|
if (!dgram) { |
|
|
|
|
DEBUG_ERROR(DEBUG_CATEGORY_ETCP, "[%s] failed to allocate ETCP_DGRAM", etcp->log_name); |
|
|
|
|
etcp->tx_state=ETCP_TX_STATE_ERR_MEM; |
|
|
|
|
// TODO: КОРРЕКТНО ОСВОБОДИТЬ ПАМЯТЬ
|
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -752,24 +776,6 @@ struct ETCP_DGRAM* etcp_request_pkt(struct ETCP_CONN* etcp) {
|
|
|
|
|
return dgram; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Callback for when a link is ready to send data
|
|
|
|
|
static void etcp_link_ready_callback(struct ETCP_CONN* etcp) { |
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
|
if (!etcp) return; |
|
|
|
|
// DEBUG_DEBUG(DEBUG_CATEGORY_ETCP, "etcp_link_ready_callback: processing send queue for etcp=%p", etcp);
|
|
|
|
|
queue_resume_callback(etcp->input_send_q); |
|
|
|
|
// etcp_conn_process_send_queue(etcp);
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void ack_response_timer_cb(void* arg) {// проверяем неотправленные ack response и отправляем если надо.
|
|
|
|
|
DEBUG_TRACE(DEBUG_CATEGORY_ETCP, ""); |
|
|
|
|
struct ETCP_CONN* etcp = (struct ETCP_CONN*)arg; |
|
|
|
|
etcp_conn_process_send_queue(etcp);// проталкиваем (она же должна отправлять только ack если больше ничего нет)
|
|
|
|
|
// если ack все еще заняты - обновляем таймаут
|
|
|
|
|
if (etcp->ack_q->count) etcp->ack_resp_timer = uasync_set_timeout(etcp->instance->ua, ACK_DELAY_TB, etcp, ack_response_timer_cb); |
|
|
|
|
else etcp->ack_resp_timer=NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ====================================================================== Прием данных
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|