Browse Source

tun/win шлёт в routing.

nodeinfo-routing-update
jeka 1 month ago
parent
commit
c271084d35
  1. 114
      lib/u_async.c
  2. 126
      src/routing.c
  3. 49
      src/tun_if.c
  4. 1
      src/tun_if.h
  5. 2
      src/tun_windows.c
  6. 2
      src/utun.c

114
lib/u_async.c

@ -373,6 +373,7 @@ static void handle_wakeup(struct UASYNC* ua) {
while (read(ua->wakeup_pipe[0], buf, sizeof(buf)) > 0) {}
#endif
// DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "POST: wakeup process");
// Execute all posted callbacks (in main thread)
process_posted_tasks(ua);
}
@ -1105,62 +1106,6 @@ struct UASYNC* uasync_create(void) {
ua->wakeup_initialized = 0;
ua->posted_tasks_head = NULL;
#ifdef _WIN32
// Windows: self-connected UDP socket for wakeup
SOCKET r = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
SOCKET w = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (r == INVALID_SOCKET || w == INVALID_SOCKET) {
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "Failed to create wakeup sockets");
free(ua);
return NULL;
}
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = 0;
if (bind(r, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR ||
getsockname(r, (struct sockaddr*)&addr, &(int){sizeof(addr)}) == SOCKET_ERROR ||
connect(w, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) {
closesocket(r);
closesocket(w);
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "Wakeup socket setup failed: %d", WSAGetLastError());
free(ua);
return NULL;
}
ua->wakeup_pipe[0] = (int)(intptr_t)r;
ua->wakeup_pipe[1] = (int)(intptr_t)w;
ua->wakeup_initialized = 1;
u_long mode = 1;
ioctlsocket(r, FIONBIO, &mode);
// Register the read socket with uasync
uasync_add_socket_t(ua, r, wakeup_read_callback_win, NULL, NULL, ua); // ← ua как user_data
// uasync_add_socket_t(ua, r, wakeup_read_callback_win, NULL, NULL, NULL);
InitializeCriticalSection(&ua->posted_lock);
#else
// POSIX pipe
if (pipe(ua->wakeup_pipe) != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "pipe() failed: %s", strerror(errno));
free(ua);
return NULL;
}
ua->wakeup_initialized = 1;
fcntl(ua->wakeup_pipe[0], F_SETFL, fcntl(ua->wakeup_pipe[0], F_GETFL, 0) | O_NONBLOCK);
fcntl(ua->wakeup_pipe[1], F_SETFL, fcntl(ua->wakeup_pipe[1], F_GETFL, 0) | O_NONBLOCK);
if (!ua->use_epoll) {
uasync_add_socket(ua, ua->wakeup_pipe[0], wakeup_read_callback_posix, NULL, NULL, ua); // ← ua
}
pthread_mutex_init(&ua->posted_lock, NULL);
#endif
ua->sockets = socket_array_create(16);
if (!ua->sockets) {
if (ua->wakeup_initialized) {
@ -1217,6 +1162,62 @@ struct UASYNC* uasync_create(void) {
}
#endif
#ifdef _WIN32
// Windows: self-connected UDP socket for wakeup
SOCKET r = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
SOCKET w = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (r == INVALID_SOCKET || w == INVALID_SOCKET) {
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "Failed to create wakeup sockets");
free(ua);
return NULL;
}
struct sockaddr_in addr = {0};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = 0;
if (bind(r, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR ||
getsockname(r, (struct sockaddr*)&addr, &(int){sizeof(addr)}) == SOCKET_ERROR ||
connect(w, (struct sockaddr*)&addr, sizeof(addr)) == SOCKET_ERROR) {
closesocket(r);
closesocket(w);
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "Wakeup socket setup failed: %d", WSAGetLastError());
free(ua);
return NULL;
}
ua->wakeup_pipe[0] = (int)(intptr_t)r;
ua->wakeup_pipe[1] = (int)(intptr_t)w;
ua->wakeup_initialized = 1;
u_long mode = 1;
ioctlsocket(r, FIONBIO, &mode);
// Register the read socket with uasync
uasync_add_socket_t(ua, r, wakeup_read_callback_win, NULL, NULL, ua); // ← ua как user_data
// uasync_add_socket_t(ua, r, wakeup_read_callback_win, NULL, NULL, NULL);
InitializeCriticalSection(&ua->posted_lock);
#else
// POSIX pipe
if (pipe(ua->wakeup_pipe) != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_UASYNC, "pipe() failed: %s", strerror(errno));
free(ua);
return NULL;
}
ua->wakeup_initialized = 1;
fcntl(ua->wakeup_pipe[0], F_SETFL, fcntl(ua->wakeup_pipe[0], F_GETFL, 0) | O_NONBLOCK);
fcntl(ua->wakeup_pipe[1], F_SETFL, fcntl(ua->wakeup_pipe[1], F_GETFL, 0) | O_NONBLOCK);
if (!ua->use_epoll) {
uasync_add_socket(ua, ua->wakeup_pipe[0], wakeup_read_callback_posix, NULL, NULL, ua); // ← ua
}
pthread_mutex_init(&ua->posted_lock, NULL);
#endif
return ua;
}
@ -1436,6 +1437,7 @@ void uasync_post(struct UASYNC* ua, uasync_post_callback_t callback, void* arg)
pthread_mutex_unlock(&ua->posted_lock);
#endif
// DEBUG_DEBUG(DEBUG_CATEGORY_TIMERS, "POST: wakeup send");
uasync_wakeup(ua); // будим mainloop
}

126
src/routing.c

@ -161,93 +161,71 @@ static void routing_pkt_from_tun_cb(struct ll_queue* q, void* arg) {
}
struct ETCP_FRAGMENT* pkt = (struct ETCP_FRAGMENT*)queue_data_get(instance->tun->output_queue);
while (pkt) {
if (pkt->ll.dgram && pkt->ll.len > 0) {
// Дамп пакета из TUN
dump_ip_packet("TUN->routing", (const uint8_t*)pkt->ll.dgram, pkt->ll.len);
// Extract destination IP for routing
uint32_t dst_ip = extract_dst_ip(pkt->ll.dgram, pkt->ll.len);
if (dst_ip == 0) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "Failed to extract destination IP, dropping packet");
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
pkt = (struct ETCP_FRAGMENT*)queue_data_get(instance->tun->output_queue);
continue;
}
if (!pkt) return;
if (pkt->ll.dgram && pkt->ll.len > 0) {
dump_ip_packet("TUN->routing", (const uint8_t*)pkt->ll.dgram, pkt->ll.len);
uint32_t dst_ip = extract_dst_ip(pkt->ll.dgram, pkt->ll.len);
if (dst_ip == 0) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "Failed to extract destination IP, dropping packet");
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
} else {
struct in_addr addr;
addr.s_addr = dst_ip;
// Lookup route in routing table
struct ROUTE_ARRAY* routes = route_table_lookup(instance->rt, dst_ip);
if (!routes || routes->routes == 0) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "No route to %s, dropping packet", ip_to_str(&addr, AF_INET).str);
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
pkt = (struct ETCP_FRAGMENT*)queue_data_get(instance->tun->output_queue);
continue;
}
// Use first (best) route
struct ETCP_CONN* conn = routes->entries[0]->next_hop;
if (!conn) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "Route to %s has no next_hop, dropping packet", ip_to_str(&addr, AF_INET).str);
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
pkt = (struct ETCP_FRAGMENT*)queue_data_get(instance->tun->output_queue);
continue;
}
// Check connection has normalizer
if (!conn->normalizer) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "Connection for %s has no normalizer, dropping packet", ip_to_str(&addr, AF_INET).str);
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
pkt = (struct ETCP_FRAGMENT*)queue_data_get(instance->tun->output_queue);
continue;
}
// Create ll_entry with ID prefix for ETCP
// Format: <id 1 byte> <ip_packet ...>
size_t etcp_data_len = 1 + pkt->ll.len;
struct ll_entry* entry = ll_alloc_lldgram(etcp_data_len);
if (!entry) {
DEBUG_ERROR(DEBUG_CATEGORY_ROUTING, "Failed to allocate entry for ETCP send");
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
pkt = (struct ETCP_FRAGMENT*)queue_data_get(instance->tun->output_queue);
continue;
}
// Set ID byte and copy IP packet
entry->dgram[0] = ETCP_ID_DATA;
memcpy(entry->dgram + 1, pkt->ll.dgram, pkt->ll.len);
entry->len = etcp_data_len;
// Send via etcp_send
if (etcp_send(conn, entry) != 0) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "etcp_send failed for %s", ip_to_str(&addr, AF_INET).str);
queue_entry_free(entry);
queue_dgram_free(entry);
} else {
DEBUG_DEBUG(DEBUG_CATEGORY_ROUTING, "Forwarded packet to %s via ETCP conn [%s]",
ip_to_str(&addr, AF_INET).str, conn->log_name);
struct ETCP_CONN* conn = routes->entries[0]->next_hop;
if (!conn) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "Route to %s has no next_hop, dropping packet", ip_to_str(&addr, AF_INET).str);
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
} else if (!conn->normalizer) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "Connection for %s has no normalizer, dropping packet", ip_to_str(&addr, AF_INET).str);
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
} else {
size_t etcp_data_len = 1 + pkt->ll.len;
struct ll_entry* entry = ll_alloc_lldgram(etcp_data_len);
if (!entry) {
DEBUG_ERROR(DEBUG_CATEGORY_ROUTING, "Failed to allocate entry for ETCP send");
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
} else {
entry->dgram[0] = ETCP_ID_DATA;
memcpy(entry->dgram + 1, pkt->ll.dgram, pkt->ll.len);
entry->len = etcp_data_len;
if (etcp_send(conn, entry) != 0) {
DEBUG_WARN(DEBUG_CATEGORY_ROUTING, "etcp_send failed for %s", ip_to_str(&addr, AF_INET).str);
queue_entry_free(entry);
queue_dgram_free(entry);
} else {
dump_ip_packet("routing->ETCP", (const uint8_t*)entry->dgram, entry->len);
DEBUG_DEBUG(DEBUG_CATEGORY_ROUTING, "Forwarded packet to %s via ETCP conn [%s]",
ip_to_str(&addr, AF_INET).str, conn->log_name);
}
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
}
}
}
// Free original TUN packet
}
} else {
if (pkt->ll.dgram) {
free(pkt->ll.dgram);
memory_pool_free(instance->tun->pool, pkt);
} else {
// Empty packet - just free structure
if (pkt->ll.dgram) {
free(pkt->ll.dgram);
}
memory_pool_free(instance->tun->pool, pkt);
}
// Get next packet
pkt = (struct ETCP_FRAGMENT*)queue_data_get(instance->tun->output_queue);
memory_pool_free(instance->tun->pool, pkt);
}
queue_resume_callback(instance->tun->output_queue);
}
// Initialize routing module for instance

