/*
 * etcpmon_client.c - ETCP Monitor Client Network Implementation
 */

#include "etcpmon_client.h"
#include "etcpmon_graph.h"

/*
 * NOTE(review): the header names of the system includes were missing in the
 * reviewed copy of this file; the list below is reconstructed from the
 * identifiers used in this translation unit - confirm against the build.
 * <stdarg.h> is required by log_event().
 */
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ws2tcpip.h>

//#pragma comment(lib, "ws2_32.lib")

/* Log file name */
#define LOG_FILENAME "etcpmon.log"

/* Maximum number of bytes rendered by one hex-dump log line */
#define LOG_HEX_MAX_BYTES 256

/* FILETIME counts 100 ns ticks; this many ticks make one millisecond */
#define FILETIME_TICKS_PER_MS 10000ULL

/* Distance between the FILETIME epoch (1601) and the Unix epoch (1970), in ms */
#define FILETIME_TO_UNIX_EPOCH_MS 11644473600000ULL

/*
 * Get current timestamp in milliseconds (Unix epoch).
 *
 * Reads the system time as a FILETIME and rebases it to 1970.
 */
uint64_t get_timestamp_ms(void) {
    FILETIME ft;
    GetSystemTimeAsFileTime(&ft);

    uint64_t ticks = ((uint64_t)ft.dwHighDateTime << 32) | ft.dwLowDateTime;
    return (ticks / FILETIME_TICKS_PER_MS) - FILETIME_TO_UNIX_EPOCH_MS;
}

/*
 * Write one timestamped line to the client's log file and flush it.
 *
 * The line has the form "<timestamp_ms>: <formatted text>\n". Does nothing
 * when the client has no open log file.
 */
static void log_event(const struct etcpmon_client* client, const char* fmt, ...) {
    if (!client || !client->log_file) return;

    FILE* log = client->log_file;
    va_list args;

    fprintf(log, "%llu: ", (unsigned long long)get_timestamp_ms());
    va_start(args, fmt);
    vfprintf(log, fmt, args);
    va_end(args);
    fputc('\n', log);
    fflush(log);
}

/*
 * Log hex data to file.
 *
 * Emits "<timestamp_ms>: [<prefix>] XX XX ..." and, when the data is longer
 * than LOG_HEX_MAX_BYTES, truncates the dump and appends the total length.
 */
static void log_hex_data(FILE* log, const char* prefix, const uint8_t* data, size_t len) {
    if (!log || !data || len == 0) return;

    const size_t shown = (len < LOG_HEX_MAX_BYTES) ? len : LOG_HEX_MAX_BYTES;

    fprintf(log, "%llu: [%s] ", (unsigned long long)get_timestamp_ms(), prefix);
    for (size_t i = 0; i < shown; i++) {
        fprintf(log, "%02X ", data[i]);
    }
    if (len > LOG_HEX_MAX_BYTES) {
        fprintf(log, "... (%llu bytes total)", (unsigned long long)len);
    }
    fprintf(log, "\n");
    fflush(log);
}

/*
 * Disconnect the client and invoke the on_disconnected callback, if set.
 */
static void drop_connection(struct etcpmon_client* client) {
    etcpmon_client_disconnect(client);
    if (client->on_disconnected) {
        client->on_disconnected(client->user_data);
    }
}

/*
 * Reset a client structure to its initial, disconnected state.
 *
 * The whole structure is zeroed, the socket is marked invalid and the
 * per-metric min/max trackers of the history are set to sentinel values.
 */
void etcpmon_client_init(struct etcpmon_client* client) {
    if (!client) return;

    /* Zeroing also clears the connection flags, log handle and history */
    memset(client, 0, sizeof(*client));
    client->sock = INVALID_SOCKET;

    for (int i = 0; i < GRAPH_METRICS_COUNT; i++) {
        client->history.min_val[i] = 1e9f;
        client->history.max_val[i] = -1e9f;
    }
    client->history.last_minmax_update = 0;
}

/*
 * Disconnect, close the log file and release all cached response data.
 */
