Skip to content

Instantly share code, notes, and snippets.

@adenine
Created December 3, 2025 09:45
Show Gist options
  • Select an option

  • Save adenine/4b6a9feb77897ed35dae0cb8605d7434 to your computer and use it in GitHub Desktop.

Select an option

Save adenine/4b6a9feb77897ed35dae0cb8605d7434 to your computer and use it in GitHub Desktop.
Arduino Uno Q Spacebrew Code
from arduino.app_utils import *
import time
import random
import sys
from paho.mqtt import client as mqtt_client
# --- Configuration ---
BROKER = '192.168.1.46'
PORT = 1883
print(BROKER, PORT)
led_state = False
CLIENT_NAME = f"SimpleClient_{random.randint(0, 1000)}"
DESCRIPTION = "A simple example client demonstrating publishers and subscribers."
print(CLIENT_NAME, DESCRIPTION)
# --- Publishers and Subscribers ---
PUBLISHERS = [
"button:boolean",
"slider:range"
]
SUBSCRIBERS = [
"led:boolean",
"text:string"
]
print(PUBLISHERS, SUBSCRIBERS)
# Define the client object globally or pass it,
# but for this structure, the handler functions need the client defined first.
# We will define the client BEFORE assigning the handlers.
def on_connect(client, userdata, flags, rc):
# ... (Your on_connect logic remains the same) ...
if rc == 0:
print(f"βœ… Connected to Spacebrew Server at {BROKER}:{PORT}")
# --- Registration ---
pubs_str = ", ".join(PUBLISHERS)
subs_str = ", ".join(SUBSCRIBERS)
registration_msg = f"{CLIENT_NAME}, {DESCRIPTION}, pubs({pubs_str}), subs({subs_str})"
print(f"πŸ“€ Sending Registration: {registration_msg}")
client.publish("YuxiSpace", registration_msg)
# --- Subscribe to Input Topics ---
for sub in SUBSCRIBERS:
sub_name = sub.split(':')[0]
topic = f"{CLIENT_NAME}/{sub_name}"
client.subscribe(topic)
print(f"πŸ‘‚ Listening on topic: {topic}")
else:
print(f"πŸ”΄ Failed to connect, return code {rc}")
def on_message(client, userdata, msg):
print(f"\n[RX] Received `{msg.payload.decode()}` on `{msg.topic}`")
# Add your custom logic here
def main_loop():
# --- MQTT Setup ---
# The client needs to be defined BEFORE assigning handlers in standard paho-mqtt
# Here, we define it inside main_loop for simple execution.
client = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION1, CLIENT_NAME)
# Assign the handler functions to the client object
client.on_connect = on_connect
client.on_message = on_message
try:
print(f"πŸš€ Starting {CLIENT_NAME}...")
client.connect(BROKER, PORT)
client.loop_start() # Start background thread for MQTT
# Keep the script running and publish some example data
while True:
# Example: Simulate a button press every 5 seconds
print(f"πŸ“€ Publishing 'true' to {CLIENT_NAME}/button")
client.publish(f"{CLIENT_NAME}/button", "true")
time.sleep(2.5)
print(f"πŸ“€ Publishing 'false' to {CLIENT_NAME}/button")
client.publish(f"{CLIENT_NAME}/button", "false")
global led_state
time.sleep(1)
led_state = not led_state
Bridge.call("set_led_state", led_state)
time.sleep(2.5)
# REMOVE: global led_state and Bridge.call(...) - they are irrelevant here
except KeyboardInterrupt:
print("\nStopping client...")
# Clean up the loop and disconnect
client.loop_stop()
client.disconnect()
# Run the main function
if __name__ == "__main__":
main_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment