Created
January 5, 2026 06:00
-
-
Save ferruzzi/7abad07c70bc4a5229384ffbbb4c3e0b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import json | |
| import ssl | |
| from pprint import pformat | |
| import paho.mqtt.client as mqtt | |
# --- Per-printer settings: set these three values ---------------------------
PRINTER_IP = "0.0.0.0"
PRINTER_SERIAL_NUMBER = "012345ABCDE"
# Find on the printer under Settings > LAN Only.
# Note: "LAN Only" does NOT need to be on.
ACCESS_CODE = "abc123"

# --- Fixed settings: these shouldn't need to change -------------------------
USER = "bblp"
PORT = 8883
# Topic subscribed to for this printer's reports.
TOPIC = f"device/{PRINTER_SERIAL_NUMBER}/report"
# Not referenced anywhere else in the visible part of this script.
FILENAME = "../ams/data/filaments.json"

# Filled in by on_message() with the decoded JSON payload; None until then.
data = None
def on_message(client, userdata, msg):
    """Store the first received report in the global ``data`` and disconnect.

    This function is assigned as the MQTT client's ``on_message`` callback, so
    its signature is fixed by the library even though ``userdata`` is unused.

    Args:
        client: The MQTT client that delivered the message; it is disconnected
            once the payload has been stored.
        userdata: Unused; present only to satisfy the callback signature.
        msg: The received message; ``msg.payload`` must be bytes holding a
            JSON document.
    """
    global data
    payload_text = msg.payload.decode()
    data = json.loads(payload_text)
    pretty = pformat(data, indent=2)
    print(f"Message received: \n{pretty}")
    # A single message is all this script needs, so stop listening.
    client.disconnect()
# Build the MQTT client with the version-2 callback API and log in with the
# fixed user name plus the printer's access code.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.username_pw_set(USER, ACCESS_CODE)

# Bambu uses a self-signed cert; tell the client not to verify the hostname/cert chain
client.tls_set(cert_reqs=ssl.CERT_NONE)
client.tls_insecure_set(True)

client.on_message = on_message

try:
    client.connect(PRINTER_IP, PORT)
    client.subscribe(TOPIC)
    print(f"Connected to {PRINTER_IP}. Waiting for message...")
    # Runs the network loop; on_message() ends it by calling client.disconnect().
    client.loop_forever()
except Exception as e:
    print(f"Connection failed: {e}")
    # Stop here: without a received report, the code that follows would fail
    # with a confusing TypeError when it indexes into `data` (still None).
    raise SystemExit(1) from e

if data is None:
    # The loop ended without on_message() ever storing a report.
    print("No message was received from the printer.")
    raise SystemExit(1)
# Blank lines to separate the summary below from the output printed earlier.
print("\n\n\n")

# That returns a huge json blob. Some interesting bits of data:
report = data["print"]
first_ams = report["ams"]["ams"][0]
print(f"All data for AMS #0: \n {pformat(first_ams)}")
print()

slot_2 = first_ams["tray"][2]
print(f"All data for AMS 0, Slot 2 (counting from 0): \n {pformat(slot_2)}")
print()

ams_0_slot_1 = first_ams["tray"][1]

# Translate the numeric tray state: 10 is shown as "empty", 11 as "loaded",
# and any other value as "unk".
tray_state = ams_0_slot_1["state"]
if tray_state == 10:
    state_label = "empty"
elif tray_state == 11:
    state_label = "loaded"
else:
    state_label = "unk"

# NOTE(review): confirm that 'tray_weight' really is the remaining weight and
# not the spool's nominal weight; the label below assumes the former.
slot_1_summary = (
    f"What filament is in AMS 0, Slot 1: "
    f"\n\tTray state: {state_label}"
    f"\n\tMaterial: {ams_0_slot_1['tray_sub_brands']} "
    f"\n\tColor (8-bit hex): {ams_0_slot_1['tray_color']}"
    f"\n\tEstimated percentage remaining: {ams_0_slot_1['remain']}%"
    f"\n\tEstimated weight remaining: {ams_0_slot_1['tray_weight']}g"
)
print(slot_1_summary)

# Printer state is one of: IDLE, PREPARE, RUNNING, PAUSE, FINISH, FAILED
print()
print(f"Printer state: {report['gcode_state']}")
print()

percent_done = report["mc_percent"]
minutes_left = report["mc_remaining_time"]
print(f"Current print: {percent_done}% done; {minutes_left} minutes remaining")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment