You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

652 lines
25 KiB

/*
* etcpmon_client.c - ETCP Monitor Client Network Implementation
*/
#include "etcpmon_client.h"
#include "etcpmon_graph.h"
#include <stdio.h>
#include <string.h>
#include <ws2tcpip.h>
#include <windows.h>
//#pragma comment(lib, "ws2_32.lib")
/* Log file name */
#define LOG_FILENAME "etcpmon.log"
/*
 * Return the current system time in milliseconds since the Unix epoch.
 *
 * The time is read with GetSystemTimeAsFileTime(), whose FILETIME value
 * counts 100-nanosecond intervals since 1601, and is then rebased to 1970.
 */
uint64_t get_timestamp_ms(void) {
    /* Distance between the 1601 and 1970 epochs, in milliseconds. */
    const uint64_t epoch_offset_ms = 11644473600000ULL;
    /* Number of 100 ns FILETIME ticks in one millisecond. */
    const uint64_t ticks_per_ms = 10000;

    FILETIME now;
    GetSystemTimeAsFileTime(&now);

    /* Glue the two 32-bit halves into a single 64-bit tick count. */
    uint64_t ticks = (uint64_t)now.dwHighDateTime;
    ticks <<= 32;
    ticks |= now.dwLowDateTime;

    return (ticks / ticks_per_ms) - epoch_offset_ms;
}
/*
 * Append one timestamped hex-dump line to the log.
 *
 * log    - destination stream; nothing happens when NULL.
 * prefix - tag printed in square brackets (callers pass "TX" / "RX").
 * data   - bytes to dump; nothing happens when NULL.
 * len    - number of bytes in data; nothing happens when 0.
 *
 * At most the first 256 bytes are printed; for longer input the line ends
 * with a note giving the total length. The stream is flushed afterwards.
 */
static void log_hex_data(FILE* log, const char* prefix, const uint8_t* data, size_t len) {
    const size_t max_dumped = 256;

    if (log == NULL || data == NULL || len == 0) {
        return;
    }

    fprintf(log, "%llu: [%s] ", (unsigned long long)get_timestamp_ms(), prefix);

    const size_t shown = (len < max_dumped) ? len : max_dumped;
    const uint8_t* const stop = data + shown;
    for (const uint8_t* byte = data; byte != stop; ++byte) {
        fprintf(log, "%02X ", *byte);
    }

    if (len > max_dumped) {
        fprintf(log, "... (%llu bytes total)", (unsigned long long)len);
    }

    fprintf(log, "\n");
    fflush(log);
}
/*
 * Put a client structure into its initial, disconnected state.
 *
 * The whole structure is zeroed, the socket is marked invalid, and the
 * per-metric min/max trackers of the history are seeded with sentinel values
 * (min = 1e9, max = -1e9). Does nothing when client is NULL.
 */
void etcpmon_client_init(struct etcpmon_client* client) {
    if (client == NULL) {
        return;
    }

    /* One memset covers every member, including the embedded history. */
    memset(client, 0, sizeof(*client));

    client->sock = INVALID_SOCKET;
    client->connected = 0;
    client->fully_connected = 0;
    client->log_file = NULL;

    int metric = 0;
    while (metric < GRAPH_METRICS_COUNT) {
        client->history.min_val[metric] = 1e9f;
        client->history.max_val[metric] = -1e9f;
        metric++;
    }

    /* No min/max refresh has happened yet. */
    client->history.last_minmax_update = 0;
}
/*
 * Release everything the client owns.
 *
 * Disconnects from the server, writes a closing line to the log and closes
 * it, then frees the cached connection list, metrics and link metrics.
 * Each released pointer is reset to NULL. Does nothing when client is NULL.
 */
void etcpmon_client_cleanup(struct etcpmon_client* client) {
    if (client == NULL) {
        return;
    }

    etcpmon_client_disconnect(client);

    /* Close the log only after disconnecting, so the disconnect is still logged. */
    FILE* log = client->log_file;
    if (log != NULL) {
        fprintf(log, "%llu: [LOG] Log closed\n",
                (unsigned long long)get_timestamp_ms());
        fclose(log);
        client->log_file = NULL;
    }

    /* free(NULL) is a no-op, so the cached buffers need no individual guards. */
    free(client->conn_list);
    client->conn_list = NULL;

    free(client->last_metrics);
    client->last_metrics = NULL;

    free(client->last_links);
    client->last_links = NULL;
}
int etcpmon_client_connect(struct etcpmon_client* client, const char* addr, uint16_t port) {
if (!client || !addr) return -1;
if (client->connected) {
etcpmon_client_disconnect(client);
}
/* Open log file (truncate on start) */
if (!client->log_file) {
client->log_file = fopen(LOG_FILENAME, "w");
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] ETCP Monitor communication log started\n",
(unsigned long long)get_timestamp_ms());
fprintf(client->log_file, "%llu: [LOG] Connecting to %s:%d\n",
(unsigned long long)get_timestamp_ms(), addr, port);
fflush(client->log_file);
}
}
/* Create socket */
client->sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (client->sock == INVALID_SOCKET) {
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] Failed to create socket\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
if (client->on_error) {
client->on_error("Failed to create socket", client->user_data);
}
return -1;
}
/* Set non-blocking mode */
u_long nonblock = 1;
ioctlsocket(client->sock, FIONBIO, &nonblock);
/* Setup address */
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
if (inet_pton(AF_INET, addr, &server_addr.sin_addr) != 1) {
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] Invalid address: %s\n",
(unsigned long long)get_timestamp_ms(), addr);
fflush(client->log_file);
}
closesocket(client->sock);
client->sock = INVALID_SOCKET;
if (client->on_error) {
client->on_error("Invalid address", client->user_data);
}
return -1;
}
/* Connect (non-blocking) */
int result = connect(client->sock, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (result == SOCKET_ERROR) {
int err = WSAGetLastError();
if (err != WSAEWOULDBLOCK && err != WSAEINPROGRESS) {
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] Connection failed: %d\n",
(unsigned long long)get_timestamp_ms(), err);
fflush(client->log_file);
}
closesocket(client->sock);
client->sock = INVALID_SOCKET;
if (client->on_error) {
char msg[256];
snprintf(msg, sizeof(msg), "Connection failed: %d", err);
client->on_error(msg, client->user_data);
}
return -1;
}
/* Connection in progress, will complete asynchronously */
}
strncpy(client->server_addr, addr, sizeof(client->server_addr) - 1);
client->server_port = port;
client->connected = 1;
client->fully_connected = 0;
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] Socket created, connection in progress\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
return 0;
}
void etcpmon_client_disconnect(struct etcpmon_client* client) {
if (!client) return;
if (client->sock != INVALID_SOCKET) {
/* Send disconnect command */
if (client->connected) {
struct etcpmon_msg_header hdr;
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_DISCONNECT);
if (client->log_file) {
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr));
fprintf(client->log_file, "%llu: [LOG] Sending CMD_DISCONNECT\n",
(unsigned long long)get_timestamp_ms());
}
send(client->sock, (const char*)&hdr, sizeof(hdr), 0);
}
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] Connection closed\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
closesocket(client->sock);
client->sock = INVALID_SOCKET;
}
client->connected = 0;
client->fully_connected = 0;
client->recv_len = 0;
client->selected_peer_id = 0;
}
/*
 * Return the client's `connected` flag, or 0 when client is NULL.
 */
