From 47a9a025f63c194614acac85562454b5eb304664 Mon Sep 17 00:00:00 2001 From: Ben Lincoln Date: Fri, 28 Mar 2025 11:13:22 -0700 Subject: [PATCH 1/2] Merged changes from much more recent upstream revision 25767, updated README --- README.md | 160 ++++++++++---- SConscript | 15 ++ SConstruct => SConstruct.unused | 2 +- hdRDMA.cc | 87 +++++--- hdRDMA.h | 5 + hdRDMAThread.cc | 243 +++++++++++++++++--- hdRDMAThread.h | 2 + hdRDMAcontrol.cc | 169 ++++++++++++++ hdRDMAcontrol.h | 31 +++ hdRDMAstats.cc | 268 +++++++++++++++++++++++ hdRDMAstats.h | 28 +++ hdrdmacp.cc | 263 +++++++++++++++++++--- hdrdmacpinfo.py | 55 +++++ sysconfig/hdrdmacp | 3 + systemd/README | 23 ++ systemd/hdrdmacp | 46 ++++ systemd/hdrdmacp.service.d/override.conf | 19 ++ 17 files changed, 1283 insertions(+), 136 deletions(-) create mode 100644 SConscript rename SConstruct => SConstruct.unused (78%) create mode 100644 hdRDMAcontrol.cc create mode 100644 hdRDMAcontrol.h create mode 100644 hdRDMAstats.cc create mode 100644 hdRDMAstats.h create mode 100644 hdrdmacpinfo.py create mode 100644 sysconfig/hdrdmacp create mode 100644 systemd/README create mode 100644 systemd/hdrdmacp create mode 100644 systemd/hdrdmacp.service.d/override.conf diff --git a/README.md b/README.md index b722205..f67b251 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,50 @@ doesn't already exist. 3. This was written to run on some memory-heavy machines so the default buffer sizes are quite large. The may be changed with some command-line options (see below). +## Prerequisites + +### Libraries + +Building a working version of hdrdmacp requires the following libraries and their associated headers: + +* `libpthread` +* `libverbs` or `libibverbs` +* `zeromq` +* `zlib` + +e.g. to build hdrdmacp on Debian 12, the following command will install the necessary packages: + +``` +sudo apt install rdma-core libibverbs1 librdmacm1 libibmad5 libibumad3 librdmacm1 ibverbs-providers rdmacm-utils infiniband-diags libfabric1 ibverbs-utils libzmq3-dev libibverbs-dev libz-dev +``` + +on Fedora 41: + +``` +sudo dnf install rdma-core libibverbs libibverbs-devel libibverbs-utils zeromq zeromq-devel zlib-devel +``` + +### Sufficient max locked memory limit + +Some Unix and Linux distributions default to a low value for the maximum locked memory limit, and this will cause hdrdmacp to exit with the following error: + +``` +ERROR: Unable to register memory region! errno=12 + (Please see usage statement for a possible work around) +``` + +Before running hdrdmacp, run the `ulimit -l` command. If the output is small, such as Fedora's default of 8192, you will need to increase the limit. Debian's default of 2558652 seems to be sufficient for at least basic use, but you can also use the special `unlimited` value to remove the limit entirely. + +Non-`root` users are likely unable to increase their own limit, so consider one of the following options: + +1 - `su` to `root`, then run `ulimit -l unlimited` before running hdrdmacp. +2 - As `root`, add the following two lines to /etc/security/limits.conf, then log off and back on: + +``` +@wheel hard memlock 2558652 +@wheel soft memlock 2558652 +``` + ## Building Commands for downloading and building are below. There is a SConscript file which can be @@ -40,14 +84,13 @@ program though, it is also easy to just build it via a single command as shown. > cd hdrdmacp -> c++ -o hdrdmacp *.cc -libverbs -lz +> c++ -DHAVE_ZEROMQ=1 --std=c++11 -g -o hdrdmacp *.cc -libverbs -lz -lpthread -lzmq -I. ## Running Run the program with "--help" to get the help statement:
-
 Hall-D RDMA file copy server/client
 
 Usage:
@@ -71,48 +114,87 @@ Note: In the options below:
  options:
     -c         calculate checksum (adler32 currently only prints) (CMO)
     -d         delete source file upon successful transfer (CMO)
+    -g  group  set effective group (useful if run as a system service) (SMO)
     -h         print this usage statement.
     -m  GB     total memory to allocate (def. 8GB for server, 1GB for client)
     -n  Nbuffs number of buffers to break the allocated memory into. This
                will determine the size of RDMA transfer requests.
     -P         make parent directory path on remote host if needed (CMO)
-    -p port    set remote port to connect to (can also be given in dest name) (CMO)
+    -p  port   set remote port to connect to (can also be given in dest name) (CMO)
     -s         server mode (SMO)
-    -sp        server port to listen on (default is 10470) (SMO)
-
-NOTES:
-  1. The full filename on the destination must be specfied, not just
-     a directory. This is not checked for automatically so the user
-     must take care.
-
-  2. The remote host and port refer to a TCP connection that is
-     first made to exchange the RDMA connection info. The file is
-     then transferred via RDMA.
-
-  3. The destination port may be speficied either via the -p option
-     or as part of the destination argument. e.g.
-        my.remote.host:12345:/path/to/my/destfilename
-     if both are given then the one given in the destination argument
-     is used.
-
-  4. If you see an error about "Unable to register memory region!" then
-     this may be due to the maximum locked memory size. Check this by
-     running "limit" if using tcsh and looking for "memorylocked". If
-     using bash, then run "ulimit -a" and look for "max locked memory".
-     These should be set to "unlimited". On some of our systems this defaults
-     to 64kB and would not honor global settings. A wierd work around was to
-     do a "su $USER" which set it to "unlimited". (I do not understand why.)
-
-Example:
-  On destination host run:  hdrdmacp -s
-
-  On source host run:  hdrdmacp /path/to/my/srcfile my.remote.host:/path/to/my/destfile
-
-Note that the above will fail if /path/to/my does not already exist on
-my.remote.host. If you add the -P argument then /path/to/my will be 
-automatically create (if it doesn't already exist).
+    -sp port   server port to listen on (default is 10470) (SMO)
+    -u  user   set effective user (useful if run as a system service) (SMO)
+    -v         increase verbosity level
+    -q         quiet mode (set verbosity level to 0)
+    -zp port   port to publish stats as zeroMQ messages to
+    -cp port   port to listen for zeroMQ control messages on
+    -cmd host "command arg ..." send command to specified host
+
+ (run with --help for extended help)
 
- - - +## Example + +### Start the server on the destination system + +``` +$ ./hdrdmacp -m 1 -n 4 -s +Looking for IB devices ... + +============================================= +Found 1 devices +--------------------------------------------- + device 0 : ibp2s0 : uverbs0 : IB : InfiniBand channel adapter : Num. ports=2 : port num=1 : lid=3 +============================================= + +Device ibp2s0 opened. num_comp_vectors=16 +Port attributes: + state: 4 + max_mtu: 5 + active_mtu: 5 + port_cap_flags: 39405672 + max_msg_sz: 1073741824 + active_width: 2 + active_speed: 4 + phys_state: 5 + link_layer: 1 +Created 4 buffers of 250MB (1GB total) +Launching hdRDMAstats thread ... +Launching hdRDMAcontrol thread ... +hdRDMAstats::Publish called +hdRDMAcontrol::Publish called +Listening for connections on port ... 10470 +``` + +### Transfer a file from the source system + +``` +$ ./hdrdmacp /home/user/test_file ibtest2:/home/user/test_file +Looking for IB devices ... + +============================================= +Found 1 devices +--------------------------------------------- + device 0 : ibp3s0 : uverbs0 : IB : InfiniBand channel adapter : Num. ports=2 : port num=1 : lid=2 +============================================= + +Device ibp3s0 opened. num_comp_vectors=16 +Port attributes: + state: 4 + max_mtu: 5 + active_mtu: 5 + port_cap_flags: 38881384 + max_msg_sz: 1073741824 + active_width: 2 + active_speed: 4 + phys_state: 5 + link_layer: 1 +Created 4 buffers of 250MB (1GB total) +IP address: 172.16.253.11 (ibtest2) +Connected to ibtest2:10470 +Sending file: /home/user/test_file-> (ibtest2:)/home/user/test_file (0.000292292 GB) + queued 0MB (0/0 MB -- 100% - 30.7032 Gbps) + Transferred 0.292292 MB in 0.000919991 sec (2541.69 Mbps) + I/O rate reading from file: 7.176e-05 sec (32585.5 Mbps) + Confirmed remote file size matches local: 292292 bytes +``` diff --git a/SConscript b/SConscript new file mode 100644 index 0000000..ded81a5 --- /dev/null +++ b/SConscript @@ -0,0 +1,15 @@ + + +import sbms + +# get env object and clone it +Import('*') +env = env.Clone() + +sbms.AddZEROMQ(env) + +env.AppendUnique(CXXFLAGS=['--std=c++11', '-g']) +env.AppendUnique(LIBS=['ibverbs','z', 'pthread']) + +sbms.executable(env) + diff --git a/SConstruct b/SConstruct.unused similarity index 78% rename from SConstruct rename to SConstruct.unused index 519aa0e..f62d8e9 100644 --- a/SConstruct +++ b/SConstruct.unused @@ -1,7 +1,7 @@ import glob -#env = Environment(CXX='/apps/gcc/4.9.2/bin/g++') +env = Environment(CXX='/apps/gcc/4.9.2/bin/g++') env.AppendUnique(CPPPATH=['.']) env.AppendUnique(CXXFLAGS=['--std=c++11', '-g']) diff --git a/hdRDMA.cc b/hdRDMA.cc index 0bc3fdd..aceea44 100644 --- a/hdRDMA.cc +++ b/hdRDMA.cc @@ -9,6 +9,7 @@ #include #include #include +#include using std::cout; using std::cerr; @@ -18,6 +19,7 @@ using std::chrono::duration; using std::chrono::duration_cast; using std::chrono::high_resolution_clock; +extern int VERBOSE; extern uint64_t HDRDMA_BUFF_LEN_GB; extern uint64_t HDRDMA_NUM_BUFF_SECTIONS; @@ -30,14 +32,14 @@ extern uint64_t HDRDMA_NUM_BUFF_SECTIONS; //------------------------------------------------------------- hdRDMA::hdRDMA() { - cout << "Looking for IB devices ..." << endl; + if(VERBOSE) cout << "Looking for IB devices ..." << endl; int num_devices = 0; struct ibv_device **devs = ibv_get_device_list( &num_devices ); // List devices - cout << endl << "=============================================" << endl; - cout << "Found " << num_devices << " devices" << endl; - cout << "---------------------------------------------" << endl; + if(VERBOSE)cout << endl << "=============================================" << endl; + if(VERBOSE)cout << "Found " << num_devices << " devices" << endl; + if(VERBOSE)cout << "---------------------------------------------" << endl; for(int i=0; iport_num = port_num; } } ibv_close_device( ctx ); } // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - cout << " device " << i + if(VERBOSE>0){ + cout << " device " << i << " : " << devs[i]->name << " : " << devs[i]->dev_name << " : " << transport_type @@ -95,8 +97,9 @@ hdRDMA::hdRDMA() // << " : GUID=" << ibv_get_device_guid(devs[i]) << " : lid=" << lid << endl; + } } - cout << "=============================================" << endl << endl; + if(VERBOSE) cout << "=============================================" << endl << endl; // Open device ctx = ibv_open_device(dev); @@ -112,21 +115,23 @@ hdRDMA::hdRDMA() ibv_query_port( ctx, port_num, &port_attr); ibv_query_gid(ctx, port_num, index, &gid); - cout << "Device " << dev->name << " opened." + if(VERBOSE>0){ + cout << "Device " << dev->name << " opened." << " num_comp_vectors=" << ctx->num_comp_vectors << endl; - // Print some of the port attributes - cout << "Port attributes:" << endl; - cout << " state: " << port_attr.state << endl; - cout << " max_mtu: " << port_attr.max_mtu << endl; - cout << " active_mtu: " << port_attr.active_mtu << endl; - cout << " port_cap_flags: " << port_attr.port_cap_flags << endl; - cout << " max_msg_sz: " << port_attr.max_msg_sz << endl; - cout << " active_width: " << (uint64_t)port_attr.active_width << endl; - cout << " active_speed: " << (uint64_t)port_attr.active_speed << endl; - cout << " phys_state: " << (uint64_t)port_attr.phys_state << endl; - cout << " link_layer: " << (uint64_t)port_attr.link_layer << endl; + // Print some of the port attributes + cout << "Port attributes:" << endl; + cout << " state: " << port_attr.state << endl; + cout << " max_mtu: " << port_attr.max_mtu << endl; + cout << " active_mtu: " << port_attr.active_mtu << endl; + cout << " port_cap_flags: " << port_attr.port_cap_flags << endl; + cout << " max_msg_sz: " << port_attr.max_msg_sz << endl; + cout << " active_width: " << (uint64_t)port_attr.active_width << endl; + cout << " active_speed: " << (uint64_t)port_attr.active_speed << endl; + cout << " phys_state: " << (uint64_t)port_attr.phys_state << endl; + cout << " link_layer: " << (uint64_t)port_attr.link_layer << endl; + } // Allocate protection domain pd = ibv_alloc_pd(ctx); @@ -162,14 +167,15 @@ hdRDMA::hdRDMA() hdRDMAThread::bufferinfo bi = std::make_tuple( b, buff_section_len ); buffer_pool.push_back( bi ); } - cout << "Created " << buffer_pool.size() << " buffers of " << buff_section_len/1000000 << "MB (" << buff_len/1000000000 << "GB total)" << endl; + if(VERBOSE) cout << "Created " << buffer_pool.size() << " buffers of " << buff_section_len/1000000 << "MB (" << buff_len/1000000000 << "GB total)" << endl; // Create thread to listen for async ibv events new std::thread( [&](){ + pthread_setname_np( pthread_self(), "ibv_async_event" ); while( !done ){ struct ibv_async_event async_event; auto ret = ibv_get_async_event( ctx, &async_event); - cout << "+++ RDMA async event: type=" << async_event.event_type << " ret=" << ret << endl; + if(VERBOSE) cout << "+++ RDMA async event: type=" << async_event.event_type << " ret=" << ret << endl; ibv_ack_async_event( &async_event ); } }); @@ -213,8 +219,11 @@ void hdRDMA::Listen(int port) addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_port = htons( port ); - + server_sockfd = socket(AF_INET, SOCK_STREAM, 0); + int flags = fcntl( server_sockfd, F_GETFL); + fcntl( server_sockfd, F_SETFL, flags | O_NONBLOCK ); // set to non-blocking so program can exit cleanly if prompted + auto ret = bind( server_sockfd, (struct sockaddr*)&addr, sizeof(addr) ); if( ret != 0 ){ cout << "ERROR: binding server socket!" << endl; @@ -225,9 +234,11 @@ void hdRDMA::Listen(int port) // Create separate thread to accept socket connections so we don't block std::atomic thread_started(false); server_thread = new std::thread([&](){ - + + pthread_setname_np( pthread_self(), "hdRDMA:Listen" ); + // Loop forever accepting connections - cout << "Listening for connections on port ... " << port << endl; + if(VERBOSE) cout << "Listening for connections on port ... " << port << endl; thread_started = true; while( !done ){ @@ -235,7 +246,7 @@ void hdRDMA::Listen(int port) struct sockaddr_in peer_addr; socklen_t peer_addr_len = sizeof(struct sockaddr_in); peer_sockfd = accept(server_sockfd, (struct sockaddr *)&peer_addr, &peer_addr_len); - if( peer_sockfd > 0 ){ + if( peer_sockfd >= 0 ){ // cout << "Connection from " << inet_ntoa(peer_addr.sin_addr) << endl; // Create a new thread to handle this connection @@ -246,12 +257,15 @@ void hdRDMA::Listen(int port) Nconnections++; }else{ - cout << "Failed connection! errno=" << errno <join(); delete server_thread; @@ -275,7 +289,7 @@ void hdRDMA::StopListening(void) if( server_sockfd ) close( server_sockfd ); server_sockfd = 0; }else{ - cout << "Server not running." <ai_family, ptr, addrstr, 100 ); - cout << "IP address: " << addrstr << " (" << result->ai_canonname << ")" << endl; + if(VERBOSE) cout << "IP address: " << addrstr << " (" << result->ai_canonname << ")" << endl; result = result->ai_next; } @@ -327,7 +341,7 @@ void hdRDMA::Connect(std::string host, int port) cout << "ERROR: connecting to server: " << host << " (" << inet_ntoa(addr.sin_addr) << ")" << endl; exit(-3); }else{ - cout << "Connected to " << host << ":" << port << endl; + if(VERBOSE) cout << "Connected to " << host << ":" << port << endl; } // Create an hdRDMAThread object to handle the RDMA connection details. @@ -416,6 +430,7 @@ void hdRDMA::Poll(void) t.first->join(); delete t.second; threads.erase( t.first ); + break; // threads is now modified } } diff --git a/hdRDMA.h b/hdRDMA.h index 4b607bd..38d3ed6 100644 --- a/hdRDMA.h +++ b/hdRDMA.h @@ -18,6 +18,11 @@ #include +#ifndef _DBG_ +#define _DBG_ std::cerr<<__FILE__<<":"<<__LINE__<<" " +#define _DBG__ std::cerr<<__FILE__<<":"<<__LINE__<<"\n" +#endif // _DBG_ + class hdRDMA{ public: diff --git a/hdRDMAThread.cc b/hdRDMAThread.cc index 2f4ed5e..62a2a8e 100644 --- a/hdRDMAThread.cc +++ b/hdRDMAThread.cc @@ -6,22 +6,34 @@ #include #include #include +#include +#include +#include +#include +#include +#include #include #include "hdRDMA.h" - -using std::cout; -using std::cerr; -using std::endl; -using std::atomic; +using namespace std; using std::chrono::duration; using std::chrono::duration_cast; using std::chrono::high_resolution_clock; extern atomic BYTES_RECEIVED_TOT; +extern atomic NFILES_RECEIVED_TOT; extern std::string HDRDMA_REMOTE_ADDR; +extern int VERBOSE; +extern int HDRDMA_RET_VAL; +extern string HDRDMA_USERNAME; +extern string HDRDMA_GROUPNAME; +extern std::mutex HDRDMA_RECV_FNAMES_MUTEX; +extern std::set HDRDMA_RECV_FNAMES; + + +extern string SendControlCommand(string host, string command); // // Some notes on server mode: @@ -92,6 +104,8 @@ hdRDMAThread::~hdRDMAThread() //---------------------------------------------------------------------- void hdRDMAThread::ThreadRun(int sockfd) { + pthread_setname_np( pthread_self(), "hdRDMAThread::ThreadRun" ); + // The first thing we send via TCP is a 3 byte message indicating // success or failure. This really just allows us to inform the client // if the server cannot accept another connection right now due to @@ -148,6 +162,11 @@ void hdRDMAThread::ThreadRun(int sockfd) cerr << e.what() << endl; return; } + + // Set the filesystem UID/GID if specified by the user. This is primarily for when this is + // run as root (e.g. as a service) so it can interact with the filesystem as an unpriviliged + // user. + SetUIDGID(); // Loop until we're told to stop by either the master thread or the // remote peer declaring the connection is closing. @@ -210,6 +229,78 @@ void hdRDMAThread::ThreadRun(int sockfd) } +//------------------------------------------------------------- +// SetUIDGID +// +// This is called to (optionally) set the filesystem uid and gid +// of the process when run in server mode. This is called by +// each hdRDMAThread since the fsuid and fsgid must be set for +// each thread. +//------------------------------------------------------------- +void hdRDMAThread::SetUIDGID(void) +{ + // Note that this calls setfsuid and setfsgid to set the + // user and group ids when dealing with the filesystem + // ONLY. This feature is here to allow the server to be + // started by root, but ensure the creation of files is + // done as an unpriviliged user. + // + // It is worth noting that using seteuid and setreuid were + // originally tried here, but would cause problems that + // looked very similar to issues when the memorylocked size + // was to small. I suspect changing the process IDs caused + // that limit to change. + + // Create data structures on stack to use in calls to getgrnam_r + // and getpwnam_r. These are used instead of getgrnam and + // getpwnam because some problems were seen with files + // getting assigned strange uids when multiple files were + // being sent to a server simultaneously. I speculate this + // was caused by multiple threads simultaneously calling these + // which, according to the man page, recycle the same memory. + struct passwd pwd; + struct passwd *passwd=NULL; // will be set to either NULL or &pwd + struct group grp; + struct group *group=NULL; // will be set to either NULL or &grp + char buf[8192]; + size_t buflen = 8192; + + // Set effective gid if specified by user + if( HDRDMA_GROUPNAME.length()>0 ){ + getgrnam_r(HDRDMA_GROUPNAME.c_str(), &grp, buf, buflen, &group); + //auto group = getgrnam(HDRDMA_GROUPNAME.c_str()); + if( !group ){ + cerr << "Unknown group name \"" << HDRDMA_GROUPNAME << "\"!" << endl; + exit(-53); + } + cout << "Setting fsgid to " << group->gr_gid << " (group=" << group->gr_name << ")" << endl; + if( setfsgid(group->gr_gid) != 0 ){ + perror("setegid() error"); + } + } + + // Set effective uid if specified by user + if( HDRDMA_USERNAME.length()>0 ){ + getpwnam_r(HDRDMA_USERNAME.c_str(), &pwd, buf, buflen, &passwd); + //auto passwd = getpwnam(HDRDMA_USERNAME.c_str()); + if( !passwd ){ + cerr << "Unknown username \"" << HDRDMA_USERNAME << "\"!" << endl; + exit(-52); + } + if( HDRDMA_GROUPNAME.empty() ){ + // User did not explicitly set group name so set it to default group + if(VERBOSE>1)cout << "Setting fsgid to " << passwd->pw_gid << " (default for user " << passwd->pw_name << ")" << endl; + if( setfsgid(passwd->pw_gid) != 0 ){ + perror("setefsgid() error"); + } + } + cout << "Setting fsuid to " << passwd->pw_uid << " (username=" << passwd->pw_name << ")" << endl; + if( setfsuid(passwd->pw_uid) != 0 ){ + perror("setfsuid() error"); + } + } +} + //------------------------------------------------------------- // PostWR // @@ -237,6 +328,17 @@ void hdRDMAThread::PostWR( int id ) auto ret = ibv_post_recv( qp, &wr, &bad_wr); if( ret != 0 ){ cout << "ERROR: ibv_post_recv returned non zero value (" << ret << ")" << endl; + + struct ibv_qp_attr attr; + struct ibv_qp_init_attr init_attr; + ibv_query_qp(qp, &attr, IBV_QP_STATE, &init_attr); + if(attr.qp_state == IBV_QPS_RTR ){ + cerr << "QP is in RTR state" << endl; + }else if(attr.qp_state == IBV_QPS_RTS){ + cerr << "QP is in RTS state" << endl; + }else{ + cerr << "QP is not in RTR or RTS state (" << attr.qp_state << ")" << endl; + } } } @@ -257,8 +359,8 @@ void hdRDMAThread::ExchangeQPInfo( int sockfd ) // Create a new QP to use with the remote peer. CreateQP(); - // Create a work receive request for each MR buffer we have - for( uint32_t id=0; id1) cout << "Setting QP to init state" << endl; ret = ibv_modify_qp (qp, &qp_attr, IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS); @@ -373,8 +478,9 @@ int hdRDMAThread::SetToRTS(void) qp_attr.ah_attr.dlid = remote_qpinfo.lid, qp_attr.ah_attr.sl = IB_SL, qp_attr.ah_attr.src_path_bits = 0, - qp_attr.ah_attr.port_num = hdrdma->port_num, + qp_attr.ah_attr.port_num = hdrdma->port_num; + if(VERBOSE>1) cout << "Setting QP to RTR state" << endl; ret = ibv_modify_qp(qp, &qp_attr, IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN | @@ -395,8 +501,9 @@ int hdRDMAThread::SetToRTS(void) qp_attr.retry_cnt = 7, qp_attr.rnr_retry = 7, qp_attr.sq_psn = 0, - qp_attr.max_rd_atomic = 1, + qp_attr.max_rd_atomic = 1; + if(VERBOSE>1) cout << "Setting QP to RTS state" << endl; ret = ibv_modify_qp (qp, &qp_attr, IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | @@ -428,16 +535,24 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len) ofs = nullptr; } ofilename = (char*)&hi->payload; - cout << "Receiving file: " << ofilename << endl; - + if(VERBOSE>1)cout << "Receiving file: " << ofilename << endl; + // Create parent directory path if specified by remote sender - cout << "hi->flags: 0x" << std::hex << hi->flags << std::dec << endl; + if(VERBOSE>2)cout << "hi->flags: 0x" << std::hex << hi->flags << std::dec << endl; if( hi->flags & HI_MAKE_PARENT_DIRS ){ auto pos = ofilename.find_last_of('/'); + if(VERBOSE>2) cout << "Making directory: " << ofilename.substr(0, pos) << endl; if( pos != std::string::npos ) makePath( ofilename.substr(0, pos) ); } + if(VERBOSE>2) cout << "Opening output file: " << ofilename << endl; ofs = new std::ofstream( ofilename.c_str() ); + if( ! ofs->is_open() ){ + cerr << "Unable to create file: " << ofilename << endl; + return; + }else if(VERBOSE>2){ + cout << "Successfully opened output file: " << ofilename << endl; + } ofilesize = 0; crcsum = adler32( 0L, Z_NULL, 0 ); calculate_checksum = (hi->flags & HI_CALCULATE_CHECKSUM); // optionally calculate checksum @@ -446,6 +561,10 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len) t_last = t1; // used for intermediate rate calculations delta_t_io = 0.0; Ntransferred = 0; + + // Add filename to list of files currently being received + std::lock_guard lck(HDRDMA_RECV_FNAMES_MUTEX); + HDRDMA_RECV_FNAMES.insert(ofilename); } if( !ofs ){ @@ -453,6 +572,7 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len) return; } + // Write buffer payload to file auto data = &buff[hi->header_len]; auto data_len = buff_len - hi->header_len; @@ -470,13 +590,18 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len) if( t_last != t1 ) cout << endl; // print carriage return if we printed any intermediate progress if( ofs ){ auto t_io_start = high_resolution_clock::now(); + ofs->flush(); ofs->close(); + if(VERBOSE>2) cout << "Closed output file: " << ofilename << endl; auto t_io_end = high_resolution_clock::now(); duration duration_io = duration_cast>(t_io_end-t_io_start); delta_t_io += duration_io.count(); - ofs->close(); delete ofs; ofs = nullptr; + NFILES_RECEIVED_TOT++; + + std::lock_guard lck(HDRDMA_RECV_FNAMES_MUTEX); + HDRDMA_RECV_FNAMES.erase(ofilename); } // auto t2 = high_resolution_clock::now(); // duration delta_t = duration_cast>(t2-t1); @@ -597,18 +722,13 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo std::string mess = delete_after_send ? " - will be deleted after send":""; cout << "Sending file: " << srcfilename << "-> (" << HDRDMA_REMOTE_ADDR << ":)" << dstfilename << " (" << filesize_GB << " GB)" << mess << endl; - struct ibv_send_wr wr, *bad_wr = nullptr; - struct ibv_sge sge; - bzero( &wr, sizeof(wr) ); - bzero( &sge, sizeof(sge) ); - - wr.opcode = IBV_WR_SEND; - wr.sg_list = &sge; - wr.num_sge = 1; - wr.send_flags = IBV_SEND_SIGNALED, - - sge.lkey = hdrdma->mr->lkey; - + // This is not an ideal pattern, but it is a minimal refactoring that ensures + // the ibv_send_wr and ibv_sge structs live unchanged for the life of the + // ibv send calls. + int max_buffer_sends = 1000; + std::vector wr_vec(1000); + std::vector sge_vec(1000); + // Send buffers crcsum = adler32( 0L, Z_NULL, 0 ); t1 = high_resolution_clock::now(); @@ -617,7 +737,22 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo uint64_t bytes_left = filesize; uint32_t Noutstanding_writes = 0; double delta_t_io = 0.0; - for(int i=0; i<1000; i++){ // if sending more than 1000 buffers something is wrong! + for(int i=0; imr->lkey; + auto id = i%buffers.size(); auto &buffer = buffers[id]; auto buff = std::get<0>(buffer); @@ -665,10 +800,15 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo // Optionally calculate cehcksum if( calculate_checksum ) crcsum = adler32( crcsum, (uint8_t*)payload_ptr, bytes_payload ); + // Print optional debugging message + if( VERBOSE>1 ){ + _DBG_ << "\nSending " << sge.length << " bytes (" << bytes_payload << " payload) hi->flags=" << hi->flags << " wr_id=" << wr.wr_id << std::endl; + } + // Post write auto ret = ibv_post_send( qp, &wr, &bad_wr ); if( ret != 0 ){ - cout << "ERROR: ibv_post_send returned non zero value (" << ret << ")" << endl; + _DBG_ << "ERROR: ibv_post_send returned non zero value (" << ret << ")" << endl; break; } Noutstanding_writes++; @@ -686,7 +826,12 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo // If we've posted data using all available sections of the mr // then we need to wait for one to finish so we can recycle it. - if( Noutstanding_writes>=buffers.size() ){ + if(VERBOSE>1) _DBG_ << "Noutstanding_writes="<=buffers.size() ){ + if( Noutstanding_writes>=1 ){ PollCQ(); Noutstanding_writes--; } @@ -720,9 +865,24 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo if( calculate_checksum ) cout << " checksum: " << std::hex << crcsum << std::dec << endl; //cout << " IB rate sending file: " << delta_t.count()-delta_t_io << " sec (" << rate_ib_Gbps << " Gbps) - n.b. don't take this seriously!" << endl; - if( delete_after_send ){ - unlink( srcfilename.c_str() ); - cout <<" Deleted src file: " << srcfilename << endl; + // Verify file was completely sent by checking file size on remote host + string response = SendControlCommand( HDRDMA_REMOTE_ADDR, string("get_file_size ") + dstfilename); +// cout << "response: " << response << endl; + std::vector vals; + std::istringstream iss( response ); + copy( std::istream_iterator(iss), std::istream_iterator(), back_inserter(vals) ); + int64_t fsize = 0; + if( vals.size()>1 ) fsize = atoll( vals[1].c_str() ); + if( fsize == filesize ){ + cout << " Confirmed remote file size matches local: " << fsize << " bytes" << endl; + if( delete_after_send ) { + unlink(srcfilename.c_str()); + cout << " Deleted src file: " << srcfilename << endl; + } + }else{ + cerr << "Local and remote file sizes do not match after send! (" << filesize << " != " << fsize << ")" << endl; + cerr << "response from server was: " << response << endl; + HDRDMA_RET_VAL = -1; } } @@ -743,6 +903,7 @@ void hdRDMAThread::PollCQ(void) // Check to see if a work completion notification has come in int n = ibv_poll_cq(cq, num_wc, &wc); + if( VERBOSE>1) _DBG_ << "polled completion queue. ibv_poll_cq() result: n= " << n << " wc.wr_id=" << wc.wr_id <<"\n"; if( n<0 ){ std::stringstream ss; ss << "ERROR: ibv_poll_cq returned " << n << " - closing connection"; @@ -751,7 +912,25 @@ void hdRDMAThread::PollCQ(void) if( n == 0 ){ std::this_thread::sleep_for(std::chrono::microseconds(1)); continue; - } + } + + // If we get here then n should equal 1 + if( VERBOSE>1){ + + _DBG_ << "wc.status=" << wc.status << " (IBV_WC_WR_FLUSH_ERR=" << IBV_WC_WR_FLUSH_ERR << ")\n"; + + struct ibv_qp_attr attr; + struct ibv_qp_init_attr init_attr; + int ret = ibv_query_qp(qp, &attr, IBV_QP_STATE, &init_attr); + if(ret) { + perror("ibv_query_qp"); + } else { + std::cout << "Current QP state: " << attr.qp_state << std::endl; + if(attr.qp_state != IBV_QPS_RTS) { + std::cerr << "QP is not in RTS state("< #include #include +#include #include @@ -59,6 +60,7 @@ class hdRDMAThread{ ~hdRDMAThread(); void ThreadRun(int sockfd); + void SetUIDGID(void); void PostWR( int id ); // id= index to buffers void ExchangeQPInfo( int sockfd ); void CreateQP(void); diff --git a/hdRDMAcontrol.cc b/hdRDMAcontrol.cc new file mode 100644 index 0000000..69a64e9 --- /dev/null +++ b/hdRDMAcontrol.cc @@ -0,0 +1,169 @@ +// +// Created by hdsys on 11/21/19. +// + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +using namespace std; + +#include "hdRDMAcontrol.h" + +extern bool QUIT; +extern void *HDRDMA_ZEROMQ_CONTEXT; +extern int HDRDMA_RET_VAL; + +extern std::mutex HDRDMA_RECV_FNAMES_MUTEX; +extern std::set HDRDMA_RECV_FNAMES; + +extern atomic BYTES_RECEIVED_TOT; +extern atomic NFILES_RECEIVED_TOT; +extern double HDRDMA_LAST_RATE_10sec; +extern double HDRDMA_LAST_RATE_1min; +extern double HDRDMA_LAST_RATE_5min; + +//------------------------------------------------------------- +// hdRDMAcontrol +//------------------------------------------------------------- +hdRDMAcontrol::hdRDMAcontrol(int port):port(port) +{ + + cout << "Launching hdRDMAcontrol thread ..." << endl; + thr = new std::thread( &hdRDMAcontrol::ServerLoop, this ); + +} + +//------------------------------------------------------------- +// ~hdRDMAcontrol +//------------------------------------------------------------- +hdRDMAcontrol::~hdRDMAcontrol(void) +{ + done = true; + if(thr != nullptr) { + cout << "Joining hdRDMAcontrol thread ..." << endl; + thr->join(); + cout << "hdRDMAcontrol thread joined." << endl; + } +} + +//------------------------------------------------------------- +// ServerLoop +//------------------------------------------------------------- +void hdRDMAcontrol::ServerLoop(void) +{ + cout << "hdRDMAcontrol::Publish called" << endl; + + pthread_setname_np( pthread_self(), "hdRDMAcontrol" ); + +#if HAVE_ZEROMQ + char bind_str[256]; + sprintf( bind_str, "tcp://*:%d", port ); + + void *responder = zmq_socket( HDRDMA_ZEROMQ_CONTEXT, ZMQ_REP ); + int rc = zmq_bind( responder, bind_str); + if( rc != 0 ){ + cout << "Unable to bind zeroMQ control socket " << port << "!" << endl; + perror("zmq_bind"); + return; + } + + // All messages will include host + char host[256]; + gethostname( host, 256 ); + + // Loop until told to quit + while( !done && ! QUIT){ + + // Listen for request (non-blocking) + char buff[2048]; + auto rc = zmq_recv( responder, buff, 2048, ZMQ_DONTWAIT); + if( rc< 0 ){ + if( (errno==EAGAIN) || (errno==EINTR) ){ + std::this_thread::sleep_for(std::chrono::milliseconds(250)); + continue; + }else{ + cerr << "ERROR listening on control socket: errno=" << errno << endl; + done = true; + continue; + } + } + + // Split request into tokens + std::vector vals; + istringstream iss( std::string(buff, rc) ); + copy( istream_iterator(iss), istream_iterator(), back_inserter(vals) ); + + // Response + stringstream ss; + + if( vals.empty()){ + ss << ""; + }else if( vals[0] == "quit" ){ + QUIT = true; + done = true; + HDRDMA_RET_VAL = vals.size()>1 ? atoi(vals[1].c_str()):-1; // allow remote user to set return value. default to -1 so system service will restart + ss << "OK"; + }else if( vals[0] == "reset_counters" ){ + BYTES_RECEIVED_TOT = 0; + NFILES_RECEIVED_TOT = 0; + HDRDMA_LAST_RATE_10sec = 0.0; + HDRDMA_LAST_RATE_1min = 0.0; + HDRDMA_LAST_RATE_5min = 0.0; + ss << "OK"; + }else if( vals[0] == "get_file_size" ){ // mulitple files may be specified + if( vals.size()<2){ + ss << ""; + }else{ + for( uint32_t i=1; i +#include +#include + +#if HAVE_ZEROMQ +#include +#endif // HAVE_ZEROMQ + +class hdRDMAcontrol{ + public: + + hdRDMAcontrol(int port); + ~hdRDMAcontrol(void); + + void ServerLoop(void); + + bool done = false; + int port = 0; + std::thread *thr = nullptr; + + +}; + +#endif //PACKAGES_HDRDMACONTROL_H diff --git a/hdRDMAstats.cc b/hdRDMAstats.cc new file mode 100644 index 0000000..7e72a0c --- /dev/null +++ b/hdRDMAstats.cc @@ -0,0 +1,268 @@ +// +// This is used to periodically publish zeroMQ pub/sub messages +// with JSON formatted data regarding status of the process. +// It is only used if the hdrdmacp command was invoced with the +// -zp option to specify the tcp port on which to publish. +// +// + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +using namespace std; + +#include "hdRDMAstats.h" + +// The following are convenience macros that make +// the code below a little more succinct and easier +// to read. It changes this: +// +// JSONADD(key) << val; +// +// to +// +// ss<<"\n,\""< BYTES_RECEIVED_TOT; +extern atomic NFILES_RECEIVED_TOT; +extern double HDRDMA_LAST_RATE_10sec; +extern double HDRDMA_LAST_RATE_1min; +extern double HDRDMA_LAST_RATE_5min; +extern uint64_t HDRDMA_BUFF_LEN_GB; +extern uint64_t HDRDMA_NUM_BUFF_SECTIONS; + + +//------------------------------------------------------------- +// hdRDMAstats +//------------------------------------------------------------- +hdRDMAstats::hdRDMAstats(int port):port(port) +{ + cout << "Launching hdRDMAstats thread ..." << endl; + thr = new std::thread( &hdRDMAstats::Publish, this ); + +} + +//------------------------------------------------------------- +// ~hdRDMAstats +//------------------------------------------------------------- +hdRDMAstats::~hdRDMAstats(void) +{ + done =true; + if(thr != nullptr) { + cout << "Joining hdRDMAstats thread ..." << endl; + thr->join(); + cout << "hdRDMAstats thread joined." << endl; + } + +} + +//------------------------------------------------------------- +// Publish +//------------------------------------------------------------- +void hdRDMAstats::Publish(void) +{ + cout << "hdRDMAstats::Publish called" << endl; + + pthread_setname_np( pthread_self(), "hdRDMAstats" ); + +#if HAVE_ZEROMQ + char bind_str[256]; + sprintf( bind_str, "tcp://*:%d", HDRDMA_ZEROMQ_STATS_PORT ); + + if( HDRDMA_ZEROMQ_CONTEXT == nullptr ) HDRDMA_ZEROMQ_CONTEXT = zmq_ctx_new(); + void *publisher = zmq_socket( HDRDMA_ZEROMQ_CONTEXT, ZMQ_PUB ); + int rc = zmq_bind( publisher, bind_str); + if( rc != 0 ){ + cout << "Unable to bind zeroMQ stats socket " << HDRDMA_ZEROMQ_STATS_PORT << "!" << endl; + perror("zmq_bind"); + return; + } + + // All messages will include host + char host[256]; + gethostname( host, 256 ); + + // Loop until told to quit + while( !done ){ + + // Create JSON string + stringstream ss; + ss << "{\n"; + ss << "\"program\":\"hdrdmacp\""; + + JSONADS( "version", HDRDMA_VERSION); + JSONADS( "host" ,host ); + JSONADD( "Nfiles" ) << NFILES_RECEIVED_TOT; + JSONADD( "TB_received" ) << BYTES_RECEIVED_TOT*1.0E-12; + JSONADD( "avg10sec" ) << HDRDMA_LAST_RATE_10sec; // GB/s (calculated in hdrdmacp.cc) + JSONADD( "avg1min" ) << HDRDMA_LAST_RATE_1min; // GB/s (calculated in hdrdmacp.cc) + JSONADD( "avg5min" ) << HDRDMA_LAST_RATE_5min; // GB/s (calculated in hdrdmacp.cc) + + // Get current system info from /proc + map vals; + HostStatusPROC(vals); + + // Get disk info + GetDiskSpace("/media/ramdisk", vals); + GetDiskSpace("/data1", vals); + GetDiskSpace("/data2", vals); + GetDiskSpace("/data3", vals); + GetDiskSpace("/data4", vals); + + for( auto p : vals ) JSONADD(p.first) << p.second; + + ss << "\n}\n"; + + // Publish record + zmq_send( publisher, ss.str().c_str(), ss.str().length(), 0 ); + + // Only send updates once every 3 seconds. According to + // zeroMQ documentation, this doesn't send anything out + // if there are no current subscribers. + sleep(3); + } + + if(publisher != nullptr) zmq_close(publisher); +#else + cout << "No ZMQ support. Stats will not be published." << endl; +#endif // HAVE_ZEROMQ +} + +//--------------------------------- +// HostStatusPROC (Linux) +//--------------------------------- +void hdRDMAstats::HostStatusPROC(std::map &vals) +{ + /// Get host info using the /proc mechanism on Linux machines. + /// This returns the CPU usage/idle time. In order to work, it + /// it needs to take two measurements separated in time and + /// calculate the difference. So that we don't linger here + /// too long, we maintain static members to keep track of the + /// previous reading and take the delta with that. + + //------------------ CPU Usage ---------------------- + static time_t last_time = 0; + static double last_user = 0.0; + static double last_nice = 0.0; + static double last_sys = 0.0; + static double last_idle = 0.0; + static double delta_user = 0.0; + static double delta_nice = 0.0; + static double delta_sys = 0.0; + static double delta_idle = 1.0; + + time_t now = time(NULL); + if(now > last_time){ + ifstream ifs("/proc/stat"); + if( ifs.is_open() ){ + string cpu; + double user, nice, sys, idle; + + ifs >> cpu >> user >> nice >> sys >> idle; + ifs.close(); + + delta_user = user - last_user; + delta_nice = nice - last_nice; + delta_sys = sys - last_sys; + delta_idle = idle - last_idle; + last_user = user; + last_nice = nice; + last_sys = sys; + last_idle = idle; + + last_time = now; + } + } + + double norm = delta_user + delta_nice + delta_sys + delta_idle; + double user_percent = 100.0*delta_user/norm; + double nice_percent = 100.0*delta_nice/norm; + double sys_percent = 100.0*delta_sys /norm; + double idle_percent = 100.0*delta_idle/norm; + double cpu_usage = 100.0 - idle_percent; + + vals["cpu_user" ] = user_percent; + vals["cpu_nice" ] = nice_percent; + vals["cpu_sys" ] = sys_percent; + vals["cpu_idle" ] = idle_percent; + vals["cpu_total"] = cpu_usage; + + //------------------ Memory Usage ---------------------- + + // Read memory from /proc/meminfo + ifstream ifs("/proc/meminfo"); + int mem_tot_kB = 0; + int mem_free_kB = 0; + int mem_avail_kB = 0; + if(ifs.is_open()){ + char buff[4096]; + bzero(buff, 4096); + ifs.read(buff, 4095); + ifs.close(); + + string sbuff(buff); + + size_t pos = sbuff.find("MemTotal:"); + if(pos != string::npos) mem_tot_kB = atoi(&buff[pos+10+1]); + + pos = sbuff.find("MemFree:"); + if(pos != string::npos) mem_free_kB = atoi(&buff[pos+9+1]); + + pos = sbuff.find("MemAvailable:"); + if(pos != string::npos) mem_avail_kB = atoi(&buff[pos+14+1]); + } + + // RAM + // reported RAM from /proc/memInfo apparently excludes some amount + // claimed by the kernel. To get the correct amount in GB, I did a + // linear fit to the values I "knew" were correct and the reported + // values in kB for several machines. + int mem_tot_GB = (int)round(0.531161471 + (double)mem_tot_kB*9.65808E-7); + vals["ram_tot_GB"] = mem_tot_GB; + vals["ram_free_GB"] = mem_free_kB*1.0E-6; + vals["ram_avail_GB"] = mem_avail_kB*1.0E-6; + vals["HDRDMA_BUFF_LEN_GB"] = HDRDMA_BUFF_LEN_GB; + vals["HDRDMA_NUM_BUFF_SECTIONS"] = HDRDMA_NUM_BUFF_SECTIONS; +} + +//--------------------------------- +// GetDiskSpace +//--------------------------------- +void hdRDMAstats::GetDiskSpace(std::string dirname, std::map &vals) +{ + // Attempt to get stats on the disk specified by dirname. + // If found, they are added to vals. If no directory by + // that name is found then nothing is added to vals and + // this returns quietly. + + struct statvfs vfs; + int err = statvfs(dirname.c_str(), &vfs); + if( err != 0 ) return; + + double total_GB = vfs.f_bsize * vfs.f_blocks * 1.0E-9; + double avail_GB = vfs.f_bsize * vfs.f_bavail * 1.0E-9; + double used_GB = total_GB-avail_GB; + double used_percent = 100.0*used_GB/total_GB; + + vals[dirname+"_tot"] = total_GB; + vals[dirname+"_avail"] = avail_GB; + vals[dirname+"_used"] = used_GB; + vals[dirname+"_percent_used"] = used_percent; +} + diff --git a/hdRDMAstats.h b/hdRDMAstats.h new file mode 100644 index 0000000..379babf --- /dev/null +++ b/hdRDMAstats.h @@ -0,0 +1,28 @@ + + +#include +#include +#include + +#if HAVE_ZEROMQ +#include +#endif // HAVE_ZEROMQ + +class hdRDMAstats{ + + public: + hdRDMAstats(int port); + ~hdRDMAstats(void); + + void Publish(void); + void HostStatusPROC(std::map &vals); + void GetDiskSpace(std::string dirname, std::map &vals); + + bool done = false; + int port = 0; + std::thread *thr = nullptr; +}; + + + + diff --git a/hdrdmacp.cc b/hdrdmacp.cc index c0e6ff0..51e1b53 100644 --- a/hdrdmacp.cc +++ b/hdrdmacp.cc @@ -1,16 +1,29 @@ #include +#include +#include +#include +#include //#include #include +#include +#include #include #include +#include +#include using namespace std; using std::chrono::steady_clock; using std::chrono::duration; using std::chrono::duration_cast; +int VERBOSE=1; +bool QUIT = false; +const char *HDRDMA_VERSION = "1.0.2"; + +int HDRDMA_RET_VAL = 0; bool HDRDMA_IS_SERVER = false; bool HDRDMA_IS_CLIENT = false; @@ -23,13 +36,28 @@ std::string HDRDMA_DSTFILENAME = ""; bool HDRDMA_DELETE_AFTER_SEND = false; bool HDRDMA_CALCULATE_CHECKSUM = false; bool HDRDMA_MAKE_PARENT_DIRS = false; -uint64_t HDRDMA_BUFF_LEN_GB = 0; // defaults differ for server and client modes -uint64_t HDRDMA_NUM_BUFF_SECTIONS = 0; // so these are set in ParseCommandLineArguments +string HDRDMA_USERNAME = ""; +string HDRDMA_GROUPNAME = ""; +int HDRDMA_ZEROMQ_STATS_PORT = 10471; // port to publish zeroMQ messages about our stats to. 0=don't publish +int HDRDMA_ZEROMQ_CONTROL_PORT = 10472; // port to publish zeroMQ messages about our stats to. 0=don't publish +string HDRDMA_COMMAND_HOST; +string HDRDMA_COMMAND; +void *HDRDMA_ZEROMQ_CONTEXT = nullptr; // will be set by hdRDMAstats or hdRDMAcontrol (whichever is called first) +uint64_t HDRDMA_BUFF_LEN_GB = 0; // defaults differ for server and client modes +uint64_t HDRDMA_NUM_BUFF_SECTIONS = 0; // so these are set in ParseCommandLineArguments +std::mutex HDRDMA_RECV_FNAMES_MUTEX; +std::set HDRDMA_RECV_FNAMES; atomic BYTES_RECEIVED_TOT(0); +atomic NFILES_RECEIVED_TOT(0); +double HDRDMA_LAST_RATE_10sec=0.0; +double HDRDMA_LAST_RATE_1min=0.0; +double HDRDMA_LAST_RATE_5min=0.0; +string SendControlCommand(string host, string command); +int GetTotalSystemRAMGB(void); void ParseCommandLineArguments( int narg, char *argv[] ); -void Usage(void); +void Usage(bool show_extended=false); //------------------------------------------------------------- @@ -39,9 +67,6 @@ int main(int narg, char *argv[]) { ParseCommandLineArguments( narg, argv ); - // Create an hdRDMA object - hdRDMA hdrdma; - // Listen for remote peers if we are in server mode // This will launch a thread and listen for any remote connections. // Any that are made will have their RDMA transfer information setup @@ -49,6 +74,20 @@ int main(int narg, char *argv[]) // This will return right away so one must check the GetNpeers() method // to see when a connection is made. if( HDRDMA_IS_SERVER ){ + + // Create an hdRDMA object + hdRDMA hdrdma; + + // Setup network connections for external control and periodic + // publication of stats (if ZMQ support compiled in) + hdRDMAstats *hdrdmastats = nullptr; + hdRDMAcontrol *hdrdmacontrol = nullptr; +#ifdef HAVE_ZEROMQ + HDRDMA_ZEROMQ_CONTEXT = zmq_ctx_new(); + if( HDRDMA_ZEROMQ_STATS_PORT ) hdrdmastats = new hdRDMAstats( HDRDMA_ZEROMQ_STATS_PORT ); + if( HDRDMA_ZEROMQ_CONTROL_PORT ) hdrdmacontrol = new hdRDMAcontrol( HDRDMA_ZEROMQ_CONTROL_PORT ); +#endif // HAVE_ZEROMQ + hdrdma.Listen( HDRDMA_LOCAL_PORT ); // We want to report 10sec, 1min, and 5min averages @@ -59,7 +98,9 @@ int main(int narg, char *argv[]) uint64_t last_bytes_received_1min = BYTES_RECEIVED_TOT; uint64_t last_bytes_received_5min = BYTES_RECEIVED_TOT; - while( true ){ + pthread_setname_np( pthread_self(), "main" ); + + while( !QUIT ){ hdrdma.Poll(); @@ -74,25 +115,28 @@ int main(int narg, char *argv[]) if( delta_t_10sec >= 10.0 ){ double GB_received = (BYTES_RECEIVED_TOT - last_bytes_received_10sec)/1000000000; double rate = GB_received/delta_t_10sec; - cout << "=== [10 sec avg.] " << rate << " GB/s -- " << (double)BYTES_RECEIVED_TOT/1.0E12 << " TB total received" << endl; + if( VERBOSE>2 )cout << "=== [10 sec avg.] " << rate << " GB/s -- " << (double)BYTES_RECEIVED_TOT/1.0E12 << " TB total received" << endl; t_last_10sec = now; last_bytes_received_10sec = BYTES_RECEIVED_TOT; + HDRDMA_LAST_RATE_10sec= rate; } if( delta_t_1min >= 60.0 ){ double GB_received = (BYTES_RECEIVED_TOT - last_bytes_received_1min)/1000000000; double rate = GB_received/delta_t_1min; - cout << "=== [ 1 min avg.] " << rate << " GB/s" << endl; + if( VERBOSE>1 )cout << "=== [ 1 min avg.] " << rate << " GB/s" << endl; t_last_1min = now; last_bytes_received_1min = BYTES_RECEIVED_TOT; + HDRDMA_LAST_RATE_1min= rate; } if( delta_t_5min >= 300.0 ){ double GB_received = (BYTES_RECEIVED_TOT - last_bytes_received_5min)/1000000000; double rate = GB_received/delta_t_5min; - cout << "=== [ 5 min avg.] " << rate << " GB/s" << endl; + if( VERBOSE>0 )cout << "=== [ 5 min avg.] " << rate << " GB/s" << endl; t_last_5min = now; last_bytes_received_5min = BYTES_RECEIVED_TOT; + HDRDMA_LAST_RATE_5min= rate; } std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); @@ -100,7 +144,23 @@ int main(int narg, char *argv[]) // Stop server from listening (if one is) hdrdma.StopListening(); - } + + // Stop stats reporting + if( hdrdmastats != nullptr ) delete hdrdmastats; + if( hdrdmacontrol != nullptr ) delete hdrdmacontrol; + + // At this point we have shut most things down but there are still a few + // threads running that are stuck in blocking calls that are just not + // easy to kill. Thus, we harshly kill the entire process to force the + // issue. + // n.b. we set an exit value of -1 so the system service will re-start + // automatically (if we're being run that way). + _exit(HDRDMA_RET_VAL); + +#ifdef HAVE_ZEROMQ + zmq_ctx_destroy( HDRDMA_ZEROMQ_CONTEXT ); // we'll never get here due to above call. This would block forever anyway. +#endif // HAVE_ZEROMQ + } // Connect to remote peer if we are in client mode. // This will attempt to connect to an hdRDMA object listening on the @@ -109,10 +169,77 @@ int main(int narg, char *argv[]) // be made available for transfers. If the connection cannot be made // then it will exit the program with an error message. if( HDRDMA_IS_CLIENT ){ + // Create an hdRDMA object + hdRDMA hdrdma; + hdrdma.Connect( HDRDMA_REMOTE_ADDR, HDRDMA_REMOTE_PORT ); hdrdma.SendFile( HDRDMA_SRCFILENAME, HDRDMA_DSTFILENAME, HDRDMA_DELETE_AFTER_SEND, HDRDMA_CALCULATE_CHECKSUM, HDRDMA_MAKE_PARENT_DIRS ); } + + // Send control command to remote hdrdmacp process being run in server mode + if( !HDRDMA_COMMAND_HOST.empty() ) cout << SendControlCommand(HDRDMA_COMMAND_HOST, HDRDMA_COMMAND) << endl; + return HDRDMA_RET_VAL; +} + +//------------------------------------------------------------- +// SendControlCommand +// +// Send a control command to a remote hdrdmacp process running in +// server mode. The response is returned in the form of a string. +//------------------------------------------------------------- +string SendControlCommand(string host, string command) +{ + +#ifndef HAVE_ZEROMQ + return string("Unable to send control command due to zeroMQ support not being compiled in."); +#else + try{ + char conn_str[256]; + sprintf( conn_str, "tcp://%s:%d", host.c_str(), HDRDMA_ZEROMQ_CONTROL_PORT ); + + HDRDMA_ZEROMQ_CONTEXT = zmq_ctx_new(); + void *requester = zmq_socket( HDRDMA_ZEROMQ_CONTEXT, ZMQ_REQ ); + zmq_connect( requester, conn_str); + + vector buff( 20480, 0); + zmq_send( requester, command.c_str(), command.size(), 0 ); + zmq_recv( requester, buff.data(), buff.size(), 0); + + zmq_close(requester); + zmq_ctx_destroy( HDRDMA_ZEROMQ_CONTEXT ); + + return string( buff.data() ); + }catch(...){ + return string("Error sending zmq control message"); + } +#endif // HAVE_ZEROMQ + +} + +//------------------------------------------------------------- +// GetTotalSystemRAMGB +// +// Returns the total system memory in GB. This is only used +// when automatically determining amount to allocate when in +// server mode. +//------------------------------------------------------------- +int GetTotalSystemRAMGB(void) +{ + // Read memory from /proc/meminfo + ifstream ifs("/proc/meminfo"); + if(ifs.is_open()){ + char buff[4096]; + bzero(buff, 4096); + ifs.read(buff, 4095); + ifs.close(); + + string sbuff(buff); + + size_t pos = sbuff.find("MemTotal:"); + if(pos != string::npos) return atoi(&buff[pos+10+1])/1E6; + } + return 0; } @@ -121,14 +248,18 @@ int main(int narg, char *argv[]) //------------------------------------------------------------- void ParseCommandLineArguments( int narg, char *argv[] ) { + + if( narg ==1 ) { Usage(); exit(0); } + std::vector vfnames; for( int i=1; i8 ) HDRDMA_BUFF_LEN_GB = 8; + } + if( HDRDMA_NUM_BUFF_SECTIONS == 0 ){ + HDRDMA_NUM_BUFF_SECTIONS = 4*HDRDMA_BUFF_LEN_GB; + if( HDRDMA_NUM_BUFF_SECTIONS<8 ) HDRDMA_NUM_BUFF_SECTIONS = 8; + } } } //------------------------------------------------------------- // Usage //------------------------------------------------------------- -void Usage(void) +void Usage(bool show_extended) { cout << endl; cout << "Hall-D RDMA file copy server/client" <= 3 ): PORT = int(sys.argv[2]) + + +# Connect to host +print('Connecting to ' + HOST + ':' + str(PORT) + ' ...') + +# Create socket +try: + context = zmq.Context() + socket = context.socket(zmq.SUB) + socket.connect('tcp://' + HOST + ':' + str(PORT)) + socket.setsockopt_string(zmq.SUBSCRIBE, '') # Accept all messages from publisher +except: + print('Error connecting to host or subscribing to service') + print(sys.exc_info()[0]) + sys.exit() + +print('Subscribed to hdrdmacp server for status info. Waiting ....') + +start_time = time.time() +while True: + try: + string = socket.recv_string(flags=zmq.NOBLOCK) + print('Received string:\n' + string + '\n') + break + #myinfo = json.loads( string ) + #myinfo['received'] = time.time() + + #app.Add_hdrdmacp_HostInfo( myinfo ) + + except zmq.Again as e: + if( (time.time() - start_time) >= MAX_WAIT_TIME ): + print('No messages received in %f seconds. Giving up.' % MAX_WAIT_TIME) + sys.exit() + time.sleep(1) diff --git a/sysconfig/hdrdmacp b/sysconfig/hdrdmacp new file mode 100644 index 0000000..e5eec14 --- /dev/null +++ b/sysconfig/hdrdmacp @@ -0,0 +1,3 @@ +# Set up environment for hdrdmacp service + +PATH=/gluex/builds/devel/Linux_RHEL9-x86_64-gcc11/bin:${PATH} diff --git a/systemd/README b/systemd/README new file mode 100644 index 0000000..4e7e2fc --- /dev/null +++ b/systemd/README @@ -0,0 +1,23 @@ + + +This directory contains files that can be used to run hdrdmacp +in server mode as a service on systems supporting systemd +(e.g. RHEL7). To use do the following: + +1. Copy hdrdmacp.service and hdrdma.service.d to /etc/systemd/system + +2. Either modify the /etc/systemd/system/hdrdmacp.service file + to include the full path to the hdrdmacp executable or create + a /etc/systemd/system/hdrdma.service.d/environment file that + appends to PATH. You may also need to append to LD_LIBRARY_PATH + +3. Enable the service by running the following as sudo/root: + + systemctl enable hdrdmacp + + +You can then check the status with "systemctl status hdrdmacp". +You can also check the log of output with: + + journalctl -u hdrdmacp.service + diff --git a/systemd/hdrdmacp b/systemd/hdrdmacp new file mode 100644 index 0000000..3d6a943 --- /dev/null +++ b/systemd/hdrdmacp @@ -0,0 +1,46 @@ +# +# Run hdrdma in server mode as a service +# +# This can be installed on systems supporting systemd (e.g. RHEL7) as: +# +# /etc/systemd/system/hdrdmacp.service +# +# +# After installing it, the service must be enabled by sudo/root using: +# +# systemctl enable hdrdmacp +# +# +# The service will run the hdrdmacp executable as found in the PATH. +# The file pointed to by the EnvironmentFile line below can be used +# to modify PATH and LD_LIBRARY_PATH (and any other envars). +# +# +# IMPORTANT: You typically want this to create files on the local system +# as un unpriviliged user. The -u and -g options to hdrdmacp can be used +# to set these (via calls to setfsuid and setfsgid), The process will still +# run as root (which is needed for some memory limits) +#----------------------------------------------------------------------- + +# Make this service dependent on the autofs service. +# This is only for cases when the hdrdmacp program is +# accessed via network mounted file system. +# +[Unit] +Description=JLab RDMA file transfer server +Wants=autofs.service +After=autofs.service + + +[Service] +# Modify /etc/sysconfig/hdrdmacp file to set PATH and LD_LIBRARY_PATH +EnvironmentFile=-/etc/sysconfig/hdrdmacp +ExecStart=/usr/bin/bash -c 'hdrdmacp -s -q -u hdops' +Restart=on-failure +RestartSec=5s +StandardOutput=journal +StandardError=journal + + +[Install] +WantedBy=multi-user.target diff --git a/systemd/hdrdmacp.service.d/override.conf b/systemd/hdrdmacp.service.d/override.conf new file mode 100644 index 0000000..fb095d4 --- /dev/null +++ b/systemd/hdrdmacp.service.d/override.conf @@ -0,0 +1,19 @@ +# +# This is not used in the JLab deployment. It is left here +# just in case it is useful to someone else. +# +# +# This is used just to specify an optional environment file +# that can be used to append to PATH and LD_LIBRARY_PATH. +# The "-" in the EnvironmentFile line tells systemd to +# silently ignore it if the environment file doesn't exist. +# +# If you DO need to append to the path, create an environment +# file with contents like this: +# +# PATH=gluex/builds/devel/Linux_RHEL7-x86_64-gcc5.3.0/bin:${PATH} +# LD_LIBRARY_PATH=/apps/gcc/5.3.0/lib64:${LD_LIBRARY_PATH} +# + +[Service] +EnvironmentFile=-/etc/systemd/system/hdrdmacp.service.d/environment From 000946114632b7cdba1042af745397c7b5e44e80 Mon Sep 17 00:00:00 2001 From: Ben Lincoln Date: Fri, 28 Mar 2025 11:20:50 -0700 Subject: [PATCH 2/2] Merged changes from much more recent upstream revision 25767, updated README (again) --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index f67b251..29d1be3 100644 --- a/README.md +++ b/README.md @@ -131,8 +131,16 @@ Note: In the options below: -cmd host "command arg ..." send command to specified host (run with --help for extended help) + +## Notes + +1. The full filename on the destination must be specfied, not just a directory. This is not checked for automatically so the user must take care. +2. The remote host and port refer to a TCP connection that is first made to exchange the RDMA connection info. The file is then transferred via RDMA. +3. The destination port may be speficied either via the -p option or as part of the destination argument. e.g. my.remote.host:12345:/path/to/my/destfilename if both are given then the one given in the destination argument is used. +4. Transfers will fail if the destination directory does not already exist on the remote host. If you add the -P argument then the destination directory will be automatically created (if it doesn't already exist). + ## Example ### Start the server on the destination system