diff --git a/src/ft/ftfilecreator.cc b/src/ft/ftfilecreator.cc index 53a1036b4..3d592a8d2 100644 --- a/src/ft/ftfilecreator.cc +++ b/src/ft/ftfilecreator.cc @@ -40,7 +40,7 @@ ******/ #define CHUNK_MAX_AGE 120 -#define MAX_FTCHUNKS_PER_PEER 20 +#define MAX_FTCHUNKS_PER_PEER 40 /*********************************************************** * diff --git a/src/ft/ftserver.cc b/src/ft/ftserver.cc index 980f231fc..f4859494f 100644 --- a/src/ft/ftserver.cc +++ b/src/ft/ftserver.cc @@ -1256,8 +1256,7 @@ bool ftServer::sendData(const RsPeerId& peerId, const RsFileHash& hash, uint64_t { //static const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */ //static const uint32_t MAX_FT_CHUNK = 16 * 1024; /* 16K */ - // - static const uint32_t MAX_FT_CHUNK = 8 * 1024; /* 16K */ + static const uint32_t MAX_FT_CHUNK = 240 * 1024; /* 240K */ /* workout size */ chunk = MAX_FT_CHUNK; diff --git a/src/ft/fttransfermodule.cc b/src/ft/fttransfermodule.cc index 8c6ae1f08..839c96f90 100644 --- a/src/ft/fttransfermodule.cc +++ b/src/ft/fttransfermodule.cc @@ -741,7 +741,7 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) #endif /* update rate */ - if( (info.lastTransfers > 0 && ageReq > 0) || ageReq > 2) + if( (info.lastTransfers > 0 && ageReq > 0) || ageReq > 2) { info.actualRate = info.actualRate * 0.75 + 0.25 * info.lastTransfers / (float)ageReq; info.lastTransfers = 0; @@ -822,7 +822,7 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info) std::cerr << "locked_tickPeerTransfer() desired next_req: " << next_req; std::cerr << std::endl; #endif - + /* do request */ uint64_t req_offset = 0; uint32_t req_size =0 ; diff --git a/src/pqi/pqissllistener.cc b/src/pqi/pqissllistener.cc index 76e58acf6..7e027c5a9 100644 --- a/src/pqi/pqissllistener.cc +++ b/src/pqi/pqissllistener.cc @@ -234,7 +234,7 @@ int pqissllistenbase::setuplisten() } #ifdef WINDOWS_SYS - /* Set TCP buffer size for Windows systems */ + // Set TCP buffer size for Windows systems int sockbufsize = 0; int size = sizeof(int); diff --git a/src/pqi/pqistreamer.cc b/src/pqi/pqistreamer.cc index ff242f363..ca7c36382 100644 --- a/src/pqi/pqistreamer.cc +++ b/src/pqi/pqistreamer.cc @@ -38,6 +38,8 @@ #include "util/rsprint.h" // for BinToHex #include "util/rsstring.h" // for rs_sprintf_append, rs_sprintf +#include + //#define DEBUG_PQISTREAMER 1 static struct RsLog::logInfo pqistreamerzoneInfo = {RsLog::Default, "pqistreamer"}; @@ -48,7 +50,7 @@ static const int PQISTREAM_AVG_PERIOD = 1; // update speed estimate every static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate. static const float PQISTREAM_AVG_DT_FRAC = 0.99; // for low pass filter over elapsed time -static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. +static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 1400; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding. // most importantly, it should be constant, so as to allow correct QoS. static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; // static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08 @@ -63,7 +65,7 @@ static const int PQISTREAM_PACKET_SLICING_PROBE_DELAY = 60; // send every 6 static uint8_t PACKET_SLICING_PROBE_BYTES[8] = { 0x02, 0xaa, 0xbb, 0xcc, 0x00, 0x00, 0x00, 0x08 } ; /* Change to true to disable packet slicing and/or packet grouping, if needed */ -#define DISABLE_PACKET_SLICING false +#define DISABLE_PACKET_SLICING false #define DISABLE_PACKET_GROUPING false /* This removes the print statements (which hammer pqidebug) */ @@ -304,16 +306,30 @@ int pqistreamer::tick_bio() int pqistreamer::tick_recv(uint32_t timeout) { + if(!(mBio->isactive())) + { +// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBio->isactive false"; + RsStackMutex stack(mStreamerMtx); + free_pend(); + return 0; + } + + int readbytes = 0; if (mBio->moretoread(timeout)) { - handleincoming(); + readbytes = handleincoming(); } +// else +// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBios->moretoread() false"; + if(!(mBio->isactive())) { +// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBio->isactive false"; RsStackMutex stack(mStreamerMtx); free_pend(); } - return 1; + + return readbytes; } int pqistreamer::tick_send(uint32_t timeout) @@ -321,18 +337,29 @@ int pqistreamer::tick_send(uint32_t timeout) /* short circuit everything if bio isn't active */ if (!(mBio->isactive())) { +// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->isactive false"; RsStackMutex stack(mStreamerMtx); free_pend(); return 0; } + int sentbytes = 0; if (mBio->cansend(timeout)) { - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ - handleoutgoing_locked(); + RsStackMutex stack(mStreamerMtx); + sentbytes = handleoutgoing_locked(); } +// else +// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->cansend false"; - return 1; + if (!(mBio->isactive())) + { +// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->isactive false"; + RsStackMutex stack(mStreamerMtx); + free_pend(); + } + + return sentbytes; } int pqistreamer::status() @@ -357,7 +384,7 @@ int pqistreamer::status() // this method is overloaded by pqiqosstreamer void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int) { - RsDbg() << "Storing packet " << std::hex << ptr << std::dec << " in outqueue."; + // RsDbg() << "Storing packet " << std::hex << ptr << std::dec << " in outqueue."; mOutPkts.push_back(ptr); } @@ -535,6 +562,7 @@ int pqistreamer::handleoutgoing_locked() mPkt_wpending_size = 0 ; } +// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped mBio->isactive() false"; return 0; } @@ -554,7 +582,7 @@ int pqistreamer::handleoutgoing_locked() else RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending bio not ready, sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes; #endif - return 0; + return sentbytes; } // send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to // - grab as many packets as possible while below the optimal packet size, so as to allow some packing and decrease encryption padding overhead (suposeddly) @@ -612,10 +640,11 @@ int pqistreamer::handleoutgoing_locked() { if(slice_size > 0xffff || !mAcceptsPacketSlicing) { - std::cerr << "(EE) protocol error in pqitreamer: slice size is too large and cannot be encoded." ; + std::cerr << "(EE) protocol error in pqistreamer: slice size is too large and cannot be encoded." ; free(mPkt_wpending) ; mPkt_wpending_size = 0; - return -1 ; +// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped error slice size is too large"; + return sentbytes ; } #ifdef DEBUG_PACKET_SLICING std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", size=" << slice_size << std::endl; @@ -672,7 +701,8 @@ int pqistreamer::handleoutgoing_locked() // pkt_wpending will kept til next time. // ensuring exactly the same data is written (openSSL requirement). - return -1; +// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending failed only " << std::dec << ss << " bytes out of " << mPkt_wpending_size; + return sentbytes; } #ifdef DEBUG_PQISTREAMER else @@ -697,11 +727,13 @@ int pqistreamer::handleoutgoing_locked() sent = true; } } -#ifdef DEBUG_PQISTREAMER - if(nsent > 0) - std::cerr << "nsent = " << nsent << ", total bytes=" << sentbytes << std::endl; -#endif - return 1; + +//#ifdef DEBUG_PQISTREAMER +// if (sentbytes >0) +// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped outqueue empty, nsent " << std::dec << nsent << " sentbytes " << sentbytes << " maxbytes " << maxbytes; +//#endif + + return sentbytes; } @@ -721,6 +753,7 @@ int pqistreamer::handleincoming() RsStackMutex stack(mStreamerMtx); mReading_state = reading_state_initial ; free_pend(); +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading bio->isactive() false"; return 0; } else @@ -765,7 +798,8 @@ int pqistreamer::handleincoming() pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked"); std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ; #endif - return 0; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading read blocked"; + return readbytes; } else if (tmplen < 0) { @@ -776,7 +810,8 @@ int pqistreamer::handleincoming() #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ; #endif - return 0; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading error in bio read"; + return readbytes; } else // tmplen > 0 { @@ -788,7 +823,8 @@ int pqistreamer::handleincoming() std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ; #endif - return -1; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading strange reading of " << std::dec << tmplen; + return readbytes; } } #ifdef DEBUG_PQISTREAMER @@ -809,7 +845,8 @@ int pqistreamer::handleincoming() #endif mReading_state = reading_state_initial ; // restart at state 1. mFailed_read_attempts = 0 ; - return 0; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading enabled packet slicing"; + return readbytes; } } continue_packet: @@ -889,7 +926,9 @@ int pqistreamer::handleincoming() mBio->close(); mReading_state = reading_state_initial ; // restart at state 1. mFailed_read_attempts = 0 ; - return -1; + +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading read packet too big"; + return readbytes; // Used to exit now! exit(1); } @@ -944,14 +983,16 @@ int pqistreamer::handleincoming() mBio->close(); mReading_state = reading_state_initial ; // restart at state 1. mFailed_read_attempts = 0 ; - return -1; +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading unexpected read error"; + return readbytes; } else { #ifdef DEBUG_PQISTREAMER std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ; #endif - return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. +// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading too many read fail"; + return readbytes ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. // we assume readdata() returned either -1 or the complete read size. } } @@ -1035,7 +1076,7 @@ int pqistreamer::handleincoming() RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading no more to read, readbytes " << std::dec << readbytes << " maxin " << maxin; #endif - return 0; + return readbytes; } RsItem *pqistreamer::addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len) @@ -1180,6 +1221,13 @@ int pqistreamer::outAllowedBytes_locked() RsDbg() << "PQISTREAMER pqistreamer::outAllowedBytes_locked() dt " << std::dec << (int)(1000 * dt) << "ms, mAvgDtOut " << (int)(1000 * mAvgDtOut) << "ms, maxout " << (int)(maxout) << " bytes/s, mCurrSent " << mCurrSent << " bytes, quota " << (int)(quota) << " bytes"; #endif +/* RsDbg() << "PQISTREAMER pqistreamer::outAllowedBytes_locked() dt " + << std::fixed << std::setprecision(1) << (1000.0 * dt) << "ms, mAvgDtOut " + << (1000.0 * mAvgDtOut) << "ms, maxout " + << (int)(maxout) << " bytes/s, mCurrSent " + << mCurrSent << " bytes, quota " + << (int)(quota) << " bytes"; +*/ return quota; } @@ -1219,12 +1267,20 @@ int pqistreamer::inAllowedBytes() // we now calculate the max amount of data allowed to be received during the next round // we take into account the excess/deficit of the previous round - double quota = mAvgDtIn * maxin - mCurrRead; +// double quota = mAvgDtIn * maxin - mCurrRead; + double quota = mAvgDtIn * maxin; #ifdef DEBUG_PQISTREAMER RsDbg() << "PQISTREAMER pqistreamer::inAllowedBytes() dt " << std::dec << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes"; #endif +/* RsDbg() << "PQISTREAMER pqistreamer::inAllowedBytes() dt " + << std::fixed << std::setprecision(1) << (1000.0 * dt) << "ms, mAvgDtIn " + << (1000.0 * mAvgDtIn) << "ms, maxin " + << (int)(maxin) << " bytes/s, mCurrRead " + << mCurrRead << " bytes, quota " + << (int)(quota) << " bytes"; +*/ return quota; } diff --git a/src/pqi/pqithreadstreamer.cc b/src/pqi/pqithreadstreamer.cc index 6745dc864..b99d5bcfc 100644 --- a/src/pqi/pqithreadstreamer.cc +++ b/src/pqi/pqithreadstreamer.cc @@ -23,17 +23,35 @@ #include "pqi/pqithreadstreamer.h" #include -#define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms -#define DEFAULT_STREAMER_SLEEP 30000 // 30 ms -#define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec +// for timeBeginPeriod +#ifdef WINDOWS_SYS +#include +#include +#endif + +#define STREAMER_TIMEOUT_MIN 0 // non blocking +#define STREAMER_TIMEOUT_DELTA 1000 // 1 ms +#define STREAMER_TIMEOUT_MAX 10000 // 10 ms + +#define STREAMER_SLEEP_MIN 1000 // 1 ms +#define STREAMER_SLEEP_DELTA 1000 // 1 ms +#define STREAMER_SLEEP_MAX 30000 // 30 ms + +#define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec // #define PQISTREAMER_DEBUG pqithreadstreamer::pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, const RsPeerId& id, BinInterface *bio_in, int bio_flags_in) :pqistreamer(rss, id, bio_in, bio_flags_in), mParent(parent), mTimeout(0), mThreadMutex("pqithreadstreamer") { - mTimeout = DEFAULT_STREAMER_TIMEOUT; - mSleepPeriod = DEFAULT_STREAMER_SLEEP; +#ifdef WINDOWS_SYS + // On Windows, the default system timer resolution is around 15 ms. + // This call allows for sleep durations of less than 15 ms, which is + // necessary for frequent polling and high-speed data transfer. + timeBeginPeriod(1); +#endif + mTimeout = STREAMER_TIMEOUT_MAX; + mSleepPeriod = STREAMER_SLEEP_MAX; } bool pqithreadstreamer::RecvItem(RsItem *item) @@ -44,58 +62,77 @@ bool pqithreadstreamer::RecvItem(RsItem *item) int pqithreadstreamer::tick() { // pqithreadstreamer mutex lock is not needed here - // we will only check if the connection is active, and if not we will try to establish it + // we only check if the connection is active, and if not we will try to establish it tick_bio(); return 0; } -void pqithreadstreamer::threadTick() +void pqithreadstreamer::threadTick() { - uint32_t recv_timeout = 0; - uint32_t sleep_period = 0; + static uint32_t recv_timeout = mTimeout; + static uint32_t sleep_period = mSleepPeriod; + uint32_t readbytes = 0; + uint32_t sentbytes = 0; bool isactive = false; + // Locked section to safely read shared variables { RsStackMutex stack(mStreamerMtx); - recv_timeout = mTimeout; - sleep_period = mSleepPeriod; isactive = mBio->isactive(); } - // update the connection rates - updateRates() ; - - // if the connection est not active, long sleep then return + // Long sleep if connection is not active if (!isactive) { +// RsDbg() << "PQISTREAMER pqithreadstreamer::threadTick() mBio->isactive() false long sleep"; rstime::rs_usleep(DEFAULT_STREAMER_IDLE_SLEEP); return ; } - // fill incoming queue with items from SSL + updateRates(); + + // Fill incoming queue { RsStackMutex stack(mThreadMutex); - tick_recv(recv_timeout); + readbytes = tick_recv(recv_timeout); } - // move items to appropriate service queue or shortcut to fast service + // Process incoming items, move them to relevant service queue or shortcut to fast service RsItem *incoming = NULL; while((incoming = GetItem())) { RecvItem(incoming); } - // parse the outgoing queue and send items to SSL + // Parse outgoing queue and send items { RsStackMutex stack(mThreadMutex); - tick_send(0); + sentbytes = tick_send(0); } - // sleep - if (sleep_period) + // Adaptive timeout and sleep + // Check if any data was processed during previous cycle + if (readbytes > 0 || sentbytes > 0) + { + // Activity detected: switch to maximum reactivity immediately + // This ensure fast throughput for data bursts + recv_timeout = STREAMER_TIMEOUT_MIN; + sleep_period = STREAMER_SLEEP_MIN; + } + else + { + // No activity: gradually increase the timeout and sleep to save CPU cycles + if (recv_timeout < STREAMER_TIMEOUT_MAX) + recv_timeout += STREAMER_TIMEOUT_DELTA; + if (sleep_period < STREAMER_SLEEP_MAX) + sleep_period += STREAMER_SLEEP_DELTA; + } + +// RsDbg() << "PQISTREAMER pqithreadstreamer::threadTick() recv_timeout " << std::dec << recv_timeout / 1000 << " sleep_period " << sleep_period / 1000 << " readbytes " << readbytes << " sentbytes " << sentbytes; + + if (sleep_period > 0) { rstime::rs_usleep(sleep_period); } } -