int etcpmon_client_is_connected(const struct etcpmon_client* client) {
    if (client == NULL) {
        return 0;
    }
    return client->connected;
}
int etcpmon_client_request_list(struct etcpmon_client* client) {
if (!client || client->sock == INVALID_SOCKET) return -1;
struct etcpmon_msg_header hdr;
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_LIST_CONN);
if (client->log_file) {
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr));
fprintf(client->log_file, "%llu: [LOG] CMD_LIST_CONN\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
int sent = send(client->sock, (const char*)&hdr, sizeof(hdr), 0);
if (sent != sizeof(hdr)) {
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] Failed to send CMD_LIST_CONN: %d\n",
(unsigned long long)get_timestamp_ms(), WSAGetLastError());
fflush(client->log_file);
}
return -1;
}
return 0;
}
int etcpmon_client_select_connection(struct etcpmon_client* client, uint64_t peer_node_id) {
if (!client || client->sock == INVALID_SOCKET) return -1;
uint8_t buffer[sizeof(struct etcpmon_msg_header) + sizeof(struct etcpmon_cmd_select)];
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)buffer;
struct etcpmon_cmd_select* cmd = (struct etcpmon_cmd_select*)(buffer + sizeof(*hdr));
etcpmon_build_header(hdr, sizeof(*cmd), ETCPMON_CMD_SELECT_CONN);
cmd->peer_node_id = peer_node_id;
if (client->log_file) {
log_hex_data(client->log_file, "TX", buffer, sizeof(buffer));
fprintf(client->log_file, "%llu: [LOG] CMD_SELECT_CONN peer_id=%016llX\n",
(unsigned long long)get_timestamp_ms(), (unsigned long long)peer_node_id);
fflush(client->log_file);
}
int sent = send(client->sock, (const char*)buffer, sizeof(buffer), 0);
if (sent != sizeof(buffer)) {
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] Failed to send CMD_SELECT_CONN: %d\n",
(unsigned long long)get_timestamp_ms(), WSAGetLastError());
fflush(client->log_file);
}
return -1;
}
client->selected_peer_id = peer_node_id;
return 0;
}
int etcpmon_client_request_metrics(struct etcpmon_client* client) {
if (!client || client->sock == INVALID_SOCKET) return -1;
struct etcpmon_msg_header hdr;
etcpmon_build_header(&hdr, 0, ETCPMON_CMD_GET_METRICS);
if (client->log_file) {
log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr));
fprintf(client->log_file, "%llu: [LOG] CMD_GET_METRICS\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
int sent = send(client->sock, (const char*)&hdr, sizeof(hdr), 0);
if (sent != sizeof(hdr)) {
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] Failed to send CMD_GET_METRICS: %d\n",
(unsigned long long)get_timestamp_ms(), WSAGetLastError());
fflush(client->log_file);
}
return -1;
}
return 0;
}
static void handle_response(struct etcpmon_client* client, uint8_t msg_type,
const uint8_t* payload, uint16_t payload_size) {
switch (msg_type) {
case ETCPMON_RSP_CONN_LIST: {
if (payload_size < sizeof(struct etcpmon_rsp_conn_list)) break;
struct etcpmon_rsp_conn_list* rsp = (struct etcpmon_rsp_conn_list*)payload;
uint8_t count = rsp->count;
if (payload_size < sizeof(*rsp) + count * sizeof(struct etcpmon_conn_info)) break;
/* Update cached list */
if (client->conn_list) {
free(client->conn_list);
}
client->conn_list = (struct etcpmon_conn_info*)malloc(count * sizeof(struct etcpmon_conn_info));
if (client->conn_list) {
memcpy(client->conn_list, payload + sizeof(*rsp), count * sizeof(struct etcpmon_conn_info));
client->conn_count = count;
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] RSP_CONN_LIST count=%d\n",
(unsigned long long)get_timestamp_ms(), count);
for (uint8_t i = 0; i < count; i++) {
fprintf(client->log_file, "%llu: [LOG] [%d] peer_id=%016llX name='%s'\n",
(unsigned long long)get_timestamp_ms(), i,
(unsigned long long)client->conn_list[i].peer_node_id,
client->conn_list[i].name);
}
fflush(client->log_file);
}
if (client->on_conn_list) {
client->on_conn_list(client->conn_list, count, client->user_data);
}
}
break;
}
case ETCPMON_RSP_METRICS: {
if (payload_size < sizeof(struct etcpmon_rsp_metrics)) break;
struct etcpmon_rsp_metrics* rsp = (struct etcpmon_rsp_metrics*)payload;
uint8_t links_count = rsp->etcp.links_count;
if (payload_size < sizeof(*rsp) + links_count * sizeof(struct etcpmon_link_metrics)) break;
/* Update cached metrics */
if (client->last_metrics) {
free(client->last_metrics);
}
if (client->last_links) {
free(client->last_links);
}
client->last_metrics = (struct etcpmon_rsp_metrics*)malloc(sizeof(*rsp));
client->last_links = (struct etcpmon_link_metrics*)malloc(links_count * sizeof(struct etcpmon_link_metrics));
if (client->last_metrics && client->last_links) {
memcpy(client->last_metrics, rsp, sizeof(*rsp));
memcpy(client->last_links, payload + sizeof(*rsp),
links_count * sizeof(struct etcpmon_link_metrics));
client->last_links_count = links_count;
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] RSP_METRICS rtt=%u bytes=%llu links=%d\n",
(unsigned long long)get_timestamp_ms(),
rsp->etcp.rtt_last,
(unsigned long long)rsp->etcp.bytes_sent_total,
links_count);
fflush(client->log_file);
}
if (client->on_metrics) {
client->on_metrics(client->last_metrics, client->last_links,
links_count, client->user_data);
}
etcpmon_client_add_to_history(client, client->last_metrics);
}
break;
}
case ETCPMON_RSP_ERROR: {
if (payload_size >= sizeof(struct etcpmon_rsp_error)) {
struct etcpmon_rsp_error* rsp = (struct etcpmon_rsp_error*)payload;
const char* msg = (const char*)(payload + sizeof(*rsp));
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] RSP_ERROR code=%d msg='%s'\n",
(unsigned long long)get_timestamp_ms(), rsp->error_code, msg);
fflush(client->log_file);
}
if (client->on_error) {
client->on_error(msg, client->user_data);
}
}
break;
}
}
}
int etcpmon_client_process(struct etcpmon_client* client) {
if (!client || client->sock == INVALID_SOCKET) return -1;
// Проверка завершения non-blocking connect
if (client->connected && !client->fully_connected) {
int error = 0;
int len = sizeof(error);
if (getsockopt(client->sock, SOL_SOCKET, SO_ERROR, (char*)&error, &len) == 0) {
if (error == 0) {
// Connect успешно завершён
client->fully_connected = 1;
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] Connection established\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
if (client->on_connected) {
client->on_connected(client->user_data); // Здесь вызов request_list
}
// Продолжаем к recv, т.к. подключены
} else if (error == WSAEINPROGRESS || error == WSAEALREADY) {
// Подключение ещё в процессе, ничего не делаем, ждём следующий вызов process
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] Connect in progress (polling)...\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
return 0; // Важно: выходим, чтобы не идти в recv (оно провалится с ошибкой)
} else {
// Подключение провалилось по другой причине
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] Connect failed: %d\n",
(unsigned long long)get_timestamp_ms(), error);
fflush(client->log_file);
}
etcpmon_client_disconnect(client);
if (client->on_disconnected) {
client->on_disconnected(client->user_data);
}
if (client->on_error) {
char msg[256];
snprintf(msg, sizeof(msg), "Connect failed: %d", error);
client->on_error(msg, client->user_data);
}
return -1;
}
} else {
// getsockopt провалился (редко, но на всякий случай)
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] getsockopt failed: %d\n",
(unsigned long long)get_timestamp_ms(), WSAGetLastError());
fflush(client->log_file);
}
etcpmon_client_disconnect(client);
if (client->on_disconnected) {
client->on_disconnected(client->user_data);
}
return -1;
}
}
// Если подключение в процессе, мы уже вышли выше. Здесь recv только для установленного соединения
uint8_t* buf = client->recv_buffer + client->recv_len;
int buf_space = ETCPMON_MAX_MSG_SIZE - client->recv_len;
if (buf_space <= 0) {
client->recv_len = 0;
buf = client->recv_buffer;
buf_space = ETCPMON_MAX_MSG_SIZE;
}
int received = recv(client->sock, (char*)buf, buf_space, 0);
if (received == SOCKET_ERROR) {
int err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
return 0; /* No data available */
}
/* Connection error */
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] recv failed: %d\n",
(unsigned long long)get_timestamp_ms(), err);
fflush(client->log_file);
}
etcpmon_client_disconnect(client);
if (client->on_disconnected) {
client->on_disconnected(client->user_data);
}
return -1;
}
if (received == 0) {
/* Connection closed by server */
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] Connection closed by server (recv returned 0)\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
etcpmon_client_disconnect(client);
if (client->on_disconnected) {
client->on_disconnected(client->user_data);
}
return -1;
}
/* Log received data */
if (client->log_file) {
log_hex_data(client->log_file, "RX", buf, received);
}
client->recv_len += received;
/* Process complete messages */
int processed = 0;
while (client->recv_len >= sizeof(struct etcpmon_msg_header)) {
struct etcpmon_msg_header* hdr = (struct etcpmon_msg_header*)client->recv_buffer;
if (etcpmon_validate_header(hdr) != 0) {
/* Invalid header, skip byte */
if (client->log_file) {
fprintf(client->log_file, "%llu: [ERROR] Invalid header, skipping byte\n",
(unsigned long long)get_timestamp_ms());
fflush(client->log_file);
}
memmove(client->recv_buffer, client->recv_buffer + 1, client->recv_len - 1);
client->recv_len--;
continue;
}
if (client->recv_len < hdr->size) {
break; /* Wait for more data */
}
/* Handle message */
uint8_t* payload = client->recv_buffer + sizeof(*hdr);
uint16_t payload_size = hdr->size - sizeof(*hdr);
if (client->log_file) {
fprintf(client->log_file, "%llu: [LOG] Received message type=0x%02X size=%d\n",
(unsigned long long)get_timestamp_ms(), hdr->type, hdr->size);
fflush(client->log_file);
}
handle_response(client, hdr->type, payload, payload_size);
processed++;
/* Remove processed message */
if (client->recv_len > hdr->size) {
memmove(client->recv_buffer, client->recv_buffer + hdr->size,
client->recv_len - hdr->size);
}
client->recv_len -= hdr->size;
}
return processed;
}
/*
 * Install the event callbacks and the opaque pointer handed back to them.
 *
 * Every callback slot is overwritten with the value given, including NULL.
 * user_data is stored as-is and passed as the last argument of each callback.
 * Does nothing when client is NULL.
 */
void etcpmon_client_set_callbacks(struct etcpmon_client* client,
                                  void (*on_connected)(void*),
                                  void (*on_disconnected)(void*),
                                  void (*on_conn_list)(struct etcpmon_conn_info*, uint8_t, void*),
                                  void (*on_metrics)(struct etcpmon_rsp_metrics*,
                                                     struct etcpmon_link_metrics*,
                                                     uint8_t, void*),
                                  void (*on_error)(const char*, void*),
                                  void* user_data) {
    if (client != NULL) {
        client->user_data = user_data;
        client->on_error = on_error;
        client->on_metrics = on_metrics;
        client->on_conn_list = on_conn_list;
        client->on_disconnected = on_disconnected;
        client->on_connected = on_connected;
    }
}
/*
 * Look up the name of a cached connection by peer id.
 *
 * Returns the `name` member of the first cached entry whose peer_node_id
 * equals peer_id, or NULL when client is NULL, no list is cached, or no
 * entry matches. The returned pointer refers into the cached list.
 */
const char* etcpmon_client_get_conn_name(struct etcpmon_client* client, uint64_t peer_id) {
    if (client == NULL || client->conn_list == NULL) {
        return NULL;
    }

    const char* found = NULL;
    uint8_t slot = 0;
    while (found == NULL && slot < client->conn_count) {
        if (client->conn_list[slot].peer_node_id == peer_id) {
            found = client->conn_list[slot].name;
        }
        slot++;
    }
    return found;
}
/*
 * Return a pointer to the metrics history embedded in the client,
 * or NULL when client is NULL.
 */
struct metrics_history* etcpmon_client_get_history(struct etcpmon_client* client) {
    return (client != NULL) ? &client->history : NULL;
}
/*
 * Append one metrics sample to the client's circular history.
 *
 * The sample is written at the current head slot, after which head advances
 * (wrapping at GRAPH_HISTORY_SIZE) and count grows until the buffer is full.
 * RTT and jitter values are multiplied by 0.1 before being stored. The
 * retransmission, ACK and bytes-sent counters are stored as the increase
 * since the previous sample; the very first sample, or a counter that went
 * backwards, is stored as 0. Unacked bytes are stored as-is.
 * min_val / max_val are not touched here.
 * Does nothing when client or metrics is NULL.
 */
void etcpmon_client_add_to_history(struct etcpmon_client* client, struct etcpmon_rsp_metrics* metrics) {
    if (client == NULL || metrics == NULL) {
        return;
    }

    struct metrics_history* hist = &client->history;
    const int slot = hist->head;

    if (client->log_file) {
        fprintf(client->log_file, "add_to_history: rtt_last=%u rtt_avg10=%u rtt_avg100=%u jitter=%u\n",
                metrics->etcp.rtt_last, metrics->etcp.rtt_avg_10, metrics->etcp.rtt_avg_100, metrics->etcp.jitter);
        fflush(client->log_file);
    }

    /* Factor applied to the raw timing values (the original comment states
     * that the protocol reports them in 0.1 ms units). */
    const float to_ms = 0.1f;
    hist->values[GRAPH_METRIC_RTT_LAST][slot] = (float)metrics->etcp.rtt_last * to_ms;
    hist->values[GRAPH_METRIC_RTT_AVG10][slot] = (float)metrics->etcp.rtt_avg_10 * to_ms;
    hist->values[GRAPH_METRIC_RTT_AVG100][slot] = (float)metrics->etcp.rtt_avg_100 * to_ms;
    hist->values[GRAPH_METRIC_JITTER][slot] = (float)metrics->etcp.jitter * to_ms;

    /* Deltas need a previous sample to compare against. */
    const int have_previous = (hist->count > 0);
    uint32_t retrans_delta = 0;
    uint32_t acks_delta = 0;
    uint64_t bytes_delta = 0;

    if (have_previous && metrics->etcp.retrans_count >= hist->last_retrans) {
        retrans_delta = metrics->etcp.retrans_count - hist->last_retrans;
    }
    if (have_previous && metrics->etcp.ack_count >= hist->last_acks) {
        acks_delta = metrics->etcp.ack_count - hist->last_acks;
    }
    if (have_previous && metrics->etcp.bytes_sent_total >= hist->last_bytes_sent) {
        bytes_delta = metrics->etcp.bytes_sent_total - hist->last_bytes_sent;
    }

    /* Remember the raw counters for the next sample. */
    hist->last_retrans = metrics->etcp.retrans_count;
    hist->last_acks = metrics->etcp.ack_count;
    hist->last_bytes_sent = metrics->etcp.bytes_sent_total;

    hist->values[GRAPH_METRIC_RETRANS][slot] = (float)retrans_delta;
    hist->values[GRAPH_METRIC_ACKS][slot] = (float)acks_delta;
    hist->values[GRAPH_METRIC_INFLIGHT][slot] = (float)metrics->etcp.unacked_bytes;
    hist->values[GRAPH_METRIC_BYTES_SENT][slot] = (float)bytes_delta;

    /* Advance the ring buffer. */
    hist->head = (hist->head + 1) % GRAPH_HISTORY_SIZE;
    if (hist->count < GRAPH_HISTORY_SIZE) {
        hist->count++;
    }

    if (client->log_file) {
        fprintf(client->log_file, "history updated: count=%d head=%d\n", hist->count, hist->head);
        fflush(client->log_file);
    }
}