diff --git a/doc/zmq.md b/doc/zmq.md index 07c340fb99..bfa8e5f84e 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -61,8 +61,10 @@ Currently, the following notifications are supported: -zmqpubhashtx=address -zmqpubhashblock=address + -zmqpubhashwallettx=address -zmqpubrawblock=address -zmqpubrawtx=address + -zmqpubrawwallettx=address -zmqpubsequence=address The socket type is PUB and the address must be a valid ZeroMQ socket @@ -74,8 +76,10 @@ The option to set the PUB socket's outbound message high water mark -zmqpubhashtxhwm=n -zmqpubhashblockhwm=n + -zmqpubhashwallettxhwm=n -zmqpubrawblockhwm=n -zmqpubrawtxhwm=n + -zmqpubrawwallettxhwm=n -zmqpubsequencehwm=n The high water mark value must be an integer greater than or equal to 0. @@ -93,6 +97,14 @@ corresponds to the notification type. For instance, for the notification `-zmqpubhashtx` the topic is `hashtx` (no null terminator). These options can also be provided in bitcoin.conf. +For wallet transaction notifications (both hashwallettx and rawwallettx), the +topic also indicates if the transaction came from a block or mempool. If +originated from mempool `-mempool` postfix will be added to the topic, for +block `-block` postfix will be added. Because zeromq is using prefix matching +for topics you can subscribe to `rawwallettx` (or `hashwallettx`) to get both +notifications. If you only want one type of notification subscribe to either +`rawwallettx-mempool` or `rawwallettx-block`. + The topics are: `sequence`: the body is structured as the following based on the type of message: @@ -121,6 +133,15 @@ Where the 8-byte uints correspond to the mempool sequence number. | hashblock | <32-byte block hash in Little Endian> | + +`rawwallettx`: Identical to `rawtx`, except only when transactions are added (or updated) in an open wallet. Full topic is either `rawwallettx-block` for transactions in a block, or `rawwallettx-mempool` otherwise. + + | rawwallettx-<"block" or "mempool"> | | + +`hashwallettx`: Identical to `hashtx`, except only when transactions are added (or updated) in an open wallet. Full topic is either `hashwallettx-block` for transactions in a block, or `hashwallettx-mempool` otherwise. + + | hashwallettx-<"block" or "mempool"> | <32-byte transaction hash in Little Endian> | + **_NOTE:_** Note that the 32-byte hashes are in Little Endian and not in the Big Endian format that the RPC interface and block explorers use to display transaction and block hashes. ZeroMQ endpoint specifiers for TCP (and others) are documented in the diff --git a/src/Makefile.am b/src/Makefile.am index f1c4f11f8c..f8c69c45dd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -491,7 +491,7 @@ endif # zmq # if ENABLE_ZMQ -libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(ZMQ_CFLAGS) +libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(BOOST_CPPFLAGS) $(ZMQ_CFLAGS) libbitcoin_zmq_a_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS) libbitcoin_zmq_a_SOURCES = \ zmq/zmqabstractnotifier.cpp \ diff --git a/src/init.cpp b/src/init.cpp index aefcb32e61..1ed5df29c6 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -552,24 +552,32 @@ void SetupServerArgs(ArgsManager& argsman) #if ENABLE_ZMQ argsman.AddArg("-zmqpubhashblock=
", "Enable publish hash block in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashtx=
", "Enable publish hash transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubhashwallettx=
", "Enable publish hash wallet transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawblock=
", "Enable publish raw block in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtx=
", "Enable publish raw transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubrawwallettx=
", "Enable publish raw wallet transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubsequence=
", "Enable publish hash block and tx sequence in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblockhwm=", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashtxhwm=", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubhashwallettxhwm=", strprintf("Set publish hash wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawblockhwm=", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxhwm=", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubrawwallettxhwm=", strprintf("Set publish raw wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubsequencehwm=", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=
"); hidden_args.emplace_back("-zmqpubhashtx=
"); + hidden_args.emplace_back("-zmqpubhashwallettx=
"); hidden_args.emplace_back("-zmqpubrawblock=
"); hidden_args.emplace_back("-zmqpubrawtx=
"); + hidden_args.emplace_back("-zmqpubrawwallettx=
"); hidden_args.emplace_back("-zmqpubsequence="); hidden_args.emplace_back("-zmqpubhashblockhwm="); hidden_args.emplace_back("-zmqpubhashtxhwm="); + hidden_args.emplace_back("-zmqpubhashwallettxhwm="); hidden_args.emplace_back("-zmqpubrawblockhwm="); hidden_args.emplace_back("-zmqpubrawtxhwm="); + hidden_args.emplace_back("-zmqpubrawwallettxhwm="); hidden_args.emplace_back("-zmqpubsequencehwm="); #endif @@ -1296,6 +1304,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) "-zmqpubrawblock", "-zmqpubrawtx", "-zmqpubsequence", + "-zmqpubhashwallettx", + "-zmqpubrawwallettx", }) { for (const std::string& socket_addr : args.GetArgs(port_option)) { std::string host_out; diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index d721e57b45..9e7c104f5a 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -87,6 +87,11 @@ using interfaces::FoundBlock; namespace wallet { +/* + * Signal when transactions are added to wallet + */ +boost::signals2::signal CWallet::TransactionAddedToWallet; + bool AddWalletSetting(interfaces::Chain& chain, const std::string& wallet_name) { common::SettingsValue setting_value = chain.getRwSetting("wallet"); @@ -1201,6 +1206,9 @@ CWalletTx* CWallet::AddToWallet(CTransactionRef tx, const TxState& state, const // Notify UI of new or updated transaction NotifyTransactionChanged(hash, fInsertedNew ? CT_NEW : CT_UPDATED); + // Notify listeners on new wallet transaction + CWallet::TransactionAddedToWallet(wtx.tx, TxStateSerializedBlockHash(wtx.m_state)); + #if HAVE_SYSTEM // notify an external script when a wallet transaction comes in or is updated if (!m_notify_tx_changed_scripts.empty()) { diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index a2efb095c6..840bda5e56 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -852,6 +852,9 @@ public: */ boost::signals2::signal NotifyTransactionChanged; + static boost::signals2::signal TransactionAddedToWallet; + /** Show progress e.g. for rescan */ boost::signals2::signal ShowProgress; diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 3e0fd9b71d..c311367947 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -42,3 +42,7 @@ bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transa { return true; } + +bool CZMQAbstractNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index fb5962af20..2ff6530407 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -14,6 +14,7 @@ class CBlock; class CBlockIndex; class CTransaction; class CZMQAbstractNotifier; +class uint256; using CZMQNotifierFactory = std::function()>; @@ -57,6 +58,7 @@ public: virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence); // Notifies of transactions added to mempool or appearing in blocks virtual bool NotifyTransaction(const CTransaction &transaction); + virtual bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock); protected: void* psocket{nullptr}; diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index d9e31d6287..91afc2f065 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -23,6 +23,10 @@ #include #include +#ifdef ENABLE_WALLET +#include +#endif + CZMQNotificationInterface::CZMQNotificationInterface() { } @@ -46,10 +50,12 @@ std::unique_ptr CZMQNotificationInterface::Create(std std::map factories; factories["pubhashblock"] = CZMQAbstractNotifier::Create; factories["pubhashtx"] = CZMQAbstractNotifier::Create; + factories["pubhashwallettx"] = CZMQAbstractNotifier::Create; factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr { return std::make_unique(get_block_by_index); }; factories["pubrawtx"] = CZMQAbstractNotifier::Create; + factories["pubrawwallettx"] = CZMQAbstractNotifier::Create; factories["pubsequence"] = CZMQAbstractNotifier::Create; std::list> notifiers; @@ -90,6 +96,10 @@ bool CZMQNotificationInterface::Initialize() LogPrint(BCLog::ZMQ, "Initialize notification interface\n"); assert(!pcontext); +#ifdef ENABLE_WALLET + m_wtx_added_connection = wallet::CWallet::TransactionAddedToWallet.connect(std::bind(&CZMQNotificationInterface::TransactionAddedToWallet, this, std::placeholders::_1, std::placeholders::_2)); +#endif + pcontext = zmq_ctx_new(); if (!pcontext) @@ -114,6 +124,11 @@ bool CZMQNotificationInterface::Initialize() void CZMQNotificationInterface::Shutdown() { LogPrint(BCLog::ZMQ, "Shutdown notification interface\n"); + +#ifdef ENABLE_WALLET + m_wtx_added_connection.disconnect(); +#endif + if (pcontext) { for (auto& notifier : notifiers) { @@ -214,4 +229,12 @@ void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptrNotifyWalletTransaction(tx, hashBlock); + }); +} + std::unique_ptr g_zmq_notification_interface; diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index 736770834a..094a7ca35e 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -12,6 +12,7 @@ #include #include #include +#include class CBlock; class CBlockIndex; @@ -30,6 +31,8 @@ protected: bool Initialize(); void Shutdown(); + void TransactionAddedToWallet(const CTransactionRef& tx, const uint256 &hashBlock); + // CValidationInterface void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override; void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; @@ -42,6 +45,7 @@ private: void* pcontext{nullptr}; std::list> notifiers; + boost::signals2::connection m_wtx_added_connection; std::function m_get_block_by_index; }; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 807de57051..4f09ae664d 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -37,8 +37,12 @@ static std::multimap mapPublishNotifi static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_HASHWALLETTXMEMPOOL = "hashwallettx-mempool"; +static const char *MSG_HASHWALLETTXBLOCK = "hashwallettx-block"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; +static const char *MSG_RAWWALLETTXMEMPOOL = "rawwallettx-mempool"; +static const char *MSG_RAWWALLETTXBLOCK = "rawwallettx-block"; static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message @@ -234,6 +238,23 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t return SendZmqMessage(MSG_HASHTX, data, 32); } +bool CZMQPublishHashWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashwallettx %s to %s\n", hash.GetHex(), this->address); + uint8_t data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + + const char *command; + + if (!hashBlock.IsNull()) + command = MSG_HASHWALLETTXBLOCK; + else + command = MSG_HASHWALLETTXMEMPOOL; + + return SendZmqMessage(command, data, 32); +} + bool CZMQPublishRawBlockNotifier::NotifyBlock(const std::shared_ptr& block) { LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", block->GetHash().GetHex(), this->address); @@ -291,3 +312,19 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &t LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence); } + +bool CZMQPublishRawWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish rawwallettx %s to %s\n", hash.GetHex(), this->address); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags()); + ss << transaction; + + const char *command; + + if (!hashBlock.IsNull()) + command = MSG_RAWWALLETTXBLOCK; + else + command = MSG_RAWWALLETTXMEMPOOL; + + return SendZmqMessage(command, &(*ss.begin()), ss.size()); +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index 25105c56dd..d75812ec12 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -46,6 +46,12 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishHashWalletTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override; +}; + class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier { private: @@ -63,6 +69,12 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishRawWalletTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override; +}; + class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier { public: diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 1586a98087..9ed01d074d 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -3,6 +3,8 @@ # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the ZMQ notification interface.""" + +import io import struct from time import sleep @@ -19,6 +21,7 @@ from test_framework.test_framework import BitcoinTestFramework from test_framework.messages import ( hash256, tx_from_hex, + CTransaction, ) from test_framework.util import ( assert_equal, @@ -49,10 +52,12 @@ class ZMQSubscriber: self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) # Receive message from publisher and verify that topic and sequence match - def _receive_from_publisher_and_check(self): + def _receive_from_publisher_and_check(self, specific_topic = None): + expected_topic = specific_topic if specific_topic else self.topic + topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. - assert_equal(topic, self.topic) + assert_equal(topic, expected_topic) # Sequence should be incremental. received_seq = struct.unpack('