Merge 26375 via zmq_optimise_duplread-26+k

This commit is contained in:
Luke Dashjr 2024-03-25 17:26:53 +00:00
commit 05f22484c3
14 changed files with 70 additions and 37 deletions

View File

@ -1428,6 +1428,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
});
if (g_zmq_notification_interface) {
CValidationInterface::any_use_tip_block_cache = true;
RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif

View File

@ -488,7 +488,7 @@ public:
EXCLUSIVE_LOCKS_REQUIRED(!m_recent_confirmed_transactions_mutex);
void BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) override
EXCLUSIVE_LOCKS_REQUIRED(!m_recent_confirmed_transactions_mutex);
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload, const std::shared_ptr<const CBlock>& block) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
void BlockChecked(const CBlock& block, const BlockValidationState& state) override
EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex);
@ -2052,7 +2052,7 @@ void PeerManagerImpl::NewPoWValidBlock(const CBlockIndex *pindex, const std::sha
* Update our best height and announce any block hashes which weren't previously
* in m_chainman.ActiveChain() to our peers.
*/
void PeerManagerImpl::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
void PeerManagerImpl::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload, const std::shared_ptr<const CBlock>& block)
{
SetBestHeight(pindexNew->nHeight);
SetServiceFlagsIBDCache(!fInitialDownload);

View File

@ -442,7 +442,7 @@ public:
{
m_notifications->blockDisconnected(kernel::MakeBlockInfo(index, block.get()));
}
void UpdatedBlockTip(const CBlockIndex* index, const CBlockIndex* fork_index, bool is_ibd) override
void UpdatedBlockTip(const CBlockIndex* index, const CBlockIndex* fork_index, bool is_ibd, const std::shared_ptr<const CBlock>& block) override
{
m_notifications->updatedBlockTip();
}

View File

@ -38,9 +38,14 @@ struct TestSubscriber final : public CValidationInterface {
explicit TestSubscriber(uint256 tip) : m_expected_tip(tip) {}
void UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) override
void UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload, const std::shared_ptr<const CBlock>& block) override
{
BOOST_CHECK_EQUAL(m_expected_tip, pindexNew->GetBlockHash());
if (block) {
BOOST_CHECK_EQUAL(m_expected_tip, block->GetHash());
} else {
CValidationInterface::any_use_tip_block_cache = true;
}
}
void BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& block, const CBlockIndex* pindex) override
@ -148,6 +153,8 @@ void MinerTestingSetup::BuildChain(const uint256& root, int height, const unsign
BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
{
Assert(!CValidationInterface::any_use_tip_block_cache);
// build a large-ish chain that's likely to have some forks
std::vector<std::shared_ptr<const CBlock>> blocks;
while (blocks.size() < 50) {
@ -200,6 +207,8 @@ BOOST_AUTO_TEST_CASE(processnewblock_signals_ordering)
UnregisterSharedValidationInterface(sub);
CValidationInterface::any_use_tip_block_cache = false;
LOCK(cs_main);
BOOST_CHECK_EQUAL(sub->m_expected_tip, m_node.chainman->ActiveChain().Tip()->GetBlockHash());
}

View File

@ -3203,6 +3203,7 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
CBlockIndex *pindexMostWork = nullptr;
CBlockIndex *pindexNewTip = nullptr;
std::shared_ptr<const CBlock> new_tip_block{nullptr};
bool exited_ibd{false};
do {
// Block until the validation queue drains. This should largely
@ -3246,11 +3247,19 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
// Wipe cache, we may need another branch now.
pindexMostWork = nullptr;
}
pindexNewTip = m_chain.Tip();
auto new_tip = m_chain.Tip();
if (pindexNewTip != new_tip) {
pindexNewTip = new_tip;
new_tip_block = nullptr;
}
for (const PerBlockConnectTrace& trace : connectTrace.GetBlocksConnected()) {
assert(trace.pblock && trace.pindex);
GetMainSignals().BlockConnected(this->GetRole(), trace.pblock, trace.pindex);
// Avoid keeping the CBlock around longer than we have to
if (trace.pindex == pindexNewTip && CValidationInterface::any_use_tip_block_cache) {
new_tip_block = trace.pblock;
}
}
// This will have been toggled in
@ -3276,7 +3285,7 @@ bool Chainstate::ActivateBestChain(BlockValidationState& state, std::shared_ptr<
// Enqueue while holding cs_main to ensure that UpdatedBlockTip is called in the order in which blocks are connected
if (this == &m_chainman.ActiveChainstate() && pindexFork != pindexNewTip) {
// Notify ValidationInterface subscribers
GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd);
GetMainSignals().UpdatedBlockTip(pindexNewTip, pindexFork, still_in_ibd, new_tip_block);
// Always notify the UI if a new block tip was connected
if (kernel::IsInterrupted(m_chainman.GetNotifications().blockTip(GetSynchronizationState(still_in_ibd), *pindexNewTip))) {

View File

@ -20,6 +20,8 @@
std::string RemovalReasonToString(const MemPoolRemovalReason& r) noexcept;
bool CValidationInterface::any_use_tip_block_cache{false};
/**
* MainSignalsImpl manages a list of shared_ptr<CValidationInterface> callbacks.
*
@ -191,13 +193,13 @@ void SyncWithValidationInterfaceQueue()
#define LOG_EVENT(fmt, ...) \
LogPrint(BCLog::VALIDATION, fmt "\n", __VA_ARGS__)
void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload, const std::shared_ptr<const CBlock>& block) {
// Dependencies exist that require UpdatedBlockTip events to be delivered in the order in which
// the chain actually updates. One way to ensure this is for the caller to invoke this signal
// in the same critical section where the chain is updated
auto event = [pindexNew, pindexFork, fInitialDownload, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); });
auto event = [pindexNew, pindexFork, fInitialDownload, block, this] {
m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.UpdatedBlockTip(pindexNew, pindexFork, fInitialDownload, block); });
};
ENQUEUE_AND_LOG_EVENT(event, "%s: new block hash=%s fork block hash=%s (in IBD=%s)", __func__,
pindexNew->GetBlockHash().ToString(),

View File

@ -75,6 +75,9 @@ void SyncWithValidationInterfaceQueue() LOCKS_EXCLUDED(cs_main);
* ValidationInterface() subscribers.
*/
class CValidationInterface {
public:
static bool any_use_tip_block_cache;
protected:
/**
* Protected destructor so that instances can only be deleted by derived classes.
@ -90,7 +93,7 @@ protected:
*
* Called on a background thread. Only called for the active chainstate.
*/
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload, const std::shared_ptr<const CBlock>& block) {}
/**
* Notifies listeners of a transaction having been added to mempool.
*
@ -199,7 +202,7 @@ public:
size_t CallbacksPending();
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload, const std::shared_ptr<const CBlock>&);
void TransactionAddedToMempool(const CTransactionRef&, uint64_t mempool_sequence);
void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence);
void BlockConnected(ChainstateRole, const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex);

View File

@ -13,7 +13,7 @@ CZMQAbstractNotifier::~CZMQAbstractNotifier()
assert(!psocket);
}
bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/)
bool CZMQAbstractNotifier::NotifyBlock(const std::shared_ptr<const CBlock>& /*CBlock*/)
{
return true;
}

View File

@ -10,6 +10,7 @@
#include <memory>
#include <string>
class CBlock;
class CBlockIndex;
class CTransaction;
class CZMQAbstractNotifier;
@ -45,7 +46,7 @@ public:
virtual void Shutdown() = 0;
// Notifies of ConnectTip result, i.e., new active tip only
virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyBlock(const std::shared_ptr<const CBlock>& block);
// Notifies of every block connection
virtual bool NotifyBlockConnect(const CBlockIndex *pindex);
// Notifies of every block disconnection

View File

@ -4,6 +4,7 @@
#include <zmq/zmqnotificationinterface.h>
#include <chain.h>
#include <common/args.h>
#include <kernel/chain.h>
#include <logging.h>
@ -69,6 +70,7 @@ std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std
{
std::unique_ptr<CZMQNotificationInterface> notificationInterface(new CZMQNotificationInterface());
notificationInterface->notifiers = std::move(notifiers);
notificationInterface->m_get_block_by_index = get_block_by_index;
if (notificationInterface->Initialize()) {
return notificationInterface;
@ -137,13 +139,26 @@ void TryForEachAndRemoveFailed(std::list<std::unique_ptr<CZMQAbstractNotifier>>&
} // anonymous namespace
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload, const std::shared_ptr<const CBlock>& block_cached)
{
if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
return;
TryForEachAndRemoveFailed(notifiers, [pindexNew](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlock(pindexNew);
std::shared_ptr<const CBlock> block = block_cached;
if (!block) {
// This shouldn't happen, but just in case
LogPrintf("BUG! PLEASE REPORT THIS! Cached CBlock unexpectedly missing in %s\n", __func__);
std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
if (!m_get_block_by_index(*pblock, *pindexNew)) {
LogPrint(BCLog::ZMQ, "Error: Can't read block %s from disk\n", pindexNew->GetBlockHash().ToString());
return;
}
block = pblock;
}
TryForEachAndRemoveFailed(notifiers, [block](CZMQAbstractNotifier* notifier) {
return notifier->NotifyBlock(block);
});
}

View File

@ -35,13 +35,14 @@ protected:
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
void BlockConnected(ChainstateRole role, const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload, const std::shared_ptr<const CBlock>& block) override;
private:
CZMQNotificationInterface();
void* pcontext{nullptr};
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
std::function<bool(CBlock&, const CBlockIndex&)> m_get_block_by_index;
};
extern std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;

View File

@ -5,13 +5,11 @@
#include <zmq/zmqpublishnotifier.h>
#include <chain.h>
#include <chainparams.h>
#include <crypto/common.h>
#include <kernel/cs_main.h>
#include <logging.h>
#include <netaddress.h>
#include <netbase.h>
#include <node/blockstorage.h>
#include <primitives/block.h>
#include <primitives/transaction.h>
#include <rpc/server.h>
@ -35,10 +33,6 @@
#include <utility>
#include <vector>
namespace Consensus {
struct Params;
}
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
static const char *MSG_HASHBLOCK = "hashblock";
@ -218,9 +212,9 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void
return true;
}
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
bool CZMQPublishHashBlockNotifier::NotifyBlock(const std::shared_ptr<const CBlock>& block)
{
uint256 hash = pindex->GetBlockHash();
uint256 hash = block->GetHash();
LogPrint(BCLog::ZMQ, "Publish hashblock %s to %s\n", hash.GetHex(), this->address);
uint8_t data[32];
for (unsigned int i = 0; i < 32; i++) {
@ -240,19 +234,11 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
return SendZmqMessage(MSG_HASHTX, data, 32);
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
bool CZMQPublishRawBlockNotifier::NotifyBlock(const std::shared_ptr<const CBlock>& block)
{
LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", block->GetHash().GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
CBlock block;
if (!m_get_block_by_index(block, *pindex)) {
LogPrint(BCLog::ZMQ, "Error: Can't read block %s from disk\n", pindex->GetBlockHash().ToString());
return false;
}
ss << block;
return SendZmqMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}

View File

@ -37,7 +37,7 @@ public:
class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyBlock(const CBlockIndex *pindex) override;
bool NotifyBlock(const std::shared_ptr<const CBlock>& block) override;
};
class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
@ -54,7 +54,7 @@ private:
public:
CZMQPublishRawBlockNotifier(std::function<bool(CBlock&, const CBlockIndex&)> get_block_by_index)
: m_get_block_by_index{std::move(get_block_by_index)} {}
bool NotifyBlock(const CBlockIndex *pindex) override;
bool NotifyBlock(const std::shared_ptr<const CBlock>& block) override;
};
class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier

View File

@ -283,11 +283,17 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[i])["tx"][0])
# If we do a simple invalidate we announce the disconnected coinbase
# and announce the previous fork tip
self.nodes[0].invalidateblock(connect_blocks[1])
assert_equal(hashblock.receive().hex(), disconnect_block)
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[1])["tx"][0])
# And the current tip
assert_equal(hashtx.receive().hex(), self.nodes[1].getblock(connect_blocks[0])["tx"][0])
# Reconsider block to make sure we receive the block announcements again
self.nodes[0].reconsiderblock(connect_blocks[1])
assert_equal(hashblock.receive().hex(), connect_blocks[1])
def test_sequence(self):
"""
Sequence zmq notifications give every blockhash and txhash in order