You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
856 lines
32 KiB
856 lines
32 KiB
/*
 * etcpmon_client.c - ETCP Monitor Client Network Implementation
 */
|
|
|
#include "etcpmon_client.h"
#include "etcpmon_graph.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Winsock headers are kept ahead of windows.h, as in the original order. */
#include <ws2tcpip.h>
#include <windows.h>
|
|
|
//#pragma comment(lib, "ws2_32.lib") |
|
|
|
/* File name of the communication log written by the client. */
#define LOG_FILENAME "etcpmon.log"
|
|
|
/* Get current timestamp in milliseconds (Unix epoch) */ |
|
uint64_t get_timestamp_ms(void) { |
|
FILETIME ft; |
|
GetSystemTimeAsFileTime(&ft); |
|
|
|
/* Convert FILETIME (100-nanosecond intervals since 1601) to milliseconds */ |
|
uint64_t time = ((uint64_t)ft.dwHighDateTime << 32) | ft.dwLowDateTime; |
|
/* Convert from 1601 to 1970 epoch (11644473600000 milliseconds) */ |
|
return (time / 10000) - 11644473600000ULL; |
|
} |
|
|
|
/*
 * Write one timestamped hex-dump line to the log and flush it.
 *
 * log    - open log stream; nothing is written when NULL
 * prefix - tag printed in square brackets (callers in this file pass "TX"/"RX")
 * data   - bytes to dump; nothing is written when NULL
 * len    - number of bytes in data; nothing is written when 0
 *
 * At most 256 bytes are dumped; for longer input the line ends with the
 * total byte count.
 */
