diff --git a/doc/zmq.md b/doc/zmq.md
index 07c340fb99..22c05d9a7e 100644
--- a/doc/zmq.md
+++ b/doc/zmq.md
@@ -61,8 +61,10 @@ Currently, the following notifications are supported:
-zmqpubhashtx=address
-zmqpubhashblock=address
+ -zmqpubhashwallettx=address
-zmqpubrawblock=address
-zmqpubrawtx=address
+ -zmqpubrawwallettx=address
-zmqpubsequence=address
The socket type is PUB and the address must be a valid ZeroMQ socket
@@ -93,6 +95,15 @@ corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator). These options can also be provided in bitcoin.conf.
+For wallet transaction notifications (both hash and tx), the
+topic also indicate if the transaction came from a block
+or mempool. If originated from mempool `-mempool` postfix
+will be added to the topic, for block `-block` postfix will
+be added. Because zeromq is using prefix matching for topics
+you can subscribe to `rawwallettx` (or `hashwallettx`) to get
+both notifications. If you only want one type of notification
+subscribe to either `rawwallettx-mempool` or `rawwallettx-block`.
+
The topics are:
`sequence`: the body is structured as the following based on the type of message:
diff --git a/src/Makefile.am b/src/Makefile.am
index 1ccb5332c4..711ebf8e3a 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -488,7 +488,7 @@ endif
# zmq #
if ENABLE_ZMQ
-libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(ZMQ_CFLAGS)
+libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(BOOST_CPPFLAGS) $(ZMQ_CFLAGS)
libbitcoin_zmq_a_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS)
libbitcoin_zmq_a_SOURCES = \
zmq/zmqabstractnotifier.cpp \
diff --git a/src/init.cpp b/src/init.cpp
index faaf3353d0..c68313390d 100644
--- a/src/init.cpp
+++ b/src/init.cpp
@@ -589,24 +589,32 @@ void SetupServerArgs(ArgsManager& argsman)
#ifdef ENABLE_ZMQ
argsman.AddArg("-zmqpubhashblock=
", "Enable publish hash block in ", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashtx=", "Enable publish hash transaction in ", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
+ argsman.AddArg("-zmqpubhashwallettx=", "Enable publish hash wallet transaction in ", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblock=", "Enable publish raw block in ", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtx=", "Enable publish raw transaction in ", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
+ argsman.AddArg("-zmqpubrawwallettx=", "Enable publish raw wallet transaction in ", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=", "Enable publish hash block and tx sequence in ", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashtxhwm=", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
+ argsman.AddArg("-zmqpubhashwallettxhwm=", strprintf("Set publish hash wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawblockhwm=", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxhwm=", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
+ argsman.AddArg("-zmqpubrawwallettxhwm=", strprintf("Set publish raw wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=");
hidden_args.emplace_back("-zmqpubhashtx=");
+ hidden_args.emplace_back("-zmqpubhashwallettx=");
hidden_args.emplace_back("-zmqpubrawblock=");
hidden_args.emplace_back("-zmqpubrawtx=");
+ hidden_args.emplace_back("-zmqpubrawwallettx=");
hidden_args.emplace_back("-zmqpubsequence=");
hidden_args.emplace_back("-zmqpubhashblockhwm=");
hidden_args.emplace_back("-zmqpubhashtxhwm=");
+ hidden_args.emplace_back("-zmqpubhashwallettxhwm=");
hidden_args.emplace_back("-zmqpubrawblockhwm=");
hidden_args.emplace_back("-zmqpubrawtxhwm=");
+ hidden_args.emplace_back("-zmqpubrawwallettxhwm=");
hidden_args.emplace_back("-zmqpubsequencehwm=");
#endif
diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp
index 132e5e68b4..0a95117ab4 100644
--- a/src/wallet/wallet.cpp
+++ b/src/wallet/wallet.cpp
@@ -91,6 +91,11 @@ using util::ToString;
namespace wallet {
+/*
+ * Signal when transactions are added to wallet
+ */
+boost::signals2::signal CWallet::TransactionAddedToWallet;
+
bool AddWalletSetting(interfaces::Chain& chain, const std::string& wallet_name)
{
const auto update_function = [&wallet_name](common::SettingsValue& setting_value) {
@@ -1215,6 +1220,9 @@ CWalletTx* CWallet::AddToWallet(CTransactionRef tx, const TxState& state, const
// Notify UI of new or updated transaction
NotifyTransactionChanged(hash, fInsertedNew ? CT_NEW : CT_UPDATED);
+ // Notify listeners on new wallet transaction
+ CWallet::TransactionAddedToWallet(wtx.tx, TxStateSerializedBlockHash(wtx.m_state));
+
#if HAVE_SYSTEM
// notify an external script when a wallet transaction comes in or is updated
std::string strCmd = m_notify_tx_changed_script;
diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h
index 26fe85f54e..136f88ca66 100644
--- a/src/wallet/wallet.h
+++ b/src/wallet/wallet.h
@@ -855,6 +855,9 @@ public:
*/
boost::signals2::signal NotifyTransactionChanged;
+ static boost::signals2::signal TransactionAddedToWallet;
+
/** Show progress e.g. for rescan */
boost::signals2::signal ShowProgress;
diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp
index 90aefb0018..920763067a 100644
--- a/src/zmq/zmqabstractnotifier.cpp
+++ b/src/zmq/zmqabstractnotifier.cpp
@@ -42,3 +42,7 @@ bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transa
{
return true;
}
+
+bool CZMQAbstractNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
+ return true;
+}
diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h
index 17fa7bbaa9..51ff88db1a 100644
--- a/src/zmq/zmqabstractnotifier.h
+++ b/src/zmq/zmqabstractnotifier.h
@@ -13,6 +13,7 @@
class CBlockIndex;
class CTransaction;
class CZMQAbstractNotifier;
+class uint256;
using CZMQNotifierFactory = std::function()>;
@@ -56,6 +57,7 @@ public:
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence);
// Notifies of transactions added to mempool or appearing in blocks
virtual bool NotifyTransaction(const CTransaction &transaction);
+ virtual bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock);
protected:
void* psocket{nullptr};
diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp
index 44cbacda64..4a80e11139 100644
--- a/src/zmq/zmqnotificationinterface.cpp
+++ b/src/zmq/zmqnotificationinterface.cpp
@@ -2,6 +2,10 @@
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+#if defined(HAVE_CONFIG_H)
+#include
+#endif
+
#include
#include
@@ -24,6 +28,10 @@
#include
#include
+#ifdef ENABLE_WALLET
+#include
+#endif
+
CZMQNotificationInterface::CZMQNotificationInterface() = default;
CZMQNotificationInterface::~CZMQNotificationInterface()
@@ -45,10 +53,12 @@ std::unique_ptr CZMQNotificationInterface::Create(std
std::map factories;
factories["pubhashblock"] = CZMQAbstractNotifier::Create;
factories["pubhashtx"] = CZMQAbstractNotifier::Create;
+ factories["pubhashwallettx"] = CZMQAbstractNotifier::Create;
factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr {
return std::make_unique(get_block_by_index);
};
factories["pubrawtx"] = CZMQAbstractNotifier::Create;
+ factories["pubrawwallettx"] = CZMQAbstractNotifier::Create;
factories["pubsequence"] = CZMQAbstractNotifier::Create;
std::list> notifiers;
@@ -93,6 +103,10 @@ bool CZMQNotificationInterface::Initialize()
LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
assert(!pcontext);
+#ifdef ENABLE_WALLET
+ m_wtx_added_connection = wallet::CWallet::TransactionAddedToWallet.connect(std::bind(&CZMQNotificationInterface::TransactionAddedToWallet, this, std::placeholders::_1, std::placeholders::_2));
+#endif
+
pcontext = zmq_ctx_new();
if (!pcontext)
@@ -117,6 +131,11 @@ bool CZMQNotificationInterface::Initialize()
void CZMQNotificationInterface::Shutdown()
{
LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
+
+#ifdef ENABLE_WALLET
+ m_wtx_added_connection.disconnect();
+#endif
+
if (pcontext)
{
for (auto& notifier : notifiers) {
@@ -209,4 +228,12 @@ void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptrNotifyWalletTransaction(tx, hashBlock);
+ });
+}
+
std::unique_ptr g_zmq_notification_interface;
diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h
index c879fdd0dd..eb2ef19b65 100644
--- a/src/zmq/zmqnotificationinterface.h
+++ b/src/zmq/zmqnotificationinterface.h
@@ -14,6 +14,8 @@
#include
#include
+#include
+
class CBlock;
class CBlockIndex;
class CZMQAbstractNotifier;
@@ -32,6 +34,8 @@ protected:
bool Initialize();
void Shutdown();
+ void TransactionAddedToWallet(const CTransactionRef& tx, const uint256 &hashBlock);
+
// CValidationInterface
void TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
@@ -44,6 +48,7 @@ private:
void* pcontext{nullptr};
std::list> notifiers;
+ boost::signals2::connection m_wtx_added_connection;
};
extern std::unique_ptr g_zmq_notification_interface;
diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp
index 608870c489..fa1f75fdbc 100644
--- a/src/zmq/zmqpublishnotifier.cpp
+++ b/src/zmq/zmqpublishnotifier.cpp
@@ -42,8 +42,12 @@ static std::multimap mapPublishNotifi
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
+static const char *MSG_HASHWALLETTXMEMPOOL = "hashwallettx-mempool";
+static const char *MSG_HASHWALLETTXBLOCK = "hashwallettx-block";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
+static const char *MSG_RAWWALLETTXMEMPOOL = "rawwallettx-mempool";
+static const char *MSG_RAWWALLETTXBLOCK = "rawwallettx-block";
static const char *MSG_SEQUENCE = "sequence";
// Internal function to send multipart message
@@ -239,6 +243,23 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
return SendZmqMessage(MSG_HASHTX, data, 32);
}
+bool CZMQPublishHashWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
+ uint256 hash = transaction.GetHash();
+ LogPrint(BCLog::ZMQ, "zmq: Publish hashwallettx %s to %s\n", hash.GetHex(), this->address);
+ char data[32];
+ for (unsigned int i = 0; i < 32; i++)
+ data[31 - i] = hash.begin()[i];
+
+ const char *command;
+
+ if (!hashBlock.IsNull())
+ command = MSG_HASHWALLETTXBLOCK;
+ else
+ command = MSG_HASHWALLETTXMEMPOOL;
+
+ return SendZmqMessage(command, data, 32);
+}
+
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
@@ -301,3 +322,19 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &t
LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
}
+
+bool CZMQPublishRawWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
+ uint256 hash = transaction.GetHash();
+ LogPrint(BCLog::ZMQ, "zmq: Publish rawwallettx %s to %s\n", hash.GetHex(), this->address);
+ DataStream ss;
+ ss << TX_WITH_WITNESS(transaction);
+
+ const char *command;
+
+ if (!hashBlock.IsNull())
+ command = MSG_RAWWALLETTXBLOCK;
+ else
+ command = MSG_RAWWALLETTXMEMPOOL;
+
+ return SendZmqMessage(command, &(*ss.begin()), ss.size());
+}
diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h
index cc941a899c..6dad6eb904 100644
--- a/src/zmq/zmqpublishnotifier.h
+++ b/src/zmq/zmqpublishnotifier.h
@@ -46,6 +46,12 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override;
};
+class CZMQPublishHashWalletTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+ bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override;
+};
+
class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
{
private:
@@ -63,6 +69,12 @@ public:
bool NotifyTransaction(const CTransaction &transaction) override;
};
+class CZMQPublishRawWalletTransactionNotifier : public CZMQAbstractPublishNotifier
+{
+public:
+ bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override;
+};
+
class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier
{
public:
diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py
index b960f40ccc..9c5820788f 100755
--- a/test/functional/interface_zmq.py
+++ b/test/functional/interface_zmq.py
@@ -3,6 +3,8 @@
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the ZMQ notification interface."""
+
+import io
import os
import struct
import tempfile
@@ -23,6 +25,7 @@ from test_framework.messages import (
CBlock,
hash256,
tx_from_hex,
+ CTransaction,
)
from test_framework.util import (
assert_equal,
@@ -53,10 +56,12 @@ class ZMQSubscriber:
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
# Receive message from publisher and verify that topic and sequence match
- def _receive_from_publisher_and_check(self):
+ def _receive_from_publisher_and_check(self, specific_topic = None):
+ expected_topic = specific_topic if specific_topic else self.topic
+
topic, body, seq = self.socket.recv_multipart()
# Topic should match the subscriber topic.
- assert_equal(topic, self.topic)
+ assert_equal(topic, expected_topic)
# Sequence should be incremental.
received_seq = struct.unpack('