You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

604 lines
26 KiB

#include "pkt_normalizer.h"
#include "../lib/u_async.h"
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <stdio.h>
static void packer_handler(struct ll_queue* q, struct ll_entry* unused, void* arg);
static void unpacker_handler(struct ll_queue* q, struct ll_entry* unused, void* arg);
static void send_buf(struct pn_struct* pn);
static int get_header(uint8_t* header, size_t L);
/* Calculate fragment size from mtu */
struct pn_struct* pkt_normalizer_init(uasync_t* ua, int is_packer, int mtu) {
struct pn_struct* pn = malloc(sizeof(struct pn_struct));
if (!pn) return NULL;
pn->ua = ua;
pn->input = queue_new(ua, NULL); // No memory pool for now
if (!pn->input) {
free(pn);
return NULL;
}
pn->output = queue_new(ua, NULL); // No memory pool for now
if (!pn->output) {
queue_free(pn->input);
free(pn);
return NULL;
}
pn->is_packer = is_packer;
if (is_packer) {
// Calculate fragment size from mtu: fragment = mtu - 100
int fragment_size = mtu - ETCP_OVERHEAD;
if (fragment_size < 256) fragment_size = 256; // Minimum sane value
pn->u.packer.cap = fragment_size;
pn->u.packer.buf = malloc(pn->u.packer.cap);
if (!pn->u.packer.buf) {
queue_free(pn->input);
queue_free(pn->output);
free(pn);
return NULL;
}
pn->u.packer.len = 0;
pn->u.packer.error_count = 0;
queue_set_callback(pn->input, packer_handler, pn);
} else {
pn->u.unpacker.buf = NULL;
pn->u.unpacker.len = 0;
pn->u.unpacker.total_len = 0;
pn->u.unpacker.cap = 0;
pn->u.unpacker.error_count = 0;
queue_set_callback(pn->input, unpacker_handler, pn);
}
return pn;
}
void pkt_normalizer_deinit(struct pn_struct* pn) {
if (!pn) return;
queue_free(pn->input);
queue_free(pn->output);
if (pn->is_packer) {
free(pn->u.packer.buf);
} else {
free(pn->u.unpacker.buf);
free(pn->u.unpacker.service_buf);
}
free(pn);
}
struct pkt_normalizer_pair* pkt_normalizer_pair_init(uasync_t* ua, int mtu) {
struct pkt_normalizer_pair* pair = malloc(sizeof(struct pkt_normalizer_pair));
if (!pair) return NULL;
pair->packer = pkt_normalizer_init(ua, 1, mtu);
if (!pair->packer) {
free(pair);
return NULL;
}
pair->unpacker = pkt_normalizer_init(ua, 0, mtu);
if (!pair->unpacker) {
pkt_normalizer_deinit(pair->packer);
free(pair);
return NULL;
}
return pair;
}
void pkt_normalizer_pair_deinit(struct pkt_normalizer_pair* pair) {
if (!pair) return;
pkt_normalizer_deinit(pair->packer);
pkt_normalizer_deinit(pair->unpacker);
free(pair);
}
static int get_header(uint8_t* header, size_t L) {
if (L > 1535) return -1;
if (L <= 239) {
header[0] = (uint8_t)L;
return 1;
} else {
uint8_t high = (uint8_t)(L >> 8);
if (high > 5) return -1;
header[0] = 0xF0 + high;
header[1] = (uint8_t)(L & 0xFF);
return 2;
}
}
/* Сбросить состояние сборки фрагментов */
static void reset_fragment_state(struct pn_struct* pn) {
if (!pn->is_packer) {
pn->u.unpacker.len = 0;
pn->u.unpacker.total_len = 0;
pn->u.unpacker.in_fragment = 0;
}
}
/* Таймаут для сборки фрагментов */
static void send_buf(struct pn_struct* pn) {
if (pn->u.packer.len == 0) return;
size_t payload_len = pn->u.packer.len;
struct ll_entry* out = queue_entry_new(2 + payload_len);
if (!out) return;
uint8_t* d = ll_entry_data(out);
*(uint16_t*)d = (uint16_t)payload_len;
memcpy(d + 2, pn->u.packer.buf, payload_len);
queue_entry_put(pn->output, out);
pn->u.packer.len = 0;
}
static void packer_handler(struct ll_queue* q, struct ll_entry* unused, void* arg) {
(void)unused;
struct pn_struct* pn = arg;
size_t max = (size_t)1400;
struct ll_entry* entry = queue_entry_get(q);
if (!entry) {
queue_resume_callback(q);
return;
}
size_t L = ll_entry_size(entry);
uint8_t* data = ll_entry_data(entry);
uint8_t header[2];
int hsize = get_header(header, L);
size_t needed = (size_t)hsize + L;
if (hsize < 0 || needed > max) {
// Fragment
if (pn->u.packer.len > 0) {
send_buf(pn);
}
size_t remaining = L;
size_t pos = 0;
int fragment_count = 0;
while (remaining > 0) {
size_t chunk;
size_t payload_len;
struct ll_entry* fout;
uint8_t* fd;
uint8_t frag_header[2];
int frag_hsize;
if (fragment_count == 0) {
// Первый фрагмент: FF + общая длина (2 байта)
chunk = remaining > (max - 5) ? (max - 5) : remaining; // 2+1+2+chunk <= max
payload_len = 1 + 2 + chunk; // FF + total_len + data
fout = queue_entry_new(2 + payload_len);
if (!fout) {
break;
}
fd = ll_entry_data(fout);
*(uint16_t*)fd = (uint16_t)payload_len;
fd += 2;
*fd++ = 0xFF;
*fd++ = (uint8_t)(L >> 8); // старший байт общей длины
*fd++ = (uint8_t)(L & 0xFF); // младший байт общей длины
} else {
// Не первый фрагмент
if (remaining <= max - 3) {
// Это последний возможный фрагмент (помещается в один пакет с префиксом FE)
// Пытаемся отправить как обычный блок
frag_hsize = get_header(frag_header, remaining);
if (frag_hsize > 0 && (size_t)frag_hsize + remaining + 2 <= max) {
// Успешно: обычный блок
payload_len = frag_hsize + remaining;
chunk = remaining;
fout = queue_entry_new(2 + payload_len);
if (!fout) {
break;
}
fd = ll_entry_data(fout);
*(uint16_t*)fd = (uint16_t)payload_len;
fd += 2;
memcpy(fd, frag_header, frag_hsize);
fd += frag_hsize;
} else {
// Не удалось отправить как обычный блок - разбиваем на 2 фрагмента
// 1. FE фрагмент с частью данных
// 2. Обычный блок с оставшимися данными
// Находим максимальный размер для FE фрагмента
size_t max_fe_data = max - 3; // 2 байта длины + 0xFE
if (max_fe_data > remaining) {
max_fe_data = remaining;
}
// Пробуем различные размеры, начиная с максимального
size_t fe_data_size = 0;
for (size_t try_fe = max_fe_data; try_fe > 0; try_fe--) {
size_t try_regular = remaining - try_fe;
if (try_regular == 0) continue; // Нужно отправить что-то как обычный блок
uint8_t test_header[2];
int hsize = get_header(test_header, try_regular);
if (hsize <= 0) continue;
if ((size_t)hsize + try_regular + 2 <= max) {
fe_data_size = try_fe;
break;
}
}
if (fe_data_size == 0) {
// Не удалось найти разбиение - ошибка
pn->u.packer.error_count++;
// Отправляем как FE (нарушение спецификации, но это крайний случай)
chunk = remaining > (max - 3) ? (max - 3) : remaining;
payload_len = 1 + chunk; // FE + data
fout = queue_entry_new(2 + payload_len);
if (!fout) break;
fd = ll_entry_data(fout);
*(uint16_t*)fd = (uint16_t)payload_len;
fd += 2;
*fd++ = 0xFE;
} else {
// Отправляем FE фрагмент
chunk = fe_data_size;
payload_len = 1 + chunk; // FE + data
fout = queue_entry_new(2 + payload_len);
if (!fout) break;
fd = ll_entry_data(fout);
*(uint16_t*)fd = (uint16_t)payload_len;
fd += 2;
*fd++ = 0xFE;
memcpy(fd, data + pos, chunk);
queue_entry_put(pn->output, fout);
pos += chunk;
remaining -= chunk;
fragment_count++;
// Обновляем оставшиеся данные для обычного блока
// (цикл продолжит обработку на следующей итерации)
continue;
}
}
} else {
// Промежуточный фрагмент, отправляем как FE
chunk = remaining > (max - 3) ? (max - 3) : remaining;
payload_len = 1 + chunk; // FE + data
fout = queue_entry_new(2 + payload_len);
if (!fout) {
break;
}
fd = ll_entry_data(fout);
*(uint16_t*)fd = (uint16_t)payload_len;
fd += 2;
*fd++ = 0xFE;
}
}
memcpy(fd, data + pos, chunk);
queue_entry_put(pn->output, fout);
pos += chunk;
remaining -= chunk;
fragment_count++;
}
} else {
if (pn->u.packer.len + needed > max) {
send_buf(pn);
}
// Add to buffer
uint8_t* p = pn->u.packer.buf + pn->u.packer.len;
memcpy(p, header, (size_t)hsize);
memcpy(p + hsize, data, L);
pn->u.packer.len += needed;
}
queue_entry_free(entry);
if (pn->u.packer.len > 0) {
send_buf(pn);
}
queue_resume_callback(q);
}
void pkt_normalizer_set_service_callback(struct pn_struct* pn, pkt_normalizer_service_callback_t callback, void* user_data) {
if (!pn) return;
pn->service_callback = callback;
pn->service_callback_user_data = user_data;
}
void pkt_normalizer_reset_service_state(struct pn_struct* pn) {
if (!pn || pn->is_packer) return;
if (pn->u.unpacker.in_service) {
// Deliver pending service packet
if (pn->service_callback) {
pn->service_callback(pn->service_callback_user_data,
pn->u.unpacker.service_type,
pn->u.unpacker.service_buf,
pn->u.unpacker.service_len);
}
free(pn->u.unpacker.service_buf);
pn->u.unpacker.service_buf = NULL;
pn->u.unpacker.service_len = 0;
pn->u.unpacker.service_cap = 0;
pn->u.unpacker.in_service = 0;
}
}
void pkt_normalizer_reset_state(struct pn_struct* pn) {
if (!pn) return;
if (pn->is_packer) {
// Flush packer buffer
if (pn->u.packer.len > 0) {
send_buf(pn);
}
} else {
// Reset unpacker fragment state
reset_fragment_state(pn);
// Reset service state
pkt_normalizer_reset_service_state(pn);
}
}
int pkt_normalizer_send_service(struct pn_struct* pn, uint8_t type, const void* data, size_t len) {
if (!pn || !pn->is_packer) return -1;
// Service packet ограничен 256 байтами всего
if (len > 256 - 2) return -1; // 2 байта на заголовок (0xFC + тип)
size_t max = (size_t)1400;
if (max < 3) return -1;
// Размер сервисного пакета: 2 байта длины + 1 байт 0xFC + 1 байт тип + данные
// Если не помещается в один фрагмент - используем продолжение 0xFD
size_t total_service_len = 1 + 1 + len; // 0xFC + type + data
size_t pos = 0;
while (total_service_len > 0) {
// Определяем размер куска для этого пакета
size_t chunk;
uint8_t service_header;
if (pos == 0) {
// Первый пакет: 0xFC + тип + часть данных
// Максимум данных в первом пакете: max - 2 (длина) - 2 (0xFC+тип)
size_t max_first_data = max - 4;
if (max_first_data > len) max_first_data = len;
chunk = max_first_data;
service_header = 0xFC;
} else {
// Продолжение: 0xFD + данные
// Максимум данных: max - 2 (длина) - 1 (0xFD)
size_t max_cont_data = max - 3;
size_t remaining = len - pos;
if (max_cont_data > remaining) max_cont_data = remaining;
chunk = max_cont_data;
service_header = 0xFD;
}
if (chunk == 0) break;
size_t payload_len = 1 + chunk + (pos == 0 ? 1 : 0); // +1 байт типа для первого пакета
struct ll_entry* entry = queue_entry_new(2 + payload_len);
if (!entry) return -1;
uint8_t* d = ll_entry_data(entry);
*(uint16_t*)d = (uint16_t)payload_len;
d += 2;
*d++ = service_header;
if (pos == 0) {
*d++ = type;
}
memcpy(d, (const uint8_t*)data + pos, chunk);
queue_entry_put(pn->output, entry);
pos += chunk;
total_service_len -= chunk + (pos == chunk ? 2 : 1); // корректно вычитаем заголовки
}
return 0;
}
int pkt_normalizer_get_error_count(const struct pn_struct* pn) {
if (!pn) return 0;
if (pn->is_packer) {
return pn->u.packer.error_count;
}
return pn->u.unpacker.error_count;
}
void pkt_normalizer_reset_error_count(struct pn_struct* pn) {
if (!pn) return;
if (pn->is_packer) {
pn->u.packer.error_count = 0;
} else {
pn->u.unpacker.error_count = 0;
}
}
void pkt_normalizer_flush(struct pn_struct* pn) {
if (!pn || !pn->is_packer) return;
if (pn->u.packer.len > 0) {
send_buf(pn);
}
}
static void unpacker_handler(struct ll_queue* q, struct ll_entry* unused, void* arg) {
(void)unused;
struct pn_struct* pn = arg;
while (queue_entry_count(q) > 0) {
struct ll_entry* entry = queue_entry_get(q);
uint8_t* data = ll_entry_data(entry);
size_t total = ll_entry_size(entry);
uint16_t payload_len = *(uint16_t*)data;
if (total != 2 + (size_t)payload_len) {
queue_entry_free(entry);
continue;
}
uint8_t* cg = data + 2;
size_t cg_pos = 0;
size_t cg_len = (size_t)payload_len;
while (cg_pos < cg_len) {
uint8_t byte = cg[cg_pos++];
if (byte == 0xFF) {
/* Начало нового фрагментированного пакета */
if (pn->u.unpacker.in_fragment) {
/* Не завершен предыдущий фрагмент - ошибка */
pn->u.unpacker.error_count++;
reset_fragment_state(pn);
}
/* Проверить, что есть 2 байта для общей длины */
if (cg_pos + 2 > cg_len) {
pn->u.unpacker.error_count++;
goto err;
}
/* Прочитать общую длину */
uint16_t total_len = ((uint16_t)cg[cg_pos] << 8) | cg[cg_pos + 1];
cg_pos += 2;
size_t chunk_len = cg_len - cg_pos;
/* Выделить буфер при необходимости */
size_t new_len = chunk_len;
if (new_len > pn->u.unpacker.cap) {
size_t new_cap = pn->u.unpacker.cap ? pn->u.unpacker.cap * 2 : 4096;
if (new_cap < new_len) new_cap = new_len;
pn->u.unpacker.buf = realloc(pn->u.unpacker.buf, new_cap);
pn->u.unpacker.cap = new_cap;
}
memcpy(pn->u.unpacker.buf, cg + cg_pos, chunk_len);
pn->u.unpacker.len = chunk_len;
pn->u.unpacker.total_len = total_len;
pn->u.unpacker.in_fragment = 1;
cg_pos += chunk_len;
/* Проверить, не собрали ли уже весь пакет */
if (pn->u.unpacker.len >= pn->u.unpacker.total_len) {
if (pn->u.unpacker.len == pn->u.unpacker.total_len) {
struct ll_entry* out = queue_entry_new(pn->u.unpacker.total_len);
if (out) {
memcpy(ll_entry_data(out), pn->u.unpacker.buf, pn->u.unpacker.total_len);
queue_entry_put(pn->output, out);
}
} else {
/* Слишком много данных - ошибка */
pn->u.unpacker.error_count++;
}
reset_fragment_state(pn);
}
continue;
}
if (byte == 0xFE) {
/* Продолжение фрагментированного пакета */
if (!pn->u.unpacker.in_fragment) {
/* Не было начала фрагмента - ошибка */
pn->u.unpacker.error_count++;
goto err;
}
size_t chunk_len = cg_len - cg_pos;
size_t new_len = pn->u.unpacker.len + chunk_len;
if (new_len > pn->u.unpacker.cap) {
size_t new_cap = pn->u.unpacker.cap ? pn->u.unpacker.cap * 2 : 4096;
if (new_cap < new_len) new_cap = new_len;
pn->u.unpacker.buf = realloc(pn->u.unpacker.buf, new_cap);
pn->u.unpacker.cap = new_cap;
}
memcpy(pn->u.unpacker.buf + pn->u.unpacker.len, cg + cg_pos, chunk_len);
pn->u.unpacker.len = new_len;
cg_pos += chunk_len;
/* Проверить, не собрали ли уже весь пакет */
if (pn->u.unpacker.len >= pn->u.unpacker.total_len) {
if (pn->u.unpacker.len == pn->u.unpacker.total_len) {
struct ll_entry* out = queue_entry_new(pn->u.unpacker.total_len);
if (out) {
memcpy(ll_entry_data(out), pn->u.unpacker.buf, pn->u.unpacker.total_len);
queue_entry_put(pn->output, out);
}
} else {
/* Слишком много данных - ошибка */
pn->u.unpacker.error_count++;
}
reset_fragment_state(pn);
}
continue;
}
if (byte == 0xFC || byte == 0xFD) {
/* Service packet */
if (byte == 0xFC) {
/* Start of service packet */
if (pn->u.unpacker.in_service) {
/* Previous service packet finished - deliver it */
if (pn->service_callback) {
pn->service_callback(pn->service_callback_user_data,
pn->u.unpacker.service_type,
pn->u.unpacker.service_buf,
pn->u.unpacker.service_len);
}
free(pn->u.unpacker.service_buf);
pn->u.unpacker.service_buf = NULL;
pn->u.unpacker.service_len = 0;
pn->u.unpacker.service_cap = 0;
pn->u.unpacker.in_service = 0;
}
/* Read service type */
if (cg_pos >= cg_len) goto err;
uint8_t service_type = cg[cg_pos++];
pn->u.unpacker.service_type = service_type;
pn->u.unpacker.in_service = 1;
pn->u.unpacker.service_len = 0;
} else {
/* 0xFD - continuation */
if (!pn->u.unpacker.in_service) {
/* No service packet started - error */
pn->u.unpacker.error_count++;
goto err;
}
}
/* Read data */
size_t data_len = cg_len - cg_pos;
if (data_len > 0) {
size_t new_len = pn->u.unpacker.service_len + data_len;
if (new_len > 256) {
/* Service packet too long - error */
pn->u.unpacker.error_count++;
free(pn->u.unpacker.service_buf);
pn->u.unpacker.service_buf = NULL;
pn->u.unpacker.service_len = 0;
pn->u.unpacker.service_cap = 0;
pn->u.unpacker.in_service = 0;
goto err;
}
if (new_len > pn->u.unpacker.service_cap) {
size_t new_cap = pn->u.unpacker.service_cap ? pn->u.unpacker.service_cap * 2 : 256;
if (new_cap < new_len) new_cap = new_len;
if (new_cap > 256) new_cap = 256;
uint8_t* new_buf = realloc(pn->u.unpacker.service_buf, new_cap);
if (!new_buf) {
pn->u.unpacker.error_count++;
goto err;
}
pn->u.unpacker.service_buf = new_buf;
pn->u.unpacker.service_cap = new_cap;
}
memcpy(pn->u.unpacker.service_buf + pn->u.unpacker.service_len, cg + cg_pos, data_len);
pn->u.unpacker.service_len = new_len;
cg_pos += data_len;
}
/* Check if this is the end of service packet (end of payload) */
if (cg_pos >= cg_len) {
/* End of current payload, but service packet may continue in next transport packet */
continue;
} else {
/* There is more data in this payload after service packet - error */
pn->u.unpacker.error_count++;
free(pn->u.unpacker.service_buf);
pn->u.unpacker.service_buf = NULL;
pn->u.unpacker.service_len = 0;
pn->u.unpacker.service_cap = 0;
pn->u.unpacker.in_service = 0;
goto err;
}
}
/* Обычная запись (не фрагмент) */
size_t L;
if (byte <= 0xEF) {
L = byte;
} else if (byte >= 0xF0 && byte <= 0xF5) {
if (cg_pos >= cg_len) goto err;
uint8_t ext = cg[cg_pos++];
L = ((size_t)(byte - 0xF0) << 8) | ext;
} else {
/* Недопустимый байт */
goto err;
}
if (cg_pos + L > cg_len) goto err;
if (pn->u.unpacker.in_fragment) {
/* Это последний фрагмент в виде обычной записи */
size_t new_len = pn->u.unpacker.len + L;
if (new_len > pn->u.unpacker.cap) {
size_t new_cap = pn->u.unpacker.cap ? pn->u.unpacker.cap * 2 : 4096;
if (new_cap < new_len) new_cap = new_len;
pn->u.unpacker.buf = realloc(pn->u.unpacker.buf, new_cap);
pn->u.unpacker.cap = new_cap;
}
memcpy(pn->u.unpacker.buf + pn->u.unpacker.len, cg + cg_pos, L);
pn->u.unpacker.len = new_len;
cg_pos += L;
/* Проверить, собрали ли весь пакет */
if (pn->u.unpacker.len >= pn->u.unpacker.total_len) {
if (pn->u.unpacker.len == pn->u.unpacker.total_len) {
struct ll_entry* out = queue_entry_new(pn->u.unpacker.total_len);
if (out) {
memcpy(ll_entry_data(out), pn->u.unpacker.buf, pn->u.unpacker.total_len);
queue_entry_put(pn->output, out);
}
} else {
/* Слишком много данных - ошибка */
pn->u.unpacker.error_count++;
}
reset_fragment_state(pn);
}
} else {
/* Обычная запись (не часть фрагмента) */
struct ll_entry* out = queue_entry_new(L);
if (out) {
memcpy(ll_entry_data(out), cg + cg_pos, L);
queue_entry_put(pn->output, out);
}
cg_pos += L;
}
}
err:
queue_entry_free(entry);
}
queue_resume_callback(q);
}