diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..fdc27b7 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "modules/libwinibverbs"] + path = modules/libwinibverbs + url = https://github.com/lucasz93/libwinibverbs diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..a19be80 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,56 @@ +cmake_minimum_required(VERSION 3.16) +project (hdrdmacp) +set(CMAKE_CXX_STANDARD 20) + +# +# Add submodules +# +if(WIN32) + add_subdirectory(modules/libwinibverbs) + set(winibverbs_include "modules/libwinibverbs/include") + set(iverbs_lib winibverbs) +else() + set(iverbs_lib ibverbs) +endif() + +find_package(ZLIB REQUIRED) + +#=============================================================================== +# LIBRARY +#=============================================================================== +add_library(libhdrdmacp SHARED + IhdRDMA.h + hdRDMA.cc + hdRDMA.h + hdRDMAThread.cc + hdRDMAThread.h +) +if(WIN32) + add_dependencies(libhdrdmacp winibverbs) +endif() +target_compile_definitions(libhdrdmacp PRIVATE _CRT_SECURE_NO_WARNINGS BUILDING_HDRDMA) +target_link_libraries(libhdrdmacp ${iverbs_lib} ${ZLIB_LIBRARY}) +target_include_directories(libhdrdmacp PRIVATE ${ZLIB_INCLUDE_DIRS} ${winibverbs_include}) + +set_target_properties(libhdrdmacp PROPERTIES OUTPUT_NAME hdrdmacp) + +install(TARGETS libhdrdmacp DESTINATION ${CMAKE_INSTALL_PREFIX}) +install(FILES + IhdRDMA.h + DESTINATION include) + +#=============================================================================== +# APPLICATION +#=============================================================================== +add_executable(hdrdmacp + hdrdmacp.cc +) +add_dependencies(hdrdmacp libhdrdmacp) +target_link_libraries(hdrdmacp libhdrdmacp) + +if (WIN32) + add_custom_command(TARGET hdrdmacp POST_BUILD + COMMAND ${CMAKE_COMMAND} -E copy $ $ + COMMAND_EXPAND_LISTS + ) +endif() diff --git a/IhdRDMA.h b/IhdRDMA.h new file mode 100644 index 0000000..39fab4c --- /dev/null +++ b/IhdRDMA.h @@ -0,0 +1,74 @@ +#pragma once + +#include +#include + +#ifdef _MSC_VER +# define HDRMDA_DLL_EXPORT __declspec(dllexport) +# define HDRMDA_DLL_IMPORT __declspec(dllimport) +#else +# define HDRMDA_DLL_EXPORT __attribute__((__visibility__("default"))) +# define HDRMDA_DLL_IMPORT +#endif + +#ifdef BUILDING_HDRDMA +# define HDRDMA_DLL HDRMDA_DLL_EXPORT +#else +# define HDRDMA_DLL HDRMDA_DLL_IMPORT +#endif + +namespace hdrdma +{ + class IhdRDMA; + class IPathDecoder; + + struct config + { + config(size_t buffer_section_sz, int buffer_section_count, const std::shared_ptr& path_decoder = nullptr) : BufferSectionSize(buffer_section_sz), BufferSectionCount(buffer_section_count), PathDecoder(path_decoder) {} + + const size_t BufferSectionSize; + const size_t BufferSectionCount; + + std::shared_ptr PathDecoder; + }; +} + +extern "C" +{ + // Raw pointers. You probably don't want to use these. + HDRDMA_DLL hdrdma::IhdRDMA* hdrdma_allocate(const hdrdma::config& config); + HDRDMA_DLL void hdrdma_free(hdrdma::IhdRDMA*); +} + +namespace hdrdma +{ + class IPathDecoder { + public: + virtual ~IPathDecoder() {} + + virtual std::string Decode(const std::string_view& path) const = 0; + }; + + class IhdRDMA { + public: + virtual ~IhdRDMA() {} + + virtual bool Good() const = 0; + virtual void Listen(int port) = 0; + virtual void StopListening(void) = 0; + virtual void Connect(std::string host, int port) = 0; + virtual void SendFile(std::string srcfilename, std::string dstfilename, bool delete_after_send = false, bool calculate_checksum = false, bool makeparentdirs = false) = 0; + virtual void Poll(void) = 0; + virtual void Join(void) = 0; + + virtual uint64_t TotalBytesReceived() const = 0; + }; + + using Ptr = std::unique_ptr; + + // Wrappers. You probably want to use these. + static auto Create(const hdrdma::config& config) + { + return Ptr(hdrdma_allocate(config), hdrdma_free); + } +} diff --git a/hdRDMA.cc b/hdRDMA.cc index 0bc3fdd..ec81770 100644 --- a/hdRDMA.cc +++ b/hdRDMA.cc @@ -1,9 +1,15 @@ -#include +#include "hdRDMA.h" +#ifdef __GNUC__ #include +#include // mmap, munmap +#include +#include +#include +#endif + #include -#include #include #include @@ -18,9 +24,66 @@ using std::chrono::duration; using std::chrono::duration_cast; using std::chrono::high_resolution_clock; -extern uint64_t HDRDMA_BUFF_LEN_GB; -extern uint64_t HDRDMA_NUM_BUFF_SECTIONS; +template +struct thp_allocator +{ + constexpr static std::size_t huge_page_size = 1 << 30; // 1 GiB + using value_type = T; + +#ifdef WIN32 +#undef max + static T *allocate(std::size_t n) + { + if (n > std::numeric_limits::max() / sizeof(T)) + { + throw std::bad_alloc(); + } + + void *p = _aligned_malloc(n, huge_page_size); + if (p == nullptr) + { + throw std::bad_alloc(); + } + + return static_cast(p); + } + static void deallocate(T *p) { _aligned_free(p); } +#else + static T *allocate(std::size_t n) + { + if (n > std::numeric_limits::max() / sizeof(T)) + { + throw std::bad_alloc(); + } + + void *p = nullptr; + posix_memalign(&p, huge_page_size, n * sizeof(T)); + madvise(p, n * sizeof(T), MADV_HUGEPAGE); + if (p == nullptr) + { + throw std::bad_alloc(); + } + + return static_cast(p); + } + + static void deallocate(T *p) { std::free(p); } +#endif +}; + +extern "C" +{ + HDRDMA_DLL hdrdma::IhdRDMA* hdrdma_allocate(const hdrdma::config& config) + { + return new hdRDMA(config); + } + + HDRDMA_DLL void hdrdma_free(hdrdma::IhdRDMA* hdrdma) + { + delete hdrdma; + } +} //------------------------------------------------------------- // hdRDMA @@ -28,7 +91,7 @@ extern uint64_t HDRDMA_NUM_BUFF_SECTIONS; // hdRDMA constructor. This will look for IB devices and set up // for RDMA communications on the first one it finds. //------------------------------------------------------------- -hdRDMA::hdRDMA() +hdRDMA::hdRDMA(const hdrdma::config& config) { cout << "Looking for IB devices ..." << endl; int num_devices = 0; @@ -48,9 +111,11 @@ hdRDMA::hdRDMA() case IBV_TRANSPORT_IWARP: transport_type = "IWARP"; break; +#ifdef __GNUC__ case IBV_EXP_TRANSPORT_SCIF: transport_type = "SCIF"; break; +#endif default: transport_type = "UNKNOWN"; break; @@ -82,12 +147,18 @@ hdRDMA::hdRDMA() } } ibv_close_device( ctx ); + ctx = nullptr; } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - cout << " device " << i << " : " << devs[i]->name +#ifdef __GNUC__ << " : " << devs[i]->dev_name +#endif +#ifdef _MSC_VER + << " : " << devs[i]->name +#endif << " : " << transport_type << " : " << ibv_node_type_str(devs[i]->node_type) << " : Num. ports=" << Nports @@ -96,13 +167,20 @@ hdRDMA::hdRDMA() << " : lid=" << lid << endl; } + + if (!dev) + { + cout << "### No Infiniband adapters found. Is opensm running?" << endl; + return; + } + cout << "=============================================" << endl << endl; // Open device ctx = ibv_open_device(dev); if( !ctx ){ cout << "Error opening IB device context!" << endl; - exit(-11); + throw std::runtime_error("ibv_open_device failed"); } // Get device and port attributes @@ -113,7 +191,9 @@ hdRDMA::hdRDMA() ibv_query_gid(ctx, port_num, index, &gid); cout << "Device " << dev->name << " opened." +#ifdef __GNUC__ << " num_comp_vectors=" << ctx->num_comp_vectors +#endif << endl; // Print some of the port attributes @@ -126,51 +206,73 @@ hdRDMA::hdRDMA() cout << " active_width: " << (uint64_t)port_attr.active_width << endl; cout << " active_speed: " << (uint64_t)port_attr.active_speed << endl; cout << " phys_state: " << (uint64_t)port_attr.phys_state << endl; +#ifdef __GNUC__ cout << " link_layer: " << (uint64_t)port_attr.link_layer << endl; +#endif // Allocate protection domain pd = ibv_alloc_pd(ctx); if( !pd ){ cout << "ERROR allocation protection domain!" << endl; - exit(-12); + throw std::runtime_error("ibv_alloc_pd failed"); } - - // Allocate a large buffer and create a memory region pointing to it. - // We will split this one memory region among multiple receive requests - // n.b. initial tests failed on transfer for buffers larger than 1GB - uint64_t buff_len_GB = HDRDMA_BUFF_LEN_GB; - num_buff_sections = HDRDMA_NUM_BUFF_SECTIONS; - buff_section_len = (buff_len_GB*1000000000)/(uint64_t)num_buff_sections; - buff_len = num_buff_sections*buff_section_len; - buff = new uint8_t[buff_len]; - if( !buff ){ - cout << "ERROR: Unable to allocate buffer!" << endl; - exit(-13); - } - errno = 0; - auto access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE; - mr = ibv_reg_mr( pd, buff, buff_len, access); - if( !mr ){ - cout << "ERROR: Unable to register memory region! errno=" << errno << endl; - cout << " (Please see usage statement for a possible work around)" << endl; - exit( -14 ); - } - - // Fill in buffers - for( uint32_t i=0; i 2GB fail to deregister. + // The failure causes the entire system to freeze and become completely unusable! + // https://forums.developer.nvidia.com/t/cannot-deregister-memory-region-ibv-dereg-mr-if-mr-size-reach-2gb/217980 + // + // So instead of creating a single large memory region, create a number of smaller memory regions. + // This can hurt performance, but at least the system remains stable! + constexpr size_t PAGE_SIZE = 2'000'000'000; + const auto BUFFER_SECTIONS_PER_PAGE = PAGE_SIZE / config.BufferSectionSize; + auto remaining_buffer_size = config.BufferSectionSize * config.BufferSectionCount; + while (remaining_buffer_size) + { + auto page_sz = std::min(BUFFER_SECTIONS_PER_PAGE * config.BufferSectionSize, remaining_buffer_size); + hdBuffer buff; + + // Allocate a large buffer and create a memory region pointing to it. + // We will split this one memory region among multiple receive requests + // n.b. initial tests failed on transfer for buffers larger than 1GB + buff.num_buff_sections = page_sz / config.BufferSectionSize; + buff.buff_section_len = config.BufferSectionSize; + buff.buff_len = page_sz; + buff.buff = thp_allocator::allocate(buff.buff_len); + if( !buff.buff ){ + cout << "ERROR: Unable to allocate buffer!" << endl; + throw std::runtime_error("buff allocation failed"); + } + errno = 0; + auto access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE; + buff.mr = ibv_reg_mr( pd, buff.buff, buff.buff_len, access); + if( !buff.mr ){ + cout << "ERROR: Unable to register memory region! errno=" << errno << endl; + cout << " (Please see usage statement for a possible work around)" << endl; + throw std::runtime_error("ibv_reg_mr failed"); + } + + // Fill in buffers + for( uint32_t i=0; i 0 ){ + if( peer_sockfd != INVALID_SOCKET ){ // cout << "Connection from " << inet_ntoa(peer_addr.sin_addr) << endl; // Create a new thread to handle this connection auto hdthr = new hdRDMAThread( this ); auto thr = new std::thread( &hdRDMAThread::ThreadRun, hdthr, peer_sockfd ); - std::lock_guard lck( threads_mtx ); + std::scoped_lock lck( threads_mtx ); threads[ thr ] = hdthr; Nconnections++; @@ -269,11 +408,15 @@ void hdRDMA::StopListening(void) if( server_thread ){ cout << "Waiting for server to finish ..." << endl; done = true; + if (server_sockfd) + { + shutdown(server_sockfd, SHUT_RD); + closesocket(server_sockfd); + } + server_sockfd = 0; server_thread->join(); delete server_thread; server_thread = nullptr; - if( server_sockfd ) close( server_sockfd ); - server_sockfd = 0; }else{ cout << "Server not running." <ClientConnect( sockfd ); } @@ -343,7 +487,7 @@ void hdRDMA::Connect(std::string host, int port) //------------------------------------------------------------- uint32_t hdRDMA::GetNpeers(void) { - return threads.size(); + return (uint32_t)threads.size(); } //------------------------------------------------------------- @@ -351,11 +495,21 @@ uint32_t hdRDMA::GetNpeers(void) //------------------------------------------------------------- void hdRDMA::GetBuffers( std::vector &buffers, int Nrequested ) { - std::lock_guard grd( buffer_pool_mutex ); + std::unique_lock grd( buffer_pool_mutex ); + + while (!done && !buffer_pool.size()) + { + buffer_pool_cond.wait(grd); + } + + if (done) + { + return; + } //cout << "buffer_pool.size()="<SendFile( srcfilename, dstfilename, delete_after_send, calculate_checksum, makeparentdirs); @@ -410,14 +573,45 @@ void hdRDMA::Poll(void) } // Look for stopped threads and free their resources - std::lock_guard lck( threads_mtx ); + std::scoped_lock lck( threads_mtx ); + std::vector stopped; for( auto t : threads ){ if( t.second->stopped ){ t.first->join(); delete t.second; - threads.erase( t.first ); + stopped.push_back( t.first ); } } + for (auto s : stopped) { + threads.erase(s); + } + +} + +//------------------------------------------------------------- +// Join +// +// Waits for client threads to finish. +//------------------------------------------------------------- +void hdRDMA::Join(void) +{ + cout << "Waiting for clients to finish ..." << endl; + for (auto t : threads) { + t.first->join(); + delete t.second; + } + threads.clear(); +} +//------------------------------------------------------------- +// DecodePath +// +// Handles and user filesystem translations. +//------------------------------------------------------------- +std::string hdRDMA::DecodePath(const std::string_view& p) const +{ + return PathDecoder + ? PathDecoder->Decode(p) + : std::string(p); } diff --git a/hdRDMA.h b/hdRDMA.h index 4b607bd..63bc8e5 100644 --- a/hdRDMA.h +++ b/hdRDMA.h @@ -7,59 +7,83 @@ #include #include #include +#include +#if __GNUC__ #include #include #include +#endif -#include +#ifdef _MSC_VER +#ifndef WIN32 +#define WIN32 +#endif +#include +#endif #include -#include +#include "hdRDMAThread.h" +#include "IhdRDMA.h" -class hdRDMA{ +struct hdBuffer +{ + uint64_t buff_len = 0; + uint8_t *buff = nullptr; + uint64_t num_buff_sections = 0; + uint64_t buff_section_len = 0; + struct ibv_mr *mr = nullptr; +}; + +class hdRDMA final : public hdrdma::IhdRDMA { public: - hdRDMA(); - ~hdRDMA(); - void CreateQP(void); - void Listen(int port); - void StopListening(void); - void Connect(std::string host, int port); + hdRDMA(const hdrdma::config &config); + ~hdRDMA() override; + virtual bool Good() const override { return dev != nullptr; } + virtual void Listen(int port) override; + virtual void StopListening(void) override; + virtual void Connect(std::string host, int port) override; uint32_t GetNpeers(void); void GetBuffers( std::vector &buffers, int Nrequested=4 ); void ReturnBuffers( std::vector &buffers ); - void SendFile(std::string srcfilename, std::string dstfilename, bool delete_after_send=false, bool calculate_checksum=false, bool makeparentdirs=false); - void Poll(void); + virtual void SendFile(std::string srcfilename, std::string dstfilename, bool delete_after_send=false, bool calculate_checksum=false, bool makeparentdirs=false) override; + virtual void Poll(void) override; + virtual void Join(void) override; + + virtual uint64_t TotalBytesReceived() const override { return total_bytes_received; } + std::atomic_ullong total_bytes_received = 0; + + std::string DecodePath(const std::string_view& p) const; + std::shared_ptr PathDecoder; - struct ibv_device *dev = nullptr; struct ibv_context *ctx = nullptr; int port_num = 1; struct ibv_device_attr attr; struct ibv_port_attr port_attr; ibv_pd *pd = nullptr; - uint64_t buff_len = 0; - uint8_t *buff = nullptr; - uint64_t num_buff_sections = 0; - uint64_t buff_section_len = 0; - struct ibv_mr *mr = nullptr; + std::vector buffers; std::deque buffer_pool; std::mutex buffer_pool_mutex; + std::condition_variable buffer_pool_cond; bool done = false; - int server_sockfd = 0; + SOCKET server_sockfd = 0; std::thread *server_thread = nullptr; + std::thread *ack_thread = nullptr; uint32_t Nconnections = 0; - hdRDMAThread *hdthr_client = nullptr; + std::unique_ptr hdthr_client = nullptr; std::map threads; std::mutex threads_mtx; std::atomic Ntransferred; uint64_t Ntransferred_last = 0; std::chrono::high_resolution_clock::time_point t_last; + + std::string remote_addr; }; diff --git a/hdRDMAThread.cc b/hdRDMAThread.cc index 2f4ed5e..276fe94 100644 --- a/hdRDMAThread.cc +++ b/hdRDMAThread.cc @@ -3,7 +3,6 @@ #include #include #include -#include #include #include @@ -20,7 +19,6 @@ using std::chrono::duration; using std::chrono::duration_cast; using std::chrono::high_resolution_clock; -extern atomic BYTES_RECEIVED_TOT; extern std::string HDRDMA_REMOTE_ADDR; // @@ -42,6 +40,64 @@ extern std::string HDRDMA_REMOTE_ADDR; // be quite large, but we can support multiple simultaneous connections. // +#ifdef _MSC_VER +size_t send64(SOCKET s, const void *buffer, size_t sz, int flags) +{ + const uint64_t total = sz; + int bytes_to_write = 0; + int64_t bytes_written = 0; + + while (sz && bytes_written == bytes_to_write) + { + bytes_to_write = (int)std::min(sz, INT_MAX); + + bytes_written = send(s, (const char *)buffer, bytes_to_write, flags); + if (bytes_written < 0) + { + break; + } + + buffer = ((uint8_t *)buffer) + bytes_written; + sz -= bytes_written; + } + + return total - sz; +} + +size_t recv64(SOCKET s, void *buffer, size_t sz, int flags) +{ + const uint64_t total = sz; + int bytes_to_read = 0; + int64_t bytes_read = 0; + + while (sz && bytes_read == bytes_to_read) + { + bytes_to_read = (int)std::min(sz, INT_MAX); + + bytes_read = recv(s, (char *)buffer, bytes_to_read, flags); + if (bytes_read < 0) + { + break; + } + + buffer = ((uint8_t *)buffer) + bytes_read; + sz -= bytes_read; + } + + return total - sz; +} + +#include +#define mkdir(dir, mode) _mkdir(dir) +#define unlink _unlink + +typedef int mode_t; +#endif + +#ifdef __GNUC__ +#define send64 send +#define recv64 recv +#endif //----------------------------------------- // hdRDMAThread (constructor) @@ -60,27 +116,32 @@ hdRDMAThread::hdRDMAThread(hdRDMA *hdrdma) //----------------------------------------- hdRDMAThread::~hdRDMAThread() { - // Put QP insto RESET state so it releases all outstanding work requests - if( qp!=nullptr ){ - struct ibv_qp_attr qp_attr; - bzero( &qp_attr, sizeof(qp_attr) ); - qp_attr.qp_state = IBV_QPS_RESET; - ibv_modify_qp (qp, &qp_attr, IBV_QP_STATE); + Dispose(); +} + +//---------------------------------------------------------------------- +// ThreadRun +// +// This is run in a dedicated thread in server mode as soon as a +// TCP connection is established. It will exchange RDMA connection +// information over the given socket and then loop continously until +// the client signals it is done or the "stop" flag is set by the +// hdRDMA object. +//---------------------------------------------------------------------- +void hdRDMAThread::ThreadRun(SOCKET sockfd) +{ + try + { + TryThreadRun(sockfd); + } + catch(const std::exception& e) + { + std::cerr << e.what() << '\n'; } - // Delete all of our allocated objects - // n.b. order here matters! If the qp is destroyed after the - // comp_channel it will leave open a file descriptor pointing - // to [infinibandevent] that we have no way of closing! - if( qp!=nullptr ) ibv_destroy_qp( qp ); - if( cq!=nullptr ) ibv_destroy_cq ( cq ); - if( comp_channel!=nullptr ) ibv_destroy_comp_channel( comp_channel ); - if( ofs!=nullptr ) delete ofs; - - // Return MR buffers to pool - hdrdma->ReturnBuffers( buffers ); + Dispose(); } - + //---------------------------------------------------------------------- // ThreadRun // @@ -90,7 +151,7 @@ hdRDMAThread::~hdRDMAThread() // the client signals it is done or the "stop" flag is set by the // hdRDMA object. //---------------------------------------------------------------------- -void hdRDMAThread::ThreadRun(int sockfd) +void hdRDMAThread::TryThreadRun(SOCKET sockfd) { // The first thing we send via TCP is a 3 byte message indicating // success or failure. This really just allows us to inform the client @@ -103,7 +164,7 @@ void hdRDMAThread::ThreadRun(int sockfd) // This bit of magic ensures that the sockfd is closed and our "stopped" // flag is set before leaving this method, even if early due to error. - std::shared_ptr x(NULL, [&](int*){ close(sockfd); stopped=true;}); + std::shared_ptr x(NULL, [&](int*){ closesocket(sockfd); stopped=true;}); // Get pool buffers (up to 4). If none are available then tell // remote client we have too many RDMA connections. @@ -112,7 +173,7 @@ void hdRDMAThread::ThreadRun(int sockfd) // No buffers in MR available. Notify remote peer and exit thread std::string mess("BD: RDMA server has no more MR buffers (too many connections)"); cerr << mess << endl; - write(sockfd, mess.c_str(), mess.length()+1); + send64(sockfd, mess.c_str(), mess.length()+1, 0); return; } @@ -126,20 +187,20 @@ void hdRDMAThread::ThreadRun(int sockfd) // errors in a separate thread if we wanted to guarantee that we were // processing the data as fast as it is coming in. That adds some // significant complication so we skip it for now. - int cq_size = buffers.size(); + size_t cq_size = buffers.size(); comp_channel = ibv_create_comp_channel( hdrdma->ctx ); cq = ibv_create_cq( hdrdma->ctx, cq_size, NULL, comp_channel, 0); if( !cq ){ std::stringstream ss; ss << "BD: ERROR: Unable to create Completion Queue! errno=" << errno; cerr << ss.str() << endl; - write(sockfd, ss.str().c_str(), ss.str().length()+1); + send64(sockfd, ss.str().c_str(), ss.str().length()+1, 0); return; } // Tell remote peer we are ready to exchange QPInfo std::string mess("OK:"); - write(sockfd, mess.c_str(), mess.length()); + send64(sockfd, mess.c_str(), mess.length(), 0); // Exchange QP info over TCP socket so we can transmit via RDMA try{ @@ -197,9 +258,9 @@ void hdRDMAThread::ThreadRun(int sockfd) break; // exit thread } auto &buffer = buffers[id]; - auto buff = std::get<0>(buffer); + auto buff = buffer.Buffer; //auto buff_len = std::get<1>(buffer); - BYTES_RECEIVED_TOT += wc.byte_len; + hdrdma->total_bytes_received += wc.byte_len; ReceiveBuffer( buff, wc.byte_len ); //n.b. do NOT use buff_len here! t_last_received = high_resolution_clock::now(); @@ -221,19 +282,19 @@ void hdRDMAThread::PostWR( int id ) //cout << "Posting WR for id: " << id << endl; auto &buffer = buffers[id]; - auto buff = std::get<0>(buffer); - auto buff_len = std::get<1>(buffer); + auto buff = buffer.Buffer; + auto buff_len = buffer.BufferLen; struct ibv_recv_wr wr; struct ibv_sge sge; - bzero( &wr, sizeof(wr)); - bzero( &sge, sizeof(sge)); + memset( &wr, 0, sizeof(wr)); + memset( &sge, 0, sizeof(sge)); wr.wr_id = id; wr.sg_list = &sge; wr.num_sge = 1; sge.addr = (uint64_t)buff; sge.length = buff_len; - sge.lkey = hdrdma->mr->lkey; + sge.lkey = buffer.MR->lkey; auto ret = ibv_post_recv( qp, &wr, &bad_wr); if( ret != 0 ){ cout << "ERROR: ibv_post_recv returned non zero value (" << ret << ")" << endl; @@ -249,16 +310,13 @@ void hdRDMAThread::PostWR( int id ) // QP to the RTS (Ready To Send) state and RTR (Ready to Receive) // state. //------------------------------------------------------------- -void hdRDMAThread::ExchangeQPInfo( int sockfd ) +void hdRDMAThread::ExchangeQPInfo( SOCKET sockfd ) { - int n; + size_t n; struct QPInfo tmp_qp_info; // Create a new QP to use with the remote peer. CreateQP(); - - // Create a work receive request for each MR buffer we have - for( uint32_t id=0; idport_num, @@ -362,7 +423,7 @@ int hdRDMAThread::SetToRTS(void) /* Change QP state to RTR */ { struct ibv_qp_attr qp_attr; - bzero( &qp_attr, sizeof(qp_attr) ); + memset( &qp_attr, 0, sizeof(qp_attr) ); qp_attr.qp_state = IBV_QPS_RTR, qp_attr.path_mtu = IB_MTU, qp_attr.dest_qp_num = remote_qpinfo.qp_num, @@ -389,7 +450,7 @@ int hdRDMAThread::SetToRTS(void) /* Change QP state to RTS */ { struct ibv_qp_attr qp_attr; - bzero( &qp_attr, sizeof(qp_attr) ); + memset( &qp_attr, 0, sizeof(qp_attr) ); qp_attr.qp_state = IBV_QPS_RTS, qp_attr.timeout = 14, qp_attr.retry_cnt = 7, @@ -424,10 +485,9 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len) if( ofs != nullptr ) { cout << "ERROR: Received new file buffer while file " << ofilename << " already open!" << endl; ofs->close(); - delete ofs; ofs = nullptr; } - ofilename = (char*)&hi->payload; + ofilename = hdrdma->DecodePath((const char*)&hi->payload); cout << "Receiving file: " << ofilename << endl; // Create parent directory path if specified by remote sender @@ -437,7 +497,7 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len) if( pos != std::string::npos ) makePath( ofilename.substr(0, pos) ); } - ofs = new std::ofstream( ofilename.c_str() ); + ofs = std::make_unique(ofilename); ofilesize = 0; crcsum = adler32( 0L, Z_NULL, 0 ); calculate_checksum = (hi->flags & HI_CALCULATE_CHECKSUM); // optionally calculate checksum @@ -475,7 +535,6 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len) duration duration_io = duration_cast>(t_io_end-t_io_start); delta_t_io += duration_io.count(); ofs->close(); - delete ofs; ofs = nullptr; } // auto t2 = high_resolution_clock::now(); @@ -525,11 +584,11 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len) // via TCP to the server, but nothing will have been read/written // yet. //------------------------------------------------------------- -void hdRDMAThread::ClientConnect( int sockfd ) +void hdRDMAThread::ClientConnect( SOCKET sockfd ) { // This bit of magic ensures that the sockfd is closed and our "stopped" // flag is set before leaving this method, even if early due to error. - std::shared_ptr x(NULL, [&](int*){ close(sockfd); stopped=true;}); + std::shared_ptr x(NULL, [&](int*){ closesocket(sockfd); stopped=true;}); // Get pool buffers (all of them). If none are available then throw exception hdrdma->GetBuffers( buffers ); @@ -561,12 +620,12 @@ void hdRDMAThread::ClientConnect( int sockfd ) // Read first 3 bytes from TCP socket to make sure the server is able to // send us QPInfo. char str[256]; - bzero(str, 256); // status code does not include terminating null - auto n = read(sockfd, str, 3); + memset(str, 0, 256); // status code does not include terminating null + auto n = recv64(sockfd, str, 3, MSG_WAITALL); if( n!= 3 ) throw Exception("ERROR: Unable to read 3 byte status code from TCP socket!" ); if( std::string(str) != "OK:" ){ - auto n = read(sockfd, str, 256); + auto n = recv64(sockfd, str, 256, MSG_WAITALL); if( n<=0 ) sprintf(str, "Unknown error status from server"); throw Exception( str ); } @@ -580,12 +639,29 @@ void hdRDMAThread::ClientConnect( int sockfd ) // SendFile //------------------------------------------------------------- void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bool delete_after_send, bool calculate_checksum, bool makeparentdirs) +{ + try + { + TrySendFile(srcfilename, dstfilename, delete_after_send, calculate_checksum, makeparentdirs); + } + catch(const std::exception& e) + { + std::cerr << e.what() << '\n'; + } + + Dispose(); +} + +//------------------------------------------------------------- +// TrySendFile +//------------------------------------------------------------- +void hdRDMAThread::TrySendFile(std::string srcfilename, std::string dstfilename, bool delete_after_send, bool calculate_checksum, bool makeparentdirs) { // Open local file std::ifstream ifs(srcfilename.c_str()); if( !ifs.is_open() ){ cerr <<"ERROR: Unable to open file \"" << srcfilename << "\"!" << endl; - exit(-40); + throw std::runtime_error("Couldn't open output file"); } // Get filesize @@ -595,20 +671,18 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo double filesize_GB = (double)filesize*1.0E-9; std::string mess = delete_after_send ? " - will be deleted after send":""; - cout << "Sending file: " << srcfilename << "-> (" << HDRDMA_REMOTE_ADDR << ":)" << dstfilename << " (" << filesize_GB << " GB)" << mess << endl; + cout << "Sending file: " << srcfilename << "-> (" << hdrdma->remote_addr << ":)" << dstfilename << " (" << filesize_GB << " GB)" << mess << endl; struct ibv_send_wr wr, *bad_wr = nullptr; struct ibv_sge sge; - bzero( &wr, sizeof(wr) ); - bzero( &sge, sizeof(sge) ); + memset( &wr, 0, sizeof(wr) ); + memset( &sge, 0, sizeof(sge) ); wr.opcode = IBV_WR_SEND; wr.sg_list = &sge; wr.num_sge = 1; wr.send_flags = IBV_SEND_SIGNALED, - sge.lkey = hdrdma->mr->lkey; - // Send buffers crcsum = adler32( 0L, Z_NULL, 0 ); t1 = high_resolution_clock::now(); @@ -620,9 +694,10 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo for(int i=0; i<1000; i++){ // if sending more than 1000 buffers something is wrong! auto id = i%buffers.size(); auto &buffer = buffers[id]; - auto buff = std::get<0>(buffer); - auto buff_len = std::get<1>(buffer); + auto buff = buffer.Buffer; + auto buff_len = buffer.BufferLen; sge.addr = (uint64_t)buff; + sge.lkey = buffer.MR->lkey; HeaderInfo *hi = (HeaderInfo*)sge.addr; hi->buff_type = 1; // buffer holds data for file transfer hi->flags = 0x0; @@ -634,7 +709,7 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo hi->flags |= HI_FIRST_BUFFER; // first buffer of file if( calculate_checksum ) hi->flags |= HI_CALCULATE_CHECKSUM; // tell remote server to calculate checksum if( makeparentdirs ) hi->flags |= HI_MAKE_PARENT_DIRS; // tell remote server to make directory path if needed - sprintf( (char*)&hi->payload, dstfilename.c_str() ); + sprintf( (char*)&hi->payload, "%s", dstfilename.c_str() ); }else{ hi->header_len = sizeof(*hi) - sizeof(hi->payload); } @@ -798,3 +873,32 @@ bool hdRDMAThread::makePath( const std::string &path ) } } +//------------------------------------------------------------- +// Dispose +//------------------------------------------------------------- +void hdRDMAThread::Dispose() +{ + // Put QP insto RESET state so it releases all outstanding work requests + if( qp!=nullptr ){ + struct ibv_qp_attr qp_attr; + memset( &qp_attr, 0, sizeof(qp_attr) ); + qp_attr.qp_state = IBV_QPS_RESET; + ibv_modify_qp (qp, &qp_attr, IBV_QP_STATE); + } + + // Delete all of our allocated objects + // n.b. order here matters! If the qp is destroyed after the + // comp_channel it will leave open a file descriptor pointing + // to [infinibandevent] that we have no way of closing! + if( qp!=nullptr ) ibv_destroy_qp( qp ); + if( cq!=nullptr ) ibv_destroy_cq ( cq ); + if( comp_channel!=nullptr ) ibv_destroy_comp_channel( comp_channel ); + + qp = nullptr; + cq = nullptr; + comp_channel = nullptr; + ofs = nullptr; + + // Return MR buffers to pool + hdrdma->ReturnBuffers( buffers ); +} diff --git a/hdRDMAThread.h b/hdRDMAThread.h index d9b96c4..98abab4 100644 --- a/hdRDMAThread.h +++ b/hdRDMAThread.h @@ -19,6 +19,20 @@ class hdRDMA; +#ifdef __GNUC__ +#define PACK( __Declaration__ ) __Declaration__ __attribute__((__packed__)) +#endif + +#ifdef _MSC_VER +#define PACK( __Declaration__ ) __pragma( pack(push, 1) ) __Declaration__ __pragma( pack(pop)) +#endif + +#ifdef __GNUC__ +typedef int SOCKET; +#define closesocket close +#define INVALID_SOCKET -1 +#endif + class hdRDMAThread{ public: @@ -32,18 +46,18 @@ class hdRDMAThread{ }HeaderInfoFlag_t; // Header info sent as first bytes of data packed - struct HeaderInfo { + PACK(struct HeaderInfo { uint32_t header_len; uint16_t buff_type; uint16_t flags; // bit 0=first, 1=last uint32_t payload; - }__attribute__ ((packed)); + }); // Hold info of queue pair on one side of connection - struct QPInfo { + PACK(struct QPInfo { uint16_t lid; uint32_t qp_num; - }__attribute__ ((packed)); + }); class Exception:public std::exception{ public: @@ -51,23 +65,33 @@ class hdRDMAThread{ const char* what(void) const noexcept { return mess.c_str(); } std::string mess; }; - - typedef std::tuple bufferinfo; + struct bufferinfo + { + bufferinfo(uint8_t* buff, uint32_t buff_len, struct ibv_mr* mr) : Buffer(buff), BufferLen(buff_len), MR(mr) {} + uint8_t* Buffer = nullptr; + uint32_t BufferLen = 0; + struct ibv_mr* MR = nullptr; + }; + hdRDMAThread(hdRDMA *hdrdma); ~hdRDMAThread(); - void ThreadRun(int sockfd); + void ThreadRun(SOCKET sockfd); + void TryThreadRun(SOCKET sockfd); void PostWR( int id ); // id= index to buffers - void ExchangeQPInfo( int sockfd ); + void ExchangeQPInfo( SOCKET sockfd ); void CreateQP(void); int SetToRTS(void); void ReceiveBuffer(uint8_t *buff, uint32_t buff_len); - void ClientConnect( int sockfd ); + void ClientConnect( SOCKET sockfd ); void SendFile(std::string srcfilename, std::string dstfilename, bool delete_after_send=false, bool calculate_checksum=false, bool makeparentdirs=false); + void TrySendFile(std::string srcfilename, std::string dstfilename, bool delete_after_send=false, bool calculate_checksum=false, bool makeparentdirs=false); void PollCQ(void); bool makePath( const std::string &path ); + + void Dispose(); bool stop = false; // Flag so thread can be told to stop bool stopped = false; // Flag so thread can declare it has stopped @@ -83,7 +107,7 @@ class hdRDMAThread{ QPInfo qpinfo; QPInfo remote_qpinfo; - std::ofstream *ofs = nullptr; + std::unique_ptr ofs; std::string ofilename; uint64_t ofilesize = 0; uint32_t crcsum; diff --git a/hdrdmacp.cc b/hdrdmacp.cc index c0e6ff0..46eb880 100644 --- a/hdrdmacp.cc +++ b/hdrdmacp.cc @@ -1,11 +1,15 @@ +#ifdef __GNUC__ #include +#endif -//#include -#include +#include "IhdRDMA.h" #include #include +#include +#include +#include using namespace std; using std::chrono::steady_clock; using std::chrono::duration; @@ -40,7 +44,8 @@ int main(int narg, char *argv[]) ParseCommandLineArguments( narg, argv ); // Create an hdRDMA object - hdRDMA hdrdma; + const hdrdma::config hdrdma_config(HDRDMA_BUFF_LEN_GB * 1000'000'000 / HDRDMA_NUM_BUFF_SECTIONS, HDRDMA_NUM_BUFF_SECTIONS); + auto hdrdma = hdrdma::Create(hdrdma_config); // Listen for remote peers if we are in server mode // This will launch a thread and listen for any remote connections. @@ -49,7 +54,7 @@ int main(int narg, char *argv[]) // This will return right away so one must check the GetNpeers() method // to see when a connection is made. if( HDRDMA_IS_SERVER ){ - hdrdma.Listen( HDRDMA_LOCAL_PORT ); + hdrdma->Listen( HDRDMA_LOCAL_PORT ); // We want to report 10sec, 1min, and 5min averages auto t_last_10sec = steady_clock::now(); @@ -61,7 +66,7 @@ int main(int narg, char *argv[]) while( true ){ - hdrdma.Poll(); + hdrdma->Poll(); auto now = steady_clock::now(); auto duration_10sec = duration_cast(now - t_last_10sec); @@ -99,7 +104,7 @@ int main(int narg, char *argv[]) } // Stop server from listening (if one is) - hdrdma.StopListening(); + hdrdma->StopListening(); } // Connect to remote peer if we are in client mode. @@ -109,8 +114,8 @@ int main(int narg, char *argv[]) // be made available for transfers. If the connection cannot be made // then it will exit the program with an error message. if( HDRDMA_IS_CLIENT ){ - hdrdma.Connect( HDRDMA_REMOTE_ADDR, HDRDMA_REMOTE_PORT ); - hdrdma.SendFile( HDRDMA_SRCFILENAME, HDRDMA_DSTFILENAME, HDRDMA_DELETE_AFTER_SEND, HDRDMA_CALCULATE_CHECKSUM, HDRDMA_MAKE_PARENT_DIRS ); + hdrdma->Connect( HDRDMA_REMOTE_ADDR, HDRDMA_REMOTE_PORT ); + hdrdma->SendFile( HDRDMA_SRCFILENAME, HDRDMA_DSTFILENAME, HDRDMA_DELETE_AFTER_SEND, HDRDMA_CALCULATE_CHECKSUM, HDRDMA_MAKE_PARENT_DIRS ); } return 0; diff --git a/modules/libwinibverbs b/modules/libwinibverbs new file mode 160000 index 0000000..6591fb7 --- /dev/null +++ b/modules/libwinibverbs @@ -0,0 +1 @@ +Subproject commit 6591fb78aab09ac0eb2f392af6d3ac01f5d4300b