void etcpmon_client_cleanup(struct etcpmon_client* client) {
    if (!client) return;

    etcpmon_client_disconnect(client);

    /* Close log file */
    if (client->log_file) {
        log_event(client, "[LOG] Log closed");
        fclose(client->log_file);
        client->log_file = NULL;
    }

    /* Free cached data (free(NULL) is a no-op) */
    free(client->conn_list);
    client->conn_list = NULL;
    free(client->last_metrics);
    client->last_metrics = NULL;
    free(client->last_links);
    client->last_links = NULL;
}

/*
 * Abandon a connection attempt: close the socket if one was created and
 * report `reason` through on_error. Always returns -1 so callers can
 * `return abort_connect(...)`.
 */
static int abort_connect(struct etcpmon_client* client, const char* reason) {
    if (client->sock != INVALID_SOCKET) {
        closesocket(client->sock);
        client->sock = INVALID_SOCKET;
    }
    if (client->on_error) {
        client->on_error(reason, client->user_data);
    }
    return -1;
}

/*
 * Start a non-blocking TCP connection to addr:port.
 *
 * `addr` must be a dotted IPv4 literal (it is parsed with inet_pton).
 * An existing connection is closed first. On the first call the log file
 * is opened (truncated).
 *
 * Returns 0 when the connection attempt was started (completion is detected
 * later by etcpmon_client_process) and -1 on failure, in which case on_error
 * has been invoked if set.
 */
int etcpmon_client_connect(struct etcpmon_client* client, const char* addr, uint16_t port) {
    if (!client || !addr) return -1;

    if (client->connected) {
        etcpmon_client_disconnect(client);
    }

    /* Open log file (truncate on start) */
    if (!client->log_file) {
        client->log_file = fopen(LOG_FILENAME, "w");
        log_event(client, "[LOG] ETCP Monitor communication log started");
        log_event(client, "[LOG] Connecting to %s:%d", addr, port);
    }

    /* Create socket */
    client->sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (client->sock == INVALID_SOCKET) {
        log_event(client, "[ERROR] Failed to create socket");
        return abort_connect(client, "Failed to create socket");
    }

    /*
     * Set non-blocking mode. A socket left in blocking mode would stall the
     * caller inside connect() and later inside recv(), so treat failure as
     * fatal for this attempt.
     */
    u_long nonblock = 1;
    if (ioctlsocket(client->sock, FIONBIO, &nonblock) != 0) {
        int err = WSAGetLastError();
        char msg[256];
        log_event(client, "[ERROR] Failed to set non-blocking mode: %d", err);
        snprintf(msg, sizeof(msg), "Failed to set non-blocking mode: %d", err);
        return abort_connect(client, msg);
    }

    /* Setup address */
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port);
    if (inet_pton(AF_INET, addr, &server_addr.sin_addr) != 1) {
        log_event(client, "[ERROR] Invalid address: %s", addr);
        return abort_connect(client, "Invalid address");
    }

    /* Connect (non-blocking) */
    if (connect(client->sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) == SOCKET_ERROR) {
        int err = WSAGetLastError();
        if (err != WSAEWOULDBLOCK && err != WSAEINPROGRESS) {
            char msg[256];
            log_event(client, "[ERROR] Connection failed: %d", err);
            snprintf(msg, sizeof(msg), "Connection failed: %d", err);
            return abort_connect(client, msg);
        }
        /* Connection in progress, will complete asynchronously */
    }

    /* snprintf always NUL-terminates, truncating an over-long address */
    snprintf(client->server_addr, sizeof(client->server_addr), "%s", addr);
    client->server_port = port;
    client->connected = 1;
    client->fully_connected = 0;

    log_event(client, "[LOG] Socket created, connection in progress");
    return 0;
}

/*
 * Close the connection and reset per-connection state.
 *
 * If a connection was started, a CMD_DISCONNECT header is sent first on a
 * best-effort basis (the send result is deliberately ignored). Cached
 * responses and the log file are kept.
 */
void etcpmon_client_disconnect(struct etcpmon_client* client) {
    if (!client) return;

    if (client->sock != INVALID_SOCKET) {
        /* Send disconnect command */
        if (client->connected) {
            struct etcpmon_msg_header hdr;
            etcpmon_build_header(&hdr, 0, ETCPMON_CMD_DISCONNECT);

            if (client->log_file) {
                log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr));
            }
            log_event(client, "[LOG] Sending CMD_DISCONNECT");
            /* Best effort: the socket is closed right after, whatever happens */
            (void)send(client->sock, (const char*)&hdr, (int)sizeof(hdr), 0);
        }

        log_event(client, "[LOG] Connection closed");
        closesocket(client->sock);
        client->sock = INVALID_SOCKET;
    }

    client->connected = 0;
    client->fully_connected = 0;
    client->recv_len = 0;
    client->selected_peer_id = 0;
}

/*
 * Return the `connected` flag (set as soon as a connection attempt has been
 * started), or 0 for a NULL client.
 */
int etcpmon_client_is_connected(const struct etcpmon_client* client) {
    return client ? client->connected : 0;
}

/*
 * Send a command that consists of a header only (no payload).
 *
 * `cmd_name` is used verbatim in the log lines. Returns 0 when the whole
 * header was handed to the socket, -1 otherwise.
 */
static int send_bare_command(struct etcpmon_client* client, uint8_t cmd, const char* cmd_name) {
    if (!client || client->sock == INVALID_SOCKET) return -1;

    struct etcpmon_msg_header hdr;
    etcpmon_build_header(&hdr, 0, cmd);

    if (client->log_file) {
        log_hex_data(client->log_file, "TX", (const uint8_t*)&hdr, sizeof(hdr));
    }
    log_event(client, "[LOG] %s", cmd_name);

    int sent = send(client->sock, (const char*)&hdr, (int)sizeof(hdr), 0);
    if (sent != (int)sizeof(hdr)) {
        /* Capture the error before any other call can overwrite it */
        int err = WSAGetLastError();
        log_event(client, "[ERROR] Failed to send %s: %d", cmd_name, err);
        return -1;
    }
    return 0;
}

/*
 * Ask the server for its connection list (CMD_LIST_CONN).
 * Returns 0 on success, -1 on failure.
 */
int etcpmon_client_request_list(struct etcpmon_client* client) {
    return send_bare_command(client, ETCPMON_CMD_LIST_CONN, "CMD_LIST_CONN");
}

/*
 * Select the connection identified by `peer_node_id` on the server
 * (CMD_SELECT_CONN) and remember it in client->selected_peer_id.
 * Returns 0 on success, -1 on failure.
 */
int etcpmon_client_select_connection(struct etcpmon_client* client, uint64_t peer_node_id) {
    if (!client || client->sock == INVALID_SOCKET) return -1;

    struct etcpmon_msg_header hdr;
    struct etcpmon_cmd_select cmd;
    uint8_t buffer[sizeof(hdr) + sizeof(cmd)];

    /*
     * Build header and payload in properly typed, zeroed objects and copy
     * them into the wire buffer: no uninitialised stack bytes are sent and
     * no struct is accessed through a cast byte pointer.
     */
    memset(&hdr, 0, sizeof(hdr));
    memset(&cmd, 0, sizeof(cmd));
    etcpmon_build_header(&hdr, sizeof(cmd), ETCPMON_CMD_SELECT_CONN);
    cmd.peer_node_id = peer_node_id;
    memcpy(buffer, &hdr, sizeof(hdr));
    memcpy(buffer + sizeof(hdr), &cmd, sizeof(cmd));

    if (client->log_file) {
        log_hex_data(client->log_file, "TX", buffer, sizeof(buffer));
    }
    log_event(client, "[LOG] CMD_SELECT_CONN peer_id=%016llX", (unsigned long long)peer_node_id);

    int sent = send(client->sock, (const char*)buffer, (int)sizeof(buffer), 0);
    if (sent != (int)sizeof(buffer)) {
        int err = WSAGetLastError();
        log_event(client, "[ERROR] Failed to send CMD_SELECT_CONN: %d", err);
        return -1;
    }

    client->selected_peer_id = peer_node_id;
    return 0;
}

/*
 * Ask the server for the metrics of the selected connection
 * (CMD_GET_METRICS). Returns 0 on success, -1 on failure.
 */
int etcpmon_client_request_metrics(struct etcpmon_client* client) {
    return send_bare_command(client, ETCPMON_CMD_GET_METRICS, "CMD_GET_METRICS");
}

/*
 * Dispatch one complete response message.
 *
 * `payload` points at the bytes following the message header and
 * `payload_size` is their count. Malformed (too short) payloads and unknown
 * message types are ignored. Cached copies in `client` are replaced and the
 * matching callback is invoked.
 */
static void handle_response(struct etcpmon_client* client, uint8_t msg_type,
                            const uint8_t* payload, uint16_t payload_size) {
    switch (msg_type) {
    case ETCPMON_RSP_CONN_LIST: {
        if (payload_size < sizeof(struct etcpmon_rsp_conn_list)) break;

        const struct etcpmon_rsp_conn_list* rsp = (const struct etcpmon_rsp_conn_list*)payload;
        const uint8_t count = rsp->count;
        const size_t list_bytes = (size_t)count * sizeof(struct etcpmon_conn_info);
        if (payload_size < sizeof(*rsp) + list_bytes) break;

        /* Update cached list */
        free(client->conn_list);
        client->conn_count = 0;
        /*
         * Allocate at least one element: malloc(0) may legally return NULL,
         * which would make an empty list look like an allocation failure and
         * the update would never reach on_conn_list.
         */
        client->conn_list = calloc(count ? count : 1, sizeof(*client->conn_list));
        if (!client->conn_list) {
            log_event(client, "[ERROR] Out of memory caching connection list");
            break;
        }
        if (list_bytes > 0) {
            memcpy(client->conn_list, payload + sizeof(*rsp), list_bytes);
        }
        client->conn_count = count;

        log_event(client, "[LOG] RSP_CONN_LIST count=%d", count);
        for (uint8_t i = 0; i < count; i++) {
            /* NOTE(review): assumes the server NUL-terminates `name` - confirm */
            log_event(client, "[LOG] [%d] peer_id=%016llX name='%s'", i,
                      (unsigned long long)client->conn_list[i].peer_node_id,
                      client->conn_list[i].name);
        }

        if (client->on_conn_list) {
            client->on_conn_list(client->conn_list, count, client->user_data);
        }
        break;
    }

    case ETCPMON_RSP_METRICS: {
        if (payload_size < sizeof(struct etcpmon_rsp_metrics)) break;

        const struct etcpmon_rsp_metrics* rsp = (const struct etcpmon_rsp_metrics*)payload;
        const uint8_t links_count = rsp->etcp.links_count;
        const size_t links_bytes = (size_t)links_count * sizeof(struct etcpmon_link_metrics);
        if (payload_size < sizeof(*rsp) + links_bytes) break;

        /* Update cached metrics */
        free(client->last_metrics);
        free(client->last_links);
        client->last_links_count = 0;
        client->last_metrics = malloc(sizeof(*client->last_metrics));
        /* At least one element, for the same malloc(0) reason as above */
        client->last_links = calloc(links_count ? links_count : 1, sizeof(*client->last_links));
        if (!client->last_metrics || !client->last_links) {
            /* Keep the cache consistent: either both copies exist or neither */
            free(client->last_metrics);
            client->last_metrics = NULL;
            free(client->last_links);
            client->last_links = NULL;
            log_event(client, "[ERROR] Out of memory caching metrics");
            break;
        }

        memcpy(client->last_metrics, rsp, sizeof(*rsp));
        if (links_bytes > 0) {
            memcpy(client->last_links, payload + sizeof(*rsp), links_bytes);
        }
        client->last_links_count = links_count;

        log_event(client, "[LOG] RSP_METRICS rtt=%u bytes=%llu links=%d",
                  (unsigned)rsp->etcp.rtt_last,
                  (unsigned long long)rsp->etcp.bytes_sent_total,
                  links_count);

        if (client->on_metrics) {
            client->on_metrics(client->last_metrics, client->last_links, links_count,
                               client->user_data);
        }
        etcpmon_client_add_to_history(client, client->last_metrics);
        break;
    }

    case ETCPMON_RSP_ERROR: {
        if (payload_size < sizeof(struct etcpmon_rsp_error)) break;

        const struct etcpmon_rsp_error* rsp = (const struct etcpmon_rsp_error*)payload;
        const size_t text_len = payload_size - sizeof(*rsp);

        /*
         * The text comes from the network and is not guaranteed to be
         * NUL-terminated inside the payload; use a terminated copy so that
         * neither the log nor the callback reads past the message.
         */
        char* text = malloc(text_len + 1);
        if (!text) break;
        if (text_len > 0) {
            memcpy(text, payload + sizeof(*rsp), text_len);
        }
        text[text_len] = '\0';

        log_event(client, "[LOG] RSP_ERROR code=%d msg='%s'", (int)rsp->error_code, text);
        if (client->on_error) {
            client->on_error(text, client->user_data);
        }
        free(text);
        break;
    }

    default:
        /* Unknown message types are ignored */
        break;
    }
}

