From 3fba3d5deec6d7bae33823b8da7682f9b03d9deb Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Wed, 24 Jan 2024 10:28:55 +0100 Subject: [PATCH 1/7] [refactor] Make signals optional in mempool and chainman This is done in preparation for the next two commits, where the CMainSignals are de-globalized. This avoids adding new constructor arguments to the ChainstateManager and CTxMemPool classes over the next two commits. This could also allow future tests that are only interested in the internal behaviour of the classes to forgo instantiating the signals. --- .../devtools/test_deterministic_coverage.sh | 26 ++++----- src/bitcoin-chainstate.cpp | 1 + src/init.cpp | 2 + src/kernel/chainstatemanager_opts.h | 2 + src/kernel/mempool_options.h | 4 ++ src/test/util/setup_common.cpp | 1 + src/test/util/txmempool.cpp | 2 + .../validation_chainstatemanager_tests.cpp | 1 + src/txmempool.cpp | 11 ++-- src/txmempool.h | 3 + src/validation.cpp | 58 ++++++++++++------- 11 files changed, 72 insertions(+), 39 deletions(-) diff --git a/contrib/devtools/test_deterministic_coverage.sh b/contrib/devtools/test_deterministic_coverage.sh index 8501c72f04..23c260b529 100755 --- a/contrib/devtools/test_deterministic_coverage.sh +++ b/contrib/devtools/test_deterministic_coverage.sh @@ -17,21 +17,21 @@ NON_DETERMINISTIC_TESTS=( "blockfilter_index_tests/blockfilter_index_initial_sync" # src/checkqueue.h: In CCheckQueue::Loop(): while (queue.empty()) { ... } "coinselector_tests/knapsack_solver_test" # coinselector_tests.cpp: if (equal_sets(setCoinsRet, setCoinsRet2)) "fs_tests/fsbridge_fstream" # deterministic test failure? - "miner_tests/CreateNewBlock_validity" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) + "miner_tests/CreateNewBlock_validity" # validation.cpp: if (signals.CallbacksPending() > 10) "scheduler_tests/manythreads" # scheduler.cpp: CScheduler::serviceQueue() "scheduler_tests/singlethreadedscheduler_ordered" # scheduler.cpp: CScheduler::serviceQueue() - "txvalidationcache_tests/checkinputs_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "txindex_tests/txindex_initial_sync" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/dummy_input_size_test" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/importmulti_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/importwallet_rescan" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/ListCoins" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) - "wallet_tests/wallet_disableprivkeys" # validation.cpp: if (GetMainSignals().CallbacksPending() > 10) + "txvalidationcache_tests/checkinputs_test" # validation.cpp: if (signals.CallbacksPending() > 10) + "txvalidationcache_tests/tx_mempool_block_doublespend" # validation.cpp: if (signals.CallbacksPending() > 10) + "txindex_tests/txindex_initial_sync" # validation.cpp: if (signals.CallbacksPending() > 10) + "txvalidation_tests/tx_mempool_reject_coinbase" # validation.cpp: if (signals.CallbacksPending() > 10) + "validation_block_tests/processnewblock_signals_ordering" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/coin_mark_dirty_immature_credit" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/dummy_input_size_test" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/importmulti_rescan" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/importwallet_rescan" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/ListCoins" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/scan_for_wallet_transactions" # validation.cpp: if (signals.CallbacksPending() > 10) + "wallet_tests/wallet_disableprivkeys" # validation.cpp: if (signals.CallbacksPending() > 10) ) TEST_BITCOIN_BINARY="src/test/test_bitcoin" diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp index c1a71ed749..577f398c1a 100644 --- a/src/bitcoin-chainstate.cpp +++ b/src/bitcoin-chainstate.cpp @@ -118,6 +118,7 @@ int main(int argc, char* argv[]) .chainparams = *chainparams, .datadir = abs_datadir, .notifications = *notifications, + .signals = &GetMainSignals(), }; const node::BlockManager::Options blockman_opts{ .chainparams = chainman_opts.chainparams, diff --git a/src/init.cpp b/src/init.cpp index 988daefeec..735177bfc0 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1449,6 +1449,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) .chainparams = chainparams, .datadir = args.GetDataDirNet(), .notifications = *node.notifications, + .signals = &GetMainSignals(), }; Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction @@ -1478,6 +1479,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) CTxMemPool::Options mempool_opts{ .check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0, + .signals = &GetMainSignals(), }; auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)}; if (!result) { diff --git a/src/kernel/chainstatemanager_opts.h b/src/kernel/chainstatemanager_opts.h index 864aac336e..766a812d91 100644 --- a/src/kernel/chainstatemanager_opts.h +++ b/src/kernel/chainstatemanager_opts.h @@ -18,6 +18,7 @@ #include class CChainParams; +class CMainSignals; static constexpr bool DEFAULT_CHECKPOINTS_ENABLED{true}; static constexpr auto DEFAULT_MAX_TIP_AGE{24h}; @@ -44,6 +45,7 @@ struct ChainstateManagerOpts { DBOptions coins_db{}; CoinsViewOptions coins_view{}; Notifications& notifications; + CMainSignals* signals{nullptr}; //! Number of script check worker threads. Zero means no parallel verification. int worker_threads_num{0}; }; diff --git a/src/kernel/mempool_options.h b/src/kernel/mempool_options.h index 753aebd455..1f69df95ff 100644 --- a/src/kernel/mempool_options.h +++ b/src/kernel/mempool_options.h @@ -13,6 +13,8 @@ #include #include +class CMainSignals; + /** Default for -maxmempool, maximum megabytes of mempool memory usage */ static constexpr unsigned int DEFAULT_MAX_MEMPOOL_SIZE_MB{300}; /** Default for -maxmempool when blocksonly is set */ @@ -56,6 +58,8 @@ struct MemPoolOptions { bool full_rbf{DEFAULT_MEMPOOL_FULL_RBF}; bool persist_v1_dat{DEFAULT_PERSIST_V1_DAT}; MemPoolLimits limits{}; + + CMainSignals* signals{nullptr}; }; } // namespace kernel diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 9f587d7ec0..b9c2659221 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -185,6 +185,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto .datadir = m_args.GetDataDirNet(), .check_block_index = true, .notifications = *m_node.notifications, + .signals = &GetMainSignals(), .worker_threads_num = 2, }; const BlockManager::Options blockman_opts{ diff --git a/src/test/util/txmempool.cpp b/src/test/util/txmempool.cpp index 3b4161ddd3..8c568c2eea 100644 --- a/src/test/util/txmempool.cpp +++ b/src/test/util/txmempool.cpp @@ -13,6 +13,7 @@ #include #include #include +#include using node::NodeContext; @@ -22,6 +23,7 @@ CTxMemPool::Options MemPoolOptionsForTest(const NodeContext& node) // Default to always checking mempool regardless of // chainparams.DefaultConsistencyChecks for tests .check_ratio = 1, + .signals = &GetMainSignals(), }; const auto result{ApplyArgsManOptions(*node.args, ::Params(), mempool_opts)}; Assert(result); diff --git a/src/test/validation_chainstatemanager_tests.cpp b/src/test/validation_chainstatemanager_tests.cpp index a33e71d50e..4bc0de6d9b 100644 --- a/src/test/validation_chainstatemanager_tests.cpp +++ b/src/test/validation_chainstatemanager_tests.cpp @@ -383,6 +383,7 @@ struct SnapshotTestSetup : TestChain100Setup { .chainparams = ::Params(), .datadir = chainman.m_options.datadir, .notifications = *m_node.notifications, + .signals = &GetMainSignals(), }; const BlockManager::Options blockman_opts{ .chainparams = chainman_opts.chainparams, diff --git a/src/txmempool.cpp b/src/txmempool.cpp index de340f6b6d..0bee27c2b2 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -406,7 +406,8 @@ CTxMemPool::CTxMemPool(const Options& opts) m_require_standard{opts.require_standard}, m_full_rbf{opts.full_rbf}, m_persist_v1_dat{opts.persist_v1_dat}, - m_limits{opts.limits} + m_limits{opts.limits}, + m_signals{opts.signals} { } @@ -487,12 +488,12 @@ void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) // even if not directly reported below. uint64_t mempool_sequence = GetAndIncrementSequence(); - if (reason != MemPoolRemovalReason::BLOCK) { + if (reason != MemPoolRemovalReason::BLOCK && m_signals) { // Notify clients that a transaction has been removed from the mempool // for any reason except being included in a block. Clients interested // in transactions included in blocks can subscribe to the BlockConnected // notification. - GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); + m_signals->TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); } TRACE5(mempool, removed, it->GetTx().GetHash().data(), @@ -643,7 +644,9 @@ void CTxMemPool::removeForBlock(const std::vector& vtx, unsigne removeConflicts(*tx); ClearPrioritisation(tx->GetHash()); } - GetMainSignals().MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); + if (m_signals) { + m_signals->MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); + } lastRollingFeeUpdate = GetTime(); blockSinceLastRollingFeeBump = true; } diff --git a/src/txmempool.h b/src/txmempool.h index b98355c65f..5e0f22229b 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -40,6 +40,7 @@ #include class CChain; +class CMainSignals; /** Fake height value used in Coin to signify they are only in the memory pool (since 0.8) */ static const uint32_t MEMPOOL_HEIGHT = 0x7FFFFFFF; @@ -447,6 +448,8 @@ public: const Limits m_limits; + CMainSignals* const m_signals; + /** Create a new CTxMemPool. * Sanity checks will be off by default for performance, because otherwise * accepting transactions becomes O(N^2) where N is the number of transactions diff --git a/src/validation.cpp b/src/validation.cpp index 0552bde665..b3a53a537d 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -1224,13 +1224,14 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector& results.emplace(ws.m_ptx->GetWitnessHash(), MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize, ws.m_base_fees, effective_feerate, effective_feerate_wtxids)); + if (!m_pool.m_signals) continue; const CTransaction& tx = *ws.m_ptx; const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees, ws.m_vsize, ws.m_entry->GetHeight(), args.m_bypass_limits, args.m_package_submission, IsCurrentForFeeEstimation(m_active_chainstate), m_pool.HasNoInputsOf(tx)); - GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence()); + m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence()); } return all_submitted; } @@ -1238,7 +1239,7 @@ bool MemPoolAccept::SubmitPackage(const ATMPArgs& args, std::vector& MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs& args) { AssertLockHeld(cs_main); - LOCK(m_pool.cs); // mempool "read lock" (held through GetMainSignals().TransactionAddedToMempool()) + LOCK(m_pool.cs); // mempool "read lock" (held through m_pool.m_signals->TransactionAddedToMempool()) Workspace ws(ptx); const std::vector single_wtxid{ws.m_ptx->GetWitnessHash()}; @@ -1273,13 +1274,15 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef return MempoolAcceptResult::FeeFailure(ws.m_state, CFeeRate(ws.m_modified_fees, ws.m_vsize), {ws.m_ptx->GetWitnessHash()}); } - const CTransaction& tx = *ws.m_ptx; - const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees, - ws.m_vsize, ws.m_entry->GetHeight(), - args.m_bypass_limits, args.m_package_submission, - IsCurrentForFeeEstimation(m_active_chainstate), - m_pool.HasNoInputsOf(tx)); - GetMainSignals().TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence()); + if (m_pool.m_signals) { + const CTransaction& tx = *ws.m_ptx; + const auto tx_info = NewMempoolTransactionInfo(ws.m_ptx, ws.m_base_fees, + ws.m_vsize, ws.m_entry->GetHeight(), + args.m_bypass_limits, args.m_package_submission, + IsCurrentForFeeEstimation(m_active_chainstate), + m_pool.HasNoInputsOf(tx)); + m_pool.m_signals->TransactionAddedToMempool(tx_info, m_pool.GetAndIncrementSequence()); + } return MempoolAcceptResult::Success(std::move(ws.m_replaced_transactions), ws.m_vsize, ws.m_base_fees, effective_feerate, single_wtxid); @@ -2691,9 +2694,9 @@ bool Chainstate::FlushStateToDisk( (bool)fFlushForPrune); } } - if (full_flush_completed) { + if (full_flush_completed && m_chainman.m_options.signals) { // Update best block in wallet (so we can detect restored wallets). - GetMainSignals().ChainStateFlushed(this->GetRole(), m_chain.GetLocator()); + m_chainman.m_options.signals->ChainStateFlushed(this->GetRole(), m_chain.GetLocator()); } } catch (const std::runtime_error& e) { return FatalError(m_chainman.GetNotifications(), state, std::string("System error while flushing: ") + e.what()); @@ -2860,7 +2863,9 @@ bool Chainstate::DisconnectTip(BlockValidationState& state, DisconnectedBlockTra UpdateTip(pindexDelete->pprev); // Let wallets know transactions went from 1-confirmed to // 0-confirmed or conflicted: - GetMainSignals().BlockDisconnected(pblock, pindexDelete); + if (m_chainman.m_options.signals) { + m_chainman.m_options.signals->BlockDisconnected(pblock, pindexDelete); + } return true; } @@ -2945,7 +2950,9 @@ bool Chainstate::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew, { CCoinsViewCache view(&CoinsTip()); bool rv = ConnectBlock(blockConnecting, state, pindexNew, view); - GetMainSignals().BlockChecked(blockConnecting, state); + if (m_chainman.m_options.signals) { + m_chainman.m_options.signals->BlockChecked(blockConnecting, state); + } if (!rv) { if (state.IsInvalid()) InvalidBlockFound(pindexNew, state); @@ -3206,10 +3213,10 @@ static bool NotifyHeaderTip(ChainstateManager& chainman) LOCKS_EXCLUDED(cs_main) return fNotify; } -static void LimitValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main) { +static void LimitValidationInterfaceQueue(CMainSignals& signals) LOCKS_EXCLUDED(cs_main) { AssertLockNotHeld(cs_main); - if (GetMainSignals().CallbacksPending() > 10) { + if (signals.CallbacksPending() > 10) { SyncWithValidationInterfaceQueue(); } } @@ -3248,7 +3255,7 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr< // Note that if a validationinterface callback ends up calling // ActivateBestChain this may lead to a deadlock! We should // probably have a DEBUG_LOCKORDER test for this in the future. - LimitValidationInterfaceQueue(); + if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals); { LOCK(cs_main); @@ -3287,7 +3294,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr< for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) { assert(trace.pblock && trace.pindex); - GetMainSignals().BlockConnected(this->GetRole(), trace.pblock, trace.pindex); + if (m_chainman.m_options.signals) { + m_chainman.m_options.signals->BlockConnected(this->GetRole(), trace.pblock, trace.pindex); + } } // This will have been toggled in @@ -3313,7 +3322,9 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr< // Enqueue while holding cs_main to ensure that UpdatedBlockTip is called in the order in which blocks are connected if (this == &m_chainman.ActiveChainstate() && pindexFork != pindexNewTip) { // Notify ValidationInterface subscribers - GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd); + if (m_chainman.m_options.signals) { + m_chainman.m_options.signals->UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd); + } // Always notify the UI if a new block tip was connected if (kernel::IsInterrupted(m_chainman.GetNotifications().blockTip(GetSynchronizationState(still_in_ibd), *pindexNewTip))) { @@ -3447,7 +3458,7 @@ bool Chainstate::InvalidateBlock(BlockValidationState& state, CBlockIndex* pinde if (m_chainman.m_interrupt) break; // Make sure the queue of validation callbacks doesn't grow unboundedly. - LimitValidationInterfaceQueue(); + if (m_chainman.m_options.signals) LimitValidationInterfaceQueue(*m_chainman.m_options.signals); LOCK(cs_main); // Lock for as long as disconnectpool is in scope to make sure MaybeUpdateMempoolForReorg is @@ -4155,8 +4166,9 @@ bool ChainstateManager::AcceptBlock(const std::shared_ptr& pblock, // Header is valid/has work, merkle tree and segwit merkle tree are good...RELAY NOW // (but if it does not build on our best tip, let the SendMessages loop relay it) - if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev) - GetMainSignals().NewPoWValidBlock(pindex, pblock); + if (!IsInitialBlockDownload() && ActiveTip() == pindex->pprev && m_options.signals) { + m_options.signals->NewPoWValidBlock(pindex, pblock); + } // Write block to history file if (fNewBlock) *fNewBlock = true; @@ -4209,7 +4221,9 @@ bool ChainstateManager::ProcessNewBlock(const std::shared_ptr& blo ret = AcceptBlock(block, state, &pindex, force_processing, nullptr, new_block, min_pow_checked); } if (!ret) { - GetMainSignals().BlockChecked(*block, state); + if (m_options.signals) { + m_options.signals->BlockChecked(*block, state); + } return error("%s: AcceptBlock FAILED (%s)", __func__, state.ToString()); } } From 473dd4b97ae40e43e1a1a97fdbeb40be4855e9bc Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Thu, 18 Jan 2024 20:23:26 +0100 Subject: [PATCH 2/7] [refactor] Prepare for g_signals de-globalization To this end move some functions into the CMainSignals class. --- src/validationinterface.cpp | 60 ++++++++++++++++++++++------- src/validationinterface.h | 77 +++++++++++++++++++++---------------- 2 files changed, 91 insertions(+), 46 deletions(-) diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 5e944a7c47..91ab34d365 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -96,6 +96,10 @@ public: static CMainSignals g_signals; +CMainSignals::CMainSignals() {} + +CMainSignals::~CMainSignals() {} + void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) { assert(!m_internals); @@ -125,46 +129,46 @@ CMainSignals& GetMainSignals() return g_signals; } -void RegisterSharedValidationInterface(std::shared_ptr callbacks) +void CMainSignals::RegisterSharedValidationInterface(std::shared_ptr callbacks) { // Each connection captures the shared_ptr to ensure that each callback is // executed before the subscriber is destroyed. For more details see #18338. - g_signals.m_internals->Register(std::move(callbacks)); + m_internals->Register(std::move(callbacks)); } -void RegisterValidationInterface(CValidationInterface* callbacks) +void CMainSignals::RegisterValidationInterface(CValidationInterface* callbacks) { // Create a shared_ptr with a no-op deleter - CValidationInterface lifecycle // is managed by the caller. RegisterSharedValidationInterface({callbacks, [](CValidationInterface*){}}); } -void UnregisterSharedValidationInterface(std::shared_ptr callbacks) +void CMainSignals::UnregisterSharedValidationInterface(std::shared_ptr callbacks) { UnregisterValidationInterface(callbacks.get()); } -void UnregisterValidationInterface(CValidationInterface* callbacks) +void CMainSignals::UnregisterValidationInterface(CValidationInterface* callbacks) { - if (g_signals.m_internals) { - g_signals.m_internals->Unregister(callbacks); + if (m_internals) { + m_internals->Unregister(callbacks); } } -void UnregisterAllValidationInterfaces() +void CMainSignals::UnregisterAllValidationInterfaces() { - if (!g_signals.m_internals) { + if (!m_internals) { return; } - g_signals.m_internals->Clear(); + m_internals->Clear(); } -void CallFunctionInValidationInterfaceQueue(std::function func) +void CMainSignals::CallFunctionInValidationInterfaceQueue(std::function func) { - g_signals.m_internals->m_schedulerClient.AddToProcessQueue(std::move(func)); + m_internals->m_schedulerClient.AddToProcessQueue(std::move(func)); } -void SyncWithValidationInterfaceQueue() +void CMainSignals::SyncWithValidationInterfaceQueue() { AssertLockNotHeld(cs_main); // Block until the validation queue drains @@ -273,3 +277,33 @@ void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared LOG_EVENT("%s: block hash=%s", __func__, block->GetHash().ToString()); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.NewPoWValidBlock(pindex, block); }); } + +// These functions are temporary and will be removed in the following commit +void RegisterValidationInterface(CValidationInterface* callbacks) +{ + GetMainSignals().RegisterValidationInterface(callbacks); +} +void UnregisterValidationInterface(CValidationInterface* callbacks) +{ + GetMainSignals().UnregisterValidationInterface(callbacks); +} +void UnregisterAllValidationInterfaces() +{ + GetMainSignals().UnregisterAllValidationInterfaces(); +} +void RegisterSharedValidationInterface(std::shared_ptr callbacks) +{ + GetMainSignals().RegisterSharedValidationInterface(callbacks); +} +void UnregisterSharedValidationInterface(std::shared_ptr callbacks) +{ + GetMainSignals().UnregisterSharedValidationInterface(callbacks); +} +void CallFunctionInValidationInterfaceQueue(std::function func) +{ + GetMainSignals().CallFunctionInValidationInterfaceQueue(func); +} +void SyncWithValidationInterfaceQueue() +{ + GetMainSignals().SyncWithValidationInterfaceQueue(); +} diff --git a/src/validationinterface.h b/src/validationinterface.h index d9292ae2c9..0cd494233a 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -6,13 +6,16 @@ #ifndef BITCOIN_VALIDATIONINTERFACE_H #define BITCOIN_VALIDATIONINTERFACE_H -#include #include +#include #include // CTransaction(Ref) #include +#include +#include #include #include +#include class BlockValidationState; class CBlock; @@ -24,41 +27,14 @@ enum class MemPoolRemovalReason; struct RemovedMempoolTransactionInfo; struct NewMempoolTransactionInfo; -/** Register subscriber */ void RegisterValidationInterface(CValidationInterface* callbacks); -/** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */ void UnregisterValidationInterface(CValidationInterface* callbacks); -/** Unregister all subscribers */ void UnregisterAllValidationInterfaces(); -// Alternate registration functions that release a shared_ptr after the last -// notification is sent. These are useful for race-free cleanup, since -// unregistration is nonblocking and can return before the last notification is -// processed. -/** Register subscriber */ void RegisterSharedValidationInterface(std::shared_ptr callbacks); -/** Unregister subscriber */ void UnregisterSharedValidationInterface(std::shared_ptr callbacks); -/** - * Pushes a function to callback onto the notification queue, guaranteeing any - * callbacks generated prior to now are finished when the function is called. - * - * Be very careful blocking on func to be called if any locks are held - - * validation interface clients may not be able to make progress as they often - * wait for things like cs_main, so blocking until func is called with cs_main - * will result in a deadlock (that DEBUG_LOCKORDER will miss). - */ void CallFunctionInValidationInterfaceQueue(std::function func); -/** - * This is a synonym for the following, which asserts certain locks are not - * held: - * std::promise promise; - * CallFunctionInValidationInterfaceQueue([&promise] { - * promise.set_value(); - * }); - * promise.get_future().wait(); - */ void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main); /** @@ -194,12 +170,10 @@ class CMainSignals { private: std::unique_ptr m_internals; - friend void ::RegisterSharedValidationInterface(std::shared_ptr); - friend void ::UnregisterValidationInterface(CValidationInterface*); - friend void ::UnregisterAllValidationInterfaces(); - friend void ::CallFunctionInValidationInterfaceQueue(std::function func); - public: + CMainSignals(); + ~CMainSignals(); + /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */ void RegisterBackgroundSignalScheduler(CScheduler& scheduler); /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */ @@ -209,6 +183,43 @@ public: size_t CallbacksPending(); + /** Register subscriber */ + void RegisterValidationInterface(CValidationInterface* callbacks); + /** Unregister subscriber. DEPRECATED. This is not safe to use when the RPC server or main message handler thread is running. */ + void UnregisterValidationInterface(CValidationInterface* callbacks); + /** Unregister all subscribers */ + void UnregisterAllValidationInterfaces(); + + // Alternate registration functions that release a shared_ptr after the last + // notification is sent. These are useful for race-free cleanup, since + // unregistration is nonblocking and can return before the last notification is + // processed. + /** Register subscriber */ + void RegisterSharedValidationInterface(std::shared_ptr callbacks); + /** Unregister subscriber */ + void UnregisterSharedValidationInterface(std::shared_ptr callbacks); + + /** + * Pushes a function to callback onto the notification queue, guaranteeing any + * callbacks generated prior to now are finished when the function is called. + * + * Be very careful blocking on func to be called if any locks are held - + * validation interface clients may not be able to make progress as they often + * wait for things like cs_main, so blocking until func is called with cs_main + * will result in a deadlock (that DEBUG_LOCKORDER will miss). + */ + void CallFunctionInValidationInterfaceQueue(std::function func); + + /** + * This is a synonym for the following, which asserts certain locks are not + * held: + * std::promise promise; + * CallFunctionInValidationInterfaceQueue([&promise] { + * promise.set_value(); + * }); + * promise.get_future().wait(); + */ + void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void TransactionAddedToMempool(const NewMempoolTransactionInfo&, uint64_t mempool_sequence); From 84f5c135b8118cbe15b8bfb4db80d61237987f64 Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Thu, 18 Jan 2024 20:23:48 +0100 Subject: [PATCH 3/7] refactor: De-globalize g_signals --- src/bench/wallet_balance.cpp | 3 +- src/bitcoin-chainstate.cpp | 14 ++++--- src/index/base.cpp | 8 ++-- src/init.cpp | 34 ++++++++++------- src/node/context.h | 4 ++ src/node/interfaces.cpp | 14 ++++--- src/node/transaction.cpp | 4 +- src/rpc/blockchain.cpp | 3 +- src/rpc/fees.cpp | 10 ++--- src/rpc/mining.cpp | 4 +- src/rpc/node.cpp | 2 +- src/test/coinstatsindex_tests.cpp | 2 +- src/test/fuzz/package_eval.cpp | 12 +++--- src/test/fuzz/process_message.cpp | 4 +- src/test/fuzz/process_messages.cpp | 4 +- src/test/fuzz/tx_pool.cpp | 10 ++--- src/test/peerman_tests.cpp | 4 +- src/test/policyestimator_tests.cpp | 20 +++++----- src/test/txindex_tests.cpp | 2 +- src/test/util/mining.cpp | 6 +-- src/test/util/setup_common.cpp | 10 +++-- src/test/util/txmempool.cpp | 3 +- src/test/validation_block_tests.cpp | 8 ++-- .../validation_chainstatemanager_tests.cpp | 6 +-- src/test/validationinterface_tests.cpp | 31 ++++++++++------ src/validation.cpp | 2 +- src/validationinterface.cpp | 37 ------------------- src/validationinterface.h | 13 ------- src/wallet/test/util.cpp | 3 +- src/wallet/test/wallet_tests.cpp | 8 ++-- 30 files changed, 131 insertions(+), 154 deletions(-) diff --git a/src/bench/wallet_balance.cpp b/src/bench/wallet_balance.cpp index bf2195293e..7a10b167a6 100644 --- a/src/bench/wallet_balance.cpp +++ b/src/bench/wallet_balance.cpp @@ -39,7 +39,8 @@ static void WalletBalance(benchmark::Bench& bench, const bool set_dirty, const b generatetoaddress(test_setup->m_node, address_mine.value_or(ADDRESS_WATCHONLY)); generatetoaddress(test_setup->m_node, ADDRESS_WATCHONLY); } - SyncWithValidationInterfaceQueue(); + // Calls SyncWithValidationInterfaceQueue + wallet.chain().waitForNotificationsIfTipChanged(uint256::ZERO); auto bal = GetBalance(wallet); // Cache diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp index 577f398c1a..0514021a8e 100644 --- a/src/bitcoin-chainstate.cpp +++ b/src/bitcoin-chainstate.cpp @@ -74,10 +74,12 @@ int main(int argc, char* argv[]) // Start the lightweight task scheduler thread scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); }); + CMainSignals validation_signals{}; + // Gather some entropy once per minute. scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1}); - GetMainSignals().RegisterBackgroundSignalScheduler(scheduler); + validation_signals.RegisterBackgroundSignalScheduler(scheduler); class KernelNotifications : public kernel::Notifications { @@ -118,7 +120,7 @@ int main(int argc, char* argv[]) .chainparams = *chainparams, .datadir = abs_datadir, .notifications = *notifications, - .signals = &GetMainSignals(), + .signals = &validation_signals, }; const node::BlockManager::Options blockman_opts{ .chainparams = chainman_opts.chainparams, @@ -236,9 +238,9 @@ int main(int argc, char* argv[]) bool new_block; auto sc = std::make_shared(block.GetHash()); - RegisterSharedValidationInterface(sc); + validation_signals.RegisterSharedValidationInterface(sc); bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block); - UnregisterSharedValidationInterface(sc); + validation_signals.UnregisterSharedValidationInterface(sc); if (!new_block && accepted) { std::cerr << "duplicate" << std::endl; break; @@ -291,7 +293,7 @@ epilogue: scheduler.stop(); if (chainman.m_thread_load.joinable()) chainman.m_thread_load.join(); - GetMainSignals().FlushBackgroundCallbacks(); + validation_signals.FlushBackgroundCallbacks(); { LOCK(cs_main); for (Chainstate* chainstate : chainman.GetAll()) { @@ -301,5 +303,5 @@ epilogue: } } } - GetMainSignals().UnregisterBackgroundSignalScheduler(); + validation_signals.UnregisterBackgroundSignalScheduler(); } diff --git a/src/index/base.cpp b/src/index/base.cpp index bcfe7215be..2287437f8f 100644 --- a/src/index/base.cpp +++ b/src/index/base.cpp @@ -89,7 +89,7 @@ bool BaseIndex::Init() return &m_chain->context()->chainman->GetChainstateForIndexing()); // Register to validation interface before setting the 'm_synced' flag, so that // callbacks are not missed once m_synced is true. - RegisterValidationInterface(this); + m_chain->context()->validation_signals->RegisterValidationInterface(this); CBlockLocator locator; if (!GetDB().ReadBestBlock(locator)) { @@ -380,7 +380,7 @@ bool BaseIndex::BlockUntilSyncedToCurrentChain() const } LogPrintf("%s: %s is catching up on block notifications\n", __func__, GetName()); - SyncWithValidationInterfaceQueue(); + m_chain->context()->validation_signals->SyncWithValidationInterfaceQueue(); return true; } @@ -399,7 +399,9 @@ bool BaseIndex::StartBackgroundSync() void BaseIndex::Stop() { - UnregisterValidationInterface(this); + if (m_chain->context()->validation_signals) { + m_chain->context()->validation_signals->UnregisterValidationInterface(this); + } if (m_thread_sync.joinable()) { m_thread_sync.join(); diff --git a/src/init.cpp b/src/init.cpp index 735177bfc0..a5eeef589a 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -291,7 +291,7 @@ void Shutdown(NodeContext& node) // Because these depend on each-other, we make sure that neither can be // using the other before destroying them. - if (node.peerman) UnregisterValidationInterface(node.peerman.get()); + if (node.peerman && node.validation_signals) node.validation_signals->UnregisterValidationInterface(node.peerman.get()); if (node.connman) node.connman->Stop(); StopTorControl(); @@ -317,7 +317,9 @@ void Shutdown(NodeContext& node) // fee estimator from validation interface. if (node.fee_estimator) { node.fee_estimator->Flush(); - UnregisterValidationInterface(node.fee_estimator.get()); + if (node.validation_signals) { + node.validation_signals->UnregisterValidationInterface(node.fee_estimator.get()); + } } // FlushStateToDisk generates a ChainStateFlushed callback, which we should avoid missing @@ -332,7 +334,7 @@ void Shutdown(NodeContext& node) // After there are no more peers/RPC left to give us new data which may generate // CValidationInterface callbacks, flush them... - GetMainSignals().FlushBackgroundCallbacks(); + if (node.validation_signals) node.validation_signals->FlushBackgroundCallbacks(); // Stop and delete all indexes only after flushing background callbacks. if (g_txindex) { @@ -367,17 +369,20 @@ void Shutdown(NodeContext& node) #if ENABLE_ZMQ if (g_zmq_notification_interface) { - UnregisterValidationInterface(g_zmq_notification_interface.get()); + if (node.validation_signals) node.validation_signals->UnregisterValidationInterface(g_zmq_notification_interface.get()); g_zmq_notification_interface.reset(); } #endif node.chain_clients.clear(); - UnregisterAllValidationInterfaces(); - GetMainSignals().UnregisterBackgroundSignalScheduler(); + if (node.validation_signals) { + node.validation_signals->UnregisterAllValidationInterfaces(); + node.validation_signals->UnregisterBackgroundSignalScheduler(); + } node.mempool.reset(); node.fee_estimator.reset(); node.chainman.reset(); + node.validation_signals.reset(); node.scheduler.reset(); node.kernel.reset(); @@ -1158,7 +1163,10 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } }, std::chrono::minutes{5}); - GetMainSignals().RegisterBackgroundSignalScheduler(*node.scheduler); + assert(!node.validation_signals); + node.validation_signals = std::make_unique(); + auto& validation_signals = *node.validation_signals; + validation_signals.RegisterBackgroundSignalScheduler(*node.scheduler); // Create client interfaces for wallets that are supposed to be loaded // according to -wallet and -disablewallet options. This only constructs @@ -1264,7 +1272,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // Flush estimates to disk periodically CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get(); node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL); - RegisterValidationInterface(fee_estimator); + validation_signals.RegisterValidationInterface(fee_estimator); } // Check port numbers @@ -1435,7 +1443,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) }); if (g_zmq_notification_interface) { - RegisterValidationInterface(g_zmq_notification_interface.get()); + validation_signals.RegisterValidationInterface(g_zmq_notification_interface.get()); } #endif @@ -1449,7 +1457,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) .chainparams = chainparams, .datadir = args.GetDataDirNet(), .notifications = *node.notifications, - .signals = &GetMainSignals(), + .signals = &validation_signals, }; Assert(ApplyArgsManOptions(args, chainman_opts)); // no error can happen, already checked in AppInitParameterInteraction @@ -1479,7 +1487,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) CTxMemPool::Options mempool_opts{ .check_ratio = chainparams.DefaultConsistencyChecks() ? 1 : 0, - .signals = &GetMainSignals(), + .signals = &validation_signals, }; auto result{ApplyArgsManOptions(args, chainparams, mempool_opts)}; if (!result) { @@ -1507,7 +1515,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // Drain the validation interface queue to ensure that the old indexes // don't have any pending work. - SyncWithValidationInterfaceQueue(); + Assert(node.validation_signals)->SyncWithValidationInterfaceQueue(); for (auto* index : node.indexes) { index->Interrupt(); @@ -1596,7 +1604,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman = PeerManager::make(*node.connman, *node.addrman, node.banman.get(), chainman, *node.mempool, peerman_opts); - RegisterValidationInterface(node.peerman.get()); + validation_signals.RegisterValidationInterface(node.peerman.get()); // ********************************************************* Step 8: start indexers diff --git a/src/node/context.h b/src/node/context.h index 4f3b640b2d..189e5e6ae4 100644 --- a/src/node/context.h +++ b/src/node/context.h @@ -20,6 +20,7 @@ class BanMan; class BaseIndex; class CBlockPolicyEstimator; class CConnman; +class CMainSignals; class CScheduler; class CTxMemPool; class ChainstateManager; @@ -70,7 +71,10 @@ struct NodeContext { interfaces::WalletLoader* wallet_loader{nullptr}; std::unique_ptr scheduler; std::function rpc_interruption_point = [] {}; + //! Issues blocking calls about sync status, errors and warnings std::unique_ptr notifications; + //! Issues calls about blocks and transactions + std::unique_ptr validation_signals; std::atomic exit_status{EXIT_SUCCESS}; //! Declare default constructor and destructor that are not inline, so code diff --git a/src/node/interfaces.cpp b/src/node/interfaces.cpp index 5a8b7fc105..1814834f35 100644 --- a/src/node/interfaces.cpp +++ b/src/node/interfaces.cpp @@ -460,19 +460,20 @@ public: class NotificationsHandlerImpl : public Handler { public: - explicit NotificationsHandlerImpl(std::shared_ptr notifications) - : m_proxy(std::make_shared(std::move(notifications))) + explicit NotificationsHandlerImpl(CMainSignals& signals, std::shared_ptr notifications) + : m_signals{signals}, m_proxy{std::make_shared(std::move(notifications))} { - RegisterSharedValidationInterface(m_proxy); + m_signals.RegisterSharedValidationInterface(m_proxy); } ~NotificationsHandlerImpl() override { disconnect(); } void disconnect() override { if (m_proxy) { - UnregisterSharedValidationInterface(m_proxy); + m_signals.UnregisterSharedValidationInterface(m_proxy); m_proxy.reset(); } } + CMainSignals& m_signals; std::shared_ptr m_proxy; }; @@ -761,12 +762,12 @@ public: } std::unique_ptr handleNotifications(std::shared_ptr notifications) override { - return std::make_unique(std::move(notifications)); + return std::make_unique(validation_signals(), std::move(notifications)); } void waitForNotificationsIfTipChanged(const uint256& old_tip) override { if (!old_tip.IsNull() && old_tip == WITH_LOCK(::cs_main, return chainman().ActiveChain().Tip()->GetBlockHash())) return; - SyncWithValidationInterfaceQueue(); + validation_signals().SyncWithValidationInterfaceQueue(); } std::unique_ptr handleRpc(const CRPCCommand& command) override { @@ -822,6 +823,7 @@ public: NodeContext* context() override { return &m_node; } ArgsManager& args() { return *Assert(m_node.args); } ChainstateManager& chainman() { return *Assert(m_node.chainman); } + CMainSignals& validation_signals() { return *Assert(m_node.validation_signals); } NodeContext& m_node; }; } // namespace diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index ef9a30a076..b66a4f2f39 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -92,7 +92,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t node.mempool->AddUnbroadcastTx(txid); } - if (wait_callback) { + if (wait_callback && node.validation_signals) { // For transactions broadcast from outside the wallet, make sure // that the wallet has been notified of the transaction before // continuing. @@ -101,7 +101,7 @@ TransactionError BroadcastTransaction(NodeContext& node, const CTransactionRef t // with a transaction to/from their wallet, immediately call some // wallet RPC, and get a stale result because callbacks have not // yet been processed. - CallFunctionInValidationInterfaceQueue([&promise] { + node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] { promise.set_value(); }); callback_set = true; diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index 34d308211b..03ddb5d14b 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -395,7 +395,8 @@ static RPCHelpMan syncwithvalidationinterfacequeue() }, [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue { - SyncWithValidationInterfaceQueue(); + NodeContext& node = EnsureAnyNodeContext(request.context); + CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue(); return UniValue::VNULL; }, }; diff --git a/src/rpc/fees.cpp b/src/rpc/fees.cpp index 57ba486ed9..f3345b4f1c 100644 --- a/src/rpc/fees.cpp +++ b/src/rpc/fees.cpp @@ -4,6 +4,7 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include +#include #include #include #include @@ -21,10 +22,6 @@ #include #include -namespace node { -struct NodeContext; -} - using node::NodeContext; static RPCHelpMan estimatesmartfee() @@ -68,7 +65,7 @@ static RPCHelpMan estimatesmartfee() const NodeContext& node = EnsureAnyNodeContext(request.context); const CTxMemPool& mempool = EnsureMemPool(node); - SyncWithValidationInterfaceQueue(); + CHECK_NONFATAL(mempool.m_signals)->SyncWithValidationInterfaceQueue(); unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE); unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target); bool conservative = true; @@ -156,8 +153,9 @@ static RPCHelpMan estimaterawfee() [&](const RPCHelpMan& self, const JSONRPCRequest& request) -> UniValue { CBlockPolicyEstimator& fee_estimator = EnsureAnyFeeEstimator(request.context); + const NodeContext& node = EnsureAnyNodeContext(request.context); - SyncWithValidationInterfaceQueue(); + CHECK_NONFATAL(node.validation_signals)->SyncWithValidationInterfaceQueue(); unsigned int max_target = fee_estimator.HighestTargetTracked(FeeEstimateHorizon::LONG_HALFLIFE); unsigned int conf_target = ParseConfirmTarget(request.params[0], max_target); double threshold = 0.95; diff --git a/src/rpc/mining.cpp b/src/rpc/mining.cpp index f1abfb6396..084b899ae9 100644 --- a/src/rpc/mining.cpp +++ b/src/rpc/mining.cpp @@ -1038,9 +1038,9 @@ static RPCHelpMan submitblock() bool new_block; auto sc = std::make_shared(block.GetHash()); - RegisterSharedValidationInterface(sc); + CHECK_NONFATAL(chainman.m_options.signals)->RegisterSharedValidationInterface(sc); bool accepted = chainman.ProcessNewBlock(blockptr, /*force_processing=*/true, /*min_pow_checked=*/true, /*new_block=*/&new_block); - UnregisterSharedValidationInterface(sc); + CHECK_NONFATAL(chainman.m_options.signals)->UnregisterSharedValidationInterface(sc); if (!new_block && accepted) { return "duplicate"; } diff --git a/src/rpc/node.cpp b/src/rpc/node.cpp index b085828215..ce9cda6b2c 100644 --- a/src/rpc/node.cpp +++ b/src/rpc/node.cpp @@ -90,7 +90,7 @@ static RPCHelpMan mockscheduler() const NodeContext& node_context{EnsureAnyNodeContext(request.context)}; CHECK_NONFATAL(node_context.scheduler)->MockForward(std::chrono::seconds{delta_seconds}); - SyncWithValidationInterfaceQueue(); + CHECK_NONFATAL(node_context.validation_signals)->SyncWithValidationInterfaceQueue(); for (const auto& chain_client : node_context.chain_clients) { chain_client->schedulerMockForward(std::chrono::seconds(delta_seconds)); } diff --git a/src/test/coinstatsindex_tests.cpp b/src/test/coinstatsindex_tests.cpp index cc1ec49d41..08814c1499 100644 --- a/src/test/coinstatsindex_tests.cpp +++ b/src/test/coinstatsindex_tests.cpp @@ -70,7 +70,7 @@ BOOST_FIXTURE_TEST_CASE(coinstatsindex_initial_sync, TestChain100Setup) // SyncWithValidationInterfaceQueue() call below is also needed to ensure // TSAN always sees the test thread waiting for the notification thread, and // avoid potential false positive reports. - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // Shutdown sequence (c.f. Shutdown() in init.cpp) coin_stats_index.Stop(); diff --git a/src/test/fuzz/package_eval.cpp b/src/test/fuzz/package_eval.cpp index 9e658e0ced..a48ce37bce 100644 --- a/src/test/fuzz/package_eval.cpp +++ b/src/test/fuzz/package_eval.cpp @@ -47,7 +47,7 @@ void initialize_tx_pool() g_outpoints_coinbase_init_mature.push_back(prevout); } } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } struct OutpointsUpdater final : public CValidationInterface { @@ -147,7 +147,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool) } auto outpoints_updater = std::make_shared(mempool_outpoints); - RegisterSharedValidationInterface(outpoints_updater); + node.validation_signals->RegisterSharedValidationInterface(outpoints_updater); CTxMemPool tx_pool_{MakeMempool(fuzzed_data_provider, node)}; MockedTxPool& tx_pool = *static_cast(&tx_pool_); @@ -269,7 +269,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool) // Remember all added transactions std::set added; auto txr = std::make_shared(added); - RegisterSharedValidationInterface(txr); + node.validation_signals->RegisterSharedValidationInterface(txr); // When there are multiple transactions in the package, we call ProcessNewPackage(txs, test_accept=false) // and AcceptToMemoryPool(txs.back(), test_accept=true). When there is only 1 transaction, we might flip it @@ -285,8 +285,8 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool) /*bypass_limits=*/false, /*test_accept=*/!single_submit)); const bool passed = res.m_result_type == MempoolAcceptResult::ResultType::VALID; - SyncWithValidationInterfaceQueue(); - UnregisterSharedValidationInterface(txr); + node.validation_signals->SyncWithValidationInterfaceQueue(); + node.validation_signals->UnregisterSharedValidationInterface(txr); // There is only 1 transaction in the package. We did a test-package-accept and a ATMP if (single_submit) { @@ -310,7 +310,7 @@ FUZZ_TARGET(tx_package_eval, .init = initialize_tx_pool) CheckMempoolV3Invariants(tx_pool); } - UnregisterSharedValidationInterface(outpoints_updater); + node.validation_signals->UnregisterSharedValidationInterface(outpoints_updater); WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1)); } diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp index 56b391ed5c..a467fd5382 100644 --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -47,7 +47,7 @@ void initialize_process_message() for (int i = 0; i < 2 * COINBASE_MATURITY; i++) { MineBlock(g_setup->m_node, CScript() << OP_TRUE); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } FUZZ_TARGET(process_message, .init = initialize_process_message) @@ -89,6 +89,6 @@ FUZZ_TARGET(process_message, .init = initialize_process_message) } g_setup->m_node.peerman->SendMessages(&p2p_node); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); g_setup->m_node.connman->StopNodes(); } diff --git a/src/test/fuzz/process_messages.cpp b/src/test/fuzz/process_messages.cpp index 6b264907b5..38acd432fa 100644 --- a/src/test/fuzz/process_messages.cpp +++ b/src/test/fuzz/process_messages.cpp @@ -37,7 +37,7 @@ void initialize_process_messages() for (int i = 0; i < 2 * COINBASE_MATURITY; i++) { MineBlock(g_setup->m_node, CScript() << OP_TRUE); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } FUZZ_TARGET(process_messages, .init = initialize_process_messages) @@ -89,6 +89,6 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages) g_setup->m_node.peerman->SendMessages(&random_node); } } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); g_setup->m_node.connman->StopNodes(); } diff --git a/src/test/fuzz/tx_pool.cpp b/src/test/fuzz/tx_pool.cpp index b44b528b6f..039665bf9f 100644 --- a/src/test/fuzz/tx_pool.cpp +++ b/src/test/fuzz/tx_pool.cpp @@ -50,7 +50,7 @@ void initialize_tx_pool() g_outpoints_coinbase_init_immature; outpoints.push_back(prevout); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } struct TransactionsDelta final : public CValidationInterface { @@ -105,7 +105,7 @@ void Finish(FuzzedDataProvider& fuzzed_data_provider, MockedTxPool& tx_pool, Cha assert(tx_pool.size() < info_all.size()); WITH_LOCK(::cs_main, tx_pool.check(chainstate.CoinsTip(), chainstate.m_chain.Height() + 1)); } - SyncWithValidationInterfaceQueue(); + g_setup->m_node.validation_signals->SyncWithValidationInterfaceQueue(); } void MockTime(FuzzedDataProvider& fuzzed_data_provider, const Chainstate& chainstate) @@ -285,7 +285,7 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool) std::set removed; std::set added; auto txr = std::make_shared(removed, added); - RegisterSharedValidationInterface(txr); + node.validation_signals->RegisterSharedValidationInterface(txr); const bool bypass_limits = fuzzed_data_provider.ConsumeBool(); // Make sure ProcessNewPackage on one transaction works. @@ -303,8 +303,8 @@ FUZZ_TARGET(tx_pool_standard, .init = initialize_tx_pool) const auto res = WITH_LOCK(::cs_main, return AcceptToMemoryPool(chainstate, tx, GetTime(), bypass_limits, /*test_accept=*/false)); const bool accepted = res.m_result_type == MempoolAcceptResult::ResultType::VALID; - SyncWithValidationInterfaceQueue(); - UnregisterSharedValidationInterface(txr); + node.validation_signals->SyncWithValidationInterfaceQueue(); + node.validation_signals->UnregisterSharedValidationInterface(txr); bool txid_in_mempool = tx_pool.exists(GenTxid::Txid(tx->GetHash())); bool wtxid_in_mempool = tx_pool.exists(GenTxid::Wtxid(tx->GetWitnessHash())); diff --git a/src/test/peerman_tests.cpp b/src/test/peerman_tests.cpp index 2c79329385..28866695bc 100644 --- a/src/test/peerman_tests.cpp +++ b/src/test/peerman_tests.cpp @@ -25,7 +25,7 @@ static void mineBlock(const node::NodeContext& node, std::chrono::seconds block_ block.fChecked = true; // little speedup SetMockTime(curr_time); // process block at current time Assert(node.chainman->ProcessNewBlock(std::make_shared(block), /*force_processing=*/true, /*min_pow_checked=*/true, nullptr)); - SyncWithValidationInterfaceQueue(); // drain events queue + node.validation_signals->SyncWithValidationInterfaceQueue(); // drain events queue } // Verifying when network-limited peer connections are desirable based on the node's proximity to the tip @@ -57,7 +57,7 @@ BOOST_AUTO_TEST_CASE(connections_desirable_service_flags) // By now, we tested that the connections desirable services flags change based on the node's time proximity to the tip. // Now, perform the same tests for when the node receives a block. - RegisterValidationInterface(peerman.get()); + m_node.validation_signals->RegisterValidationInterface(peerman.get()); // First, verify a block in the past doesn't enable limited peers connections // At this point, our time is (NODE_NETWORK_LIMITED_ALLOW_CONN_BLOCKS + 1) * 10 minutes ahead the tip's time. diff --git a/src/test/policyestimator_tests.cpp b/src/test/policyestimator_tests.cpp index ede73c6895..6cadc3290a 100644 --- a/src/test/policyestimator_tests.cpp +++ b/src/test/policyestimator_tests.cpp @@ -20,7 +20,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) { CBlockPolicyEstimator& feeEst = *Assert(m_node.fee_estimator); CTxMemPool& mpool = *Assert(m_node.mempool); - RegisterValidationInterface(&feeEst); + m_node.validation_signals->RegisterValidationInterface(&feeEst); TestMemPoolEntryHelper entry; CAmount basefee(2000); CAmount deltaFee(100); @@ -74,7 +74,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) /*submitted_in_package=*/false, /*chainstate_is_current=*/true, /*has_no_mempool_parents=*/true)}; - GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); + m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); } uint256 hash = tx.GetHash(); txHashes[j].push_back(hash); @@ -102,7 +102,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) // Check after just a few txs that combining buckets works as expected if (blocknum == 3) { // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // At this point we should need to combine 3 buckets to get enough data points // So estimateFee(1) should fail and estimateFee(2) should return somewhere around // 9*baserate. estimateFee(2) %'s are 100,100,90 = average 97% @@ -113,7 +113,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) } // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); std::vector origFeeEst; // Highest feerate is 10*baseRate and gets in all blocks, @@ -146,7 +146,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) } // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0)); for (int i = 2; i < 10;i++) { @@ -175,7 +175,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) /*submitted_in_package=*/false, /*chainstate_is_current=*/true, /*has_no_mempool_parents=*/true)}; - GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); + m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); } uint256 hash = tx.GetHash(); txHashes[j].push_back(hash); @@ -188,7 +188,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) } // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); for (int i = 1; i < 10;i++) { BOOST_CHECK(feeEst.estimateFee(i) == CFeeRate(0) || feeEst.estimateFee(i).GetFeePerK() > origFeeEst[i-1] - deltaFee); @@ -212,7 +212,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) block.clear(); // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0)); for (int i = 2; i < 10;i++) { @@ -239,7 +239,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) /*submitted_in_package=*/false, /*chainstate_is_current=*/true, /*has_no_mempool_parents=*/true)}; - GetMainSignals().TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); + m_node.validation_signals->TransactionAddedToMempool(tx_info, mpool.GetAndIncrementSequence()); } uint256 hash = tx.GetHash(); CTransactionRef ptx = mpool.get(hash); @@ -257,7 +257,7 @@ BOOST_AUTO_TEST_CASE(BlockPolicyEstimates) block.clear(); } // Wait for fee estimator to catch up - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); BOOST_CHECK(feeEst.estimateFee(1) == CFeeRate(0)); for (int i = 2; i < 9; i++) { // At 9, the original estimate was already at the bottom (b/c scale = 2) BOOST_CHECK(feeEst.estimateFee(i).GetFeePerK() < origFeeEst[i-1] - deltaFee); diff --git a/src/test/txindex_tests.cpp b/src/test/txindex_tests.cpp index e2432a4718..5a32b02ad9 100644 --- a/src/test/txindex_tests.cpp +++ b/src/test/txindex_tests.cpp @@ -71,7 +71,7 @@ BOOST_FIXTURE_TEST_CASE(txindex_initial_sync, TestChain100Setup) // SyncWithValidationInterfaceQueue() call below is also needed to ensure // TSAN always sees the test thread waiting for the notification thread, and // avoid potential false positive reports. - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // shutdown sequence (c.f. Shutdown() in init.cpp) txindex.Stop(); diff --git a/src/test/util/mining.cpp b/src/test/util/mining.cpp index 08d1b4c902..ad7a38d3fe 100644 --- a/src/test/util/mining.cpp +++ b/src/test/util/mining.cpp @@ -95,12 +95,12 @@ COutPoint MineBlock(const NodeContext& node, std::shared_ptr& block) const auto old_height = WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight()); bool new_block; BlockValidationStateCatcher bvsc{block->GetHash()}; - RegisterValidationInterface(&bvsc); + node.validation_signals->RegisterValidationInterface(&bvsc); const bool processed{chainman.ProcessNewBlock(block, true, true, &new_block)}; const bool duplicate{!new_block && processed}; assert(!duplicate); - UnregisterValidationInterface(&bvsc); - SyncWithValidationInterfaceQueue(); + node.validation_signals->UnregisterValidationInterface(&bvsc); + node.validation_signals->SyncWithValidationInterfaceQueue(); const bool was_valid{bvsc.m_state && bvsc.m_state->IsValid()}; assert(old_height + was_valid == WITH_LOCK(chainman.GetMutex(), return chainman.ActiveHeight())); diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index b9c2659221..aad26f2013 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -171,7 +171,8 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto // from blocking due to queue overrun. m_node.scheduler = std::make_unique(); m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); }); - GetMainSignals().RegisterBackgroundSignalScheduler(*m_node.scheduler); + m_node.validation_signals = std::make_unique(); + m_node.validation_signals->RegisterBackgroundSignalScheduler(*m_node.scheduler); m_node.fee_estimator = std::make_unique(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES); m_node.mempool = std::make_unique(MemPoolOptionsForTest(m_node)); @@ -185,7 +186,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto .datadir = m_args.GetDataDirNet(), .check_block_index = true, .notifications = *m_node.notifications, - .signals = &GetMainSignals(), + .signals = m_node.validation_signals.get(), .worker_threads_num = 2, }; const BlockManager::Options blockman_opts{ @@ -203,8 +204,8 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto ChainTestingSetup::~ChainTestingSetup() { if (m_node.scheduler) m_node.scheduler->stop(); - GetMainSignals().FlushBackgroundCallbacks(); - GetMainSignals().UnregisterBackgroundSignalScheduler(); + m_node.validation_signals->FlushBackgroundCallbacks(); + m_node.validation_signals->UnregisterBackgroundSignalScheduler(); m_node.connman.reset(); m_node.banman.reset(); m_node.addrman.reset(); @@ -213,6 +214,7 @@ ChainTestingSetup::~ChainTestingSetup() m_node.mempool.reset(); m_node.fee_estimator.reset(); m_node.chainman.reset(); + m_node.validation_signals.reset(); m_node.scheduler.reset(); } diff --git a/src/test/util/txmempool.cpp b/src/test/util/txmempool.cpp index 8c568c2eea..71cf8aca60 100644 --- a/src/test/util/txmempool.cpp +++ b/src/test/util/txmempool.cpp @@ -13,7 +13,6 @@ #include #include #include -#include using node::NodeContext; @@ -23,7 +22,7 @@ CTxMemPool::Options MemPoolOptionsForTest(const NodeContext& node) // Default to always checking mempool regardless of // chainparams.DefaultConsistencyChecks for tests .check_ratio = 1, - .signals = &GetMainSignals(), + .signals = node.validation_signals.get(), }; const auto result{ApplyArgsManOptions(*node.args, ::Params(), mempool_opts)}; Assert(result); diff --git a/src/test/validation_block_tests.cpp b/src/test/validation_block_tests.cpp index 35e5c6a037..316ab86c2b 100644 --- a/src/test/validation_block_tests.cpp +++ b/src/test/validation_block_tests.cpp @@ -158,7 +158,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering) bool ignored; // Connect the genesis block and drain any outstanding events BOOST_CHECK(Assert(m_node.chainman)->ProcessNewBlock(std::make_shared(Params().GenesisBlock()), true, true, &ignored)); - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // subscribe to events (this subscriber will validate event ordering) const CBlockIndex* initial_tip = nullptr; @@ -167,7 +167,7 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering) initial_tip = m_node.chainman->ActiveChain().Tip(); } auto sub = std::make_shared(initial_tip->GetBlockHash()); - RegisterSharedValidationInterface(sub); + m_node.validation_signals->RegisterSharedValidationInterface(sub); // create a bunch of threads that repeatedly process a block generated above at random // this will create parallelism and randomness inside validation - the ValidationInterface @@ -196,9 +196,9 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering) for (auto& t : threads) { t.join(); } - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); - UnregisterSharedValidationInterface(sub); + m_node.validation_signals->UnregisterSharedValidationInterface(sub); LOCK(cs_main); BOOST_CHECK_EQUAL(sub->m_expected_tip, m_node.chainman->ActiveChain().Tip()->GetBlockHash()); diff --git a/src/test/validation_chainstatemanager_tests.cpp b/src/test/validation_chainstatemanager_tests.cpp index 4bc0de6d9b..4bbab1cdcd 100644 --- a/src/test/validation_chainstatemanager_tests.cpp +++ b/src/test/validation_chainstatemanager_tests.cpp @@ -102,7 +102,7 @@ BOOST_FIXTURE_TEST_CASE(chainstatemanager, TestChain100Setup) BOOST_CHECK_EQUAL(active_tip2, c2.m_chain.Tip()); // Let scheduler events finish running to avoid accessing memory that is going to be unloaded - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); } //! Test rebalancing the caches associated with each chainstate. @@ -374,7 +374,7 @@ struct SnapshotTestSetup : TestChain100Setup { cs->ForceFlushStateToDisk(); } // Process all callbacks referring to the old manager before wiping it. - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); LOCK(::cs_main); chainman.ResetChainstates(); BOOST_CHECK_EQUAL(chainman.GetAll().size(), 0); @@ -383,7 +383,7 @@ struct SnapshotTestSetup : TestChain100Setup { .chainparams = ::Params(), .datadir = chainman.m_options.datadir, .notifications = *m_node.notifications, - .signals = &GetMainSignals(), + .signals = m_node.validation_signals.get(), }; const BlockManager::Options blockman_opts{ .chainparams = chainman_opts.chainparams, diff --git a/src/test/validationinterface_tests.cpp b/src/test/validationinterface_tests.cpp index 5979441057..1e883c4f20 100644 --- a/src/test/validationinterface_tests.cpp +++ b/src/test/validationinterface_tests.cpp @@ -28,7 +28,7 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race) const CBlock block_dummy; BlockValidationState state_dummy; while (generate) { - GetMainSignals().BlockChecked(block_dummy, state_dummy); + m_node.validation_signals->BlockChecked(block_dummy, state_dummy); } }}; @@ -37,8 +37,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race) // keep going for about 1 sec, which is 250k iterations for (int i = 0; i < 250000; i++) { auto sub = std::make_shared(); - RegisterSharedValidationInterface(sub); - UnregisterSharedValidationInterface(sub); + m_node.validation_signals->RegisterSharedValidationInterface(sub); + m_node.validation_signals->UnregisterSharedValidationInterface(sub); } // tell the other thread we are done generate = false; @@ -52,8 +52,8 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race) class TestInterface : public CValidationInterface { public: - TestInterface(std::function on_call = nullptr, std::function on_destroy = nullptr) - : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy)) + TestInterface(CMainSignals& signals, std::function on_call = nullptr, std::function on_destroy = nullptr) + : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy)), m_signals{signals} { } virtual ~TestInterface() @@ -64,14 +64,15 @@ public: { if (m_on_call) m_on_call(); } - static void Call() + void Call() { CBlock block; BlockValidationState state; - GetMainSignals().BlockChecked(block, state); + m_signals.BlockChecked(block, state); } std::function m_on_call; std::function m_on_destroy; + CMainSignals& m_signals; }; // Regression test to ensure UnregisterAllValidationInterfaces calls don't @@ -80,17 +81,23 @@ public: BOOST_AUTO_TEST_CASE(unregister_all_during_call) { bool destroyed = false; - RegisterSharedValidationInterface(std::make_shared( + auto shared{std::make_shared( + *m_node.validation_signals, [&] { // First call should decrements reference count 2 -> 1 - UnregisterAllValidationInterfaces(); + m_node.validation_signals->UnregisterAllValidationInterfaces(); BOOST_CHECK(!destroyed); // Second call should not decrement reference count 1 -> 0 - UnregisterAllValidationInterfaces(); + m_node.validation_signals->UnregisterAllValidationInterfaces(); BOOST_CHECK(!destroyed); }, - [&] { destroyed = true; })); - TestInterface::Call(); + [&] { destroyed = true; })}; + m_node.validation_signals->RegisterSharedValidationInterface(shared); + BOOST_CHECK(shared.use_count() == 2); + shared->Call(); + BOOST_CHECK(shared.use_count() == 1); + BOOST_CHECK(!destroyed); + shared.reset(); BOOST_CHECK(destroyed); } diff --git a/src/validation.cpp b/src/validation.cpp index b3a53a537d..7490ad012f 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -3217,7 +3217,7 @@ static void LimitValidationInterfaceQueue(CMainSignals& signals) LOCKS_EXCLUDED( AssertLockNotHeld(cs_main); if (signals.CallbacksPending() > 10) { - SyncWithValidationInterfaceQueue(); + signals.SyncWithValidationInterfaceQueue(); } } diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 91ab34d365..d1d562ff32 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -94,8 +94,6 @@ public: } }; -static CMainSignals g_signals; - CMainSignals::CMainSignals() {} CMainSignals::~CMainSignals() {} @@ -124,11 +122,6 @@ size_t CMainSignals::CallbacksPending() return m_internals->m_schedulerClient.CallbacksPending(); } -CMainSignals& GetMainSignals() -{ - return g_signals; -} - void CMainSignals::RegisterSharedValidationInterface(std::shared_ptr callbacks) { // Each connection captures the shared_ptr to ensure that each callback is @@ -277,33 +270,3 @@ void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared LOG_EVENT("%s: block hash=%s", __func__, block->GetHash().ToString()); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.NewPoWValidBlock(pindex, block); }); } - -// These functions are temporary and will be removed in the following commit -void RegisterValidationInterface(CValidationInterface* callbacks) -{ - GetMainSignals().RegisterValidationInterface(callbacks); -} -void UnregisterValidationInterface(CValidationInterface* callbacks) -{ - GetMainSignals().UnregisterValidationInterface(callbacks); -} -void UnregisterAllValidationInterfaces() -{ - GetMainSignals().UnregisterAllValidationInterfaces(); -} -void RegisterSharedValidationInterface(std::shared_ptr callbacks) -{ - GetMainSignals().RegisterSharedValidationInterface(callbacks); -} -void UnregisterSharedValidationInterface(std::shared_ptr callbacks) -{ - GetMainSignals().UnregisterSharedValidationInterface(callbacks); -} -void CallFunctionInValidationInterfaceQueue(std::function func) -{ - GetMainSignals().CallFunctionInValidationInterfaceQueue(func); -} -void SyncWithValidationInterfaceQueue() -{ - GetMainSignals().SyncWithValidationInterfaceQueue(); -} diff --git a/src/validationinterface.h b/src/validationinterface.h index 0cd494233a..57e2716c11 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -21,22 +21,11 @@ class BlockValidationState; class CBlock; class CBlockIndex; struct CBlockLocator; -class CValidationInterface; class CScheduler; enum class MemPoolRemovalReason; struct RemovedMempoolTransactionInfo; struct NewMempoolTransactionInfo; -void RegisterValidationInterface(CValidationInterface* callbacks); -void UnregisterValidationInterface(CValidationInterface* callbacks); -void UnregisterAllValidationInterfaces(); - -void RegisterSharedValidationInterface(std::shared_ptr callbacks); -void UnregisterSharedValidationInterface(std::shared_ptr callbacks); - -void CallFunctionInValidationInterfaceQueue(std::function func); -void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main); - /** * Implement this to subscribe to events generated in validation and mempool * @@ -232,6 +221,4 @@ public: void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr&); }; -CMainSignals& GetMainSignals(); - #endif // BITCOIN_VALIDATIONINTERFACE_H diff --git a/src/wallet/test/util.cpp b/src/wallet/test/util.cpp index cbf3ccd1ec..49d206f409 100644 --- a/src/wallet/test/util.cpp +++ b/src/wallet/test/util.cpp @@ -71,7 +71,8 @@ std::shared_ptr TestLoadWallet(WalletContext& context) void TestUnloadWallet(std::shared_ptr&& wallet) { - SyncWithValidationInterfaceQueue(); + // Calls SyncWithValidationInterfaceQueue + wallet->chain().waitForNotificationsIfTipChanged({}); wallet->m_chain_notifications_handler.reset(); UnloadWallet(std::move(wallet)); } diff --git a/src/wallet/test/wallet_tests.cpp b/src/wallet/test/wallet_tests.cpp index 65297054df..3f6629d55c 100644 --- a/src/wallet/test/wallet_tests.cpp +++ b/src/wallet/test/wallet_tests.cpp @@ -812,7 +812,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup) // transactionAddedToMempool notifications, and create block and mempool // transactions paying to the wallet std::promise promise; - CallFunctionInValidationInterfaceQueue([&promise] { + m_node.validation_signals->CallFunctionInValidationInterfaceQueue([&promise] { promise.get_future().wait(); }); std::string error; @@ -840,7 +840,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup) // Unblock notification queue and make sure stale blockConnected and // transactionAddedToMempool events are processed promise.set_value(); - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); // AddToWallet events for block_tx and mempool_tx events are counted a // second time as the notification queue is processed BOOST_CHECK_EQUAL(addtx_count, 5); @@ -863,7 +863,7 @@ BOOST_FIXTURE_TEST_CASE(CreateWallet, TestChain100Setup) m_coinbase_txns.push_back(CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())).vtx[0]); mempool_tx = TestSimpleSpend(*m_coinbase_txns[3], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey())); BOOST_CHECK(m_node.chain->broadcastTransaction(MakeTransactionRef(mempool_tx), DEFAULT_TRANSACTION_MAXFEE, false, error)); - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); }); wallet = TestLoadWallet(context); // Since mempool transactions are requested at the end of loading, there will @@ -903,7 +903,7 @@ BOOST_FIXTURE_TEST_CASE(ZapSelectTx, TestChain100Setup) auto block_tx = TestSimpleSpend(*m_coinbase_txns[0], 0, coinbaseKey, GetScriptForRawPubKey(key.GetPubKey())); CreateAndProcessBlock({block_tx}, GetScriptForRawPubKey(coinbaseKey.GetPubKey())); - SyncWithValidationInterfaceQueue(); + m_node.validation_signals->SyncWithValidationInterfaceQueue(); { auto block_hash = block_tx.GetHash(); From 4abde2c4e3fd9b66394b79874583bdc2a9132c36 Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Mon, 27 Nov 2023 11:46:36 +0100 Subject: [PATCH 4/7] [refactor] Make MainSignals RAII styled --- src/bitcoin-chainstate.cpp | 5 +---- src/init.cpp | 21 ++++++++++----------- src/test/util/setup_common.cpp | 4 +--- src/validationinterface.cpp | 26 ++++---------------------- src/validationinterface.h | 7 ++----- 5 files changed, 18 insertions(+), 45 deletions(-) diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp index 0514021a8e..f90ae93eca 100644 --- a/src/bitcoin-chainstate.cpp +++ b/src/bitcoin-chainstate.cpp @@ -74,13 +74,11 @@ int main(int argc, char* argv[]) // Start the lightweight task scheduler thread scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); }); - CMainSignals validation_signals{}; + CMainSignals validation_signals{scheduler}; // Gather some entropy once per minute. scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1}); - validation_signals.RegisterBackgroundSignalScheduler(scheduler); - class KernelNotifications : public kernel::Notifications { public: @@ -303,5 +301,4 @@ epilogue: } } } - validation_signals.UnregisterBackgroundSignalScheduler(); } diff --git a/src/init.cpp b/src/init.cpp index a5eeef589a..a90f868977 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -377,7 +377,6 @@ void Shutdown(NodeContext& node) node.chain_clients.clear(); if (node.validation_signals) { node.validation_signals->UnregisterAllValidationInterfaces(); - node.validation_signals->UnregisterBackgroundSignalScheduler(); } node.mempool.reset(); node.fee_estimator.reset(); @@ -1143,17 +1142,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) assert(!node.scheduler); node.scheduler = std::make_unique(); + auto& scheduler = *node.scheduler; // Start the lightweight task scheduler thread - node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { node.scheduler->serviceQueue(); }); + scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); }); // Gather some entropy once per minute. - node.scheduler->scheduleEvery([]{ + scheduler.scheduleEvery([]{ RandAddPeriodic(); }, std::chrono::minutes{1}); // Check disk space every 5 minutes to avoid db corruption. - node.scheduler->scheduleEvery([&args, &node]{ + scheduler.scheduleEvery([&args, &node]{ constexpr uint64_t min_disk_space = 50 << 20; // 50 MB if (!CheckDiskSpace(args.GetBlocksDirPath(), min_disk_space)) { LogPrintf("Shutting down due to lack of disk space!\n"); @@ -1164,9 +1164,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) }, std::chrono::minutes{5}); assert(!node.validation_signals); - node.validation_signals = std::make_unique(); + node.validation_signals = std::make_unique(scheduler); auto& validation_signals = *node.validation_signals; - validation_signals.RegisterBackgroundSignalScheduler(*node.scheduler); // Create client interfaces for wallets that are supposed to be loaded // according to -wallet and -disablewallet options. This only constructs @@ -1271,7 +1270,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // Flush estimates to disk periodically CBlockPolicyEstimator* fee_estimator = node.fee_estimator.get(); - node.scheduler->scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL); + scheduler.scheduleEvery([fee_estimator] { fee_estimator->FlushFeeEstimates(); }, FEE_FLUSH_INTERVAL); validation_signals.RegisterValidationInterface(fee_estimator); } @@ -1910,7 +1909,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) connOptions.m_i2p_accept_incoming = args.GetBoolArg("-i2pacceptincoming", DEFAULT_I2P_ACCEPT_INCOMING); - if (!node.connman->Start(*node.scheduler, connOptions)) { + if (!node.connman->Start(scheduler, connOptions)) { return false; } @@ -1930,15 +1929,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) uiInterface.InitMessage(_("Done loading").translated); for (const auto& client : node.chain_clients) { - client->start(*node.scheduler); + client->start(scheduler); } BanMan* banman = node.banman.get(); - node.scheduler->scheduleEvery([banman]{ + scheduler.scheduleEvery([banman]{ banman->DumpBanlist(); }, DUMP_BANS_INTERVAL); - if (node.peerman) node.peerman->StartScheduledTasks(*node.scheduler); + if (node.peerman) node.peerman->StartScheduledTasks(scheduler); #if HAVE_SYSTEM StartupNotify(args); diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index aad26f2013..8e9e771b53 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -171,8 +171,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto // from blocking due to queue overrun. m_node.scheduler = std::make_unique(); m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); }); - m_node.validation_signals = std::make_unique(); - m_node.validation_signals->RegisterBackgroundSignalScheduler(*m_node.scheduler); + m_node.validation_signals = std::make_unique(*m_node.scheduler); m_node.fee_estimator = std::make_unique(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES); m_node.mempool = std::make_unique(MemPoolOptionsForTest(m_node)); @@ -205,7 +204,6 @@ ChainTestingSetup::~ChainTestingSetup() { if (m_node.scheduler) m_node.scheduler->stop(); m_node.validation_signals->FlushBackgroundCallbacks(); - m_node.validation_signals->UnregisterBackgroundSignalScheduler(); m_node.connman.reset(); m_node.banman.reset(); m_node.addrman.reset(); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index d1d562ff32..c61473be0a 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -94,31 +94,18 @@ public: } }; -CMainSignals::CMainSignals() {} +CMainSignals::CMainSignals(CScheduler& scheduler) + : m_internals{std::make_unique(scheduler)} {} CMainSignals::~CMainSignals() {} -void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) -{ - assert(!m_internals); - m_internals = std::make_unique(scheduler); -} - -void CMainSignals::UnregisterBackgroundSignalScheduler() -{ - m_internals.reset(nullptr); -} - void CMainSignals::FlushBackgroundCallbacks() { - if (m_internals) { - m_internals->m_schedulerClient.EmptyQueue(); - } + m_internals->m_schedulerClient.EmptyQueue(); } size_t CMainSignals::CallbacksPending() { - if (!m_internals) return 0; return m_internals->m_schedulerClient.CallbacksPending(); } @@ -143,16 +130,11 @@ void CMainSignals::UnregisterSharedValidationInterface(std::shared_ptrUnregister(callbacks); - } + m_internals->Unregister(callbacks); } void CMainSignals::UnregisterAllValidationInterfaces() { - if (!m_internals) { - return; - } m_internals->Clear(); } diff --git a/src/validationinterface.h b/src/validationinterface.h index 57e2716c11..fa63570c1f 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -160,13 +160,10 @@ private: std::unique_ptr m_internals; public: - CMainSignals(); + CMainSignals(CScheduler& scheduler LIFETIMEBOUND); + ~CMainSignals(); - /** Register a CScheduler to give callbacks which should run in the background (may only be called once) */ - void RegisterBackgroundSignalScheduler(CScheduler& scheduler); - /** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */ - void UnregisterBackgroundSignalScheduler(); /** Call any remaining callbacks on the calling thread */ void FlushBackgroundCallbacks(); From 0d6d2b650d1017691f48c9109a6cd020ab46aa73 Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Sat, 20 Jan 2024 09:13:35 +0100 Subject: [PATCH 5/7] scripted-diff: Rename SingleThreadedSchedulerClient to SerialTaskRunner -BEGIN VERIFY SCRIPT- s() { git grep -l "$1" src | (grep -v "$3" || cat;) | xargs sed -i "s/$1/$2/g"; } s 'SingleThreadedSchedulerClient' 'SerialTaskRunner' '' s 'SinglethreadedSchedulerClient' 'SerialTaskRunner' '' s 'm_schedulerClient' 'm_task_runner' '' s 'AddToProcessQueue' 'insert' '' s 'EmptyQueue' 'flush' '' s 'CallbacksPending' 'size' 'validation' sed -i '109s/CallbacksPending/size/' src/validationinterface.cpp -END VERIFY SCRIPT- Co-authored-by: Russell Yanofsky --- src/scheduler.cpp | 14 +++++++------- src/scheduler.h | 10 +++++----- src/test/scheduler_tests.cpp | 10 +++++----- src/validationinterface.cpp | 12 ++++++------ 4 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 6c6f644142..a1158e0c07 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -129,7 +129,7 @@ bool CScheduler::AreThreadsServicingQueue() const } -void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() +void SerialTaskRunner::MaybeScheduleProcessQueue() { { LOCK(m_callbacks_mutex); @@ -142,7 +142,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() m_scheduler.schedule([this] { this->ProcessQueue(); }, std::chrono::steady_clock::now()); } -void SingleThreadedSchedulerClient::ProcessQueue() +void SerialTaskRunner::ProcessQueue() { std::function callback; { @@ -158,8 +158,8 @@ void SingleThreadedSchedulerClient::ProcessQueue() // RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue // to ensure both happen safely even if callback() throws. struct RAIICallbacksRunning { - SingleThreadedSchedulerClient* instance; - explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {} + SerialTaskRunner* instance; + explicit RAIICallbacksRunning(SerialTaskRunner* _instance) : instance(_instance) {} ~RAIICallbacksRunning() { { @@ -173,7 +173,7 @@ void SingleThreadedSchedulerClient::ProcessQueue() callback(); } -void SingleThreadedSchedulerClient::AddToProcessQueue(std::function func) +void SerialTaskRunner::insert(std::function func) { { LOCK(m_callbacks_mutex); @@ -182,7 +182,7 @@ void SingleThreadedSchedulerClient::AddToProcessQueue(std::function func MaybeScheduleProcessQueue(); } -void SingleThreadedSchedulerClient::EmptyQueue() +void SerialTaskRunner::flush() { assert(!m_scheduler.AreThreadsServicingQueue()); bool should_continue = true; @@ -193,7 +193,7 @@ void SingleThreadedSchedulerClient::EmptyQueue() } } -size_t SingleThreadedSchedulerClient::CallbacksPending() +size_t SerialTaskRunner::size() { LOCK(m_callbacks_mutex); return m_callbacks_pending.size(); diff --git a/src/scheduler.h b/src/scheduler.h index 9212582b97..940ce08ecf 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -120,7 +120,7 @@ private: * B() will be able to observe all of the effects of callback A() which executed * before it. */ -class SingleThreadedSchedulerClient +class SerialTaskRunner { private: CScheduler& m_scheduler; @@ -133,7 +133,7 @@ private: void ProcessQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); public: - explicit SingleThreadedSchedulerClient(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} + explicit SerialTaskRunner(CScheduler& scheduler LIFETIMEBOUND) : m_scheduler{scheduler} {} /** * Add a callback to be executed. Callbacks are executed serially @@ -141,15 +141,15 @@ public: * Practically, this means that callbacks can behave as if they are executed * in order by a single thread. */ - void AddToProcessQueue(std::function func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); + void insert(std::function func) EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); /** * Processes all remaining queue members on the calling thread, blocking until queue is empty * Must be called after the CScheduler has no remaining processing threads! */ - void EmptyQueue() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); + void flush() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); - size_t CallbacksPending() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); + size_t size() EXCLUSIVE_LOCKS_REQUIRED(!m_callbacks_mutex); }; #endif // BITCOIN_SCHEDULER_H diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp index 3fb3378598..95e725de46 100644 --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -129,8 +129,8 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) CScheduler scheduler; // each queue should be well ordered with respect to itself but not other queues - SingleThreadedSchedulerClient queue1(scheduler); - SingleThreadedSchedulerClient queue2(scheduler); + SerialTaskRunner queue1(scheduler); + SerialTaskRunner queue2(scheduler); // create more threads than queues // if the queues only permit execution of one task at once then @@ -142,7 +142,7 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) threads.emplace_back([&] { scheduler.serviceQueue(); }); } - // these are not atomic, if SinglethreadedSchedulerClient prevents + // these are not atomic, if SerialTaskRunner prevents // parallel execution at the queue level no synchronization should be required here int counter1 = 0; int counter2 = 0; @@ -150,12 +150,12 @@ BOOST_AUTO_TEST_CASE(singlethreadedscheduler_ordered) // just simply count up on each queue - if execution is properly ordered then // the callbacks should run in exactly the order in which they were enqueued for (int i = 0; i < 100; ++i) { - queue1.AddToProcessQueue([i, &counter1]() { + queue1.insert([i, &counter1]() { bool expectation = i == counter1++; assert(expectation); }); - queue2.AddToProcessQueue([i, &counter2]() { + queue2.insert([i, &counter2]() { bool expectation = i == counter2++; assert(expectation); }); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index c61473be0a..4ab66336da 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -45,9 +45,9 @@ public: // We are not allowed to assume the scheduler only runs in one thread, // but must ensure all callbacks happen in-order, so we end up creating // our own queue here :( - SingleThreadedSchedulerClient m_schedulerClient; + SerialTaskRunner m_task_runner; - explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_schedulerClient(scheduler) {} + explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_task_runner(scheduler) {} void Register(std::shared_ptr callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { @@ -101,12 +101,12 @@ CMainSignals::~CMainSignals() {} void CMainSignals::FlushBackgroundCallbacks() { - m_internals->m_schedulerClient.EmptyQueue(); + m_internals->m_task_runner.flush(); } size_t CMainSignals::CallbacksPending() { - return m_internals->m_schedulerClient.CallbacksPending(); + return m_internals->m_task_runner.size(); } void CMainSignals::RegisterSharedValidationInterface(std::shared_ptr callbacks) @@ -140,7 +140,7 @@ void CMainSignals::UnregisterAllValidationInterfaces() void CMainSignals::CallFunctionInValidationInterfaceQueue(std::function func) { - m_internals->m_schedulerClient.AddToProcessQueue(std::move(func)); + m_internals->m_task_runner.insert(std::move(func)); } void CMainSignals::SyncWithValidationInterfaceQueue() @@ -162,7 +162,7 @@ void CMainSignals::SyncWithValidationInterfaceQueue() do { \ auto local_name = (name); \ LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__); \ - m_internals->m_schedulerClient.AddToProcessQueue([=] { \ + m_internals->m_task_runner.insert([=] { \ LOG_EVENT(fmt, local_name, __VA_ARGS__); \ event(); \ }); \ From 06069b3913dda048f5d640a662b0852f86346ace Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Wed, 24 Jan 2024 14:12:33 +0100 Subject: [PATCH 6/7] scripted-diff: Rename MainSignals to ValidationSignals -BEGIN VERIFY SCRIPT- s() { git grep -l "$1" src | xargs sed -i "s/$1/$2/g"; } s 'CMainSignals' 'ValidationSignals' s 'MainSignalsImpl' 'ValidationSignalsImpl' -END VERIFY SCRIPT- --- src/bitcoin-chainstate.cpp | 2 +- src/init.cpp | 2 +- src/kernel/chainstatemanager_opts.h | 4 +-- src/kernel/mempool_options.h | 4 +-- src/node/context.h | 4 +-- src/node/interfaces.cpp | 6 ++-- src/test/util/setup_common.cpp | 2 +- src/test/validationinterface_tests.cpp | 4 +-- src/txmempool.h | 4 +-- src/validation.cpp | 2 +- src/validationinterface.cpp | 48 +++++++++++++------------- src/validationinterface.h | 12 +++---- 12 files changed, 47 insertions(+), 47 deletions(-) diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp index f90ae93eca..c4ff4f8ff7 100644 --- a/src/bitcoin-chainstate.cpp +++ b/src/bitcoin-chainstate.cpp @@ -74,7 +74,7 @@ int main(int argc, char* argv[]) // Start the lightweight task scheduler thread scheduler.m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { scheduler.serviceQueue(); }); - CMainSignals validation_signals{scheduler}; + ValidationSignals validation_signals{scheduler}; // Gather some entropy once per minute. scheduler.scheduleEvery(RandAddPeriodic, std::chrono::minutes{1}); diff --git a/src/init.cpp b/src/init.cpp index a90f868977..3db22e71f9 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1164,7 +1164,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) }, std::chrono::minutes{5}); assert(!node.validation_signals); - node.validation_signals = std::make_unique(scheduler); + node.validation_signals = std::make_unique(scheduler); auto& validation_signals = *node.validation_signals; // Create client interfaces for wallets that are supposed to be loaded diff --git a/src/kernel/chainstatemanager_opts.h b/src/kernel/chainstatemanager_opts.h index 766a812d91..de5f78494a 100644 --- a/src/kernel/chainstatemanager_opts.h +++ b/src/kernel/chainstatemanager_opts.h @@ -18,7 +18,7 @@ #include class CChainParams; -class CMainSignals; +class ValidationSignals; static constexpr bool DEFAULT_CHECKPOINTS_ENABLED{true}; static constexpr auto DEFAULT_MAX_TIP_AGE{24h}; @@ -45,7 +45,7 @@ struct ChainstateManagerOpts { DBOptions coins_db{}; CoinsViewOptions coins_view{}; Notifications& notifications; - CMainSignals* signals{nullptr}; + ValidationSignals* signals{nullptr}; //! Number of script check worker threads. Zero means no parallel verification. int worker_threads_num{0}; }; diff --git a/src/kernel/mempool_options.h b/src/kernel/mempool_options.h index 1f69df95ff..0850b2e60e 100644 --- a/src/kernel/mempool_options.h +++ b/src/kernel/mempool_options.h @@ -13,7 +13,7 @@ #include #include -class CMainSignals; +class ValidationSignals; /** Default for -maxmempool, maximum megabytes of mempool memory usage */ static constexpr unsigned int DEFAULT_MAX_MEMPOOL_SIZE_MB{300}; @@ -59,7 +59,7 @@ struct MemPoolOptions { bool persist_v1_dat{DEFAULT_PERSIST_V1_DAT}; MemPoolLimits limits{}; - CMainSignals* signals{nullptr}; + ValidationSignals* signals{nullptr}; }; } // namespace kernel diff --git a/src/node/context.h b/src/node/context.h index 189e5e6ae4..245f230aec 100644 --- a/src/node/context.h +++ b/src/node/context.h @@ -20,7 +20,7 @@ class BanMan; class BaseIndex; class CBlockPolicyEstimator; class CConnman; -class CMainSignals; +class ValidationSignals; class CScheduler; class CTxMemPool; class ChainstateManager; @@ -74,7 +74,7 @@ struct NodeContext { //! Issues blocking calls about sync status, errors and warnings std::unique_ptr notifications; //! Issues calls about blocks and transactions - std::unique_ptr validation_signals; + std::unique_ptr validation_signals; std::atomic exit_status{EXIT_SUCCESS}; //! Declare default constructor and destructor that are not inline, so code diff --git a/src/node/interfaces.cpp b/src/node/interfaces.cpp index 1814834f35..f9a372e3de 100644 --- a/src/node/interfaces.cpp +++ b/src/node/interfaces.cpp @@ -460,7 +460,7 @@ public: class NotificationsHandlerImpl : public Handler { public: - explicit NotificationsHandlerImpl(CMainSignals& signals, std::shared_ptr notifications) + explicit NotificationsHandlerImpl(ValidationSignals& signals, std::shared_ptr notifications) : m_signals{signals}, m_proxy{std::make_shared(std::move(notifications))} { m_signals.RegisterSharedValidationInterface(m_proxy); @@ -473,7 +473,7 @@ public: m_proxy.reset(); } } - CMainSignals& m_signals; + ValidationSignals& m_signals; std::shared_ptr m_proxy; }; @@ -823,7 +823,7 @@ public: NodeContext* context() override { return &m_node; } ArgsManager& args() { return *Assert(m_node.args); } ChainstateManager& chainman() { return *Assert(m_node.chainman); } - CMainSignals& validation_signals() { return *Assert(m_node.validation_signals); } + ValidationSignals& validation_signals() { return *Assert(m_node.validation_signals); } NodeContext& m_node; }; } // namespace diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 8e9e771b53..d93edbc332 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -171,7 +171,7 @@ ChainTestingSetup::ChainTestingSetup(const ChainType chainType, const std::vecto // from blocking due to queue overrun. m_node.scheduler = std::make_unique(); m_node.scheduler->m_service_thread = std::thread(util::TraceThread, "scheduler", [&] { m_node.scheduler->serviceQueue(); }); - m_node.validation_signals = std::make_unique(*m_node.scheduler); + m_node.validation_signals = std::make_unique(*m_node.scheduler); m_node.fee_estimator = std::make_unique(FeeestPath(*m_node.args), DEFAULT_ACCEPT_STALE_FEE_ESTIMATES); m_node.mempool = std::make_unique(MemPoolOptionsForTest(m_node)); diff --git a/src/test/validationinterface_tests.cpp b/src/test/validationinterface_tests.cpp index 1e883c4f20..a46cfc3029 100644 --- a/src/test/validationinterface_tests.cpp +++ b/src/test/validationinterface_tests.cpp @@ -52,7 +52,7 @@ BOOST_AUTO_TEST_CASE(unregister_validation_interface_race) class TestInterface : public CValidationInterface { public: - TestInterface(CMainSignals& signals, std::function on_call = nullptr, std::function on_destroy = nullptr) + TestInterface(ValidationSignals& signals, std::function on_call = nullptr, std::function on_destroy = nullptr) : m_on_call(std::move(on_call)), m_on_destroy(std::move(on_destroy)), m_signals{signals} { } @@ -72,7 +72,7 @@ public: } std::function m_on_call; std::function m_on_destroy; - CMainSignals& m_signals; + ValidationSignals& m_signals; }; // Regression test to ensure UnregisterAllValidationInterfaces calls don't diff --git a/src/txmempool.h b/src/txmempool.h index 5e0f22229b..32f2c024f7 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -40,7 +40,7 @@ #include class CChain; -class CMainSignals; +class ValidationSignals; /** Fake height value used in Coin to signify they are only in the memory pool (since 0.8) */ static const uint32_t MEMPOOL_HEIGHT = 0x7FFFFFFF; @@ -448,7 +448,7 @@ public: const Limits m_limits; - CMainSignals* const m_signals; + ValidationSignals* const m_signals; /** Create a new CTxMemPool. * Sanity checks will be off by default for performance, because otherwise diff --git a/src/validation.cpp b/src/validation.cpp index 7490ad012f..9def734fcc 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -3213,7 +3213,7 @@ static bool NotifyHeaderTip(ChainstateManager& chainman) LOCKS_EXCLUDED(cs_main) return fNotify; } -static void LimitValidationInterfaceQueue(CMainSignals& signals) LOCKS_EXCLUDED(cs_main) { +static void LimitValidationInterfaceQueue(ValidationSignals& signals) LOCKS_EXCLUDED(cs_main) { AssertLockNotHeld(cs_main); if (signals.CallbacksPending() > 10) { diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 4ab66336da..25b1e561dc 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -22,14 +22,14 @@ std::string RemovalReasonToString(const MemPoolRemovalReason& r) noexcept; /** - * MainSignalsImpl manages a list of shared_ptr callbacks. + * ValidationSignalsImpl manages a list of shared_ptr callbacks. * * A std::unordered_map is used to track what callbacks are currently * registered, and a std::list is used to store the callbacks that are * currently registered as well as any callbacks that are just unregistered * and about to be deleted when they are done executing. */ -class MainSignalsImpl +class ValidationSignalsImpl { private: Mutex m_mutex; @@ -47,7 +47,7 @@ public: // our own queue here :( SerialTaskRunner m_task_runner; - explicit MainSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_task_runner(scheduler) {} + explicit ValidationSignalsImpl(CScheduler& scheduler LIFETIMEBOUND) : m_task_runner(scheduler) {} void Register(std::shared_ptr callbacks) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) { @@ -94,56 +94,56 @@ public: } }; -CMainSignals::CMainSignals(CScheduler& scheduler) - : m_internals{std::make_unique(scheduler)} {} +ValidationSignals::ValidationSignals(CScheduler& scheduler) + : m_internals{std::make_unique(scheduler)} {} -CMainSignals::~CMainSignals() {} +ValidationSignals::~ValidationSignals() {} -void CMainSignals::FlushBackgroundCallbacks() +void ValidationSignals::FlushBackgroundCallbacks() { m_internals->m_task_runner.flush(); } -size_t CMainSignals::CallbacksPending() +size_t ValidationSignals::CallbacksPending() { return m_internals->m_task_runner.size(); } -void CMainSignals::RegisterSharedValidationInterface(std::shared_ptr callbacks) +void ValidationSignals::RegisterSharedValidationInterface(std::shared_ptr callbacks) { // Each connection captures the shared_ptr to ensure that each callback is // executed before the subscriber is destroyed. For more details see #18338. m_internals->Register(std::move(callbacks)); } -void CMainSignals::RegisterValidationInterface(CValidationInterface* callbacks) +void ValidationSignals::RegisterValidationInterface(CValidationInterface* callbacks) { // Create a shared_ptr with a no-op deleter - CValidationInterface lifecycle // is managed by the caller. RegisterSharedValidationInterface({callbacks, [](CValidationInterface*){}}); } -void CMainSignals::UnregisterSharedValidationInterface(std::shared_ptr callbacks) +void ValidationSignals::UnregisterSharedValidationInterface(std::shared_ptr callbacks) { UnregisterValidationInterface(callbacks.get()); } -void CMainSignals::UnregisterValidationInterface(CValidationInterface* callbacks) +void ValidationSignals::UnregisterValidationInterface(CValidationInterface* callbacks) { m_internals->Unregister(callbacks); } -void CMainSignals::UnregisterAllValidationInterfaces() +void ValidationSignals::UnregisterAllValidationInterfaces() { m_internals->Clear(); } -void CMainSignals::CallFunctionInValidationInterfaceQueue(std::function func) +void ValidationSignals::CallFunctionInValidationInterfaceQueue(std::function func) { m_internals->m_task_runner.insert(std::move(func)); } -void CMainSignals::SyncWithValidationInterfaceQueue() +void ValidationSignals::SyncWithValidationInterfaceQueue() { AssertLockNotHeld(cs_main); // Block until the validation queue drains @@ -171,7 +171,7 @@ void CMainSignals::SyncWithValidationInterfaceQueue() #define LOG_EVENT(fmt, ...) \ LogPrint(BCLog::VALIDATION, fmt "\n", __VA_ARGS__) -void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { +void ValidationSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { // Dependencies exist that require UpdatedBlockTip events to be delivered in the order in which // the chain actually updates. One way to ensure this is for the caller to invoke this signal // in the same critical section where the chain is updated @@ -185,7 +185,7 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd fInitialDownload); } -void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) +void ValidationSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) { auto event = [tx, mempool_sequence, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); }); @@ -195,7 +195,7 @@ void CMainSignals::TransactionAddedToMempool(const NewMempoolTransactionInfo& tx tx.info.m_tx->GetWitnessHash().ToString()); } -void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { +void ValidationSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { auto event = [tx, reason, mempool_sequence, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); }); }; @@ -205,7 +205,7 @@ void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemP RemovalReasonToString(reason)); } -void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptr &pblock, const CBlockIndex *pindex) { +void ValidationSignals::BlockConnected(ChainstateRole role, const std::shared_ptr &pblock, const CBlockIndex *pindex) { auto event = [role, pblock, pindex, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockConnected(role, pblock, pindex); }); }; @@ -214,7 +214,7 @@ void CMainSignals::BlockConnected(ChainstateRole role, const std::shared_ptrnHeight); } -void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector& txs_removed_for_block, unsigned int nBlockHeight) +void ValidationSignals::MempoolTransactionsRemovedForBlock(const std::vector& txs_removed_for_block, unsigned int nBlockHeight) { auto event = [txs_removed_for_block, nBlockHeight, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.MempoolTransactionsRemovedForBlock(txs_removed_for_block, nBlockHeight); }); @@ -224,7 +224,7 @@ void CMainSignals::MempoolTransactionsRemovedForBlock(const std::vector& pblock, const CBlockIndex* pindex) +void ValidationSignals::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) { auto event = [pblock, pindex, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockDisconnected(pblock, pindex); }); @@ -234,7 +234,7 @@ void CMainSignals::BlockDisconnected(const std::shared_ptr& pblock pindex->nHeight); } -void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) { +void ValidationSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &locator) { auto event = [role, locator, this] { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.ChainStateFlushed(role, locator); }); }; @@ -242,13 +242,13 @@ void CMainSignals::ChainStateFlushed(ChainstateRole role, const CBlockLocator &l locator.IsNull() ? "null" : locator.vHave.front().ToString()); } -void CMainSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) { +void ValidationSignals::BlockChecked(const CBlock& block, const BlockValidationState& state) { LOG_EVENT("%s: block hash=%s state=%s", __func__, block.GetHash().ToString(), state.ToString()); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.BlockChecked(block, state); }); } -void CMainSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr &block) { +void ValidationSignals::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr &block) { LOG_EVENT("%s: block hash=%s", __func__, block->GetHash().ToString()); m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.NewPoWValidBlock(pindex, block); }); } diff --git a/src/validationinterface.h b/src/validationinterface.h index fa63570c1f..4bd46da2df 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -150,19 +150,19 @@ protected: * has been received and connected to the headers tree, though not validated yet. */ virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr& block) {}; - friend class CMainSignals; + friend class ValidationSignals; friend class ValidationInterfaceTest; }; -class MainSignalsImpl; -class CMainSignals { +class ValidationSignalsImpl; +class ValidationSignals { private: - std::unique_ptr m_internals; + std::unique_ptr m_internals; public: - CMainSignals(CScheduler& scheduler LIFETIMEBOUND); + ValidationSignals(CScheduler& scheduler LIFETIMEBOUND); - ~CMainSignals(); + ~ValidationSignals(); /** Call any remaining callbacks on the calling thread */ void FlushBackgroundCallbacks(); From d5228efb5391b31a9a0673019e43e7fa2cd4ac07 Mon Sep 17 00:00:00 2001 From: TheCharlatan Date: Mon, 27 Nov 2023 17:15:11 +0100 Subject: [PATCH 7/7] kernel: Remove dependency on CScheduler By defining a virtual interface class for the scheduler client, users of the kernel can now define their own event consuming infrastructure, without having to spawn threads or rely on the scheduler design. Removing CScheduler also allows removing the thread and exception modules from the kernel library. --- src/Makefile.am | 4 +-- src/bitcoin-chainstate.cpp | 15 ++-------- src/init.cpp | 2 +- src/scheduler.h | 13 ++++++--- src/test/util/setup_common.cpp | 2 +- src/util/task_runner.h | 52 ++++++++++++++++++++++++++++++++++ src/validationinterface.cpp | 24 +++++++--------- src/validationinterface.h | 10 +++++-- 8 files changed, 85 insertions(+), 37 deletions(-) create mode 100644 src/util/task_runner.h diff --git a/src/Makefile.am b/src/Makefile.am index 3e8870c828..0de19af613 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -326,6 +326,7 @@ BITCOIN_CORE_H = \ util/spanparsing.h \ util/string.h \ util/syserror.h \ + util/task_runner.h \ util/thread.h \ util/threadinterrupt.h \ util/threadnames.h \ @@ -975,7 +976,6 @@ libbitcoinkernel_la_SOURCES = \ pubkey.cpp \ random.cpp \ randomenv.cpp \ - scheduler.cpp \ script/interpreter.cpp \ script/script.cpp \ script/script_error.cpp \ @@ -992,7 +992,6 @@ libbitcoinkernel_la_SOURCES = \ util/batchpriority.cpp \ util/chaintype.cpp \ util/check.cpp \ - util/exception.cpp \ util/fs.cpp \ util/fs_helpers.cpp \ util/hasher.cpp \ @@ -1003,7 +1002,6 @@ libbitcoinkernel_la_SOURCES = \ util/strencodings.cpp \ util/string.cpp \ util/syserror.cpp \ - util/thread.cpp \ util/threadnames.cpp \ util/time.cpp \ util/tokenpipe.cpp \ diff --git a/src/bitcoin-chainstate.cpp b/src/bitcoin-chainstate.cpp index c4ff4f8ff7..3eb64aa344 100644 --- a/src/bitcoin-chainstate.cpp +++ b/src/bitcoin-chainstate.cpp @@ -23,11 +23,10 @@ #include #include #include -#include #include