Skip to content

Instantly share code, notes, and snippets.

@TheRealFalseReality
Last active October 13, 2025 02:22
Show Gist options
  • Select an option

  • Save TheRealFalseReality/01f95b53d5120c93bcd3cc72d94ceba6 to your computer and use it in GitHub Desktop.

Select an option

Save TheRealFalseReality/01f95b53d5120c93bcd3cc72d94ceba6 to your computer and use it in GitHub Desktop.
temp_dallas_mqtt.py
import os
import glob
import time
import paho.mqtt.client as mqtt
import json
# --- MQTT Configuration ---
# Connection settings may be overridden with environment variables of the same
# name, so credentials do not have to live in the source file. The literals
# below are only fallbacks that preserve the script's previous behaviour.
MQTT_BROKER = os.environ.get("MQTT_BROKER", "homeassistant.local")  # Example: "192.168.1.100"
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USERNAME = os.environ.get("MQTT_USERNAME", "mosquitto")
# NOTE(review): this fallback password is committed in plain text; prefer
# setting MQTT_PASSWORD in the environment and rotating the broker credential.
MQTT_PASSWORD = os.environ.get("MQTT_PASSWORD", "Maskell1")
# --- State Topic ---
# Topic the formatted temperature readings are published to; it is also
# advertised as "state_topic" in the Home Assistant discovery payload.
# NOTE(review): the topic refers to an Ender 3 enclosure while SENSOR_NAME
# says "Living Room" - confirm which one is intended.
STATE_TOPIC = "ender3/enclosure/temperature/state"
# --- Home Assistant Discovery Configuration ---
# These values build the discovery topic
# "<prefix>/sensor/<DEVICE_ID>/<UNIQUE_ID>/config" and its payload.
MQTT_DISCOVERY_PREFIX = "homeassistant"
DEVICE_ID = "pi_temperature_sensor_01"
UNIQUE_ID = "pi_temp_livingroom_01"
SENSOR_NAME = "Living Room Temperature"
# --- DS18B20 Sensor Configuration ---
# Ask the kernel to load the 1-Wire modules before looking for the sensor.
os.system('modprobe w1-gpio')
os.system('modprobe w1-therm')
base_dir = '/sys/bus/w1/devices/'
# Device directories whose names start with "28" are treated as the sensor.
# Sorting makes the choice deterministic when more than one entry matches.
_sensor_dirs = sorted(glob.glob(base_dir + '28*'))
if not _sensor_dirs:
    print("Error: DS18B20 sensor not found. Check wiring and 1-Wire setup.")
    # A bare exit() reports status 0 and relies on the site-provided builtin;
    # raise SystemExit with a non-zero status so supervisors see the failure.
    raise SystemExit(1)
device_folder = _sensor_dirs[0]
# File that read_temp_raw() opens to obtain the raw sensor output.
device_file = device_folder + '/w1_slave'
def read_temp_raw():
    """Return all lines of the sensor's ``w1_slave`` file as a list of strings."""
    with open(device_file, 'r') as sensor_file:
        # Iterating a text file yields the same lines readlines() would.
        return list(sensor_file)
def read_temp(*, max_attempts=10, retry_delay=0.2, reader=None):
    """Read the sensor and return the temperature in degrees Celsius.

    The raw output is expected to have a first line ending in ``YES`` and a
    second line containing ``t=<value>``; the value is divided by 1000.

    Args:
        max_attempts: Maximum number of reads before giving up. The previous
            implementation retried forever, which hung the service when the
            sensor never produced a valid reading.
        retry_delay: Seconds to sleep between attempts.
        reader: Optional zero-argument callable returning the raw lines.
            Defaults to ``read_temp_raw``.

    Returns:
        The temperature as a float, or ``None`` when no valid reading could
        be obtained (read errors, incomplete output, missing or malformed
        ``t=`` field).
    """
    if reader is None:
        reader = read_temp_raw
    for attempt in range(max_attempts):
        if attempt:
            time.sleep(retry_delay)
        try:
            lines = reader()
        except OSError:
            # The sensor file could not be read this time; try again.
            continue
        # Retry on incomplete output or when the first line does not end in YES.
        if len(lines) < 2 or not lines[0].strip().endswith('YES'):
            continue
        _, marker, reading = lines[1].partition('t=')
        if not marker:
            return None
        try:
            return float(reading) / 1000.0
        except ValueError:
            # Text after "t=" was not a number.
            return None
    return None
def publish_discovery_message(client):
    """Publish the retained Home Assistant discovery config for this sensor."""
    device_info = {
        "identifiers": [DEVICE_ID],
        "name": "Raspberry Pi Temperature Monitor",
        "model": "Pi with DS18B20",
        "manufacturer": "Raspberry Pi Foundation",
    }
    # Key order is kept as-is so the serialized JSON stays identical.
    sensor_config = {
        "name": SENSOR_NAME,
        "state_topic": STATE_TOPIC,
        "unit_of_measurement": "°C",
        "value_template": "{{ value }}",
        "unique_id": UNIQUE_ID,
        "device_class": "temperature",
        "device": device_info,
    }
    topic = f"{MQTT_DISCOVERY_PREFIX}/sensor/{DEVICE_ID}/{UNIQUE_ID}/config"
    serialized = json.dumps(sensor_config)
    print(f"Publishing Home Assistant discovery message to: {topic}")
    client.publish(topic, serialized, retain=True)
def on_connect(client, userdata, flags, rc, properties=None):
    """Handle the result of an MQTT connection attempt.

    On success (``rc == 0``) the Home Assistant discovery message is
    published; otherwise the failure code is printed.

    Args:
        client: The MQTT client that connected.
        userdata: User data passed by the client (unused).
        flags: Connection flags passed by the client (unused).
        rc: Connection result; ``0`` means success.
        properties: Optional fifth argument (unused). Accepting it keeps the
            callback usable when the client library passes connection
            properties, instead of raising ``TypeError``.
    """
    if rc != 0:
        print(f"Failed to connect, return code {rc}\n")
        return
    print("Connected to MQTT Broker!")
    # Announce the sensor to Home Assistant upon connecting
    publish_discovery_message(client)
def setup_mqtt_client():
    """Create, configure and connect the MQTT client.

    Sets the broker credentials, registers ``on_connect`` and opens the
    connection to ``MQTT_BROKER:MQTT_PORT`` with a 60 second keepalive.

    Returns:
        The connected client. The caller is responsible for starting its
        network loop.
    """
    # Newer paho-mqtt releases expose CallbackAPIVersion and expect it to be
    # passed to Client(); older releases have no such attribute. VERSION1 is
    # selected so a four-argument on_connect callback keeps working.
    api_versions = getattr(mqtt, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt.Client()
    else:
        client = mqtt.Client(api_versions.VERSION1)
    client.username_pw_set(MQTT_USERNAME, MQTT_PASSWORD)
    client.on_connect = on_connect
    client.connect(MQTT_BROKER, MQTT_PORT, 60)
    return client
def main(interval=60):
    """Run the publisher loop until interrupted.

    Connects to the broker, starts the client's background network loop and
    then, every ``interval`` seconds, reads the temperature and publishes it
    (retained, two decimal places) to ``STATE_TOPIC``.

    Args:
        interval: Seconds to sleep between readings. Defaults to 60.
    """
    mqtt_client = setup_mqtt_client()
    mqtt_client.loop_start()
    print("Starting temperature publisher service.")
    try:
        while True:
            temp_c = read_temp()
            if temp_c is None:
                print("Failed to read temperature.")
            else:
                print(f"Temperature: {temp_c:.2f}°C. Publishing to {STATE_TOPIC}")
                result = mqtt_client.publish(STATE_TOPIC, f"{temp_c:.2f}", retain=True)
                # Report a failed publish instead of silently dropping the
                # reading (for example while the broker is unreachable).
                if result.rc != mqtt.MQTT_ERR_SUCCESS:
                    print(f"Publish failed, return code {result.rc}")
            time.sleep(interval)
    except KeyboardInterrupt:
        print("Publication stopped.")
    finally:
        # Always stop the background loop and close the connection.
        mqtt_client.loop_stop()
        mqtt_client.disconnect()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment