Skip to content

Instantly share code, notes, and snippets.

@TimSC
Last active January 10, 2026 23:22
Show Gist options
  • Select an option

  • Save TimSC/01fa033b607c629ff8e81c33aec1f432 to your computer and use it in GitHub Desktop.

Select an option

Save TimSC/01fa033b607c629ff8e81c33aec1f432 to your computer and use it in GitHub Desktop.
libp2p DHT peer discovery example
#!/usr/bin/env python3
"""Minimal libp2p DHT example (libp2p 0.4.0 compatible)."""
from __future__ import annotations
import argparse
import logging
import secrets
from typing import Iterable
import multiaddr
import trio
from libp2p import new_host
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.kad_dht.kad_dht import KadDHT, DHTMode
from libp2p.security.insecure.transport import InsecureTransport, PLAINTEXT_PROTOCOL_ID
from libp2p.tools.async_service import background_trio_service
from libp2p.tools.utils import info_from_p2p_addr
def make_plaintext_host():
"""Create a host that only speaks plaintext for simplicity."""
secret = secrets.token_bytes(32)
key_pair = create_new_key_pair(secret)
sec_opt = {PLAINTEXT_PROTOCOL_ID: InsecureTransport(local_key_pair=key_pair)}
return new_host(key_pair=key_pair, sec_opt=sec_opt)
async def connect_to_bootstrap_nodes(host, bootstrap_addrs: Iterable[str]) -> None:
"""Add bootstrap candidates to the peerstore so the DHT can reach them."""
for addr in bootstrap_addrs or []:
try:
peer = info_from_p2p_addr(multiaddr.Multiaddr(addr))
host.get_peerstore().add_addrs(peer.peer_id, peer.addrs, 3600)
logging.info("added bootstrap peer %s %s", peer.peer_id, addr)
except Exception as exc:
logging.warning("bootstrap %s failed: %s", addr, exc)
async def provide_key(dht: KadDHT, key: str) -> None:
"""Make this node advertise a rendezvous key on the DHT."""
for candidate in (key, key.encode("utf-8")):
try:
await dht.provide(candidate)
logging.info("providing key %s (%s)", key, type(candidate).__name__)
return
except Exception as exc:
logging.debug("provide failed for %s: %s", candidate, exc)
raise RuntimeError("unable to provide key on this KadDHT instance")
def print_providers(providers: list) -> None:
"""Dump the peers that were returned by find_providers()."""
if not providers:
logging.info("no providers returned yet")
return
for idx, provider in enumerate(providers, start=1):
peer_id = getattr(provider, "peer_id", None)
addrs = []
for addr in getattr(provider, "addrs", []) or []:
addrs.append(str(addr))
logging.info(
"provider %d: peer_id=%s addrs=%s",
idx,
peer_id,
", ".join(addrs) or "<no addrs>",
)
async def roundtrip_value(dht: KadDHT, key: str, value: bytes) -> None:
"""Store and read a small record if the implementation exposes put/get helpers."""
put_value = getattr(dht, "put_value", None)
get_value = getattr(dht, "get_value", None)
if not callable(put_value) or not callable(get_value):
logging.info("put_value/get_value missing from KadDHT; skipping record steps")
return
try:
await put_value(key, value)
stored = await get_value(key)
logging.info("round-trip record: %s", stored)
except Exception as exc:
logging.warning("record round-trip failed: %s", exc)
async def main_loop(args: argparse.Namespace) -> None:
host = make_plaintext_host()
dht = KadDHT(host, mode=DHTMode.SERVER, enable_random_walk=True)
listen_addr = multiaddr.Multiaddr(args.listen)
async with host.run(listen_addrs=[listen_addr]):
logging.info("host id=%s", host.get_id().to_base58())
for addr in host.get_addrs():
logging.info("listening on %s", addr)
await connect_to_bootstrap_nodes(host, args.bootstrap or [])
async with background_trio_service(dht):
await provide_key(dht, args.key)
key_str = args.key
if args.value:
await roundtrip_value(dht, key_str, args.value.encode("utf-8"))
while True:
providers = await dht.find_providers(key_str, count=args.providers)
print_providers(providers)
if args.sleep <= 0:
logging.info("query interval disabled (<=0); stopping after one discovery sweep")
break
logging.info("sleeping %s seconds before the next discovery", args.sleep)
await trio.sleep(args.sleep)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--listen",
default="/ip4/0.0.0.0/tcp/0",
help="Multiaddr to bind the local libp2p host.",
)
parser.add_argument(
"--bootstrap",
action="append",
help="Additional peers to seed the DHT (/ip4/.../p2p/<peer>).",
)
parser.add_argument(
"--key",
default="neurogrid:dht-example",
help="Rendezvous key (must match peers you want to find).",
)
parser.add_argument(
"--providers",
type=int,
default=8,
help="How many provider records to ask for when discovering.",
)
parser.add_argument(
"--value",
help="Optional data payload to store on the DHT (round-trip if supported).",
)
parser.add_argument(
"--sleep",
type=float,
default=5.0,
help="Seconds to wait between repeated discovery sweeps (<=0 disables looping).",
)
return parser.parse_args()
def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
args = parse_args()
trio.run(main_loop, args)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment