Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ find_package(hiredis REQUIRED)
add_subdirectory(extern/TCPunch/client/)

add_library(FMI STATIC src/Communicator.cpp src/utils/Configuration.cpp src/comm/Channel.cpp src/comm/ClientServer.cpp
src/comm/S3.cpp src/comm/Redis.cpp src/utils/ChannelPolicy.cpp src/comm/PeerToPeer.cpp src/comm/Direct.cpp)
src/comm/S3.cpp src/comm/Minio.cpp src/comm/Redis.cpp src/utils/ChannelPolicy.cpp src/comm/PeerToPeer.cpp src/comm/Direct.cpp)

target_include_directories(FMI PRIVATE ${Boost_INCLUDE_DIRS} ${TCPUNCH_INCLUDE_DIRS})

Expand Down
49 changes: 49 additions & 0 deletions include/comm/Minio.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef FMI_MINIO_H
#define FMI_MINIO_H

#include "ClientServer.h"
#include <map>
#include <string>
#include <aws/s3/S3Client.h>
#include <aws/core/Aws.h>
#include <boost/interprocess/streams/bufferstream.hpp>

namespace FMI::Comm {
//! Channel that uses AWS S3 as backend and uses the AWS SDK for C++ to access S3.
class Minio : public ClientServer {
public:
explicit Minio(std::map<std::string, std::string> params, std::map<std::string, std::string> model_params);

~Minio();

void upload_object(channel_data buf, std::string name) override;

bool download_object(channel_data buf, std::string name) override;

void delete_object(std::string name) override;

std::vector<std::string> get_object_names() override;

double get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override;

double get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) override;

private:
std::string bucket_name;
std::unique_ptr<Aws::S3::S3Client, Aws::Deleter<Aws::S3::S3Client>> client;
Aws::SDKOptions options;

//! Only one AWS SDK InitApi is allowed per application, we therefore track the number of instances (for multiple communicators) and call InitApi / ShutdownApi only on the first / last instance.
inline static int instances = 0;
// Model params
double bandwidth;
double overhead;
double transfer_price;
double download_price;
double upload_price;

};
}


#endif //FMI_MINIO_H
172 changes: 172 additions & 0 deletions python/config_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from pydantic import BaseModel, Field
from pydantic.networks import IPvAnyAddress
from textwrap import dedent

# TODO replace unknown
class BackendBase(BaseModel):
enabled: bool = Field(
False,
description="Whether the backend is enabled")

max_timeout: int = Field(
1000,
gt=0,
description="The maximum time to wait for a request in milliseconds")


class DirectConfigModel(BaseModel):
# TODO: ipv6 ok as well?
host: IPvAnyAddress = Field(
"127.0.0.1",
description="The host to connect to given as an ipv4")
port: int = Field(
10000,
gt=0,
description="Port to use for connection")
# INFO: timeout is not needed because direct communication wasn't tested with collectives.


class RedisConfigModel(DirectConfigModel):
timeout: int = Field(
gt=0,
default=1,
description="Time to wait between polling attempts in milliseconds.")


class S3ConfigModel(BackendBase):
timeout: int = Field(
gt=0,
default=1,
description="Time to wait between polling attempts in milliseconds.")
bucket_name: str = Field(
"romanboe-uploadtest",
description="Name of storage bucket to use"
)
s3_region: str = Field(
"eu-central-1",
description="AWS region to use"
)


class MinioConfigModel(S3ConfigModel):
host: IPvAnyAddress = Field(
"127.0.0.1",
description="The host to connect to given as an ipv4"
)
port: int = Field(
9000,
gt=0,
description="Port to use for connection"
)

class BackendConfig(BaseModel):
S3: S3ConfigModel
Redis: RedisConfigModel
Direct: DirectConfigModel
Minio: MinioConfigModel


class FaaSModel(BaseModel):
gib_second_price: float = Field(
0.0000166667,
description=dedent("""
Price per second for a function with one GiB RAM. We assume
that the prices scale linearly with the amount of RAM (as it is common
today for the major providers), but other cost structures could easily
be incorporated.
""")
)


class S3Model(BaseModel):
bandwidth: float = Field(
50.0,
description="TODO"
)
overhead: float = Field(
40.4,
description="TODO"
)
transfer_price: float = Field(
0.0,
description="Data transfer price (per GB) between a function and object storage"
)
download_price: float = Field(
0.00000043,
description="Price per 1000 GET request on S3 Object Storage"
)
upload_price: float = Field(
0.0000054,
description="Price per 1000 PUT request on S3 Object Storage"
)


class RedisModel(BaseModel):
bandwidth_single: float = Field(
100.0,
description="TODO"
)
bandwidth_multiple: float = Field(
400.0,
description="TODO"
)
overhead: float = Field(
5.2,
description="TODO"
)
transfer_price: float = Field(
0.0,
description="Data transfer price (per GB) between a function and redis storage (=0)."
)
instance_price: float = Field(
0.0038,
description="TODO"
)
requests_per_hour: int = Field(
1000,
description="TODO"
)
include_infrastructure_costs: bool = Field(
True,
description="TODO"
)


class DirectModel(BaseModel):
bandwidth: float = Field(
400.0,
description="TODO"
)
overhead: float = Field(
0.34,
description="TODO"
)
transfer_price: float = Field(
0.0,
description="Data transfer price (per GB) between a function and another function (=0)."
)
vm_price: float = Field(
0.0134,
description="The price per hour for a VM that hosts the hole punching server."
)
requests_per_hour: int = Field(
1000,
description="TODO"
)
include_infrastructure_costs: bool = Field(
True,
description="TODO"
)


class ModelConfig(BaseModel):
FaaS: FaaSModel
S3: S3Model
Redis: RedisModel
Direct: DirectModel
Minio: S3Model


class FMIConfig(BaseModel):
backends: BackendConfig
model: ModelConfig
3 changes: 3 additions & 0 deletions src/comm/Channel.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#include "../../include/comm/Channel.h"
#include "../../include/comm/S3.h"
#include "../../include/comm/Minio.h"
#include "../../include/comm/Redis.h"
#include "../../include/comm/Direct.h"

std::shared_ptr<FMI::Comm::Channel> FMI::Comm::Channel::get_channel(std::string name, std::map<std::string, std::string> params,
std::map<std::string, std::string> model_params) {
if (name == "S3") {
return std::make_shared<S3>(params, model_params);
} else if (name == "Minio") {
return std::make_shared<Minio>(params, model_params);
} else if (name == "Redis") {
return std::make_shared<Redis>(params, model_params);
} else if (name == "Direct") {
Expand Down
115 changes: 115 additions & 0 deletions src/comm/Minio.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include <aws/core/auth/AWSCredentialsProvider.h>
#include "../../include/comm/Minio.h"
#include <boost/log/trivial.hpp>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <cmath>

// INFO Not sure if I needed to change that.
char MinioTAG[] = "MinioClient";

FMI::Comm::Minio::Minio(std::map<std::string, std::string> params, std::map<std::string, std::string> model_params) : ClientServer(params) {
if (instances == 0) {
// Only one call allowed (https://github.com/aws/aws-sdk-cpp/issues/456), give possible multiple clients control over initialization
Aws::InitAPI(options);
}
instances++;
bucket_name = params["bucket_name"];
std::string hostname = params["host"];
auto port = std::stoi(params["port"]);

Aws::Client::ClientConfiguration config;
config.scheme = Aws::Http::Scheme::HTTP; // MinIO uses HTTP by default unless configured otherwise
config.endpointOverride = hostname + ":" + std::to_string(port); // Set MinIO endpoint
config.verifySSL = false; // MinIO does not use SSL by default
config.region = params["s3_region"];
// According to ChatGPT this is no longer needed and was necessary in v1.8
// config.useVirtualAddressing = false; // MinIO does not support virtual-hosted-style URLs


bandwidth = std::stod(model_params["bandwidth"]);
overhead = std::stod(model_params["overhead"]);
transfer_price = std::stod(model_params["transfer_price"]);
download_price = std::stod(model_params["download_price"]);
upload_price = std::stod(model_params["upload_price"]);

auto credentialsProvider = Aws::MakeShared<Aws::Auth::EnvironmentAWSCredentialsProvider>(MinioTAG);
client = Aws::MakeUnique<Aws::S3::S3Client>(MinioTAG, credentialsProvider, config);
}

FMI::Comm::Minio::~Minio() {
instances--;
if (instances == 0) {
Aws::ShutdownAPI(options);
}
}

bool FMI::Comm::Minio::download_object(channel_data buf, std::string name) {
Aws::S3::Model::GetObjectRequest request;
request.WithBucket(bucket_name).WithKey(name);
auto outcome = client->GetObject(request);
if (outcome.IsSuccess()) {
auto& s = outcome.GetResult().GetBody();
s.read(buf.buf, buf.len);
return true;
} else {
return false;
}
}

void FMI::Comm::Minio::upload_object(channel_data buf, std::string name) {
Aws::S3::Model::PutObjectRequest request;
request.WithBucket(bucket_name).WithKey(name);

const std::shared_ptr<Aws::IOStream> data = Aws::MakeShared<boost::interprocess::bufferstream>(MinioTAG, buf.buf, buf.len);

request.SetBody(data);
auto outcome = client->PutObject(request);
if (!outcome.IsSuccess()) {
BOOST_LOG_TRIVIAL(error) << "Error when uploading to S3: " << outcome.GetError();
}
}

void FMI::Comm::Minio::delete_object(std::string name) {
Aws::S3::Model::DeleteObjectRequest request;
request.WithBucket(bucket_name).WithKey(name);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess()) {
BOOST_LOG_TRIVIAL(error) << "Error when deleting from S3: " << outcome.GetError();
}
}

std::vector<std::string> FMI::Comm::Minio::get_object_names() {
std::vector<std::string> object_names;
Aws::S3::Model::ListObjectsRequest request;
request.WithBucket(bucket_name);
auto outcome = client->ListObjects(request);
if (outcome.IsSuccess()) {
auto objects = outcome.GetResult().GetContents();
for (auto& object : objects) {
object_names.push_back(object.GetKey());
}
} else {
BOOST_LOG_TRIVIAL(error) << "Error when listing objects from S3: " << outcome.GetError();
}
return object_names;
}

double FMI::Comm::Minio::get_latency(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) {
double fixed_overhead = overhead;
double waiting_time = (double) timeout / 2.;
double comm_overhead = fixed_overhead + waiting_time;
double agg_bandwidth = producer * consumer * bandwidth;
double trans_time = producer * consumer * ((double) size_in_bytes / 1000000.) / agg_bandwidth;
return log2(producer + consumer) * comm_overhead + trans_time;
}

double FMI::Comm::Minio::get_price(Utils::peer_num producer, Utils::peer_num consumer, std::size_t size_in_bytes) {
double upload_costs = producer * upload_price + producer * ((double) size_in_bytes / 1000000000.) * transfer_price;
double expected_polls = (max_timeout / timeout) / 2;
double download_costs = producer * consumer * expected_polls * download_price + producer * consumer * ((double) size_in_bytes / 1000000000.) * transfer_price;
return upload_costs + download_costs;
}