(Untested code)
Watch for Simplicity spends
import zmq
from bitcoinrpc import BitcoinRPC # Or elementsrpc for Liquid
# Tapleaf version treated as "Simplicity". 0xc2 is the placeholder carried over
# from the original snippet ("need to add the version when we know it").
# NOTE(review): Elements' interpreter appears to define the Simplicity leaf
# version as 0xbe -- confirm against the node you run and update this value.
SIMPLICITY_LEAF_VERSION = 0xc2


def _tapleaf_version(witness):
    """Return the tapleaf version encoded in a witness stack, or None.

    ``witness`` is the list of hex strings found under ``txinwitness``.
    The control block is the last stack item (after dropping a BIP341 annex,
    which is a trailing item whose first byte is 0x50); its first byte holds
    the leaf version in the upper seven bits and a parity flag in the lowest.
    Stacks with two or fewer items are ignored, matching the original filter.
    """
    items = list(witness)
    if len(items) >= 2 and items[-1][:2] == '50':
        items = items[:-1]  # strip the annex so it is not mistaken for the control block
    if len(items) <= 2:
        return None
    try:
        return int(items[-1][:2], 16) & 0xfe  # mask off the parity bit
    except ValueError:
        # Empty or non-hex last item: not a control block.
        return None


rpc = BitcoinRPC('user', 'pass', 'http://localhost:18884')  # Liquid testnet port example
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://127.0.0.1:29000')  # ZMQ endpoint for block notifications
# Subscribe to block *hashes*: getblock() takes a block hash, not the serialized
# block that the 'rawblock' topic delivers. The node must be started with
# zmqpubhashblock pointing at this endpoint.
socket.setsockopt(zmq.SUBSCRIBE, b'hashblock')

try:
    while True:
        # The node publishes three frames (topic, body, 4-byte sequence number);
        # unpacking into exactly two names would raise ValueError.
        topic, body, *_ = socket.recv_multipart()
        if topic != b'hashblock':
            continue
        block = rpc.getblock(body.hex(), 2)  # verbosity 2: transactions decoded inline
        for tx in block['tx']:
            # Report each transaction once, even if several inputs match.
            if any(
                _tapleaf_version(vin.get('txinwitness', ())) == SIMPLICITY_LEAF_VERSION
                for vin in tx['vin']
            ):
                print(f"Potential Simplicity tx: {tx['txid']}")
finally:
    # Release the ZMQ resources when the loop is interrupted or fails.
    socket.close(linger=0)
    context.term()
# Optional: Validate script with simplicity lib
Querying blocks for Simplicity spends
from bitcoinrpc import BitcoinRPC
# Module-level RPC client used by query_simplicity_txs().
# NOTE(review): 'user'/'pass' and the URL look like example values -- replace
# them with the credentials and RPC port of the node being queried.
rpc = BitcoinRPC('user', 'pass', 'http://localhost:18884')
def _tapleaf_version(witness):
    """Return the tapleaf version encoded in a witness stack, or None.

    ``witness`` is the list of hex strings found under ``txinwitness``.
    The control block is the last stack item (after dropping a BIP341 annex,
    which is a trailing item whose first byte is 0x50); its first byte holds
    the leaf version in the upper seven bits and a parity flag in the lowest.
    Stacks with two or fewer items are ignored, matching the original filter.
    """
    items = list(witness)
    if len(items) >= 2 and items[-1][:2] == '50':
        items = items[:-1]  # strip the annex so it is not mistaken for the control block
    if len(items) <= 2:
        return None
    try:
        return int(items[-1][:2], 16) & 0xfe  # mask off the parity bit
    except ValueError:
        # Empty or non-hex last item: not a control block.
        return None


def query_simplicity_txs(start_height, end_height=None, *, leaf_version=0xc2, client=None):
    """Scan a block range and report transactions that may spend via Simplicity.

    Args:
        start_height: First block height to inspect.
        end_height: Last block height to inspect (inclusive). ``None`` or a
            value beyond the chain tip is clamped to the current height.
        leaf_version: Tapleaf version (parity bit already masked off) that
            identifies a Simplicity leaf. The default 0xc2 is the placeholder
            from the original code.
            NOTE(review): Elements appears to use 0xbe -- confirm and pass it.
        client: RPC client exposing ``getblockcount``, ``getblockhash`` and
            ``getblock``. Defaults to the module-level ``rpc``.

    Returns:
        A list of ``(height, txid)`` tuples, one per matching transaction, in
        scan order. If an error interrupts the scan, the error is printed and
        the matches collected so far are returned.
    """
    node = rpc if client is None else client
    matches = []
    try:
        current_height = node.getblockcount()
        if end_height is None or end_height > current_height:
            end_height = current_height
        print(f"Querying blocks from height {start_height} to {end_height}...")
        for height in range(start_height, end_height + 1):
            block = node.getblock(node.getblockhash(height), 2)  # verbosity 2: decoded txs
            for tx in block['tx']:
                # Report each transaction once, even if several inputs match.
                if any(
                    _tapleaf_version(vin.get('txinwitness', ())) == leaf_version
                    for vin in tx['vin']
                ):
                    print(f"Potential Simplicity tx at height {height}: {tx['txid']}")
                    matches.append((height, tx['txid']))
                    # Optional follow-up: decode the revealed leaf (the witness
                    # item just before the control block) with a Simplicity
                    # library such as rust-simplicity to confirm the match.
            if height % 100 == 0:  # progress update every 100 blocks
                print(f"Processed up to height {height}")
        print("Query complete.")
    except Exception as e:
        # Top-level boundary for a long-running scan: report and keep partial results.
        print(f"Error during query: {e}")
    return matches
# Example usage: scan from height 100000 up to the current chain tip
# (end_height is omitted, so the function clamps it to getblockcount()).
query_simplicity_txs(start_height=100000)
# To scan a bounded range instead, pass end_height explicitly:
# query_simplicity_txs(start_height=200000, end_height=200100)