Skip to content

Instantly share code, notes, and snippets.

@groldo
Created October 27, 2021 09:26
Show Gist options
  • Select an option

  • Save groldo/bb10ef7c8dd719423d6f4ba9fc13a7a8 to your computer and use it in GitHub Desktop.

Select an option

Save groldo/bb10ef7c8dd719423d6f4ba9fc13a7a8 to your computer and use it in GitHub Desktop.
motion sensor via mqtt
import RPi.GPIO as GPIO
import time
import logging
import paho.mqtt.client as mqtt
CONFIG = {
"broker": "naboo.lan",
"motion_pin": 11,
"logfile": "motion.log",
"topic": "sensor/flur/motion",
"payload_on": "on",
"payload_off": "off"
}
def setup_gpio():
GPIO.setwarnings(False)
GPIO.setmode(GPIO.BOARD)
GPIO.setup(CONFIG['motion_pin'], GPIO.IN) #Read output from PIR motion sensor
def on_connect(client, userdata, flags, rc):
logging.info('Connected with result code '+str(rc))
def on_disconnect(client, userdata, rc):
if rc != 0:
logging.info('Unexpected disconnection. Return {}'.format(rc))
else:
logging.info('Disconnected from server')
def MQTTConnect():
client = mqtt.Client()
client.on_connect = on_connect
client.on_disconnect = on_disconnect
try:
client.connect_async(CONFIG['broker'])
client.loop_start()
except(OSError, ValueError):
logging.warning('Could not connect to Broker')
return client
def main():
try:
setup_gpio()
logging.basicConfig(filename=CONFIG['logfile'],level=logging.DEBUG,format='%(asctime)s | %(levelname)s: %(message)s')
client = MQTTConnect()
# Loop for the publish message
while True:
i=GPIO.input(CONFIG['motion_pin'])
if i==0: #When output from motion sensor is LOW
try:
client.publish(CONFIG['topic'], CONFIG['payload_off'])
except(OSError, ValueError):
logging.warning('Could not publish message to broker, reconnect and try again')
client = MQTTConnect()
client.publish(CONFIG['topic'], CONFIG['payload_off'])
time.sleep(1)
elif i==1: #When output from motion sensor is HIGH
try:
client.publish(CONFIG['topic'], CONFIG['payload_on'])
except(OSError, ValueError):
logging.warning('Could not publish message to broker, reconnect and try again')
client = MQTTConnect()
client.publish(CONFIG['topic'], CONFIG['payload_on'])
time.sleep(1)
except KeyboardInterrupt:
client.loop_stop()
client.disconnect()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment