Last active
October 13, 2025 02:22
-
-
Save TheRealFalseReality/01f95b53d5120c93bcd3cc72d94ceba6 to your computer and use it in GitHub Desktop.
temp_dallas_mqtt.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import os | |
| import glob | |
| import time | |
| import paho.mqtt.client as mqtt | |
| import json | |
# --- MQTT Configuration ---
# Connection settings can be overridden through environment variables so that
# credentials do not have to live in the source file. The literals below are
# only fallbacks that keep existing deployments working unchanged.
MQTT_BROKER = os.environ.get("MQTT_BROKER", "homeassistant.local")  # Example: "192.168.1.100"
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USERNAME = os.environ.get("MQTT_USERNAME", "mosquitto")
# NOTE(review): a real-looking password is committed here as the fallback.
# Rotate it on the broker and supply MQTT_PASSWORD via the environment instead.
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD", "Maskell1")

# --- State Topic ---
# Topic on which the latest temperature reading is published.
STATE_TOPIC = "ender3/enclosure/temperature/state"

# --- Home Assistant Discovery Configuration ---
MQTT_DISCOVERY_PREFIX = "homeassistant"
DEVICE_ID = "pi_temperature_sensor_01"
UNIQUE_ID = "pi_temp_livingroom_01"
SENSOR_NAME = "Living Room Temperature"
# --- DS18B20 Sensor Configuration ---
# Ask the kernel to load the 1-Wire modules; the return status is not checked.
os.system('modprobe w1-gpio')
os.system('modprobe w1-therm')

base_dir = '/sys/bus/w1/devices/'

# The sensor is expected to appear as a directory whose name starts with "28".
# If several match, the first one returned by glob is used.
_sensor_dirs = glob.glob(base_dir + '28*')
if not _sensor_dirs:
    print("Error: DS18B20 sensor not found. Check wiring and 1-Wire setup.")
    # Exit with a non-zero status so supervisors and shells see the failure;
    # the interactive exit() helper reported success and may be unavailable.
    raise SystemExit(1)

device_folder = _sensor_dirs[0]
device_file = device_folder + '/w1_slave'
def read_temp_raw():
    """Return all lines currently present in the sensor's ``w1_slave`` file."""
    with open(device_file, 'r') as sensor_file:
        return sensor_file.readlines()
def read_temp(max_attempts=10, retry_delay=0.2, read_raw=None):
    """Read the sensor and return the temperature in degrees Celsius.

    The raw output is polled until its first line ends in ``YES``; the value
    after ``t=`` on the second line is then divided by 1000.

    Args:
        max_attempts: Maximum number of raw reads before giving up. Bounding
            the retries keeps a faulty sensor from blocking the caller forever.
        retry_delay: Seconds to wait between consecutive attempts.
        read_raw: Optional zero-argument callable returning the raw lines.
            Defaults to ``read_temp_raw``.

    Returns:
        The temperature as a float, or ``None`` when no valid reading was
        obtained (sensor never ready, missing ``t=`` marker, or a value that
        is not a number).
    """
    if read_raw is None:
        read_raw = read_temp_raw

    for attempt in range(max_attempts):
        if attempt:
            # Only pause between retries, never before the first read.
            time.sleep(retry_delay)
        lines = read_raw()
        # A short or empty read is treated like a "not ready" result.
        if len(lines) >= 2 and lines[0].strip().endswith('YES'):
            break
    else:
        return None

    _, marker, raw_value = lines[1].partition('t=')
    if not marker:
        return None
    try:
        return float(raw_value) / 1000.0
    except ValueError:
        return None
def publish_discovery_message(client):
    """Publish the retained Home Assistant discovery config for this sensor.

    Args:
        client: MQTT client whose ``publish`` method is used to send the
            JSON-encoded configuration.
    """
    config_topic = f"{MQTT_DISCOVERY_PREFIX}/sensor/{DEVICE_ID}/{UNIQUE_ID}/config"

    # Description of the physical device the sensor entity is attached to.
    device_info = {
        "identifiers": [DEVICE_ID],
        "name": "Raspberry Pi Temperature Monitor",
        "model": "Pi with DS18B20",
        "manufacturer": "Raspberry Pi Foundation",
    }

    # Entity configuration; the state payload is used as the value unchanged.
    sensor_config = {
        "name": SENSOR_NAME,
        "state_topic": STATE_TOPIC,
        "unit_of_measurement": "°C",
        "value_template": "{{ value }}",
        "unique_id": UNIQUE_ID,
        "device_class": "temperature",
        "device": device_info,
    }

    print(f"Publishing Home Assistant discovery message to: {config_topic}")
    encoded_config = json.dumps(sensor_config)
    client.publish(config_topic, encoded_config, retain=True)
def on_connect(client, userdata, flags, rc):
    """Handle the MQTT connection result.

    A return code of 0 is treated as success, in which case the discovery
    message is published; any other code is only reported.
    """
    if rc != 0:
        print(f"Failed to connect, return code {rc}\n")
        return

    print("Connected to MQTT Broker!")
    # Announce the sensor to Home Assistant upon connecting
    publish_discovery_message(client)
def setup_mqtt_client():
    """Create, configure and connect the MQTT client.

    Sets the username/password, registers ``on_connect`` and opens the
    connection to ``MQTT_BROKER`` with a 60 second keepalive.

    Returns:
        The connected client; the caller is responsible for starting its
        network loop.
    """
    # paho-mqtt 2.x expects an explicit callback API version. VERSION1 keeps
    # the 4-argument on_connect(client, userdata, flags, rc) signature used in
    # this file. Older releases have no CallbackAPIVersion, so fall back to
    # the plain constructor there.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is not None:
        client = mqtt.Client(api_versions.VERSION1)
    else:
        client = mqtt.Client()

    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    client.on_connect = on_connect
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    return client
def main():
    """Publish the sensor temperature to ``STATE_TOPIC`` until interrupted.

    Starts the MQTT network loop, then repeatedly reads the temperature and
    publishes it as a retained message formatted to two decimals. On
    Ctrl+C the loop ends; the network loop is always stopped and the client
    disconnected on the way out.
    """
    publisher = setup_mqtt_client()
    publisher.loop_start()
    print("Starting temperature publisher service.")

    try:
        while True:
            reading = read_temp()
            if reading is None:
                print("Failed to read temperature.")
            else:
                print(f"Temperature: {reading:.2f}°C. Publishing to {STATE_TOPIC}")
                publisher.publish(STATE_TOPIC, f"{reading:.2f}", retain=True)
            # NOTE(review): the source's indentation was lost; the pause is
            # assumed to apply to every iteration, successful or not.
            time.sleep(60)
    except KeyboardInterrupt:
        print("Publication stopped.")
    finally:
        publisher.loop_stop()
        publisher.disconnect()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment