From 7b41419e8def2d9603d46f9e17434b953fd7bf4c Mon Sep 17 00:00:00 2001 From: Doron Somech Date: Thu, 8 Jun 2017 13:14:31 +0300 Subject: [PATCH] ZMQ: add publishers of wallet tx Topic will indicate if transaction came from mempool or block so developers can handle the transaction accordingly without a RPC round trip to bitcoind. (includes ZMQ: Making CWallet::TransactionAddedToWallet static) --- doc/zmq.md | 11 ++++ src/Makefile.am | 2 +- src/init.cpp | 8 +++ src/wallet/wallet.cpp | 8 +++ src/wallet/wallet.h | 3 ++ src/zmq/zmqabstractnotifier.cpp | 4 ++ src/zmq/zmqabstractnotifier.h | 2 + src/zmq/zmqnotificationinterface.cpp | 27 ++++++++++ src/zmq/zmqnotificationinterface.h | 5 ++ src/zmq/zmqpublishnotifier.cpp | 37 ++++++++++++++ src/zmq/zmqpublishnotifier.h | 12 +++++ test/functional/interface_zmq.py | 75 ++++++++++++++++++++++++---- 12 files changed, 183 insertions(+), 11 deletions(-) diff --git a/doc/zmq.md b/doc/zmq.md index 07c340fb99..22c05d9a7e 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -61,8 +61,10 @@ Currently, the following notifications are supported: -zmqpubhashtx=address -zmqpubhashblock=address + -zmqpubhashwallettx=address -zmqpubrawblock=address -zmqpubrawtx=address + -zmqpubrawwallettx=address -zmqpubsequence=address The socket type is PUB and the address must be a valid ZeroMQ socket @@ -93,6 +95,15 @@ corresponds to the notification type. For instance, for the notification `-zmqpubhashtx` the topic is `hashtx` (no null terminator). These options can also be provided in bitcoin.conf. +For wallet transaction notifications (both hash and tx), the +topic also indicate if the transaction came from a block +or mempool. If originated from mempool `-mempool` postfix +will be added to the topic, for block `-block` postfix will +be added. Because zeromq is using prefix matching for topics +you can subscribe to `rawwallettx` (or `hashwallettx`) to get +both notifications. If you only want one type of notification +subscribe to either `rawwallettx-mempool` or `rawwallettx-block`. + The topics are: `sequence`: the body is structured as the following based on the type of message: diff --git a/src/Makefile.am b/src/Makefile.am index 1ccb5332c4..711ebf8e3a 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -488,7 +488,7 @@ endif # zmq # if ENABLE_ZMQ -libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(ZMQ_CFLAGS) +libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(BOOST_CPPFLAGS) $(ZMQ_CFLAGS) libbitcoin_zmq_a_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS) libbitcoin_zmq_a_SOURCES = \ zmq/zmqabstractnotifier.cpp \ diff --git a/src/init.cpp b/src/init.cpp index faaf3353d0..c68313390d 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -589,24 +589,32 @@ void SetupServerArgs(ArgsManager& argsman) #ifdef ENABLE_ZMQ argsman.AddArg("-zmqpubhashblock=
", "Enable publish hash block in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashtx=
", "Enable publish hash transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubhashwallettx=
", "Enable publish hash wallet transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawblock=
", "Enable publish raw block in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtx=
", "Enable publish raw transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubrawwallettx=
", "Enable publish raw wallet transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubsequence=
", "Enable publish hash block and tx sequence in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblockhwm=", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashtxhwm=", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubhashwallettxhwm=", strprintf("Set publish hash wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawblockhwm=", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxhwm=", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubrawwallettxhwm=", strprintf("Set publish raw wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubsequencehwm=", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=
"); hidden_args.emplace_back("-zmqpubhashtx=
"); + hidden_args.emplace_back("-zmqpubhashwallettx=
"); hidden_args.emplace_back("-zmqpubrawblock=
"); hidden_args.emplace_back("-zmqpubrawtx=
"); + hidden_args.emplace_back("-zmqpubrawwallettx=
"); hidden_args.emplace_back("-zmqpubsequence="); hidden_args.emplace_back("-zmqpubhashblockhwm="); hidden_args.emplace_back("-zmqpubhashtxhwm="); + hidden_args.emplace_back("-zmqpubhashwallettxhwm="); hidden_args.emplace_back("-zmqpubrawblockhwm="); hidden_args.emplace_back("-zmqpubrawtxhwm="); + hidden_args.emplace_back("-zmqpubrawwallettxhwm="); hidden_args.emplace_back("-zmqpubsequencehwm="); #endif diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 132e5e68b4..0a95117ab4 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -91,6 +91,11 @@ using util::ToString; namespace wallet { +/* + * Signal when transactions are added to wallet + */ +boost::signals2::signal CWallet::TransactionAddedToWallet; + bool AddWalletSetting(interfaces::Chain& chain, const std::string& wallet_name) { const auto update_function = [&wallet_name](common::SettingsValue& setting_value) { @@ -1215,6 +1220,9 @@ CWalletTx* CWallet::AddToWallet(CTransactionRef tx, const TxState& state, const // Notify UI of new or updated transaction NotifyTransactionChanged(hash, fInsertedNew ? CT_NEW : CT_UPDATED); + // Notify listeners on new wallet transaction + CWallet::TransactionAddedToWallet(wtx.tx, TxStateSerializedBlockHash(wtx.m_state)); + #if HAVE_SYSTEM // notify an external script when a wallet transaction comes in or is updated std::string strCmd = m_notify_tx_changed_script; diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index 26fe85f54e..136f88ca66 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -855,6 +855,9 @@ public: */ boost::signals2::signal NotifyTransactionChanged; + static boost::signals2::signal TransactionAddedToWallet; + /** Show progress e.g. for rescan */ boost::signals2::signal ShowProgress; diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 90aefb0018..920763067a 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -42,3 +42,7 @@ bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transa { return true; } + +bool CZMQAbstractNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 17fa7bbaa9..51ff88db1a 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -13,6 +13,7 @@ class CBlockIndex; class CTransaction; class CZMQAbstractNotifier; +class uint256; using CZMQNotifierFactory = std::function()>; @@ -56,6 +57,7 @@ public: virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence); // Notifies of transactions added to mempool or appearing in blocks virtual bool NotifyTransaction(const CTransaction &transaction); + virtual bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock); protected: void* psocket{nullptr}; diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 44cbacda64..4a80e11139 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -2,6 +2,10 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. +#if defined(HAVE_CONFIG_H) +#include +#endif + #include #include @@ -24,6 +28,10 @@ #include #include +#ifdef ENABLE_WALLET +#include +#endif + CZMQNotificationInterface::CZMQNotificationInterface() = default; CZMQNotificationInterface::~CZMQNotificationInterface() @@ -45,10 +53,12 @@ std::unique_ptr CZMQNotificationInterface::Create(std std::map factories; factories["pubhashblock"] = CZMQAbstractNotifier::Create; factories["pubhashtx"] = CZMQAbstractNotifier::Create; + factories["pubhashwallettx"] = CZMQAbstractNotifier::Create; factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr { return std::make_unique(get_block_by_index); }; factories["pubrawtx"] = CZMQAbstractNotifier::Create; + factories["pubrawwallettx"] = CZMQAbstractNotifier::Create; factories["pubsequence"] = CZMQAbstractNotifier::Create; std::list> notifiers; @@ -93,6 +103,10 @@ bool CZMQNotificationInterface::Initialize() LogPrint(BCLog::ZMQ, "Initialize notification interface\n"); assert(!pcontext); +#ifdef ENABLE_WALLET + m_wtx_added_connection = wallet::CWallet::TransactionAddedToWallet.connect(std::bind(&CZMQNotificationInterface::TransactionAddedToWallet, this, std::placeholders::_1, std::placeholders::_2)); +#endif + pcontext = zmq_ctx_new(); if (!pcontext) @@ -117,6 +131,11 @@ bool CZMQNotificationInterface::Initialize() void CZMQNotificationInterface::Shutdown() { LogPrint(BCLog::ZMQ, "Shutdown notification interface\n"); + +#ifdef ENABLE_WALLET + m_wtx_added_connection.disconnect(); +#endif + if (pcontext) { for (auto& notifier : notifiers) { @@ -209,4 +228,12 @@ void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptrNotifyWalletTransaction(tx, hashBlock); + }); +} + std::unique_ptr g_zmq_notification_interface; diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index c879fdd0dd..eb2ef19b65 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -14,6 +14,8 @@ #include #include +#include + class CBlock; class CBlockIndex; class CZMQAbstractNotifier; @@ -32,6 +34,8 @@ protected: bool Initialize(); void Shutdown(); + void TransactionAddedToWallet(const CTransactionRef& tx, const uint256 &hashBlock); + // CValidationInterface void TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) override; void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; @@ -44,6 +48,7 @@ private: void* pcontext{nullptr}; std::list> notifiers; + boost::signals2::connection m_wtx_added_connection; }; extern std::unique_ptr g_zmq_notification_interface; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 608870c489..fa1f75fdbc 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -42,8 +42,12 @@ static std::multimap mapPublishNotifi static const char *MSG_HASHBLOCK = "hashblock"; static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_HASHWALLETTXMEMPOOL = "hashwallettx-mempool"; +static const char *MSG_HASHWALLETTXBLOCK = "hashwallettx-block"; static const char *MSG_RAWBLOCK = "rawblock"; static const char *MSG_RAWTX = "rawtx"; +static const char *MSG_RAWWALLETTXMEMPOOL = "rawwallettx-mempool"; +static const char *MSG_RAWWALLETTXBLOCK = "rawwallettx-block"; static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message @@ -239,6 +243,23 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t return SendZmqMessage(MSG_HASHTX, data, 32); } +bool CZMQPublishHashWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashwallettx %s to %s\n", hash.GetHex(), this->address); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + + const char *command; + + if (!hashBlock.IsNull()) + command = MSG_HASHWALLETTXBLOCK; + else + command = MSG_HASHWALLETTXMEMPOOL; + + return SendZmqMessage(command, data, 32); +} + bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); @@ -301,3 +322,19 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &t LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence); } + +bool CZMQPublishRawWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish rawwallettx %s to %s\n", hash.GetHex(), this->address); + DataStream ss; + ss << TX_WITH_WITNESS(transaction); + + const char *command; + + if (!hashBlock.IsNull()) + command = MSG_RAWWALLETTXBLOCK; + else + command = MSG_RAWWALLETTXMEMPOOL; + + return SendZmqMessage(command, &(*ss.begin()), ss.size()); +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index cc941a899c..6dad6eb904 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -46,6 +46,12 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishHashWalletTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override; +}; + class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier { private: @@ -63,6 +69,12 @@ public: bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishRawWalletTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override; +}; + class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier { public: diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index b960f40ccc..9c5820788f 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -3,6 +3,8 @@ # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the ZMQ notification interface.""" + +import io import os import struct import tempfile @@ -23,6 +25,7 @@ from test_framework.messages import ( CBlock, hash256, tx_from_hex, + CTransaction, ) from test_framework.util import ( assert_equal, @@ -53,10 +56,12 @@ class ZMQSubscriber: self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) # Receive message from publisher and verify that topic and sequence match - def _receive_from_publisher_and_check(self): + def _receive_from_publisher_and_check(self, specific_topic = None): + expected_topic = specific_topic if specific_topic else self.topic + topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. - assert_equal(topic, self.topic) + assert_equal(topic, expected_topic) # Sequence should be incremental. received_seq = struct.unpack('