/*
 * Check whether a pending non-blocking connect has completed.
 *
 * Uses a zero-timeout select(): the socket becoming writable means the
 * connection is established, the socket appearing in the exception set means
 * the attempt failed. SO_ERROR alone cannot be used for this because it also
 * reads 0 while the attempt is still pending.
 *
 * Returns 1 when the connection is established (on_connected has been
 * called), 0 while the attempt is still pending and -1 on failure (the
 * client has been disconnected and the callbacks have been invoked).
 */
static int finish_pending_connect(struct etcpmon_client* client) {
    fd_set writable;
    fd_set failed;
    struct timeval no_wait;

    FD_ZERO(&writable);
    FD_ZERO(&failed);
    FD_SET(client->sock, &writable);
    FD_SET(client->sock, &failed);
    no_wait.tv_sec = 0;
    no_wait.tv_usec = 0;

    /* The first argument is ignored by Winsock */
    int ready = select(0, NULL, &writable, &failed, &no_wait);
    if (ready == SOCKET_ERROR) {
        int err = WSAGetLastError();
        log_event(client, "[ERROR] select failed: %d", err);
        drop_connection(client);
        return -1;
    }
    if (ready == 0) {
        /* Still connecting; try again on the next process() call */
        return 0;
    }

    int so_error = 0;
    int so_len = sizeof(so_error);
    if (getsockopt(client->sock, SOL_SOCKET, SO_ERROR, (char*)&so_error, &so_len) != 0) {
        /* getsockopt failed (rare, but handled just in case) */
        int err = WSAGetLastError();
        log_event(client, "[ERROR] getsockopt failed: %d", err);
        drop_connection(client);
        return -1;
    }

    if (FD_ISSET(client->sock, &failed) || so_error != 0) {
        log_event(client, "[ERROR] Connect failed: %d", so_error);
        drop_connection(client);
        if (client->on_error) {
            char msg[256];
            snprintf(msg, sizeof(msg), "Connect failed: %d", so_error);
            client->on_error(msg, client->user_data);
        }
        return -1;
    }

    /* Connect completed successfully */
    client->fully_connected = 1;
    log_event(client, "[LOG] Connection established");
    if (client->on_connected) {
        client->on_connected(client->user_data);
    }
    return 1;
}

/*
 * Drive the client: finish a pending connect, read whatever the socket has
 * available and dispatch every complete message in the receive buffer.
 *
 * Intended to be called repeatedly; it never blocks.
 *
 * Returns the number of messages dispatched by this call (0 when nothing was
 * available or the connect is still pending) and -1 when the client is not
 * usable or the connection was lost (on_disconnected has then been called).
 */