static void log_hex_data(FILE* log, const char* prefix, const uint8_t* data, size_t len) {
    const size_t max_dumped = 256;
    size_t shown;
    size_t pos;

    if (log == NULL || data == NULL || len == 0) {
        return;
    }

    shown = (len < max_dumped) ? len : max_dumped;

    fprintf(log, "%llu: [%s] ", (unsigned long long)get_timestamp_ms(), prefix);
    for (pos = 0; pos < shown; pos++) {
        fprintf(log, "%02X ", data[pos]);
    }
    if (len > max_dumped) {
        fprintf(log, "... (%llu bytes total)", (unsigned long long)len);
    }
    fprintf(log, "\n");
    fflush(log);
}
|
|
|
void etcpmon_client_init(struct etcpmon_client* client) { |
|
if (!client) return; |
|
memset(client, 0, sizeof(*client)); |
|
client->sock = INVALID_SOCKET; |
|
client->connected = 0; |
|
client->fully_connected = 0; |
|
client->log_file = NULL; |
|
client->next_seq_id = 1; /* Start from 1, 0 = broadcast */ |
|
memset(&client->history, 0, sizeof(client->history)); |
|
for (int i = 0; i < GRAPH_METRIC_COUNT; i++) { |
|
client->history.min_val[i] = 1e9f; |
|
client->history.max_val[i] = -1e9f; |
|
} |
|
client->history.last_minmax_update = 0; |
|
|
|
/* Новый таймер авто-запроса метрик */ |
|
client->last_metrics_request_ms = 0; |
|
} |
|
|
|
void etcpmon_client_cleanup(struct etcpmon_client* client) { |
|
if (!client) return; |
|
|
|
etcpmon_client_disconnect(client); |
|
|
|
/* Free pending requests */ |
|
struct etcpmon_pending_request* req = client->pending_head; |
|
while (req) { |
|
struct etcpmon_pending_request* next = req->next; |
|
free(req); |
|
req = next; |
|
} |
|
client->pending_head = NULL; |
|
client->pending_tail = NULL; |
|
|
|
/* Close log file */ |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Log closed\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fclose(client->log_file); |
|
client->log_file = NULL; |
|
} |
|
|
|
/* Free cached data */ |
|
if (client->conn_list) { |
|
free(client->conn_list); |
|
client->conn_list = NULL; |
|
} |
|
if (client->last_metrics) { |
|
free(client->last_metrics); |
|
client->last_metrics = NULL; |
|
} |
|
if (client->last_links) { |
|
free(client->last_links); |
|
client->last_links = NULL; |
|
} |
|
} |
|
|
|
int etcpmon_client_connect(struct etcpmon_client* client, const char* addr, uint16_t port) { |
|
if (!client || !addr) return -1; |
|
|
|
if (client->connected) { |
|
etcpmon_client_disconnect(client); |
|
} |
|
|
|
/* Open log file (truncate on start) */ |
|
if (!client->log_file) { |
|
client->log_file = fopen(LOG_FILENAME, "w"); |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] ETCP Monitor communication log started\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fprintf(client->log_file, "%llu: [LOG] Connecting to %s:%d\n", |
|
(unsigned long long)get_timestamp_ms(), addr, port); |
|
fflush(client->log_file); |
|
} |
|
} |
|
|
|
/* Create socket */ |
|
client->sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); |
|
if (client->sock == INVALID_SOCKET) { |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Failed to create socket\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
if (client->on_error) { |
|
client->on_error("Failed to create socket", client->user_data); |
|
} |
|
return -1; |
|
} |
|
|
|
/* Set non-blocking mode */ |
|
u_long nonblock = 1; |
|
ioctlsocket(client->sock, FIONBIO, &nonblock); |
|
|
|
/* Setup address */ |
|
struct sockaddr_in server_addr; |
|
memset(&server_addr, 0, sizeof(server_addr)); |
|
server_addr.sin_family = AF_INET; |
|
server_addr.sin_port = htons(port); |
|
|
|
if (inet_pton(AF_INET, addr, &server_addr.sin_addr) != 1) { |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Invalid address: %s\n", |
|
(unsigned long long)get_timestamp_ms(), addr); |
|
fflush(client->log_file); |
|
} |
|
closesocket(client->sock); |
|
client->sock = INVALID_SOCKET; |
|
if (client->on_error) { |
|
client->on_error("Invalid address", client->user_data); |
|
} |
|
return -1; |
|
} |
|
|
|
/* Connect (non-blocking) */ |
|
int result = connect(client->sock, (struct sockaddr*)&server_addr, sizeof(server_addr)); |
|
if (result == SOCKET_ERROR) { |
|
int err = WSAGetLastError(); |
|
if (err != WSAEWOULDBLOCK && err != WSAEINPROGRESS) { |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Connection failed: %d\n", |
|
(unsigned long long)get_timestamp_ms(), err); |
|
fflush(client->log_file); |
|
} |
|
closesocket(client->sock); |
|
client->sock = INVALID_SOCKET; |
|
if (client->on_error) { |
|
char msg[256]; |
|
snprintf(msg, sizeof(msg), "Connection failed: %d", err); |
|
client->on_error(msg, client->user_data); |
|
} |
|
return -1; |
|
} |
|
/* Connection in progress, will complete asynchronously */ |
|
} |
|
|
|
strncpy(client->server_addr, addr, sizeof(client->server_addr) - 1); |
|
client->server_port = port; |
|
client->connected = 1; |
|
client->fully_connected = 0; |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Socket created, connection in progress\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
void etcpmon_client_disconnect(struct etcpmon_client* client) { |
|
if (!client) return; |
|
|
|
if (client->sock != INVALID_SOCKET) { |
|
/* Send disconnect command */ |
|
if (client->connected) { |
|
struct etcpmon_msg_header hdr; |
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_DISCONNECT, 0); |
|
|
|
if (client->log_file) { |
|
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr)); |
|
fprintf(client->log_file, "%llu: [LOG] Sending CMD_DISCONNECT\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
} |
|
|
|
send(client->sock, (const char*)&hdr, sizeof(hdr), 0); |
|
} |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Connection closed\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
|
|
closesocket(client->sock); |
|
client->sock = INVALID_SOCKET; |
|
} |
|
|
|
client->connected = 0; |
|
client->fully_connected = 0; |
|
client->recv_len = 0; |
|
client->selected_peer_id = 0; |
|
|
|
/* Очистить очередь pending requests и сбросить seq_id */ |
|
struct etcpmon_pending_request* req = client->pending_head; |
|
while (req) { |
|
struct etcpmon_pending_request* next = req->next; |
|
free(req); |
|
req = next; |
|
} |
|
client->pending_head = NULL; |
|
client->pending_tail = NULL; |
|
client->next_seq_id = 1; |
|
client->last_response_seq_id = 0; |
|
} |
|
|
|
int etcpmon_client_is_connected(const struct etcpmon_client* client) { |
|
return client ? client->connected : 0; |
|
} |
|
|
|
int etcpmon_client_request_list(struct etcpmon_client* client) { |
|
if (!client || client->sock == INVALID_SOCKET) return -1; |
|
|
|
struct etcpmon_msg_header hdr; |
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_LIST_CONN, 0); /* seq=0 = legacy режим */ |
|
|
|
if (client->log_file) { |
|
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr)); |
|
fprintf(client->log_file, "%llu: [LOG] CMD_LIST_CONN (seq=0)\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
|
|
int sent = send(client->sock, (const char*)&hdr, sizeof(hdr), 0); |
|
if (sent != sizeof(hdr)) { |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Failed to send CMD_LIST_CONN: %d\n", |
|
(unsigned long long)get_timestamp_ms(), WSAGetLastError()); |
|
fflush(client->log_file); |
|
} |
|
return -1; |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
int etcpmon_client_select_connection(struct etcpmon_client* client, uint64_t peer_node_id) { |
|
if (!client || client->sock == INVALID_SOCKET) return -1; |
|
|
|
/* SELECT is special - it's not a request/response, it just sets state |
|
* So we send with seq_id=0 (broadcast) */ |
|
uint8_t buffer[sizeof(struct etcpmon_msg_header) + sizeof(struct etcpmon_cmd_select)]; |
|
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer; |
|
struct etcpmon_cmd_select* cmd = (struct etcpmon_cmd_select*)(buffer + sizeof(*hdr)); |
|
|
|
etcpmon_build_header(hdr, sizeof(*cmd), ETCPMON_CMD_SELECT_CONN, 0); /* seq_id=0 for SELECT */ |
|
cmd->peer_node_id = peer_node_id; |
|
|
|
if (client->log_file) { |
|
log_hex_data(client->log_file, "TX", buffer, sizeof(buffer)); |
|
fprintf(client->log_file, "%llu: [LOG] CMD_SELECT_CONN peer_id=%016llX\n", |
|
(unsigned long long)get_timestamp_ms(), (unsigned long long)peer_node_id); |
|
fflush(client->log_file); |
|
} |
|
|
|
int sent = send(client->sock, (const char*)buffer, sizeof(buffer), 0); |
|
if (sent != sizeof(buffer)) { |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Failed to send CMD_SELECT_CONN: %d\n", |
|
(unsigned long long)get_timestamp_ms(), WSAGetLastError()); |
|
fflush(client->log_file); |
|
} |
|
return -1; |
|
} |
|
|
|
client->selected_peer_id = peer_node_id; |
|
return 0; |
|
} |
|
|
|
int etcpmon_client_request_metrics(struct etcpmon_client* client) { |
|
if (!client || client->sock == INVALID_SOCKET) return -1; |
|
|
|
struct etcpmon_msg_header hdr; |
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_GET_METRICS, 0); |
|
|
|
if (client->log_file) { |
|
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr)); |
|
fprintf(client->log_file, "%llu: [LOG] CMD_GET_METRICS\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
|
|
int sent = send(client->sock, (const char*)&hdr, sizeof(hdr), 0); |
|
if (sent != sizeof(hdr)) { |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Failed to send CMD_GET_METRICS: %d\n", |
|
(unsigned long long)get_timestamp_ms(), WSAGetLastError()); |
|
fflush(client->log_file); |
|
} |
|
return -1; |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
/* Send request with callback - queued version */ |
|
int etcpmon_client_send_request(struct etcpmon_client* client, |
|
uint8_t type, |
|
const void* payload, uint16_t payload_size, |
|
void (*callback)(void* arg, uint8_t msg_type, const uint8_t* payload, uint16_t payload_size), |
|
void* arg) { |
|
if (!client || client->sock == INVALID_SOCKET || !callback) return -1; |
|
|
|
/* Allocate pending request */ |
|
struct etcpmon_pending_request* req = (struct etcpmon_pending_request*)malloc(sizeof(*req)); |
|
if (!req) { |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Failed to allocate pending request\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
return -1; |
|
} |
|
|
|
/* Assign sequence ID */ |
|
req->seq_id = client->next_seq_id++; |
|
if (client->next_seq_id == 0) client->next_seq_id = 1; /* Skip 0 (broadcast) */ |
|
req->callback = callback; |
|
req->arg = arg; |
|
req->next = NULL; |
|
|
|
/* Add to queue */ |
|
if (client->pending_tail) { |
|
client->pending_tail->next = req; |
|
} else { |
|
client->pending_head = req; |
|
} |
|
client->pending_tail = req; |
|
|
|
/* Build message with seq_id */ |
|
uint8_t buffer[sizeof(struct etcpmon_msg_header) + ETCPMON_MAX_MSG_SIZE]; |
|
if (payload_size > ETCPMON_MAX_MSG_SIZE) { |
|
free(req); |
|
return -1; |
|
} |
|
|
|
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer; |
|
etcpmon_build_header(hdr, payload_size, type, req->seq_id); |
|
|
|
if (payload && payload_size > 0) { |
|
memcpy(buffer + sizeof(*hdr), payload, payload_size); |
|
} |
|
|
|
uint16_t total_size = sizeof(*hdr) + payload_size; |
|
|
|
if (client->log_file) { |
|
log_hex_data(client->log_file, "TX", buffer, total_size); |
|
fprintf(client->log_file, "%llu: [LOG] CMD 0x%02X seq=%d queued\n", |
|
(unsigned long long)get_timestamp_ms(), type, req->seq_id); |
|
fflush(client->log_file); |
|
} |
|
|
|
int sent = send(client->sock, (const char*)buffer, total_size, 0); |
|
if (sent != total_size) { |
|
/* Remove from queue on error */ |
|
if (client->pending_head == req) { |
|
client->pending_head = req->next; |
|
if (!client->pending_head) client->pending_tail = NULL; |
|
} |
|
free(req); |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Failed to send request: %d\n", |
|
(unsigned long long)get_timestamp_ms(), WSAGetLastError()); |
|
fflush(client->log_file); |
|
} |
|
return -1; |
|
} |
|
|
|
return 0; |
|
} |
|
|
|
static void handle_response(struct etcpmon_client* client, uint8_t msg_type, |
|
const uint8_t* payload, uint16_t payload_size) { |
|
switch (msg_type) { |
|
case ETCPMON_RSP_CONN_LIST: { |
|
if (payload_size < sizeof(struct etcpmon_rsp_conn_list)) break; |
|
|
|
struct etcpmon_rsp_conn_list* rsp = (struct etcpmon_rsp_conn_list*)payload; |
|
uint8_t count = rsp->count; |
|
|
|
if (payload_size < sizeof(*rsp) + count * sizeof(struct etcpmon_conn_info)) break; |
|
|
|
/* Update cached list */ |
|
if (client->conn_list) { |
|
free(client->conn_list); |
|
} |
|
client->conn_list = (struct etcpmon_conn_info*)malloc(count * sizeof(struct etcpmon_conn_info)); |
|
if (client->conn_list) { |
|
memcpy(client->conn_list, payload + sizeof(*rsp), count * sizeof(struct etcpmon_conn_info)); |
|
client->conn_count = count; |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] RSP_CONN_LIST count=%d\n", |
|
(unsigned long long)get_timestamp_ms(), count); |
|
for (uint8_t i = 0; i < count; i++) { |
|
fprintf(client->log_file, "%llu: [LOG] [%d] peer_id=%016llX name='%s'\n", |
|
(unsigned long long)get_timestamp_ms(), i, |
|
(unsigned long long)client->conn_list[i].peer_node_id, |
|
client->conn_list[i].name); |
|
} |
|
fflush(client->log_file); |
|
} |
|
|
|
if (client->on_conn_list) { |
|
client->on_conn_list(client->conn_list, count, client->user_data); |
|
} |
|
} |
|
break; |
|
} |
|
|
|
case ETCPMON_RSP_METRICS: { |
|
if (payload_size < sizeof(struct etcpmon_rsp_metrics)) break; |
|
|
|
struct etcpmon_rsp_metrics* rsp = (struct etcpmon_rsp_metrics*)payload; |
|
uint8_t links_count = rsp->etcp.links_count; |
|
|
|
if (payload_size < sizeof(*rsp) + links_count * sizeof(struct etcpmon_link_metrics)) break; |
|
|
|
/* Update cached metrics */ |
|
if (client->last_metrics) { |
|
free(client->last_metrics); |
|
} |
|
if (client->last_links) { |
|
free(client->last_links); |
|
} |
|
|
|
client->last_metrics = (struct etcpmon_rsp_metrics*)malloc(sizeof(*rsp)); |
|
client->last_links = (struct etcpmon_link_metrics*)malloc(links_count * sizeof(struct etcpmon_link_metrics)); |
|
|
|
if (client->last_metrics && client->last_links) { |
|
memcpy(client->last_metrics, rsp, sizeof(*rsp)); |
|
memcpy(client->last_links, payload + sizeof(*rsp), |
|
links_count * sizeof(struct etcpmon_link_metrics)); |
|
client->last_links_count = links_count; |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] RSP_METRICS rtt=%u bytes=%llu links=%d\n", |
|
(unsigned long long)get_timestamp_ms(), |
|
rsp->etcp.rtt_last, |
|
(unsigned long long)rsp->etcp.bytes_sent_total, |
|
links_count); |
|
fflush(client->log_file); |
|
} |
|
|
|
if (client->on_metrics) { |
|
client->on_metrics(client->last_metrics, client->last_links, |
|
links_count, client->user_data); |
|
} |
|
|
|
etcpmon_client_add_to_history(client, client->last_metrics); |
|
} |
|
break; |
|
} |
|
|
|
case ETCPMON_RSP_ERROR: { |
|
if (payload_size >= sizeof(struct etcpmon_rsp_error)) { |
|
struct etcpmon_rsp_error* rsp = (struct etcpmon_rsp_error*)payload; |
|
const char* msg = (const char*)(payload + sizeof(*rsp)); |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] RSP_ERROR code=%d msg='%s'\n", |
|
(unsigned long long)get_timestamp_ms(), rsp->error_code, msg); |
|
fflush(client->log_file); |
|
} |
|
|
|
if (client->on_error) { |
|
client->on_error(msg, client->user_data); |
|
} |
|
} |
|
break; |
|
} |
|
} |
|
} |
|
|
|
/* Find pending request by seq_id */ |
|
static struct etcpmon_pending_request* find_pending_request(struct etcpmon_client* client, uint8_t seq_id) { |
|
struct etcpmon_pending_request* req = client->pending_head; |
|
while (req) { |
|
if (req->seq_id == seq_id) { |
|
return req; |
|
} |
|
req = req->next; |
|
} |
|
return NULL; |
|
} |
|
|
|
/* Remove pending request from queue */ |
|
static void remove_pending_request(struct etcpmon_client* client, struct etcpmon_pending_request* req) { |
|
struct etcpmon_pending_request** curr = &client->pending_head; |
|
while (*curr) { |
|
if (*curr == req) { |
|
*curr = req->next; |
|
if (client->pending_tail == req) { |
|
client->pending_tail = *curr; |
|
} |
|
free(req); |
|
return; |
|
} |
|
curr = &(*curr)->next; |
|
} |
|
} |
|
|
|
/* Handle response - find matching request and call callback */ |
|
static void handle_response_with_callback(struct etcpmon_client* client, uint8_t msg_type, |
|
const uint8_t* payload, uint16_t payload_size) { |
|
uint8_t seq_id = client->last_response_seq_id; |
|
|
|
/* Find pending request */ |
|
struct etcpmon_pending_request* req = find_pending_request(client, seq_id); |
|
|
|
if (req) { |
|
/* Call the callback */ |
|
if (req->callback) { |
|
req->callback(req->arg, msg_type, payload, payload_size); |
|
} |
|
/* Remove from queue */ |
|
remove_pending_request(client, req); |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Response handled, seq=%d removed from queue\n", |
|
(unsigned long long)get_timestamp_ms(), seq_id); |
|
fflush(client->log_file); |
|
} |
|
} else { |
|
/* No pending request - maybe broadcast/unsolicited or stale */ |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Response seq=%d - no pending request (broadcast?)\n", |
|
(unsigned long long)get_timestamp_ms(), seq_id); |
|
fflush(client->log_file); |
|
} |
|
} |
|
|
|
/* === НОВОЕ: ВСЕГДА вызываем legacy обработчики (on_conn_list / on_metrics / on_error) === |
|
* Это позволяет использовать и старый код, и новый асинхронный send_request одновременно */ |
|
if (seq_id == 0 || msg_type == ETCPMON_RSP_CONN_LIST || |
|
msg_type == ETCPMON_RSP_METRICS || msg_type == ETCPMON_RSP_ERROR) { |
|
handle_response(client, msg_type, payload, payload_size); |
|
} |
|
} |
|
|
|
/*
 * Completion callback used for the automatic metrics request.
 *
 * Intentionally a no-op: it exists only so the request can go through the
 * callback-based send path. METRICS responses are also passed to the
 * legacy handler, which is where on_metrics gets called.
 */
static void metrics_auto_callback(void* arg, uint8_t msg_type, const uint8_t* payload, uint16_t payload_size) {
    /* All parameters are deliberately unused. */
    (void)payload_size;
    (void)payload;
    (void)msg_type;
    (void)arg;
}
|
|
|
int etcpmon_client_process(struct etcpmon_client* client) { |
|
if (!client || client->sock == INVALID_SOCKET) return -1; |
|
|
|
// Проверка завершения non-blocking connect |
|
if (client->connected && !client->fully_connected) { |
|
int error = 0; |
|
int len = sizeof(error); |
|
if (getsockopt(client->sock, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0) { |
|
if (error == 0) { |
|
// Connect успешно завершён |
|
client->fully_connected = 1; |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Connection established\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
if (client->on_connected) { |
|
client->on_connected(client->user_data); // Здесь вызов request_list |
|
} |
|
return 0;// выходим, не лезем в recv/send в этом же тике |
|
// Продолжаем к recv, т.к. подключены |
|
} else if (error == WSAEINPROGRESS || error == WSAEALREADY) { |
|
// Подключение ещё в процессе, ничего не делаем, ждём следующий вызов process |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Connect in progress (polling)...\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
return 0; // Важно: выходим, чтобы не идти в recv (оно провалится с ошибкой) |
|
} else { |
|
// Подключение провалилось по другой причине |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Connect failed: %d\n", |
|
(unsigned long long)get_timestamp_ms(), error); |
|
fflush(client->log_file); |
|
} |
|
etcpmon_client_disconnect(client); |
|
if (client->on_disconnected) { |
|
client->on_disconnected(client->user_data); |
|
} |
|
if (client->on_error) { |
|
char msg[256]; |
|
snprintf(msg, sizeof(msg), "Connect failed: %d", error); |
|
client->on_error(msg, client->user_data); |
|
} |
|
return -1; |
|
} |
|
} else { |
|
// getsockopt провалился (редко, но на всякий случай) |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] getsockopt failed: %d\n", |
|
(unsigned long long)get_timestamp_ms(), WSAGetLastError()); |
|
fflush(client->log_file); |
|
} |
|
etcpmon_client_disconnect(client); |
|
if (client->on_disconnected) { |
|
client->on_disconnected(client->user_data); |
|
} |
|
return -1; |
|
} |
|
} |
|
|
|
/* === Авто-перезапрос метрик каждые 100 мс (новый асинхронный способ) === */ |
|
if (client->fully_connected && client->selected_peer_id != 0 && client->sock != INVALID_SOCKET) { |
|
uint64_t now = get_timestamp_ms(); |
|
if (now - client->last_metrics_request_ms >= ETCPMON_UPDATE_INTERVAL_MS) { |
|
/* Используем новый асинхронный путь с коллбэком */ |
|
etcpmon_client_send_request(client, |
|
ETCPMON_CMD_GET_METRICS, |
|
NULL, 0, |
|
metrics_auto_callback, |
|
NULL); |
|
|
|
client->last_metrics_request_ms = now; |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Auto-request METRICS (async, seq=%d)\n", |
|
(unsigned long long)now, client->next_seq_id - 1); |
|
fflush(client->log_file); |
|
} |
|
} |
|
} |
|
|
|
// Если подключение в процессе, мы уже вышли выше. Здесь recv только для установленного соединения |
|
uint8_t* buf = client->recv_buffer + client->recv_len; |
|
int buf_space = ETCPMON_MAX_MSG_SIZE - client->recv_len; |
|
|
|
if (buf_space <= 0) { |
|
client->recv_len = 0; |
|
buf = client->recv_buffer; |
|
buf_space = ETCPMON_MAX_MSG_SIZE; |
|
} |
|
|
|
int received = recv(client->sock, (char*)buf, buf_space, 0); |
|
|
|
if (received == SOCKET_ERROR) { |
|
int err = WSAGetLastError(); |
|
if (err == WSAEWOULDBLOCK) { |
|
return 0; /* No data available */ |
|
} |
|
/* Connection error */ |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] recv failed: %d\n", |
|
(unsigned long long)get_timestamp_ms(), err); |
|
fflush(client->log_file); |
|
} |
|
etcpmon_client_disconnect(client); |
|
if (client->on_disconnected) { |
|
client->on_disconnected(client->user_data); |
|
} |
|
return -1; |
|
} |
|
|
|
if (received == 0) { |
|
/* Connection closed by server */ |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Connection closed by server (recv returned 0)\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
etcpmon_client_disconnect(client); |
|
if (client->on_disconnected) { |
|
client->on_disconnected(client->user_data); |
|
} |
|
return -1; |
|
} |
|
|
|
/* Log received data */ |
|
if (client->log_file) { |
|
log_hex_data(client->log_file, "RX", buf, received); |
|
} |
|
|
|
client->recv_len += received; |
|
|
|
/* Process complete messages */ |
|
int processed = 0; |
|
while (client->recv_len >= sizeof(struct etcpmon_msg_header)) { |
|
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)client->recv_buffer; |
|
|
|
if (etcpmon_validate_header(hdr) != 0) { |
|
/* Invalid header, skip byte */ |
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [ERROR] Invalid header, skipping byte\n", |
|
(unsigned long long)get_timestamp_ms()); |
|
fflush(client->log_file); |
|
} |
|
memmove(client->recv_buffer, client->recv_buffer + 1, client->recv_len - 1); |
|
client->recv_len--; |
|
continue; |
|
} |
|
|
|
if (client->recv_len < hdr->size) { |
|
break; /* Wait for more data */ |
|
} |
|
|
|
/* Handle message */ |
|
uint8_t* payload = client->recv_buffer + sizeof(*hdr); |
|
uint16_t payload_size = hdr->size - sizeof(*hdr); |
|
|
|
/* Store seq_id for matching with pending requests */ |
|
client->last_response_seq_id = hdr->seq_id; |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "%llu: [LOG] Received message type=0x%02X size=%d seq=%d\n", |
|
(unsigned long long)get_timestamp_ms(), hdr->type, hdr->size, hdr->seq_id); |
|
fflush(client->log_file); |
|
} |
|
|
|
/* Use callback-based handler */ |
|
handle_response_with_callback(client, hdr->type, payload, payload_size); |
|
processed++; |
|
|
|
/* Remove processed message */ |
|
if (client->recv_len > hdr->size) { |
|
memmove(client->recv_buffer, client->recv_buffer + hdr->size, |
|
client->recv_len - hdr->size); |
|
} |
|
client->recv_len -= hdr->size; |
|
} |
|
|
|
return processed; |
|
} |
|
|
|
void etcpmon_client_set_callbacks(struct etcpmon_client* client, |
|
void (*on_connected)(void*), |
|
void (*on_disconnected)(void*), |
|
void (*on_conn_list)(struct etcpmon_conn_info*, uint8_t, void*), |
|
void (*on_metrics)(struct etcpmon_rsp_metrics*, |
|
struct etcpmon_link_metrics*, |
|
uint8_t, void*), |
|
void (*on_error)(const char*, void*), |
|
void* user_data) { |
|
if (!client) return; |
|
|
|
client->on_connected = on_connected; |
|
client->on_disconnected = on_disconnected; |
|
client->on_conn_list = on_conn_list; |
|
client->on_metrics = on_metrics; |
|
client->on_error = on_error; |
|
client->user_data = user_data; |
|
} |
|
|
|
const char* etcpmon_client_get_conn_name(struct etcpmon_client* client, uint64_t peer_id) { |
|
if (!client || !client->conn_list) return NULL; |
|
|
|
for (uint8_t i = 0; i < client->conn_count; i++) { |
|
if (client->conn_list[i].peer_node_id == peer_id) { |
|
return client->conn_list[i].name; |
|
} |
|
} |
|
return NULL; |
|
} |
|
|
|
struct metrics_history* etcpmon_client_get_history(struct etcpmon_client* client) { |
|
if (!client) return NULL; |
|
return &client->history; |
|
} |
|
|
|
void etcpmon_client_add_to_history(struct etcpmon_client* client, struct etcpmon_rsp_metrics* metrics) { |
|
if (!client || !metrics) return; |
|
|
|
struct metrics_history* h = &client->history; |
|
int idx = h->head; |
|
// for (int m = 0; m < GRAPH_METRICS_COUNT; m++) { |
|
// float v = h->values[m][idx]; |
|
// if (v < h->min_val[m]) h->min_val[m] = v; |
|
// if (v > h->max_val[m]) h->max_val[m] = v; |
|
// } |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "add_to_history: rtt_last=%u rtt_avg10=%u rtt_avg100=%u jitter=%u\n", |
|
metrics->etcp.rtt_last, metrics->etcp.rtt_avg_10, metrics->etcp.rtt_avg_100, metrics->etcp.jitter); |
|
fflush(client->log_file); |
|
} |
|
|
|
float scale = 0.1f; // to ms (protocol: 0.1ms units) |
|
h->values[GRAPH_METRIC_RTT_LAST][idx] = (float)metrics->etcp.rtt_last * scale; |
|
h->values[GRAPH_METRIC_RTT_AVG10][idx] = (float)metrics->etcp.rtt_avg_10 * scale; |
|
h->values[GRAPH_METRIC_RTT_AVG100][idx] = (float)metrics->etcp.rtt_avg_100 * scale; |
|
h->values[GRAPH_METRIC_JITTER][idx] = (float)metrics->etcp.jitter * scale; |
|
|
|
uint32_t retrans_delta = 0; |
|
uint32_t acks_delta = 0; |
|
uint64_t bytes_delta = 0; |
|
|
|
if (h->count > 0) { |
|
if (metrics->etcp.retrans_count >= h->last_retrans) { |
|
retrans_delta = metrics->etcp.retrans_count - h->last_retrans; |
|
} |
|
if (metrics->etcp.ack_count >= h->last_acks) { |
|
acks_delta = metrics->etcp.ack_count - h->last_acks; |
|
} |
|
if (metrics->etcp.bytes_sent_total >= h->last_bytes_sent) { |
|
bytes_delta = metrics->etcp.bytes_sent_total - h->last_bytes_sent; |
|
} |
|
} |
|
|
|
h->last_retrans = metrics->etcp.retrans_count; |
|
h->last_acks = metrics->etcp.ack_count; |
|
h->last_bytes_sent = metrics->etcp.bytes_sent_total; |
|
|
|
h->values[GRAPH_METRIC_RETRANS][idx] = (float)retrans_delta; |
|
h->values[GRAPH_METRIC_ACKS][idx] = (float)acks_delta; |
|
h->values[GRAPH_METRIC_INFLIGHT][idx] = (float)metrics->etcp.unacked_bytes; |
|
h->values[GRAPH_METRIC_BYTES_SENT][idx] = (float)bytes_delta; |
|
|
|
h->head = (h->head + 1) % GRAPH_HISTORY_SIZE; |
|
if (h->count < GRAPH_HISTORY_SIZE) { |
|
h->count++; |
|
} |
|
|
|
if (client->log_file) { |
|
fprintf(client->log_file, "history updated: count=%d head=%d\n", h->count, h->head); |
|
fflush(client->log_file); |
|
} |
|
}
|
|
|