ZMQ: add publishers of wallet tx

Topic will indicate if transaction came from mempool or block so developers can handle the transaction accordingly without a RPC round trip to bitcoind.

(includes ZMQ: Making CWallet::TransactionAddedToWallet static)
This commit is contained in:
Doron Somech 2017-06-08 13:14:31 +03:00 committed by Luke Dashjr
parent 10f2255b2b
commit 7b41419e8d
12 changed files with 183 additions and 11 deletions

View File

@ -61,8 +61,10 @@ Currently, the following notifications are supported:
-zmqpubhashtx=address -zmqpubhashtx=address
-zmqpubhashblock=address -zmqpubhashblock=address
-zmqpubhashwallettx=address
-zmqpubrawblock=address -zmqpubrawblock=address
-zmqpubrawtx=address -zmqpubrawtx=address
-zmqpubrawwallettx=address
-zmqpubsequence=address -zmqpubsequence=address
The socket type is PUB and the address must be a valid ZeroMQ socket The socket type is PUB and the address must be a valid ZeroMQ socket
@ -93,6 +95,15 @@ corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator). These options can also be provided in bitcoin.conf. terminator). These options can also be provided in bitcoin.conf.
For wallet transaction notifications (both hash and tx), the
topic also indicate if the transaction came from a block
or mempool. If originated from mempool `-mempool` postfix
will be added to the topic, for block `-block` postfix will
be added. Because zeromq is using prefix matching for topics
you can subscribe to `rawwallettx` (or `hashwallettx`) to get
both notifications. If you only want one type of notification
subscribe to either `rawwallettx-mempool` or `rawwallettx-block`.
The topics are: The topics are:
`sequence`: the body is structured as the following based on the type of message: `sequence`: the body is structured as the following based on the type of message:

View File

@ -488,7 +488,7 @@ endif
# zmq # # zmq #
if ENABLE_ZMQ if ENABLE_ZMQ
libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(ZMQ_CFLAGS) libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(BOOST_CPPFLAGS) $(ZMQ_CFLAGS)
libbitcoin_zmq_a_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS) libbitcoin_zmq_a_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS)
libbitcoin_zmq_a_SOURCES = \ libbitcoin_zmq_a_SOURCES = \
zmq/zmqabstractnotifier.cpp \ zmq/zmqabstractnotifier.cpp \

View File

@ -589,24 +589,32 @@ void SetupServerArgs(ArgsManager& argsman)
#ifdef ENABLE_ZMQ #ifdef ENABLE_ZMQ
argsman.AddArg("-zmqpubhashblock=<address>", "Enable publish hash block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblock=<address>", "Enable publish hash block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashwallettx=<address>", "Enable publish hash wallet transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawwallettx=<address>", "Enable publish raw wallet transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashwallettxhwm=<n>", strprintf("Set publish hash wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawwallettxhwm=<n>", strprintf("Set publish raw wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else #else
hidden_args.emplace_back("-zmqpubhashblock=<address>"); hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashtx=<address>"); hidden_args.emplace_back("-zmqpubhashtx=<address>");
hidden_args.emplace_back("-zmqpubhashwallettx=<address>");
hidden_args.emplace_back("-zmqpubrawblock=<address>"); hidden_args.emplace_back("-zmqpubrawblock=<address>");
hidden_args.emplace_back("-zmqpubrawtx=<address>"); hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubrawwallettx=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>"); hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>"); hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>"); hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
hidden_args.emplace_back("-zmqpubhashwallettxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>"); hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>"); hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawwallettxhwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>"); hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif #endif

View File

@ -91,6 +91,11 @@ using util::ToString;
namespace wallet { namespace wallet {
/*
* Signal when transactions are added to wallet
*/
boost::signals2::signal<void (const CTransactionRef &ptxn, const uint256 &blockHash)> CWallet::TransactionAddedToWallet;
bool AddWalletSetting(interfaces::Chain& chain, const std::string& wallet_name) bool AddWalletSetting(interfaces::Chain& chain, const std::string& wallet_name)
{ {
const auto update_function = [&wallet_name](common::SettingsValue& setting_value) { const auto update_function = [&wallet_name](common::SettingsValue& setting_value) {
@ -1215,6 +1220,9 @@ CWalletTx* CWallet::AddToWallet(CTransactionRef tx, const TxState& state, const
// Notify UI of new or updated transaction // Notify UI of new or updated transaction
NotifyTransactionChanged(hash, fInsertedNew ? CT_NEW : CT_UPDATED); NotifyTransactionChanged(hash, fInsertedNew ? CT_NEW : CT_UPDATED);
// Notify listeners on new wallet transaction
CWallet::TransactionAddedToWallet(wtx.tx, TxStateSerializedBlockHash(wtx.m_state));
#if HAVE_SYSTEM #if HAVE_SYSTEM
// notify an external script when a wallet transaction comes in or is updated // notify an external script when a wallet transaction comes in or is updated
std::string strCmd = m_notify_tx_changed_script; std::string strCmd = m_notify_tx_changed_script;

View File

@ -855,6 +855,9 @@ public:
*/ */
boost::signals2::signal<void(const uint256& hashTx, ChangeType status)> NotifyTransactionChanged; boost::signals2::signal<void(const uint256& hashTx, ChangeType status)> NotifyTransactionChanged;
static boost::signals2::signal<void (const CTransactionRef &ptxn,
const uint256 &blockHash)> TransactionAddedToWallet;
/** Show progress e.g. for rescan */ /** Show progress e.g. for rescan */
boost::signals2::signal<void (const std::string &title, int nProgress)> ShowProgress; boost::signals2::signal<void (const std::string &title, int nProgress)> ShowProgress;

View File

@ -42,3 +42,7 @@ bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transa
{ {
return true; return true;
} }
bool CZMQAbstractNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
return true;
}

View File

@ -13,6 +13,7 @@
class CBlockIndex; class CBlockIndex;
class CTransaction; class CTransaction;
class CZMQAbstractNotifier; class CZMQAbstractNotifier;
class uint256;
using CZMQNotifierFactory = std::function<std::unique_ptr<CZMQAbstractNotifier>()>; using CZMQNotifierFactory = std::function<std::unique_ptr<CZMQAbstractNotifier>()>;
@ -56,6 +57,7 @@ public:
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence); virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of transactions added to mempool or appearing in blocks // Notifies of transactions added to mempool or appearing in blocks
virtual bool NotifyTransaction(const CTransaction &transaction); virtual bool NotifyTransaction(const CTransaction &transaction);
virtual bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock);
protected: protected:
void* psocket{nullptr}; void* psocket{nullptr};

View File

@ -2,6 +2,10 @@
// Distributed under the MIT software license, see the accompanying // Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
#if defined(HAVE_CONFIG_H)
#include <config/bitcoin-config.h>
#endif
#include <zmq/zmqnotificationinterface.h> #include <zmq/zmqnotificationinterface.h>
#include <common/args.h> #include <common/args.h>
@ -24,6 +28,10 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#ifdef ENABLE_WALLET
#include <wallet/wallet.h>
#endif
CZMQNotificationInterface::CZMQNotificationInterface() = default; CZMQNotificationInterface::CZMQNotificationInterface() = default;
CZMQNotificationInterface::~CZMQNotificationInterface() CZMQNotificationInterface::~CZMQNotificationInterface()
@ -45,10 +53,12 @@ std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std
std::map<std::string, CZMQNotifierFactory> factories; std::map<std::string, CZMQNotifierFactory> factories;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>; factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>; factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubhashwallettx"] = CZMQAbstractNotifier::Create<CZMQPublishHashWalletTransactionNotifier>;
factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> { factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index); return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
}; };
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>; factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubrawwallettx"] = CZMQAbstractNotifier::Create<CZMQPublishRawWalletTransactionNotifier>;
factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>; factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
@ -93,6 +103,10 @@ bool CZMQNotificationInterface::Initialize()
LogPrint(BCLog::ZMQ, "Initialize notification interface\n"); LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
assert(!pcontext); assert(!pcontext);
#ifdef ENABLE_WALLET
m_wtx_added_connection = wallet::CWallet::TransactionAddedToWallet.connect(std::bind(&CZMQNotificationInterface::TransactionAddedToWallet, this, std::placeholders::_1, std::placeholders::_2));
#endif
pcontext = zmq_ctx_new(); pcontext = zmq_ctx_new();
if (!pcontext) if (!pcontext)
@ -117,6 +131,11 @@ bool CZMQNotificationInterface::Initialize()
void CZMQNotificationInterface::Shutdown() void CZMQNotificationInterface::Shutdown()
{ {
LogPrint(BCLog::ZMQ, "Shutdown notification interface\n"); LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
#ifdef ENABLE_WALLET
m_wtx_added_connection.disconnect();
#endif
if (pcontext) if (pcontext)
{ {
for (auto& notifier : notifiers) { for (auto& notifier : notifiers) {
@ -209,4 +228,12 @@ void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CB
}); });
} }
void CZMQNotificationInterface::TransactionAddedToWallet(const CTransactionRef& ptx, const uint256 &hashBlock) {
const CTransaction& tx = *ptx;
TryForEachAndRemoveFailed(notifiers, [&tx, &hashBlock](CZMQAbstractNotifier* notifier) {
return notifier->NotifyWalletTransaction(tx, hashBlock);
});
}
std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface; std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;

View File

@ -14,6 +14,8 @@
#include <memory> #include <memory>
#include <vector> #include <vector>
#include <boost/signals2/connection.hpp>
class CBlock; class CBlock;
class CBlockIndex; class CBlockIndex;
class CZMQAbstractNotifier; class CZMQAbstractNotifier;
@ -32,6 +34,8 @@ protected:
bool Initialize(); bool Initialize();
void Shutdown(); void Shutdown();
void TransactionAddedToWallet(const CTransactionRef& tx, const uint256 &hashBlock);
// CValidationInterface // CValidationInterface
void TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) override; void TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
@ -44,6 +48,7 @@ private:
void* pcontext{nullptr}; void* pcontext{nullptr};
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers; std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
boost::signals2::connection m_wtx_added_connection;
}; };
extern std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface; extern std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;

View File

@ -42,8 +42,12 @@ static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifi
static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx"; static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHWALLETTXMEMPOOL = "hashwallettx-mempool";
static const char *MSG_HASHWALLETTXBLOCK = "hashwallettx-block";
static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx"; static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWWALLETTXMEMPOOL = "rawwallettx-mempool";
static const char *MSG_RAWWALLETTXBLOCK = "rawwallettx-block";
static const char *MSG_SEQUENCE = "sequence"; static const char *MSG_SEQUENCE = "sequence";
// Internal function to send multipart message // Internal function to send multipart message
@ -239,6 +243,23 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
return SendZmqMessage(MSG_HASHTX, data, 32); return SendZmqMessage(MSG_HASHTX, data, 32);
} }
bool CZMQPublishHashWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashwallettx %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
const char *command;
if (!hashBlock.IsNull())
command = MSG_HASHWALLETTXBLOCK;
else
command = MSG_HASHWALLETTXMEMPOOL;
return SendZmqMessage(command, data, 32);
}
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{ {
LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
@ -301,3 +322,19 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &t
LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence); return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
} }
bool CZMQPublishRawWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawwallettx %s to %s\n", hash.GetHex(), this->address);
DataStream ss;
ss << TX_WITH_WITNESS(transaction);
const char *command;
if (!hashBlock.IsNull())
command = MSG_RAWWALLETTXBLOCK;
else
command = MSG_RAWWALLETTXMEMPOOL;
return SendZmqMessage(command, &(*ss.begin()), ss.size());
}

View File

@ -46,6 +46,12 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override; bool NotifyTransaction(const CTransaction &transaction) override;
}; };
class CZMQPublishHashWalletTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override;
};
class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
{ {
private: private:
@ -63,6 +69,12 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override; bool NotifyTransaction(const CTransaction &transaction) override;
}; };
class CZMQPublishRawWalletTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override;
};
class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier
{ {
public: public:

View File

@ -3,6 +3,8 @@
# Distributed under the MIT software license, see the accompanying # Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php. # file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the ZMQ notification interface.""" """Test the ZMQ notification interface."""
import io
import os import os
import struct import struct
import tempfile import tempfile
@ -23,6 +25,7 @@ from test_framework.messages import (
CBlock, CBlock,
hash256, hash256,
tx_from_hex, tx_from_hex,
CTransaction,
) )
from test_framework.util import ( from test_framework.util import (
assert_equal, assert_equal,
@ -53,10 +56,12 @@ class ZMQSubscriber:
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
# Receive message from publisher and verify that topic and sequence match # Receive message from publisher and verify that topic and sequence match
def _receive_from_publisher_and_check(self): def _receive_from_publisher_and_check(self, specific_topic = None):
expected_topic = specific_topic if specific_topic else self.topic
topic, body, seq = self.socket.recv_multipart() topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic. # Topic should match the subscriber topic.
assert_equal(topic, self.topic) assert_equal(topic, expected_topic)
# Sequence should be incremental. # Sequence should be incremental.
received_seq = struct.unpack('<I', seq)[-1] received_seq = struct.unpack('<I', seq)[-1]
if self.sequence is None: if self.sequence is None:
@ -66,8 +71,8 @@ class ZMQSubscriber:
self.sequence += 1 self.sequence += 1
return body return body
def receive(self): def receive(self, specific_topic = None):
return self._receive_from_publisher_and_check() return self._receive_from_publisher_and_check(specific_topic)
def receive_sequence(self): def receive_sequence(self):
body = self._receive_from_publisher_and_check() body = self._receive_from_publisher_and_check()
@ -90,7 +95,10 @@ class ZMQTestSetupBlock:
raw transaction data. raw transaction data.
""" """
def __init__(self, test_framework, node): def __init__(self, test_framework, node):
self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0] if test_framework.is_wallet_compiled():
self.block_hash = test_framework.generatetoaddress(node, nblocks=1, address=node.getnewaddress(), maxtries=1000000, sync_fun=test_framework.no_op)[0]
else:
self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
coinbase = node.getblock(self.block_hash, 2)['tx'][0] coinbase = node.getblock(self.block_hash, 2)['tx'][0]
self.tx_hash = coinbase['txid'] self.tx_hash = coinbase['txid']
self.raw_tx = coinbase['hex'] self.raw_tx = coinbase['hex']
@ -106,11 +114,16 @@ class ZMQTestSetupBlock:
class ZMQTest (BitcoinTestFramework): class ZMQTest (BitcoinTestFramework):
def add_options(self, parser):
self.add_wallet_options(parser)
def set_test_params(self): def set_test_params(self):
self.num_nodes = 2 self.num_nodes = 2
# whitelist peers to speed up tx relay / mempool sync # whitelist peers to speed up tx relay / mempool sync
self.noban_tx_relay = True self.noban_tx_relay = True
self.zmq_port_base = p2p_port(self.num_nodes + 1) self.zmq_port_base = p2p_port(self.num_nodes + 1)
if self.is_wallet_compiled():
self.skip_if_no_wallet()
def skip_test_if_missing_module(self): def skip_test_if_missing_module(self):
self.skip_if_no_py3_zmq() self.skip_if_no_py3_zmq()
@ -163,7 +176,10 @@ class ZMQTest (BitcoinTestFramework):
recv_failed = False recv_failed = False
for sub in subscribers: for sub in subscribers:
try: try:
while not test_block.caused_notification(sub.receive().hex()): specific_topic = sub.topic
if b'wallet' in sub.topic:
specific_topic += b"-block"
while not test_block.caused_notification(sub.receive(specific_topic=specific_topic).hex()):
self.log.debug("Ignoring sync-up notification for previously generated block.") self.log.debug("Ignoring sync-up notification for previously generated block.")
except zmq.error.Again: except zmq.error.Again:
self.log.debug("Didn't receive sync-up notification, trying again.") self.log.debug("Didn't receive sync-up notification, trying again.")
@ -195,16 +211,36 @@ class ZMQTest (BitcoinTestFramework):
socket_path = tempfile.NamedTemporaryFile().name socket_path = tempfile.NamedTemporaryFile().name
address = f"ipc://{socket_path}" address = f"ipc://{socket_path}"
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]]) services = ["hashblock", "hashtx", "rawblock", "rawtx"]
if self.is_wallet_compiled():
services += ["hashwallettx", "rawwallettx"]
subs = self.setup_zmq_test([(topic, address) for topic in services])
hashblock = subs[0] hashblock = subs[0]
hashtx = subs[1] hashtx = subs[1]
rawblock = subs[2] rawblock = subs[2]
rawtx = subs[3] rawtx = subs[3]
if self.is_wallet_compiled():
hashwallettx = subs[-2]
rawwallettx = subs[-1]
self.sync_all()
# Flush initial wallettx events before we begin
while True:
try:
topic, body, seq = hashwallettx.socket.recv_multipart()
except zmq.ZMQError:
break
subscriber = {b'hashwallettx-block': hashwallettx, b'rawwallettx-block': rawwallettx}[topic]
assert_equal(struct.unpack('<I', seq)[-1], subscriber.sequence)
subscriber.sequence += 1
num_blocks = 5 num_blocks = 5
self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)") self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE) if self.is_wallet_compiled():
genhashes = self.generate(self.nodes[0], num_blocks)
else:
genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
for x in range(num_blocks): for x in range(num_blocks):
# Should receive the coinbase txid. # Should receive the coinbase txid.
@ -224,6 +260,15 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(len(block.vtx), 1) assert_equal(len(block.vtx), 1)
assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex()) assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex())
if self.is_wallet_compiled():
# Should receive wallet tx
wallettxid = hashwallettx.receive(b"hashwallettx-block")
wallethex = rawwallettx.receive(b"rawwallettx-block")
wallettx = CTransaction()
wallettx.deserialize(io.BytesIO(wallethex))
wallettx.calc_sha256()
assert_equal(wallettx.hash, wallettxid.hex())
# Should receive the generated block hash. # Should receive the generated block hash.
hash = hashblock.receive().hex() hash = hashblock.receive().hex()
assert_equal(genhashes[x], hash) assert_equal(genhashes[x], hash)
@ -232,8 +277,12 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Wait for tx from second node") self.log.info("Wait for tx from second node")
payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1]) if self.is_wallet_compiled():
payment_txid = payment_tx['txid'] payment_txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
payment_tx = {'wtxid': self.nodes[1].getrawtransaction(payment_txid, 1)['hash']}
else:
payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
payment_txid = payment_tx['txid']
self.sync_all() self.sync_all()
# Should receive the broadcasted txid. # Should receive the broadcasted txid.
txid = hashtx.receive() txid = hashtx.receive()
@ -250,13 +299,19 @@ class ZMQTest (BitcoinTestFramework):
txid = hashtx.receive() txid = hashtx.receive()
assert_equal(payment_txid, txid.hex()) assert_equal(payment_txid, txid.hex())
if self.is_wallet_compiled():
wallettxid = hashwallettx.receive(b"hashwallettx-mempool")
wallethex = rawwallettx.receive(b"rawwallettx-mempool")
assert_equal(hash256_reversed(wallethex), wallettxid)
self.log.info("Test the getzmqnotifications RPC") self.log.info("Test the getzmqnotifications RPC")
assert_equal(self.nodes[0].getzmqnotifications(), [ assert_equal(self.nodes[0].getzmqnotifications(), [
{"type": "pubhashblock", "address": address, "hwm": 1000}, {"type": "pubhashblock", "address": address, "hwm": 1000},
{"type": "pubhashtx", "address": address, "hwm": 1000}, {"type": "pubhashtx", "address": address, "hwm": 1000},
] + ([{"type": "pubhashwallettx", "address": address, "hwm": 1000}] if self.is_wallet_compiled() else []) + [
{"type": "pubrawblock", "address": address, "hwm": 1000}, {"type": "pubrawblock", "address": address, "hwm": 1000},
{"type": "pubrawtx", "address": address, "hwm": 1000}, {"type": "pubrawtx", "address": address, "hwm": 1000},
] + ([{"type": "pubrawwallettx", "address": address, "hwm": 1000}] if self.is_wallet_compiled() else []) + [
]) ])
assert_equal(self.nodes[1].getzmqnotifications(), []) assert_equal(self.nodes[1].getzmqnotifications(), [])