diff --git a/flake.nix b/flake.nix index 9093a69..437b362 100644 --- a/flake.nix +++ b/flake.nix @@ -28,6 +28,8 @@ pkgs.fuse3 pkgs.rpm pkgs.clang-tools + pkgs.editorconfig-checker + pkgs.doxygen pkgs.libmaxminddb netmonpkgs.packages.x86_64-linux.nemea-modules-meta netmonpkgs.packages.x86_64-linux.nemea-framework diff --git a/modules/CMakeLists.txt b/modules/CMakeLists.txt index 79a3e5e..386ae6f 100644 --- a/modules/CMakeLists.txt +++ b/modules/CMakeLists.txt @@ -1,6 +1,7 @@ -# add_subdirectory(listDetector) -# add_subdirectory(sampler) -# add_subdirectory(telemetry) -# add_subdirectory(deduplicator) -# add_subdirectory(clickhouse) +add_subdirectory(listDetector) +add_subdirectory(sampler) +add_subdirectory(telemetry) +add_subdirectory(deduplicator) +add_subdirectory(clickhouse) +add_subdirectory(flowScatter) add_subdirectory(fieldClassifier) diff --git a/modules/flowScatter/CMakeLists.txt b/modules/flowScatter/CMakeLists.txt new file mode 100644 index 0000000..febd4f0 --- /dev/null +++ b/modules/flowScatter/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(src) diff --git a/modules/flowScatter/README.md b/modules/flowScatter/README.md new file mode 100644 index 0000000..c150ae2 --- /dev/null +++ b/modules/flowScatter/README.md @@ -0,0 +1,31 @@ +# flowScatter module - README + +## Description +The module performs distributing of 1 unirec interface data to `n` trap outputs + +## Interfaces +- Input: 1 +- Output: `n` + +## Parameters +### Common TRAP parameters +- `-h [trap,1]` Print help message for this module / for libtrap specific parameters. +- `-i IFC_SPEC` Specification of interface types and their parameters. +- `-v` Be verbose. +- `-vv` Be more verbose. +- `-vvv` Be even more verbose. + +### Module specific parameters +- `-m, --appfs-mountpoint ` Path where the appFs directory will be mounted +- `-r, --rule ` Decide, what fields are used to create a hash. Might be in form of rule. +- `-c, --count ` Number of output interfaces. + +## Usage Examples +``` +$ flowscatter -i u:trap_in,u:trap_out -r "SRC_IP,SRC_PORT" # according SRC_IP+SRC_PORT + +$ flowscatter -i u:trap_in,u:trap_out -r "TAG_SRC_IP:(SRC_IP)|TAG_DST_IP:(DST_IP)" # if TAG_SRC_IP is not 0, then SRC_IP else if TAG_DST_IP is no 0, then DST_IP + +$ flowscatter -i u:trap_in,u:trap_out -r "TAG_SRC_IP:(SRC_IP,SRC_PORT)|TAG_DST_IP:(DST_IP,DST_PORT)" # if TAG_SRC_IP is not 0, then SRC_IP+SRC_PORT, else if TAG_DST_IP is not 0, then DST_IP+DST_PORT + +``` diff --git a/modules/flowScatter/src/CMakeLists.txt b/modules/flowScatter/src/CMakeLists.txt new file mode 100644 index 0000000..55bb612 --- /dev/null +++ b/modules/flowScatter/src/CMakeLists.txt @@ -0,0 +1,17 @@ +add_executable(flowscatter + main.cpp + flowScatter.cpp +) + +target_link_libraries(flowscatter PRIVATE + telemetry::telemetry + telemetry::appFs + common + unirec::unirec++ + unirec::unirec + trap::trap + argparse + xxhash +) + +install(TARGETS flowscatter DESTINATION ${INSTALL_DIR_BIN}) diff --git a/modules/flowScatter/src/flowScatter.cpp b/modules/flowScatter/src/flowScatter.cpp new file mode 100644 index 0000000..5f9c2a8 --- /dev/null +++ b/modules/flowScatter/src/flowScatter.cpp @@ -0,0 +1,410 @@ +/** + * @file + * @author Jaroslav Pesek + * @brief Implementation of the FlowScatter class. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "flowScatter.hpp" +#include +#include + +namespace { + +using namespace Nemea; + +/** + * @brief Appends the binary representation of a value to a byte vector. + * + * @tparam T The type of the value to be converted to bytes + * @param vec Reference to the vector where bytes will be appended + * @param value The value whose binary representation will be appended + */ +template +void appendBytes(std::vector& vec, const T& value) +{ + const auto* bytes = reinterpret_cast(&value); + vec.insert(vec.end(), bytes, bytes + sizeof(T)); +} + +/** + * @brief Trims whitespace characters from the beginning and end of a string. + * + * @param str The input string to be trimmed + * @return A new string with leading and trailing whitespace removed. + */ +std::string trim(const std::string& str) +{ + const char* whiteSpace = " \t\n\r"; + const auto begin = str.find_first_not_of(whiteSpace); + if (begin == std::string::npos) { + return ""; + } + const auto end = str.find_last_not_of(whiteSpace); + return str.substr(begin, end - begin + 1); +} + +/** + * @brief Checks if a field value in a UniRec record is non-zero. + * + * This function examines a field in a UniRec record and determines whether + * its value is non-zero. The function supports numeric types only.. + * + * @param fieldId The identifier of the field to check + * @param fieldType The type of the field + * @param record The UniRec record containing the field to examine + * + * @return true if the field value is non-zero, false if it equals zero + * + * @throws std::runtime_error if the field type is not supported + */ +bool checkNonZeroValue( + ur_field_id_t fieldId, + ur_field_type_t fieldType, + const UnirecRecordView& record) +{ + switch (fieldType) { + case UR_TYPE_UINT64: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT64: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT32: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT32: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT16: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT16: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_UINT8: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_INT8: + return (record.getFieldAsType(fieldId) != 0); + case UR_TYPE_FLOAT: + return (record.getFieldAsType(fieldId) != 0.0F); + case UR_TYPE_DOUBLE: + return (record.getFieldAsType(fieldId) != 0.0); + default: + throw std::runtime_error( + "Unsupported conditional field type: " + std::string(ur_get_name(fieldId))); + } +} + +/** + * @brief Appends a field value from a UniRec record to a hash input vector. + * + * This function extracts a field value from a UniRec record based on its type + * and appends its binary representation to the provided hash input vector. + * + * @param hashInput Reference to the vector where the field's binary data will be appended + * @param fieldId The identifier of the field to extract + * @param fieldType The type of the field + * @param record The UniRec record containing the field + * + * @throws std::runtime_error if the field type is not supported for hashing + */ +void appendFieldToHash( + std::vector& hashInput, + ur_field_id_t fieldId, + ur_field_type_t fieldType, + const UnirecRecordView& record) +{ + switch (fieldType) { + case UR_TYPE_UINT64: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_INT64: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_UINT32: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_INT32: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_UINT16: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_INT16: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_UINT8: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_INT8: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_IP: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + case UR_TYPE_MAC: { + auto value = record.getFieldAsType(fieldId); + appendBytes(hashInput, value); + break; + } + default: + throw std::runtime_error( + "Unsupported field type for hashing: " + std::string(ur_get_name(fieldId))); + } +} +} // namespace + +namespace Fs { + +FlowScatter::FlowScatter(size_t numOutputs, std::string rule) + : M_NUM_OUTPUTS(numOutputs) +{ + if (numOutputs > g_MAX_OUTPUTS) { + throw std::invalid_argument( + "Number of outputs must be between 1 and " + std::to_string(g_MAX_OUTPUTS)); + } + m_logger->info("Initializing FlowScatter with {} outputs", numOutputs); + m_logger->info("Rule string: '{}'", rule); + ruleParse(rule); + // Resolve fields for the current UniRec template at construction time. + try { + changeTemplate(); + } catch (const std::exception& ex) { + m_logger->warn("Unable to fully resolve rule fields at construction: {}", ex.what()); + // We don't fail here. Fields will be resolved when template is available + // (e.g., on first format change notification). + } + m_logger->info("FlowScatter initialization completed successfully"); +} + +/** + * @brief Split a comma-separated list into trimmed tokens. + */ +static std::vector splitCommaSeparated(std::string_view input) +{ + std::vector tokens; + std::size_t start = 0; + while (start < input.size()) { + const std::size_t comma = input.find(',', start); + const std::string_view piece = (comma == std::string::npos) + ? input.substr(start) + : input.substr(start, comma - start); + std::string trimmed = trim(std::string(piece)); + if (!trimmed.empty()) { + tokens.emplace_back(std::move(trimmed)); + } else { + // Keep behavior consistent with original: empty field is invalid + throw std::invalid_argument("Empty field in tuple"); + } + if (comma == std::string::npos) { + break; + } + start = comma + 1; + } + return tokens; +} + +/** + * @brief Parse a single branch specification like ":(f1,f2,...)". + */ +static Fs::RuleBranch parseBranchString(const std::string& rawBranch) +{ + const std::string branchStr = trim(rawBranch); + if (branchStr.empty()) { + throw std::invalid_argument("Empty branch specification"); + } + if (branchStr.front() != '<') { + throw std::invalid_argument("Rule conditional branch must start with '<': " + branchStr); + } + + const std::size_t gtPos = branchStr.find('>'); + if (gtPos == std::string::npos) { + throw std::invalid_argument("Rule conditional branch must end with '>': " + branchStr); + } + + Fs::RuleBranch branch; + + const std::string conditional = trim(branchStr.substr(1, gtPos - 1)); + branch.conditionalFieldId = conditional; + + const std::size_t colonPos = branchStr.find(':', gtPos + 1); + if (colonPos == std::string::npos) { + throw std::invalid_argument("Missing ':' after in branch: " + branchStr); + } + + const std::size_t lparPos = branchStr.find('(', colonPos + 1); + const std::size_t rparPos = branchStr.find(')', lparPos + 1); + if (lparPos == std::string::npos || rparPos == std::string::npos || rparPos < lparPos) { + throw std::invalid_argument("Malformed field tuple in branch: " + branchStr); + } + + const std::string tuple = branchStr.substr(lparPos + 1, rparPos - lparPos - 1); + branch.fieldNames = splitCommaSeparated(tuple); + + if (branch.fieldNames.empty()) { + throw std::invalid_argument("Tuple must contain at least one field: " + branchStr); + } + + return branch; +} + +void FlowScatter::ruleParse(const std::string& rule) +{ + m_rules.branches.clear(); + m_cachedBranches.clear(); + + if (rule.empty()) { + return; // No rules defined, nothing to parse + } + + std::istringstream ruleStream(rule); + std::string branchStr; + + // Split by '|' + while (std::getline(ruleStream, branchStr, '|')) { + const std::string trimmed = trim(branchStr); + if (trimmed.empty()) { + continue; + } + // Delegate parsing of a single branch + m_rules.branches.emplace_back(parseBranchString(trimmed)); + } + + m_logger->info("Parsed {} rule branches", m_rules.branches.size()); + for (std::size_t i = 0; i < m_rules.branches.size(); ++i) { + const auto& branch = m_rules.branches[i]; + std::string fieldsStr; + for (std::size_t j = 0; j < branch.fieldNames.size(); ++j) { + if (j > 0) { + fieldsStr += ","; + } + fieldsStr += branch.fieldNames[j]; + } + m_logger->info( + "Rule {}: conditional '{}' -> fields: ({})", + i + 1, + branch.conditionalFieldId, + fieldsStr); + } +} + +void FlowScatter::changeTemplate() +{ + m_cachedBranches.clear(); + + for (const auto& branch : m_rules.branches) { + CachedBranch cachedBranch; + + if (!branch.conditionalFieldId.empty()) { + int const fieldId = ur_get_id_by_name(branch.conditionalFieldId.c_str()); + if (fieldId < 0) { + throw std::runtime_error( + "Conditional field not found: " + branch.conditionalFieldId); + } + cachedBranch.conditionalId = static_cast(fieldId); + cachedBranch.conditionalType = ur_get_type(cachedBranch.conditionalId); + } + + cachedBranch.fieldIds.reserve(branch.fieldNames.size()); + cachedBranch.fieldTypes.reserve(branch.fieldNames.size()); + + for (const auto& fname : branch.fieldNames) { + int const fieldId = ur_get_id_by_name(fname.c_str()); + if (fieldId < 0) { + throw std::runtime_error("Field for hashing not found in template: " + fname); + } + auto fid = static_cast(fieldId); + cachedBranch.fieldIds.push_back(fid); + cachedBranch.fieldTypes.push_back(ur_get_type(fid)); + } + + m_cachedBranches.push_back(std::move(cachedBranch)); + } + + m_logger->info( + "Resolved {} cached rule branches for current UniRec template", + m_cachedBranches.size()); +} + +size_t FlowScatter::outputIndex(const UnirecRecordView& record) +{ + m_totalRecords++; + + // If no rules are defined, distribute records round-robin + if (m_rules.branches.empty()) { + size_t const index = (m_totalRecords - 1) % M_NUM_OUTPUTS; + m_sentRecords[index]++; + return index; + } + + std::vector hashInput; + + // Iterate through cached conditional rules + for (const auto& cachedBranch : m_cachedBranches) { + bool useThisRule = false; + + if (cachedBranch.conditionalId == 0) { + useThisRule = true; + } else { + useThisRule = checkNonZeroValue( + cachedBranch.conditionalId, + cachedBranch.conditionalType, + record); + } + + if (useThisRule) { + hashInput.clear(); + for (size_t i = 0; i < cachedBranch.fieldIds.size(); ++i) { + appendFieldToHash( + hashInput, + cachedBranch.fieldIds[i], + cachedBranch.fieldTypes[i], + record); + } + break; + } + } + + // Fallback: if no rule matched, use round-robin + if (hashInput.empty()) { + size_t const index = (m_totalRecords - 1) % M_NUM_OUTPUTS; + m_sentRecords[index]++; + return index; + } + // NOLINTBEGIN(readability-magic-numbers) + auto hashValue = XXH64(hashInput.data(), hashInput.size(), 0xdeadd00de); + // NOLINTEND(readability-magic-numbers) + auto index = hashValue % M_NUM_OUTPUTS; + + m_sentRecords[index]++; + + return index; +} + +FlowScatterStats FlowScatter::getStats() const noexcept +{ + FlowScatterStats stats; + stats.totalRecords = m_totalRecords; + + for (size_t i = 0; i < M_NUM_OUTPUTS; ++i) { + stats.sentRecords[i] = m_sentRecords[i]; + } + + return stats; +} + +} // namespace Fs diff --git a/modules/flowScatter/src/flowScatter.hpp b/modules/flowScatter/src/flowScatter.hpp new file mode 100644 index 0000000..65fdf50 --- /dev/null +++ b/modules/flowScatter/src/flowScatter.hpp @@ -0,0 +1,129 @@ +/** + * @file + * @author Jaroslav Pesek + * @brief Declaration of the FlowScatter class. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Fs { + +using namespace Nemea; + +constexpr size_t g_MAX_OUTPUTS = 128; ///< Maximum number of outputs supported by the module. + +/** + * @brief Structure to hold flow scatter statistics. + * + * This structure is used to keep track of the total number of records processed + * and the number of records sent to each output. + */ +struct FlowScatterStats { + uint64_t totalRecords = 0; ///< Total number of records processed. + std::array sentRecords + = {0}; ///< Number of records sent to each output. +}; + +/** + * @brief Represents a single branch of a rule. + */ +struct RuleBranch { + std::string conditionalFieldId + = std::string(""); ///< Name of the conditional field, empty string if unconditional. + std::vector fieldNames; ///< Names of fields to be hashed. +}; + +/** + * @brief Structure to hold rules for field selection and hashing. + * + * This structure defines how fields are selected for hashing based on conditions. + * It contains a list of conditional fields and a list of fields that will be used + * for hashing based on the conditions defined in the conditional fields. + */ +struct Rules { + std::vector branches; ///< Branches of the rule. +}; + +/** + * @brief A class for distributing network flow records across multiple outputs based on + * configurable hashing rules. + * + * The FlowScatter class implements a load balancing mechanism that distributes Unirec records + * to multiple output interfaces based on hash values computed from specified record fields. + * It supports conditional field selection using tags and maintains statistics about the + * distribution of records across outputs. + * + * The class uses configurable rules to determine which fields to hash and supports + * conditional logic based on field values. Hash distribution ensures balanced load + * across all configured outputs while maintaining session affinity for records + * with identical hash keys. + * + * @note Maximum number of outputs is limited by MAX_OUTPUTS constant. + */ +class FlowScatter { +public: + /** + * @brief Constructs a FlowScatter object with the given number of outputs and rule. + * @param numOutputs The number of outputs to which records can be sent. + * @param rule The rule defining how to create a hash from the record fields. + */ + explicit FlowScatter(size_t numOutputs, std::string rule); + + /** + * @brief Processes a Unirec record and returns the index of the output to which it should be + * sent. + * @param record The Unirec record to be processed. + * @return The index of the output to which the record should be sent. + */ + size_t outputIndex(const UnirecRecordView& record); + + /** + * @brief Re-resolve field ids/types after a UniRec template/format change. + * + * Call this when the UniRec template changes (format change). This will + * resolve all field names from the parsed rules to UniRec field ids and + * cache their types so `outputIndex` does not need to look them up per-record. + * + * This method may throw if any required field is not present in the + * current UniRec template. + */ + void changeTemplate(); + + /** + * @brief Returns the current flow scatter statistics. + * @return The current flow scatter statistics. + */ + FlowScatterStats getStats() const noexcept; + +private: + void ruleParse(const std::string& rule); + + const size_t M_NUM_OUTPUTS; + uint64_t m_totalRecords = 0; + std::array m_sentRecords = {0}; + Rules m_rules; + std::shared_ptr m_logger = Nm::loggerGet("FlowScatter"); + + /** Cached mapping of rule branches to UniRec field ids and types. */ + struct CachedBranch { + ur_field_id_t conditionalId = 0; + ur_field_type_t conditionalType = static_cast(0); + std::vector fieldIds; + std::vector fieldTypes; + }; + + std::vector m_cachedBranches; +}; + +} // namespace Fs diff --git a/modules/flowScatter/src/main.cpp b/modules/flowScatter/src/main.cpp new file mode 100644 index 0000000..13e3c94 --- /dev/null +++ b/modules/flowScatter/src/main.cpp @@ -0,0 +1,247 @@ +/** + * @file + * @author Jaroslav Pesek + * @author Karel Hynek + * @author Pavel Siska + * @brief Sampling Module: Sample flowdata + * + * This module distributes 1 unirec interface data to `n` trap outputs based on rules. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +#include "flowScatter.hpp" +#include "logger/logger.hpp" +#include "unirec/unirec-telemetry.hpp" +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace Nemea; + +static std::atomic g_stopFlag(false); +constexpr int g_DEFAULT_OUTPUTS = 5; + +static void signalHandler(int signum) +{ + Nm::loggerGet("signalHandler")->info("Interrupt signal {} received", signum); + g_stopFlag.store(true); +} + +/** + * @brief Handle a format change exception by adjusting the template. + * + * This function is called when a `FormatChangeException` is caught in the main loop. + * + * @param inputInterface Input interface for Unirec communication. + * @param outputInterfaces Output interfaces for Unirec communication. + */ +static void handleFormatChange( + UnirecInputInterface& inputInterface, + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) +{ + inputInterface.changeTemplate(); + uint8_t dataType; + const char* spec = nullptr; + if (trap_get_data_fmt(TRAPIFC_INPUT, 0, &dataType, &spec) != TRAP_E_OK) { + throw std::runtime_error("Failed to get updated format from TRAP"); + } + for (auto& outIfc : outputInterfaces) { + outIfc.changeTemplate(spec); + } + // Notify scatter so it can resolve UniRec field ids/types once per format change. + try { + scatter.changeTemplate(); + } catch (const std::exception& ex) { + Nm::loggerGet("main")->warn( + "FlowScatter: unable to resolve fields after format change: {}", + ex.what()); + // Don't rethrow — module can continue and will fallback to round-robin until + // fields are available. + } +} + +/** + * @brief Process the next Unirec record and sample them. + * + * This function receives the next Unirec record through the bidirectional interface + * and performs sampling. + * + * @param inputInterface Input interface for Unirec communication. + * @param outputInterfaces Output interfaces for Unirec communication. + * @param scatter Sampler class for sampling. + */ + +static void processNextRecord( + UnirecInputInterface& inputInterface, + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) +{ + std::optional unirecRecord = inputInterface.receive(); + if (!unirecRecord) { + return; + } + size_t const index = scatter.outputIndex(*unirecRecord); + outputInterfaces[index].send(*unirecRecord); +} + +/** + * @brief Process Unirec records. + * + * The `processUnirecRecords` function continuously receives Unirec records through the provided + * bidirectional interface (`biInterface`) and performs sampling. The loop runs indefinitely until + * an end-of-file condition is encountered. + * + * @param inputInterface Input interface for Unirec communication. + * @param outputInterfaces Output interfaces for Unirec communication. + * @param scatter Sampler class for sampling. + */ +static void processUnirecRecords( + UnirecInputInterface& inputInterface, + std::vector& outputInterfaces, + Fs::FlowScatter& scatter) +{ + while (!g_stopFlag.load()) { + try { + processNextRecord(inputInterface, outputInterfaces, scatter); + } catch (FormatChangeException& ex) { + handleFormatChange(inputInterface, outputInterfaces, scatter); + } catch (EoFException& ex) { + break; + } catch (std::exception& ex) { + throw; + } + } +} + +static telemetry::Content getScatterTelemetry(const Fs::FlowScatter& scatter) +{ + auto stats = scatter.getStats(); + + telemetry::Dict dict; + dict["totalRecords"] = stats.totalRecords; + // dict["sampledRecords"] = stats.sampledRecords; + return dict; +} + +int main(int argc, char** argv) +{ + argparse::ArgumentParser program("Unirec Flow Scatter"); + + Nm::loggerInit(); + auto logger = Nm::loggerGet("main"); + + signal(SIGINT, signalHandler); + + try { + program.add_argument("-r", "--rule") + .required() + .help("Specify the rule set.") + .default_value(std::string("<>:(SRC_IP)")); + program.add_argument("-c", "--count") + .required() + .help("Specify the number of output interfaces.") + .scan<'i', int>() + .default_value(g_DEFAULT_OUTPUTS); + program.add_argument("-m", "--appfs-mountpoint") + .required() + .help("path where the appFs directory will be mounted") + .default_value(std::string("")); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + try { + program.parse_known_args(argc, argv); + } catch (const std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + size_t outputCount = 0; + try { + outputCount = static_cast(program.get("--count")); + if (outputCount < 1 || outputCount > Fs::g_MAX_OUTPUTS) { + throw std::runtime_error( + "Invalid number of output interfaces: " + std::to_string(outputCount) + + ". Must be in range 1 to " + std::to_string(Fs::g_MAX_OUTPUTS)); + } + } catch (const std::exception& ex) { + logger->error("Error parsing output count: {}", ex.what()); + return EXIT_FAILURE; + } + + std::shared_ptr telemetryRootDirectory; + telemetryRootDirectory = telemetry::Directory::create(); + + std::unique_ptr appFs; + + try { + auto mountPoint = program.get("--appfs-mountpoint"); + if (!mountPoint.empty()) { + const bool tryToUnmountOnStart = true; + const bool createMountPoint = true; + appFs = std::make_unique( + telemetryRootDirectory, + mountPoint, + tryToUnmountOnStart, + createMountPoint); + appFs->start(); + } + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + try { + const auto rule = program.get("--rule"); + + Unirec unirec( + {1, static_cast(outputCount), "flowscatter", "Unirec flow scatter module"}); + + try { + unirec.init(argc, argv); + } catch (HelpException& ex) { + std::cerr << program; + return EXIT_SUCCESS; + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + UnirecInputInterface inputInterface = unirec.buildInputInterface(); + std::vector outputInterfaces; + outputInterfaces.reserve(outputCount); + + for (size_t i = 0; i < outputCount; ++i) { + outputInterfaces.emplace_back(unirec.buildOutputInterface()); + } + + Fs::FlowScatter scatter(outputCount, rule); + + auto telemetryInputDirectory = telemetryRootDirectory->addDir("input"); + const telemetry::FileOps inputFileOps + = {[&inputInterface]() { return Nm::getInterfaceTelemetry(inputInterface); }, nullptr}; + const auto inputFile = telemetryInputDirectory->addFile("stats", inputFileOps); + auto telemetryScatterDirectory = telemetryRootDirectory->addDir("flowscatter"); + const telemetry::FileOps samplerFileOps + = {[&scatter]() { return getScatterTelemetry(scatter); }, nullptr}; + const auto samplerFile = telemetryScatterDirectory->addFile("stats", samplerFileOps); + processUnirecRecords(inputInterface, outputInterfaces, scatter); + } catch (std::exception& ex) { + logger->error(ex.what()); + return EXIT_FAILURE; + } + + return EXIT_SUCCESS; +}