Last active
January 10, 2026 23:22
-
-
Save TimSC/01fa033b607c629ff8e81c33aec1f432 to your computer and use it in GitHub Desktop.
libp2p DHT peer discovery example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Minimal libp2p DHT example (libp2p 0.4.0 compatible).""" | |
| from __future__ import annotations | |
| import argparse | |
| import logging | |
| import secrets | |
| from typing import Iterable | |
| import multiaddr | |
| import trio | |
| from libp2p import new_host | |
| from libp2p.crypto.secp256k1 import create_new_key_pair | |
| from libp2p.kad_dht.kad_dht import KadDHT, DHTMode | |
| from libp2p.security.insecure.transport import InsecureTransport, PLAINTEXT_PROTOCOL_ID | |
| from libp2p.tools.async_service import background_trio_service | |
| from libp2p.tools.utils import info_from_p2p_addr | |
| def make_plaintext_host(): | |
| """Create a host that only speaks plaintext for simplicity.""" | |
| secret = secrets.token_bytes(32) | |
| key_pair = create_new_key_pair(secret) | |
| sec_opt = {PLAINTEXT_PROTOCOL_ID: InsecureTransport(local_key_pair=key_pair)} | |
| return new_host(key_pair=key_pair, sec_opt=sec_opt) | |
| async def connect_to_bootstrap_nodes(host, bootstrap_addrs: Iterable[str]) -> None: | |
| """Add bootstrap candidates to the peerstore so the DHT can reach them.""" | |
| for addr in bootstrap_addrs or []: | |
| try: | |
| peer = info_from_p2p_addr(multiaddr.Multiaddr(addr)) | |
| host.get_peerstore().add_addrs(peer.peer_id, peer.addrs, 3600) | |
| logging.info("added bootstrap peer %s %s", peer.peer_id, addr) | |
| except Exception as exc: | |
| logging.warning("bootstrap %s failed: %s", addr, exc) | |
| async def provide_key(dht: KadDHT, key: str) -> None: | |
| """Make this node advertise a rendezvous key on the DHT.""" | |
| for candidate in (key, key.encode("utf-8")): | |
| try: | |
| await dht.provide(candidate) | |
| logging.info("providing key %s (%s)", key, type(candidate).__name__) | |
| return | |
| except Exception as exc: | |
| logging.debug("provide failed for %s: %s", candidate, exc) | |
| raise RuntimeError("unable to provide key on this KadDHT instance") | |
| def print_providers(providers: list) -> None: | |
| """Dump the peers that were returned by find_providers().""" | |
| if not providers: | |
| logging.info("no providers returned yet") | |
| return | |
| for idx, provider in enumerate(providers, start=1): | |
| peer_id = getattr(provider, "peer_id", None) | |
| addrs = [] | |
| for addr in getattr(provider, "addrs", []) or []: | |
| addrs.append(str(addr)) | |
| logging.info( | |
| "provider %d: peer_id=%s addrs=%s", | |
| idx, | |
| peer_id, | |
| ", ".join(addrs) or "<no addrs>", | |
| ) | |
| async def roundtrip_value(dht: KadDHT, key: str, value: bytes) -> None: | |
| """Store and read a small record if the implementation exposes put/get helpers.""" | |
| put_value = getattr(dht, "put_value", None) | |
| get_value = getattr(dht, "get_value", None) | |
| if not callable(put_value) or not callable(get_value): | |
| logging.info("put_value/get_value missing from KadDHT; skipping record steps") | |
| return | |
| try: | |
| await put_value(key, value) | |
| stored = await get_value(key) | |
| logging.info("round-trip record: %s", stored) | |
| except Exception as exc: | |
| logging.warning("record round-trip failed: %s", exc) | |
| async def main_loop(args: argparse.Namespace) -> None: | |
| host = make_plaintext_host() | |
| dht = KadDHT(host, mode=DHTMode.SERVER, enable_random_walk=True) | |
| listen_addr = multiaddr.Multiaddr(args.listen) | |
| async with host.run(listen_addrs=[listen_addr]): | |
| logging.info("host id=%s", host.get_id().to_base58()) | |
| for addr in host.get_addrs(): | |
| logging.info("listening on %s", addr) | |
| await connect_to_bootstrap_nodes(host, args.bootstrap or []) | |
| async with background_trio_service(dht): | |
| await provide_key(dht, args.key) | |
| key_str = args.key | |
| if args.value: | |
| await roundtrip_value(dht, key_str, args.value.encode("utf-8")) | |
| while True: | |
| providers = await dht.find_providers(key_str, count=args.providers) | |
| print_providers(providers) | |
| if args.sleep <= 0: | |
| logging.info("query interval disabled (<=0); stopping after one discovery sweep") | |
| break | |
| logging.info("sleeping %s seconds before the next discovery", args.sleep) | |
| await trio.sleep(args.sleep) | |
| def parse_args() -> argparse.Namespace: | |
| parser = argparse.ArgumentParser(description=__doc__) | |
| parser.add_argument( | |
| "--listen", | |
| default="/ip4/0.0.0.0/tcp/0", | |
| help="Multiaddr to bind the local libp2p host.", | |
| ) | |
| parser.add_argument( | |
| "--bootstrap", | |
| action="append", | |
| help="Additional peers to seed the DHT (/ip4/.../p2p/<peer>).", | |
| ) | |
| parser.add_argument( | |
| "--key", | |
| default="neurogrid:dht-example", | |
| help="Rendezvous key (must match peers you want to find).", | |
| ) | |
| parser.add_argument( | |
| "--providers", | |
| type=int, | |
| default=8, | |
| help="How many provider records to ask for when discovering.", | |
| ) | |
| parser.add_argument( | |
| "--value", | |
| help="Optional data payload to store on the DHT (round-trip if supported).", | |
| ) | |
| parser.add_argument( | |
| "--sleep", | |
| type=float, | |
| default=5.0, | |
| help="Seconds to wait between repeated discovery sweeps (<=0 disables looping).", | |
| ) | |
| return parser.parse_args() | |
| def main() -> None: | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") | |
| args = parse_args() | |
| trio.run(main_loop, args) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment