diff --git a/README.md b/README.md
index b722205..29d1be3 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,50 @@ doesn't already exist.
3. This was written to run on some memory-heavy machines so the default buffer sizes
are quite large. The may be changed with some command-line options (see below).
+## Prerequisites
+
+### Libraries
+
+Building a working version of hdrdmacp requires the following libraries and their associated headers:
+
+* `libpthread`
+* `libibverbs` (linked with `-libverbs`)
+* `zeromq`
+* `zlib`
+
+For example, to build hdrdmacp on Debian 12, the following command installs the necessary packages:
+
+```
+sudo apt install rdma-core libibverbs1 librdmacm1 libibmad5 libibumad3 ibverbs-providers rdmacm-utils infiniband-diags libfabric1 ibverbs-utils libzmq3-dev libibverbs-dev libz-dev
+```
+
+On Fedora 41:
+
+```
+sudo dnf install rdma-core libibverbs libibverbs-devel libibverbs-utils zeromq zeromq-devel zlib-devel
+```
+
+### Sufficient max locked memory limit
+
+Some Unix and Linux distributions default to a low value for the maximum locked memory limit, and this will cause hdrdmacp to exit with the following error:
+
+```
+ERROR: Unable to register memory region! errno=12
+ (Please see usage statement for a possible work around)
+```
+
+Before running hdrdmacp, check the current limit with `ulimit -l` (bash reports the value in KiB). If the output is small, such as Fedora's default of 8192, you will need to increase the limit. Debian's default of 2558652 appears to be sufficient for at least basic use; you can also use the special `unlimited` value to remove the limit entirely.
+
+Non-`root` users are likely unable to increase their own limit, so consider one of the following options:
+
+1. `su` to `root`, then run `ulimit -l unlimited` before running hdrdmacp.
+2. As `root`, add the following two lines to `/etc/security/limits.conf`, then log off and back on:
+
+```
+@wheel hard memlock 2558652
+@wheel soft memlock 2558652
+```
+
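The locked-memory limit can also be checked from code before any buffers are registered. The sketch below is not part of hdrdmacp: `memlock_limit_sufficient` and `can_lock` are hypothetical helper names, and it assumes a Linux/POSIX system where the limit is exposed as `RLIMIT_MEMLOCK`. Note that `getrlimit` reports bytes, while bash's `ulimit -l` reports KiB.

```cpp
#include <sys/resource.h>  // getrlimit, RLIMIT_MEMLOCK, RLIM_INFINITY
#include <cstdint>

// Hypothetical helper (not from hdrdmacp): returns true if a locked-memory
// limit of `limit` bytes allows locking `needed_bytes` bytes.
bool memlock_limit_sufficient(rlim_t limit, uint64_t needed_bytes)
{
    if (limit == RLIM_INFINITY) return true;  // "unlimited"
    return static_cast<uint64_t>(limit) >= needed_bytes;
}

// Hypothetical helper: read the current soft limit of this process and
// compare it to the requested size. Returns false if the limit cannot be read.
bool can_lock(uint64_t needed_bytes)
{
    struct rlimit rl;
    if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) return false;
    return memlock_limit_sufficient(rl.rlim_cur, needed_bytes);
}
```

For instance, `can_lock(1000000000ULL)` would ask whether roughly the 1GB client default could be locked under the current soft limit. A limit of 8192 KiB (8388608 bytes) would fail that check.
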
## Building
Commands for downloading and building are below. There is a SConscript file which can be
@@ -40,14 +84,13 @@ program though, it is also easy to just build it via a single command as shown.
> cd hdrdmacp
-> c++ -o hdrdmacp *.cc -libverbs -lz
+> c++ -DHAVE_ZEROMQ=1 --std=c++11 -g -o hdrdmacp *.cc -libverbs -lz -lpthread -lzmq -I.
## Running
Run the program with "--help" to get the help statement:
-
Hall-D RDMA file copy server/client
Usage:
@@ -71,48 +114,95 @@ Note: In the options below:
options:
-c calculate checksum (adler32 currently only prints) (CMO)
-d delete source file upon successful transfer (CMO)
+ -g group set effective group (useful if run as a system service) (SMO)
-h print this usage statement.
-m GB total memory to allocate (def. 8GB for server, 1GB for client)
-n Nbuffs number of buffers to break the allocated memory into. This
will determine the size of RDMA transfer requests.
-P make parent directory path on remote host if needed (CMO)
- -p port set remote port to connect to (can also be given in dest name) (CMO)
+ -p port set remote port to connect to (can also be given in dest name) (CMO)
-s server mode (SMO)
- -sp server port to listen on (default is 10470) (SMO)
-
-NOTES:
- 1. The full filename on the destination must be specfied, not just
- a directory. This is not checked for automatically so the user
- must take care.
-
- 2. The remote host and port refer to a TCP connection that is
- first made to exchange the RDMA connection info. The file is
- then transferred via RDMA.
-
- 3. The destination port may be speficied either via the -p option
- or as part of the destination argument. e.g.
- my.remote.host:12345:/path/to/my/destfilename
- if both are given then the one given in the destination argument
- is used.
-
- 4. If you see an error about "Unable to register memory region!" then
- this may be due to the maximum locked memory size. Check this by
- running "limit" if using tcsh and looking for "memorylocked". If
- using bash, then run "ulimit -a" and look for "max locked memory".
- These should be set to "unlimited". On some of our systems this defaults
- to 64kB and would not honor global settings. A wierd work around was to
- do a "su $USER" which set it to "unlimited". (I do not understand why.)
-
-Example:
- On destination host run: hdrdmacp -s
-
- On source host run: hdrdmacp /path/to/my/srcfile my.remote.host:/path/to/my/destfile
-
-Note that the above will fail if /path/to/my does not already exist on
-my.remote.host. If you add the -P argument then /path/to/my will be
-automatically create (if it doesn't already exist).
-
-
+ -sp port server port to listen on (default is 10470) (SMO)
+ -u user set effective user (useful if run as a system service) (SMO)
+ -v increase verbosity level
+ -q quiet mode (set verbosity level to 0)
+ -zp port port to publish stats as zeroMQ messages to
+ -cp port port to listen for zeroMQ control messages on
+ -cmd host "command arg ..." send command to specified host
+ (run with --help for extended help)
+
+## Notes
+
+1. The full filename on the destination must be specified, not just a directory. This is not checked automatically, so the user must take care.
+2. The remote host and port refer to a TCP connection that is first made to exchange the RDMA connection info. The file is then transferred via RDMA.
+3. The destination port may be specified either via the -p option or as part of the destination argument, e.g. my.remote.host:12345:/path/to/my/destfilename. If both are given, the one in the destination argument is used.
+4. Transfers will fail if the destination directory does not already exist on the remote host. If you add the -P argument then the destination directory will be automatically created (if it doesn't already exist).
+
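The precedence rule in note 3 can be illustrated with a small parser. This is a minimal sketch, not hdrdmacp's actual parsing code: the `Destination` struct and `parse_destination` function are hypothetical, and treating the middle field as a port only when it is a short run of digits is an assumption made here.

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Hypothetical result type for this sketch.
struct Destination {
    std::string host;
    int port;          // port to connect to
    std::string path;  // full destination filename
};

// Split "host:/path" or "host:port:/path". `option_port` is the port given
// via -p (or a default); a port embedded in `dest` takes precedence over it.
Destination parse_destination(const std::string &dest, int option_port)
{
    Destination d{"", option_port, ""};

    auto first = dest.find(':');
    if (first == std::string::npos) {  // no host part at all
        d.path = dest;
        return d;
    }
    d.host = dest.substr(0, first);

    auto second = dest.find(':', first + 1);
    if (second != std::string::npos) {
        std::string middle = dest.substr(first + 1, second - first - 1);
        // Assumption: only a short, all-digit field counts as a port.
        bool is_port = !middle.empty() && middle.size() <= 5 &&
            std::all_of(middle.begin(), middle.end(),
                        [](unsigned char c) { return std::isdigit(c) != 0; });
        if (is_port) {
            d.port = std::stoi(middle);
            d.path = dest.substr(second + 1);
            return d;
        }
    }
    d.path = dest.substr(first + 1);  // no embedded port: keep option_port
    return d;
}
```

With this sketch, `parse_destination("my.remote.host:12345:/path/to/my/destfilename", 10470)` yields port 12345, while `parse_destination("my.remote.host:/path/to/my/destfile", 10470)` keeps 10470.
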
+## Example
+
+### Start the server on the destination system
+
+```
+$ ./hdrdmacp -m 1 -n 4 -s
+Looking for IB devices ...
+
+=============================================
+Found 1 devices
+---------------------------------------------
+ device 0 : ibp2s0 : uverbs0 : IB : InfiniBand channel adapter : Num. ports=2 : port num=1 : lid=3
+=============================================
+
+Device ibp2s0 opened. num_comp_vectors=16
+Port attributes:
+ state: 4
+ max_mtu: 5
+ active_mtu: 5
+ port_cap_flags: 39405672
+ max_msg_sz: 1073741824
+ active_width: 2
+ active_speed: 4
+ phys_state: 5
+ link_layer: 1
+Created 4 buffers of 250MB (1GB total)
+Launching hdRDMAstats thread ...
+Launching hdRDMAcontrol thread ...
+hdRDMAstats::Publish called
+hdRDMAcontrol::Publish called
+Listening for connections on port ... 10470
+```
+
+### Transfer a file from the source system
+
+```
+$ ./hdrdmacp /home/user/test_file ibtest2:/home/user/test_file
+Looking for IB devices ...
+
+=============================================
+Found 1 devices
+---------------------------------------------
+ device 0 : ibp3s0 : uverbs0 : IB : InfiniBand channel adapter : Num. ports=2 : port num=1 : lid=2
+=============================================
+
+Device ibp3s0 opened. num_comp_vectors=16
+Port attributes:
+ state: 4
+ max_mtu: 5
+ active_mtu: 5
+ port_cap_flags: 38881384
+ max_msg_sz: 1073741824
+ active_width: 2
+ active_speed: 4
+ phys_state: 5
+ link_layer: 1
+Created 4 buffers of 250MB (1GB total)
+IP address: 172.16.253.11 (ibtest2)
+Connected to ibtest2:10470
+Sending file: /home/user/test_file-> (ibtest2:)/home/user/test_file (0.000292292 GB)
+ queued 0MB (0/0 MB -- 100% - 30.7032 Gbps)
+ Transferred 0.292292 MB in 0.000919991 sec (2541.69 Mbps)
+ I/O rate reading from file: 7.176e-05 sec (32585.5 Mbps)
+ Confirmed remote file size matches local: 292292 bytes
+```
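
The rates in the output above appear to use decimal units (1 MB = 10^6 bytes, 1 Mbps = 10^6 bits/s). As a quick check under that assumption, the sketch below recomputes them. `rate_mbps` is a hypothetical helper, not a function from hdrdmacp.

```cpp
#include <cassert>

// Hypothetical helper: throughput in megabits per second for `megabytes`
// (10^6 bytes each) moved in `seconds`.
double rate_mbps(double megabytes, double seconds)
{
    assert(seconds > 0.0);            // a zero duration has no defined rate
    return megabytes * 8.0 / seconds; // 8 bits per byte
}
```

With the rounded values printed above, `rate_mbps(0.292292, 0.000919991)` comes out at about 2541.69, which matches the reported transfer rate, and `rate_mbps(0.292292, 7.176e-05)` gives about 32585.5, which matches the reported I/O rate.
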
diff --git a/SConscript b/SConscript
new file mode 100644
index 0000000..ded81a5
--- /dev/null
+++ b/SConscript
@@ -0,0 +1,15 @@
+
+
+import sbms
+
+# get env object and clone it
+Import('*')
+env = env.Clone()
+
+sbms.AddZEROMQ(env)
+
+env.AppendUnique(CXXFLAGS=['--std=c++11', '-g'])
+env.AppendUnique(LIBS=['ibverbs','z', 'pthread'])
+
+sbms.executable(env)
+
diff --git a/SConstruct b/SConstruct.unused
similarity index 78%
rename from SConstruct
rename to SConstruct.unused
index 519aa0e..f62d8e9 100644
--- a/SConstruct
+++ b/SConstruct.unused
@@ -1,7 +1,7 @@
import glob
-#env = Environment(CXX='/apps/gcc/4.9.2/bin/g++')
+env = Environment(CXX='/apps/gcc/4.9.2/bin/g++')
env.AppendUnique(CPPPATH=['.'])
env.AppendUnique(CXXFLAGS=['--std=c++11', '-g'])
diff --git a/hdRDMA.cc b/hdRDMA.cc
index 0bc3fdd..aceea44 100644
--- a/hdRDMA.cc
+++ b/hdRDMA.cc
@@ -9,6 +9,7 @@
#include
#include
#include
+#include
using std::cout;
using std::cerr;
@@ -18,6 +19,7 @@ using std::chrono::duration;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
+extern int VERBOSE;
extern uint64_t HDRDMA_BUFF_LEN_GB;
extern uint64_t HDRDMA_NUM_BUFF_SECTIONS;
@@ -30,14 +32,14 @@ extern uint64_t HDRDMA_NUM_BUFF_SECTIONS;
//-------------------------------------------------------------
hdRDMA::hdRDMA()
{
- cout << "Looking for IB devices ..." << endl;
+ if(VERBOSE) cout << "Looking for IB devices ..." << endl;
int num_devices = 0;
struct ibv_device **devs = ibv_get_device_list( &num_devices );
// List devices
- cout << endl << "=============================================" << endl;
- cout << "Found " << num_devices << " devices" << endl;
- cout << "---------------------------------------------" << endl;
+ if(VERBOSE)cout << endl << "=============================================" << endl;
+ if(VERBOSE)cout << "Found " << num_devices << " devices" << endl;
+ if(VERBOSE)cout << "---------------------------------------------" << endl;
for(int i=0; iport_num = port_num;
}
}
ibv_close_device( ctx );
}
// - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-
- cout << " device " << i
+ if(VERBOSE>0){
+ cout << " device " << i
<< " : " << devs[i]->name
<< " : " << devs[i]->dev_name
<< " : " << transport_type
@@ -95,8 +97,9 @@ hdRDMA::hdRDMA()
// << " : GUID=" << ibv_get_device_guid(devs[i])
<< " : lid=" << lid
<< endl;
+ }
}
- cout << "=============================================" << endl << endl;
+ if(VERBOSE) cout << "=============================================" << endl << endl;
// Open device
ctx = ibv_open_device(dev);
@@ -112,21 +115,23 @@ hdRDMA::hdRDMA()
ibv_query_port( ctx, port_num, &port_attr);
ibv_query_gid(ctx, port_num, index, &gid);
- cout << "Device " << dev->name << " opened."
+ if(VERBOSE>0){
+ cout << "Device " << dev->name << " opened."
<< " num_comp_vectors=" << ctx->num_comp_vectors
<< endl;
- // Print some of the port attributes
- cout << "Port attributes:" << endl;
- cout << " state: " << port_attr.state << endl;
- cout << " max_mtu: " << port_attr.max_mtu << endl;
- cout << " active_mtu: " << port_attr.active_mtu << endl;
- cout << " port_cap_flags: " << port_attr.port_cap_flags << endl;
- cout << " max_msg_sz: " << port_attr.max_msg_sz << endl;
- cout << " active_width: " << (uint64_t)port_attr.active_width << endl;
- cout << " active_speed: " << (uint64_t)port_attr.active_speed << endl;
- cout << " phys_state: " << (uint64_t)port_attr.phys_state << endl;
- cout << " link_layer: " << (uint64_t)port_attr.link_layer << endl;
+ // Print some of the port attributes
+ cout << "Port attributes:" << endl;
+ cout << " state: " << port_attr.state << endl;
+ cout << " max_mtu: " << port_attr.max_mtu << endl;
+ cout << " active_mtu: " << port_attr.active_mtu << endl;
+ cout << " port_cap_flags: " << port_attr.port_cap_flags << endl;
+ cout << " max_msg_sz: " << port_attr.max_msg_sz << endl;
+ cout << " active_width: " << (uint64_t)port_attr.active_width << endl;
+ cout << " active_speed: " << (uint64_t)port_attr.active_speed << endl;
+ cout << " phys_state: " << (uint64_t)port_attr.phys_state << endl;
+ cout << " link_layer: " << (uint64_t)port_attr.link_layer << endl;
+ }
// Allocate protection domain
pd = ibv_alloc_pd(ctx);
@@ -162,14 +167,15 @@ hdRDMA::hdRDMA()
hdRDMAThread::bufferinfo bi = std::make_tuple( b, buff_section_len );
buffer_pool.push_back( bi );
}
- cout << "Created " << buffer_pool.size() << " buffers of " << buff_section_len/1000000 << "MB (" << buff_len/1000000000 << "GB total)" << endl;
+ if(VERBOSE) cout << "Created " << buffer_pool.size() << " buffers of " << buff_section_len/1000000 << "MB (" << buff_len/1000000000 << "GB total)" << endl;
// Create thread to listen for async ibv events
new std::thread( [&](){
+ pthread_setname_np( pthread_self(), "ibv_async_event" );
while( !done ){
struct ibv_async_event async_event;
auto ret = ibv_get_async_event( ctx, &async_event);
- cout << "+++ RDMA async event: type=" << async_event.event_type << " ret=" << ret << endl;
+ if(VERBOSE) cout << "+++ RDMA async event: type=" << async_event.event_type << " ret=" << ret << endl;
ibv_ack_async_event( &async_event );
}
});
@@ -213,8 +219,11 @@ void hdRDMA::Listen(int port)
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons( port );
-
+
server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ int flags = fcntl( server_sockfd, F_GETFL);
+ fcntl( server_sockfd, F_SETFL, flags | O_NONBLOCK ); // set to non-blocking so program can exit cleanly if prompted
+
auto ret = bind( server_sockfd, (struct sockaddr*)&addr, sizeof(addr) );
if( ret != 0 ){
cout << "ERROR: binding server socket!" << endl;
@@ -225,9 +234,11 @@ void hdRDMA::Listen(int port)
// Create separate thread to accept socket connections so we don't block
std::atomic<bool> thread_started(false);
server_thread = new std::thread([&](){
-
+
+ pthread_setname_np( pthread_self(), "hdRDMA:Listen" );
+
// Loop forever accepting connections
- cout << "Listening for connections on port ... " << port << endl;
+ if(VERBOSE) cout << "Listening for connections on port ... " << port << endl;
thread_started = true;
while( !done ){
@@ -235,7 +246,7 @@ void hdRDMA::Listen(int port)
struct sockaddr_in peer_addr;
socklen_t peer_addr_len = sizeof(struct sockaddr_in);
peer_sockfd = accept(server_sockfd, (struct sockaddr *)&peer_addr, &peer_addr_len);
- if( peer_sockfd > 0 ){
+ if( peer_sockfd >= 0 ){
// cout << "Connection from " << inet_ntoa(peer_addr.sin_addr) << endl;
// Create a new thread to handle this connection
@@ -246,12 +257,15 @@ void hdRDMA::Listen(int port)
Nconnections++;
}else{
- cout << "Failed connection! errno=" << errno <join();
delete server_thread;
@@ -275,7 +289,7 @@ void hdRDMA::StopListening(void)
if( server_sockfd ) close( server_sockfd );
server_sockfd = 0;
}else{
- cout << "Server not running." <ai_family, ptr, addrstr, 100 );
- cout << "IP address: " << addrstr << " (" << result->ai_canonname << ")" << endl;
+ if(VERBOSE) cout << "IP address: " << addrstr << " (" << result->ai_canonname << ")" << endl;
result = result->ai_next;
}
@@ -327,7 +341,7 @@ void hdRDMA::Connect(std::string host, int port)
cout << "ERROR: connecting to server: " << host << " (" << inet_ntoa(addr.sin_addr) << ")" << endl;
exit(-3);
}else{
- cout << "Connected to " << host << ":" << port << endl;
+ if(VERBOSE) cout << "Connected to " << host << ":" << port << endl;
}
// Create an hdRDMAThread object to handle the RDMA connection details.
@@ -416,6 +430,7 @@ void hdRDMA::Poll(void)
t.first->join();
delete t.second;
threads.erase( t.first );
+ break; // threads is now modified
}
}
diff --git a/hdRDMA.h b/hdRDMA.h
index 4b607bd..38d3ed6 100644
--- a/hdRDMA.h
+++ b/hdRDMA.h
@@ -18,6 +18,11 @@
#include
+#ifndef _DBG_
+#define _DBG_ std::cerr<<__FILE__<<":"<<__LINE__<<" "
+#define _DBG__ std::cerr<<__FILE__<<":"<<__LINE__<<"\n"
+#endif // _DBG_
+
class hdRDMA{
public:
diff --git a/hdRDMAThread.cc b/hdRDMAThread.cc
index 2f4ed5e..62a2a8e 100644
--- a/hdRDMAThread.cc
+++ b/hdRDMAThread.cc
@@ -6,22 +6,34 @@
#include
#include
#include
+#include
+#include
+#include
+#include
+#include
+#include
#include
#include "hdRDMA.h"
-
-using std::cout;
-using std::cerr;
-using std::endl;
-using std::atomic;
+using namespace std;
using std::chrono::duration;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
extern atomic BYTES_RECEIVED_TOT;
+extern atomic NFILES_RECEIVED_TOT;
extern std::string HDRDMA_REMOTE_ADDR;
+extern int VERBOSE;
+extern int HDRDMA_RET_VAL;
+extern string HDRDMA_USERNAME;
+extern string HDRDMA_GROUPNAME;
+extern std::mutex HDRDMA_RECV_FNAMES_MUTEX;
+extern std::set<std::string> HDRDMA_RECV_FNAMES;
+
+
+extern string SendControlCommand(string host, string command);
//
// Some notes on server mode:
@@ -92,6 +104,8 @@ hdRDMAThread::~hdRDMAThread()
//----------------------------------------------------------------------
void hdRDMAThread::ThreadRun(int sockfd)
{
+ pthread_setname_np( pthread_self(), "hdRDMAThread::ThreadRun" );
+
// The first thing we send via TCP is a 3 byte message indicating
// success or failure. This really just allows us to inform the client
// if the server cannot accept another connection right now due to
@@ -148,6 +162,11 @@ void hdRDMAThread::ThreadRun(int sockfd)
cerr << e.what() << endl;
return;
}
+
+ // Set the filesystem UID/GID if specified by the user. This is primarily for when this is
+ // run as root (e.g. as a service) so it can interact with the filesystem as an unprivileged
+ // user.
+ SetUIDGID();
// Loop until we're told to stop by either the master thread or the
// remote peer declaring the connection is closing.
@@ -210,6 +229,78 @@ void hdRDMAThread::ThreadRun(int sockfd)
}
+//-------------------------------------------------------------
+// SetUIDGID
+//
+// This is called to (optionally) set the filesystem uid and gid
+// of the process when run in server mode. This is called by
+// each hdRDMAThread since the fsuid and fsgid must be set for
+// each thread.
+//-------------------------------------------------------------
+void hdRDMAThread::SetUIDGID(void)
+{
+ // Note that this calls setfsuid and setfsgid to set the
+ // user and group ids when dealing with the filesystem
+ // ONLY. This feature is here to allow the server to be
+ // started by root, but ensure the creation of files is
+ // done as an unprivileged user.
+ //
+ // It is worth noting that using seteuid and setreuid were
+ // originally tried here, but would cause problems that
+ // looked very similar to issues when the memorylocked size
+ // was too small. I suspect changing the process IDs caused
+ // that limit to change.
+
+ // Create data structures on stack to use in calls to getgrnam_r
+ // and getpwnam_r. These are used instead of getgrnam and
+ // getpwnam because some problems were seen with files
+ // getting assigned strange uids when multiple files were
+ // being sent to a server simultaneously. I speculate this
+ // was caused by multiple threads simultaneously calling these
+ // which, according to the man page, recycle the same memory.
+ struct passwd pwd;
+ struct passwd *passwd=NULL; // will be set to either NULL or &pwd
+ struct group grp;
+ struct group *group=NULL; // will be set to either NULL or &grp
+ char buf[8192];
+ size_t buflen = 8192;
+
+ // Set filesystem gid if specified by user
+ if( HDRDMA_GROUPNAME.length()>0 ){
+ getgrnam_r(HDRDMA_GROUPNAME.c_str(), &grp, buf, buflen, &group);
+ //auto group = getgrnam(HDRDMA_GROUPNAME.c_str());
+ if( !group ){
+ cerr << "Unknown group name \"" << HDRDMA_GROUPNAME << "\"!" << endl;
+ exit(-53);
+ }
+ cout << "Setting fsgid to " << group->gr_gid << " (group=" << group->gr_name << ")" << endl;
+ if( setfsgid(group->gr_gid) != 0 ){
+ perror("setfsgid() error");
+ }
+ }
+
+ // Set filesystem uid if specified by user
+ if( HDRDMA_USERNAME.length()>0 ){
+ getpwnam_r(HDRDMA_USERNAME.c_str(), &pwd, buf, buflen, &passwd);
+ //auto passwd = getpwnam(HDRDMA_USERNAME.c_str());
+ if( !passwd ){
+ cerr << "Unknown username \"" << HDRDMA_USERNAME << "\"!" << endl;
+ exit(-52);
+ }
+ if( HDRDMA_GROUPNAME.empty() ){
+ // User did not explicitly set group name so set it to default group
+ if(VERBOSE>1)cout << "Setting fsgid to " << passwd->pw_gid << " (default for user " << passwd->pw_name << ")" << endl;
+ if( setfsgid(passwd->pw_gid) != 0 ){
+ perror("setfsgid() error");
+ }
+ }
+ cout << "Setting fsuid to " << passwd->pw_uid << " (username=" << passwd->pw_name << ")" << endl;
+ if( setfsuid(passwd->pw_uid) != 0 ){
+ perror("setfsuid() error");
+ }
+ }
+}
+
//-------------------------------------------------------------
// PostWR
//
@@ -237,6 +328,17 @@ void hdRDMAThread::PostWR( int id )
auto ret = ibv_post_recv( qp, &wr, &bad_wr);
if( ret != 0 ){
cout << "ERROR: ibv_post_recv returned non zero value (" << ret << ")" << endl;
+
+ struct ibv_qp_attr attr;
+ struct ibv_qp_init_attr init_attr;
+ ibv_query_qp(qp, &attr, IBV_QP_STATE, &init_attr);
+ if(attr.qp_state == IBV_QPS_RTR ){
+ cerr << "QP is in RTR state" << endl;
+ }else if(attr.qp_state == IBV_QPS_RTS){
+ cerr << "QP is in RTS state" << endl;
+ }else{
+ cerr << "QP is not in RTR or RTS state (" << attr.qp_state << ")" << endl;
+ }
}
}
@@ -257,8 +359,8 @@ void hdRDMAThread::ExchangeQPInfo( int sockfd )
// Create a new QP to use with the remote peer.
CreateQP();
- // Create a work receive request for each MR buffer we have
- for( uint32_t id=0; id1) cout << "Setting QP to init state" << endl;
ret = ibv_modify_qp (qp, &qp_attr,
IBV_QP_STATE | IBV_QP_PKEY_INDEX |
IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
@@ -373,8 +478,9 @@ int hdRDMAThread::SetToRTS(void)
qp_attr.ah_attr.dlid = remote_qpinfo.lid,
qp_attr.ah_attr.sl = IB_SL,
qp_attr.ah_attr.src_path_bits = 0,
- qp_attr.ah_attr.port_num = hdrdma->port_num,
+ qp_attr.ah_attr.port_num = hdrdma->port_num;
+ if(VERBOSE>1) cout << "Setting QP to RTR state" << endl;
ret = ibv_modify_qp(qp, &qp_attr,
IBV_QP_STATE | IBV_QP_AV |
IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
@@ -395,8 +501,9 @@ int hdRDMAThread::SetToRTS(void)
qp_attr.retry_cnt = 7,
qp_attr.rnr_retry = 7,
qp_attr.sq_psn = 0,
- qp_attr.max_rd_atomic = 1,
+ qp_attr.max_rd_atomic = 1;
+ if(VERBOSE>1) cout << "Setting QP to RTS state" << endl;
ret = ibv_modify_qp (qp, &qp_attr,
IBV_QP_STATE | IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY |
@@ -428,16 +535,24 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len)
ofs = nullptr;
}
ofilename = (char*)&hi->payload;
- cout << "Receiving file: " << ofilename << endl;
-
+ if(VERBOSE>1)cout << "Receiving file: " << ofilename << endl;
+
// Create parent directory path if specified by remote sender
- cout << "hi->flags: 0x" << std::hex << hi->flags << std::dec << endl;
+ if(VERBOSE>2)cout << "hi->flags: 0x" << std::hex << hi->flags << std::dec << endl;
if( hi->flags & HI_MAKE_PARENT_DIRS ){
auto pos = ofilename.find_last_of('/');
+ if(VERBOSE>2) cout << "Making directory: " << ofilename.substr(0, pos) << endl;
if( pos != std::string::npos ) makePath( ofilename.substr(0, pos) );
}
+ if(VERBOSE>2) cout << "Opening output file: " << ofilename << endl;
ofs = new std::ofstream( ofilename.c_str() );
+ if( ! ofs->is_open() ){
+ cerr << "Unable to create file: " << ofilename << endl;
+ return;
+ }else if(VERBOSE>2){
+ cout << "Successfully opened output file: " << ofilename << endl;
+ }
ofilesize = 0;
crcsum = adler32( 0L, Z_NULL, 0 );
calculate_checksum = (hi->flags & HI_CALCULATE_CHECKSUM); // optionally calculate checksum
@@ -446,6 +561,10 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len)
t_last = t1; // used for intermediate rate calculations
delta_t_io = 0.0;
Ntransferred = 0;
+
+ // Add filename to list of files currently being received
+ std::lock_guard<std::mutex> lck(HDRDMA_RECV_FNAMES_MUTEX);
+ HDRDMA_RECV_FNAMES.insert(ofilename);
}
if( !ofs ){
@@ -453,6 +572,7 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len)
return;
}
+
// Write buffer payload to file
auto data = &buff[hi->header_len];
auto data_len = buff_len - hi->header_len;
@@ -470,13 +590,18 @@ void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len)
if( t_last != t1 ) cout << endl; // print carriage return if we printed any intermediate progress
if( ofs ){
auto t_io_start = high_resolution_clock::now();
+ ofs->flush();
ofs->close();
+ if(VERBOSE>2) cout << "Closed output file: " << ofilename << endl;
auto t_io_end = high_resolution_clock::now();
duration<double> duration_io = duration_cast<duration<double>>(t_io_end-t_io_start);
delta_t_io += duration_io.count();
- ofs->close();
delete ofs;
ofs = nullptr;
+ NFILES_RECEIVED_TOT++;
+
+ std::lock_guard<std::mutex> lck(HDRDMA_RECV_FNAMES_MUTEX);
+ HDRDMA_RECV_FNAMES.erase(ofilename);
}
// auto t2 = high_resolution_clock::now();
// duration delta_t = duration_cast>(t2-t1);
@@ -597,18 +722,13 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo
std::string mess = delete_after_send ? " - will be deleted after send":"";
cout << "Sending file: " << srcfilename << "-> (" << HDRDMA_REMOTE_ADDR << ":)" << dstfilename << " (" << filesize_GB << " GB)" << mess << endl;
- struct ibv_send_wr wr, *bad_wr = nullptr;
- struct ibv_sge sge;
- bzero( &wr, sizeof(wr) );
- bzero( &sge, sizeof(sge) );
-
- wr.opcode = IBV_WR_SEND;
- wr.sg_list = &sge;
- wr.num_sge = 1;
- wr.send_flags = IBV_SEND_SIGNALED,
-
- sge.lkey = hdrdma->mr->lkey;
-
+ // This is not an ideal pattern, but it is a minimal refactoring that ensures
+ // the ibv_send_wr and ibv_sge structs live unchanged for the life of the
+ // ibv send calls.
+ int max_buffer_sends = 1000;
+ std::vector<struct ibv_send_wr> wr_vec(1000);
+ std::vector<struct ibv_sge> sge_vec(1000);
+
// Send buffers
crcsum = adler32( 0L, Z_NULL, 0 );
t1 = high_resolution_clock::now();
@@ -617,7 +737,22 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo
uint64_t bytes_left = filesize;
uint32_t Noutstanding_writes = 0;
double delta_t_io = 0.0;
- for(int i=0; i<1000; i++){ // if sending more than 1000 buffers something is wrong!
+ for(int i=0; imr->lkey;
+
auto id = i%buffers.size();
auto &buffer = buffers[id];
auto buff = std::get<0>(buffer);
@@ -665,10 +800,15 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo
// Optionally calculate cehcksum
if( calculate_checksum ) crcsum = adler32( crcsum, (uint8_t*)payload_ptr, bytes_payload );
+ // Print optional debugging message
+ if( VERBOSE>1 ){
+ _DBG_ << "\nSending " << sge.length << " bytes (" << bytes_payload << " payload) hi->flags=" << hi->flags << " wr_id=" << wr.wr_id << std::endl;
+ }
+
// Post write
auto ret = ibv_post_send( qp, &wr, &bad_wr );
if( ret != 0 ){
- cout << "ERROR: ibv_post_send returned non zero value (" << ret << ")" << endl;
+ _DBG_ << "ERROR: ibv_post_send returned non zero value (" << ret << ")" << endl;
break;
}
Noutstanding_writes++;
@@ -686,7 +826,12 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo
// If we've posted data using all available sections of the mr
// then we need to wait for one to finish so we can recycle it.
- if( Noutstanding_writes>=buffers.size() ){
+ if(VERBOSE>1) _DBG_ << "Noutstanding_writes="<=buffers.size() ){
+ if( Noutstanding_writes>=1 ){
PollCQ();
Noutstanding_writes--;
}
@@ -720,9 +865,24 @@ void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bo
if( calculate_checksum ) cout << " checksum: " << std::hex << crcsum << std::dec << endl;
//cout << " IB rate sending file: " << delta_t.count()-delta_t_io << " sec (" << rate_ib_Gbps << " Gbps) - n.b. don't take this seriously!" << endl;
- if( delete_after_send ){
- unlink( srcfilename.c_str() );
- cout <<" Deleted src file: " << srcfilename << endl;
+ // Verify file was completely sent by checking file size on remote host
+ string response = SendControlCommand( HDRDMA_REMOTE_ADDR, string("get_file_size ") + dstfilename);
+// cout << "response: " << response << endl;
+ std::vector<std::string> vals;
+ std::istringstream iss( response );
+ copy( std::istream_iterator<std::string>(iss), std::istream_iterator<std::string>(), back_inserter(vals) );
+ int64_t fsize = 0;
+ if( vals.size()>1 ) fsize = atoll( vals[1].c_str() );
+ if( fsize == filesize ){
+ cout << " Confirmed remote file size matches local: " << fsize << " bytes" << endl;
+ if( delete_after_send ) {
+ unlink(srcfilename.c_str());
+ cout << " Deleted src file: " << srcfilename << endl;
+ }
+ }else{
+ cerr << "Local and remote file sizes do not match after send! (" << filesize << " != " << fsize << ")" << endl;
+ cerr << "response from server was: " << response << endl;
+ HDRDMA_RET_VAL = -1;
}
}
@@ -743,6 +903,7 @@ void hdRDMAThread::PollCQ(void)
// Check to see if a work completion notification has come in
int n = ibv_poll_cq(cq, num_wc, &wc);
+ if( VERBOSE>1) _DBG_ << "polled completion queue. ibv_poll_cq() result: n= " << n << " wc.wr_id=" << wc.wr_id <<"\n";
if( n<0 ){
std::stringstream ss;
ss << "ERROR: ibv_poll_cq returned " << n << " - closing connection";
@@ -751,7 +912,25 @@ void hdRDMAThread::PollCQ(void)
if( n == 0 ){
std::this_thread::sleep_for(std::chrono::microseconds(1));
continue;
- }
+ }
+
+ // If we get here then n should equal 1
+ if( VERBOSE>1){
+
+ _DBG_ << "wc.status=" << wc.status << " (IBV_WC_WR_FLUSH_ERR=" << IBV_WC_WR_FLUSH_ERR << ")\n";
+
+ struct ibv_qp_attr attr;
+ struct ibv_qp_init_attr init_attr;
+ int ret = ibv_query_qp(qp, &attr, IBV_QP_STATE, &init_attr);
+ if(ret) {
+ perror("ibv_query_qp");
+ } else {
+ std::cout << "Current QP state: " << attr.qp_state << std::endl;
+ if(attr.qp_state != IBV_QPS_RTS) {
+ std::cerr << "QP is not in RTS state("<
#include