From d3b27f59dcfc1686defd56f15c66c4b82652878d Mon Sep 17 00:00:00 2001 From: jolavillette Date: Fri, 26 Dec 2025 14:27:45 +0100 Subject: [PATCH 01/13] fttransfermodule: add speed printouts --- src/ft/fttransfermodule.cc | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/ft/fttransfermodule.cc b/src/ft/fttransfermodule.cc index 8c6ae1f08c..ac53f37d67 100644 --- a/src/ft/fttransfermodule.cc +++ b/src/ft/fttransfermodule.cc @@ -741,7 +741,11 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) #endif /* update rate */ - if( (info.lastTransfers > 0 && ageReq > 0) || ageReq > 2) +// JOLA + uint32_t actualRateOld = info.actualRate; + uint32_t lastTransfersOld = info.lastTransfers; + + if( (info.lastTransfers > 0 && ageReq > 0) || ageReq > 2) { info.actualRate = info.actualRate * 0.75 + 0.25 * info.lastTransfers / (float)ageReq; info.lastTransfers = 0; @@ -822,7 +826,10 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) std::cerr << "locked_tickPeerTransfer() desired next_req: " << next_req; std::cerr << std::endl; #endif - + +// JOLA + RsDbg() << "FT actualRateOld " << std::dec << actualRateOld << " lastTransfersOld " << lastTransfersOld << " age " << ageReq << " actualRateNew " << (uint32_t) info.actualRate << " nextReq " << next_req; + /* do request */ uint64_t req_offset = 0; uint32_t req_size =0 ; From 98ef5d8072c905081f2159d0956ad163c3da8f67 Mon Sep 17 00:00:00 2001 From: jolavillette Date: Sat, 27 Dec 2025 10:12:54 +0100 Subject: [PATCH 02/13] disable packet slicing --- src/pqi/pqistreamer.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pqi/pqistreamer.cc b/src/pqi/pqistreamer.cc index ff242f3631..bcd5cdc2ff 100644 --- a/src/pqi/pqistreamer.cc +++ b/src/pqi/pqistreamer.cc @@ -63,7 +63,8 @@ static const int PQISTREAM_PACKET_SLICING_PROBE_DELAY = 60; // send every 6 static uint8_t PACKET_SLICING_PROBE_BYTES[8] = { 0x02, 0xaa, 0xbb, 0xcc, 0x00, 0x00, 0x00, 0x08 } ; /* Change to true to disable packet slicing and/or packet grouping, if needed */ -#define DISABLE_PACKET_SLICING false +// JOLA +#define DISABLE_PACKET_SLICING true #define DISABLE_PACKET_GROUPING false /* This removes the print statements (which hammer pqidebug) */ From 46c732817d160606c8cc3bfc38dbff72d6487156 Mon Sep 17 00:00:00 2001 From: jolavillette Date: Sat, 27 Dec 2025 15:56:39 +0100 Subject: [PATCH 03/13] reduce DEFAULT_STREAMER_SLEEP to 10ms in pqithreadstreamer --- src/pqi/pqithreadstreamer.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pqi/pqithreadstreamer.cc b/src/pqi/pqithreadstreamer.cc index 6745dc864e..64cfd65c8f 100644 --- a/src/pqi/pqithreadstreamer.cc +++ b/src/pqi/pqithreadstreamer.cc @@ -24,7 +24,8 @@ #include #define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms -#define DEFAULT_STREAMER_SLEEP 30000 // 30 ms +// JOLA +#define DEFAULT_STREAMER_SLEEP 10000 // 10 ms #define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec // #define PQISTREAMER_DEBUG From c1151dfb7b57bff2ac2776876c05d52edd14875b Mon Sep 17 00:00:00 2001 From: jolavillette Date: Sat, 27 Dec 2025 18:34:12 +0100 Subject: [PATCH 04/13] increase MAX_FT_CHUNK to 64 KB in ftserver.cc --- src/ft/ftserver.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ft/ftserver.cc b/src/ft/ftserver.cc index 65061c2baa..933e8b94f8 100644 --- a/src/ft/ftserver.cc +++ b/src/ft/ftserver.cc @@ -1256,8 +1256,8 @@ bool ftServer::sendData(const RsPeerId& peerId, const RsFileHash& hash, uint64_t { //static const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */ //static const uint32_t MAX_FT_CHUNK = 16 * 1024; /* 16K */ - // - static const uint32_t MAX_FT_CHUNK = 8 * 1024; /* 16K */ +// JOLA + static const uint32_t MAX_FT_CHUNK = 64 * 1024; /* 64K */ /* workout size */ chunk = MAX_FT_CHUNK; From 0d1a3743af0657538bf7c18e2b5134452622239d Mon Sep 17 00:00:00 2001 From: jolavillette Date: Sat, 27 Dec 2025 19:28:54 +0100 Subject: [PATCH 05/13] increase MAX_FTCHUNKS_PER_PEER to 40 in ftfilecreator --- src/ft/ftfilecreator.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/ft/ftfilecreator.cc b/src/ft/ftfilecreator.cc index 53a1036b47..da340ac0df 100644 --- a/src/ft/ftfilecreator.cc +++ b/src/ft/ftfilecreator.cc @@ -40,7 +40,8 @@ ******/ #define CHUNK_MAX_AGE 120 -#define MAX_FTCHUNKS_PER_PEER 20 +// JOLA +#define MAX_FTCHUNKS_PER_PEER 40 /*********************************************************** * From 6b07d4609d6b43a1e5dd4fff398acf80a491634d Mon Sep 17 00:00:00 2001 From: jolavillette Date: Sat, 27 Dec 2025 21:43:34 +0100 Subject: [PATCH 06/13] reactivate slicing & increase MAX_FT_CHUNK to 128 kB in ftserver --- src/ft/ftserver.cc | 2 +- src/pqi/pqistreamer.cc | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/ft/ftserver.cc b/src/ft/ftserver.cc index 933e8b94f8..cb4308bc57 100644 --- a/src/ft/ftserver.cc +++ b/src/ft/ftserver.cc @@ -1257,7 +1257,7 @@ bool ftServer::sendData(const RsPeerId& peerId, const RsFileHash& hash, uint64_t //static const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */ //static const uint32_t MAX_FT_CHUNK = 16 * 1024; /* 16K */ // JOLA - static const uint32_t MAX_FT_CHUNK = 64 * 1024; /* 64K */ + static const uint32_t MAX_FT_CHUNK = 128 * 1024; /* 128K */ /* workout size */ chunk = MAX_FT_CHUNK; diff --git a/src/pqi/pqistreamer.cc b/src/pqi/pqistreamer.cc index bcd5cdc2ff..01e887297b 100644 --- a/src/pqi/pqistreamer.cc +++ b/src/pqi/pqistreamer.cc @@ -63,8 +63,7 @@ static const int PQISTREAM_PACKET_SLICING_PROBE_DELAY = 60; // send every 6 static uint8_t PACKET_SLICING_PROBE_BYTES[8] = { 0x02, 0xaa, 0xbb, 0xcc, 0x00, 0x00, 0x00, 0x08 } ; /* Change to true to disable packet slicing and/or packet grouping, if needed */ -// JOLA -#define DISABLE_PACKET_SLICING true +#define DISABLE_PACKET_SLICING false #define DISABLE_PACKET_GROUPING false /* This removes the print statements (which hammer pqidebug) */ From 650aa67c45301917333c0e6760dcaaf8e6852ba5 Mon Sep 17 00:00:00 2001 From: jolavillette Date: Sun, 28 Dec 2025 10:10:48 +0100 Subject: [PATCH 07/13] increase optimal packet size to 1400 in pqistreamer --- src/pqi/pqistreamer.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/pqi/pqistreamer.cc b/src/pqi/pqistreamer.cc index 01e887297b..a98b84b8fd 100644 --- a/src/pqi/pqistreamer.cc +++ b/src/pqi/pqistreamer.cc @@ -48,7 +48,8 @@ static const int PQISTREAM_AVG_PERIOD = 1; // update speed estimate every static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate. static const float PQISTREAM_AVG_DT_FRAC = 0.99; // for low pass filter over elapsed time -static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. +// JOLA +static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 1400; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. // most importantly, it should be constant, so as to allow correct QoS. static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; // static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08 From b0c89098573d077939040e0d0d7eb1644671a570 Mon Sep 17 00:00:00 2001 From: jolavillette Date: Sun, 28 Dec 2025 11:02:50 +0100 Subject: [PATCH 08/13] increase MAX_FT_CHUNK to 240 kB in ftserver --- src/ft/ftserver.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ft/ftserver.cc b/src/ft/ftserver.cc index cb4308bc57..cacfca4070 100644 --- a/src/ft/ftserver.cc +++ b/src/ft/ftserver.cc @@ -1257,7 +1257,7 @@ bool ftServer::sendData(const RsPeerId& peerId, const RsFileHash& hash, uint64_t //static const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */ //static const uint32_t MAX_FT_CHUNK = 16 * 1024; /* 16K */ // JOLA - static const uint32_t MAX_FT_CHUNK = 128 * 1024; /* 128K */ + static const uint32_t MAX_FT_CHUNK = 240 * 1024; /* 128K */ /* workout size */ chunk = MAX_FT_CHUNK; From fff912f10c7329059665736ed6c5a50d2edfc2c7 Mon Sep 17 00:00:00 2001 From: jolavillette Date: Mon, 29 Dec 2025 23:55:42 +0100 Subject: [PATCH 09/13] Windows only: pqissllistener, remove manual TCP buffer overrides to enable OS Auto-Tuning --- src/pqi/pqissllistener.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/pqi/pqissllistener.cc b/src/pqi/pqissllistener.cc index 76e58acf69..4c353e57de 100644 --- a/src/pqi/pqissllistener.cc +++ b/src/pqi/pqissllistener.cc @@ -233,8 +233,9 @@ int pqissllistenbase::setuplisten() "pqissllistenbase::setuplisten() Bound to Address."); } +/* #ifdef WINDOWS_SYS - /* Set TCP buffer size for Windows systems */ + // Set TCP buffer size for Windows systems int sockbufsize = 0; int size = sizeof(int); @@ -274,6 +275,7 @@ int pqissllistenbase::setuplisten() std::cerr << "pqissllistenbase::setuplisten: Error setting TCP send buffer size. Error " << err << std::endl; } #endif +*/ if (0 != (err = listen(lsock, 100))) { From f0e6618b8f22d569a646f501c71a393b7c14cb65 Mon Sep 17 00:00:00 2001 From: jolavillette Date: Tue, 30 Dec 2025 08:42:40 +0100 Subject: [PATCH 10/13] clean up code --- src/ft/ftfilecreator.cc | 1 - src/ft/ftserver.cc | 3 +-- src/ft/fttransfermodule.cc | 7 ------- src/pqi/pqistreamer.cc | 1 - src/pqi/pqithreadstreamer.cc | 1 - 5 files changed, 1 insertion(+), 12 deletions(-) diff --git a/src/ft/ftfilecreator.cc b/src/ft/ftfilecreator.cc index da340ac0df..3d592a8d2a 100644 --- a/src/ft/ftfilecreator.cc +++ b/src/ft/ftfilecreator.cc @@ -40,7 +40,6 @@ ******/ #define CHUNK_MAX_AGE 120 -// JOLA #define MAX_FTCHUNKS_PER_PEER 40 /*********************************************************** diff --git a/src/ft/ftserver.cc b/src/ft/ftserver.cc index cacfca4070..72c65a2b51 100644 --- a/src/ft/ftserver.cc +++ b/src/ft/ftserver.cc @@ -1256,8 +1256,7 @@ bool ftServer::sendData(const RsPeerId& peerId, const RsFileHash& hash, uint64_t { //static const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */ //static const uint32_t MAX_FT_CHUNK = 16 * 1024; /* 16K */ -// JOLA - static const uint32_t MAX_FT_CHUNK = 240 * 1024; /* 128K */ + static const uint32_t MAX_FT_CHUNK = 240 * 1024; /* 240K */ /* workout size */ chunk = MAX_FT_CHUNK; diff --git a/src/ft/fttransfermodule.cc b/src/ft/fttransfermodule.cc index ac53f37d67..839c96f902 100644 --- a/src/ft/fttransfermodule.cc +++ b/src/ft/fttransfermodule.cc @@ -741,10 +741,6 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) #endif /* update rate */ -// JOLA - uint32_t actualRateOld = info.actualRate; - uint32_t lastTransfersOld = info.lastTransfers; - if( (info.lastTransfers > 0 && ageReq > 0) || ageReq > 2) { info.actualRate = info.actualRate * 0.75 + 0.25 * info.lastTransfers / (float)ageReq; @@ -827,9 +823,6 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) std::cerr << std::endl; #endif -// JOLA - RsDbg() << "FT actualRateOld " << std::dec << actualRateOld << " lastTransfersOld " << lastTransfersOld << " age " << ageReq << " actualRateNew " << (uint32_t) info.actualRate << " nextReq " << next_req; - /* do request */ uint64_t req_offset = 0; uint32_t req_size =0 ; diff --git a/src/pqi/pqistreamer.cc b/src/pqi/pqistreamer.cc index a98b84b8fd..1be95e5501 100644 --- a/src/pqi/pqistreamer.cc +++ b/src/pqi/pqistreamer.cc @@ -48,7 +48,6 @@ static const int PQISTREAM_AVG_PERIOD = 1; // update speed estimate every static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate. static const float PQISTREAM_AVG_DT_FRAC = 0.99; // for low pass filter over elapsed time -// JOLA static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 1400; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. // most importantly, it should be constant, so as to allow correct QoS. static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; // diff --git a/src/pqi/pqithreadstreamer.cc b/src/pqi/pqithreadstreamer.cc index 64cfd65c8f..97b33756ea 100644 --- a/src/pqi/pqithreadstreamer.cc +++ b/src/pqi/pqithreadstreamer.cc @@ -24,7 +24,6 @@ #include #define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms -// JOLA #define DEFAULT_STREAMER_SLEEP 10000 // 10 ms #define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec From 5d36a8785a79f142354ebee04b13112f0bfbc7ef Mon Sep 17 00:00:00 2001 From: jolavillette Date: Fri, 2 Jan 2026 15:02:39 +0100 Subject: [PATCH 11/13] attempt to use adaptive timeout and sleep in pqithreadstreamer --- src/pqi/pqistreamer.cc | 25 +++++++----- src/pqi/pqistreamer.h | 2 +- src/pqi/pqithreadstreamer.cc | 75 ++++++++++++++++++++++++++++-------- 3 files changed, 76 insertions(+), 26 deletions(-) diff --git a/src/pqi/pqistreamer.cc b/src/pqi/pqistreamer.cc index 1be95e5501..ec72133307 100644 --- a/src/pqi/pqistreamer.cc +++ b/src/pqi/pqistreamer.cc @@ -304,16 +304,17 @@ int pqistreamer::tick_bio() int pqistreamer::tick_recv(uint32_t timeout) { + int readbytes = 0; if (mBio->moretoread(timeout)) { - handleincoming(); + readbytes = handleincoming(); } if(!(mBio->isactive())) { RsStackMutex stack(mStreamerMtx); free_pend(); } - return 1; + return readbytes; } int pqistreamer::tick_send(uint32_t timeout) @@ -326,13 +327,14 @@ int pqistreamer::tick_send(uint32_t timeout) return 0; } + int sentbytes; if (mBio->cansend(timeout)) { RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - handleoutgoing_locked(); + sentbytes = handleoutgoing_locked(); } - return 1; + return sentbytes; } int pqistreamer::status() @@ -548,12 +550,12 @@ int pqistreamer::handleoutgoing_locked() if ((!(mBio->cansend(0))) || (maxbytes < sentbytes)) { -#ifdef DEBUG_PQISTREAMER +//#ifdef DEBUG_PQISTREAMER if (sentbytes > maxbytes) RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending max reached, sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes; else RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending bio not ready, sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes; -#endif +//#endif return 0; } // send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to @@ -701,7 +703,10 @@ int pqistreamer::handleoutgoing_locked() if(nsent > 0) std::cerr << "nsent = " << nsent << ", total bytes=" << sentbytes << std::endl; #endif - return 1; + if (sentbytes >0) + RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped, outqueue empty sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes; + + return sentbytes; } @@ -1028,14 +1033,14 @@ int pqistreamer::handleincoming() if(maxin > readbytes && mBio->moretoread(0)) goto start_packet_read ; -#ifdef DEBUG_PQISTREAMER +//#ifdef DEBUG_PQISTREAMER if (readbytes > maxin) RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading max reached, readbytes " << std::dec << readbytes << " maxin " << maxin; else RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading no more to read, readbytes " << std::dec << readbytes << " maxin " << maxin; -#endif +//#endif - return 0; + return readbytes; } RsItem *pqistreamer::addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len) diff --git a/src/pqi/pqistreamer.h b/src/pqi/pqistreamer.h index 0e7f6b8154..3cac83c5a8 100644 --- a/src/pqi/pqistreamer.h +++ b/src/pqi/pqistreamer.h @@ -106,6 +106,7 @@ class pqistreamer: public PQInterface // Binary Interface for IO, initialisated at startup. BinInterface *mBio; unsigned int mBio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE + std::list mOutPkts; // Cntrl / Search / Results queue private: int queue_outpqi_locked(RsItem *i,uint32_t& serialized_size); @@ -145,7 +146,6 @@ class pqistreamer: public PQInterface int mFailed_read_attempts ; // Temp Storage for transient data..... - std::list mOutPkts; // Cntrl / Search / Results queue std::list mIncoming; uint32_t mIncomingSize; // size of mIncoming. To avoid calling linear cost std::list::size() diff --git a/src/pqi/pqithreadstreamer.cc b/src/pqi/pqithreadstreamer.cc index 97b33756ea..ffbcf5880a 100644 --- a/src/pqi/pqithreadstreamer.cc +++ b/src/pqi/pqithreadstreamer.cc @@ -23,8 +23,13 @@ #include "pqi/pqithreadstreamer.h" #include -#define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms -#define DEFAULT_STREAMER_SLEEP 10000 // 10 ms +#ifdef WINDOWS_SYS +#include +#include +#endif + +#define DEFAULT_STREAMER_TIMEOUT 5000 // 10 ms +#define DEFAULT_STREAMER_SLEEP 30000 // 30 ms #define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec // #define PQISTREAMER_DEBUG @@ -32,6 +37,12 @@ pqithreadstreamer::pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& id, BinInterface *bio_in, int bio_flags_in) :pqistreamer(rss, id, bio_in, bio_flags_in), mParent(parent), mTimeout(0), mThreadMutex("pqithreadstreamer") { +#ifdef WINDOWS_SYS + // On Windows, the default system timer resolution is around 15 ms. + // This call allows for sleep durations of less than 15 ms, which is + // necessary for frequent polling and high-speed data transfer. + timeBeginPeriod(1); +#endif mTimeout = DEFAULT_STREAMER_TIMEOUT; mSleepPeriod = DEFAULT_STREAMER_SLEEP; } @@ -50,52 +61,86 @@ int pqithreadstreamer::tick() return 0; } -void pqithreadstreamer::threadTick() +void pqithreadstreamer::threadTick() { uint32_t recv_timeout = 0; uint32_t sleep_period = 0; bool isactive = false; + bool has_outgoing = false; + // Locked section to safely read shared variables { RsStackMutex stack(mStreamerMtx); recv_timeout = mTimeout; sleep_period = mSleepPeriod; isactive = mBio->isactive(); + // Check if there are packets waiting in the outgoing queue + has_outgoing = !mOutPkts.empty(); } - // update the connection rates - updateRates() ; - - // if the connection est not active, long sleep then return if (!isactive) { rstime::rs_usleep(DEFAULT_STREAMER_IDLE_SLEEP); return ; } - // fill incoming queue with items from SSL + updateRates(); + + // ADAPTIVE TIMEOUT: + // If we have data to send, we force the receive timeout to 0. + // This ensures tick_recv returns immediately if no data is present, + // allowing tick_send to be called without waiting for the 10ms recv timeout. + uint32_t adaptive_timeout = has_outgoing ? 0 : recv_timeout; + + // Fill incoming queue + int readbytes = 0; { RsStackMutex stack(mThreadMutex); - tick_recv(recv_timeout); + readbytes = tick_recv(adaptive_timeout); } - // move items to appropriate service queue or shortcut to fast service + bool activity = false; + + // Process incoming items, move them to appropriate service queue or shortcut to fast service RsItem *incoming = NULL; while((incoming = GetItem())) { + activity = true; // Activity detected (Download) RecvItem(incoming); } - // parse the outgoing queue and send items to SSL + // Parse outgoing queue and send items + int sentbytes = 0; { RsStackMutex stack(mThreadMutex); - tick_send(0); + sentbytes = tick_send(0); + } + + // ADAPTIVE SLEEP: + // If data was moved in either direction, reset sleep to 1ms for max performance. + // Otherwise, gradually increase sleep up to 30ms to save CPU resources. + { + RsStackMutex stack(mStreamerMtx); + if (readbytes > 0 || sentbytes > 0) + { + mSleepPeriod = 1000; + } + else + { + // Increment sleep period up to the 30ms limit + if (mSleepPeriod < 30000) + { + mSleepPeriod += 1000; + } + } + sleep_period = mSleepPeriod; } - // sleep - if (sleep_period) + if (readbytes > 0 || sentbytes > 0 || adaptive_timeout == 0 || sleep_period == 30000) + RsDbg() << "PQISTREAMER pqithreadstreamer::threadTick() readbytes " << std::dec << readbytes << " sentbytes " << sentbytes << " adaptive_timeout " << adaptive_timeout / 1000 << " sleep_period " << sleep_period / 1000; + + if (sleep_period > 0) { rstime::rs_usleep(sleep_period); } } - From e6f2464bcb11db4efc0a8228e1d2149f11afb772 Mon Sep 17 00:00:00 2001 From: jolavillette Date: Fri, 2 Jan 2026 22:17:30 +0100 Subject: [PATCH 12/13] use adaptive timeout and sleep in pqithreadstreamer --- src/pqi/pqistreamer.cc | 19 ++++---- src/pqi/pqistreamer.h | 2 +- src/pqi/pqithreadstreamer.cc | 84 ++++++++++++++++-------------------- 3 files changed, 47 insertions(+), 58 deletions(-) diff --git a/src/pqi/pqistreamer.cc b/src/pqi/pqistreamer.cc index ec72133307..92e9d46e5d 100644 --- a/src/pqi/pqistreamer.cc +++ b/src/pqi/pqistreamer.cc @@ -359,7 +359,7 @@ int pqistreamer::status() // this method is overloaded by pqiqosstreamer void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int) { - RsDbg() << "Storing packet " << std::hex << ptr << std::dec << " in outqueue."; + // RsDbg() << "Storing packet " << std::hex << ptr << std::dec << " in outqueue."; mOutPkts.push_back(ptr); } @@ -550,13 +550,13 @@ int pqistreamer::handleoutgoing_locked() if ((!(mBio->cansend(0))) || (maxbytes < sentbytes)) { -//#ifdef DEBUG_PQISTREAMER +#ifdef DEBUG_PQISTREAMER if (sentbytes > maxbytes) RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending max reached, sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes; else RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending bio not ready, sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes; -//#endif - return 0; +#endif + return sentbytes; } // send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to // - grab as many packets as possible while below the optimal packet size, so as to allow some packing and decrease encryption padding overhead (suposeddly) @@ -699,12 +699,11 @@ int pqistreamer::handleoutgoing_locked() sent = true; } } + #ifdef DEBUG_PQISTREAMER - if(nsent > 0) - std::cerr << "nsent = " << nsent << ", total bytes=" << sentbytes << std::endl; -#endif if (sentbytes >0) - RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped, outqueue empty sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes; + RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped outqueue empty, nsent " << std::dec << nsent << " sentbytes " << sentbytes << " maxbytes " << maxbytes; +#endif return sentbytes; } @@ -1033,12 +1032,12 @@ int pqistreamer::handleincoming() if(maxin > readbytes && mBio->moretoread(0)) goto start_packet_read ; -//#ifdef DEBUG_PQISTREAMER +#ifdef DEBUG_PQISTREAMER if (readbytes > maxin) RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading max reached, readbytes " << std::dec << readbytes << " maxin " << maxin; else RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading no more to read, readbytes " << std::dec << readbytes << " maxin " << maxin; -//#endif +#endif return readbytes; } diff --git a/src/pqi/pqistreamer.h b/src/pqi/pqistreamer.h index 3cac83c5a8..0e7f6b8154 100644 --- a/src/pqi/pqistreamer.h +++ b/src/pqi/pqistreamer.h @@ -106,7 +106,6 @@ class pqistreamer: public PQInterface // Binary Interface for IO, initialisated at startup. BinInterface *mBio; unsigned int mBio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE - std::list mOutPkts; // Cntrl / Search / Results queue private: int queue_outpqi_locked(RsItem *i,uint32_t& serialized_size); @@ -146,6 +145,7 @@ class pqistreamer: public PQInterface int mFailed_read_attempts ; // Temp Storage for transient data..... + std::list mOutPkts; // Cntrl / Search / Results queue std::list mIncoming; uint32_t mIncomingSize; // size of mIncoming. To avoid calling linear cost std::list::size() diff --git a/src/pqi/pqithreadstreamer.cc b/src/pqi/pqithreadstreamer.cc index ffbcf5880a..d067c7e3df 100644 --- a/src/pqi/pqithreadstreamer.cc +++ b/src/pqi/pqithreadstreamer.cc @@ -23,14 +23,21 @@ #include "pqi/pqithreadstreamer.h" #include +// for timeBeginPeriod #ifdef WINDOWS_SYS #include #include #endif -#define DEFAULT_STREAMER_TIMEOUT 5000 // 10 ms -#define DEFAULT_STREAMER_SLEEP 30000 // 30 ms -#define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec +#define STREAMER_TIMEOUT_MIN 0 // non blocking +#define STREAMER_TIMEOUT_DELTA 1000 // 1 ms +#define STREAMER_TIMEOUT_MAX 10000 // 10 ms + +#define STREAMER_SLEEP_MIN 1000 // 1 ms +#define STREAMER_SLEEP_DELTA 1000 // 1 ms +#define STREAMER_SLEEP_MAX 30000 // 30 ms + +#define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec // #define PQISTREAMER_DEBUG @@ -43,8 +50,8 @@ pqithreadstreamer::pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, con // necessary for frequent polling and high-speed data transfer. timeBeginPeriod(1); #endif - mTimeout = DEFAULT_STREAMER_TIMEOUT; - mSleepPeriod = DEFAULT_STREAMER_SLEEP; + mTimeout = STREAMER_TIMEOUT_MAX; + mSleepPeriod = STREAMER_SLEEP_MAX; } bool pqithreadstreamer::RecvItem(RsItem *item) @@ -55,7 +62,7 @@ bool pqithreadstreamer::RecvItem(RsItem *item) int pqithreadstreamer::tick() { // pqithreadstreamer mutex lock is not needed here - // we will only check if the connection is active, and if not we will try to establish it + // we only check if the connection is active, and if not we will try to establish it tick_bio(); return 0; @@ -63,19 +70,16 @@ int pqithreadstreamer::tick() void pqithreadstreamer::threadTick() { - uint32_t recv_timeout = 0; - uint32_t sleep_period = 0; + static uint32_t recv_timeout = mTimeout; + static uint32_t sleep_period = mSleepPeriod; + static uint32_t readbytes = 0; + static uint32_t sentbytes = 0; bool isactive = false; - bool has_outgoing = false; // Locked section to safely read shared variables { RsStackMutex stack(mStreamerMtx); - recv_timeout = mTimeout; - sleep_period = mSleepPeriod; isactive = mBio->isactive(); - // Check if there are packets waiting in the outgoing queue - has_outgoing = !mOutPkts.empty(); } if (!isactive) @@ -86,58 +90,44 @@ void pqithreadstreamer::threadTick() updateRates(); - // ADAPTIVE TIMEOUT: - // If we have data to send, we force the receive timeout to 0. - // This ensures tick_recv returns immediately if no data is present, - // allowing tick_send to be called without waiting for the 10ms recv timeout. - uint32_t adaptive_timeout = has_outgoing ? 0 : recv_timeout; + // Adaptive timeout and sleep + // Check if any data was processed during previous cycle. + if (readbytes > 0 || sentbytes > 0) + { + // Activity detected: Switch to maximum reactivity immediately. + // This prevents the thread from blocking in the next receive call, + // ensuring fast throughput for data bursts. + recv_timeout = STREAMER_TIMEOUT_MIN; + sleep_period = STREAMER_SLEEP_MIN; + } + else + { + // No activity: Gradually increase the timeout and sleep to save CPU cycles. + if (recv_timeout < STREAMER_TIMEOUT_MAX) + recv_timeout += STREAMER_TIMEOUT_DELTA; + if (sleep_period < STREAMER_SLEEP_MAX) + sleep_period += STREAMER_SLEEP_DELTA; + } - // Fill incoming queue - int readbytes = 0; { RsStackMutex stack(mThreadMutex); - readbytes = tick_recv(adaptive_timeout); + readbytes = tick_recv(recv_timeout); } - bool activity = false; - // Process incoming items, move them to appropriate service queue or shortcut to fast service RsItem *incoming = NULL; while((incoming = GetItem())) { - activity = true; // Activity detected (Download) RecvItem(incoming); } // Parse outgoing queue and send items - int sentbytes = 0; { RsStackMutex stack(mThreadMutex); sentbytes = tick_send(0); } - // ADAPTIVE SLEEP: - // If data was moved in either direction, reset sleep to 1ms for max performance. - // Otherwise, gradually increase sleep up to 30ms to save CPU resources. - { - RsStackMutex stack(mStreamerMtx); - if (readbytes > 0 || sentbytes > 0) - { - mSleepPeriod = 1000; - } - else - { - // Increment sleep period up to the 30ms limit - if (mSleepPeriod < 30000) - { - mSleepPeriod += 1000; - } - } - sleep_period = mSleepPeriod; - } - - if (readbytes > 0 || sentbytes > 0 || adaptive_timeout == 0 || sleep_period == 30000) - RsDbg() << "PQISTREAMER pqithreadstreamer::threadTick() readbytes " << std::dec << readbytes << " sentbytes " << sentbytes << " adaptive_timeout " << adaptive_timeout / 1000 << " sleep_period " << sleep_period / 1000; + // RsDbg() << "PQISTREAMER pqithreadstreamer::threadTick() recv_timeout " << std::dec << recv_timeout / 1000 << " sleep_period " << sleep_period / 1000 << " readbytes " << readbytes << " sentbytes " << sentbytes; if (sleep_period > 0) { From 1fb1b7ad1a97fb06fe9358d3f86580d5a9c77951 Mon Sep 17 00:00:00 2001 From: jolavillette Date: Sun, 4 Jan 2026 14:27:12 +0100 Subject: [PATCH 13/13] revert to fixed TCP buffer size of 512 KB in windows, plus bug and logic fixes --- src/pqi/pqissllistener.cc | 2 - src/pqi/pqistreamer.cc | 86 +++++++++++++++++++++++++++++------- src/pqi/pqithreadstreamer.cc | 48 ++++++++++---------- 3 files changed, 94 insertions(+), 42 deletions(-) diff --git a/src/pqi/pqissllistener.cc b/src/pqi/pqissllistener.cc index 4c353e57de..7e027c5a98 100644 --- a/src/pqi/pqissllistener.cc +++ b/src/pqi/pqissllistener.cc @@ -233,7 +233,6 @@ int pqissllistenbase::setuplisten() "pqissllistenbase::setuplisten() Bound to Address."); } -/* #ifdef WINDOWS_SYS // Set TCP buffer size for Windows systems @@ -275,7 +274,6 @@ int pqissllistenbase::setuplisten() std::cerr << "pqissllistenbase::setuplisten: Error setting TCP send buffer size. Error " << err << std::endl; } #endif -*/ if (0 != (err = listen(lsock, 100))) { diff --git a/src/pqi/pqistreamer.cc b/src/pqi/pqistreamer.cc index 92e9d46e5d..ca7c363827 100644 --- a/src/pqi/pqistreamer.cc +++ b/src/pqi/pqistreamer.cc @@ -38,6 +38,8 @@ #include "util/rsprint.h" // for BinToHex #include "util/rsstring.h" // for rs_sprintf_append, rs_sprintf +#include + //#define DEBUG_PQISTREAMER 1 static struct RsLog::logInfo pqistreamerzoneInfo = {RsLog::Default, "pqistreamer"}; @@ -304,16 +306,29 @@ int pqistreamer::tick_bio() int pqistreamer::tick_recv(uint32_t timeout) { + if(!(mBio->isactive())) + { +// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBio->isactive false"; + RsStackMutex stack(mStreamerMtx); + free_pend(); + return 0; + } + int readbytes = 0; if (mBio->moretoread(timeout)) { readbytes = handleincoming(); } +// else +// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBios->moretoread() false"; + if(!(mBio->isactive())) { +// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBio->isactive false"; RsStackMutex stack(mStreamerMtx); free_pend(); } + return readbytes; } @@ -322,18 +337,28 @@ int pqistreamer::tick_send(uint32_t timeout) /* short circuit everything if bio isn't active */ if (!(mBio->isactive())) { +// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->isactive false"; RsStackMutex stack(mStreamerMtx); free_pend(); return 0; } - int sentbytes; + int sentbytes = 0; if (mBio->cansend(timeout)) { - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + RsStackMutex stack(mStreamerMtx); sentbytes = handleoutgoing_locked(); } +// else +// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->cansend false"; + if (!(mBio->isactive())) + { +// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->isactive false"; + RsStackMutex stack(mStreamerMtx); + free_pend(); + } + return sentbytes; } @@ -537,6 +562,7 @@ int pqistreamer::handleoutgoing_locked() mPkt_wpending_size = 0 ; } +// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped mBio->isactive() false"; return 0; } @@ -614,10 +640,11 @@ int pqistreamer::handleoutgoing_locked() { if(slice_size > 0xffff || !mAcceptsPacketSlicing) { - std::cerr << "(EE) protocol error in pqitreamer: slice size is too large and cannot be encoded." ; + std::cerr << "(EE) protocol error in pqistreamer: slice size is too large and cannot be encoded." ; free(mPkt_wpending) ; mPkt_wpending_size = 0; - return -1 ; +// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped error slice size is too large"; + return sentbytes ; } #ifdef DEBUG_PACKET_SLICING std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", size=" << slice_size << std::endl; @@ -674,7 +701,8 @@ int pqistreamer::handleoutgoing_locked() // pkt_wpending will kept til next time. // ensuring exactly the same data is written (openSSL requirement). - return -1; +// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending failed only " << std::dec << ss << " bytes out of " << mPkt_wpending_size; + return sentbytes; } #ifdef DEBUG_PQISTREAMER else @@ -700,10 +728,10 @@ int pqistreamer::handleoutgoing_locked() } } -#ifdef DEBUG_PQISTREAMER - if (sentbytes >0) - RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped outqueue empty, nsent " << std::dec << nsent << " sentbytes " << sentbytes << " maxbytes " << maxbytes; -#endif +//#ifdef DEBUG_PQISTREAMER +// if (sentbytes >0) +// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped outqueue empty, nsent " << std::dec << nsent << " sentbytes " << sentbytes << " maxbytes " << maxbytes; +//#endif return sentbytes; } @@ -725,6 +753,7 @@ int pqistreamer::handleincoming() RsStackMutex stack(mStreamerMtx); mReading_state = reading_state_initial ; free_pend(); +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading bio->isactive() false"; return 0; } else @@ -769,7 +798,8 @@ int pqistreamer::handleincoming() pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked"); std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ; #endif - return 0; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading read blocked"; + return readbytes; } else if (tmplen < 0) { @@ -780,7 +810,8 @@ int pqistreamer::handleincoming() #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ; #endif - return 0; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading error in bio read"; + return readbytes; } else // tmplen > 0 { @@ -792,7 +823,8 @@ int pqistreamer::handleincoming() std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ; #endif - return -1; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading strange reading of " << std::dec << tmplen; + return readbytes; } } #ifdef DEBUG_PQISTREAMER @@ -813,7 +845,8 @@ int pqistreamer::handleincoming() #endif mReading_state = reading_state_initial ; // restart at state 1. mFailed_read_attempts = 0 ; - return 0; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading enabled packet slicing"; + return readbytes; } } continue_packet: @@ -893,7 +926,9 @@ int pqistreamer::handleincoming() mBio->close(); mReading_state = reading_state_initial ; // restart at state 1. mFailed_read_attempts = 0 ; - return -1; + +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading read packet too big"; + return readbytes; // Used to exit now! exit(1); } @@ -948,14 +983,16 @@ int pqistreamer::handleincoming() mBio->close(); mReading_state = reading_state_initial ; // restart at state 1. mFailed_read_attempts = 0 ; - return -1; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading unexpected read error"; + return readbytes; } else { #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ; #endif - return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading too many read fail"; + return readbytes ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. // we assume readdata() returned either -1 or the complete read size. } } @@ -1184,6 +1221,13 @@ int pqistreamer::outAllowedBytes_locked() RsDbg() << "PQISTREAMER pqistreamer::outAllowedBytes_locked() dt " << std::dec << (int)(1000 * dt) << "ms, mAvgDtOut " << (int)(1000 * mAvgDtOut) << "ms, maxout " << (int)(maxout) << " bytes/s, mCurrSent " << mCurrSent << " bytes, quota " << (int)(quota) << " bytes"; #endif +/* RsDbg() << "PQISTREAMER pqistreamer::outAllowedBytes_locked() dt " + << std::fixed << std::setprecision(1) << (1000.0 * dt) << "ms, mAvgDtOut " + << (1000.0 * mAvgDtOut) << "ms, maxout " + << (int)(maxout) << " bytes/s, mCurrSent " + << mCurrSent << " bytes, quota " + << (int)(quota) << " bytes"; +*/ return quota; } @@ -1223,12 +1267,20 @@ int pqistreamer::inAllowedBytes() // we now calculate the max amount of data allowed to be received during the next round // we take into account the excess/deficit of the previous round - double quota = mAvgDtIn * maxin - mCurrRead; +// double quota = mAvgDtIn * maxin - mCurrRead; + double quota = mAvgDtIn * maxin; #ifdef DEBUG_PQISTREAMER RsDbg() << "PQISTREAMER pqistreamer::inAllowedBytes() dt " << std::dec << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes"; #endif +/* RsDbg() << "PQISTREAMER pqistreamer::inAllowedBytes() dt " + << std::fixed << std::setprecision(1) << (1000.0 * dt) << "ms, mAvgDtIn " + << (1000.0 * mAvgDtIn) << "ms, maxin " + << (int)(maxin) << " bytes/s, mCurrRead " + << mCurrRead << " bytes, quota " + << (int)(quota) << " bytes"; +*/ return quota; } diff --git a/src/pqi/pqithreadstreamer.cc b/src/pqi/pqithreadstreamer.cc index d067c7e3df..b99d5bcfc5 100644 --- a/src/pqi/pqithreadstreamer.cc +++ b/src/pqi/pqithreadstreamer.cc @@ -72,8 +72,8 @@ void pqithreadstreamer::threadTick() { static uint32_t recv_timeout = mTimeout; static uint32_t sleep_period = mSleepPeriod; - static uint32_t readbytes = 0; - static uint32_t sentbytes = 0; + uint32_t readbytes = 0; + uint32_t sentbytes = 0; bool isactive = false; // Locked section to safely read shared variables @@ -82,39 +82,23 @@ void pqithreadstreamer::threadTick() isactive = mBio->isactive(); } + // Long sleep if connection is not active if (!isactive) { +// RsDbg() << "PQISTREAMER pqithreadstreamer::threadTick() mBio->isactive() false long sleep"; rstime::rs_usleep(DEFAULT_STREAMER_IDLE_SLEEP); return ; } updateRates(); - // Adaptive timeout and sleep - // Check if any data was processed during previous cycle. - if (readbytes > 0 || sentbytes > 0) - { - // Activity detected: Switch to maximum reactivity immediately. - // This prevents the thread from blocking in the next receive call, - // ensuring fast throughput for data bursts. - recv_timeout = STREAMER_TIMEOUT_MIN; - sleep_period = STREAMER_SLEEP_MIN; - } - else - { - // No activity: Gradually increase the timeout and sleep to save CPU cycles. - if (recv_timeout < STREAMER_TIMEOUT_MAX) - recv_timeout += STREAMER_TIMEOUT_DELTA; - if (sleep_period < STREAMER_SLEEP_MAX) - sleep_period += STREAMER_SLEEP_DELTA; - } - + // Fill incoming queue { RsStackMutex stack(mThreadMutex); readbytes = tick_recv(recv_timeout); } - // Process incoming items, move them to appropriate service queue or shortcut to fast service + // Process incoming items, move them to relevant service queue or shortcut to fast service RsItem *incoming = NULL; while((incoming = GetItem())) { @@ -127,7 +111,25 @@ void pqithreadstreamer::threadTick() sentbytes = tick_send(0); } - // RsDbg() << "PQISTREAMER pqithreadstreamer::threadTick() recv_timeout " << std::dec << recv_timeout / 1000 << " sleep_period " << sleep_period / 1000 << " readbytes " << readbytes << " sentbytes " << sentbytes; + // Adaptive timeout and sleep + // Check if any data was processed during previous cycle + if (readbytes > 0 || sentbytes > 0) + { + // Activity detected: switch to maximum reactivity immediately + // This ensure fast throughput for data bursts + recv_timeout = STREAMER_TIMEOUT_MIN; + sleep_period = STREAMER_SLEEP_MIN; + } + else + { + // No activity: gradually increase the timeout and sleep to save CPU cycles + if (recv_timeout < STREAMER_TIMEOUT_MAX) + recv_timeout += STREAMER_TIMEOUT_DELTA; + if (sleep_period < STREAMER_SLEEP_MAX) + sleep_period += STREAMER_SLEEP_DELTA; + } + +// RsDbg() << "PQISTREAMER pqithreadstreamer::threadTick() recv_timeout " << std::dec << recv_timeout / 1000 << " sleep_period " << sleep_period / 1000 << " readbytes " << readbytes << " sentbytes " << sentbytes; if (sleep_period > 0) {