-
-
Save jk/dc58ef3759b1d61f021281531858a2a9 to your computer and use it in GitHub Desktop.
| # -*- coding: utf-8 -*- | |
| """Test using the InfluxDB client.""" | |
| import time | |
| import colorsys | |
| import os | |
| import sys | |
| import socket | |
| import ST7735 | |
| import ltr559 | |
| from bme280 import BME280 | |
| from enviroplus import gas | |
| from subprocess import PIPE, Popen | |
| from PIL import Image | |
| from PIL import ImageDraw | |
| from PIL import ImageFont | |
| from influxdb import InfluxDBClient | |
| try: | |
| from smbus2 import SMBus | |
| except ImportError: | |
| from smbus import SMBus | |
print("""
Press Ctrl+C to exit!
""")

# BME280 temperature/pressure/humidity sensor, attached via SMBus(1)
bus = SMBus(1)
bme280 = BME280(i2c_dev=bus)

# ST7735 LCD: create the driver, initialise it and record its dimensions
st7735 = ST7735.ST7735(port=0, cs=1, dc=9, backlight=12, rotation=270,
                       spi_speed_hz=10000000)
st7735.begin()
WIDTH, HEIGHT = st7735.width, st7735.height

# Black RGB canvas covering the whole display, plus the font used for text.
# The font is looked up relative to the directory containing this script.
img = Image.new('RGB', (WIDTH, HEIGHT), color=(0, 0, 0))
draw = ImageDraw.Draw(img)
path = os.path.dirname(os.path.realpath(__file__))
font = ImageFont.truetype(path + "/fonts/Asap/Asap-Bold.ttf", 20)
# Set up InfluxDB.
# Connection settings can be overridden through environment variables so the
# password does not have to live in the source file; every default is the
# value that used to be hard-coded here.
influx = InfluxDBClient(
    host=os.environ.get("INFLUXDB_HOST", "influx.domain.tld"),
    username=os.environ.get("INFLUXDB_USERNAME", "username"),
    password=os.environ.get("INFLUXDB_PASSWORD", "secret"),
    database=os.environ.get("INFLUXDB_DATABASE", "enviroplus")
)

# Template for the single point written by the main loop, which fills in the
# "fields" dict in place on every iteration.
influx_json_prototyp = [
    {
        "measurement": "enviroplus",
        "tags": {
            "host": "enviroplus"
        },
        "fields": {}
    }
]
# The position of the top bar
top_pos = 25


def display_text(variable, data, unit):
    """Append a reading to its history and draw it on the 0.96" LCD.

    The newest value replaces the oldest entry of ``values[variable]`` so the
    history keeps its length. The history is then drawn as one coloured
    1-pixel column per entry with a black line graph on top, the formatted
    reading is written at the top, and the image is pushed to the display.
    The formatted reading is also printed to stdout.

    Args:
        variable: Key into the module-level ``values`` dict; its first four
            characters are used as the label.
        data: The new numeric reading.
        unit: Unit string appended to the formatted reading.
    """
    # Maintain length of list
    history = values[variable][1:] + [data]
    values[variable] = history
    # Scale the values for the variable between 0 and 1. The minimum and the
    # range are computed once instead of once per element.
    lowest = min(history)
    span = max(history) - lowest + 1
    colours = [(v - lowest + 1) / span for v in history]
    # Format the variable name and value
    message = "{}: {:.1f} {}".format(variable[:4], data, unit)
    print(message)
    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))
    for i, level in enumerate(colours):
        # Convert the values to colours from red to blue
        hue = (1.0 - level) * 0.6
        r, g, b = [int(x * 255.0)
                   for x in colorsys.hsv_to_rgb(hue, 1.0, 1.0)]
        # Draw a 1-pixel wide rectangle of colour
        draw.rectangle((i, top_pos, i + 1, HEIGHT), (r, g, b))
        # Draw a line graph in black
        line_y = HEIGHT - (top_pos + (level * (HEIGHT - top_pos))) + top_pos
        draw.rectangle((i, line_y, i + 1, line_y + 1), (0, 0, 0))
    # Write the text at the top in black
    draw.text((0, 0), message, font=font, fill=(0, 0, 0))
    st7735.display(img)
def get_cpu_temperature():
    """Return the CPU temperature reported by ``vcgencmd measure_temp``.

    The text between the first ``=`` and the last ``'`` of the command's
    output is parsed as a float. The result is used for compensating the
    BME280 temperature reading.
    """
    vcgencmd = Popen(['vcgencmd', 'measure_temp'], stdout=PIPE)
    stdout_bytes, _ = vcgencmd.communicate()
    text = stdout_bytes.decode()
    start = text.index('=') + 1
    end = text.rindex("'")
    return float(text[start:end])
def get_ip():
    """Return the local IP address as a string.

    A UDP socket is "connected" to a private address and the socket's own
    address is read back. If that fails with an ``OSError`` (for example
    because there is no route), ``'127.0.0.1'`` is returned instead.
    """
    try:
        # The context manager closes the socket on every path.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            # doesn't even have to be reachable
            s.connect(('10.255.255.255', 1))
            return s.getsockname()[0]
    except OSError:
        # Only network errors select the fallback; KeyboardInterrupt and
        # SystemExit are no longer swallowed as they were by a bare except.
        return '127.0.0.1'
# Tuning factor for compensation. Decrease this number to adjust the
# temperature down, and increase to adjust up
factor = 0.8
cpu_temps = [0] * 5
delay = 0.5  # Debounce the proximity tap
mode = 0  # The starting mode
last_page = 0
light = 1
font_size = 25
text_colour = (128, 128, 128)
back_colour = (0, 0, 0)

# Show the local IP address centred on the display at start-up.
# The address is looked up once so the measured and the drawn text match.
ip_address = get_ip()
try:
    size_x, size_y = draw.textsize(ip_address, font)
except AttributeError:
    # ImageDraw.textsize was removed in Pillow 10; measure the text with
    # textbbox there instead.
    left, top, right, bottom = draw.textbbox((0, 0), ip_address, font=font)
    size_x, size_y = right - left, bottom - top
# Calculate text position
x = (WIDTH - size_x) / 2
y = (HEIGHT / 2) - (size_y / 2)
# Draw background rectangle (sized from the display, not hard-coded) and
# write text.
draw.rectangle((0, 0, WIDTH, HEIGHT), back_colour)
draw.text((x, y), ip_address, font=font, fill=text_colour)
st7735.display(img)
# Create a values dict to store the data: each variable name maps to its own
# list of WIDTH entries, all initialised to 1.
variables = [
    "temperature",
    "pressure",
    "humidity",
    "light",
    "oxidised",
    "reduced",
    "nh3",
]
values = {name: [1] * WIDTH for name in variables}
# Retention policy used when writing points. Defaults to the original
# "rp_1y"; set INFLUXDB_RETENTION_POLICY to another policy name, or to an
# empty string to pass None and let the client use its default.
retention_policy = os.environ.get("INFLUXDB_RETENTION_POLICY", "rp_1y") or None

# The main loop: read the sensors, update the point's fields in place and
# write the point to InfluxDB, then sleep for ten seconds.
try:
    iterations = 0
    fields = influx_json_prototyp[0]['fields']
    while True:
        proximity = ltr559.get_proximity()
        # Compensated Temperature
        cpu_temp = get_cpu_temperature()
        # Smooth out with some averaging to decrease jitter
        cpu_temps = cpu_temps[1:] + [cpu_temp]
        avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
        raw_temp = bme280.get_temperature()
        compensated_temp = raw_temp - ((avg_cpu_temp - raw_temp) / factor)
        # Change json
        fields['ltr559.proximity'] = proximity
        if proximity < 10:
            fields['ltr559.lux'] = ltr559.get_lux()
        else:
            fields['ltr559.lux'] = 1.0
        # NOTE(review): the nesting below is reconstructed; it assumes all
        # BME280, CPU and gas fields belong to this guard - confirm.
        if iterations >= 6:
            # Store the very readings the compensation was computed from, so
            # the raw, CPU and compensated values of one point agree (and no
            # second sensor read / vcgencmd call is needed).
            fields['bme280.temperature.raw'] = raw_temp
            fields['bme280.temperature.compensated'] = compensated_temp
            fields['cpu.temperature'] = cpu_temp
            fields['bme280.pressure'] = bme280.get_pressure()
            fields['bme280.humidity'] = bme280.get_humidity()
            gas_data = gas.read_all()
            fields['mics6814.oxidising'] = gas_data.oxidising
            fields['mics6814.reducing'] = gas_data.reducing
            fields['mics6814.nh3'] = gas_data.nh3
        # Nothing is written during the first three iterations.
        if iterations >= 3:
            print("Write points: {0}".format(influx_json_prototyp))
            influx.write_points(influx_json_prototyp,
                                retention_policy=retention_policy)
        else:
            print("Skip iteration: " + str(iterations))
        time.sleep(10)
        iterations += 1
# Exit cleanly
except KeyboardInterrupt:
    sys.exit(0)
Hi, this is great! I have successfully implemented this with my PI.
The one thing that I noticed is that you are not importing any data from a PMS5003 sensor. Is this something that you'd be willing and able to add for an upcoming release?
Please let me know when you have a chance.
Thank you again for this!
@Esteidinger I don't have any plans to share it with a wider audience, nor will I share my grafana dashboards. I'm not doing anything fancy here for now.
@exchefinma Yeah, I'd be able to do this, since all the code is in all-in-one.py from the examples. But since I don't own a PMS5003 and I'm already measuring particles with an ESP8266+SDS011, I'm not motivated to support it in my scripts, which I'm just writing for myself.
Thank you @jk. I've got it running and I really appreciate having it.
Hello,
I find this code very interesting. In addition to Enviro+, I also own a PMS5003 and having an integration would be fine.
Thank you
I find this code very interesting. In addition to Enviro+, I also own a PMS5003 and having an integration would be fine.
@FValent This isn't »a wish come true«. I share my code, nothing more. If you need something else implemented, you'll have to implement it yourself.
Great starting point !
Thanks for sharing this @jk. I've set up my influxdb and grafana, but I've always been short on time to put together the code you've shared here. Very helpful, thank you :-)
This is brilliant thank you.
If you choose to share this with a wider audience, here are some comments.
I installed the influxdb python package with pip install influxdb
You have a hard coded value for retention_policy on line 204 which I had to remove to work in my setup.
I also need to figure out how to get these values into fahrenheit.
Do you have any grafana graphs you would like to share?