From 058996ae6d5a92d957cfffde36f38c2b8a4beef7 Mon Sep 17 00:00:00 2001 From: Gleb Naumenko Date: Wed, 6 Feb 2019 20:25:27 -0800 Subject: [PATCH 01/10] Change in transaction pull scheduling to prevent InvBlock-related attacks Co-authored-by: Suhas Daftuar --- src/net.cpp | 54 +---------- src/net.h | 12 --- src/net_processing.cpp | 193 ++++++++++++++++++++++++++++++++++------ src/test/util_tests.cpp | 8 +- src/utiltime.cpp | 13 --- 5 files changed, 168 insertions(+), 112 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index a04687b5b36e..62b4fc6bf154 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -93,8 +93,6 @@ std::map mapLocalHost; static bool vfLimited[NET_MAX] = {}; std::string strSubVersion; -limitedmap mapAlreadyAskedFor(MAX_INV_SZ); - void CConnman::AddOneShot(const std::string& strDest) { LOCK(cs_vOneShots); @@ -2573,12 +2571,8 @@ void CConnman::RelayInv(CInv& inv, int minProtoVersion) void CConnman::RemoveAskFor(const uint256& invHash, int invType) { - mapAlreadyAskedFor.erase(CInv(invType, invHash)); - LOCK(cs_vNodes); - for (const auto& pnode : vNodes) { - pnode->AskForInvReceived(invHash); - } + // TODO: Remove this } void CConnman::UpdateQuorumRelayMemberIfNeeded(CNode* pnode) @@ -2699,52 +2693,6 @@ CNode::~CNode() CloseSocket(hSocket); } -void CNode::AskFor(const CInv& inv, int64_t doubleRequestDelay) -{ - if (mapAskFor.size() > MAPASKFOR_MAX_SZ || setAskFor.size() > SETASKFOR_MAX_SZ) - return; - // a peer may not have multiple non-responded queue positions for a single inv item - if (!setAskFor.insert(inv.hash).second) - return; - - // We're using mapAskFor as a priority queue, - // the key is the earliest time the request can be sent - int64_t nRequestTime; - limitedmap::const_iterator it = mapAlreadyAskedFor.find(inv); - if (it != mapAlreadyAskedFor.end()) - nRequestTime = it->second; - else - nRequestTime = 0; - LogPrint(BCLog::NET, "askfor %s %d (%s) peer=%d\n", inv.ToString(), nRequestTime, FormatISO8601Time(nRequestTime / 1000000), id); - - // Make sure not to reuse time indexes to keep things in the same order - int64_t nNow = GetTimeMicros() - 1000000; - static int64_t nLastTime; - ++nLastTime; - nNow = std::max(nNow, nLastTime); - nLastTime = nNow; - - // Each retry is 2 minutes after the last - nRequestTime = std::max(nRequestTime + doubleRequestDelay, nNow); - if (it != mapAlreadyAskedFor.end()) - mapAlreadyAskedFor.update(it, nRequestTime); - else - mapAlreadyAskedFor.insert(std::make_pair(inv, nRequestTime)); - mapAskFor.insert(std::make_pair(nRequestTime, inv)); -} - -void CNode::AskForInvReceived(const uint256& invHash) -{ - setAskFor.erase(invHash); - for (auto it = mapAskFor.begin(); it != mapAskFor.end();) { - if (it->second.hash == invHash) { - it = mapAskFor.erase(it); - } else { - ++it; - } - } -} - bool CConnman::NodeFullyConnected(const CNode* pnode) { return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; diff --git a/src/net.h b/src/net.h index 2b25df53f4dd..4557914f45e9 100644 --- a/src/net.h +++ b/src/net.h @@ -86,10 +86,6 @@ static const int MAX_ADDNODE_CONNECTIONS = 16; static const int INBOUND_EVICTION_PROTECTION_TIME = 1; /** -listen default */ static const bool DEFAULT_LISTEN = true; -/** The maximum number of entries in mapAskFor */ -static const size_t MAPASKFOR_MAX_SZ = MAX_INV_SZ; -/** The maximum number of entries in setAskFor (larger due to getdata latency)*/ -static const size_t SETASKFOR_MAX_SZ = 2 * MAX_INV_SZ; /** The maximum number of peer connections to maintain. */ static const unsigned int DEFAULT_MAX_PEER_CONNECTIONS = 125; /** Disconnected peers are added to setOffsetDisconnectedPeers only if node has less than ENOUGH_CONNECTIONS */ @@ -568,8 +564,6 @@ bool validateMasternodeIP(const std::string& addrStr); // valid, reacha extern bool fDiscover; extern bool fListen; -extern limitedmap mapAlreadyAskedFor; - /** Subversion as sent to the P2P network in `version` messages */ extern std::string strSubVersion; @@ -773,8 +767,6 @@ class CNode // Set of tier two messages ids we still have to announce. std::vector vInventoryTierTwoToSend; RecursiveMutex cs_inventory; - std::multimap mapAskFor; - std::set setAskFor; std::vector vBlockRequested; std::chrono::microseconds nNextInvSend{0}; // Used for BIP35 mempool sending, also protected by cs_inventory @@ -925,10 +917,6 @@ class CNode } } - void AskFor(const CInv& inv, int64_t doubleRequestDelay = 2 * 60 * 1000000); - // inv response received, clear it from the waiting inv set. - void AskForInvReceived(const uint256& invHash); - void CloseSocketDisconnect(); bool DisconnectOldProtocol(int nVersionIn, int nVersionRequired); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index bdda9fdf2726..365182a4a9b3 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -36,6 +36,20 @@ using namespace std::chrono_literals; static const uint64_t RANDOMIZER_ID_ADDRESS_RELAY = 0x3cac0035b5866b90ULL; // SHA256("main address relay")[0:8] +/** Maximum number of in-flight transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; +/** Maximum number of announced transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; +/** How many microseconds to delay requesting transactions from inbound peers */ +static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; +/** How long to wait (in microseconds) before downloading a transaction from an additional peer */ +static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; +/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ +static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; +static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, + "To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); +/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ +static const unsigned int MAX_GETDATA_SZ = 1000; /** the maximum percentage of addresses from our addrman to return in response to a getaddr message. */ static constexpr size_t MAX_PCT_ADDR_TO_SEND = 23; @@ -231,7 +245,68 @@ struct CNodeState { CNodeBlocks nodeBlocks; - CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { + /* + * State associated with transaction download. + * + * Tx download algorithm: + * + * When inv comes in, queue up (process_time, txid) inside the peer's + * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer + * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). + * + * The process_time for a transaction is set to nNow for outbound peers, + * nNow + 2 seconds for inbound peers. This is the time at which we'll + * consider trying to request the transaction from the peer in + * SendMessages(). The delay for inbound peers is to allow outbound peers + * a chance to announce before we request from inbound peers, to prevent + * an adversary from using inbound connections to blind us to a + * transaction (InvBlock). + * + * When we call SendMessages() for a given peer, + * we will loop over the transactions in m_tx_process_time, looking + * at the transactions whose process_time <= nNow. We'll request each + * such transaction that we don't have already and that hasn't been + * requested from another peer recently, up until we hit the + * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update + * g_already_asked_for for each requested txid, storing the time of the + * GETDATA request. We use g_already_asked_for to coordinate transaction + * requests amongst our peers. + * + * For transactions that we still need but we have already recently + * requested from some other peer, we'll reinsert (process_time, txid) + * back into the peer's m_tx_process_time at the point in the future at + * which the most recent GETDATA request would time out (ie + * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). + * We add an additional delay for inbound peers, again to prefer + * attempting download from outbound peers first. + * We also add an extra small random delay up to 2 seconds + * to avoid biasing some peers over others. (e.g., due to fixed ordering + * of peer processing in ThreadMessageHandler). + * + * When we receive a transaction from a peer, we remove the txid from the + * peer's m_tx_in_flight set and from their recently announced set + * (m_tx_announced). We also clear g_already_asked_for for that entry, so + * that if somehow the transaction is not accepted but also not added to + * the reject filter, then we will eventually redownload from other + * peers. + */ + struct TxDownloadState { + /* Track when to attempt download of announced transactions (process + * time in micros -> txid) + */ + std::multimap m_tx_process_time; + + //! Store all the transactions a peer has recently announced + std::set m_tx_announced; + + //! Store transactions which were requested by us + std::set m_tx_in_flight; + }; + + TxDownloadState m_tx_download; + + CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) + { fCurrentlyConnected = false; nMisbehavior = 0; fShouldBan = false; @@ -245,6 +320,9 @@ struct CNodeState { } }; +// Keeps track of the time (in microseconds) when transactions were requested last time +limitedmap g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); + /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -453,8 +531,59 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec } } } +void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + g_already_asked_for.erase(txid); +} -} // anon namespace +int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + auto it = g_already_asked_for.find(txid); + if (it != g_already_asked_for.end()) { + return it->second; + } + return 0; +} + +void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + auto it = g_already_asked_for.find(txid); + + if (it == g_already_asked_for.end()) { + g_already_asked_for.insert(std::make_pair(txid, request_time)); + } else { + g_already_asked_for.update(it, request_time); + } +} + +void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; + if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_tx_announced.insert(txid); + int64_t process_time; + int64_t last_request_time = GetTxRequestTime(txid); + + // First time requesting this tx + if (last_request_time == 0) { + process_time = nNow; + } else { + // Randomize the delay to avoid biasing some peers over others (such as due to + // fixed ordering of peer processing in ThreadMessageHandler) + process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); + } + + // We delay processing announcements from non-preferred (eg inbound) peers + if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY; + + peer_download_state.m_tx_process_time.emplace(process_time, txid); +} + +} // namespace void PeerLogicValidation::InitializeNode(CNode *pnode) { CAddress addr = pnode->addr; @@ -1587,6 +1716,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } LOCK(cs_main); + int64_t nNow = GetTimeMicros(); std::vector vToFetch; @@ -1635,21 +1765,10 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR if (!fAlreadyHave) { bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type); if (allowWhileInIBD || !IsInitialBlockDownload()) { - int64_t doubleRequestDelay = 2 * 60 * 1000000; - // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA - switch (inv.type) { - case MSG_QUORUM_RECOVERED_SIG: - doubleRequestDelay = 5 * 1000000; - break; - case MSG_CLSIG: - doubleRequestDelay = 5 * 1000000; - break; - } - pfrom->AskFor(inv, doubleRequestDelay); + RequestTx(State(pfrom->GetId()), inv.hash, nNow); } } } - } if (!vToFetch.empty()) @@ -1779,8 +1898,10 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR bool fMissingInputs = false; CValidationState state; - pfrom->setAskFor.erase(inv.hash); - mapAlreadyAskedFor.erase(inv); + CNodeState* nodestate = State(pfrom->GetId()); + nodestate->m_tx_download.m_tx_announced.erase(inv.hash); + nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash); + EraseTxRequest(inv.hash); if (ptx->ContainsZerocoins()) { // Don't even try to check zerocoins at all. @@ -1869,10 +1990,11 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } } if (!fRejectedParents) { + int64_t nNow = GetTimeMicros(); for (const uint256& parent_txid : unique_parents) { CInv _inv(MSG_TX, parent_txid); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) pfrom->AskFor(_inv); + if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow); } AddOrphanTx(ptx, pfrom->GetId()); @@ -2644,20 +2766,37 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // // Message: getdata (non-blocks) // - while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) { - const CInv& inv = (*pto->mapAskFor.begin()).second; + auto& tx_process_time = state.m_tx_download.m_tx_process_time; + while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { + const uint256& txid = tx_process_time.begin()->second; + CInv inv(MSG_TX, txid); if (!AlreadyHave(inv)) { - LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= 1000) { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); + // If this transaction was last requested more than 1 minute ago, + // then request. + int64_t last_request_time = GetTxRequestTime(inv.hash); + if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { + LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); + vGetData.push_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); + } + UpdateTxRequestTime(inv.hash, nNow); + state.m_tx_download.m_tx_in_flight.insert(inv.hash); + } else { + // This transaction is in flight from someone else; queue + // up processing to happen after the download times out + // (with a slight delay for inbound peers, to prefer + // requests to outbound peers). + RequestTx(&state, txid, nNow); } } else { - //If we're not going to ask, don't expect a response. - pto->setAskFor.erase(inv.hash); + // We have already seen this transaction, no need to download. + state.m_tx_download.m_tx_announced.erase(inv.hash); + state.m_tx_download.m_tx_in_flight.erase(inv.hash); } - pto->mapAskFor.erase(pto->mapAskFor.begin()); + + tx_process_time.erase(tx_process_time.begin()); } if (!vGetData.empty()) connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); diff --git a/src/test/util_tests.cpp b/src/test/util_tests.cpp index 41e4ec451e39..005fe1e8474a 100644 --- a/src/test/util_tests.cpp +++ b/src/test/util_tests.cpp @@ -121,13 +121,7 @@ BOOST_AUTO_TEST_CASE(util_FormatISO8601Date) BOOST_CHECK_EQUAL(FormatISO8601Date(1317425777), "2011-09-30"); } -BOOST_AUTO_TEST_CASE(util_FormatISO8601Time) -{ - BOOST_CHECK_EQUAL(FormatISO8601Time(1317425777), "23:36:17Z"); -} - -struct TestArgsManager : public ArgsManager -{ +struct TestArgsManager : public ArgsManager { TestArgsManager() { m_network_only_args.clear(); } std::map >& GetOverrideArgs() { return m_override_args; } std::map >& GetConfigArgs() { return m_config_args; } diff --git a/src/utiltime.cpp b/src/utiltime.cpp index f518a61a08c6..c0f4b6d26f0d 100644 --- a/src/utiltime.cpp +++ b/src/utiltime.cpp @@ -137,16 +137,3 @@ std::string FormatISO8601Date(int64_t nTime) { } return strprintf("%04i-%02i-%02i", ts.tm_year + 1900, ts.tm_mon + 1, ts.tm_mday); } - -std::string FormatISO8601Time(int64_t nTime) { - struct tm ts; - time_t time_val = nTime; -#ifdef HAVE_GMTIME_R - if (gmtime_r(&time_val, &ts) == nullptr) { -#else - if (gmtime_s(&ts, &time_val) != 0) { -#endif - return {}; - } - return strprintf("%02i:%02i:%02iZ", ts.tm_hour, ts.tm_min, ts.tm_sec); -} From 5656187b502eb0d3b5aa1c8f90718cea64dfc61c Mon Sep 17 00:00:00 2001 From: Suhas Daftuar Date: Mon, 22 Apr 2019 14:13:30 -0400 Subject: [PATCH 02/10] Fix bug around transaction requests If a transaction is already in-flight when a peer announces a new tx to us, we schedule a time in the future to reconsider whether to download. At that future time, there was a bug that would prevent transactions from being rescheduled for potential download again (ie if the transaction was still in-flight at the time of reconsideration, such as from some other peer). Fix this. --- src/net_processing.cpp | 41 ++++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 365182a4a9b3..df299fa75fc6 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -556,29 +556,38 @@ void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LO } } -void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) + +int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) { - // Too many queued announcements from this peer, or we already have - // this announcement - return; - } - peer_download_state.m_tx_announced.insert(txid); int64_t process_time; int64_t last_request_time = GetTxRequestTime(txid); - // First time requesting this tx if (last_request_time == 0) { - process_time = nNow; + process_time = current_time; } else { // Randomize the delay to avoid biasing some peers over others (such as due to // fixed ordering of peer processing in ThreadMessageHandler) process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); } - // We delay processing announcements from non-preferred (eg inbound) peers - if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY; + // We delay processing announcements from inbound peers + if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; + + return process_time; +} + +void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; + if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_tx_announced.insert(txid); + // Calculate the time to try requesting this transaction. Use + // fPreferredDownload as a proxy for outbound peers. + int64_t process_time = CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, txid); } @@ -2769,6 +2778,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM auto& tx_process_time = state.m_tx_download.m_tx_process_time; while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { const uint256& txid = tx_process_time.begin()->second; + // Erase this entry from tx_process_time (it may be added back for + // processing at a later time, see below) + tx_process_time.erase(tx_process_time.begin()); CInv inv(MSG_TX, txid); if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, @@ -2788,15 +2800,14 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). - RequestTx(&state, txid, nNow); + int64_t next_process_time = CalculateTxGetDataTime(txid, nNow, !state.fPreferredDownload); + tx_process_time.emplace(next_process_time, txid); } } else { // We have already seen this transaction, no need to download. state.m_tx_download.m_tx_announced.erase(inv.hash); state.m_tx_download.m_tx_in_flight.erase(inv.hash); } - - tx_process_time.erase(tx_process_time.begin()); } if (!vGetData.empty()) connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); From 63b2f62e0931b77b227783f2febb30f1febd7b2f Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 3 Feb 2025 15:55:50 +0100 Subject: [PATCH 03/10] Cherrypick: Generalize TX request code 29d3b75f28f8b6a2333f737aff29fbc3a636f1ac (https://github.com/dashpay/dash/pull/3397/) --- src/net_processing.cpp | 77 ++++++++++++++++++++++++------------------ src/net_processing.h | 3 ++ 2 files changed, 48 insertions(+), 32 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index df299fa75fc6..3957ffa92b6f 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -289,18 +289,20 @@ struct CNodeState { * that if somehow the transaction is not accepted but also not added to * the reject filter, then we will eventually redownload from other * peers. + * + * PIVX: For PIVX, this does not only handles TXs but also all PIVX specific objects */ struct TxDownloadState { /* Track when to attempt download of announced transactions (process * time in micros -> txid) */ - std::multimap m_tx_process_time; + std::multimap m_tx_process_time; //! Store all the transactions a peer has recently announced - std::set m_tx_announced; + std::set m_tx_announced; //! Store transactions which were requested by us - std::set m_tx_in_flight; + std::set m_tx_in_flight; }; TxDownloadState m_tx_download; @@ -531,36 +533,40 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec } } } -void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) + +} // namespace +void EraseObjectRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - g_already_asked_for.erase(txid); + AssertLockHeld(cs_main); + g_already_asked_for.erase(hash); } -int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - auto it = g_already_asked_for.find(txid); + AssertLockHeld(cs_main); + auto it = g_already_asked_for.find(hash); if (it != g_already_asked_for.end()) { return it->second; } return 0; } -void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - auto it = g_already_asked_for.find(txid); + auto it = g_already_asked_for.find(hash); if (it == g_already_asked_for.end()) { - g_already_asked_for.insert(std::make_pair(txid, request_time)); + g_already_asked_for.insert(std::make_pair(hash, request_time)); } else { g_already_asked_for.update(it, request_time); } } -int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +int64_t CalculateObjectGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { int64_t process_time; - int64_t last_request_time = GetTxRequestTime(txid); + int64_t last_request_time = GetObjectRequestTime(txid); // First time requesting this tx if (last_request_time == 0) { process_time = current_time; @@ -576,23 +582,31 @@ int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool u return process_time; } -void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) { + if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(inv)) { // Too many queued announcements from this peer, or we already have // this announcement return; } - peer_download_state.m_tx_announced.insert(txid); + peer_download_state.m_tx_announced.insert(inv); // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload); + int64_t process_time = CalculateObjectGetDataTime(inv.hash, nNow, !state->fPreferredDownload); - peer_download_state.m_tx_process_time.emplace(process_time, txid); + peer_download_state.m_tx_process_time.emplace(process_time, inv); } -} // namespace +void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return; + } + RequestObject(state, inv, nNow); +} void PeerLogicValidation::InitializeNode(CNode *pnode) { CAddress addr = pnode->addr; @@ -1774,7 +1788,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR if (!fAlreadyHave) { bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type); if (allowWhileInIBD || !IsInitialBlockDownload()) { - RequestTx(State(pfrom->GetId()), inv.hash, nNow); + RequestObject(State(pfrom->GetId()), inv, nNow); } } } @@ -1908,9 +1922,9 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR CValidationState state; CNodeState* nodestate = State(pfrom->GetId()); - nodestate->m_tx_download.m_tx_announced.erase(inv.hash); - nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash); - EraseTxRequest(inv.hash); + nodestate->m_tx_download.m_tx_announced.erase(inv); + nodestate->m_tx_download.m_tx_in_flight.erase(inv); + EraseObjectRequest(inv.hash); if (ptx->ContainsZerocoins()) { // Don't even try to check zerocoins at all. @@ -2003,7 +2017,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR for (const uint256& parent_txid : unique_parents) { CInv _inv(MSG_TX, parent_txid); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow); + if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, nNow); } AddOrphanTx(ptx, pfrom->GetId()); @@ -2777,15 +2791,14 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // auto& tx_process_time = state.m_tx_download.m_tx_process_time; while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const uint256& txid = tx_process_time.begin()->second; + const auto& inv = tx_process_time.begin()->second; // Erase this entry from tx_process_time (it may be added back for // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); - CInv inv(MSG_TX, txid); if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, // then request. - int64_t last_request_time = GetTxRequestTime(inv.hash); + int64_t last_request_time = GetObjectRequestTime(inv.hash); if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); vGetData.push_back(inv); @@ -2793,20 +2806,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); vGetData.clear(); } - UpdateTxRequestTime(inv.hash, nNow); - state.m_tx_download.m_tx_in_flight.insert(inv.hash); + UpdateObjectRequestTime(inv.hash, nNow); + state.m_tx_download.m_tx_in_flight.insert(inv); } else { // This transaction is in flight from someone else; queue // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). - int64_t next_process_time = CalculateTxGetDataTime(txid, nNow, !state.fPreferredDownload); - tx_process_time.emplace(next_process_time, txid); + int64_t next_process_time = CalculateObjectGetDataTime(inv.hash, nNow, !state.fPreferredDownload); + tx_process_time.emplace(next_process_time, inv); } } else { // We have already seen this transaction, no need to download. - state.m_tx_download.m_tx_announced.erase(inv.hash); - state.m_tx_download.m_tx_in_flight.erase(inv.hash); + state.m_tx_download.m_tx_announced.erase(inv); + state.m_tx_download.m_tx_in_flight.erase(inv); } } if (!vGetData.empty()) diff --git a/src/net_processing.h b/src/net_processing.h index 35f2e83b566f..3a1fdd7d4019 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -81,4 +81,7 @@ using SecondsDouble = std::chrono::duration Date: Tue, 4 Feb 2025 11:29:45 +0100 Subject: [PATCH 04/10] Make interval and timeout dependend on INV type Cherrypick of 414943b6110bf61a9be5cf2c270487981eddc6cf https://github.com/dashpay/dash/pull/3397/ --- src/net_processing.cpp | 42 +++++++++++++++++++++++++++++++++++------- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 3957ffa92b6f..a5976750df67 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -44,6 +44,8 @@ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; +/** How long to wait (expiry * factor microseconds) before expiring an in-flight getdata request to a peer */ +static constexpr int64_t TX_EXPIRY_INTERVAL_FACTOR = 10; /** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, @@ -563,22 +565,48 @@ void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIV } -int64_t CalculateObjectGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +int64_t GetObjectInterval(int invType) { + // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA + switch (invType) { + case MSG_QUORUM_RECOVERED_SIG: + return 15 * 1000000; + case MSG_CLSIG: + return 5 * 1000000; + default: + return GETDATA_TX_INTERVAL; + } +} + +int64_t GetObjectExpiryInterval(int invType) +{ + return GetObjectInterval(invType) * TX_EXPIRY_INTERVAL_FACTOR; +} + +int64_t GetObjectRandomDelay(int invType) +{ + if (invType == MSG_TX) { + return GetRand(MAX_GETDATA_RANDOM_DELAY); + } + return 0; +} + +int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); int64_t process_time; - int64_t last_request_time = GetObjectRequestTime(txid); + int64_t last_request_time = GetObjectRequestTime(inv.hash); // First time requesting this tx if (last_request_time == 0) { process_time = current_time; } else { // Randomize the delay to avoid biasing some peers over others (such as due to // fixed ordering of peer processing in ThreadMessageHandler) - process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); + process_time = last_request_time + GetObjectInterval(inv.type) + GetObjectRandomDelay(inv.type); } // We delay processing announcements from inbound peers if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; - return process_time; } @@ -593,7 +621,7 @@ void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_L peer_download_state.m_tx_announced.insert(inv); // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = CalculateObjectGetDataTime(inv.hash, nNow, !state->fPreferredDownload); + int64_t process_time = CalculateObjectGetDataTime(inv, nNow, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, inv); } @@ -2799,7 +2827,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // If this transaction was last requested more than 1 minute ago, // then request. int64_t last_request_time = GetObjectRequestTime(inv.hash); - if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { + if (last_request_time <= nNow - GetObjectExpiryInterval(inv.type)) { LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); vGetData.push_back(inv); if (vGetData.size() >= MAX_GETDATA_SZ) { @@ -2813,7 +2841,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). - int64_t next_process_time = CalculateObjectGetDataTime(inv.hash, nNow, !state.fPreferredDownload); + int64_t next_process_time = CalculateObjectGetDataTime(inv, nNow, !state.fPreferredDownload); tx_process_time.emplace(next_process_time, inv); } } else { From 35e2b9adb1d8f335bf2987e72209333eb8419260 Mon Sep 17 00:00:00 2001 From: Duddino Date: Tue, 4 Feb 2025 11:39:17 +0100 Subject: [PATCH 05/10] Replace uses of CConnman::RemoveAskFor with EraseObjectRequest --- src/budget/budgetmanager.cpp | 14 ++++++++------ src/llmq/quorums_blockprocessor.cpp | 4 +++- src/llmq/quorums_chainlocks.cpp | 5 +++-- src/llmq/quorums_dkgsessionhandler.cpp | 2 +- src/llmq/quorums_signing.cpp | 6 ++++-- src/masternode-payments.cpp | 7 ++++--- src/masternodeman.cpp | 10 +++++----- src/net.cpp | 6 ------ src/net.h | 3 --- test/lint/lint-circular-dependencies.sh | 1 - 10 files changed, 28 insertions(+), 30 deletions(-) diff --git a/src/budget/budgetmanager.cpp b/src/budget/budgetmanager.cpp index 317b5b8f3ab9..24970d9a3d54 100644 --- a/src/budget/budgetmanager.cpp +++ b/src/budget/budgetmanager.cpp @@ -9,10 +9,10 @@ #include "evo/deterministicmns.h" #include "masternodeman.h" #include "netmessagemaker.h" -#include "tiertwo/tiertwo_sync_state.h" #include "tiertwo/netfulfilledman.h" +#include "tiertwo/tiertwo_sync_state.h" #include "util/validation.h" -#include "validation.h" // GetTransaction, cs_main +#include "validation.h" // GetTransaction, cs_main #ifdef ENABLE_WALLET #include "wallet/wallet.h" // future: use interface instead. @@ -28,6 +28,8 @@ CBudgetManager g_budgetman; // Used to check both proposals and finalized-budgets collateral txes bool CheckCollateral(const uint256& nTxCollateralHash, const uint256& nExpectedHash, std::string& strError, int64_t& nTime, int nCurrentHeight, bool fBudgetFinalization); +void EraseObjectRequest(NodeId nodeId, const CInv& inv); + void CBudgetManager::ReloadMapSeen() { const auto reloadSeenMap = [](auto& mutex1, auto& mutex2, const auto& mapBudgets, auto& mapSeen, auto& mapOrphans) { @@ -1306,7 +1308,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(proposal.GetHash(), MSG_BUDGET_PROPOSAL); + EraseObjectRequest(proposal.GetHash()); } return ProcessProposal(proposal); } @@ -1319,7 +1321,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(vote.GetHash(), MSG_BUDGET_VOTE); + EraseObjectRequest(vote.GetHash()); } CValidationState state; @@ -1342,7 +1344,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(finalbudget.GetHash(), MSG_BUDGET_FINALIZED); + EraseObjectRequest(finalbudget.GetHash()); } return ProcessFinalizedBudget(finalbudget, pfrom); } @@ -1355,7 +1357,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(vote.GetHash(), MSG_BUDGET_FINALIZED_VOTE); + EraseObjectRequest(vote.GetHash()); } CValidationState state; diff --git a/src/llmq/quorums_blockprocessor.cpp b/src/llmq/quorums_blockprocessor.cpp index 000ba8e752e0..915d4f14a44f 100644 --- a/src/llmq/quorums_blockprocessor.cpp +++ b/src/llmq/quorums_blockprocessor.cpp @@ -18,6 +18,8 @@ #include "spork.h" #include "validation.h" +void EraseObjectRequest(NodeId nodeId, const CInv& inv); + namespace llmq { std::unique_ptr quorumBlockProcessor{nullptr}; @@ -52,7 +54,7 @@ void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, CDataStream& vRecv, int uint256 qfc_hash{::SerializeHash(qc)}; { LOCK(cs_main); - g_connman->RemoveAskFor(qfc_hash, MSG_QUORUM_FINAL_COMMITMENT); + EraseObjectRequest(qfc_hash); } if (qc.IsNull()) { diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index 3f627df2a627..a8c50eaf16b2 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -103,7 +103,7 @@ void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLock { { LOCK(cs_main); - g_connman->RemoveAskFor(hash, MSG_CLSIG); + EraseObjectRequest(hash); } { @@ -484,4 +484,5 @@ void CChainLocksHandler::Cleanup() lastCleanupTime = GetTimeMillis(); } -} +} // namespace llmq + diff --git a/src/llmq/quorums_dkgsessionhandler.cpp b/src/llmq/quorums_dkgsessionhandler.cpp index 9efca8a0dfc4..9d78c3a692ef 100644 --- a/src/llmq/quorums_dkgsessionhandler.cpp +++ b/src/llmq/quorums_dkgsessionhandler.cpp @@ -45,7 +45,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, in LOCK2(cs_main, cs); - g_connman->RemoveAskFor(hash, invType); + EraseObjectRequest(hash); if (!seenMessages.emplace(hash).second) { LogPrint(BCLog::NET, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from); diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index ec8ac8e0a26e..09dd27e61b4d 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -12,13 +12,15 @@ #include "activemasternode.h" #include "bls/bls_batchverifier.h" #include "cxxtimer.h" -#include "net_processing.h" #include "validation.h" #include #include #include +void EraseObjectRequest(NodeId nodeId, const CInv& inv); +void Misbehaving(NodeId nodeid, int howmuch, const std::string& message = ""); + namespace llmq { @@ -640,7 +642,7 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re { LOCK(cs_main); - connman.RemoveAskFor(recoveredSig.GetHash(), MSG_QUORUM_RECOVERED_SIG); + EraseObjectRequest(recoveredSig.GetHash()); } if (db.HasRecoveredSigForHash(recoveredSig.GetHash())) { diff --git a/src/masternode-payments.cpp b/src/masternode-payments.cpp index 8d3da0bb7292..75fb800be819 100644 --- a/src/masternode-payments.cpp +++ b/src/masternode-payments.cpp @@ -5,20 +5,21 @@ #include "masternode-payments.h" +#include "budget/budgetmanager.h" #include "chainparams.h" #include "evo/deterministicmns.h" #include "fs.h" -#include "budget/budgetmanager.h" #include "masternodeman.h" #include "netmessagemaker.h" -#include "tiertwo/netfulfilledman.h" #include "spork.h" #include "sync.h" +#include "tiertwo/netfulfilledman.h" #include "tiertwo/tiertwo_sync_state.h" #include "util/system.h" #include "utilmoneystr.h" #include "validation.h" +void EraseObjectRequest(NodeId nodeId, const CInv& inv); /** Object for who's going to get paid on which blocks */ CMasternodePayments masternodePayments; @@ -430,7 +431,7 @@ bool CMasternodePayments::ProcessMessageMasternodePayments(CNode* pfrom, std::st { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(winner.GetHash(), MSG_MASTERNODE_WINNER); + EraseObjectRequest(winner.GetHash()); } ProcessMNWinner(winner, pfrom, state); diff --git a/src/masternodeman.cpp b/src/masternodeman.cpp index e4248af9afbf..c69058c43cd0 100644 --- a/src/masternodeman.cpp +++ b/src/masternodeman.cpp @@ -5,13 +5,11 @@ #include "masternodeman.h" -#include "addrman.h" #include "evo/deterministicmns.h" #include "fs.h" #include "masternode-payments.h" #include "masternode-sync.h" #include "masternode.h" -#include "messagesigner.h" #include "netbase.h" #include "netmessagemaker.h" #include "shutdown.h" @@ -23,6 +21,8 @@ #define MN_WINNER_MINIMUM_AGE 8000 // Age in seconds. This should be > MASTERNODE_REMOVAL_SECONDS to avoid misconfigured new nodes in the list. +void EraseObjectRequest(NodeId nodeId, const CInv& inv); + /** Masternode manager */ CMasternodeMan mnodeman; /** Keep track of the active Masternode */ @@ -968,7 +968,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(mnb.GetHash(), MSG_MASTERNODE_ANNOUNCE); + EraseObjectRequest(mnb.GetHash()); } return ProcessMNBroadcast(pfrom, mnb); @@ -979,7 +979,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(mnb.GetHash(), MSG_MASTERNODE_ANNOUNCE); + EraseObjectRequest(mnb.GetHash()); } // For now, let's not process mnb2 with pre-BIP155 node addr format. @@ -998,7 +998,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - g_connman->RemoveAskFor(mnp.GetHash(), MSG_MASTERNODE_PING); + EraseObjectRequest(mnp.GetHash()); } return ProcessMNPing(pfrom, mnp); diff --git a/src/net.cpp b/src/net.cpp index 62b4fc6bf154..1a720fcb3109 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2569,12 +2569,6 @@ void CConnman::RelayInv(CInv& inv, int minProtoVersion) } } -void CConnman::RemoveAskFor(const uint256& invHash, int invType) -{ - LOCK(cs_vNodes); - // TODO: Remove this -} - void CConnman::UpdateQuorumRelayMemberIfNeeded(CNode* pnode) { if (!pnode->m_masternode_iqr_connection && pnode->m_masternode_connection && diff --git a/src/net.h b/src/net.h index 4557914f45e9..c2703922844f 100644 --- a/src/net.h +++ b/src/net.h @@ -295,9 +295,6 @@ class CConnman std::vector CopyNodeVector(); void ReleaseNodeVector(const std::vector& vecNodes); - // Clears AskFor requests for every known peer - void RemoveAskFor(const uint256& invHash, int invType); - void RelayInv(CInv& inv, int minProtoVersion = ActiveProtocol()); bool IsNodeConnected(const CAddress& addr); // Retrieves a connected peer (if connection success). Used only to check peer address availability for now. diff --git a/test/lint/lint-circular-dependencies.sh b/test/lint/lint-circular-dependencies.sh index 89368704e512..14395d428249 100755 --- a/test/lint/lint-circular-dependencies.sh +++ b/test/lint/lint-circular-dependencies.sh @@ -41,7 +41,6 @@ EXPECTED_CIRCULAR_DEPENDENCIES=( "llmq/quorums -> llmq/quorums_connections -> llmq/quorums" "llmq/quorums_dkgsession -> llmq/quorums_dkgsessionmgr -> llmq/quorums_dkgsessionhandler -> llmq/quorums_dkgsession" "llmq/quorums_dkgsessionhandler -> net_processing -> llmq/quorums_dkgsessionmgr -> llmq/quorums_dkgsessionhandler" - "llmq/quorums_signing -> net_processing -> llmq/quorums_signing" "llmq/quorums_chainlocks -> net_processing -> llmq/quorums_chainlocks" "llmq/quorums_chainlocks -> validation -> llmq/quorums_chainlocks" "chain -> legacy/stakemodifier -> validation -> validationinterface -> chain" From e317219aa83b5a8509cd5b5c43e191198b6d8e68 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Mon, 6 Apr 2020 15:17:22 +0200 Subject: [PATCH 06/10] Don't re-request erased object requests Cherry-pick of ef14b19f052196d77e6e990a9fb30a292eb05548 https://github.com/dashpay/dash/pull/3397/ --- src/net_processing.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index a5976750df67..84bb1cb900ad 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -325,7 +325,8 @@ struct CNodeState { }; // Keeps track of the time (in microseconds) when transactions were requested last time -limitedmap g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); +limitedmap g_already_asked_for(MAX_INV_SZ); +limitedmap g_erased_object_requests(MAX_INV_SZ); /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -541,6 +542,7 @@ void EraseObjectRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); g_already_asked_for.erase(hash); + g_erased_object_requests.insert(std::make_pair(hash, GetTimeMillis())); } int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -2823,6 +2825,11 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // Erase this entry from tx_process_time (it may be added back for // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); + if (g_erased_object_requests.count(inv.hash)) { + state.m_tx_download.m_tx_announced.erase(inv); + state.m_tx_download.m_tx_in_flight.erase(inv); + continue; + } if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, // then request. From 9ad3b6b0f6d227c6b2665e0b2a375d43f8e8636f Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Tue, 4 Feb 2025 11:59:27 +0100 Subject: [PATCH 07/10] Also remove m_tx_announced and m_tx_in_flight entries when EraseObjectRequest is called Cherry-pick of 26fcd3f0bf726b9f50645e258dbc0ac18b086bd7 https://github.com/dashpay/dash/pull/3397/ --- src/budget/budgetmanager.cpp | 10 ++++++---- src/llmq/quorums_blockprocessor.cpp | 2 +- src/llmq/quorums_chainlocks.cpp | 2 +- src/llmq/quorums_dkgsessionhandler.cpp | 2 +- src/llmq/quorums_signing.cpp | 3 ++- src/masternode-payments.cpp | 2 +- src/masternodeman.cpp | 8 +++++--- src/net_processing.cpp | 24 ++++++++++++++++++++---- src/net_processing.h | 2 +- 9 files changed, 38 insertions(+), 17 deletions(-) diff --git a/src/budget/budgetmanager.cpp b/src/budget/budgetmanager.cpp index 24970d9a3d54..8fe2d10983a7 100644 --- a/src/budget/budgetmanager.cpp +++ b/src/budget/budgetmanager.cpp @@ -5,6 +5,8 @@ #include "budget/budgetmanager.h" +#include "addrman.h" +#include "chainparams.h" #include "consensus/validation.h" #include "evo/deterministicmns.h" #include "masternodeman.h" @@ -1308,7 +1310,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - EraseObjectRequest(proposal.GetHash()); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_BUDGET_PROPOSAL, proposal.GetHash())); } return ProcessProposal(proposal); } @@ -1321,7 +1323,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - EraseObjectRequest(vote.GetHash()); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_BUDGET_VOTE, vote.GetHash())); } CValidationState state; @@ -1344,7 +1346,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - EraseObjectRequest(finalbudget.GetHash()); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_BUDGET_FINALIZED, finalbudget.GetHash())); } return ProcessFinalizedBudget(finalbudget, pfrom); } @@ -1357,7 +1359,7 @@ int CBudgetManager::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - EraseObjectRequest(vote.GetHash()); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_BUDGET_FINALIZED_VOTE, vote.GetHash())); } CValidationState state; diff --git a/src/llmq/quorums_blockprocessor.cpp b/src/llmq/quorums_blockprocessor.cpp index 915d4f14a44f..4665df91183f 100644 --- a/src/llmq/quorums_blockprocessor.cpp +++ b/src/llmq/quorums_blockprocessor.cpp @@ -54,7 +54,7 @@ void CQuorumBlockProcessor::ProcessMessage(CNode* pfrom, CDataStream& vRecv, int uint256 qfc_hash{::SerializeHash(qc)}; { LOCK(cs_main); - EraseObjectRequest(qfc_hash); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_QUORUM_FINAL_COMMITMENT, qfc_hash)); } if (qc.IsNull()) { diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index a8c50eaf16b2..50b4cbb6d89e 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -103,7 +103,7 @@ void CChainLocksHandler::ProcessNewChainLock(NodeId from, const llmq::CChainLock { { LOCK(cs_main); - EraseObjectRequest(hash); + EraseObjectRequest(from, CInv(MSG_CLSIG, hash)); } { diff --git a/src/llmq/quorums_dkgsessionhandler.cpp b/src/llmq/quorums_dkgsessionhandler.cpp index 9d78c3a692ef..93c057ceeac3 100644 --- a/src/llmq/quorums_dkgsessionhandler.cpp +++ b/src/llmq/quorums_dkgsessionhandler.cpp @@ -45,7 +45,7 @@ void CDKGPendingMessages::PushPendingMessage(NodeId from, CDataStream& vRecv, in LOCK2(cs_main, cs); - EraseObjectRequest(hash); + EraseObjectRequest(from, CInv(invType, hash)); if (!seenMessages.emplace(hash).second) { LogPrint(BCLog::NET, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from); diff --git a/src/llmq/quorums_signing.cpp b/src/llmq/quorums_signing.cpp index 09dd27e61b4d..55313bd886a0 100644 --- a/src/llmq/quorums_signing.cpp +++ b/src/llmq/quorums_signing.cpp @@ -4,6 +4,7 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "quorums_signing.h" +#include "chainparams.h" #include "clientversion.h" #include "netaddress.h" #include "quorums_signing_shares.h" @@ -642,7 +643,7 @@ void CSigningManager::ProcessRecoveredSig(NodeId nodeId, const CRecoveredSig& re { LOCK(cs_main); - EraseObjectRequest(recoveredSig.GetHash()); + EraseObjectRequest(nodeId, CInv(MSG_QUORUM_RECOVERED_SIG, recoveredSig.GetHash())); } if (db.HasRecoveredSigForHash(recoveredSig.GetHash())) { diff --git a/src/masternode-payments.cpp b/src/masternode-payments.cpp index 75fb800be819..cd0779560435 100644 --- a/src/masternode-payments.cpp +++ b/src/masternode-payments.cpp @@ -431,7 +431,7 @@ bool CMasternodePayments::ProcessMessageMasternodePayments(CNode* pfrom, std::st { // Clear inv request LOCK(cs_main); - EraseObjectRequest(winner.GetHash()); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_MASTERNODE_WINNER, winner.GetHash())); } ProcessMNWinner(winner, pfrom, state); diff --git a/src/masternodeman.cpp b/src/masternodeman.cpp index c69058c43cd0..1eb0f1f69235 100644 --- a/src/masternodeman.cpp +++ b/src/masternodeman.cpp @@ -5,6 +5,8 @@ #include "masternodeman.h" +#include "addrman.h" +#include "chainparams.h" #include "evo/deterministicmns.h" #include "fs.h" #include "masternode-payments.h" @@ -968,7 +970,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - EraseObjectRequest(mnb.GetHash()); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_MASTERNODE_ANNOUNCE, mnb.GetHash())); } return ProcessMNBroadcast(pfrom, mnb); @@ -979,7 +981,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - EraseObjectRequest(mnb.GetHash()); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_MASTERNODE_ANNOUNCE, mnb.GetHash())); } // For now, let's not process mnb2 with pre-BIP155 node addr format. @@ -998,7 +1000,7 @@ int CMasternodeMan::ProcessMessageInner(CNode* pfrom, std::string& strCommand, C { // Clear inv request LOCK(cs_main); - EraseObjectRequest(mnp.GetHash()); + EraseObjectRequest(pfrom->GetId(), CInv(MSG_MASTERNODE_PING, mnp.GetHash())); } return ProcessMNPing(pfrom, mnp); diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 84bb1cb900ad..8b47764325a1 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -538,11 +538,27 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec } } // namespace -void EraseObjectRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) + +void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); - g_already_asked_for.erase(hash); - g_erased_object_requests.insert(std::make_pair(hash, GetTimeMillis())); + g_already_asked_for.erase(inv.hash); + g_erased_object_requests.insert(std::make_pair(inv.hash, GetTimeMillis())); + + if (nodestate) { + nodestate->m_tx_download.m_tx_announced.erase(inv); + nodestate->m_tx_download.m_tx_in_flight.erase(inv); + } +} + +void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return; + } + EraseObjectRequest(state, inv); } int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -1954,7 +1970,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR CNodeState* nodestate = State(pfrom->GetId()); nodestate->m_tx_download.m_tx_announced.erase(inv); nodestate->m_tx_download.m_tx_in_flight.erase(inv); - EraseObjectRequest(inv.hash); + EraseObjectRequest(pfrom->GetId(), inv); if (ptx->ContainsZerocoins()) { // Don't even try to check zerocoins at all. diff --git a/src/net_processing.h b/src/net_processing.h index 3a1fdd7d4019..97ba4ac38b94 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -81,7 +81,7 @@ using SecondsDouble = std::chrono::duration Date: Tue, 7 Apr 2020 13:26:47 +0200 Subject: [PATCH 08/10] More logging for object request handling --- src/net_processing.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 8b47764325a1..140fb635dc7b 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -542,6 +542,7 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); + LogPrint(BCLog::NET, "%s -- inv=(%s)\n", __func__, inv.ToString()); g_already_asked_for.erase(inv.hash); g_erased_object_requests.insert(std::make_pair(inv.hash, GetTimeMillis())); @@ -642,6 +643,8 @@ void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_L int64_t process_time = CalculateObjectGetDataTime(inv, nNow, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, inv); + + LogPrint(BCLog::NET, "%s -- inv=(%s), nNow=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), nNow, process_time, process_time - nNow); } void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) @@ -2842,6 +2845,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); if (g_erased_object_requests.count(inv.hash)) { + LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); state.m_tx_download.m_tx_announced.erase(inv); state.m_tx_download.m_tx_in_flight.erase(inv); continue; @@ -2866,11 +2870,13 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // requests to outbound peers). int64_t next_process_time = CalculateObjectGetDataTime(inv, nNow, !state.fPreferredDownload); tx_process_time.emplace(next_process_time, inv); + LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time, next_process_time - nNow, pto->GetId()); } } else { // We have already seen this transaction, no need to download. state.m_tx_download.m_tx_announced.erase(inv); state.m_tx_download.m_tx_in_flight.erase(inv); + LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId()); } } if (!vGetData.empty()) From 9d2965935abefea52361858cca124f8bba070461 Mon Sep 17 00:00:00 2001 From: Alexander Block Date: Wed, 8 Apr 2020 14:27:07 +0200 Subject: [PATCH 09/10] net: Use mockable time for tx download --- src/net_processing.cpp | 153 ++++++++++++++++++++++++++++------------- src/random.cpp | 6 ++ src/random.h | 5 +- 3 files changed, 114 insertions(+), 50 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 140fb635dc7b..5f9c8116041b 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -41,13 +41,14 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; /** Maximum number of announced transactions from a peer */ static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; /** How many microseconds to delay requesting transactions from inbound peers */ -static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; +static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}}; /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ -static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; +static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}}; +/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ /** How long to wait (expiry * factor microseconds) before expiring an in-flight getdata request to a peer */ static constexpr int64_t TX_EXPIRY_INTERVAL_FACTOR = 10; /** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ -static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; +static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}}; static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, "To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ @@ -298,13 +299,16 @@ struct CNodeState { /* Track when to attempt download of announced transactions (process * time in micros -> txid) */ - std::multimap m_tx_process_time; + std::multimap m_tx_process_time; //! Store all the transactions a peer has recently announced std::set m_tx_announced; - //! Store transactions which were requested by us - std::set m_tx_in_flight; + //! Store transactions which were requested by us, with timestamp + std::map m_tx_in_flight; + + //! Periodically check for stuck getdata requests + std::chrono::microseconds m_check_expiry_timer{0}; }; TxDownloadState m_tx_download; @@ -325,8 +329,8 @@ struct CNodeState { }; // Keeps track of the time (in microseconds) when transactions were requested last time -limitedmap g_already_asked_for(MAX_INV_SZ); -limitedmap g_erased_object_requests(MAX_INV_SZ); +limitedmap g_already_asked_for(MAX_INV_SZ); +limitedmap g_erased_object_requests(MAX_INV_SZ); /** Map maintaining per-node state. Requires cs_main. */ std::map mapNodeState; @@ -544,7 +548,7 @@ void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_ AssertLockHeld(cs_main); LogPrint(BCLog::NET, "%s -- inv=(%s)\n", __func__, inv.ToString()); g_already_asked_for.erase(inv.hash); - g_erased_object_requests.insert(std::make_pair(inv.hash, GetTimeMillis())); + g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime())); if (nodestate) { nodestate->m_tx_download.m_tx_announced.erase(inv); @@ -562,17 +566,17 @@ void EraseObjectRequest(NodeId nodeId, const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED EraseObjectRequest(state, inv); } -int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +std::chrono::microseconds GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); auto it = g_already_asked_for.find(hash); if (it != g_already_asked_for.end()) { return it->second; } - return 0; + return {}; } -void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void UpdateObjectRequestTime(const uint256& hash, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { auto it = g_already_asked_for.find(hash); @@ -583,40 +587,39 @@ void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIV } } - -int64_t GetObjectInterval(int invType) +std::chrono::microseconds GetObjectInterval(int invType) { // some messages need to be re-requested faster when the first announcing peer did not answer to GETDATA switch (invType) { case MSG_QUORUM_RECOVERED_SIG: - return 15 * 1000000; + return std::chrono::seconds(15); case MSG_CLSIG: - return 5 * 1000000; + return std::chrono::seconds(5); default: return GETDATA_TX_INTERVAL; } } -int64_t GetObjectExpiryInterval(int invType) +std::chrono::microseconds GetObjectExpiryInterval(int invType) { return GetObjectInterval(invType) * TX_EXPIRY_INTERVAL_FACTOR; } -int64_t GetObjectRandomDelay(int invType) +std::chrono::microseconds GetObjectRandomDelay(int invType) { if (invType == MSG_TX) { - return GetRand(MAX_GETDATA_RANDOM_DELAY); + return GetRandMicros(MAX_GETDATA_RANDOM_DELAY); } - return 0; + return {}; } -int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chrono::microseconds current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); - int64_t process_time; - int64_t last_request_time = GetObjectRequestTime(inv.hash); + std::chrono::microseconds process_time; + const auto last_request_time = GetObjectRequestTime(inv.hash); // First time requesting this tx - if (last_request_time == 0) { + if (last_request_time.count() == 0) { process_time = current_time; } else { // Randomize the delay to avoid biasing some peers over others (such as due to @@ -629,7 +632,7 @@ int64_t CalculateObjectGetDataTime(const CInv& inv, int64_t current_time, bool u return process_time; } -void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(inv)) { @@ -640,24 +643,48 @@ void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_L peer_download_state.m_tx_announced.insert(inv); // Calculate the time to try requesting this transaction. Use // fPreferredDownload as a proxy for outbound peers. - int64_t process_time = CalculateObjectGetDataTime(inv, nNow, !state->fPreferredDownload); + std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, !state->fPreferredDownload); peer_download_state.m_tx_process_time.emplace(process_time, inv); + if (fForce) { + // make sure this object is actually requested ASAP + g_erased_object_requests.erase(inv.hash); + g_already_asked_for.erase(inv.hash); + } - LogPrint(BCLog::NET, "%s -- inv=(%s), nNow=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), nNow, process_time, process_time - nNow); + LogPrint(BCLog::NET, "%s -- inv=(%s), current_time=%d, process_time=%d, delta=%d\n", __func__, inv.ToString(), current_time.count(), process_time.count(), (process_time - current_time).count()); } -void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds current_time, bool fForce) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); auto* state = State(nodeId); if (!state) { return; } - RequestObject(state, inv, nNow); + + RequestObject(state, inv, current_time, fForce); +} + +size_t GetRequestedObjectCount(NodeId nodeId) +{ + AssertLockHeld(cs_main); + auto* state = State(nodeId); + if (!state) { + return 0; + } + return state->m_tx_download.m_tx_process_time.size(); +} + +// Returns true for outbound peers, excluding manual connections, feelers, and +// one-shots +bool IsOutboundDisconnectionCandidate(const CNode* node) +{ + return !(node->fInbound || node->fFeeler || node->fOneShot); } -void PeerLogicValidation::InitializeNode(CNode *pnode) { +void PeerLogicValidation::InitializeNode(CNode* pnode) +{ CAddress addr = pnode->addr; std::string addrName = pnode->GetAddrName(); NodeId nodeid = pnode->GetId(); @@ -1788,7 +1815,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } LOCK(cs_main); - int64_t nNow = GetTimeMicros(); + const auto current_time = GetTime(); std::vector vToFetch; @@ -1836,8 +1863,8 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR // wait until we are sync if (!fAlreadyHave) { bool allowWhileInIBD = allowWhileInIBDObjs.count(inv.type); - if (allowWhileInIBD || !IsInitialBlockDownload()) { - RequestObject(State(pfrom->GetId()), inv, nNow); + if (allowWhileInIBD || (!fImporting && !fReindex && !IsInitialBlockDownload())) { + RequestObject(State(pfrom->GetId()), inv, current_time); } } } @@ -1869,7 +1896,6 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR else if (strCommand == NetMsgType::GETBLOCKS || strCommand == NetMsgType::GETHEADERS) { - // Don't relay blocks inv to masternode-only connections if (!pfrom->CanRelay()) { LogPrint(BCLog::NET, "getblocks, don't relay blocks inv to masternode connection. peer=%d\n", pfrom->GetId()); @@ -2062,11 +2088,16 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } } if (!fRejectedParents) { - int64_t nNow = GetTimeMicros(); - for (const uint256& parent_txid : unique_parents) { - CInv _inv(MSG_TX, parent_txid); + const auto current_time = GetTime(); + + for (const CTxIn& txin : tx.vin) { + CInv _inv(MSG_TX, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, nNow); + if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, current_time); + // We don't know if the previous tx was a regular or a mixing one, try both + CInv _inv2(MSG_DSTX, txin.prevout.hash); + pfrom->AddInventoryKnown(_inv2); + if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, current_time); } AddOrphanTx(ptx, pfrom->GetId()); @@ -2076,7 +2107,7 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR if (nEvicted > 0) LogPrint(BCLog::MEMPOOL, "mapOrphan overflow, removed %u tx\n", nEvicted); } else { - LogPrint(BCLog::MEMPOOL, "not keeping orphan with rejected parents %s\n",tx.GetHash().ToString()); + LogPrint(BCLog::MEMPOOL, "not keeping orphan with rejected parents %s\n", tx.GetHash().ToString()); } } else { // AcceptToMemoryPool() returned false, possibly because the tx is @@ -2225,7 +2256,6 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR else if (strCommand == NetMsgType::MEMPOOL) { - if (!(pfrom->GetLocalServices() & NODE_BLOOM) && !pfrom->fWhitelisted) { LogPrint(BCLog::NET, "mempool request with bloom filters disabled, disconnect peer=%d\n", pfrom->GetId()); pfrom->fDisconnect = true; @@ -2793,7 +2823,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // Detect whether we're stalling current_time = GetTime(); - nNow = GetTimeMicros(); + // nNow is the current system time (GetTimeMicros is not mockable) and + // should be replaced by the mockable current_time eventually + if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection @@ -2838,9 +2870,31 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM // // Message: getdata (non-blocks) // + + // For robustness, expire old requests after a long timeout, so that + // we can resume downloading transactions from a peer even if they + // were unresponsive in the past. + // Eventually we should consider disconnecting peers, but this is + // conservative. + if (state.m_tx_download.m_check_expiry_timer <= current_time) { + for (auto it = state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { + if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) { + LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); + state.m_tx_download.m_tx_announced.erase(it->first); + state.m_tx_download.m_tx_in_flight.erase(it++); + } else { + ++it; + } + } + // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize + // so that we're not doing this for all peers at the same time. + state.m_tx_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX) / 2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX)); + } + + // DASH this code also handles non-TXs (Dash specific messages) auto& tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const auto& inv = tx_process_time.begin()->second; + while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { + const CInv inv = tx_process_time.begin()->second; // Erase this entry from tx_process_time (it may be added back for // processing at a later time, see below) tx_process_time.erase(tx_process_time.begin()); @@ -2853,24 +2907,25 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic& interruptM if (!AlreadyHave(inv)) { // If this transaction was last requested more than 1 minute ago, // then request. - int64_t last_request_time = GetObjectRequestTime(inv.hash); - if (last_request_time <= nNow - GetObjectExpiryInterval(inv.type)) { + const auto last_request_time = GetObjectRequestTime(inv.hash); + if (last_request_time <= current_time - GetObjectInterval(inv.type)) { LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); vGetData.push_back(inv); if (vGetData.size() >= MAX_GETDATA_SZ) { connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); vGetData.clear(); } - UpdateObjectRequestTime(inv.hash, nNow); - state.m_tx_download.m_tx_in_flight.insert(inv); + + UpdateObjectRequestTime(inv.hash, current_time); + state.m_tx_download.m_tx_in_flight.emplace(inv, current_time); } else { // This transaction is in flight from someone else; queue // up processing to happen after the download times out // (with a slight delay for inbound peers, to prefer // requests to outbound peers). - int64_t next_process_time = CalculateObjectGetDataTime(inv, nNow, !state.fPreferredDownload); + const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, !state.fPreferredDownload); tx_process_time.emplace(next_process_time, inv); - LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time, next_process_time - nNow, pto->GetId()); + LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId()); } } else { // We have already seen this transaction, no need to download. diff --git a/src/random.cpp b/src/random.cpp index 6a44b86f2b94..689462fe59dc 100644 --- a/src/random.cpp +++ b/src/random.cpp @@ -720,3 +720,9 @@ void RandomInit() ReportHardwareRand(); } + + +std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept +{ + return std::chrono::microseconds{GetRand(duration_max.count())}; +} diff --git a/src/random.h b/src/random.h index 407d6c802a23..8e7ee6b8c540 100644 --- a/src/random.h +++ b/src/random.h @@ -11,8 +11,9 @@ #include "crypto/common.h" #include "uint256.h" -#include +#include // For std::chrono::microseconds #include +#include /** * Overall design of the RNG and entropy sources. @@ -251,4 +252,6 @@ bool Random_SanityCheck(); */ void RandomInit(); +std::chrono::microseconds GetRandMicros(std::chrono::microseconds duration_max) noexcept; + #endif // PIVX_RANDOM_H From 2ca6dd2c04ec827996561fd727db0f77c27f6b95 Mon Sep 17 00:00:00 2001 From: Duddino Date: Mon, 17 Feb 2025 14:14:09 +0100 Subject: [PATCH 10/10] Update tests to the new mocktime and RequestObject --- src/net_processing.cpp | 25 ++++++++++++++++--- src/net_processing.h | 4 +-- test/functional/p2p_invalid_messages.py | 20 ++++++++------- test/functional/test_framework/messages.py | 8 +++++- .../test_framework/test_framework.py | 8 +++--- .../tiertwo_governance_invalid_budget.py | 4 +-- test/functional/tiertwo_governance_reorg.py | 4 +-- 7 files changed, 50 insertions(+), 23 deletions(-) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 5f9c8116041b..c889738b7cf9 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -632,7 +632,7 @@ std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chron return process_time; } -void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = true) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(inv)) { @@ -660,7 +660,7 @@ void RequestObject(NodeId nodeId, const CInv& inv, std::chrono::microseconds cur AssertLockHeld(cs_main); auto* state = State(nodeId); if (!state) { - return; + return; } RequestObject(state, inv, current_time, fForce); @@ -2400,8 +2400,25 @@ bool static ProcessMessage(CNode* pfrom, std::string strCommand, CDataStream& vR } else if (strCommand == NetMsgType::NOTFOUND) { - // We do not care about the NOTFOUND message (for now), but logging an Unknown Command - // message is undesirable as we transmit it ourselves. + // Remove the NOTFOUND transactions from the peer + LOCK(cs_main); + CNodeState *state = State(pfrom->GetId()); + std::vector vInv; + vRecv >> vInv; + if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + for (CInv &inv : vInv) { + // If we receive a NOTFOUND message for a txid we requested, erase + // it from our data structures for this peer. + auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv); + if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { + // Skip any further work if this is a spurious NOTFOUND + // message. + continue; + } + state->m_tx_download.m_tx_in_flight.erase(in_flight_it); + state->m_tx_download.m_tx_announced.erase(inv); + } + } return true; } diff --git a/src/net_processing.h b/src/net_processing.h index 97ba4ac38b94..61d82f5e547e 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -15,9 +15,9 @@ extern RecursiveMutex cs_main; // !TODO: change mutex to cs_orphans /** Default for -maxorphantx, maximum number of orphan transactions kept in memory */ static const unsigned int DEFAULT_MAX_ORPHAN_TRANSACTIONS = 25; /** Expiration time for orphan transactions in seconds */ -static const int64_t ORPHAN_TX_EXPIRE_TIME = 20 * 60; +static const int64_t ORPHAN_TX_EXPIRE_TIME = 5 * 60; /** Minimum time between orphan transactions expire time checks in seconds */ -static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60; +static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 1 * 60; /** Default for -blockspamfilter, use header spam filter */ static const bool DEFAULT_BLOCK_SPAM_FILTER = true; /** Default for -blockspamfiltermaxsize, maximum size of the list of indexes in the block spam filter */ diff --git a/test/functional/p2p_invalid_messages.py b/test/functional/p2p_invalid_messages.py index 8c52b10af2a8..8ceffb20115f 100755 --- a/test/functional/p2p_invalid_messages.py +++ b/test/functional/p2p_invalid_messages.py @@ -8,11 +8,11 @@ import time from test_framework import messages +from test_framework.messages import CTxIn, COutPoint, msg_mnping from test_framework.mininode import ( P2PDataStore, P2PInterface, ) -from test_framework.messages import CTxIn, COutPoint, msg_mnping from test_framework.test_framework import PivxTestFramework from test_framework.util import ( assert_equal, @@ -212,24 +212,26 @@ def test_large_inv(self): def test_fill_askfor(self): self.nodes[0].generate(1) # IBD conn = self.nodes[0].add_p2p_connection(InvReceiver()) + self.disable_mocktime() invs = [] blockhash = int(self.nodes[0].getbestblockhash(), 16) - for _ in range(50000): + total_requests = 100 + for _ in range(total_requests): mnp = msg_mnping(CTxIn(COutPoint(getrandbits(256))), blockhash, int(time.time())) - conn.vec_mnp[mnp.get_hash()] = mnp - invs.append(messages.CInv(15, mnp.get_hash())) - assert_equal(len(conn.vec_mnp), 50000) - assert_equal(len(invs), 50000) + hash = mnp.get_hash() + conn.vec_mnp[hash] = mnp + invs.append(messages.CInv(15, hash)) + assert_equal(len(conn.vec_mnp), total_requests) + assert_equal(len(invs), total_requests) msg = messages.msg_inv(invs) conn.send_message(msg) - conn.wait_for_p2p_messages(50000) - + conn.wait_for_p2p_messages(total_requests) # Prior #2611 the node was blocking any follow-up request. mnp = msg_mnping(CTxIn(COutPoint(getrandbits(256))), getrandbits(256), int(time.time())) conn.vec_mnp[mnp.get_hash()] = mnp msg = messages.msg_inv([messages.CInv(15, mnp.get_hash())]) conn.send_and_ping(msg) - conn.wait_for_p2p_messages(50001) + conn.wait_for_p2p_messages(total_requests + 1) self.nodes[0].disconnect_p2ps() def test_resource_exhaustion(self): diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py index 694397c96bd5..1cb62393b515 100755 --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -1246,10 +1246,16 @@ def serialize(self): r += ser_string(self.vch_sig) r += struct.pack("