Skip to content

Instantly share code, notes, and snippets.

@patrickelectric
Created March 2, 2026 14:02
Show Gist options
  • Select an option

  • Save patrickelectric/9be8199d2d0befb404c2622a39ca9b64 to your computer and use it in GitHub Desktop.

Select an option

Save patrickelectric/9be8199d2d0befb404c2622a39ca9b64 to your computer and use it in GitHub Desktop.
UV Project to test zenoh example in ardupilot
import threading
import time
import zenoh
def listener(sample):
print(f"[SUB] {sample.key_expr} => {sample.payload.to_string()}")
def publisher_loop(session):
publisher = session.declare_publisher("ardupilot/receive")
while True:
publisher.put("pong")
print("[PUB] ardupilot/receive => pong")
time.sleep(1)
def main():
config = zenoh.Config()
with zenoh.open(config) as session:
sub = session.declare_subscriber("ardupilot/output", listener)
pub_thread = threading.Thread(target=publisher_loop, args=(session,), daemon=True)
pub_thread.start()
print("Running... Press Ctrl+C to stop.")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down.")
finally:
sub.undeclare()
if __name__ == "__main__":
main()
[project]
name = "Zenoh-ArduPino-S2"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"eclipse-zenoh==1.7.2",
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment