From ca9b2476ae7d638ebf6bbd5830c201e49856763a Mon Sep 17 00:00:00 2001 From: AliSot2000 Date: Mon, 24 Mar 2025 17:57:56 +0100 Subject: [PATCH 1/9] + Added ConfigModel for ease of use --- python/config_model.py | 159 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 python/config_model.py diff --git a/python/config_model.py b/python/config_model.py new file mode 100644 index 0000000..dd282d3 --- /dev/null +++ b/python/config_model.py @@ -0,0 +1,159 @@ +from pydantic import BaseModel, Field +from pydantic.networks import IPvAnyAddress +from textwrap import dedent + +# TODO replace unknown +class BackendBase(BaseModel): + enabled: bool = Field( + False, + description="Whether the backend is enabled") + + max_timeout: int = Field( + 1000, + gt=0, + description="The maximum time to wait for a request in milliseconds") + + +class DirectConfigModel(BaseModel): + # TODO: ipv6 ok as well? + host: IPvAnyAddress = Field( + "127.0.0.1", + description="The host to connect to given as an ipv4") + port: int = Field( + 10000, + gt=0, + description="Port to use for connection") + # INFO: timeout is not needed because direct communication wasn't tested with collectives. + + +class RedisConfigModel(DirectConfigModel): + timeout: int = Field( + gt=0, + default=1, + description="Time to wait between polling attempts in milliseconds.") + + +class S3ConfigModel(BackendBase): + timeout: int = Field( + gt=0, + default=1, + description="Time to wait between polling attempts in milliseconds.") + bucket_name: str = Field( + "romanboe-uploadtest", + description="Name of storage bucket to use" + ) + s3_region: str = Field( + "eu-central-1", + description="AWS region to use" + ) + + +class BackendConfig(BaseModel): + S3: S3ConfigModel + Redis: RedisConfigModel + Direct: DirectConfigModel + + +class FaaSModel(BaseModel): + gib_second_price: float = Field( + 0.0000166667, + description=dedent(""" + Price per second for a function with one GiB RAM. We assume + that the prices scale linearly with the amount of RAM (as it is common + today for the major providers), but other cost structures could easily + be incorporated. + """) + ) + + +class S3Model(BaseModel): + bandwidth: float = Field( + 50.0, + description="TODO" + ) + overhead: float = Field( + 40.4, + description="TODO" + ) + transfer_price: float = Field( + 0.0, + description="Data transfer price (per GB) between a function and object storage" + ) + download_price: float = Field( + 0.00000043, + description="Price per 1000 GET request on S3 Object Storage" + ) + upload_price: float = Field( + 0.0000054, + description="Price per 1000 PUT request on S3 Object Storage" + ) + + +class RedisModel(BaseModel): + bandwidth_single: float = Field( + 100.0, + description="TODO" + ) + bandwidth_multiple: float = Field( + 400.0, + description="TODO" + ) + overhead: float = Field( + 5.2, + description="TODO" + ) + transfer_price: float = Field( + 0.0, + description="Data transfer price (per GB) between a function and redis storage (=0)." + ) + instance_price: float = Field( + 0.0038, + description="TODO" + ) + requests_per_hour: int = Field( + 1000, + description="TODO" + ) + include_infrastructure_costs: bool = Field( + True, + description="TODO" + ) + + +class DirectModel(BaseModel): + bandwidth: float = Field( + 400.0, + description="TODO" + ) + overhead: float = Field( + 0.34, + description="TODO" + ) + transfer_price: float = Field( + 0.0, + description="Data transfer price (per GB) between a function and another function (=0)." + ) + vm_price: float = Field( + 0.0134, + description="The price per hour for a VM that hosts the hole punching server." + ) + requests_per_hour: int = Field( + 1000, + description="TODO" + ) + include_infrastructure_costs: bool = Field( + True, + description="TODO" + ) + + +class ModelConfig(BaseModel): + FaaS: FaaSModel + S3: S3Model + Redis: RedisModel + Direct: DirectModel + + +class FMIConfig(BaseModel): + backends: BackendConfig + models: ModelConfig \ No newline at end of file From d11f2333bf290c50dbe52c4a52d2cf3ccea1b37e Mon Sep 17 00:00:00 2001 From: AliSot2000 Date: Wed, 26 Mar 2025 16:40:42 +0100 Subject: [PATCH 2/9] + Added new implementation for Minio --- include/comm/Minio.h | 49 +++++++++++++++++++ src/comm/Channel.cpp | 3 ++ src/comm/Minio.cpp | 114 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 166 insertions(+) create mode 100644 include/comm/Minio.h create mode 100644 src/comm/Minio.cpp diff --git a/include/comm/Minio.h b/include/comm/Minio.h new file mode 100644 index 0000000..9eaf38b --- /dev/null +++ b/include/comm/Minio.h @@ -0,0 +1,49 @@ +#ifndef FMI_S3_H +#define FMI_S3_H + +#include "ClientServer.h" +#include +#include +#include +#include +#include + +namespace FMI::Comm { + //! Channel that uses AWS S3 as backend and uses the AWS SDK for C++ to access S3. + class Minio : public ClientServer { + public: + explicit Minio(std::map params, std::map model_params); + + ~Minio(); + + void upload_object(channel_data buf, std::string name) override; + + bool download_object(channel_data buf, std::string name) override; + + void delete_object(std::string name) override; + + std::vector get_object_names() override; + + double get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override; + + double get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override; + + private: + std::string bucket_name; + std::unique_ptr> client; + Aws::SDKOptions options; + + //! Only one AWS SDK InitApi is allowed per application, we therefore track the number of instances (for multiple communicators) and call InitApi / ShutdownApi only on the first / last instance. + inline static int instances = 0; + // Model params + double bandwidth; + double overhead; + double transfer_price; + double download_price; + double upload_price; + + }; +} + + +#endif //FMI_S3_H diff --git a/src/comm/Channel.cpp b/src/comm/Channel.cpp index 8dc2ea3..6b17111 100644 --- a/src/comm/Channel.cpp +++ b/src/comm/Channel.cpp @@ -1,5 +1,6 @@ #include "../../include/comm/Channel.h" #include "../../include/comm/S3.h" +#include "../../include/comm/Minio.h" #include "../../include/comm/Redis.h" #include "../../include/comm/Direct.h" @@ -7,6 +8,8 @@ std::shared_ptr FMI::Comm::Channel::get_channel(std::string std::map model_params) { if (name == "S3") { return std::make_shared(params, model_params); + } else if (name == "Minio") { + return std::make_shared(params, model_params); } else if (name == "Redis") { return std::make_shared(params, model_params); } else if (name == "Direct") { diff --git a/src/comm/Minio.cpp b/src/comm/Minio.cpp new file mode 100644 index 0000000..9b4248f --- /dev/null +++ b/src/comm/Minio.cpp @@ -0,0 +1,114 @@ +#include +#include "../../include/comm/S3.h" +#include +#include +#include +#include +#include +#include + +// INFO Not sure if I needed to change that. +char TAG[] = "MinioClient"; + +FMI::Comm::Minio::Minio(std::map params, std::map model_params) : ClientServer(params) { + if (instances == 0) { + // Only one call allowed (https://github.com/aws/aws-sdk-cpp/issues/456), give possible multiple clients control over initialization + Aws::InitAPI(options); + } + instances++; + bucket_name = params["bucket_name"]; + std::string hostname = params["host"]; + auto port = std::stoi(params["port"]); + + Aws::Client::ClientConfiguration config; + config.scheme = Aws::Http::Scheme::HTTP; // MinIO uses HTTP by default unless configured otherwise + config.endpointOverride = hostname + ":" + std::to_string(port); // Set MinIO endpoint + config.verifySSL = false; // MinIO does not use SSL by default + config.region = params["s3_region"]; + config.useVirtualAddressing = false; // MinIO does not support virtual-hosted-style URLs + + + bandwidth = std::stod(model_params["bandwidth"]); + overhead = std::stod(model_params["overhead"]); + transfer_price = std::stod(model_params["transfer_price"]); + download_price = std::stod(model_params["download_price"]); + upload_price = std::stod(model_params["upload_price"]); + + auto credentialsProvider = Aws::MakeShared(TAG); + client = Aws::MakeUnique(TAG, credentialsProvider, config); +} + +FMI::Comm::Minio::~Minio() { + instances--; + if (instances == 0) { + Aws::ShutdownAPI(options); + } +} + +bool FMI::Comm::Minio::download_object(channel_data buf, std::string name) { + Aws::S3::Model::GetObjectRequest request; + request.WithBucket(bucket_name).WithKey(name); + auto outcome = client->GetObject(request); + if (outcome.IsSuccess()) { + auto& s = outcome.GetResult().GetBody(); + s.read(buf.buf, buf.len); + return true; + } else { + return false; + } +} + +void FMI::Comm::Minio::upload_object(channel_data buf, std::string name) { + Aws::S3::Model::PutObjectRequest request; + request.WithBucket(bucket_name).WithKey(name); + + const std::shared_ptr data = Aws::MakeShared(TAG, buf.buf, buf.len); + + request.SetBody(data); + auto outcome = client->PutObject(request); + if (!outcome.IsSuccess()) { + BOOST_LOG_TRIVIAL(error) << "Error when uploading to S3: " << outcome.GetError(); + } +} + +void FMI::Comm::Minio::delete_object(std::string name) { + Aws::S3::Model::DeleteObjectRequest request; + request.WithBucket(bucket_name).WithKey(name); + auto outcome = client->DeleteObject(request); + if (!outcome.IsSuccess()) { + BOOST_LOG_TRIVIAL(error) << "Error when deleting from S3: " << outcome.GetError(); + } +} + +std::vector FMI::Comm::Minio::get_object_names() { + std::vector object_names; + Aws::S3::Model::ListObjectsRequest request; + request.WithBucket(bucket_name); + auto outcome = client->ListObjects(request); + if (outcome.IsSuccess()) { + auto objects = outcome.GetResult().GetContents(); + for (auto& object : objects) { + object_names.push_back(object.GetKey()); + } + } else { + BOOST_LOG_TRIVIAL(error) << "Error when listing objects from S3: " << outcome.GetError(); + } + return object_names; +} + +double FMI::Comm::Minio::get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) { + double fixed_overhead = overhead; + double waiting_time = (double) timeout / 2.; + double comm_overhead = fixed_overhead + waiting_time; + double agg_bandwidth = producer * consumer * bandwidth; + double trans_time = producer * consumer * ((double) size_in_bytes / 1000000.) / agg_bandwidth; + return log2(producer + consumer) * comm_overhead + trans_time; +} + +double FMI::Comm::Minio::get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) { + double upload_costs = producer * upload_price + producer * ((double) size_in_bytes / 1000000000.) * transfer_price; + double expected_polls = (max_timeout / timeout) / 2; + double download_costs = producer * consumer * expected_polls * download_price + producer * consumer * ((double) size_in_bytes / 1000000000.) * transfer_price; + return upload_costs + download_costs; +} + From 107e511d40d74441c47a83b339618e4ebecf881e Mon Sep 17 00:00:00 2001 From: AliSot2000 Date: Wed, 26 Mar 2025 16:40:57 +0100 Subject: [PATCH 3/9] + Added Minio to config models --- python/config_model.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/python/config_model.py b/python/config_model.py index dd282d3..286ed6f 100644 --- a/python/config_model.py +++ b/python/config_model.py @@ -48,10 +48,22 @@ class S3ConfigModel(BackendBase): ) +class MinioConfigModel(S3ConfigModel): + host: IPvAnyAddress = Field( + "127.0.0.1", + description="The host to connect to given as an ipv4" + ) + port: int = Field( + 9000, + gt=0, + description="Port to use for connection" + ) + class BackendConfig(BaseModel): S3: S3ConfigModel Redis: RedisConfigModel Direct: DirectConfigModel + Minio: MinioConfigModel class FaaSModel(BaseModel): @@ -152,6 +164,7 @@ class ModelConfig(BaseModel): S3: S3Model Redis: RedisModel Direct: DirectModel + Minio: S3Model class FMIConfig(BaseModel): From 8bf67e996aef2d4f06bd8c9218cbb07ec0e6187e Mon Sep 17 00:00:00 2001 From: AliSot2000 Date: Wed, 26 Mar 2025 16:47:03 +0100 Subject: [PATCH 4/9] + Bugfix, needed to update ifdef as well --- include/comm/Minio.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/include/comm/Minio.h b/include/comm/Minio.h index 9eaf38b..336306e 100644 --- a/include/comm/Minio.h +++ b/include/comm/Minio.h @@ -1,5 +1,5 @@ -#ifndef FMI_S3_H -#define FMI_S3_H +#ifndef FMI_MINIO_H +#define FMI_MINIO_H #include "ClientServer.h" #include @@ -46,4 +46,4 @@ namespace FMI::Comm { } -#endif //FMI_S3_H +#endif //FMI_MINIO_H From 6f13c10b1a2212ddb7628ac163cb09a0be9919c8 Mon Sep 17 00:00:00 2001 From: AliSot2000 Date: Wed, 26 Mar 2025 17:41:32 +0100 Subject: [PATCH 5/9] + Added Minio.cpp to the CMakeList --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index beda6f2..64d0355 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,7 +19,7 @@ find_package(hiredis REQUIRED) add_subdirectory(extern/TCPunch/client/) add_library(FMI STATIC src/Communicator.cpp src/utils/Configuration.cpp src/comm/Channel.cpp src/comm/ClientServer.cpp - src/comm/S3.cpp src/comm/Redis.cpp src/utils/ChannelPolicy.cpp src/comm/PeerToPeer.cpp src/comm/Direct.cpp) + src/comm/S3.cpp src/comm/Minio.cpp src/comm/Redis.cpp src/utils/ChannelPolicy.cpp src/comm/PeerToPeer.cpp src/comm/Direct.cpp) target_include_directories(FMI PRIVATE ${Boost_INCLUDE_DIRS} ${TCPUNCH_INCLUDE_DIRS}) From e5bb9d11e92dfc7a1520863adaaa3a4fa267c926 Mon Sep 17 00:00:00 2001 From: AliSot2000 Date: Wed, 26 Mar 2025 17:41:57 +0100 Subject: [PATCH 6/9] + Bugfix, Need to include Minio.h + Bugfix, Defined different Tag for Minio --- src/comm/Minio.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/comm/Minio.cpp b/src/comm/Minio.cpp index 9b4248f..16ed61c 100644 --- a/src/comm/Minio.cpp +++ b/src/comm/Minio.cpp @@ -1,5 +1,5 @@ #include -#include "../../include/comm/S3.h" +#include "../../include/comm/Minio.h" #include #include #include @@ -8,7 +8,7 @@ #include // INFO Not sure if I needed to change that. -char TAG[] = "MinioClient"; +char MinioTAG[] = "MinioClient"; FMI::Comm::Minio::Minio(std::map params, std::map model_params) : ClientServer(params) { if (instances == 0) { From 1e444f9f0fffc7d39455a9ba358bff980023973e Mon Sep 17 00:00:00 2001 From: AliSot2000 Date: Wed, 26 Mar 2025 17:42:24 +0100 Subject: [PATCH 7/9] + Commented (deprecated?) code --- src/comm/Minio.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/comm/Minio.cpp b/src/comm/Minio.cpp index 16ed61c..f7b0b2b 100644 --- a/src/comm/Minio.cpp +++ b/src/comm/Minio.cpp @@ -25,7 +25,8 @@ FMI::Comm::Minio::Minio(std::map params, std::map Date: Wed, 26 Mar 2025 17:42:31 +0100 Subject: [PATCH 8/9] + Renamed TAG --- src/comm/Minio.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/comm/Minio.cpp b/src/comm/Minio.cpp index f7b0b2b..a7411fa 100644 --- a/src/comm/Minio.cpp +++ b/src/comm/Minio.cpp @@ -35,8 +35,8 @@ FMI::Comm::Minio::Minio(std::map params, std::map(TAG); - client = Aws::MakeUnique(TAG, credentialsProvider, config); + auto credentialsProvider = Aws::MakeShared(MinioTAG); + client = Aws::MakeUnique(MinioTAG, credentialsProvider, config); } FMI::Comm::Minio::~Minio() { @@ -63,7 +63,7 @@ void FMI::Comm::Minio::upload_object(channel_data buf, std::string name) { Aws::S3::Model::PutObjectRequest request; request.WithBucket(bucket_name).WithKey(name); - const std::shared_ptr data = Aws::MakeShared(TAG, buf.buf, buf.len); + const std::shared_ptr data = Aws::MakeShared(MinioTAG, buf.buf, buf.len); request.SetBody(data); auto outcome = client->PutObject(request); From f92385bb5db4461586e31a0a705f772d13e72ee9 Mon Sep 17 00:00:00 2001 From: AliSot2000 Date: Thu, 27 Mar 2025 16:23:54 +0100 Subject: [PATCH 9/9] + Bugfix, added an s where there shouldn't be one --- python/config_model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/config_model.py b/python/config_model.py index 286ed6f..40b98a3 100644 --- a/python/config_model.py +++ b/python/config_model.py @@ -169,4 +169,4 @@ class ModelConfig(BaseModel): class FMIConfig(BaseModel): backends: BackendConfig - models: ModelConfig \ No newline at end of file + model: ModelConfig \ No newline at end of file