From 83d4ebe8855bf6171d72f92e0d69ee3bc127facd Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Mon, 7 Jul 2025 14:46:03 +0200 Subject: [PATCH 1/7] CMake - Add flowscatter subdirectory --- modules/CMakeLists.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/CMakeLists.txt b/modules/CMakeLists.txt index 2d84c1c3..606d47e0 100644 --- a/modules/CMakeLists.txt +++ b/modules/CMakeLists.txt @@ -3,3 +3,4 @@ add_subdirectory(sampler) add_subdirectory(telemetry) add_subdirectory(deduplicator) add_subdirectory(clickhouse) +add_subdirectory(flowScatter) From 08c5573761e0424d8fc312555396440ea45402f8 Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Mon, 7 Jul 2025 14:49:39 +0200 Subject: [PATCH 2/7] Flow Scatter - Initialization of module --- modules/flowScatter/CMakeLists.txt | 1 + modules/flowScatter/README.md | 31 ++++++++++++++++++++++++++ modules/flowScatter/src/CMakeLists.txt | 17 ++++++++++++++ 3 files changed, 49 insertions(+) create mode 100644 modules/flowScatter/CMakeLists.txt create mode 100644 modules/flowScatter/README.md create mode 100644 modules/flowScatter/src/CMakeLists.txt diff --git a/modules/flowScatter/CMakeLists.txt b/modules/flowScatter/CMakeLists.txt new file mode 100644 index 00000000..febd4f0a --- /dev/null +++ b/modules/flowScatter/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(src) diff --git a/modules/flowScatter/README.md b/modules/flowScatter/README.md new file mode 100644 index 00000000..c150ae2c --- /dev/null +++ b/modules/flowScatter/README.md @@ -0,0 +1,31 @@ +# flowScatter module - README + +## Description +The module performs distributing of 1 unirec interface data to `n` trap outputs + +## Interfaces +- Input: 1 +- Output: `n` + +## Parameters +### Common TRAP parameters +- `-h [trap,1]` Print help message for this module / for libtrap specific parameters. +- `-i IFC_SPEC` Specification of interface types and their parameters. +- `-v` Be verbose. +- `-vv` Be more verbose. +- `-vvv` Be even more verbose. + +### Module specific parameters +- `-m, --appfs-mountpoint ` Path where the appFs directory will be mounted +- `-r, --rule ` Decide, what fields are used to create a hash. Might be in form of rule. +- `-c, --count ` Number of output interfaces. + +## Usage Examples +``` +$ flowscatter -i u:trap_in,u:trap_out -r "SRC_IP,SRC_PORT" # according SRC_IP+SRC_PORT + +$ flowscatter -i u:trap_in,u:trap_out -r "TAG_SRC_IP:(SRC_IP)|TAG_DST_IP:(DST_IP)" # if TAG_SRC_IP is not 0, then SRC_IP else if TAG_DST_IP is no 0, then DST_IP + +$ flowscatter -i u:trap_in,u:trap_out -r "TAG_SRC_IP:(SRC_IP,SRC_PORT)|TAG_DST_IP:(DST_IP,DST_PORT)" # if TAG_SRC_IP is not 0, then SRC_IP+SRC_PORT, else if TAG_DST_IP is not 0, then DST_IP+DST_PORT + +``` diff --git a/modules/flowScatter/src/CMakeLists.txt b/modules/flowScatter/src/CMakeLists.txt new file mode 100644 index 00000000..55bb6128 --- /dev/null +++ b/modules/flowScatter/src/CMakeLists.txt @@ -0,0 +1,17 @@ +add_executable(flowscatter + main.cpp + flowScatter.cpp +) + +target_link_libraries(flowscatter PRIVATE + telemetry::telemetry + telemetry::appFs + common + unirec::unirec++ + unirec::unirec + trap::trap + argparse + xxhash +) + +install(TARGETS flowscatter DESTINATION ${INSTALL_DIR_BIN}) From b4ac3b0a72fd6086c349d8b508d3a38bfaf308d0 Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Mon, 7 Jul 2025 14:51:06 +0200 Subject: [PATCH 3/7] Flow Scatter - Introduce flowscatter class for calculation of output ifc index --- modules/flowScatter/src/flowScatter.cpp | 315 ++++++++++++++++++++++++ modules/flowScatter/src/flowScatter.hpp | 107 ++++++++ 2 files changed, 422 insertions(+) create mode 100644 modules/flowScatter/src/flowScatter.cpp create mode 100644 modules/flowScatter/src/flowScatter.hpp diff --git a/modules/flowScatter/src/flowScatter.cpp b/modules/flowScatter/src/flowScatter.cpp new file mode 100644 index 00000000..feb675f5 --- /dev/null +++ b/modules/flowScatter/src/flowScatter.cpp @@ -0,0 +1,315 @@ +/** + * @file + * @author Jaroslav Pesek + * @brief Implementation of the FlowScatter class. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "flowScatter.hpp" +#include +#include + +namespace { + +using namespace Nemea; + +/** + * @brief Appends the binary representation of a value to a byte vector. + * + * @tparam T The type of the value to be converted to bytes + * @param vec Reference to the vector where bytes will be appended + * @param value The value whose binary representation will be appended + */ +template +void append_bytes(std::vector& vec, const T& value) +{ + const auto* bytes = reinterpret_cast(&value); + vec.insert(vec.end(), bytes, bytes + sizeof(T)); +} + +/** + * @brief Trims whitespace characters from the beginning and end of a string. + * + * @param str The input string to be trimmed + * @return A new string with leading and trailing whitespace removed. + */ +std::string trim(const std::string& str) +{ + const char* ws = " \t\n\r"; + const auto begin = str.find_first_not_of(ws); + if (begin == std::string::npos) { + return ""; + } + const auto end = str.find_last_not_of(ws); + return str.substr(begin, end - begin + 1); +} + +/** + * @brief Checks if a field value in a UniRec record is non-zero. + * + * This function examines a field in a UniRec record and determines whether + * its value is non-zero. The function supports numeric types only.. + * + * @param fieldId The identifier of the field to check + * @param fieldType The type of the field + * @param record The UniRec record containing the field to examine + * + * @return true if the field value is non-zero, false if it equals zero + * + * @throws std::runtime_error if the field type is not supported + */ +bool checkNonZeroValue(ur_field_id_t fieldId, ur_field_type_t fieldType, const UnirecRecordView& record) { + switch (fieldType) { + case UR_TYPE_UINT64: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT64: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT32: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT32: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT16: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT16: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT8: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT8: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_FLOAT: + return (record.getFieldAsType(fieldId) != 0.0f); + case UR_TYPE_DOUBLE: + return (record.getFieldAsType(fieldId) != 0.0); + default: + throw std::runtime_error("Unsupported conditional field type: " + std::string(ur_get_name(fieldId))); + } +} + +/** + * @brief Appends a field value from a UniRec record to a hash input vector. + * + * This function extracts a field value from a UniRec record based on its type + * and appends its binary representation to the provided hash input vector. + * + * @param hashInput Reference to the vector where the field's binary data will be appended + * @param fieldId The identifier of the field to extract + * @param fieldType The type of the field + * @param record The UniRec record containing the field + * + * @throws std::runtime_error if the field type is not supported for hashing + */ +void appendFieldToHash(std::vector& hashInput, ur_field_id_t fieldId, ur_field_type_t fieldType, const UnirecRecordView& record) +{ + switch (fieldType) { + case UR_TYPE_UINT64: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_INT64: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_UINT32: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_INT32: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_UINT16: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_INT16: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_UINT8: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_INT8: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_IP: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + case UR_TYPE_MAC: { + auto value = record.getFieldAsType(fieldId); + append_bytes(hashInput, value); + break; + } + default: + throw std::runtime_error("Unsupported field type for hashing: " + std::string(ur_get_name(fieldId))); + } +} +} // namespace + +namespace Fs { + +FlowScatter::FlowScatter(size_t numOutputs, std::string rule) + : M_NUM_OUTPUTS(numOutputs) +{ + if (numOutputs <= 0 || numOutputs > MAX_OUTPUTS) { + throw std::invalid_argument("Number of outputs must be between 1 and " + std::to_string(MAX_OUTPUTS)); + } + m_logger->info("Initializing FlowScatter with {} outputs", numOutputs); + m_logger->info("Rule string: '{}'", rule); + ruleParse(rule); + m_logger->info("FlowScatter initialization completed successfully"); +} + +void FlowScatter::ruleParse(const std::string& rule) +{ + m_rules.branches.clear(); + + std::istringstream ruleStream(rule); + std::string branchStr; + + if (rule.empty()) { + return; // No rules defined, nothing to parse + } + + // Split by '|' + while (std::getline(ruleStream, branchStr, '|')) { + branchStr = trim(branchStr); + if (branchStr.empty()) { + continue; + } + + RuleBranch branch; + + // Expect branch in form ":(field1,field2,...)" + if (branchStr.front() != '<') { + throw std::invalid_argument("Rule conditional branch must start with '<': " + branchStr); + } + + const std::size_t gtPos = branchStr.find('>'); + if (gtPos == std::string::npos) { + throw std::invalid_argument("Rule conditional branch must end with '>': " + branchStr); + } + + // Extract and trim the conditional (may be empty) + std::string conditional = trim(branchStr.substr(1, gtPos - 1)); + branch.conditionalFieldId = conditional; + + const std::size_t colonPos = branchStr.find(':', gtPos + 1); + if (colonPos == std::string::npos) { + throw std::invalid_argument("Missing ':' after in branch: " + branchStr); + } + + const std::size_t lparPos = branchStr.find('(', colonPos + 1); + const std::size_t rparPos = branchStr.find(')', lparPos + 1); + + if (lparPos == std::string::npos || rparPos == std::string::npos || rparPos < lparPos) { + throw std::invalid_argument("Malformed field tuple in branch: " + branchStr); + } + + std::string tuple = branchStr.substr(lparPos + 1, rparPos - lparPos - 1); + std::istringstream tupleStream(tuple); + std::string fieldName; + + while (std::getline(tupleStream, fieldName, ',')) { + fieldName = trim(fieldName); + if (fieldName.empty()) { + throw std::invalid_argument("Empty field in tuple: " + branchStr); + } + branch.fieldNames.push_back(fieldName); + } + + if (branch.fieldNames.empty()) { + throw std::invalid_argument("Tuple must contain at least one field: " + branchStr); + } + m_rules.branches.push_back(std::move(branch)); + } + + m_logger->info("Parsed {} rule branches", m_rules.branches.size()); + for (size_t i = 0; i < m_rules.branches.size(); ++i) { + const auto& branch = m_rules.branches[i]; + std::string fieldsStr; + for (size_t j = 0; j < branch.fieldNames.size(); ++j) { + if (j > 0) fieldsStr += ","; + fieldsStr += branch.fieldNames[j]; + } + m_logger->info("Rule {}: conditional '{}' -> fields: ({})", i + 1, branch.conditionalFieldId, fieldsStr); + } +} + +size_t FlowScatter::outputIndex(UnirecRecordView& record) +{ + m_totalRecords++; + + // If no rules are defined, distribute records round-robin + if (m_rules.branches.empty()) { + size_t index = (m_totalRecords - 1) % M_NUM_OUTPUTS; + m_sentRecords[index]++; + return index; + } + + std::vector hashInput; + + // Iterate through conditional rules + for (const auto& branch : m_rules.branches) { + bool useThisRule = false; + + if (branch.conditionalFieldId.empty()) { + useThisRule = true; + } else { + auto fieldId = static_cast(ur_get_id_by_name(branch.conditionalFieldId.c_str())); + auto fieldType = ur_get_type(fieldId); + useThisRule = checkNonZeroValue(fieldId, fieldType, record); + } + + if (useThisRule) { + // Concatenate all fields for this rule + hashInput.clear(); + for (const auto& field : branch.fieldNames) { + auto hashFieldId = static_cast(ur_get_id_by_name(field.c_str())); + auto hashFieldType = ur_get_type(hashFieldId); + appendFieldToHash(hashInput, hashFieldId, hashFieldType, record); + } + break; + } + } + + // Fallback: if no rule matched, use round-robin + if (hashInput.empty()) { + size_t index = (m_totalRecords - 1) % M_NUM_OUTPUTS; + m_sentRecords[index]++; + return index; + } + + auto hashValue = XXH64(hashInput.data(), hashInput.size(), 0xdeadd00de); + auto index = hashValue % M_NUM_OUTPUTS; + + m_sentRecords[index]++; + + return index; +} + +FlowScatterStats FlowScatter::getStats() const noexcept +{ + FlowScatterStats stats; + stats.totalRecords = m_totalRecords; + + for (size_t i = 0; i < M_NUM_OUTPUTS; ++i) { + stats.sentRecords[i] = m_sentRecords[i]; + } + + return stats; +} + +}// namespace FlowScatter diff --git a/modules/flowScatter/src/flowScatter.hpp b/modules/flowScatter/src/flowScatter.hpp new file mode 100644 index 00000000..35059f52 --- /dev/null +++ b/modules/flowScatter/src/flowScatter.hpp @@ -0,0 +1,107 @@ +/** + * @file + * @author Jaroslav Pesek + * @brief Declaration of the FlowScatter class. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Fs { + +using namespace Nemea; + +constexpr size_t MAX_OUTPUTS = 128; ///< Maximum number of outputs supported by the module. + +/** + * @brief Structure to hold flow scatter statistics. + * + * This structure is used to keep track of the total number of records processed + * and the number of records sent to each output. + */ +struct FlowScatterStats { + uint64_t totalRecords = 0; ///< Total number of records processed. + uint64_t sentRecords[MAX_OUTPUTS] = {0}; ///< Number of records sent to each output. +}; + +/** + * @brief Represents a single branch of a rule. + */ +struct RuleBranch { + std::string conditionalFieldId = std::string(""); ///< Name of the conditional field, empty string if unconditional. + std::vector fieldNames; ///< Names of fields to be hashed. +}; + +/** + * @brief Structure to hold rules for field selection and hashing. + * + * This structure defines how fields are selected for hashing based on conditions. + * It contains a list of conditional fields and a list of fields that will be used + * for hashing based on the conditions defined in the conditional fields. + */ +struct Rules { + std::vector branches; +}; + +/** + * @brief A class for distributing network flow records across multiple outputs based on configurable hashing rules. + * + * The FlowScatter class implements a load balancing mechanism that distributes Unirec records + * to multiple output interfaces based on hash values computed from specified record fields. + * It supports conditional field selection using tags and maintains statistics about the + * distribution of records across outputs. + * + * The class uses configurable rules to determine which fields to hash and supports + * conditional logic based on field values. Hash distribution ensures balanced load + * across all configured outputs while maintaining session affinity for records + * with identical hash keys. + * + * @note Maximum number of outputs is limited by MAX_OUTPUTS constant. + */ +class FlowScatter { +public: + /** + * @brief Constructs a FlowScatter object with the given number of outputs and rule. + * @param numOutputs The number of outputs to which records can be sent. + * @param rule The rule defining how to create a hash from the record fields. + */ + explicit FlowScatter(size_t numOutputs, std::string rule); + + + /** + * @brief Processes a Unirec record and returns the index of the output to which it should be sent. + * @param record The Unirec record to be processed. + * @return The index of the output to which the record should be sent. + */ + size_t outputIndex(UnirecRecordView& record); + + /** + * @brief Returns the current flow scatter statistics. + * @return The current flow scatter statistics. + */ + FlowScatterStats getStats() const noexcept; + +private: + void ruleParse(const std::string& rule); + + const size_t M_NUM_OUTPUTS; + uint64_t m_totalRecords = 0; + uint64_t m_sentRecords[MAX_OUTPUTS] = {0}; + Rules m_rules; + std::shared_ptr m_logger = Nm::loggerGet("FlowScatter"); + +}; + +} // namespace FlowScatter From 646b0909212db48882ef6943badc5997487a44b4 Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Mon, 7 Jul 2025 14:52:21 +0200 Subject: [PATCH 4/7] Flow Scatter - Add main.cpp --- modules/flowScatter/src/main.cpp | 230 +++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 modules/flowScatter/src/main.cpp diff --git a/modules/flowScatter/src/main.cpp b/modules/flowScatter/src/main.cpp new file mode 100644 index 00000000..aacb980a --- /dev/null +++ b/modules/flowScatter/src/main.cpp @@ -0,0 +1,230 @@ +/** + * @file + * @author Jaroslav Pesek + * @author Karel Hynek + * @author Pavel Siska + * @brief Sampling Module: Sample flowdata + * + * This module distributes 1 unirec interface data to `n` trap outputs based on rules. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "logger/logger.hpp" +#include "flowScatter.hpp" +#include "unirec/unirec-telemetry.hpp" +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace Nemea; + +std::atomic g_stopFlag(false); + +void signalHandler(int signum) +{ + Nm::loggerGet("signalHandler")->info("Interrupt signal {} received", signum); + g_stopFlag.store(true); +} + +/** + * @brief Handle a format change exception by adjusting the template. + * + * This function is called when a `FormatChangeException` is caught in the main loop. + * + * @param inputInterface Input interface for Unirec communication. + * @param outputInterfaces Output interfaces for Unirec communication. + */ +void handleFormatChange(UnirecInputInterface& inputInterface, + std::vector& outputInterfaces) +{ + inputInterface.changeTemplate(); + uint8_t dataType; const char* spec = nullptr; + if (trap_get_data_fmt(TRAPIFC_INPUT, 0, &dataType, &spec) != TRAP_E_OK) { + throw std::runtime_error("Failed to get updated format from TRAP"); + } + for (auto& outIfc : outputInterfaces) { + outIfc.changeTemplate(spec); + } +} + +/** + * @brief Process the next Unirec record and sample them. + * + * This function receives the next Unirec record through the bidirectional interface + * and performs sampling. + * + * @param inputInterface Input interface for Unirec communication. + * @param outputInterfaces Output interfaces for Unirec communication. + * @param scatter Sampler class for sampling. + */ + +void processNextRecord(UnirecInputInterface& inputInterface, + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) +{ + std::optional unirecRecord = inputInterface.receive(); + if (!unirecRecord) { + return; + } + size_t index = scatter.outputIndex(*unirecRecord); + outputInterfaces[index].send(*unirecRecord); +} + +/** + * @brief Process Unirec records. + * + * The `processUnirecRecords` function continuously receives Unirec records through the provided + * bidirectional interface (`biInterface`) and performs sampling. The loop runs indefinitely until + * an end-of-file condition is encountered. + * + * @param inputInterface Input interface for Unirec communication. + * @param outputInterfaces Output interfaces for Unirec communication. + * @param scatter Sampler class for sampling. + */ +void processUnirecRecords(UnirecInputInterface& inputInterface, + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) +{ + while (!g_stopFlag.load()) { + try { + processNextRecord(inputInterface, outputInterfaces, scatter); + } catch (FormatChangeException& ex) { + handleFormatChange(inputInterface, outputInterfaces); + } catch (EoFException& ex) { + break; + } catch (std::exception& ex) { + throw; + } + } +} + +telemetry::Content getScatterTelemetry(const Fs::FlowScatter& scatter) +{ + auto stats = scatter.getStats(); + + telemetry::Dict dict; + dict["totalRecords"] = stats.totalRecords; + // dict["sampledRecords"] = stats.sampledRecords; + return dict; +} + +int main(int argc, char** argv) +{ + argparse::ArgumentParser program("Unirec Flow Scatter"); + + Nm::loggerInit(); + auto logger = Nm::loggerGet("main"); + + signal(SIGINT, signalHandler); + + try { + program.add_argument("-r", "--rule") + .required() + .help( + "Specify the rule set.") + .default_value(std::string("<>:(SRC_IP)")); + program.add_argument("-c", "--count") + .required() + .help("Specify the number of output interfaces.") + .scan<'i', int>() + .default_value(5); + program.add_argument("-m", "--appfs-mountpoint") + .required() + .help("path where the appFs directory will be mounted") + .default_value(std::string("")); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + try { + program.parse_known_args(argc, argv); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + size_t outputCount = 0; + try { + outputCount = static_cast(program.get("--count")); + if (outputCount < 1 || outputCount > Fs::MAX_OUTPUTS) { + throw std::runtime_error("Invalid number of output interfaces: " + std::to_string(outputCount) + + ". Must be in range 1 to " + std::to_string(Fs::MAX_OUTPUTS)); + } + } catch (const std::exception& ex) { + logger->error("Error parsing output count: {}", ex.what()); + return EXIT_FAILURE; + } + + std::shared_ptr telemetryRootDirectory; + telemetryRootDirectory = telemetry::Directory::create(); + + std::unique_ptr appFs; + + try { + auto mountPoint = program.get("--appfs-mountpoint"); + if (!mountPoint.empty()) { + const bool tryToUnmountOnStart = true; + const bool createMountPoint = true; + appFs = std::make_unique( + telemetryRootDirectory, + mountPoint, + tryToUnmountOnStart, + createMountPoint); + appFs->start(); + } + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + try { + const std::string rule = program.get("--rule"); + + Unirec unirec({1, static_cast(outputCount), "flowscatter", "Unirec flow scatter module"}); + + try { + unirec.init(argc, argv); + } catch (HelpException& ex) { + std::cerr << program; + return EXIT_SUCCESS; + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + UnirecInputInterface inputInterface = unirec.buildInputInterface(); + std::vector outputInterfaces; + outputInterfaces.reserve(outputCount); + + for (size_t i = 0; i < outputCount; ++i) { + outputInterfaces.emplace_back(unirec.buildOutputInterface()); + } + + Fs::FlowScatter scatter(outputCount, rule); + + auto telemetryInputDirectory = telemetryRootDirectory->addDir("input"); + const telemetry::FileOps inputFileOps + = {[&inputInterface]() { return Nm::getInterfaceTelemetry(inputInterface); }, nullptr}; + const auto inputFile = telemetryInputDirectory->addFile("stats", inputFileOps); + auto telemetryScatterDirectory = telemetryRootDirectory->addDir("flowscatter"); + const telemetry::FileOps samplerFileOps + = {[&scatter]() { return getScatterTelemetry(scatter); }, nullptr}; + const auto samplerFile = telemetryScatterDirectory->addFile("stats", samplerFileOps); + processUnirecRecords(inputInterface, outputInterfaces, scatter); + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +} From 2b78df18219c9427825b3ce2c1cc69a2cec8581a Mon Sep 17 00:00:00 2001 From: GalaxP Date: Mon, 13 Oct 2025 17:40:53 +0200 Subject: [PATCH 5/7] Flow Scatter - caching of rule branches --- modules/flowScatter/src/flowScatter.cpp | 61 ++++++++++++++++++++----- modules/flowScatter/src/flowScatter.hpp | 22 +++++++++ modules/flowScatter/src/main.cpp | 13 +++++- 3 files changed, 83 insertions(+), 13 deletions(-) diff --git a/modules/flowScatter/src/flowScatter.cpp b/modules/flowScatter/src/flowScatter.cpp index feb675f5..dc6fe0d7 100644 --- a/modules/flowScatter/src/flowScatter.cpp +++ b/modules/flowScatter/src/flowScatter.cpp @@ -169,12 +169,21 @@ FlowScatter::FlowScatter(size_t numOutputs, std::string rule) m_logger->info("Initializing FlowScatter with {} outputs", numOutputs); m_logger->info("Rule string: '{}'", rule); ruleParse(rule); + // Resolve fields for the current UniRec template at construction time. + try { + changeTemplate(); + } catch (const std::exception& ex) { + m_logger->warn("Unable to fully resolve rule fields at construction: {}", ex.what()); + // We don't fail here. Fields will be resolved when template is available + // (e.g., on first format change notification). + } m_logger->info("FlowScatter initialization completed successfully"); } void FlowScatter::ruleParse(const std::string& rule) { m_rules.branches.clear(); + m_cachedBranches.clear(); std::istringstream ruleStream(rule); std::string branchStr; @@ -248,6 +257,41 @@ void FlowScatter::ruleParse(const std::string& rule) } } +void FlowScatter::changeTemplate() +{ + m_cachedBranches.clear(); + + for (const auto& branch : m_rules.branches) { + CachedBranch cb; + + if (!branch.conditionalFieldId.empty()) { + int fieldId = ur_get_id_by_name(branch.conditionalFieldId.c_str()); + if (fieldId < 0) { + throw std::runtime_error("Conditional field not found: " + branch.conditionalFieldId); + } + cb.conditionalId = static_cast(fieldId); + cb.conditionalType = ur_get_type(cb.conditionalId); + } + + cb.fieldIds.reserve(branch.fieldNames.size()); + cb.fieldTypes.reserve(branch.fieldNames.size()); + + for (const auto& fname : branch.fieldNames) { + int fieldId = ur_get_id_by_name(fname.c_str()); + if (fieldId < 0) { + throw std::runtime_error("Field for hashing not found in template: " + fname); + } + auto fid = static_cast(fieldId); + cb.fieldIds.push_back(fid); + cb.fieldTypes.push_back(ur_get_type(fid)); + } + + m_cachedBranches.push_back(std::move(cb)); + } + + m_logger->info("Resolved {} cached rule branches for current UniRec template", m_cachedBranches.size()); +} + size_t FlowScatter::outputIndex(UnirecRecordView& record) { m_totalRecords++; @@ -261,25 +305,20 @@ size_t FlowScatter::outputIndex(UnirecRecordView& record) std::vector hashInput; - // Iterate through conditional rules - for (const auto& branch : m_rules.branches) { + // Iterate through cached conditional rules + for (const auto& cb : m_cachedBranches) { bool useThisRule = false; - if (branch.conditionalFieldId.empty()) { + if (cb.conditionalId == 0) { useThisRule = true; } else { - auto fieldId = static_cast(ur_get_id_by_name(branch.conditionalFieldId.c_str())); - auto fieldType = ur_get_type(fieldId); - useThisRule = checkNonZeroValue(fieldId, fieldType, record); + useThisRule = checkNonZeroValue(cb.conditionalId, cb.conditionalType, record); } if (useThisRule) { - // Concatenate all fields for this rule hashInput.clear(); - for (const auto& field : branch.fieldNames) { - auto hashFieldId = static_cast(ur_get_id_by_name(field.c_str())); - auto hashFieldType = ur_get_type(hashFieldId); - appendFieldToHash(hashInput, hashFieldId, hashFieldType, record); + for (size_t i = 0; i < cb.fieldIds.size(); ++i) { + appendFieldToHash(hashInput, cb.fieldIds[i], cb.fieldTypes[i], record); } break; } diff --git a/modules/flowScatter/src/flowScatter.hpp b/modules/flowScatter/src/flowScatter.hpp index 35059f52..7a5373c2 100644 --- a/modules/flowScatter/src/flowScatter.hpp +++ b/modules/flowScatter/src/flowScatter.hpp @@ -87,6 +87,18 @@ class FlowScatter { */ size_t outputIndex(UnirecRecordView& record); + /** + * @brief Re-resolve field ids/types after a UniRec template/format change. + * + * Call this when the UniRec template changes (format change). This will + * resolve all field names from the parsed rules to UniRec field ids and + * cache their types so `outputIndex` does not need to look them up per-record. + * + * This method may throw if any required field is not present in the + * current UniRec template. + */ + void changeTemplate(); + /** * @brief Returns the current flow scatter statistics. * @return The current flow scatter statistics. @@ -102,6 +114,16 @@ class FlowScatter { Rules m_rules; std::shared_ptr m_logger = Nm::loggerGet("FlowScatter"); + /** Cached mapping of rule branches to UniRec field ids and types. */ + struct CachedBranch { + ur_field_id_t conditionalId = 0; + ur_field_type_t conditionalType = static_cast(0); + std::vector fieldIds; + std::vector fieldTypes; + }; + + std::vector m_cachedBranches; + }; } // namespace FlowScatter diff --git a/modules/flowScatter/src/main.cpp b/modules/flowScatter/src/main.cpp index aacb980a..97416702 100644 --- a/modules/flowScatter/src/main.cpp +++ b/modules/flowScatter/src/main.cpp @@ -45,7 +45,8 @@ void signalHandler(int signum) * @param outputInterfaces Output interfaces for Unirec communication. */ void handleFormatChange(UnirecInputInterface& inputInterface, - std::vector& outputInterfaces) + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) { inputInterface.changeTemplate(); uint8_t dataType; const char* spec = nullptr; @@ -55,6 +56,14 @@ void handleFormatChange(UnirecInputInterface& inputInterface, for (auto& outIfc : outputInterfaces) { outIfc.changeTemplate(spec); } + // Notify scatter so it can resolve UniRec field ids/types once per format change. + try { + scatter.changeTemplate(); + } catch (const std::exception& ex) { + Nm::loggerGet("main")->warn("FlowScatter: unable to resolve fields after format change: {}", ex.what()); + // Don't rethrow — module can continue and will fallback to round-robin until + // fields are available. + } } /** @@ -99,7 +108,7 @@ void processUnirecRecords(UnirecInputInterface& inputInterface, try { processNextRecord(inputInterface, outputInterfaces, scatter); } catch (FormatChangeException& ex) { - handleFormatChange(inputInterface, outputInterfaces); + handleFormatChange(inputInterface, outputInterfaces, scatter); } catch (EoFException& ex) { break; } catch (std::exception& ex) { From 53da95da08d2fa3f5e4c1020e9d8fd2d6488da7f Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Thu, 23 Oct 2025 15:51:01 +0200 Subject: [PATCH 6/7] Nix - add editorcheck to devel env --- flake.nix | 1 + 1 file changed, 1 insertion(+) diff --git a/flake.nix b/flake.nix index dfb4dc4d..5e72d4cf 100644 --- a/flake.nix +++ b/flake.nix @@ -28,6 +28,7 @@ pkgs.fuse3 pkgs.rpm pkgs.clang-tools + pkgs.editorconfig-checker netmonpkgs.packages.x86_64-linux.nemea-modules-meta netmonpkgs.packages.x86_64-linux.nemea-framework netmonpkgs.packages.x86_64-linux.telemetry From 4c7dc312a04cd919810fe457b5b53e9ba6c345af Mon Sep 17 00:00:00 2001 From: Jaroslav Pesek Date: Thu, 23 Oct 2025 15:51:31 +0200 Subject: [PATCH 7/7] Flowscatter - comply with tidy --- flake.nix | 1 + modules/flowScatter/src/flowScatter.cpp | 582 +++++++++++++----------- modules/flowScatter/src/flowScatter.hpp | 126 ++--- modules/flowScatter/src/main.cpp | 80 ++-- 4 files changed, 427 insertions(+), 362 deletions(-) diff --git a/flake.nix b/flake.nix index 5e72d4cf..99966e0a 100644 --- a/flake.nix +++ b/flake.nix @@ -29,6 +29,7 @@ pkgs.rpm pkgs.clang-tools pkgs.editorconfig-checker + pkgs.doxygen netmonpkgs.packages.x86_64-linux.nemea-modules-meta netmonpkgs.packages.x86_64-linux.nemea-framework netmonpkgs.packages.x86_64-linux.telemetry diff --git a/modules/flowScatter/src/flowScatter.cpp b/modules/flowScatter/src/flowScatter.cpp index dc6fe0d7..5f9c2a8f 100644 --- a/modules/flowScatter/src/flowScatter.cpp +++ b/modules/flowScatter/src/flowScatter.cpp @@ -7,8 +7,8 @@ */ #include "flowScatter.hpp" -#include #include +#include namespace { @@ -21,11 +21,11 @@ using namespace Nemea; * @param vec Reference to the vector where bytes will be appended * @param value The value whose binary representation will be appended */ -template -void append_bytes(std::vector& vec, const T& value) +template +void appendBytes(std::vector& vec, const T& value) { - const auto* bytes = reinterpret_cast(&value); - vec.insert(vec.end(), bytes, bytes + sizeof(T)); + const auto* bytes = reinterpret_cast(&value); + vec.insert(vec.end(), bytes, bytes + sizeof(T)); } /** @@ -36,13 +36,13 @@ void append_bytes(std::vector& vec, const T& value) */ std::string trim(const std::string& str) { - const char* ws = " \t\n\r"; - const auto begin = str.find_first_not_of(ws); - if (begin == std::string::npos) { - return ""; - } - const auto end = str.find_last_not_of(ws); - return str.substr(begin, end - begin + 1); + const char* whiteSpace = " \t\n\r"; + const auto begin = str.find_first_not_of(whiteSpace); + if (begin == std::string::npos) { + return ""; + } + const auto end = str.find_last_not_of(whiteSpace); + return str.substr(begin, end - begin + 1); } /** @@ -59,31 +59,36 @@ std::string trim(const std::string& str) * * @throws std::runtime_error if the field type is not supported */ -bool checkNonZeroValue(ur_field_id_t fieldId, ur_field_type_t fieldType, const UnirecRecordView& record) { - switch (fieldType) { - case UR_TYPE_UINT64: - return (record.getFieldAsType(fieldId) != 0); - case UR_TYPE_INT64: - return (record.getFieldAsType(fieldId) != 0); - case UR_TYPE_UINT32: - return (record.getFieldAsType(fieldId) != 0); - case UR_TYPE_INT32: - return (record.getFieldAsType(fieldId) != 0); - case UR_TYPE_UINT16: - return (record.getFieldAsType(fieldId) != 0); - case UR_TYPE_INT16: - return (record.getFieldAsType(fieldId) != 0); - case UR_TYPE_UINT8: - return (record.getFieldAsType(fieldId) != 0); - case UR_TYPE_INT8: - return (record.getFieldAsType(fieldId) != 0); - case UR_TYPE_FLOAT: - return (record.getFieldAsType(fieldId) != 0.0f); - case UR_TYPE_DOUBLE: - return (record.getFieldAsType(fieldId) != 0.0); - default: - throw std::runtime_error("Unsupported conditional field type: " + std::string(ur_get_name(fieldId))); - } +bool checkNonZeroValue( + ur_field_id_t fieldId, + ur_field_type_t fieldType, + const UnirecRecordView& record) +{ + switch (fieldType) { + case UR_TYPE_UINT64: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT64: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT32: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT32: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT16: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT16: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT8: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT8: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_FLOAT: + return (record.getFieldAsType(fieldId) != 0.0F); + case UR_TYPE_DOUBLE: + return (record.getFieldAsType(fieldId) != 0.0); + default: + throw std::runtime_error( + "Unsupported conditional field type: " + std::string(ur_get_name(fieldId))); + } } /** @@ -99,256 +104,307 @@ bool checkNonZeroValue(ur_field_id_t fieldId, ur_field_type_t fieldType, const U * * @throws std::runtime_error if the field type is not supported for hashing */ -void appendFieldToHash(std::vector& hashInput, ur_field_id_t fieldId, ur_field_type_t fieldType, const UnirecRecordView& record) +void appendFieldToHash( + std::vector& hashInput, + ur_field_id_t fieldId, + ur_field_type_t fieldType, + const UnirecRecordView& record) { - switch (fieldType) { - case UR_TYPE_UINT64: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_INT64: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_UINT32: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_INT32: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_UINT16: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_INT16: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_UINT8: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_INT8: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_IP: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - case UR_TYPE_MAC: { - auto value = record.getFieldAsType(fieldId); - append_bytes(hashInput, value); - break; - } - default: - throw std::runtime_error("Unsupported field type for hashing: " + std::string(ur_get_name(fieldId))); - } + switch (fieldType) { + case UR_TYPE_UINT64: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_INT64: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_UINT32: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_INT32: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_UINT16: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_INT16: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_UINT8: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_INT8: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_IP: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_MAC: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + default: + throw std::runtime_error( + "Unsupported field type for hashing: " + std::string(ur_get_name(fieldId))); + } } } // namespace namespace Fs { FlowScatter::FlowScatter(size_t numOutputs, std::string rule) - : M_NUM_OUTPUTS(numOutputs) + : M_NUM_OUTPUTS(numOutputs) +{ + if (numOutputs > g_MAX_OUTPUTS) { + throw std::invalid_argument( + "Number of outputs must be between 1 and " + std::to_string(g_MAX_OUTPUTS)); + } + m_logger->info("Initializing FlowScatter with {} outputs", numOutputs); + m_logger->info("Rule string: '{}'", rule); + ruleParse(rule); + // Resolve fields for the current UniRec template at construction time. + try { + changeTemplate(); + } catch (const std::exception& ex) { + m_logger->warn("Unable to fully resolve rule fields at construction: {}", ex.what()); + // We don't fail here. Fields will be resolved when template is available + // (e.g., on first format change notification). + } + m_logger->info("FlowScatter initialization completed successfully"); +} + +/** + * @brief Split a comma-separated list into trimmed tokens. + */ +static std::vector splitCommaSeparated(std::string_view input) +{ + std::vector tokens; + std::size_t start = 0; + while (start < input.size()) { + const std::size_t comma = input.find(',', start); + const std::string_view piece = (comma == std::string::npos) + ? input.substr(start) + : input.substr(start, comma - start); + std::string trimmed = trim(std::string(piece)); + if (!trimmed.empty()) { + tokens.emplace_back(std::move(trimmed)); + } else { + // Keep behavior consistent with original: empty field is invalid + throw std::invalid_argument("Empty field in tuple"); + } + if (comma == std::string::npos) { + break; + } + start = comma + 1; + } + return tokens; +} + +/** + * @brief Parse a single branch specification like ":(f1,f2,...)". + */ +static Fs::RuleBranch parseBranchString(const std::string& rawBranch) { - if (numOutputs <= 0 || numOutputs > MAX_OUTPUTS) { - throw std::invalid_argument("Number of outputs must be between 1 and " + std::to_string(MAX_OUTPUTS)); - } - m_logger->info("Initializing FlowScatter with {} outputs", numOutputs); - m_logger->info("Rule string: '{}'", rule); - ruleParse(rule); - // Resolve fields for the current UniRec template at construction time. - try { - changeTemplate(); - } catch (const std::exception& ex) { - m_logger->warn("Unable to fully resolve rule fields at construction: {}", ex.what()); - // We don't fail here. Fields will be resolved when template is available - // (e.g., on first format change notification). - } - m_logger->info("FlowScatter initialization completed successfully"); + const std::string branchStr = trim(rawBranch); + if (branchStr.empty()) { + throw std::invalid_argument("Empty branch specification"); + } + if (branchStr.front() != '<') { + throw std::invalid_argument("Rule conditional branch must start with '<': " + branchStr); + } + + const std::size_t gtPos = branchStr.find('>'); + if (gtPos == std::string::npos) { + throw std::invalid_argument("Rule conditional branch must end with '>': " + branchStr); + } + + Fs::RuleBranch branch; + + const std::string conditional = trim(branchStr.substr(1, gtPos - 1)); + branch.conditionalFieldId = conditional; + + const std::size_t colonPos = branchStr.find(':', gtPos + 1); + if (colonPos == std::string::npos) { + throw std::invalid_argument("Missing ':' after in branch: " + branchStr); + } + + const std::size_t lparPos = branchStr.find('(', colonPos + 1); + const std::size_t rparPos = branchStr.find(')', lparPos + 1); + if (lparPos == std::string::npos || rparPos == std::string::npos || rparPos < lparPos) { + throw std::invalid_argument("Malformed field tuple in branch: " + branchStr); + } + + const std::string tuple = branchStr.substr(lparPos + 1, rparPos - lparPos - 1); + branch.fieldNames = splitCommaSeparated(tuple); + + if (branch.fieldNames.empty()) { + throw std::invalid_argument("Tuple must contain at least one field: " + branchStr); + } + + return branch; } void FlowScatter::ruleParse(const std::string& rule) { - m_rules.branches.clear(); - m_cachedBranches.clear(); - - std::istringstream ruleStream(rule); - std::string branchStr; - - if (rule.empty()) { - return; // No rules defined, nothing to parse - } - - // Split by '|' - while (std::getline(ruleStream, branchStr, '|')) { - branchStr = trim(branchStr); - if (branchStr.empty()) { - continue; - } - - RuleBranch branch; - - // Expect branch in form ":(field1,field2,...)" - if (branchStr.front() != '<') { - throw std::invalid_argument("Rule conditional branch must start with '<': " + branchStr); - } - - const std::size_t gtPos = branchStr.find('>'); - if (gtPos == std::string::npos) { - throw std::invalid_argument("Rule conditional branch must end with '>': " + branchStr); - } - - // Extract and trim the conditional (may be empty) - std::string conditional = trim(branchStr.substr(1, gtPos - 1)); - branch.conditionalFieldId = conditional; - - const std::size_t colonPos = branchStr.find(':', gtPos + 1); - if (colonPos == std::string::npos) { - throw std::invalid_argument("Missing ':' after in branch: " + branchStr); - } - - const std::size_t lparPos = branchStr.find('(', colonPos + 1); - const std::size_t rparPos = branchStr.find(')', lparPos + 1); - - if (lparPos == std::string::npos || rparPos == std::string::npos || rparPos < lparPos) { - throw std::invalid_argument("Malformed field tuple in branch: " + branchStr); - } - - std::string tuple = branchStr.substr(lparPos + 1, rparPos - lparPos - 1); - std::istringstream tupleStream(tuple); - std::string fieldName; - - while (std::getline(tupleStream, fieldName, ',')) { - fieldName = trim(fieldName); - if (fieldName.empty()) { - throw std::invalid_argument("Empty field in tuple: " + branchStr); - } - branch.fieldNames.push_back(fieldName); - } - - if (branch.fieldNames.empty()) { - throw std::invalid_argument("Tuple must contain at least one field: " + branchStr); - } - m_rules.branches.push_back(std::move(branch)); - } - - m_logger->info("Parsed {} rule branches", m_rules.branches.size()); - for (size_t i = 0; i < m_rules.branches.size(); ++i) { - const auto& branch = m_rules.branches[i]; - std::string fieldsStr; - for (size_t j = 0; j < branch.fieldNames.size(); ++j) { - if (j > 0) fieldsStr += ","; - fieldsStr += branch.fieldNames[j]; - } - m_logger->info("Rule {}: conditional '{}' -> fields: ({})", i + 1, branch.conditionalFieldId, fieldsStr); - } + m_rules.branches.clear(); + m_cachedBranches.clear(); + + if (rule.empty()) { + return; // No rules defined, nothing to parse + } + + std::istringstream ruleStream(rule); + std::string branchStr; + + // Split by '|' + while (std::getline(ruleStream, branchStr, '|')) { + const std::string trimmed = trim(branchStr); + if (trimmed.empty()) { + continue; + } + // Delegate parsing of a single branch + m_rules.branches.emplace_back(parseBranchString(trimmed)); + } + + m_logger->info("Parsed {} rule branches", m_rules.branches.size()); + for (std::size_t i = 0; i < m_rules.branches.size(); ++i) { + const auto& branch = m_rules.branches[i]; + std::string fieldsStr; + for (std::size_t j = 0; j < branch.fieldNames.size(); ++j) { + if (j > 0) { + fieldsStr += ","; + } + fieldsStr += branch.fieldNames[j]; + } + m_logger->info( + "Rule {}: conditional '{}' -> fields: ({})", + i + 1, + branch.conditionalFieldId, + fieldsStr); + } } void FlowScatter::changeTemplate() { - m_cachedBranches.clear(); - - for (const auto& branch : m_rules.branches) { - CachedBranch cb; - - if (!branch.conditionalFieldId.empty()) { - int fieldId = ur_get_id_by_name(branch.conditionalFieldId.c_str()); - if (fieldId < 0) { - throw std::runtime_error("Conditional field not found: " + branch.conditionalFieldId); - } - cb.conditionalId = static_cast(fieldId); - cb.conditionalType = ur_get_type(cb.conditionalId); - } - - cb.fieldIds.reserve(branch.fieldNames.size()); - cb.fieldTypes.reserve(branch.fieldNames.size()); - - for (const auto& fname : branch.fieldNames) { - int fieldId = ur_get_id_by_name(fname.c_str()); - if (fieldId < 0) { - throw std::runtime_error("Field for hashing not found in template: " + fname); - } - auto fid = static_cast(fieldId); - cb.fieldIds.push_back(fid); - cb.fieldTypes.push_back(ur_get_type(fid)); - } - - m_cachedBranches.push_back(std::move(cb)); - } - - m_logger->info("Resolved {} cached rule branches for current UniRec template", m_cachedBranches.size()); + m_cachedBranches.clear(); + + for (const auto& branch : m_rules.branches) { + CachedBranch cachedBranch; + + if (!branch.conditionalFieldId.empty()) { + int const fieldId = ur_get_id_by_name(branch.conditionalFieldId.c_str()); + if (fieldId < 0) { + throw std::runtime_error( + "Conditional field not found: " + branch.conditionalFieldId); + } + cachedBranch.conditionalId = static_cast(fieldId); + cachedBranch.conditionalType = ur_get_type(cachedBranch.conditionalId); + } + + cachedBranch.fieldIds.reserve(branch.fieldNames.size()); + cachedBranch.fieldTypes.reserve(branch.fieldNames.size()); + + for (const auto& fname : branch.fieldNames) { + int const fieldId = ur_get_id_by_name(fname.c_str()); + if (fieldId < 0) { + throw std::runtime_error("Field for hashing not found in template: " + fname); + } + auto fid = static_cast(fieldId); + cachedBranch.fieldIds.push_back(fid); + cachedBranch.fieldTypes.push_back(ur_get_type(fid)); + } + + m_cachedBranches.push_back(std::move(cachedBranch)); + } + + m_logger->info( + "Resolved {} cached rule branches for current UniRec template", + m_cachedBranches.size()); } -size_t FlowScatter::outputIndex(UnirecRecordView& record) +size_t FlowScatter::outputIndex(const UnirecRecordView& record) { - m_totalRecords++; - - // If no rules are defined, distribute records round-robin - if (m_rules.branches.empty()) { - size_t index = (m_totalRecords - 1) % M_NUM_OUTPUTS; - m_sentRecords[index]++; - return index; - } - - std::vector hashInput; - - // Iterate through cached conditional rules - for (const auto& cb : m_cachedBranches) { - bool useThisRule = false; - - if (cb.conditionalId == 0) { - useThisRule = true; - } else { - useThisRule = checkNonZeroValue(cb.conditionalId, cb.conditionalType, record); - } - - if (useThisRule) { - hashInput.clear(); - for (size_t i = 0; i < cb.fieldIds.size(); ++i) { - appendFieldToHash(hashInput, cb.fieldIds[i], cb.fieldTypes[i], record); - } - break; - } - } - - // Fallback: if no rule matched, use round-robin - if (hashInput.empty()) { - size_t index = (m_totalRecords - 1) % M_NUM_OUTPUTS; - m_sentRecords[index]++; - return index; - } - - auto hashValue = XXH64(hashInput.data(), hashInput.size(), 0xdeadd00de); - auto index = hashValue % M_NUM_OUTPUTS; - - m_sentRecords[index]++; - - return index; + m_totalRecords++; + + // If no rules are defined, distribute records round-robin + if (m_rules.branches.empty()) { + size_t const index = (m_totalRecords - 1) % M_NUM_OUTPUTS; + m_sentRecords[index]++; + return index; + } + + std::vector hashInput; + + // Iterate through cached conditional rules + for (const auto& cachedBranch : m_cachedBranches) { + bool useThisRule = false; + + if (cachedBranch.conditionalId == 0) { + useThisRule = true; + } else { + useThisRule = checkNonZeroValue( + cachedBranch.conditionalId, + cachedBranch.conditionalType, + record); + } + + if (useThisRule) { + hashInput.clear(); + for (size_t i = 0; i < cachedBranch.fieldIds.size(); ++i) { + appendFieldToHash( + hashInput, + cachedBranch.fieldIds[i], + cachedBranch.fieldTypes[i], + record); + } + break; + } + } + + // Fallback: if no rule matched, use round-robin + if (hashInput.empty()) { + size_t const index = (m_totalRecords - 1) % M_NUM_OUTPUTS; + m_sentRecords[index]++; + return index; + } + // NOLINTBEGIN(readability-magic-numbers) + auto hashValue = XXH64(hashInput.data(), hashInput.size(), 0xdeadd00de); + // NOLINTEND(readability-magic-numbers) + auto index = hashValue % M_NUM_OUTPUTS; + + m_sentRecords[index]++; + + return index; } FlowScatterStats FlowScatter::getStats() const noexcept { - FlowScatterStats stats; - stats.totalRecords = m_totalRecords; + FlowScatterStats stats; + stats.totalRecords = m_totalRecords; - for (size_t i = 0; i < M_NUM_OUTPUTS; ++i) { - stats.sentRecords[i] = m_sentRecords[i]; - } + for (size_t i = 0; i < M_NUM_OUTPUTS; ++i) { + stats.sentRecords[i] = m_sentRecords[i]; + } - return stats; + return stats; } -}// namespace FlowScatter +} // namespace Fs diff --git a/modules/flowScatter/src/flowScatter.hpp b/modules/flowScatter/src/flowScatter.hpp index 7a5373c2..65fdf507 100644 --- a/modules/flowScatter/src/flowScatter.hpp +++ b/modules/flowScatter/src/flowScatter.hpp @@ -8,14 +8,12 @@ #pragma once -#include -#include -#include #include -#include +#include #include #include -#include +#include +#include #include #include @@ -23,7 +21,7 @@ namespace Fs { using namespace Nemea; -constexpr size_t MAX_OUTPUTS = 128; ///< Maximum number of outputs supported by the module. +constexpr size_t g_MAX_OUTPUTS = 128; ///< Maximum number of outputs supported by the module. /** * @brief Structure to hold flow scatter statistics. @@ -32,16 +30,18 @@ constexpr size_t MAX_OUTPUTS = 128; ///< Maximum number of outputs supported by * and the number of records sent to each output. */ struct FlowScatterStats { - uint64_t totalRecords = 0; ///< Total number of records processed. - uint64_t sentRecords[MAX_OUTPUTS] = {0}; ///< Number of records sent to each output. + uint64_t totalRecords = 0; ///< Total number of records processed. + std::array sentRecords + = {0}; ///< Number of records sent to each output. }; /** * @brief Represents a single branch of a rule. */ struct RuleBranch { - std::string conditionalFieldId = std::string(""); ///< Name of the conditional field, empty string if unconditional. - std::vector fieldNames; ///< Names of fields to be hashed. + std::string conditionalFieldId + = std::string(""); ///< Name of the conditional field, empty string if unconditional. + std::vector fieldNames; ///< Names of fields to be hashed. }; /** @@ -52,11 +52,12 @@ struct RuleBranch { * for hashing based on the conditions defined in the conditional fields. */ struct Rules { - std::vector branches; + std::vector branches; ///< Branches of the rule. }; /** - * @brief A class for distributing network flow records across multiple outputs based on configurable hashing rules. + * @brief A class for distributing network flow records across multiple outputs based on + * configurable hashing rules. * * The FlowScatter class implements a load balancing mechanism that distributes Unirec records * to multiple output interfaces based on hash values computed from specified record fields. @@ -72,58 +73,57 @@ struct Rules { */ class FlowScatter { public: - /** - * @brief Constructs a FlowScatter object with the given number of outputs and rule. - * @param numOutputs The number of outputs to which records can be sent. - * @param rule The rule defining how to create a hash from the record fields. - */ - explicit FlowScatter(size_t numOutputs, std::string rule); - - - /** - * @brief Processes a Unirec record and returns the index of the output to which it should be sent. - * @param record The Unirec record to be processed. - * @return The index of the output to which the record should be sent. - */ - size_t outputIndex(UnirecRecordView& record); - - /** - * @brief Re-resolve field ids/types after a UniRec template/format change. - * - * Call this when the UniRec template changes (format change). This will - * resolve all field names from the parsed rules to UniRec field ids and - * cache their types so `outputIndex` does not need to look them up per-record. - * - * This method may throw if any required field is not present in the - * current UniRec template. - */ - void changeTemplate(); - - /** - * @brief Returns the current flow scatter statistics. - * @return The current flow scatter statistics. - */ - FlowScatterStats getStats() const noexcept; + /** + * @brief Constructs a FlowScatter object with the given number of outputs and rule. + * @param numOutputs The number of outputs to which records can be sent. + * @param rule The rule defining how to create a hash from the record fields. + */ + explicit FlowScatter(size_t numOutputs, std::string rule); + + /** + * @brief Processes a Unirec record and returns the index of the output to which it should be + * sent. + * @param record The Unirec record to be processed. + * @return The index of the output to which the record should be sent. + */ + size_t outputIndex(const UnirecRecordView& record); + + /** + * @brief Re-resolve field ids/types after a UniRec template/format change. + * + * Call this when the UniRec template changes (format change). This will + * resolve all field names from the parsed rules to UniRec field ids and + * cache their types so `outputIndex` does not need to look them up per-record. + * + * This method may throw if any required field is not present in the + * current UniRec template. + */ + void changeTemplate(); + + /** + * @brief Returns the current flow scatter statistics. + * @return The current flow scatter statistics. + */ + FlowScatterStats getStats() const noexcept; private: - void ruleParse(const std::string& rule); - - const size_t M_NUM_OUTPUTS; - uint64_t m_totalRecords = 0; - uint64_t m_sentRecords[MAX_OUTPUTS] = {0}; - Rules m_rules; - std::shared_ptr m_logger = Nm::loggerGet("FlowScatter"); - - /** Cached mapping of rule branches to UniRec field ids and types. */ - struct CachedBranch { - ur_field_id_t conditionalId = 0; - ur_field_type_t conditionalType = static_cast(0); - std::vector fieldIds; - std::vector fieldTypes; - }; - - std::vector m_cachedBranches; - + void ruleParse(const std::string& rule); + + const size_t M_NUM_OUTPUTS; + uint64_t m_totalRecords = 0; + std::array m_sentRecords = {0}; + Rules m_rules; + std::shared_ptr m_logger = Nm::loggerGet("FlowScatter"); + + /** Cached mapping of rule branches to UniRec field ids and types. */ + struct CachedBranch { + ur_field_id_t conditionalId = 0; + ur_field_type_t conditionalType = static_cast(0); + std::vector fieldIds; + std::vector fieldTypes; + }; + + std::vector m_cachedBranches; }; -} // namespace FlowScatter +} // namespace Fs diff --git a/modules/flowScatter/src/main.cpp b/modules/flowScatter/src/main.cpp index 97416702..13e3c94e 100644 --- a/modules/flowScatter/src/main.cpp +++ b/modules/flowScatter/src/main.cpp @@ -10,11 +10,12 @@ * SPDX-License-Identifier: BSD-3-Clause */ -#include "logger/logger.hpp" #include "flowScatter.hpp" +#include "logger/logger.hpp" #include "unirec/unirec-telemetry.hpp" #include +#include #include #include #include @@ -24,13 +25,13 @@ #include #include #include -#include using namespace Nemea; -std::atomic g_stopFlag(false); +static std::atomic g_stopFlag(false); +constexpr int g_DEFAULT_OUTPUTS = 5; -void signalHandler(int signum) +static void signalHandler(int signum) { Nm::loggerGet("signalHandler")->info("Interrupt signal {} received", signum); g_stopFlag.store(true); @@ -44,26 +45,30 @@ void signalHandler(int signum) * @param inputInterface Input interface for Unirec communication. * @param outputInterfaces Output interfaces for Unirec communication. */ -void handleFormatChange(UnirecInputInterface& inputInterface, - std::vector& outputInterfaces, - Fs::FlowScatter& scatter) +static void handleFormatChange( + UnirecInputInterface& inputInterface, + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) { - inputInterface.changeTemplate(); - uint8_t dataType; const char* spec = nullptr; - if (trap_get_data_fmt(TRAPIFC_INPUT, 0, &dataType, &spec) != TRAP_E_OK) { - throw std::runtime_error("Failed to get updated format from TRAP"); - } + inputInterface.changeTemplate(); + uint8_t dataType; + const char* spec = nullptr; + if (trap_get_data_fmt(TRAPIFC_INPUT, 0, &dataType, &spec) != TRAP_E_OK) { + throw std::runtime_error("Failed to get updated format from TRAP"); + } for (auto& outIfc : outputInterfaces) { outIfc.changeTemplate(spec); } - // Notify scatter so it can resolve UniRec field ids/types once per format change. - try { - scatter.changeTemplate(); - } catch (const std::exception& ex) { - Nm::loggerGet("main")->warn("FlowScatter: unable to resolve fields after format change: {}", ex.what()); - // Don't rethrow — module can continue and will fallback to round-robin until - // fields are available. - } + // Notify scatter so it can resolve UniRec field ids/types once per format change. + try { + scatter.changeTemplate(); + } catch (const std::exception& ex) { + Nm::loggerGet("main")->warn( + "FlowScatter: unable to resolve fields after format change: {}", + ex.what()); + // Don't rethrow — module can continue and will fallback to round-robin until + // fields are available. + } } /** @@ -77,15 +82,16 @@ void handleFormatChange(UnirecInputInterface& inputInterface, * @param scatter Sampler class for sampling. */ -void processNextRecord(UnirecInputInterface& inputInterface, - std::vector& outputInterfaces, - Fs::FlowScatter& scatter) +static void processNextRecord( + UnirecInputInterface& inputInterface, + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) { std::optional unirecRecord = inputInterface.receive(); if (!unirecRecord) { return; } - size_t index = scatter.outputIndex(*unirecRecord); + size_t const index = scatter.outputIndex(*unirecRecord); outputInterfaces[index].send(*unirecRecord); } @@ -100,9 +106,10 @@ void processNextRecord(UnirecInputInterface& inputInterface, * @param outputInterfaces Output interfaces for Unirec communication. * @param scatter Sampler class for sampling. */ -void processUnirecRecords(UnirecInputInterface& inputInterface, - std::vector& outputInterfaces, - Fs::FlowScatter& scatter) +static void processUnirecRecords( + UnirecInputInterface& inputInterface, + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) { while (!g_stopFlag.load()) { try { @@ -117,7 +124,7 @@ void processUnirecRecords(UnirecInputInterface& inputInterface, } } -telemetry::Content getScatterTelemetry(const Fs::FlowScatter& scatter) +static telemetry::Content getScatterTelemetry(const Fs::FlowScatter& scatter) { auto stats = scatter.getStats(); @@ -139,14 +146,13 @@ int main(int argc, char** argv) try { program.add_argument("-r", "--rule") .required() - .help( - "Specify the rule set.") + .help("Specify the rule set.") .default_value(std::string("<>:(SRC_IP)")); program.add_argument("-c", "--count") .required() .help("Specify the number of output interfaces.") .scan<'i', int>() - .default_value(5); + .default_value(g_DEFAULT_OUTPUTS); program.add_argument("-m", "--appfs-mountpoint") .required() .help("path where the appFs directory will be mounted") @@ -165,9 +171,10 @@ int main(int argc, char** argv) size_t outputCount = 0; try { outputCount = static_cast(program.get("--count")); - if (outputCount < 1 || outputCount > Fs::MAX_OUTPUTS) { - throw std::runtime_error("Invalid number of output interfaces: " + std::to_string(outputCount) - + ". Must be in range 1 to " + std::to_string(Fs::MAX_OUTPUTS)); + if (outputCount < 1 || outputCount > Fs::g_MAX_OUTPUTS) { + throw std::runtime_error( + "Invalid number of output interfaces: " + std::to_string(outputCount) + + ". Must be in range 1 to " + std::to_string(Fs::g_MAX_OUTPUTS)); } } catch (const std::exception& ex) { logger->error("Error parsing output count: {}", ex.what()); @@ -197,9 +204,10 @@ int main(int argc, char** argv) } try { - const std::string rule = program.get("--rule"); + const auto rule = program.get("--rule"); - Unirec unirec({1, static_cast(outputCount), "flowscatter", "Unirec flow scatter module"}); + Unirec unirec( + {1, static_cast(outputCount), "flowscatter", "Unirec flow scatter module"}); try { unirec.init(argc, argv);