Merge Feature/async-api-variable-collectives into FMI origin#22

Open
mstaylor wants to merge 16 commits into spcl:main from mstaylor:feature/async-api-variable-collectives

Conversation


@mstaylor mstaylor commented Feb 3, 2026

This pull request implements the changes required to use FMI as a Cylon communicator. You can find details about Cylon here: https://cylondata.org. The referenced Cylon branch is under review for incorporation into the main branch for an upcoming v2 release: cylondata/cylon#691.

1. Non-Blocking I/O with Callbacks

Added async send/recv operations with callback-based completion notification:

// Non-blocking send with callback
ch->send(buf, dest, &ctx, FMI::Utils::NONBLOCKING,
    [](FMI::Utils::NbxStatus status, const std::string& msg, FMI::Utils::fmiContext* ctx) {
        if (status == FMI::Utils::SUCCESS) {
            // Send completed
        }
    });

// Poll for completion
while (!completed) {
    ch->channel_event_progress(FMI::Utils::send);
}

Key components:

  • Mode enum: BLOCKING / NONBLOCKING
  • NbxStatus enum: Detailed error codes (SUCCESS, SEND_FAILED, RECEIVE_FAILED, NBX_TIMEOUT, etc.)
  • EventProcessStatus enum: Progress tracking (PROCESSING, EMPTY, NOOP)
  • fmiContext struct: Completion context for tracking operations
  • channel_event_progress(): Poll-based progress function

2. Variable-Length Collective Operations

Added MPI-style variable-length collectives:

// Gatherv - gather variable amounts from each peer
std::vector<int32_t> recvcounts = {4, 8, 12, 16};  // bytes per peer
std::vector<int32_t> displs = {0, 4, 12, 24};       // byte displacements
ch->gatherv(sendbuf, recvbuf, root, recvcounts, displs);

// Allgather - gather fixed amounts, distribute to all
ch->allgather(sendbuf, recvbuf, root);

// Allgatherv - gather variable amounts, distribute to all
ch->allgatherv(sendbuf, recvbuf, root, recvcounts, displs);

Note: recvcounts and displs are in bytes, not elements (consistent with channel_data's byte-based API).

3. Memory Management with shared_ptr

Replaced raw pointers with std::shared_ptr<channel_data> throughout:

// Automatic memory management
auto buf = std::make_shared<channel_data>(size);

// External buffer with custom deleter (no-op for stack/external memory)
auto buf = std::make_shared<channel_data>(external_ptr, size, noop_deleter);

Benefits:

  • Automatic cleanup, no memory leaks
  • Safe sharing between async operations
  • noop_deleter for external buffer management

4. CMake Configuration

Added configurable Boost linking:

option(FMI_BOOST_STATIC "Use static Boost libraries" ON)

Build with shared Boost (e.g., from conda):

cmake -DFMI_BOOST_STATIC=OFF -DCMAKE_PREFIX_PATH=$CONDA_PREFIX ..
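One way the option can be wired into CMake's FindBoost module (a sketch; the target name `fmi` is a placeholder and the PR's actual CMakeLists may differ):

```cmake
option(FMI_BOOST_STATIC "Use static Boost libraries" ON)

if(FMI_BOOST_STATIC)
  set(Boost_USE_STATIC_LIBS ON)    # link libboost_*.a
else()
  set(Boost_USE_STATIC_LIBS OFF)   # link libboost_*.so (e.g. conda's shared Boost)
endif()

find_package(Boost REQUIRED COMPONENTS system)
target_link_libraries(fmi PRIVATE Boost::system)
```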

@@ -0,0 +1,8 @@
@PACKAGE_INIT@
Contributor:

I think it would be easier to keep the TCPunch submodule and open a second PR in the other repository; would that be okay with you?


//! Send data to peer with id dest, must match a recv call
virtual void send(channel_data buf, FMI::Utils::peer_num dest) = 0;
virtual void send(std::shared_ptr<channel_data> buf, FMI::Utils::peer_num dest) = 0;
Contributor:

I'm not sure why this is necessary - if you implemented data storage within channel_data as a shared ptr, then why do we need a pointer here? It looks like we have a nested shared pointer, and I'm not sure this is strictly necessary.

struct channel_data {
char* buf;
std::size_t len;
std::shared_ptr<char[]> buf;
Contributor:

I think that having RAII-style memory management is a good idea. Now, if we manage everything internally, then do we need a shared_ptr if we can just implement a destructor that frees memory?

Question is: are there situations where a single pointer is shared between multiple owners (possibly threads), and it's not easy to keep track who owns what?

Contributor:

Creating shared_ptr only to pass a noop_deleter looks a bit like the wrong abstraction.

explicit channel_data(std::size_t length)
: buf(std::shared_ptr<char[]>(new char[length])), len(length) {}

// From raw pointer with custom deleter (for external buffers)
Contributor:

I'm a bit confused because it looks like we have three classes of resources here: owned data, external buffer, and "original reference". Perhaps we need additional documentation to explain what are the semantics of each class.

FMI::Utils::fmiContext* context, FMI::Utils::Mode mode,
std::function<void(FMI::Utils::NbxStatus, const std::string&,
FMI::Utils::fmiContext*)> callback) {
// ClientServer doesn't support true non-blocking - just call blocking version
Contributor:

In theory, one could implement the non-blocking version by having a thread poll the storage in the background; we can leave a note about this in the documentation.


hostname = params["host"];
port = std::stoi(params["port"]);
if (model_params["resolve_host_dns"] == "true") {
Contributor:

Can you explain what is the purpose of this additional step?

return it;
}

int socketfd = it->first;
Contributor:

Is my understanding correct: every time we call channel_event_progress, we check the state of each IOState separately?

I'm wondering whether the entire implementation could be simplified with a single epoll: create the epoll structure, add all socket file descriptors, and then poll for events in a loop. For that, you can use a blocking epoll with a timeout. This way we skip the unnecessary checks, and instead of looping across all sockets all the time, the process sleeps waiting for the next event.

{"host", "127.0.0.1"},
{"port", "10000"},
{"max_timeout", "1000"}
{"max_timeout", "5000"}
Contributor:

I think that we still use the "5000" as a magic number somewhere in the code :)