int etcpmon_client_process(struct etcpmon_client* client) {
    if (!client || client->sock == INVALID_SOCKET) return -1;

    /* Check completion of the non-blocking connect */
    if (client->connected && !client->fully_connected) {
        int state = finish_pending_connect(client);
        if (state <= 0) {
            /* Pending (0) or failed (-1): recv would only fail here */
            return state;
        }
    }

    /* From here on recv is used on an established connection only */
    uint8_t* buf = client->recv_buffer + client->recv_len;
    int buf_space = ETCPMON_MAX_MSG_SIZE - client->recv_len;
    if (buf_space <= 0) {
        /* Buffer full without a complete message: discard and start over */
        client->recv_len = 0;
        buf = client->recv_buffer;
        buf_space = ETCPMON_MAX_MSG_SIZE;
    }

    int received = recv(client->sock, (char*)buf, buf_space, 0);
    if (received == SOCKET_ERROR) {
        int err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
            return 0; /* No data available */
        }
        /* Connection error */
        log_event(client, "[ERROR] recv failed: %d", err);
        drop_connection(client);
        return -1;
    }
    if (received == 0) {
        /* Connection closed by server */
        log_event(client, "[LOG] Connection closed by server (recv returned 0)");
        drop_connection(client);
        return -1;
    }

    /* Log received data */
    if (client->log_file) {
        log_hex_data(client->log_file, "RX", buf, (size_t)received);
    }
    client->recv_len += received;

    /* Process complete messages */
    int processed = 0;
    while (client->recv_len >= sizeof(struct etcpmon_msg_header)) {
        /*
         * Work on a private copy of the header: the bytes at the start of
         * the buffer are overwritten by the memmove below, so the size must
         * not be re-read from the buffer afterwards.
         */
        struct etcpmon_msg_header hdr;
        memcpy(&hdr, client->recv_buffer, sizeof(hdr));

        /*
         * Besides the protocol validation, reject sizes that cannot be
         * processed: smaller than a header (the payload size would wrap and
         * a size of 0 would loop forever) or larger than the buffer (the
         * message could never be completed).
         */
        if (etcpmon_validate_header(&hdr) != 0 ||
            hdr.size < sizeof(hdr) ||
            hdr.size > ETCPMON_MAX_MSG_SIZE) {
            /* Invalid header, skip byte */
            log_event(client, "[ERROR] Invalid header, skipping byte");
            memmove(client->recv_buffer, client->recv_buffer + 1, client->recv_len - 1);
            client->recv_len--;
            continue;
        }

        if (client->recv_len < hdr.size) {
            break; /* Wait for more data */
        }

        /* Handle message */
        const uint8_t* payload = client->recv_buffer + sizeof(hdr);
        uint16_t payload_size = (uint16_t)(hdr.size - sizeof(hdr));

        log_event(client, "[LOG] Received message type=0x%02X size=%d",
                  (unsigned)hdr.type, (int)hdr.size);

        handle_response(client, hdr.type, payload, payload_size);
        processed++;

        /*
         * A callback may have disconnected (or reconnected), which resets
         * the receive state; the buffered bytes are gone in that case.
         */
        if (client->sock == INVALID_SOCKET || client->recv_len < hdr.size) {
            break;
        }

        /* Remove processed message */
        if (client->recv_len > hdr.size) {
            memmove(client->recv_buffer, client->recv_buffer + hdr.size,
                    client->recv_len - hdr.size);
        }
        client->recv_len -= hdr.size;
    }

    return processed;
}

/*
 * Install the event callbacks and the opaque `user_data` pointer that is
 * passed back to each of them. Any callback may be NULL.
 */
void etcpmon_client_set_callbacks(struct etcpmon_client* client,
                                  void (*on_connected)(void*),
                                  void (*on_disconnected)(void*),
                                  void (*on_conn_list)(struct etcpmon_conn_info*, uint8_t, void*),
                                  void (*on_metrics)(struct etcpmon_rsp_metrics*, struct etcpmon_link_metrics*, uint8_t, void*),
                                  void (*on_error)(const char*, void*),
                                  void* user_data) {
    if (!client) return;

    client->on_connected = on_connected;
    client->on_disconnected = on_disconnected;
    client->on_conn_list = on_conn_list;
    client->on_metrics = on_metrics;
    client->on_error = on_error;
    client->user_data = user_data;
}

/*
 * Look up the name of a connection in the cached connection list.
 *
 * Returns a pointer into the cache (valid until the list is replaced or the
 * client is cleaned up), or NULL when the peer is not in the list.
 */
const char* etcpmon_client_get_conn_name(struct etcpmon_client* client, uint64_t peer_id) {
    if (!client || !client->conn_list) return NULL;

    for (size_t i = 0; i < client->conn_count; i++) {
        if (client->conn_list[i].peer_node_id == peer_id) {
            return client->conn_list[i].name;
        }
    }
    return NULL;
}

/*
 * Return the client's metrics history, or NULL for a NULL client.
 */
struct metrics_history* etcpmon_client_get_history(struct etcpmon_client* client) {
    if (!client) return NULL;
    return &client->history;
}

/*
 * Append one metrics sample to the history ring buffer.
 *
 * RTT and jitter values are scaled by 0.1 before being stored. Retransmits,
 * ACKs and bytes sent are stored as the increase since the previous sample
 * (0 for the first sample or when a counter went backwards); in-flight bytes
 * are stored as reported. The write position then advances and wraps at
 * GRAPH_HISTORY_SIZE.
 */
void etcpmon_client_add_to_history(struct etcpmon_client* client, struct etcpmon_rsp_metrics* metrics) {
    if (!client || !metrics) return;

    struct metrics_history* h = &client->history;
    const int slot = h->head;

    if (client->log_file) {
        fprintf(client->log_file, "add_to_history: rtt_last=%u rtt_avg10=%u rtt_avg100=%u jitter=%u\n",
                (unsigned)metrics->etcp.rtt_last,
                (unsigned)metrics->etcp.rtt_avg_10,
                (unsigned)metrics->etcp.rtt_avg_100,
                (unsigned)metrics->etcp.jitter);
        fflush(client->log_file);
    }

    const float scale = 0.1f; /* to ms (protocol: 0.1ms units) */
    h->values[GRAPH_METRIC_RTT_LAST][slot]   = (float)metrics->etcp.rtt_last * scale;
    h->values[GRAPH_METRIC_RTT_AVG10][slot]  = (float)metrics->etcp.rtt_avg_10 * scale;
    h->values[GRAPH_METRIC_RTT_AVG100][slot] = (float)metrics->etcp.rtt_avg_100 * scale;
    h->values[GRAPH_METRIC_JITTER][slot]     = (float)metrics->etcp.jitter * scale;

    /* Cumulative counters are charted as per-sample increases */
    uint32_t retrans_delta = 0;
    uint32_t acks_delta = 0;
    uint64_t bytes_delta = 0;
    if (h->count > 0) {
        if (metrics->etcp.retrans_count >= h->last_retrans) {
            retrans_delta = metrics->etcp.retrans_count - h->last_retrans;
        }
        if (metrics->etcp.ack_count >= h->last_acks) {
            acks_delta = metrics->etcp.ack_count - h->last_acks;
        }
        if (metrics->etcp.bytes_sent_total >= h->last_bytes_sent) {
            bytes_delta = metrics->etcp.bytes_sent_total - h->last_bytes_sent;
        }
    }
    h->last_retrans = metrics->etcp.retrans_count;
    h->last_acks = metrics->etcp.ack_count;
    h->last_bytes_sent = metrics->etcp.bytes_sent_total;

    h->values[GRAPH_METRIC_RETRANS][slot]    = (float)retrans_delta;
    h->values[GRAPH_METRIC_ACKS][slot]       = (float)acks_delta;
    h->values[GRAPH_METRIC_INFLIGHT][slot]   = (float)metrics->etcp.unacked_bytes;
    h->values[GRAPH_METRIC_BYTES_SENT][slot] = (float)bytes_delta;

    h->head = (h->head + 1) % GRAPH_HISTORY_SIZE;
    if (h->count < GRAPH_HISTORY_SIZE) {
        h->count++;
    }

    if (client->log_file) {
        fprintf(client->log_file, "history updated: count=%d head=%d\n", h->count, h->head);
        fflush(client->log_file);
    }
}