mirror of
https://github.com/Retropex/bitcoin.git
synced 2025-05-12 19:20:42 +02:00
Merge 10554 via zmq_wtx-28+knots
This commit is contained in:
commit
560ef5631a
21
doc/zmq.md
21
doc/zmq.md
@ -61,8 +61,10 @@ Currently, the following notifications are supported:
|
||||
|
||||
-zmqpubhashtx=address
|
||||
-zmqpubhashblock=address
|
||||
-zmqpubhashwallettx=address
|
||||
-zmqpubrawblock=address
|
||||
-zmqpubrawtx=address
|
||||
-zmqpubrawwallettx=address
|
||||
-zmqpubsequence=address
|
||||
|
||||
The socket type is PUB and the address must be a valid ZeroMQ socket
|
||||
@ -74,8 +76,10 @@ The option to set the PUB socket's outbound message high water mark
|
||||
|
||||
-zmqpubhashtxhwm=n
|
||||
-zmqpubhashblockhwm=n
|
||||
-zmqpubhashwallettxhwm=n
|
||||
-zmqpubrawblockhwm=n
|
||||
-zmqpubrawtxhwm=n
|
||||
-zmqpubrawwallettxhwm=n
|
||||
-zmqpubsequencehwm=n
|
||||
|
||||
The high water mark value must be an integer greater than or equal to 0.
|
||||
@ -93,6 +97,14 @@ corresponds to the notification type. For instance, for the
|
||||
notification `-zmqpubhashtx` the topic is `hashtx` (no null
|
||||
terminator). These options can also be provided in bitcoin.conf.
|
||||
|
||||
For wallet transaction notifications (both hashwallettx and rawwallettx), the
|
||||
topic also indicates if the transaction came from a block or mempool. If
|
||||
originated from mempool `-mempool` postfix will be added to the topic, for
|
||||
block `-block` postfix will be added. Because zeromq is using prefix matching
|
||||
for topics you can subscribe to `rawwallettx` (or `hashwallettx`) to get both
|
||||
notifications. If you only want one type of notification subscribe to either
|
||||
`rawwallettx-mempool` or `rawwallettx-block`.
|
||||
|
||||
The topics are:
|
||||
|
||||
`sequence`: the body is structured as the following based on the type of message:
|
||||
@ -121,6 +133,15 @@ Where the 8-byte uints correspond to the mempool sequence number.
|
||||
|
||||
| hashblock | <32-byte block hash in Little Endian> | <uint32 sequence number in Little Endian>
|
||||
|
||||
|
||||
`rawwallettx`: Identical to `rawtx`, except only when transactions are added (or updated) in an open wallet. Full topic is either `rawwallettx-block` for transactions in a block, or `rawwallettx-mempool` otherwise.
|
||||
|
||||
| rawwallettx-<"block" or "mempool"> | <serialized transaction> | <uint32 sequence number in Little Endian>
|
||||
|
||||
`hashwallettx`: Identical to `hashtx`, except only when transactions are added (or updated) in an open wallet. Full topic is either `hashwallettx-block` for transactions in a block, or `hashwallettx-mempool` otherwise.
|
||||
|
||||
| hashwallettx-<"block" or "mempool"> | <32-byte transaction hash in Little Endian> | <uint32 sequence number in Little Endian>
|
||||
|
||||
**_NOTE:_** Note that the 32-byte hashes are in Little Endian and not in the Big Endian format that the RPC interface and block explorers use to display transaction and block hashes.
|
||||
|
||||
ZeroMQ endpoint specifiers for TCP (and others) are documented in the
|
||||
|
@ -498,7 +498,7 @@ endif
|
||||
|
||||
# zmq #
|
||||
if ENABLE_ZMQ
|
||||
libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(ZMQ_CFLAGS)
|
||||
libbitcoin_zmq_a_CPPFLAGS = $(AM_CPPFLAGS) $(BITCOIN_INCLUDES) $(BOOST_CPPFLAGS) $(ZMQ_CFLAGS)
|
||||
libbitcoin_zmq_a_CXXFLAGS = $(AM_CXXFLAGS) $(PIE_FLAGS)
|
||||
libbitcoin_zmq_a_SOURCES = \
|
||||
zmq/zmqabstractnotifier.cpp \
|
||||
|
10
src/init.cpp
10
src/init.cpp
@ -602,24 +602,32 @@ void SetupServerArgs(ArgsManager& argsman)
|
||||
#ifdef ENABLE_ZMQ
|
||||
argsman.AddArg("-zmqpubhashblock=<address>", "Enable publish hash block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubhashtx=<address>", "Enable publish hash transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubhashwallettx=<address>", "Enable publish hash wallet transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubrawblock=<address>", "Enable publish raw block in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubrawwallettx=<address>", "Enable publish raw wallet transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubhashwallettxhwm=<n>", strprintf("Set publish hash wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubrawwallettxhwm=<n>", strprintf("Set publish raw wallet transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
|
||||
#else
|
||||
hidden_args.emplace_back("-zmqpubhashblock=<address>");
|
||||
hidden_args.emplace_back("-zmqpubhashtx=<address>");
|
||||
hidden_args.emplace_back("-zmqpubhashwallettx=<address>");
|
||||
hidden_args.emplace_back("-zmqpubrawblock=<address>");
|
||||
hidden_args.emplace_back("-zmqpubrawtx=<address>");
|
||||
hidden_args.emplace_back("-zmqpubrawwallettx=<address>");
|
||||
hidden_args.emplace_back("-zmqpubsequence=<n>");
|
||||
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
|
||||
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
|
||||
hidden_args.emplace_back("-zmqpubhashwallettxhwm=<n>");
|
||||
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
|
||||
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
|
||||
hidden_args.emplace_back("-zmqpubrawwallettxhwm=<n>");
|
||||
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
|
||||
#endif
|
||||
|
||||
@ -1354,6 +1362,8 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
|
||||
{"-zmqpubrawblock", true},
|
||||
{"-zmqpubrawtx", true},
|
||||
{"-zmqpubsequence", true},
|
||||
{"-zmqpubhashwallettx", true},
|
||||
{"-zmqpubrawwallettx", true},
|
||||
}) {
|
||||
for (const std::string& socket_addr : args.GetArgs(arg)) {
|
||||
std::string host_out;
|
||||
|
@ -91,6 +91,11 @@ using util::ToString;
|
||||
|
||||
namespace wallet {
|
||||
|
||||
/*
|
||||
* Signal when transactions are added to wallet
|
||||
*/
|
||||
boost::signals2::signal<void (const CTransactionRef &ptxn, const uint256 &blockHash)> CWallet::TransactionAddedToWallet;
|
||||
|
||||
bool AddWalletSetting(interfaces::Chain& chain, const std::string& wallet_name)
|
||||
{
|
||||
const auto update_function = [&wallet_name](common::SettingsValue& setting_value) {
|
||||
@ -1221,6 +1226,9 @@ CWalletTx* CWallet::AddToWallet(CTransactionRef tx, const TxState& state, const
|
||||
// Notify UI of new or updated transaction
|
||||
NotifyTransactionChanged(hash, fInsertedNew ? CT_NEW : CT_UPDATED);
|
||||
|
||||
// Notify listeners on new wallet transaction
|
||||
CWallet::TransactionAddedToWallet(wtx.tx, TxStateSerializedBlockHash(wtx.m_state));
|
||||
|
||||
#if HAVE_SYSTEM
|
||||
// notify an external script when a wallet transaction comes in or is updated
|
||||
if (!m_notify_tx_changed_scripts.empty()) {
|
||||
|
@ -871,6 +871,9 @@ public:
|
||||
*/
|
||||
boost::signals2::signal<void(const uint256& hashTx, ChangeType status)> NotifyTransactionChanged;
|
||||
|
||||
static boost::signals2::signal<void (const CTransactionRef &ptxn,
|
||||
const uint256 &blockHash)> TransactionAddedToWallet;
|
||||
|
||||
/** Show progress e.g. for rescan */
|
||||
boost::signals2::signal<void (const std::string &title, int nProgress)> ShowProgress;
|
||||
|
||||
|
@ -42,3 +42,7 @@ bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transa
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
bool CZMQAbstractNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
|
||||
return true;
|
||||
}
|
||||
|
@ -13,6 +13,7 @@
|
||||
class CBlockIndex;
|
||||
class CTransaction;
|
||||
class CZMQAbstractNotifier;
|
||||
class uint256;
|
||||
|
||||
using CZMQNotifierFactory = std::function<std::unique_ptr<CZMQAbstractNotifier>()>;
|
||||
|
||||
@ -56,6 +57,7 @@ public:
|
||||
virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence);
|
||||
// Notifies of transactions added to mempool or appearing in blocks
|
||||
virtual bool NotifyTransaction(const CTransaction &transaction);
|
||||
virtual bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock);
|
||||
|
||||
protected:
|
||||
void* psocket{nullptr};
|
||||
|
@ -2,6 +2,10 @@
|
||||
// Distributed under the MIT software license, see the accompanying
|
||||
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
|
||||
#if defined(HAVE_CONFIG_H)
|
||||
#include <config/bitcoin-config.h>
|
||||
#endif
|
||||
|
||||
#include <zmq/zmqnotificationinterface.h>
|
||||
|
||||
#include <common/args.h>
|
||||
@ -24,6 +28,10 @@
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#ifdef ENABLE_WALLET
|
||||
#include <wallet/wallet.h>
|
||||
#endif
|
||||
|
||||
CZMQNotificationInterface::CZMQNotificationInterface() = default;
|
||||
|
||||
CZMQNotificationInterface::~CZMQNotificationInterface()
|
||||
@ -45,10 +53,12 @@ std::unique_ptr<CZMQNotificationInterface> CZMQNotificationInterface::Create(std
|
||||
std::map<std::string, CZMQNotifierFactory> factories;
|
||||
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
|
||||
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
|
||||
factories["pubhashwallettx"] = CZMQAbstractNotifier::Create<CZMQPublishHashWalletTransactionNotifier>;
|
||||
factories["pubrawblock"] = [&get_block_by_index]() -> std::unique_ptr<CZMQAbstractNotifier> {
|
||||
return std::make_unique<CZMQPublishRawBlockNotifier>(get_block_by_index);
|
||||
};
|
||||
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
|
||||
factories["pubrawwallettx"] = CZMQAbstractNotifier::Create<CZMQPublishRawWalletTransactionNotifier>;
|
||||
factories["pubsequence"] = CZMQAbstractNotifier::Create<CZMQPublishSequenceNotifier>;
|
||||
|
||||
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
|
||||
@ -93,6 +103,10 @@ bool CZMQNotificationInterface::Initialize()
|
||||
LogPrint(BCLog::ZMQ, "Initialize notification interface\n");
|
||||
assert(!pcontext);
|
||||
|
||||
#ifdef ENABLE_WALLET
|
||||
m_wtx_added_connection = wallet::CWallet::TransactionAddedToWallet.connect(std::bind(&CZMQNotificationInterface::TransactionAddedToWallet, this, std::placeholders::_1, std::placeholders::_2));
|
||||
#endif
|
||||
|
||||
pcontext = zmq_ctx_new();
|
||||
|
||||
if (!pcontext)
|
||||
@ -117,6 +131,11 @@ bool CZMQNotificationInterface::Initialize()
|
||||
void CZMQNotificationInterface::Shutdown()
|
||||
{
|
||||
LogPrint(BCLog::ZMQ, "Shutdown notification interface\n");
|
||||
|
||||
#ifdef ENABLE_WALLET
|
||||
m_wtx_added_connection.disconnect();
|
||||
#endif
|
||||
|
||||
if (pcontext)
|
||||
{
|
||||
for (auto& notifier : notifiers) {
|
||||
@ -204,4 +223,12 @@ void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr<const CB
|
||||
});
|
||||
}
|
||||
|
||||
void CZMQNotificationInterface::TransactionAddedToWallet(const CTransactionRef& ptx, const uint256 &hashBlock) {
|
||||
const CTransaction& tx = *ptx;
|
||||
|
||||
TryForEachAndRemoveFailed(notifiers, [&tx, &hashBlock](CZMQAbstractNotifier* notifier) {
|
||||
return notifier->NotifyWalletTransaction(tx, hashBlock);
|
||||
});
|
||||
}
|
||||
|
||||
std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
|
||||
|
@ -14,6 +14,8 @@
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
|
||||
#include <boost/signals2/connection.hpp>
|
||||
|
||||
class CBlock;
|
||||
class CBlockIndex;
|
||||
class CZMQAbstractNotifier;
|
||||
@ -32,6 +34,8 @@ protected:
|
||||
bool Initialize();
|
||||
void Shutdown();
|
||||
|
||||
void TransactionAddedToWallet(const CTransactionRef& tx, const uint256 &hashBlock);
|
||||
|
||||
// CValidationInterface
|
||||
void TransactionAddedToMempool(const NewMempoolTransactionInfo& tx, uint64_t mempool_sequence) override;
|
||||
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override;
|
||||
@ -44,6 +48,7 @@ private:
|
||||
|
||||
void* pcontext{nullptr};
|
||||
std::list<std::unique_ptr<CZMQAbstractNotifier>> notifiers;
|
||||
boost::signals2::connection m_wtx_added_connection;
|
||||
};
|
||||
|
||||
extern std::unique_ptr<CZMQNotificationInterface> g_zmq_notification_interface;
|
||||
|
@ -42,8 +42,12 @@ static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifi
|
||||
|
||||
static const char *MSG_HASHBLOCK = "hashblock";
|
||||
static const char *MSG_HASHTX = "hashtx";
|
||||
static const char *MSG_HASHWALLETTXMEMPOOL = "hashwallettx-mempool";
|
||||
static const char *MSG_HASHWALLETTXBLOCK = "hashwallettx-block";
|
||||
static const char *MSG_RAWBLOCK = "rawblock";
|
||||
static const char *MSG_RAWTX = "rawtx";
|
||||
static const char *MSG_RAWWALLETTXMEMPOOL = "rawwallettx-mempool";
|
||||
static const char *MSG_RAWWALLETTXBLOCK = "rawwallettx-block";
|
||||
static const char *MSG_SEQUENCE = "sequence";
|
||||
|
||||
// Internal function to send multipart message
|
||||
@ -239,6 +243,23 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
|
||||
return SendZmqMessage(MSG_HASHTX, data, 32);
|
||||
}
|
||||
|
||||
bool CZMQPublishHashWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
|
||||
uint256 hash = transaction.GetHash();
|
||||
LogPrint(BCLog::ZMQ, "Publish hashwallettx %s to %s\n", hash.GetHex(), this->address);
|
||||
uint8_t data[32];
|
||||
for (unsigned int i = 0; i < 32; i++)
|
||||
data[31 - i] = hash.begin()[i];
|
||||
|
||||
const char *command;
|
||||
|
||||
if (!hashBlock.IsNull())
|
||||
command = MSG_HASHWALLETTXBLOCK;
|
||||
else
|
||||
command = MSG_HASHWALLETTXMEMPOOL;
|
||||
|
||||
return SendZmqMessage(command, data, 32);
|
||||
}
|
||||
|
||||
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
|
||||
{
|
||||
LogPrint(BCLog::ZMQ, "Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
|
||||
@ -301,3 +322,19 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &t
|
||||
LogPrint(BCLog::ZMQ, "Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
|
||||
return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence);
|
||||
}
|
||||
|
||||
bool CZMQPublishRawWalletTransactionNotifier::NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock){
|
||||
uint256 hash = transaction.GetHash();
|
||||
LogPrint(BCLog::ZMQ, "Publish rawwallettx %s to %s\n", hash.GetHex(), this->address);
|
||||
DataStream ss;
|
||||
ss << TX_WITH_WITNESS(transaction);
|
||||
|
||||
const char *command;
|
||||
|
||||
if (!hashBlock.IsNull())
|
||||
command = MSG_RAWWALLETTXBLOCK;
|
||||
else
|
||||
command = MSG_RAWWALLETTXMEMPOOL;
|
||||
|
||||
return SendZmqMessage(command, &(*ss.begin()), ss.size());
|
||||
}
|
||||
|
@ -46,6 +46,12 @@ public:
|
||||
bool NotifyTransaction(const CTransaction &transaction) override;
|
||||
};
|
||||
|
||||
class CZMQPublishHashWalletTransactionNotifier : public CZMQAbstractPublishNotifier
|
||||
{
|
||||
public:
|
||||
bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override;
|
||||
};
|
||||
|
||||
class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier
|
||||
{
|
||||
private:
|
||||
@ -63,6 +69,12 @@ public:
|
||||
bool NotifyTransaction(const CTransaction &transaction) override;
|
||||
};
|
||||
|
||||
class CZMQPublishRawWalletTransactionNotifier : public CZMQAbstractPublishNotifier
|
||||
{
|
||||
public:
|
||||
bool NotifyWalletTransaction(const CTransaction &transaction, const uint256 &hashBlock) override;
|
||||
};
|
||||
|
||||
class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier
|
||||
{
|
||||
public:
|
||||
|
@ -3,6 +3,8 @@
|
||||
# Distributed under the MIT software license, see the accompanying
|
||||
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
||||
"""Test the ZMQ notification interface."""
|
||||
|
||||
import io
|
||||
import os
|
||||
import struct
|
||||
import tempfile
|
||||
@ -23,6 +25,7 @@ from test_framework.messages import (
|
||||
CBlock,
|
||||
hash256,
|
||||
tx_from_hex,
|
||||
CTransaction,
|
||||
)
|
||||
from test_framework.util import (
|
||||
assert_equal,
|
||||
@ -53,10 +56,12 @@ class ZMQSubscriber:
|
||||
self.socket.setsockopt(zmq.SUBSCRIBE, self.topic)
|
||||
|
||||
# Receive message from publisher and verify that topic and sequence match
|
||||
def _receive_from_publisher_and_check(self):
|
||||
def _receive_from_publisher_and_check(self, specific_topic = None):
|
||||
expected_topic = specific_topic if specific_topic else self.topic
|
||||
|
||||
topic, body, seq = self.socket.recv_multipart()
|
||||
# Topic should match the subscriber topic.
|
||||
assert_equal(topic, self.topic)
|
||||
assert_equal(topic, expected_topic)
|
||||
# Sequence should be incremental.
|
||||
received_seq = struct.unpack('<I', seq)[-1]
|
||||
if self.sequence is None:
|
||||
@ -66,8 +71,8 @@ class ZMQSubscriber:
|
||||
self.sequence += 1
|
||||
return body
|
||||
|
||||
def receive(self):
|
||||
return self._receive_from_publisher_and_check()
|
||||
def receive(self, specific_topic = None):
|
||||
return self._receive_from_publisher_and_check(specific_topic)
|
||||
|
||||
def receive_sequence(self):
|
||||
body = self._receive_from_publisher_and_check()
|
||||
@ -90,7 +95,10 @@ class ZMQTestSetupBlock:
|
||||
raw transaction data.
|
||||
"""
|
||||
def __init__(self, test_framework, node):
|
||||
self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
|
||||
if test_framework.is_wallet_compiled():
|
||||
self.block_hash = test_framework.generatetoaddress(node, nblocks=1, address=node.getnewaddress(), maxtries=1000000, sync_fun=test_framework.no_op)[0]
|
||||
else:
|
||||
self.block_hash = test_framework.generate(node, 1, sync_fun=test_framework.no_op)[0]
|
||||
coinbase = node.getblock(self.block_hash, 2)['tx'][0]
|
||||
self.tx_hash = coinbase['txid']
|
||||
self.raw_tx = coinbase['hex']
|
||||
@ -106,11 +114,16 @@ class ZMQTestSetupBlock:
|
||||
|
||||
|
||||
class ZMQTest (BitcoinTestFramework):
|
||||
def add_options(self, parser):
|
||||
self.add_wallet_options(parser)
|
||||
|
||||
def set_test_params(self):
|
||||
self.num_nodes = 2
|
||||
# whitelist peers to speed up tx relay / mempool sync
|
||||
self.noban_tx_relay = True
|
||||
self.zmq_port_base = p2p_port(self.num_nodes + 1)
|
||||
if self.is_wallet_compiled():
|
||||
self.skip_if_no_wallet()
|
||||
|
||||
def skip_test_if_missing_module(self):
|
||||
self.skip_if_no_py3_zmq()
|
||||
@ -163,7 +176,10 @@ class ZMQTest (BitcoinTestFramework):
|
||||
recv_failed = False
|
||||
for sub in subscribers:
|
||||
try:
|
||||
while not test_block.caused_notification(sub.receive().hex()):
|
||||
specific_topic = sub.topic
|
||||
if b'wallet' in sub.topic:
|
||||
specific_topic += b"-block"
|
||||
while not test_block.caused_notification(sub.receive(specific_topic=specific_topic).hex()):
|
||||
self.log.debug("Ignoring sync-up notification for previously generated block.")
|
||||
except zmq.error.Again:
|
||||
self.log.debug("Didn't receive sync-up notification, trying again.")
|
||||
@ -195,16 +211,36 @@ class ZMQTest (BitcoinTestFramework):
|
||||
socket_path = tempfile.NamedTemporaryFile().name
|
||||
address = f"ipc://{socket_path}"
|
||||
|
||||
subs = self.setup_zmq_test([(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]])
|
||||
services = ["hashblock", "hashtx", "rawblock", "rawtx"]
|
||||
if self.is_wallet_compiled():
|
||||
services += ["hashwallettx", "rawwallettx"]
|
||||
subs = self.setup_zmq_test([(topic, address) for topic in services])
|
||||
|
||||
hashblock = subs[0]
|
||||
hashtx = subs[1]
|
||||
rawblock = subs[2]
|
||||
rawtx = subs[3]
|
||||
if self.is_wallet_compiled():
|
||||
hashwallettx = subs[-2]
|
||||
rawwallettx = subs[-1]
|
||||
|
||||
self.sync_all()
|
||||
# Flush initial wallettx events before we begin
|
||||
while True:
|
||||
try:
|
||||
topic, body, seq = hashwallettx.socket.recv_multipart()
|
||||
except zmq.ZMQError:
|
||||
break
|
||||
subscriber = {b'hashwallettx-block': hashwallettx, b'rawwallettx-block': rawwallettx}[topic]
|
||||
assert_equal(struct.unpack('<I', seq)[-1], subscriber.sequence)
|
||||
subscriber.sequence += 1
|
||||
|
||||
num_blocks = 5
|
||||
self.log.info(f"Generate {num_blocks} blocks (and {num_blocks} coinbase txes)")
|
||||
genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
|
||||
if self.is_wallet_compiled():
|
||||
genhashes = self.generate(self.nodes[0], num_blocks)
|
||||
else:
|
||||
genhashes = self.generatetoaddress(self.nodes[0], num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
|
||||
|
||||
for x in range(num_blocks):
|
||||
# Should receive the coinbase txid.
|
||||
@ -224,6 +260,15 @@ class ZMQTest (BitcoinTestFramework):
|
||||
assert_equal(len(block.vtx), 1)
|
||||
assert_equal(genhashes[x], hash256_reversed(hex[:80]).hex())
|
||||
|
||||
if self.is_wallet_compiled():
|
||||
# Should receive wallet tx
|
||||
wallettxid = hashwallettx.receive(b"hashwallettx-block")
|
||||
wallethex = rawwallettx.receive(b"rawwallettx-block")
|
||||
wallettx = CTransaction()
|
||||
wallettx.deserialize(io.BytesIO(wallethex))
|
||||
wallettx.calc_sha256()
|
||||
assert_equal(wallettx.hash, wallettxid.hex())
|
||||
|
||||
# Should receive the generated block hash.
|
||||
hash = hashblock.receive().hex()
|
||||
assert_equal(genhashes[x], hash)
|
||||
@ -232,8 +277,12 @@ class ZMQTest (BitcoinTestFramework):
|
||||
|
||||
|
||||
self.log.info("Wait for tx from second node")
|
||||
payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
|
||||
payment_txid = payment_tx['txid']
|
||||
if self.is_wallet_compiled():
|
||||
payment_txid = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
|
||||
payment_tx = {'wtxid': self.nodes[1].getrawtransaction(payment_txid, 1)['hash']}
|
||||
else:
|
||||
payment_tx = self.wallet.send_self_transfer(from_node=self.nodes[1])
|
||||
payment_txid = payment_tx['txid']
|
||||
self.sync_all()
|
||||
# Should receive the broadcasted txid.
|
||||
txid = hashtx.receive()
|
||||
@ -250,13 +299,22 @@ class ZMQTest (BitcoinTestFramework):
|
||||
txid = hashtx.receive()
|
||||
assert_equal(payment_txid, txid.hex())
|
||||
|
||||
if self.is_wallet_compiled():
|
||||
wallettxid = hashwallettx.receive(b"hashwallettx-mempool")
|
||||
wallethex = rawwallettx.receive(b"rawwallettx-mempool")
|
||||
wallettx = CTransaction()
|
||||
wallettx.deserialize(io.BytesIO(wallethex))
|
||||
wallettx.calc_sha256()
|
||||
assert_equal(wallettx.hash, wallettxid.hex())
|
||||
|
||||
self.log.info("Test the getzmqnotifications RPC")
|
||||
assert_equal(self.nodes[0].getzmqnotifications(), [
|
||||
{"type": "pubhashblock", "address": address, "hwm": 1000},
|
||||
{"type": "pubhashtx", "address": address, "hwm": 1000},
|
||||
] + ([{"type": "pubhashwallettx", "address": address, "hwm": 1000}] if self.is_wallet_compiled() else []) + [
|
||||
{"type": "pubrawblock", "address": address, "hwm": 1000},
|
||||
{"type": "pubrawtx", "address": address, "hwm": 1000},
|
||||
] + ([{"type": "pubrawwallettx", "address": address, "hwm": 1000}] if self.is_wallet_compiled() else []) + [
|
||||
])
|
||||
|
||||
assert_equal(self.nodes[1].getzmqnotifications(), [])
|
||||
|
Loading…
Reference in New Issue
Block a user