diff --git a/CMakeLists.txt b/CMakeLists.txt index beda6f2..64d0355 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -19,7 +19,7 @@ find_package(hiredis REQUIRED) add_subdirectory(extern/TCPunch/client/) add_library(FMI STATIC src/Communicator.cpp src/utils/Configuration.cpp src/comm/Channel.cpp src/comm/ClientServer.cpp - src/comm/S3.cpp src/comm/Redis.cpp src/utils/ChannelPolicy.cpp src/comm/PeerToPeer.cpp src/comm/Direct.cpp) + src/comm/S3.cpp src/comm/Minio.cpp src/comm/Redis.cpp src/utils/ChannelPolicy.cpp src/comm/PeerToPeer.cpp src/comm/Direct.cpp) target_include_directories(FMI PRIVATE ${Boost_INCLUDE_DIRS} ${TCPUNCH_INCLUDE_DIRS}) diff --git a/include/comm/Minio.h b/include/comm/Minio.h new file mode 100644 index 0000000..336306e --- /dev/null +++ b/include/comm/Minio.h @@ -0,0 +1,49 @@ +#ifndef FMI_MINIO_H +#define FMI_MINIO_H + +#include "ClientServer.h" +#include +#include +#include +#include +#include + +namespace FMI::Comm { + //! Channel that uses AWS S3 as backend and uses the AWS SDK for C++ to access S3. + class Minio : public ClientServer { + public: + explicit Minio(std::map params, std::map model_params); + + ~Minio(); + + void upload_object(channel_data buf, std::string name) override; + + bool download_object(channel_data buf, std::string name) override; + + void delete_object(std::string name) override; + + std::vector get_object_names() override; + + double get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override; + + double get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override; + + private: + std::string bucket_name; + std::unique_ptr> client; + Aws::SDKOptions options; + + //! Only one AWS SDK InitApi is allowed per application, we therefore track the number of instances (for multiple communicators) and call InitApi / ShutdownApi only on the first / last instance. + inline static int instances = 0; + // Model params + double bandwidth; + double overhead; + double transfer_price; + double download_price; + double upload_price; + + }; +} + + +#endif //FMI_MINIO_H diff --git a/python/config_model.py b/python/config_model.py new file mode 100644 index 0000000..40b98a3 --- /dev/null +++ b/python/config_model.py @@ -0,0 +1,172 @@ +from pydantic import BaseModel, Field +from pydantic.networks import IPvAnyAddress +from textwrap import dedent + +# TODO replace unknown +class BackendBase(BaseModel): + enabled: bool = Field( + False, + description="Whether the backend is enabled") + + max_timeout: int = Field( + 1000, + gt=0, + description="The maximum time to wait for a request in milliseconds") + + +class DirectConfigModel(BaseModel): + # TODO: ipv6 ok as well? + host: IPvAnyAddress = Field( + "127.0.0.1", + description="The host to connect to given as an ipv4") + port: int = Field( + 10000, + gt=0, + description="Port to use for connection") + # INFO: timeout is not needed because direct communication wasn't tested with collectives. + + +class RedisConfigModel(DirectConfigModel): + timeout: int = Field( + gt=0, + default=1, + description="Time to wait between polling attempts in milliseconds.") + + +class S3ConfigModel(BackendBase): + timeout: int = Field( + gt=0, + default=1, + description="Time to wait between polling attempts in milliseconds.") + bucket_name: str = Field( + "romanboe-uploadtest", + description="Name of storage bucket to use" + ) + s3_region: str = Field( + "eu-central-1", + description="AWS region to use" + ) + + +class MinioConfigModel(S3ConfigModel): + host: IPvAnyAddress = Field( + "127.0.0.1", + description="The host to connect to given as an ipv4" + ) + port: int = Field( + 9000, + gt=0, + description="Port to use for connection" + ) + +class BackendConfig(BaseModel): + S3: S3ConfigModel + Redis: RedisConfigModel + Direct: DirectConfigModel + Minio: MinioConfigModel + + +class FaaSModel(BaseModel): + gib_second_price: float = Field( + 0.0000166667, + description=dedent(""" + Price per second for a function with one GiB RAM. We assume + that the prices scale linearly with the amount of RAM (as it is common + today for the major providers), but other cost structures could easily + be incorporated. + """) + ) + + +class S3Model(BaseModel): + bandwidth: float = Field( + 50.0, + description="TODO" + ) + overhead: float = Field( + 40.4, + description="TODO" + ) + transfer_price: float = Field( + 0.0, + description="Data transfer price (per GB) between a function and object storage" + ) + download_price: float = Field( + 0.00000043, + description="Price per 1000 GET request on S3 Object Storage" + ) + upload_price: float = Field( + 0.0000054, + description="Price per 1000 PUT request on S3 Object Storage" + ) + + +class RedisModel(BaseModel): + bandwidth_single: float = Field( + 100.0, + description="TODO" + ) + bandwidth_multiple: float = Field( + 400.0, + description="TODO" + ) + overhead: float = Field( + 5.2, + description="TODO" + ) + transfer_price: float = Field( + 0.0, + description="Data transfer price (per GB) between a function and redis storage (=0)." + ) + instance_price: float = Field( + 0.0038, + description="TODO" + ) + requests_per_hour: int = Field( + 1000, + description="TODO" + ) + include_infrastructure_costs: bool = Field( + True, + description="TODO" + ) + + +class DirectModel(BaseModel): + bandwidth: float = Field( + 400.0, + description="TODO" + ) + overhead: float = Field( + 0.34, + description="TODO" + ) + transfer_price: float = Field( + 0.0, + description="Data transfer price (per GB) between a function and another function (=0)." + ) + vm_price: float = Field( + 0.0134, + description="The price per hour for a VM that hosts the hole punching server." + ) + requests_per_hour: int = Field( + 1000, + description="TODO" + ) + include_infrastructure_costs: bool = Field( + True, + description="TODO" + ) + + +class ModelConfig(BaseModel): + FaaS: FaaSModel + S3: S3Model + Redis: RedisModel + Direct: DirectModel + Minio: S3Model + + +class FMIConfig(BaseModel): + backends: BackendConfig + model: ModelConfig \ No newline at end of file diff --git a/src/comm/Channel.cpp b/src/comm/Channel.cpp index 8dc2ea3..6b17111 100644 --- a/src/comm/Channel.cpp +++ b/src/comm/Channel.cpp @@ -1,5 +1,6 @@ #include "../../include/comm/Channel.h" #include "../../include/comm/S3.h" +#include "../../include/comm/Minio.h" #include "../../include/comm/Redis.h" #include "../../include/comm/Direct.h" @@ -7,6 +8,8 @@ std::shared_ptr FMI::Comm::Channel::get_channel(std::string std::map model_params) { if (name == "S3") { return std::make_shared(params, model_params); + } else if (name == "Minio") { + return std::make_shared(params, model_params); } else if (name == "Redis") { return std::make_shared(params, model_params); } else if (name == "Direct") { diff --git a/src/comm/Minio.cpp b/src/comm/Minio.cpp new file mode 100644 index 0000000..a7411fa --- /dev/null +++ b/src/comm/Minio.cpp @@ -0,0 +1,115 @@ +#include +#include "../../include/comm/Minio.h" +#include +#include +#include +#include +#include +#include + +// INFO Not sure if I needed to change that. +char MinioTAG[] = "MinioClient"; + +FMI::Comm::Minio::Minio(std::map params, std::map model_params) : ClientServer(params) { + if (instances == 0) { + // Only one call allowed (https://github.com/aws/aws-sdk-cpp/issues/456), give possible multiple clients control over initialization + Aws::InitAPI(options); + } + instances++; + bucket_name = params["bucket_name"]; + std::string hostname = params["host"]; + auto port = std::stoi(params["port"]); + + Aws::Client::ClientConfiguration config; + config.scheme = Aws::Http::Scheme::HTTP; // MinIO uses HTTP by default unless configured otherwise + config.endpointOverride = hostname + ":" + std::to_string(port); // Set MinIO endpoint + config.verifySSL = false; // MinIO does not use SSL by default + config.region = params["s3_region"]; + // According to ChatGPT this is no longer needed and was necessary in v1.8 + // config.useVirtualAddressing = false; // MinIO does not support virtual-hosted-style URLs + + + bandwidth = std::stod(model_params["bandwidth"]); + overhead = std::stod(model_params["overhead"]); + transfer_price = std::stod(model_params["transfer_price"]); + download_price = std::stod(model_params["download_price"]); + upload_price = std::stod(model_params["upload_price"]); + + auto credentialsProvider = Aws::MakeShared(MinioTAG); + client = Aws::MakeUnique(MinioTAG, credentialsProvider, config); +} + +FMI::Comm::Minio::~Minio() { + instances--; + if (instances == 0) { + Aws::ShutdownAPI(options); + } +} + +bool FMI::Comm::Minio::download_object(channel_data buf, std::string name) { + Aws::S3::Model::GetObjectRequest request; + request.WithBucket(bucket_name).WithKey(name); + auto outcome = client->GetObject(request); + if (outcome.IsSuccess()) { + auto& s = outcome.GetResult().GetBody(); + s.read(buf.buf, buf.len); + return true; + } else { + return false; + } +} + +void FMI::Comm::Minio::upload_object(channel_data buf, std::string name) { + Aws::S3::Model::PutObjectRequest request; + request.WithBucket(bucket_name).WithKey(name); + + const std::shared_ptr data = Aws::MakeShared(MinioTAG, buf.buf, buf.len); + + request.SetBody(data); + auto outcome = client->PutObject(request); + if (!outcome.IsSuccess()) { + BOOST_LOG_TRIVIAL(error) << "Error when uploading to S3: " << outcome.GetError(); + } +} + +void FMI::Comm::Minio::delete_object(std::string name) { + Aws::S3::Model::DeleteObjectRequest request; + request.WithBucket(bucket_name).WithKey(name); + auto outcome = client->DeleteObject(request); + if (!outcome.IsSuccess()) { + BOOST_LOG_TRIVIAL(error) << "Error when deleting from S3: " << outcome.GetError(); + } +} + +std::vector FMI::Comm::Minio::get_object_names() { + std::vector object_names; + Aws::S3::Model::ListObjectsRequest request; + request.WithBucket(bucket_name); + auto outcome = client->ListObjects(request); + if (outcome.IsSuccess()) { + auto objects = outcome.GetResult().GetContents(); + for (auto& object : objects) { + object_names.push_back(object.GetKey()); + } + } else { + BOOST_LOG_TRIVIAL(error) << "Error when listing objects from S3: " << outcome.GetError(); + } + return object_names; +} + +double FMI::Comm::Minio::get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) { + double fixed_overhead = overhead; + double waiting_time = (double) timeout / 2.; + double comm_overhead = fixed_overhead + waiting_time; + double agg_bandwidth = producer * consumer * bandwidth; + double trans_time = producer * consumer * ((double) size_in_bytes / 1000000.) / agg_bandwidth; + return log2(producer + consumer) * comm_overhead + trans_time; +} + +double FMI::Comm::Minio::get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) { + double upload_costs = producer * upload_price + producer * ((double) size_in_bytes / 1000000000.) * transfer_price; + double expected_polls = (max_timeout / timeout) / 2; + double download_costs = producer * consumer * expected_polls * download_price + producer * consumer * ((double) size_in_bytes / 1000000000.) * transfer_price; + return upload_costs + download_costs; +} +