Created
January 1, 2026 03:02
-
-
Save acceptableEngineering/541ae58941008d01592469235f3f14a5 to your computer and use it in GitHub Desktop.
ambient-wx-2-influx.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Ambient Weather Station to InfluxDB ingestion script | |
| Fetches live data from local weather station API and writes to InfluxDB | |
| """ | |
| import requests | |
| import json | |
| from influxdb_client import InfluxDBClient, Point | |
| from influxdb_client.client.write_api import SYNCHRONOUS | |
| from datetime import datetime | |
| import sys | |
| import os | |
| # Configuration | |
| WEATHER_STATION_URL = "http://192.168.1.98/get_livedata_info" | |
| INFLUXDB_URL = os.getenv("INFLUXDB_URL", "http://localhost:8086") | |
| INFLUXDB_TOKEN = "REPLACE_ME" | |
| INFLUXDB_ORG = os.getenv("INFLUXDB_ORG", "REPLACE_ME") | |
| INFLUXDB_BUCKET = os.getenv("INFLUXDB_BUCKET", "REPLACE_ME") | |
| # Field ID mappings for Ambient Weather | |
| FIELD_MAPPING = { | |
| "0x02": "outdoor_temp", # Outdoor temperature | |
| "0x07": "outdoor_humidity", # Outdoor humidity | |
| "0x03": "dewpoint", # Dew point | |
| "0x0B": "wind_speed", # Wind speed | |
| "0x0C": "wind_gust", # Wind gust | |
| "0x19": "max_daily_gust", # Max daily gust | |
| "0x0A": "wind_direction", # Wind direction (degrees) | |
| "0x15": "solar_radiation", # Solar radiation | |
| "0x17": "uv_index", # UV index | |
| "0x0D": "rain_event", # Event rain | |
| "0x0E": "rain_rate", # Rain rate | |
| "0x10": "rain_hourly", # Hourly rain | |
| "0x11": "rain_daily", # Daily rain | |
| "0x12": "rain_weekly", # Weekly rain | |
| "0x13": "rain_monthly", # Monthly rain | |
| } | |
| def parse_value(val_str): | |
| """Extract numeric value from string, removing units""" | |
| if not val_str or val_str == "---.-": | |
| return None | |
| # Remove common units and convert to float | |
| # Handle inHg BEFORE removing 'in' to avoid breaking it | |
| val_str = val_str.replace("inHg", "").replace("F", "").replace("mph", "") | |
| val_str = val_str.replace("in/Hr", "").replace("in", "").replace("%", "").strip() | |
| try: | |
| return float(val_str) | |
| except ValueError: | |
| return None | |
| def fetch_weather_data(): | |
| """Fetch live data from weather station""" | |
| try: | |
| response = requests.get(WEATHER_STATION_URL, timeout=10) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.RequestException as e: | |
| print(f"Error fetching weather data: {e}", file=sys.stderr) | |
| return None | |
| def write_to_influxdb(data): | |
| """Write weather data to InfluxDB""" | |
| if not INFLUXDB_TOKEN: | |
| print("Error: INFLUXDB_TOKEN environment variable not set", file=sys.stderr) | |
| sys.exit(1) | |
| try: | |
| client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) | |
| write_api = client.write_api(write_options=SYNCHRONOUS) | |
| timestamp = datetime.utcnow() | |
| points = [] | |
| # Process common_list fields | |
| for item in data.get("common_list", []): | |
| field_id = item.get("id") | |
| if field_id in FIELD_MAPPING: | |
| value = parse_value(item.get("val")) | |
| if value is not None: | |
| point = Point("weather") \ | |
| .tag("station", "ambient") \ | |
| .tag("location", "home") \ | |
| .field(FIELD_MAPPING[field_id], value) \ | |
| .time(timestamp) | |
| points.append(point) | |
| # Process rain data | |
| for item in data.get("rain", []): | |
| field_id = item.get("id") | |
| if field_id in FIELD_MAPPING: | |
| value = parse_value(item.get("val")) | |
| if value is not None: | |
| point = Point("weather") \ | |
| .tag("station", "ambient") \ | |
| .tag("location", "home") \ | |
| .field(FIELD_MAPPING[field_id], value) \ | |
| .time(timestamp) | |
| points.append(point) | |
| # Process indoor sensor (wh25) | |
| for item in data.get("wh25", []): | |
| if "intemp" in item: | |
| value = parse_value(item["intemp"]) | |
| if value is not None: | |
| points.append(Point("weather") | |
| .tag("station", "ambient") | |
| .tag("location", "indoor") | |
| .field("indoor_temp", value) | |
| .time(timestamp)) | |
| if "inhumi" in item: | |
| value = parse_value(item["inhumi"]) | |
| if value is not None: | |
| points.append(Point("weather") | |
| .tag("station", "ambient") | |
| .tag("location", "indoor") | |
| .field("indoor_humidity", value) | |
| .time(timestamp)) | |
| if "abs" in item: | |
| value = parse_value(item["abs"]) | |
| if value is not None: | |
| points.append(Point("weather") | |
| .tag("station", "ambient") | |
| .tag("location", "home") | |
| .field("pressure_absolute", value) | |
| .time(timestamp)) | |
| if "rel" in item: | |
| value = parse_value(item["rel"]) | |
| if value is not None: | |
| points.append(Point("weather") | |
| .tag("station", "ambient") | |
| .tag("location", "home") | |
| .field("pressure_relative", value) | |
| .time(timestamp)) | |
| # Write all points | |
| write_api.write(bucket=INFLUXDB_BUCKET, record=points) | |
| print(f"Successfully wrote {len(points)} data points to InfluxDB") | |
| client.close() | |
| return True | |
| except Exception as e: | |
| print(f"Error writing to InfluxDB: {e}", file=sys.stderr) | |
| return False | |
| def main(): | |
| """Main execution""" | |
| print(f"Fetching weather data from {WEATHER_STATION_URL}") | |
| data = fetch_weather_data() | |
| if not data: | |
| sys.exit(1) | |
| print(f"Writing data to InfluxDB at {INFLUXDB_URL}") | |
| success = write_to_influxdb(data) | |
| sys.exit(0 if success else 1) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment