Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ft/ftfilecreator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
******/

#define CHUNK_MAX_AGE 120
#define MAX_FTCHUNKS_PER_PEER 20
#define MAX_FTCHUNKS_PER_PEER 40

/***********************************************************
*
Expand Down
3 changes: 1 addition & 2 deletions src/ft/ftserver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1256,8 +1256,7 @@ bool ftServer::sendData(const RsPeerId& peerId, const RsFileHash& hash, uint64_t
{
//static const uint32_t MAX_FT_CHUNK = 32 * 1024; /* 32K */
//static const uint32_t MAX_FT_CHUNK = 16 * 1024; /* 16K */
//
static const uint32_t MAX_FT_CHUNK = 8 * 1024; /* 16K */
static const uint32_t MAX_FT_CHUNK = 240 * 1024; /* 240K */

/* workout size */
chunk = MAX_FT_CHUNK;
Expand Down
4 changes: 2 additions & 2 deletions src/ft/fttransfermodule.cc
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,7 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
#endif
/* update rate */

if( (info.lastTransfers > 0 && ageReq > 0) || ageReq > 2)
if( (info.lastTransfers > 0 && ageReq > 0) || ageReq > 2)
{
info.actualRate = info.actualRate * 0.75 + 0.25 * info.lastTransfers / (float)ageReq;
info.lastTransfers = 0;
Expand Down Expand Up @@ -822,7 +822,7 @@ bool ftTransferModule::locked_tickPeerTransfer(peerInfo &info)
std::cerr << "locked_tickPeerTransfer() desired next_req: " << next_req;
std::cerr << std::endl;
#endif

/* do request */
uint64_t req_offset = 0;
uint32_t req_size =0 ;
Expand Down
2 changes: 1 addition & 1 deletion src/pqi/pqissllistener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ int pqissllistenbase::setuplisten()
}

#ifdef WINDOWS_SYS
/* Set TCP buffer size for Windows systems */
// Set TCP buffer size for Windows systems

int sockbufsize = 0;
int size = sizeof(int);
Expand Down
108 changes: 82 additions & 26 deletions src/pqi/pqistreamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include "util/rsprint.h" // for BinToHex
#include "util/rsstring.h" // for rs_sprintf_append, rs_sprintf

#include <iomanip>

//#define DEBUG_PQISTREAMER 1

static struct RsLog::logInfo pqistreamerzoneInfo = {RsLog::Default, "pqistreamer"};
Expand All @@ -48,7 +50,7 @@ static const int PQISTREAM_AVG_PERIOD = 1; // update speed estimate every
static const float PQISTREAM_AVG_FRAC = 0.8; // for bandpass filter over speed estimate.
static const float PQISTREAM_AVG_DT_FRAC = 0.99; // for low pass filter over elapsed time

static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 512; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
static const int PQISTREAM_OPTIMAL_PACKET_SIZE = 1400; // It is believed that this value should be lower than TCP slices and large enough as compare to encryption padding.
// most importantly, it should be constant, so as to allow correct QoS.
static const int PQISTREAM_SLICE_FLAG_STARTS = 0x01; //
static const int PQISTREAM_SLICE_FLAG_ENDS = 0x02; // these flags should be kept in the range 0x01-0x08
Expand All @@ -63,7 +65,7 @@ static const int PQISTREAM_PACKET_SLICING_PROBE_DELAY = 60; // send every 6
static uint8_t PACKET_SLICING_PROBE_BYTES[8] = { 0x02, 0xaa, 0xbb, 0xcc, 0x00, 0x00, 0x00, 0x08 } ;

/* Change to true to disable packet slicing and/or packet grouping, if needed */
#define DISABLE_PACKET_SLICING false
#define DISABLE_PACKET_SLICING false
#define DISABLE_PACKET_GROUPING false

/* This removes the print statements (which hammer pqidebug) */
Expand Down Expand Up @@ -304,35 +306,60 @@ int pqistreamer::tick_bio()

int pqistreamer::tick_recv(uint32_t timeout)
{
if(!(mBio->isactive()))
{
// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBio->isactive false";
RsStackMutex stack(mStreamerMtx);
free_pend();
return 0;
}

int readbytes = 0;
if (mBio->moretoread(timeout))
{
handleincoming();
readbytes = handleincoming();
}
// else
// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBios->moretoread() false";

if(!(mBio->isactive()))
{
// RsDbg() << "PQISTREAMER pqistreamer::tick_recv mBio->isactive false";
RsStackMutex stack(mStreamerMtx);
free_pend();
}
return 1;

return readbytes;
}

int pqistreamer::tick_send(uint32_t timeout)
{
/* short circuit everything if bio isn't active */
if (!(mBio->isactive()))
{
// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->isactive false";
RsStackMutex stack(mStreamerMtx);
free_pend();
return 0;
}

int sentbytes = 0;
if (mBio->cansend(timeout))
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
handleoutgoing_locked();
RsStackMutex stack(mStreamerMtx);
sentbytes = handleoutgoing_locked();
}
// else
// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->cansend false";

return 1;
if (!(mBio->isactive()))
{
// RsDbg() << "PQISTREAMER pqistreamer::tick_send mBio->isactive false";
RsStackMutex stack(mStreamerMtx);
free_pend();
}

return sentbytes;
}

int pqistreamer::status()
Expand All @@ -357,7 +384,7 @@ int pqistreamer::status()
// this method is overloaded by pqiqosstreamer
void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int)
{
RsDbg() << "Storing packet " << std::hex << ptr << std::dec << " in outqueue.";
// RsDbg() << "Storing packet " << std::hex << ptr << std::dec << " in outqueue.";
mOutPkts.push_back(ptr);
}

Expand Down Expand Up @@ -535,6 +562,7 @@ int pqistreamer::handleoutgoing_locked()
mPkt_wpending_size = 0 ;
}

// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped mBio->isactive() false";
return 0;
}

Expand All @@ -554,7 +582,7 @@ int pqistreamer::handleoutgoing_locked()
else
RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending bio not ready, sentbytes " << std::dec << sentbytes << " maxbytes " << maxbytes;
#endif
return 0;
return sentbytes;
}
// send a out_pkt., else send out_data. unless there is a pending packet. The strategy is to
// - grab as many packets as possible while below the optimal packet size, so as to allow some packing and decrease encryption padding overhead (suposeddly)
Expand Down Expand Up @@ -612,10 +640,11 @@ int pqistreamer::handleoutgoing_locked()
{
if(slice_size > 0xffff || !mAcceptsPacketSlicing)
{
std::cerr << "(EE) protocol error in pqitreamer: slice size is too large and cannot be encoded." ;
std::cerr << "(EE) protocol error in pqistreamer: slice size is too large and cannot be encoded." ;
free(mPkt_wpending) ;
mPkt_wpending_size = 0;
return -1 ;
// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped error slice size is too large";
return sentbytes ;
}
#ifdef DEBUG_PACKET_SLICING
std::cerr << "sending partial slice, packet ID=" << std::hex << slice_packet_id << std::dec << ", size=" << slice_size << std::endl;
Expand Down Expand Up @@ -672,7 +701,8 @@ int pqistreamer::handleoutgoing_locked()

// pkt_wpending will kept til next time.
// ensuring exactly the same data is written (openSSL requirement).
return -1;
// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped sending failed only " << std::dec << ss << " bytes out of " << mPkt_wpending_size;
return sentbytes;
}
#ifdef DEBUG_PQISTREAMER
else
Expand All @@ -697,11 +727,13 @@ int pqistreamer::handleoutgoing_locked()
sent = true;
}
}
#ifdef DEBUG_PQISTREAMER
if(nsent > 0)
std::cerr << "nsent = " << nsent << ", total bytes=" << sentbytes << std::endl;
#endif
return 1;

//#ifdef DEBUG_PQISTREAMER
// if (sentbytes >0)
// RsDbg() << "PQISTREAMER pqistreamer::handleoutgoing_locked() stopped outqueue empty, nsent " << std::dec << nsent << " sentbytes " << sentbytes << " maxbytes " << maxbytes;
//#endif

return sentbytes;
}


Expand All @@ -721,6 +753,7 @@ int pqistreamer::handleincoming()
RsStackMutex stack(mStreamerMtx);
mReading_state = reading_state_initial ;
free_pend();
// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading bio->isactive() false";
return 0;
}
else
Expand Down Expand Up @@ -765,7 +798,8 @@ int pqistreamer::handleincoming()
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked");
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ;
#endif
return 0;
// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading read blocked";
return readbytes;
}
else if (tmplen < 0)
{
Expand All @@ -776,7 +810,8 @@ int pqistreamer::handleincoming()
#ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ;
#endif
return 0;
// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading error in bio read";
return readbytes;
}
else // tmplen > 0
{
Expand All @@ -788,7 +823,8 @@ int pqistreamer::handleincoming()

std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ;
#endif
return -1;
// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading strange reading of " << std::dec << tmplen;
return readbytes;
}
}
#ifdef DEBUG_PQISTREAMER
Expand All @@ -809,7 +845,8 @@ int pqistreamer::handleincoming()
#endif
mReading_state = reading_state_initial ; // restart at state 1.
mFailed_read_attempts = 0 ;
return 0;
// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading enabled packet slicing";
return readbytes;
}
}
continue_packet:
Expand Down Expand Up @@ -889,7 +926,9 @@ int pqistreamer::handleincoming()
mBio->close();
mReading_state = reading_state_initial ; // restart at state 1.
mFailed_read_attempts = 0 ;
return -1;

// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading read packet too big";
return readbytes;

// Used to exit now! exit(1);
}
Expand Down Expand Up @@ -944,14 +983,16 @@ int pqistreamer::handleincoming()
mBio->close();
mReading_state = reading_state_initial ; // restart at state 1.
mFailed_read_attempts = 0 ;
return -1;
// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading unexpected read error";
return readbytes;
}
else
{
#ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ;
#endif
return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon.
// RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading too many read fail";
return readbytes ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon.
// we assume readdata() returned either -1 or the complete read size.
}
}
Expand Down Expand Up @@ -1035,7 +1076,7 @@ int pqistreamer::handleincoming()
RsDbg() << "PQISTREAMER pqistreamer::handleincoming() stopped reading no more to read, readbytes " << std::dec << readbytes << " maxin " << maxin;
#endif

return 0;
return readbytes;
}

RsItem *pqistreamer::addPartialPacket(const void *block, uint32_t len, uint32_t slice_packet_id, bool is_packet_starting, bool is_packet_ending, uint32_t &total_len)
Expand Down Expand Up @@ -1180,6 +1221,13 @@ int pqistreamer::outAllowedBytes_locked()
RsDbg() << "PQISTREAMER pqistreamer::outAllowedBytes_locked() dt " << std::dec << (int)(1000 * dt) << "ms, mAvgDtOut " << (int)(1000 * mAvgDtOut) << "ms, maxout " << (int)(maxout) << " bytes/s, mCurrSent " << mCurrSent << " bytes, quota " << (int)(quota) << " bytes";
#endif

/* RsDbg() << "PQISTREAMER pqistreamer::outAllowedBytes_locked() dt "
<< std::fixed << std::setprecision(1) << (1000.0 * dt) << "ms, mAvgDtOut "
<< (1000.0 * mAvgDtOut) << "ms, maxout "
<< (int)(maxout) << " bytes/s, mCurrSent "
<< mCurrSent << " bytes, quota "
<< (int)(quota) << " bytes";
*/
return quota;
}

Expand Down Expand Up @@ -1219,12 +1267,20 @@ int pqistreamer::inAllowedBytes()

// we now calculate the max amount of data allowed to be received during the next round
// we take into account the excess/deficit of the previous round
double quota = mAvgDtIn * maxin - mCurrRead;
// double quota = mAvgDtIn * maxin - mCurrRead;
double quota = mAvgDtIn * maxin;

#ifdef DEBUG_PQISTREAMER
RsDbg() << "PQISTREAMER pqistreamer::inAllowedBytes() dt " << std::dec << (int)(1000 * dt) << "ms, mAvgDtIn " << (int)(1000 * mAvgDtIn) << "ms, maxin " << (int)(maxin) << " bytes/s, mCurrRead " << mCurrRead << " bytes, quota " << (int)(quota) << " bytes";
#endif

/* RsDbg() << "PQISTREAMER pqistreamer::inAllowedBytes() dt "
<< std::fixed << std::setprecision(1) << (1000.0 * dt) << "ms, mAvgDtIn "
<< (1000.0 * mAvgDtIn) << "ms, maxin "
<< (int)(maxin) << " bytes/s, mCurrRead "
<< mCurrRead << " bytes, quota "
<< (int)(quota) << " bytes";
*/
return quota;
}

Expand Down
Loading
Loading