49
src/tun_if.c

@ -83,27 +83,28 @@ static void tun_input_queue_callback(struct ll_queue* q, void* arg)
{
(void)q;
struct tun_if* tun = (struct tun_if*)arg;
struct ETCP_FRAGMENT* pkt;
struct ETCP_FRAGMENT* pkt = queue_data_get(tun->input_queue);
if (!pkt) return;
while ((pkt = (struct ETCP_FRAGMENT*)queue_data_get(tun->input_queue)) != NULL) {
if (pkt->ll.dgram && pkt->ll.len > 0) {
if (pkt->ll.len > TUN_MAX_PACKET_SIZE) {
DEBUG_WARN(DEBUG_CATEGORY_TUN, "Packet too large: %zu bytes", pkt->ll.len);
if (pkt->ll.dgram && pkt->ll.len > 0) {
if (pkt->ll.len > TUN_MAX_PACKET_SIZE) {
DEBUG_WARN(DEBUG_CATEGORY_TUN, "Packet too large: %zu bytes", pkt->ll.len);
} else {
ssize_t n = tun_platform_write(tun, pkt->ll.dgram, pkt->ll.len);
if (n < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "tun_platform_write failed");
tun->write_errors++;
} else {
ssize_t n = tun_platform_write(tun, pkt->ll.dgram, pkt->ll.len);
if (n < 0) {
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "tun_platform_write failed");
tun->write_errors++;
} else {
tun->bytes_written += n;
tun->packets_written++;
dump_ip_packet("->TUN", pkt->ll.dgram, pkt->ll.len);
}
tun->bytes_written += n;
tun->packets_written++;
dump_ip_packet("->TUN", pkt->ll.dgram, pkt->ll.len);
}
free(pkt->ll.dgram);
}
memory_pool_free(tun->pool, pkt);
free(pkt->ll.dgram);
}
memory_pool_free(tun->pool, pkt);
queue_resume_callback(tun->input_queue);
}
// ===================================================================
@ -153,10 +154,6 @@ struct tun_if* tun_init(struct UASYNC* ua, struct utun_config* config)
queue_set_callback(tun->input_queue, tun_input_queue_callback, tun);
#ifdef _WIN32
InitializeCriticalSection(&tun->output_queue_lock);
#endif
if (!test_mode) {
int poll_fd = tun_platform_get_poll_fd(tun);
@ -194,7 +191,6 @@ fail:
WaitForSingleObject(tun->read_thread, INFINITE);
CloseHandle(tun->read_thread);
}
DeleteCriticalSection(&tun->output_queue_lock);
#endif
if (tun->output_queue) queue_free(tun->output_queue);
if (tun->input_queue) queue_free(tun->input_queue);
@ -218,7 +214,6 @@ void tun_close(struct tun_if* tun)
WaitForSingleObject(tun->read_thread, INFINITE);
CloseHandle(tun->read_thread);
CloseHandle(tun->stop_event);
DeleteCriticalSection(&tun->output_queue_lock);
}
#endif
@ -274,6 +269,7 @@ int tun_is_test_mode(struct tun_if* tun) { return tun ? tun->test_mode : -1; }
void tun_packet_handler(void* arg) {
struct tun_packet_data* pd = (struct tun_packet_data*)arg;
if (!pd || !pd->tun || !pd->entry) {
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "ARG ZERO");
if (pd) free(pd);
return;
}
@ -284,9 +280,12 @@ void tun_packet_handler(void* arg) {
// Освободить временную структуру
free(pd);
dump_ip_packet("TUNh->", entry->dgram, entry->len);
// Положить в очередь (уже в main thread)
int ok = queue_data_put(tun->output_queue, entry, 0);
if (ok != 0) {
DEBUG_ERROR(DEBUG_CATEGORY_TUN, "Put error");
queue_entry_free(entry);
tun->read_errors++;
}
@ -309,13 +308,7 @@ int tun_inject_packet(struct tun_if* tun, const uint8_t* buf, size_t len)
pkt->ll.dgram = data;
pkt->ll.len = len;
#ifdef _WIN32
EnterCriticalSection(&tun->output_queue_lock);
#endif
int ret = queue_data_put(tun->output_queue, (struct ll_entry*)pkt, 0);
#ifdef _WIN32
LeaveCriticalSection(&tun->output_queue_lock);
#endif
if (ret != 0) {
free(data);

1
src/tun_if.h

@ -61,7 +61,6 @@ struct tun_if {
uint32_t ifindex; // интерфейсный индекс
#ifdef _WIN32
CRITICAL_SECTION output_queue_lock; // защита output_queue (TUN → routing)
HANDLE read_thread;
HANDLE stop_event;
volatile int running; // 1 = поток работает

2
src/tun_windows.c

@ -294,7 +294,7 @@ DWORD WINAPI tun_read_thread_proc(LPVOID arg)
tun->packets_read++;
// Дамп пакета
dump_ip_packet("TUN->", buf, n);
// dump_ip_packet("TUN->", buf, n);
// Передать данные в main thread через uasync_post
uasync_post(tun->ua, tun_packet_handler, pd);

2
src/utun.c

@ -274,7 +274,7 @@ int main(int argc, char *argv[]) {
debug_config_init();
debug_set_level(DEBUG_LEVEL_TRACE);
// debug_set_categories(DEBUG_CATEGORY_ALL & ~DEBUG_CATEGORY_UASYNC & ~DEBUG_CATEGORY_TIMERS & ~DEBUG_CATEGORY_CRYPTO & ~DEBUG_CATEGORY_ETCP & ~DEBUG_CATEGORY_CONNECTION); // Enable all except uasync
debug_set_categories(DEBUG_CATEGORY_ALL & ~DEBUG_CATEGORY_UASYNC & ~DEBUG_CATEGORY_TIMERS);
debug_set_categories(DEBUG_CATEGORY_ALL & ~DEBUG_CATEGORY_UASYNC);// & ~DEBUG_CATEGORY_TIMERS
debug_enable_function_name(1);
if (args.debug_config) {

Loading…
Cancel
Save