Skip to content

Instantly share code, notes, and snippets.

@havlan
Last active April 4, 2019 08:01
Show Gist options
  • Select an option

  • Save havlan/26d2b82836bab8e111961cec3c55cd6d to your computer and use it in GitHub Desktop.

Select an option

Save havlan/26d2b82836bab8e111961cec3c55cd6d to your computer and use it in GitHub Desktop.
import sys
import logging
from stmpy import Machine, Driver
from graphviz import Source
import stmpy
from sense_hat import SenseHat
from time import sleep
import paho.mqtt.client as mqtt
# Module-level Sense HAT handle, created as soon as this module is loaded.
# The script entry point further down calls clear() on it and polls its
# joystick (sense.stick) for events.
sense = SenseHat()
class Qu_lab_mqtt:
    """Skeleton MQTT front end for the Qu lab queue.

    Creates a paho-mqtt client and registers this object's ``on_connect`` and
    ``on_message`` methods as its callbacks. The constructor does not connect
    to the broker or subscribe to anything; the enqueue/dequeue senders are
    still stubs.
    """

    def __init__(self, room, broker, port):
        """Create the MQTT client and hook up the callbacks.

        Args:
            room: Identifier of the room this client serves.
            broker: Broker address; stored for a later connect step.
            port: Broker port; stored for a later connect step.
        """
        logging.info("Initializing MQTT client for Qu_lab")
        self.room = room
        # Previously accepted but dropped; keep them so whoever calls
        # connect later does not have to pass them in again.
        self.broker = broker
        self.port = port
        self.client = mqtt.Client()
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client=None, userdata=None, flags=None, rc=None,
                   properties=None):
        """Connect callback registered on the MQTT client (currently a no-op).

        The client library invokes this callback with positional arguments
        (client, userdata, flags, rc); a zero-argument signature would raise
        TypeError the moment the callback fires. Every parameter has a
        default so a bare ``on_connect()`` call keeps working.
        """
        # TODO: decide what should happen here - do we expect an answer
        # from the broker?
        pass

    def on_message(self, client, userdata, msg):
        """Log the topic of an incoming message.

        Args:
            client: The client instance that received the message (unused).
            userdata: User data attached to the client (unused).
            msg: Received message; only its ``topic`` attribute is read.
        """
        logging.info(f"Message recieved with topic: {msg.topic}")
        # TODO: unpack queue, command and id from the message.

    def send_enqueue(self):
        """Send an enqueue message to the broker (not implemented yet)."""
        pass

    def send_dequeue(self):
        """Send a dequeue message to the broker (not implemented yet)."""
        pass
class Qu_lab:
    """Session state and event hooks for one Qu lab station.

    Tracks which card (``rfid``) is currently logged in and forwards button
    presses to the attached state machine (``stm``) as ``scan``,
    ``btn_pressed`` and ``next`` messages. The publish/receive hooks at the
    bottom are placeholders.
    """

    def __init__(self, room):
        """Set up an empty session; ``room`` is stringified into ``topic``."""
        self.topic = str(room)
        self.name = 'Qu'
        self.stm = None
        self.button = None
        self.rfid = None  # card number of the logged-in user, if any
        self.is_logged_in = False
        # TODO (for on_next): subscribe to self.topic + '/status'

    def _clear_session(self):
        """Forget the current card and mark the station as logged out."""
        self.rfid = None
        self.is_logged_in = False

    def scan_on_login(self, id):
        """Record ``id`` as the logged-in card. Always returns True."""
        self.rfid, self.is_logged_in = id, True
        return True

    def on_timeout(self):
        """Drop the current session. Always returns True."""
        self._clear_session()
        return True

    def scan_on_logout(self, id):
        """Log out if ``id`` equals the stored card.

        Returns:
            True when the card matched and the session was cleared,
            False (session untouched) otherwise.
        """
        same_card = id == self.rfid
        if not same_card:
            return False
        self._clear_session()
        return True

    def on_button_scan(self, b):
        """Send ``scan`` to the state machine; ``b`` is ignored."""
        self.stm.send("scan")

    def on_button_enqueue(self, b):
        """Send ``btn_pressed`` to the state machine; ``b`` is ignored."""
        self.stm.send("btn_pressed")

    def on_button_gettingHelp(self, b):
        """Send ``next`` to the state machine; ``b`` is ignored."""
        self.stm.send("next")

    def on_btn_to_enqueue(self):
        """Placeholder: publish enough data to enqueue the user."""
        # self.publish(self.topic + '/enqueue')
        pass

    def on_btn_to_dequeue(self):
        """Placeholder: publish enough data to dequeue the user."""
        # self.publish(self.topic + '/dequeue')
        pass

    def on_next(self):
        """Placeholder: handle the broker's message about who is next."""
        # Needs enough data to determine who's next.
        pass
# Naming convention for the tables below:
#   states      -> s_<name>
#   transitions -> t_<trigger>_<source>
#
# The 'name' of every state dict must be the same string the transitions use
# as 'source'/'target'; otherwise the entry/exit actions declared on the
# state are never attached to the state the machine actually enters.
# Effects are method names looked up on the machine's obj (qu_object), so
# they are written unqualified.
qu_object = Qu_lab('grupperom')

t_initial = {
    'source': 'initial',
    'target': 's_idle',
}

s_idle = {
    'name': 's_idle',
}

t_scan_idle = {
    'trigger': 'scan',
    'source': 's_idle',
    'target': 's_logged_in',
}

s_logged_in = {
    'name': 's_logged_in',
    # Timer "t" is started with 900000 (ms, i.e. 15 minutes) on entry and
    # stopped on exit; its expiry fires t_timeout_logged_in.
    'entry': 'start_timer("t",900000)',
    'exit': 'stop_timer("t")',
}

t_scan_logged_in = {
    'trigger': 'scan',
    'source': 's_logged_in',
    'target': 's_idle',
}

t_timeout_logged_in = {
    'trigger': 't',
    'source': 's_logged_in',
    'target': 's_idle',
}

t_press_btn_logged_in = {
    'trigger': 'btn_pressed',
    'source': 's_logged_in',
    'target': 's_enqueued',
}

s_enqueued = {
    'name': 's_enqueued',
}

# The three transitions that leave the queue all run the dequeue hook on
# qu_object. NOTE(review): the effect used to be 'qu_object.dequeue_action',
# which is not an attribute of Qu_lab; on_btn_to_dequeue is assumed to be the
# intended hook - confirm.
t_scan_enqueued = {
    'trigger': 'scan',
    'source': 's_enqueued',
    'target': 's_idle',
    'effect': 'on_btn_to_dequeue',
}

t_press_btn_enqueued = {
    'trigger': 'btn_pressed',
    'source': 's_enqueued',
    'target': 's_logged_in',
    'effect': 'on_btn_to_dequeue',
}

# TODO evaluate whether it's necessary to publish going up the queue
t_next_enqueued = {
    'trigger': 'next',
    'source': 's_enqueued',
    'target': 's_getting_help',
}

s_getting_help = {
    'name': 's_getting_help',
}

t_done_getting_help = {
    'trigger': 'done',
    'source': 's_getting_help',
    'target': 's_logged_in',
    'effect': 'on_btn_to_dequeue',
}

qu_machine = Machine(
    name='mean_machine',
    transitions=[
        t_initial,
        t_scan_idle,
        t_scan_logged_in,
        t_timeout_logged_in,
        t_press_btn_logged_in,
        t_press_btn_enqueued,
        t_next_enqueued,
        t_done_getting_help,
        t_scan_enqueued,
    ],
    obj=qu_object,
    states=[s_idle, s_logged_in, s_enqueued, s_getting_help],
)
if __name__ == '__main__':
    # Manual smoke run: start the machine, log in with a simulated scan,
    # then wait for one joystick press and forward it as a button press.
    qu_object.stm = qu_machine
    driver = Driver()
    driver.add_machine(qu_machine)

    # Get the state machine diagram; rendering is left disabled.
    dot = stmpy.get_graphviz_dot(qu_machine)
    src = Source(dot)
    # src.render('./test_output', view=True)

    driver.start()
    driver.send('scan', str(qu_machine.id))
    # Presumably gives the driver time to process the event before the
    # status snapshot is printed.
    sleep(2)
    print(driver.print_status())

    sense.clear()
    pressed = False
    while not pressed:
        for event in sense.stick.get_events():
            if event.action == "pressed":
                print("Event in start_button_event_loop: ", str(event))
                driver.send('btn_pressed', str(qu_machine.id))
                pressed = True
                break
        else:
            # No press in this batch of events: pause briefly so the poll
            # loop does not spin a CPU core at 100% while waiting.
            sleep(0.05)
    sleep(2)
    print(driver.print_status())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment