Merge 29112 via fix_wallet_single_batch_only-26+knots

This commit is contained in:
Luke Dashjr 2024-03-25 17:26:53 +00:00
commit 9fae003bc4
4 changed files with 129 additions and 6 deletions

View File

@ -110,7 +110,7 @@ Mutex SQLiteDatabase::g_sqlite_mutex;
int SQLiteDatabase::g_sqlite_count = 0;
SQLiteDatabase::SQLiteDatabase(const fs::path& dir_path, const fs::path& file_path, const DatabaseOptions& options, bool mock)
: WalletDatabase(), m_mock(mock), m_dir_path(fs::PathToString(dir_path)), m_file_path(fs::PathToString(file_path)), m_use_unsafe_sync(options.use_unsafe_sync)
: WalletDatabase(), m_mock(mock), m_dir_path(fs::PathToString(dir_path)), m_file_path(fs::PathToString(file_path)), m_write_semaphore(1), m_use_unsafe_sync(options.use_unsafe_sync)
{
{
LOCK(g_sqlite_mutex);
@ -408,7 +408,7 @@ void SQLiteBatch::Close()
bool force_conn_refresh = false;
// If we began a transaction, and it wasn't committed, abort the transaction in progress
if (m_database.HasActiveTxn()) {
if (m_txn) {
if (TxnAbort()) {
LogPrintf("SQLiteBatch: Batch closed unexpectedly without the transaction being explicitly committed or aborted\n");
} else {
@ -442,6 +442,8 @@ void SQLiteBatch::Close()
m_database.Close();
try {
m_database.Open();
// If TxnAbort failed and we refreshed the connection, the semaphore was not released, so release it here to avoid deadlocks on future writes.
m_database.m_write_semaphore.post();
} catch (const std::runtime_error&) {
// If open fails, cleanup this object and rethrow the exception
m_database.Close();
@ -493,6 +495,9 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite)
if (!BindBlobToStatement(stmt, 1, key, "key")) return false;
if (!BindBlobToStatement(stmt, 2, value, "value")) return false;
// Acquire semaphore if not previously acquired when creating a transaction.
if (!m_txn) m_database.m_write_semaphore.wait();
// Execute
int res = sqlite3_step(stmt);
sqlite3_clear_bindings(stmt);
@ -500,6 +505,9 @@ bool SQLiteBatch::WriteKey(DataStream&& key, DataStream&& value, bool overwrite)
if (res != SQLITE_DONE) {
LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res));
}
if (!m_txn) m_database.m_write_semaphore.post();
return res == SQLITE_DONE;
}
@ -511,6 +519,9 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob)
// Bind: leftmost parameter in statement is index 1
if (!BindBlobToStatement(stmt, 1, blob, "key")) return false;
// Acquire semaphore if not previously acquired when creating a transaction.
if (!m_txn) m_database.m_write_semaphore.wait();
// Execute
int res = sqlite3_step(stmt);
sqlite3_clear_bindings(stmt);
@ -518,6 +529,9 @@ bool SQLiteBatch::ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob)
if (res != SQLITE_DONE) {
LogPrintf("%s: Unable to execute statement: %s\n", __func__, sqlite3_errstr(res));
}
if (!m_txn) m_database.m_write_semaphore.post();
return res == SQLITE_DONE;
}
@ -634,30 +648,43 @@ std::unique_ptr<DatabaseCursor> SQLiteBatch::GetNewPrefixCursor(Span<const std::
/** Begin a database transaction owned by this batch.
 *
 * Returns false without touching the database when a guard below rejects the call.
 * Otherwise waits on m_write_semaphore, executes "BEGIN TRANSACTION" and returns whether
 * that statement yielded SQLITE_OK. On success m_txn is set and the semaphore stays held;
 * on failure the semaphore is posted again before returning.
 */
bool SQLiteBatch::TxnBegin()
{
// NOTE(review): the next two lines look like the pre-change and post-change form of the same
// guard shown side by side. The HasActiveTxn() form returns early instead of waiting on the
// semaphore below -- confirm that only the m_txn form is meant to remain.
if (!m_database.m_db || m_database.HasActiveTxn()) return false;
if (!m_database.m_db || m_txn) return false;
// Blocks until the write semaphore is available.
m_database.m_write_semaphore.wait();
// With the semaphore held, no transaction is expected to be active on the database.
Assert(!m_database.HasActiveTxn());
int res = Assert(m_exec_handler)->Exec(m_database, "BEGIN TRANSACTION");
if (res != SQLITE_OK) {
LogPrintf("SQLiteBatch: Failed to begin the transaction\n");
// No transaction was started, so give the semaphore back.
m_database.m_write_semaphore.post();
} else {
// Record that this batch started the transaction and keeps the semaphore.
m_txn = true;
}
return res == SQLITE_OK;
}
/** Commit the transaction started by this batch.
 *
 * Returns false without touching the database when a guard below rejects the call.
 * Otherwise executes "COMMIT TRANSACTION" and returns whether it yielded SQLITE_OK.
 * On success m_txn is cleared and m_write_semaphore is posted; on failure both are left
 * as they were and only a log line is written.
 */
bool SQLiteBatch::TxnCommit()
{
// NOTE(review): the next two lines look like the pre-change and post-change form of the same
// guard shown side by side -- confirm that only the m_txn form is meant to remain.
if (!m_database.HasActiveTxn()) return false;
if (!m_database.m_db || !m_txn) return false;
// This batch has m_txn set, so the database is expected to report an active transaction.
Assert(m_database.HasActiveTxn());
int res = Assert(m_exec_handler)->Exec(m_database, "COMMIT TRANSACTION");
if (res != SQLITE_OK) {
LogPrintf("SQLiteBatch: Failed to commit the transaction\n");
} else {
// Transaction finished: drop the flag and release the write semaphore.
m_txn = false;
m_database.m_write_semaphore.post();
}
return res == SQLITE_OK;
}
/** Roll back the transaction started by this batch.
 *
 * Returns false without touching the database when a guard below rejects the call.
 * Otherwise executes "ROLLBACK TRANSACTION" and returns whether it yielded SQLITE_OK.
 * On success m_txn is cleared and m_write_semaphore is posted; on failure m_txn stays set
 * and the semaphore is not released by this function.
 */
bool SQLiteBatch::TxnAbort()
{
// NOTE(review): the next two lines look like the pre-change and post-change form of the same
// guard shown side by side -- confirm that only the m_txn form is meant to remain.
if (!m_database.HasActiveTxn()) return false;
if (!m_database.m_db || !m_txn) return false;
// This batch has m_txn set, so the database is expected to report an active transaction.
Assert(m_database.HasActiveTxn());
int res = Assert(m_exec_handler)->Exec(m_database, "ROLLBACK TRANSACTION");
if (res != SQLITE_OK) {
LogPrintf("SQLiteBatch: Failed to abort the transaction\n");
} else {
// Transaction finished: drop the flag and release the write semaphore.
m_txn = false;
m_database.m_write_semaphore.post();
}
return res == SQLITE_OK;
}

View File

@ -58,6 +58,18 @@ private:
sqlite3_stmt* m_delete_stmt{nullptr};
sqlite3_stmt* m_delete_prefix_stmt{nullptr};
/** Whether this batch has started a database transaction and whether it owns SQLiteDatabase::m_write_semaphore.
* If the batch starts a db tx, it acquires the semaphore and sets this to true, keeping the semaphore
* until the transaction ends to prevent other batch objects from writing to the database.
*
* If this batch did not start a transaction, the semaphore is acquired transiently when writing and m_txn
* is not set.
*
* m_txn is different from HasActiveTxn() as it is only true when this batch has started the transaction,
* not just when any batch has started a transaction.
*/
bool m_txn{false};
void SetupSQLStatements();
bool ExecStatement(sqlite3_stmt* stmt, Span<const std::byte> blob);
@ -115,6 +127,10 @@ public:
~SQLiteDatabase();
// Batches must acquire this semaphore on writing, and release when done writing.
// This ensures that only one batch is modifying the database at a time.
CSemaphore m_write_semaphore;
bool Verify(bilingual_str& error);
/** Open the database if it is not already opened */

View File

@ -279,8 +279,48 @@ BOOST_AUTO_TEST_CASE(txn_close_failure_dangling_txn)
BOOST_CHECK(!batch2->Exists(key));
}
#endif // USE_SQLITE
// Uses two batch handlers on one SQLite database and checks that only the handler which
// began a transaction is able to commit or abort it.
BOOST_AUTO_TEST_CASE(concurrent_txn_dont_interfere)
{
std::string key = "key";
std::string value = "value";
std::string value2 = "value_2";
DatabaseOptions options;
DatabaseStatus status;
bilingual_str error;
const auto& database = MakeSQLiteDatabase(m_path_root / "sqlite", options, status, error);
std::unique_ptr<DatabaseBatch> handler = Assert(database)->MakeBatch();
// Verify that concurrent db transactions do not interfere with each other.
// Start a db txn, write the key and check that the key exists within the db txn.
BOOST_CHECK(handler->TxnBegin());
BOOST_CHECK(handler->Write(key, value));
BOOST_CHECK(handler->Exists(key));
// A second handler on the same database is expected to see the key as well, even though
// the first handler's transaction has not been committed yet.
// NOTE(review): presumably because both handlers go through the same database object -- confirm.
std::unique_ptr<DatabaseBatch> handler2 = Assert(database)->MakeBatch();
BOOST_CHECK(handler2->Exists(key));
// Attempt to end the first handler's txn through handler2's methods,
// which must not be possible.
BOOST_CHECK(!handler2->TxnCommit());
BOOST_CHECK(!handler2->TxnAbort());
// Only the first handler can commit the changes.
BOOST_CHECK(handler->TxnCommit());
// And, once the commit is completed, handler2 can read the record.
std::string read_value;
BOOST_CHECK(handler2->Read(key, read_value));
BOOST_CHECK_EQUAL(read_value, value);
// Also, once the txn is committed, single write statements are re-enabled,
// which means that handler2 can read the record changes directly.
BOOST_CHECK(handler->Write(key, value2, /*fOverwrite=*/true));
BOOST_CHECK(handler2->Read(key, read_value));
BOOST_CHECK_EQUAL(read_value, value2);
}
#endif // USE_SQLITE
BOOST_AUTO_TEST_SUITE_END()
} // namespace wallet

View File

@ -9,7 +9,10 @@ try:
except ImportError:
pass
import concurrent.futures
from test_framework.blocktools import COINBASE_MATURITY
from test_framework.descriptors import descsum_create
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
@ -33,6 +36,41 @@ class WalletDescriptorTest(BitcoinTestFramework):
self.skip_if_no_sqlite()
self.skip_if_no_py_sqlite3()
def test_concurrent_writes(self):
# Runs a large keypool refill in a worker thread while issuing gettxoutsetinfo from the
# main thread, then opens the wallet file with sqlite3 and checks that both the best block
# record and all 1000 descriptor cache records were written.
self.log.info("Test sqlite concurrent writes are in the correct order")
self.restart_node(0, extra_args=["-unsafesqlitesync=0"])
self.nodes[0].createwallet(wallet_name="concurrency", blank=True)
wallet = self.nodes[0].get_wallet_rpc("concurrency")
# First import a descriptor that uses hardened derivation so that topping up
# will require writing a ton to db
wallet.importdescriptors([{"desc":descsum_create("wpkh(tprv8ZgxMBicQKsPeuVhWwi6wuMQGfPKi9Li5GtX35jVNknACgqe3CY4g5xgkfDDJcmtF7o1QnxWDRYw4H5P26PXq7sbcUkEqeR4fg3Kxp2tigg/0h/0h/*h)"), "timestamp": "now", "active": True}])
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as thread:
topup = thread.submit(wallet.keypoolrefill, newsize=1000)
# Then while the topup is running, we need to do something that will call
# ChainStateFlushed which will trigger a write to the db, hopefully at the
# same time that the topup still has an open db transaction.
self.nodes[0].cli.gettxoutsetinfo()
# keypoolrefill is expected to return None once it has finished
assert_equal(topup.result(), None)
wallet.unloadwallet()
# Check that everything was written
wallet_db = self.nodes[0].wallets_path / "concurrency" / self.wallet_data_filename
conn = sqlite3.connect(wallet_db)
with conn:
# Retrieve the bestblock_nomerkle record
# (the hex key is the byte 0x12 followed by the ASCII string "bestblock_nomerkle")
bestblock_rec = conn.execute("SELECT value FROM main WHERE hex(key) = '1262657374626C6F636B5F6E6F6D65726B6C65'").fetchone()[0]
# Retrieve the number of descriptor cache records
# Since we store binary data, sqlite's comparison operators don't work everywhere
# so just retrieve all records and process them ourselves.
db_keys = conn.execute("SELECT key FROM main").fetchall()
cache_records = len([k[0] for k in db_keys if b"walletdescriptorcache" in k[0]])
conn.close()
# Bytes 5..36 of the record, byte-reversed, are compared with the node's best block hash
assert_equal(bestblock_rec[5:37][::-1].hex(), self.nodes[0].getbestblockhash())
# One cache record is expected per key requested from keypoolrefill (newsize=1000)
assert_equal(cache_records, 1000)
def run_test(self):
if self.is_bdb_compiled():
# Make a legacy wallet and check it is BDB
@ -240,6 +278,8 @@ class WalletDescriptorTest(BitcoinTestFramework):
conn.close()
assert_raises_rpc_error(-4, "Unexpected legacy entry in descriptor wallet found.", self.nodes[0].loadwallet, "crashme")
self.test_concurrent_writes()
# Run the test when this file is executed as a script.
if __name__ == '__main__':
WalletDescriptorTest().main ()