Skip to content

Instantly share code, notes, and snippets.

@acceptableEngineering
Created January 1, 2026 03:02
Show Gist options
  • Select an option

  • Save acceptableEngineering/541ae58941008d01592469235f3f14a5 to your computer and use it in GitHub Desktop.

Select an option

Save acceptableEngineering/541ae58941008d01592469235f3f14a5 to your computer and use it in GitHub Desktop.
ambient-wx-2-influx.py
#!/usr/bin/env python3
"""
Ambient Weather Station to InfluxDB ingestion script
Fetches live data from local weather station API and writes to InfluxDB
"""
import requests
import json
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
import sys
import os
# Configuration
WEATHER_STATION_URL = "http://192.168.1.98/get_livedata_info"
INFLUXDB_URL = os.getenv("INFLUXDB_URL", "http://localhost:8086")
INFLUXDB_TOKEN = "REPLACE_ME"
INFLUXDB_ORG = os.getenv("INFLUXDB_ORG", "REPLACE_ME")
INFLUXDB_BUCKET = os.getenv("INFLUXDB_BUCKET", "REPLACE_ME")
# Field ID mappings for Ambient Weather
FIELD_MAPPING = {
"0x02": "outdoor_temp", # Outdoor temperature
"0x07": "outdoor_humidity", # Outdoor humidity
"0x03": "dewpoint", # Dew point
"0x0B": "wind_speed", # Wind speed
"0x0C": "wind_gust", # Wind gust
"0x19": "max_daily_gust", # Max daily gust
"0x0A": "wind_direction", # Wind direction (degrees)
"0x15": "solar_radiation", # Solar radiation
"0x17": "uv_index", # UV index
"0x0D": "rain_event", # Event rain
"0x0E": "rain_rate", # Rain rate
"0x10": "rain_hourly", # Hourly rain
"0x11": "rain_daily", # Daily rain
"0x12": "rain_weekly", # Weekly rain
"0x13": "rain_monthly", # Monthly rain
}
def parse_value(val_str):
"""Extract numeric value from string, removing units"""
if not val_str or val_str == "---.-":
return None
# Remove common units and convert to float
# Handle inHg BEFORE removing 'in' to avoid breaking it
val_str = val_str.replace("inHg", "").replace("F", "").replace("mph", "")
val_str = val_str.replace("in/Hr", "").replace("in", "").replace("%", "").strip()
try:
return float(val_str)
except ValueError:
return None
def fetch_weather_data():
"""Fetch live data from weather station"""
try:
response = requests.get(WEATHER_STATION_URL, timeout=10)
response.raise_for_status()
return response.json()
except requests.RequestException as e:
print(f"Error fetching weather data: {e}", file=sys.stderr)
return None
def write_to_influxdb(data):
"""Write weather data to InfluxDB"""
if not INFLUXDB_TOKEN:
print("Error: INFLUXDB_TOKEN environment variable not set", file=sys.stderr)
sys.exit(1)
try:
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
timestamp = datetime.utcnow()
points = []
# Process common_list fields
for item in data.get("common_list", []):
field_id = item.get("id")
if field_id in FIELD_MAPPING:
value = parse_value(item.get("val"))
if value is not None:
point = Point("weather") \
.tag("station", "ambient") \
.tag("location", "home") \
.field(FIELD_MAPPING[field_id], value) \
.time(timestamp)
points.append(point)
# Process rain data
for item in data.get("rain", []):
field_id = item.get("id")
if field_id in FIELD_MAPPING:
value = parse_value(item.get("val"))
if value is not None:
point = Point("weather") \
.tag("station", "ambient") \
.tag("location", "home") \
.field(FIELD_MAPPING[field_id], value) \
.time(timestamp)
points.append(point)
# Process indoor sensor (wh25)
for item in data.get("wh25", []):
if "intemp" in item:
value = parse_value(item["intemp"])
if value is not None:
points.append(Point("weather")
.tag("station", "ambient")
.tag("location", "indoor")
.field("indoor_temp", value)
.time(timestamp))
if "inhumi" in item:
value = parse_value(item["inhumi"])
if value is not None:
points.append(Point("weather")
.tag("station", "ambient")
.tag("location", "indoor")
.field("indoor_humidity", value)
.time(timestamp))
if "abs" in item:
value = parse_value(item["abs"])
if value is not None:
points.append(Point("weather")
.tag("station", "ambient")
.tag("location", "home")
.field("pressure_absolute", value)
.time(timestamp))
if "rel" in item:
value = parse_value(item["rel"])
if value is not None:
points.append(Point("weather")
.tag("station", "ambient")
.tag("location", "home")
.field("pressure_relative", value)
.time(timestamp))
# Write all points
write_api.write(bucket=INFLUXDB_BUCKET, record=points)
print(f"Successfully wrote {len(points)} data points to InfluxDB")
client.close()
return True
except Exception as e:
print(f"Error writing to InfluxDB: {e}", file=sys.stderr)
return False
def main():
"""Main execution"""
print(f"Fetching weather data from {WEATHER_STATION_URL}")
data = fetch_weather_data()
if not data:
sys.exit(1)
print(f"Writing data to InfluxDB at {INFLUXDB_URL}")
success = write_to_influxdb(data)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment