Last active
April 4, 2019 08:01
-
-
Save havlan/26d2b82836bab8e111961cec3c55cd6d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import sys | |
| import logging | |
| from stmpy import Machine, Driver | |
| from graphviz import Source | |
| import stmpy | |
| from sense_hat import SenseHat | |
| from time import sleep | |
| import paho.mqtt.client as mqtt | |
| sense = SenseHat() | |
| class Qu_lab_mqtt: | |
| def __init__(self, room, broker, port): | |
| logging.info(f"Initializing MQTT client for Qu_lab") | |
| self.room = room | |
| self.client = mqtt.Client() | |
| self.client.on_connect = self.on_connect | |
| self.client.on_message = self.on_message | |
| def on_connect(self): | |
| pass | |
| # what should this do? Do we expect an answer from the broker ? | |
| def on_message(self, client, userdata, msg): | |
| logging.info(f"Message recieved with topic: {msg.topic}") | |
| #queue, command, id = | |
| # send enqueue message to broker to enqueue user | |
| def send_enqueue(self): | |
| pass | |
| # send dequeue message to broker to dequeue user | |
| def send_dequeue(self): | |
| pass | |
| class Qu_lab: | |
| def __init__(self, room): | |
| self.name = 'Qu' | |
| self.rfid = None # card number | |
| self.is_logged_in = False | |
| self.stm = None | |
| self.topic = str(room) | |
| self.button = None | |
| # for on_next method | |
| # subscribe to self.topic + '/status | |
| def scan_on_login(self, id): | |
| self.is_logged_in = True | |
| self.rfid = id | |
| return True | |
| def on_timeout(self): | |
| self.is_logged_in = False | |
| self.rfid = None | |
| return True | |
| def scan_on_logout(self, id): | |
| if id == self.rfid: | |
| self.is_logged_in = False | |
| self.rfid = None | |
| return True | |
| else: | |
| return False | |
| def on_button_scan(self,b): | |
| self.stm.send("scan") | |
| def on_button_enqueue(self,b): | |
| self.stm.send("btn_pressed") | |
| def on_button_gettingHelp(self,b): | |
| self.stm.send("next") | |
| def on_btn_to_enqueue(self): | |
| # publish enough data to enqueue user | |
| # self.publish(self.topic + '/enqueue') | |
| pass | |
| def on_btn_to_dequeue(self): | |
| # publish enough data to dequeue user | |
| # self.publish(self.topic + '/dequeue) | |
| pass | |
| def on_next(self): | |
| # recieve message from broker about the next user | |
| # enough data to determine who's next. | |
| pass | |
| ''' | |
| States format s_name | |
| Transitions format t_trigger_source | |
| ''' | |
| qu_object = Qu_lab('grupperom') | |
| t_initial = { | |
| 'source':'initial', | |
| 'target':'s_idle' | |
| } | |
| s_idle = { | |
| 'name':'idle' | |
| } | |
| t_scan_idle = { | |
| 'trigger':'scan', | |
| 'source':'s_idle', | |
| 'target':'s_logged_in' | |
| } | |
| s_logged_in = { | |
| 'name':'logged_in', | |
| 'entry':'start_timer("t",900000)', | |
| 'exit':'stop_timer("t")' | |
| } | |
| t_scan_logged_in = { | |
| 'trigger':'scan', | |
| 'source':'s_logged_in', | |
| 'target':'s_idle' | |
| } | |
| t_timeout_logged_in = { | |
| 'trigger':'t', | |
| 'source':'s_logged_in', | |
| 'target':'s_idle' | |
| } | |
| t_press_btn_logged_in = { | |
| 'trigger':'btn_pressed', | |
| 'source':'s_logged_in', | |
| 'target':'s_enqueued' | |
| } | |
| s_enqueued = { | |
| 'name':'enqueued' | |
| } | |
| t_scan_enqueued = { | |
| 'trigger':'scan', | |
| 'source':'s_enqueued', | |
| 'target':'s_idle', | |
| 'effect': 'qu_object.dequeue_action' | |
| } | |
| t_press_btn_enqueued = { | |
| 'trigger':'btn_pressed', | |
| 'source':'s_enqueued', | |
| 'target':'s_logged_in', | |
| 'effect': 'qu_object.dequeue_action' | |
| } | |
| # TODO evaluate whether it's necessary to publish going up the queue | |
| t_next_enqueued = { | |
| 'trigger':'next', | |
| 'source':'s_enqueued', | |
| 'target':'s_getting_help' | |
| } | |
| s_getting_help = { | |
| 'name':'getting_help' | |
| } | |
| t_done_getting_help = { | |
| 'trigger':'done', | |
| 'source':'s_getting_help', | |
| 'target':'s_logged_in', | |
| 'effect': 'qu_object.dequeue_action' | |
| } | |
| qu_machine = Machine( | |
| name='mean_machine', | |
| transitions=[t_initial,t_scan_idle,t_scan_logged_in,t_timeout_logged_in,t_press_btn_logged_in,t_press_btn_enqueued,t_next_enqueued,t_done_getting_help, t_scan_enqueued], | |
| obj = qu_object, | |
| states=[s_idle, s_logged_in, s_enqueued, s_getting_help] | |
| ) | |
| if __name__ == '__main__': | |
| qu_object.stm = qu_machine | |
| driver = Driver() | |
| driver.add_machine(qu_machine) | |
| # Get the state machine diagram | |
| dot = stmpy.get_graphviz_dot(qu_machine) | |
| src = Source(dot) | |
| #src.render('./test_output', view=True) | |
| driver.start() | |
| driver.send('scan', str(qu_machine.id)) | |
| sleep(2) | |
| print(driver.print_status()) | |
| sense.clear() | |
| flag = True | |
| while flag: | |
| for event in sense.stick.get_events(): | |
| if event.action == "pressed": | |
| print("Event in start_button_event_loop: ", str(event)) | |
| driver.send('btn_pressed',str(qu_machine.id)) | |
| flag = False | |
| break; | |
| sleep(2) | |
| print(driver.print_status()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment