Here is the source code for my simple alarm system:
boot.py
main.py
httpclient.py
express.py
onboard.py
On the raspberry pi, you simply bring up the server by running as root:
sudo python onboard.py 3000
led_on.sh
led_off.sh
Here is the source code for my simple alarm system:
boot.py
main.py
httpclient.py
express.py
onboard.py
On the raspberry pi, you simply bring up the server by running as root:
sudo python onboard.py 3000
led_on.sh
led_off.sh
| from machine import Pin | |
| import network | |
| import time | |
# boot.py -- bring the station interface up and signal progress on pin 16.
# HIGH and LOW are defined here but not referenced in this script.
HIGH = 1
LOW = 0

sta = network.WLAN(network.STA_IF)
ap = network.WLAN(network.AP_IF)
wifi = Pin(16, Pin.OUT)  # status pin toggled while waiting for the network

# Start from a known state: both interfaces down, status pin off.
ap.active(False)
sta.active(False)
wifi.off()
time.sleep(2)

sta.active(True)
sta.connect('SSID', 'password') #change settings here

# Toggle the status pin every 0.3 s until the station reports a connection.
# There is no timeout: this loop only ends once the network is joined.
while not sta.isconnected():
    for toggle in (wifi.on, wifi.off):
        toggle()
        time.sleep(0.3)

print('Connected: ', sta.isconnected())
# Leave the status pin reflecting the final connection state.
(wifi.on if sta.isconnected() else wifi.off)()

switch = Pin(15, Pin.IN)
led = Pin(14, Pin.OUT)
led.off()
print('switch active')
| #!/usr/bin/env python | |
| # """ | |
| # This code is based on a minimal http server by bradmontgomery | |
| # https://gist.github.com/bradmontgomery/2219997 | |
| # | |
| # Some part of it was inspired by express (expressjs.com) | |
| # """ | |
| from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer | |
| import SocketServer | |
class ExpressServerFactory:
    """Collect path -> handler routes and build a request-handler class.

    Route handlers are plain callables invoked as ``fn(handler)``, where
    ``handler`` is the ``BaseHTTPRequestHandler`` instance serving the
    request.  Routes are keyed by path only: ``add``, ``get`` and ``post``
    all write to the same table, so a registered path answers both GET and
    POST requests.
    """

    def __init__(self):
        # Maps request path -> callable taking the request handler.
        self.cache = {}

    def add(self, path, fn):
        """Register ``fn`` as the handler for ``path``."""
        self.cache[path] = fn

    def get(self, path, fn):
        """Register ``fn`` for ``path`` (same table as ``add``)."""
        self.add(path, fn)

    def post(self, path, fn):
        """Register ``fn`` for ``path`` (same table as ``add``)."""
        self.add(path, fn)

    def createClass(self):
        """Return a ``BaseHTTPRequestHandler`` subclass that dispatches by path.

        The returned class keeps a reference to the live route table, so
        routes registered after this call are still honoured.
        """
        cache = self.cache

        def fn404(handler):
            # Fill the requested path into the reason phrase and terminate
            # the header block so the client receives a complete response.
            message = 'path: "%s" is not defined...' % handler.path
            handler.send_response(404, message)
            handler.end_headers()

        def dispatch(handler):
            # Unknown paths fall back to the 404 responder.
            cache.get(handler.path, fn404)(handler)

        class B(BaseHTTPRequestHandler):
            def do_GET(self):
                dispatch(self)

            def do_POST(self):
                dispatch(self)

        return B
def noop(*args, **kwargs):
    """Accept any arguments and do nothing; used as the default stop hook."""
    return None
def run(server_class=HTTPServer, handler_class=None, port=80, stop_handler=noop):
    """Create a server on all interfaces at ``port`` and serve until Ctrl-C.

    ``server_class`` is called with ``(('', port), handler_class)``.  When a
    ``KeyboardInterrupt`` stops ``serve_forever``, ``stop_handler`` is called
    with the server instance.
    """
    httpd = server_class(('', port), handler_class)
    print('Starting httpd...')
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        stop_handler(httpd)
if __name__ == "__main__":
    # Demo: serve a single POST route that prints whatever body it receives.
    from sys import argv

    # The factory defined in this module is ExpressServerFactory; the name
    # previously used here was never defined.
    srv = ExpressServerFactory()

    def foo(self):
        """Print the request body and acknowledge it with an empty 200."""
        length = int(self.headers.get('Content-Length', 0))
        post_data = self.rfile.read(length)
        print('%s %s' % (type(post_data), post_data))
        # Answer the request so the client gets a complete response.
        self.send_response(200, 'Ok')
        self.end_headers()

    srv.post('/foo', foo)
    handlerClass = srv.createClass()

    # Optional single argument: the port to listen on.
    if len(argv) == 2:
        run(port=int(argv[1]), handler_class=handlerClass)
    else:
        run(handler_class=handlerClass)
| import usocket | |
| import ujson | |
| try: | |
| import ussl | |
| SUPPORT_SSL = True | |
| except ImportError: | |
| ussl = None | |
| SUPPORT_SSL = False | |
| import time | |
| SUPPORT_TIMEOUT = hasattr(usocket.socket, 'settimeout') | |
| CONTENT_TYPE_JSON = 'application/json' | |
class Response(object):
    """HTTP response whose body is read lazily from ``raw``.

    ``raw`` is the stream positioned at the start of the body.  It is read
    in full and closed on the first access to ``content`` (or ``text`` /
    ``json``), or closed unread by ``close``.
    """

    def __init__(self, status_code, raw):
        self.status_code = status_code
        self.raw = raw
        # False means "not read yet"; None means "closed or failed, no body".
        self._content = False
        self.encoding = 'utf-8'

    @property
    def content(self):
        """Body bytes, read once from ``raw``; None if closed before reading."""
        if self._content is False:
            raw, self.raw = self.raw, None
            # Mark the body as unavailable first so a failed read does not
            # leave the object pointing at a stream that is already closed.
            self._content = None
            try:
                self._content = raw.read()
            finally:
                # Release the stream even when read() raises.
                raw.close()
        return self._content

    @property
    def text(self):
        """Body decoded with ``self.encoding``; '' when there is no body."""
        content = self.content
        return str(content, self.encoding) if content else ''

    def close(self):
        """Close the underlying stream without reading the body."""
        if self.raw is not None:
            self._content = None
            self.raw.close()
            self.raw = None

    def json(self):
        """Parse the body text as JSON."""
        return ujson.loads(self.text)

    def raise_for_status(self):
        """Raise OSError for 4xx (client) and 5xx (server) status codes."""
        if 400 <= self.status_code < 500:
            raise OSError('Client error: %s' % self.status_code)
        if 500 <= self.status_code < 600:
            raise OSError('Server error: %s' % self.status_code)
| # Adapted from upip | |
def request(method, url, json=None, timeout=None, headers=None):
    """Send an HTTP/1.0 request and return a ``Response`` for the reply.

    method  -- HTTP verb written on the request line, e.g. 'GET'.
    url     -- 'http://host[:port]/path' or 'https://host[:port]/path'.
    json    -- optional object; sent as the body, serialised with ujson,
               with an application/json content type.
    timeout -- optional socket timeout, passed to ``settimeout``.
    headers -- optional dict of extra request headers.

    Raises OSError for an unsupported protocol or a malformed status line.
    The socket is closed before any exception leaves this function; on
    success it is handed to the returned ``Response``, which owns it.
    """
    urlparts = url.split('/', 3)
    proto = urlparts[0]
    host = urlparts[2]
    urlpath = '' if len(urlparts) < 4 else urlparts[3]

    if proto == 'http:':
        port = 80
    elif proto == 'https:':
        port = 443
    else:
        raise OSError('Unsupported protocol: %s' % proto[:-1])

    if ':' in host:
        host, port = host.split(':')
        port = int(port)

    if json is not None:
        content = ujson.dumps(json)
        content_type = CONTENT_TYPE_JSON
    else:
        content = None

    ai = usocket.getaddrinfo(host, port)
    addr = ai[0][4]

    sock = usocket.socket()
    try:
        if timeout is not None:
            assert SUPPORT_TIMEOUT, 'Socket does not support timeout'
            sock.settimeout(timeout)

        sock.connect(addr)

        if proto == 'https:':
            assert SUPPORT_SSL, 'HTTPS not supported: could not find ussl'
            sock = ussl.wrap_socket(sock)

        sock.write('%s /%s HTTP/1.0\r\nHost: %s\r\n' % (method, urlpath, host))

        if headers is not None:
            for header in headers.items():
                sock.write('%s: %s\r\n' % header)

        if content is not None:
            sock.write('content-length: %s\r\n' % len(content))
            sock.write('content-type: %s\r\n' % content_type)
            sock.write('\r\n')
            sock.write(content)
        else:
            sock.write('\r\n')

        status_line = sock.readline()
        # The reason phrase is optional, so only the first two fields are
        # required: protocol version and status code.
        fields = status_line.split(None, 2)
        if len(fields) < 2:
            raise OSError('Invalid HTTP response: %r' % status_line)
        status = int(fields[1])

        # Skip headers.  Stop on the blank line, and also on an empty read,
        # which otherwise repeats forever when the peer closes early.
        while True:
            line = sock.readline()
            if not line or line == b'\r\n':
                break
    except Exception:
        # Nothing else holds the socket yet, so release it before re-raising.
        sock.close()
        raise

    return Response(status, sock)
def get(url, **kwargs):
    """Issue a GET for ``url``; keyword arguments are forwarded to ``request``."""
    response = request('GET', url, **kwargs)
    return response
def post(url, **kwargs):
    """Issue a POST to ``url``; keyword arguments are forwarded to ``request``."""
    response = request('POST', url, **kwargs)
    return response
if __name__ == '__main__':
    # Manual smoke test: read the status page, then post value 1 and,
    # five seconds later, value 0.
    #resp = get('http://192.168.0.7:8080/sample.txt')
    resp = get('http://192.168.0.4:3000/status')
    print('Response:', resp.text)

    switch_on = post('http://192.168.0.4:3000/post', json={ "value": 1 })
    switch_on.close()
    time.sleep(5)
    switch_off = post('http://192.168.0.4:3000/post', json={ "value": 0 })
    switch_off.close()
#!/bin/sh
# Post value 0 to the alarm server as JSON, then end the output with a newline.
url='http://192.168.0.4:3000/post'
curl -H 'Content-Type:application/json' -d '{"value" : 0}' "$url"
echo
#!/bin/sh
# Post value 1 to the alarm server as JSON, then end the output with a newline.
url='http://192.168.0.4:3000/post'
curl -H 'Content-Type:application/json' -d '{"value" : 1}' "$url"
echo
| from machine import Pin | |
| import time | |
| import httpclient as http | |
| import network | |
# Station interface, polled for the connection state.
station = network.WLAN(network.STA_IF)


class L:
    """Empty class used as a mutable holder for shared state."""


# Shared state: WIFI_CONNECTED caches the last observed connection status.
p = L()
p.WIFI_CONNECTED = station.isconnected()

# Endpoint that receives the switch state.
URL = 'http://192.168.0.4:3000/post'

switch = Pin(15, Pin.IN)
wifi_led = Pin(16, Pin.OUT)
alarm_led = Pin(14, Pin.OUT)
def set_state(value):
    """Show ``value`` on the alarm LED and report it to the server.

    The LED is always updated.  The HTTP post is attempted only while the
    cached WiFi state says the station is connected.  A failed post is
    reported on the console instead of raising, because this function is
    installed as the (debounced) pin interrupt callback.
    NOTE(review): doing network I/O from a pin callback may be unsuitable
    on some ports -- confirm against the target board's IRQ rules.
    """
    print('value:', value)
    alarm_led.value(value)
    if not p.WIFI_CONNECTED:
        return
    try:
        http.post(URL, json={'value': value}).close()
    except (OSError, ValueError) as err:
        # The cached WiFi flag can be stale and the server can be down or
        # reply with garbage; keep the LED state and carry on.
        print('post failed:', err)
def debounce(fn, p, interval=20):
    """Wrap ``fn`` so it only fires when ``p`` holds a steady value.

    The returned callback ignores its own arguments.  It reads ``p.value()``
    once as the reference, then samples it ``interval`` more times, 1 ms
    apart.  ``fn(reference)`` is called only if every sample matched.
    """
    def cb(*args, **kwargs):
        reference = p.value()
        sampled = 0
        matches = 0
        while sampled < interval:
            matches += 1 if p.value() == reference else 0
            sampled += 1
            time.sleep(0.001)
        if matches == interval:
            fn(reference)
    return cb
print('setting switch trigger...')
# React to both edges of the switch; the debounced callback passes the
# settled pin value on to set_state.
both_edges = Pin.IRQ_RISING | Pin.IRQ_FALLING
on_switch_change = debounce(set_state, switch, 20)
switch.irq(trigger=both_edges, handler=on_switch_change)
print('running infinitely...')
def do_wifi_check():
    """Refresh the cached WiFi state and drive the WiFi LED pin with it."""
    p.WIFI_CONNECTED = connected = station.isconnected()
    wifi_led.value(connected)
# Main loop: re-check the WiFi state twice a second, forever.
while True:
    do_wifi_check()
    time.sleep(0.5)
| # raspberry pi on board led controlling: | |
| # https://raspberrypi.stackexchange.com/a/1504/74294 | |
| import os | |
| from express import run, ExpressServerFactory as Factory | |
| from BaseHTTPServer import HTTPServer | |
| import json | |
class CBServer(HTTPServer):
    """HTTPServer that calls hooks on its handler class around its lifetime.

    The handler class must provide ``post_start()`` and ``post_stop()``
    callable on the class itself.
    """

    def server_activate(self):
        """Activate the server, then call the handler class's ``post_start``."""
        HTTPServer.server_activate(self)
        handler_cls = self.RequestHandlerClass
        handler_cls.post_start()

    def server_close(self):
        """Close the server, then call the handler class's ``post_stop``."""
        HTTPServer.server_close(self)
        handler_cls = self.RequestHandlerClass
        handler_cls.post_stop()
# Route table for this server; the request handlers are registered on it.
sfact = Factory()
def post_start(self):
    """Print a startup notice; ``self`` is accepted but not used."""
    print('server is running...')
def process(self):
    """Handle the LED route: echo the JSON body and switch the LED.

    The body must be a JSON object with a ``value`` key.  Anything else is
    answered with 400 and no body.  A valid request gets 200 with the raw
    body echoed back; when ``value`` is an integer, a non-zero value turns
    the LED on and zero turns it off.  Non-integer values are echoed but
    leave the LED untouched.
    """
    length = int(self.headers.get('Content-Length', 0))
    data = self.rfile.read(length)

    # Validate before answering so a bad body does not get a 200.
    try:
        val = json.loads(data)['value']
    except (ValueError, KeyError, TypeError):
        self.send_response(400, 'Bad Request')
        self.end_headers()
        return

    self.send_response(200, 'Ok')
    self.end_headers()
    self.wfile.write(data)

    # bool is a subclass of int; JSON true/false are deliberately ignored.
    if isinstance(val, int) and not isinstance(val, bool):
        if val:
            brightness_on()
        else:
            brightness_off()
def stat(self):
    """Answer with 200 and a fixed greeting as the body."""
    self.send_response(200, 'Ok')
    # Let the handler terminate the header block instead of hand-writing
    # the blank line.
    self.end_headers()
    # wfile is a byte stream; on Python 2 a bytes literal is a plain str.
    self.wfile.write(b'hello world')
# Register the routes, then build the handler class that dispatches to them.
# /status answers with a fixed greeting; /post drives the LED.
sfact.get('/status', stat)
sfact.post('/post', process)
sclass = sfact.createClass()
def brightness_on():
    """Write 1 to the led0 brightness file through the shell."""
    command = "echo 1 > /sys/class/leds/led0/brightness"
    os.system(command)
def brightness_off():
    """Write 0 to the led0 brightness file through the shell."""
    command = "echo 0 > /sys/class/leds/led0/brightness"
    os.system(command)
class Handler(sclass):
    """Route-dispatching handler that also switches the led0 trigger.

    ``post_start`` and ``post_stop`` are class-level hooks; CBServer calls
    them after activating and after closing the server.
    """

    @classmethod
    def post_start(cls):
        """Set the led0 trigger to ``none``."""
        print('setting led trigger...')
        os.system('echo none > /sys/class/leds/led0/trigger')
        print('done')

    @classmethod
    def post_stop(cls):
        """Set the led0 trigger back to ``mmc0``."""
        print('defaulting led trigger...')
        os.system('echo mmc0 > /sys/class/leds/led0/trigger')
        print('done')
def on_stop(self):
    """Stop hook for ``run``: close the given server, logging before and after.

    ``self`` is the server instance passed in by the caller.
    """
    server = self
    print('server closing...')
    server.server_close()
    print('server closed')
if __name__ == '__main__':
    import sys

    # Same server wiring either way; one command-line argument overrides
    # the default port used by run().
    options = dict(handler_class=Handler, server_class=CBServer,
                   stop_handler=on_stop)
    if len(sys.argv) == 2:
        p = int(sys.argv[1])
        options['port'] = p
    run(**options)