|
|
|
|
@ -47,12 +47,16 @@ void etcpmon_client_init(struct etcpmon_client* client) {
|
|
|
|
|
client->connected = 0; |
|
|
|
|
client->fully_connected = 0; |
|
|
|
|
client->log_file = NULL; |
|
|
|
|
client->next_seq_id = 1; /* Start from 1, 0 = broadcast */ |
|
|
|
|
memset(&client->history, 0, sizeof(client->history)); |
|
|
|
|
for (int i = 0; i < GRAPH_METRICS_COUNT; i++) { |
|
|
|
|
for (int i = 0; i < GRAPH_METRIC_COUNT; i++) { |
|
|
|
|
client->history.min_val[i] = 1e9f; |
|
|
|
|
client->history.max_val[i] = -1e9f; |
|
|
|
|
} |
|
|
|
|
client->history.last_minmax_update = 0; // ← добавь
|
|
|
|
|
client->history.last_minmax_update = 0; |
|
|
|
|
|
|
|
|
|
/* Новый таймер авто-запроса метрик */ |
|
|
|
|
client->last_metrics_request_ms = 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
void etcpmon_client_cleanup(struct etcpmon_client* client) { |
|
|
|
|
@ -60,6 +64,16 @@ void etcpmon_client_cleanup(struct etcpmon_client* client) {
|
|
|
|
|
|
|
|
|
|
etcpmon_client_disconnect(client); |
|
|
|
|
|
|
|
|
|
/* Free pending requests */ |
|
|
|
|
struct etcpmon_pending_request* req = client->pending_head; |
|
|
|
|
while (req) { |
|
|
|
|
struct etcpmon_pending_request* next = req->next; |
|
|
|
|
free(req); |
|
|
|
|
req = next; |
|
|
|
|
} |
|
|
|
|
client->pending_head = NULL; |
|
|
|
|
client->pending_tail = NULL; |
|
|
|
|
|
|
|
|
|
/* Close log file */ |
|
|
|
|
if (client->log_file) { |
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] Log closed\n",
|
|
|
|
|
@ -183,7 +197,7 @@ void etcpmon_client_disconnect(struct etcpmon_client* client) {
|
|
|
|
|
/* Send disconnect command */ |
|
|
|
|
if (client->connected) { |
|
|
|
|
struct etcpmon_msg_header hdr; |
|
|
|
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_DISCONNECT); |
|
|
|
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_DISCONNECT, 0); |
|
|
|
|
|
|
|
|
|
if (client->log_file) { |
|
|
|
|
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr)); |
|
|
|
|
@ -218,11 +232,11 @@ int etcpmon_client_request_list(struct etcpmon_client* client) {
|
|
|
|
|
if (!client || client->sock == INVALID_SOCKET) return -1; |
|
|
|
|
|
|
|
|
|
struct etcpmon_msg_header hdr; |
|
|
|
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_LIST_CONN); |
|
|
|
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_LIST_CONN, 0); /* seq=0 = legacy режим */ |
|
|
|
|
|
|
|
|
|
if (client->log_file) { |
|
|
|
|
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr)); |
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] CMD_LIST_CONN\n",
|
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] CMD_LIST_CONN (seq=0)\n",
|
|
|
|
|
(unsigned long long)get_timestamp_ms()); |
|
|
|
|
fflush(client->log_file); |
|
|
|
|
} |
|
|
|
|
@ -243,11 +257,13 @@ int etcpmon_client_request_list(struct etcpmon_client* client) {
|
|
|
|
|
int etcpmon_client_select_connection(struct etcpmon_client* client, uint64_t peer_node_id) { |
|
|
|
|
if (!client || client->sock == INVALID_SOCKET) return -1; |
|
|
|
|
|
|
|
|
|
/* SELECT is special - it's not a request/response, it just sets state
|
|
|
|
|
* So we send with seq_id=0 (broadcast) */ |
|
|
|
|
uint8_t buffer[sizeof(struct etcpmon_msg_header) + sizeof(struct etcpmon_cmd_select)]; |
|
|
|
|
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer; |
|
|
|
|
struct etcpmon_cmd_select* cmd = (struct etcpmon_cmd_select*)(buffer + sizeof(*hdr)); |
|
|
|
|
|
|
|
|
|
etcpmon_build_header(hdr, sizeof(*cmd), ETCPMON_CMD_SELECT_CONN); |
|
|
|
|
etcpmon_build_header(hdr, sizeof(*cmd), ETCPMON_CMD_SELECT_CONN, 0); /* seq_id=0 for SELECT */ |
|
|
|
|
cmd->peer_node_id = peer_node_id; |
|
|
|
|
|
|
|
|
|
if (client->log_file) { |
|
|
|
|
@ -275,7 +291,7 @@ int etcpmon_client_request_metrics(struct etcpmon_client* client) {
|
|
|
|
|
if (!client || client->sock == INVALID_SOCKET) return -1; |
|
|
|
|
|
|
|
|
|
struct etcpmon_msg_header hdr; |
|
|
|
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_GET_METRICS); |
|
|
|
|
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_GET_METRICS, 0); |
|
|
|
|
|
|
|
|
|
if (client->log_file) { |
|
|
|
|
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr)); |
|
|
|
|
@ -297,6 +313,82 @@ int etcpmon_client_request_metrics(struct etcpmon_client* client) {
|
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Send request with callback - queued version */ |
|
|
|
|
int etcpmon_client_send_request(struct etcpmon_client* client, |
|
|
|
|
uint8_t type, |
|
|
|
|
const void* payload, uint16_t payload_size, |
|
|
|
|
void (*callback)(void* arg, uint8_t msg_type, const uint8_t* payload, uint16_t payload_size), |
|
|
|
|
void* arg) { |
|
|
|
|
if (!client || client->sock == INVALID_SOCKET || !callback) return -1; |
|
|
|
|
|
|
|
|
|
/* Allocate pending request */ |
|
|
|
|
struct etcpmon_pending_request* req = (struct etcpmon_pending_request*)malloc(sizeof(*req)); |
|
|
|
|
if (!req) { |
|
|
|
|
if (client->log_file) { |
|
|
|
|
fprintf(client->log_file, "%llu: [ERROR] Failed to allocate pending request\n", |
|
|
|
|
(unsigned long long)get_timestamp_ms()); |
|
|
|
|
fflush(client->log_file); |
|
|
|
|
} |
|
|
|
|
return -1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Assign sequence ID */ |
|
|
|
|
req->seq_id = client->next_seq_id++; |
|
|
|
|
if (client->next_seq_id == 0) client->next_seq_id = 1; /* Skip 0 (broadcast) */ |
|
|
|
|
req->callback = callback; |
|
|
|
|
req->arg = arg; |
|
|
|
|
req->next = NULL; |
|
|
|
|
|
|
|
|
|
/* Add to queue */ |
|
|
|
|
if (client->pending_tail) { |
|
|
|
|
client->pending_tail->next = req; |
|
|
|
|
} else { |
|
|
|
|
client->pending_head = req; |
|
|
|
|
} |
|
|
|
|
client->pending_tail = req; |
|
|
|
|
|
|
|
|
|
/* Build message with seq_id */ |
|
|
|
|
uint8_t buffer[sizeof(struct etcpmon_msg_header) + ETCPMON_MAX_MSG_SIZE]; |
|
|
|
|
if (payload_size > ETCPMON_MAX_MSG_SIZE) { |
|
|
|
|
free(req); |
|
|
|
|
return -1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer; |
|
|
|
|
etcpmon_build_header(hdr, payload_size, type, req->seq_id); |
|
|
|
|
|
|
|
|
|
if (payload && payload_size > 0) { |
|
|
|
|
memcpy(buffer + sizeof(*hdr), payload, payload_size); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
uint16_t total_size = sizeof(*hdr) + payload_size; |
|
|
|
|
|
|
|
|
|
if (client->log_file) { |
|
|
|
|
log_hex_data(client->log_file, "TX", buffer, total_size); |
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] CMD 0x%02X seq=%d queued\n", |
|
|
|
|
(unsigned long long)get_timestamp_ms(), type, req->seq_id); |
|
|
|
|
fflush(client->log_file); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int sent = send(client->sock, (const char*)buffer, total_size, 0); |
|
|
|
|
if (sent != total_size) { |
|
|
|
|
/* Remove from queue on error */ |
|
|
|
|
if (client->pending_head == req) { |
|
|
|
|
client->pending_head = req->next; |
|
|
|
|
if (!client->pending_head) client->pending_tail = NULL; |
|
|
|
|
} |
|
|
|
|
free(req); |
|
|
|
|
if (client->log_file) { |
|
|
|
|
fprintf(client->log_file, "%llu: [ERROR] Failed to send request: %d\n", |
|
|
|
|
(unsigned long long)get_timestamp_ms(), WSAGetLastError()); |
|
|
|
|
fflush(client->log_file); |
|
|
|
|
} |
|
|
|
|
return -1; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return 0; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void handle_response(struct etcpmon_client* client, uint8_t msg_type,
|
|
|
|
|
const uint8_t* payload, uint16_t payload_size) { |
|
|
|
|
switch (msg_type) { |
|
|
|
|
@ -400,6 +492,80 @@ static void handle_response(struct etcpmon_client* client, uint8_t msg_type,
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Find pending request by seq_id */ |
|
|
|
|
static struct etcpmon_pending_request* find_pending_request(struct etcpmon_client* client, uint8_t seq_id) { |
|
|
|
|
struct etcpmon_pending_request* req = client->pending_head; |
|
|
|
|
while (req) { |
|
|
|
|
if (req->seq_id == seq_id) { |
|
|
|
|
return req; |
|
|
|
|
} |
|
|
|
|
req = req->next; |
|
|
|
|
} |
|
|
|
|
return NULL; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Remove pending request from queue */ |
|
|
|
|
static void remove_pending_request(struct etcpmon_client* client, struct etcpmon_pending_request* req) { |
|
|
|
|
struct etcpmon_pending_request** curr = &client->pending_head; |
|
|
|
|
while (*curr) { |
|
|
|
|
if (*curr == req) { |
|
|
|
|
*curr = req->next; |
|
|
|
|
if (client->pending_tail == req) { |
|
|
|
|
client->pending_tail = *curr; |
|
|
|
|
} |
|
|
|
|
free(req); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
curr = &(*curr)->next; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* Handle response - find matching request and call callback */ |
|
|
|
|
static void handle_response_with_callback(struct etcpmon_client* client, uint8_t msg_type, |
|
|
|
|
const uint8_t* payload, uint16_t payload_size) { |
|
|
|
|
uint8_t seq_id = client->last_response_seq_id; |
|
|
|
|
|
|
|
|
|
/* Find pending request */ |
|
|
|
|
struct etcpmon_pending_request* req = find_pending_request(client, seq_id); |
|
|
|
|
|
|
|
|
|
if (req) { |
|
|
|
|
/* Call the callback */ |
|
|
|
|
if (req->callback) { |
|
|
|
|
req->callback(req->arg, msg_type, payload, payload_size); |
|
|
|
|
} |
|
|
|
|
/* Remove from queue */ |
|
|
|
|
remove_pending_request(client, req); |
|
|
|
|
|
|
|
|
|
if (client->log_file) { |
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] Response handled, seq=%d removed from queue\n", |
|
|
|
|
(unsigned long long)get_timestamp_ms(), seq_id); |
|
|
|
|
fflush(client->log_file); |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
/* No pending request - maybe broadcast/unsolicited or stale */ |
|
|
|
|
if (client->log_file) { |
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] Response seq=%d - no pending request (broadcast?)\n", |
|
|
|
|
(unsigned long long)get_timestamp_ms(), seq_id); |
|
|
|
|
fflush(client->log_file); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* === НОВОЕ: ВСЕГДА вызываем legacy обработчики (on_conn_list / on_metrics / on_error) ===
|
|
|
|
|
* Это позволяет использовать и старый код, и новый асинхронный send_request одновременно */ |
|
|
|
|
if (seq_id == 0 || msg_type == ETCPMON_RSP_CONN_LIST ||
|
|
|
|
|
msg_type == ETCPMON_RSP_METRICS || msg_type == ETCPMON_RSP_ERROR) { |
|
|
|
|
handle_response(client, msg_type, payload, payload_size); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
static void metrics_auto_callback(void* arg, uint8_t msg_type, const uint8_t* payload, uint16_t payload_size) { |
|
|
|
|
(void)arg; |
|
|
|
|
(void)msg_type; |
|
|
|
|
(void)payload; |
|
|
|
|
(void)payload_size; |
|
|
|
|
/* Ничего не делаем — legacy on_metrics вызывается автоматически ниже */ |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
int etcpmon_client_process(struct etcpmon_client* client) { |
|
|
|
|
if (!client || client->sock == INVALID_SOCKET) return -1; |
|
|
|
|
|
|
|
|
|
@ -419,6 +585,7 @@ int etcpmon_client_process(struct etcpmon_client* client) {
|
|
|
|
|
if (client->on_connected) { |
|
|
|
|
client->on_connected(client->user_data); // Здесь вызов request_list
|
|
|
|
|
} |
|
|
|
|
return 0;// выходим, не лезем в recv/send в этом же тике
|
|
|
|
|
// Продолжаем к recv, т.к. подключены
|
|
|
|
|
} else if (error == WSAEINPROGRESS || error == WSAEALREADY) { |
|
|
|
|
// Подключение ещё в процессе, ничего не делаем, ждём следующий вызов process
|
|
|
|
|
@ -461,6 +628,27 @@ int etcpmon_client_process(struct etcpmon_client* client) {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/* === Авто-перезапрос метрик каждые 100 мс (новый асинхронный способ) === */ |
|
|
|
|
if (client->fully_connected && client->selected_peer_id != 0 && client->sock != INVALID_SOCKET) { |
|
|
|
|
uint64_t now = get_timestamp_ms(); |
|
|
|
|
if (now - client->last_metrics_request_ms >= ETCPMON_UPDATE_INTERVAL_MS) { |
|
|
|
|
/* Используем новый асинхронный путь с коллбэком */ |
|
|
|
|
etcpmon_client_send_request(client, |
|
|
|
|
ETCPMON_CMD_GET_METRICS, |
|
|
|
|
NULL, 0, |
|
|
|
|
metrics_auto_callback, |
|
|
|
|
NULL); |
|
|
|
|
|
|
|
|
|
client->last_metrics_request_ms = now; |
|
|
|
|
|
|
|
|
|
if (client->log_file) { |
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] Auto-request METRICS (async, seq=%d)\n", |
|
|
|
|
(unsigned long long)now, client->next_seq_id - 1); |
|
|
|
|
fflush(client->log_file); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Если подключение в процессе, мы уже вышли выше. Здесь recv только для установленного соединения
|
|
|
|
|
uint8_t* buf = client->recv_buffer + client->recv_len; |
|
|
|
|
int buf_space = ETCPMON_MAX_MSG_SIZE - client->recv_len; |
|
|
|
|
@ -537,13 +725,17 @@ int etcpmon_client_process(struct etcpmon_client* client) {
|
|
|
|
|
uint8_t* payload = client->recv_buffer + sizeof(*hdr); |
|
|
|
|
uint16_t payload_size = hdr->size - sizeof(*hdr); |
|
|
|
|
|
|
|
|
|
/* Store seq_id for matching with pending requests */ |
|
|
|
|
client->last_response_seq_id = hdr->seq_id; |
|
|
|
|
|
|
|
|
|
if (client->log_file) { |
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] Received message type=0x%02X size=%d\n",
|
|
|
|
|
(unsigned long long)get_timestamp_ms(), hdr->type, hdr->size); |
|
|
|
|
fprintf(client->log_file, "%llu: [LOG] Received message type=0x%02X size=%d seq=%d\n",
|
|
|
|
|
(unsigned long long)get_timestamp_ms(), hdr->type, hdr->size, hdr->seq_id); |
|
|
|
|
fflush(client->log_file); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
handle_response(client, hdr->type, payload, payload_size); |
|
|
|
|
/* Use callback-based handler */ |
|
|
|
|
handle_response_with_callback(client, hdr->type, payload, payload_size); |
|
|
|
|
processed++; |
|
|
|
|
|
|
|
|
|
/* Remove processed message */ |
|
|
|
|
|