Skip to content

Instantly share code, notes, and snippets.

@deton
Last active November 9, 2025 12:01
Show Gist options
  • Select an option

  • Save deton/9eb25f9b938ba805842a6bb0bb63b6c9 to your computer and use it in GitHub Desktop.

Select an option

Save deton/9eb25f9b938ba805842a6bb0bb63b6c9 to your computer and use it in GitHub Desktop.
lines intersect polygon
# Count trip entries into a polygon in 10-minute bins.
# (Generated using GitHub Copilot)
import argparse
import json
from datetime import datetime
from shapely.geometry import shape, Point, LineString
import pandas as pd
from typing import NamedTuple, List
from geopy.distance import geodesic
TIMEFMT = '%Y-%m-%dT%H:%M'
class EntryEvent(NamedTuple):
"""Represents a polygon entry event with time and speed information"""
time: datetime
trip_id: str
speed_kmh: float
def calculate_speed(coord1: List[float], coord2: List[float],
time1_ms: int, time2_ms: int) -> float:
"""Calculate speed between two points in kilometers per hour using geopy's geodesic distance"""
# Extract coordinates (geopy expects (lat, lon) order)
point1 = (coord1[1], coord1[0]) # Convert from (lon, lat) to (lat, lon)
point2 = (coord2[1], coord2[0])
distance_km = geodesic(point1, point2).kilometers
time_diff_hours = (time2_ms - time1_ms) / (1000.0 * 3600.0)
if time_diff_hours <= 0:
return 0.0
return distance_km / time_diff_hours
def load_geojson(file_path):
with open(file_path, 'r') as f:
return json.load(f)
def main(polygon_path: str, trips_path: str, output_csv: str):
polygon_data = load_geojson(polygon_path)
polygon = shape(polygon_data['features'][0]['geometry'])
trips_data = load_geojson(trips_path)
# Store entry events with time and speed
entry_events: List[EntryEvent] = []
for i, feature in enumerate(trips_data['features'], 1):
coordinates = feature['geometry']['coordinates']
times = feature['properties']['times']
trip_id = f"trip_{i}"
# Process coordinate points and their timestamps
# Track whether the previous point was inside the polygon so we only count entries
if len(coordinates) < 2:
continue
prev_inside = polygon.contains(Point(coordinates[0]))
for i in range(len(coordinates) - 1):
coord1 = coordinates[i]
coord2 = coordinates[i + 1]
t1 = times[i]
t2 = times[i + 1]
point1 = Point(coord1)
point2 = Point(coord2)
line = LineString([coord1, coord2])
event_time_ms = None
# Condition to detect an entry: previous point is outside, and the segment
# either intersects the polygon or the end point is inside
is_point2_inside = polygon.contains(point2)
intersects = polygon.intersects(line)
if (not prev_inside) and (intersects or is_point2_inside):
# Compute intersection between the segment and the polygon
inter = polygon.intersection(line)
pts = []
if inter.is_empty:
pts = []
elif inter.geom_type == 'Point':
pts = [inter]
elif inter.geom_type in ('MultiPoint', 'GeometryCollection'):
for g in inter:
if g.geom_type == 'Point':
pts.append(g)
elif inter.geom_type == 'LineString':
try:
pts = [Point(inter.coords[0])]
except Exception:
pts = []
else:
try:
for g in inter:
if hasattr(g, 'coords') and len(g.coords) > 0:
pts.append(Point(g.coords[0]))
except Exception:
pts = []
earliest_proj = None
for p in pts:
try:
proj = line.project(p)
if earliest_proj is None or proj < earliest_proj:
earliest_proj = proj
except Exception:
continue
if earliest_proj is not None and line.length > 0:
frac = earliest_proj / line.length
event_time_ms = int(t1 + frac * (t2 - t1))
# Calculate entry speed using the segment's points
entry_speed = calculate_speed(coord1, coord2, t1, t2)
else:
# If no intersection point could be extracted, fall back to the end-point time
event_time_ms = t2 if is_point2_inside else None
entry_speed = calculate_speed(coord1, coord2, t1, t2) if is_point2_inside else None
# If an event time was found, store the entry event
if event_time_ms is not None:
dt = datetime.fromtimestamp(event_time_ms / 1000.0)
entry_events.append(EntryEvent(
time=dt,
trip_id=trip_id,
speed_kmh=entry_speed if entry_speed is not None else 0.0
))
# Update prev_inside for the next segment (based on whether end point is inside)
prev_inside = is_point2_inside
# Convert entry events to DataFrame
df = pd.DataFrame([
{
'Time': evt.time.strftime(TIMEFMT),
'TripID': evt.trip_id,
'Speed_kmh': round(evt.speed_kmh, 1)
}
for evt in sorted(entry_events, key=lambda x: x.time)
])
# Add count per 10-minute bin
df['Time_10min'] = df['Time'].apply(
lambda x: datetime.strptime(x, TIMEFMT).replace(
minute=(datetime.strptime(x, TIMEFMT).minute // 10 * 10),
second=0,
microsecond=0
).strftime(TIMEFMT)
)
# Calculate statistics per 10-minute bin
stats_df = df.groupby('Time_10min').agg({
'TripID': 'count',
'Speed_kmh': ['mean', 'min', 'max']
}).round(1)
stats_df.columns = ['Count', 'Avg_Speed_kmh', 'Min_Speed_kmh', 'Max_Speed_kmh']
stats_df = stats_df.reset_index()
# Save detailed events to CSV
df.to_csv(output_csv, index=False)
# Save statistics to a separate CSV
stats_csv = output_csv.replace('.csv', '_stats.csv')
stats_df.to_csv(stats_csv, index=False)
def parse_args():
p = argparse.ArgumentParser(description='Count trip entries into a polygon in 10-minute bins')
p.add_argument('--polygon', '-p', default='polygon.geojson', help='Path to polygon GeoJSON (default: polygon.geojson)')
p.add_argument('--trips', '-t', default='trips-v7.geojson', help='Path to trips TimestampedGeoJSON (default: trips-v7.geojson)')
p.add_argument('--output', '-o', default='polygon_intersection_counts.csv', help='Output CSV path (default: polygon_intersection_counts.csv)')
return p.parse_args()
if __name__ == "__main__":
args = parse_args()
main(args.polygon, args.trips, args.output)
# lines intersect polygon
# https://qiita.com/nabe189/items/dde80b8494456737f42f
import argparse
import geopandas as gpd
import shapely
parser = argparse.ArgumentParser()
parser.add_argument("lines_file", help="GeoJSON file of LineString FeatureCollection")
parser.add_argument("polygon_file", help="GeoJSON file of a Polygon")
parser.add_argument("-o", "--output_file", help="output filename", default="intersects.geojson")
args = parser.parse_args()
lines_gdf = gpd.read_file(args.lines_file, encoding="utf-8")
with open(args.polygon_file) as f:
polygon = shapely.from_geojson(f.read())
lines_gdf["intersects"] = lines_gdf["geometry"].intersects(polygon)
lines_gdf[lines_gdf["intersects"]].drop(columns=["intersects"]).to_file(args.output_file, driver="GeoJSON")
import datetime
import json
import sys
fname = sys.argv[1]
outfname = f"keplerglTrip_{fname}"
with open(fname) as f:
geojson = json.load(f)
for feature in geojson["features"]:
if feature["geometry"]["type"] != "LineString":
continue
times = feature["properties"]["times"]
coords = feature["geometry"]["coordinates"]
for i, lonlat in enumerate(coords):
if len(lonlat) > 4: # already has timestamp?
continue
if len(lonlat) == 2:
lonlat.append(0) # altitude
if len(lonlat) == 3:
ts = times[i]
if type(ts) is str:
ts = datetime.datetime.fromisoformat(ts).timestamp() # TODO: other format
lonlat.append(ts)
del feature["properties"]["times"]
with open(outfname, "w", encoding="utf-8") as f:
json.dump(geojson, f)
import datetime
import json
# https://deck.gl/examples/trips-layer
# Trips are taken from June 16, 2016 21:00 to 21:30
basets = datetime.datetime(2016, 6, 16, 21, 00).timestamp()
# https://github.com/visgl/deck.gl-data/blob/master/examples/trips/trips-v7.json
with open("trips-v7.json") as f:
obj = json.load(f)
features = [{
"type": "Feature",
"properties": {
"vendor": x["vendor"],
"times": [(basets + t) * 1000 for t in x["timestamps"]], # [ms]
},
"geometry": {
"type": "LineString",
"coordinates": x["path"],
}
} for x in obj]
geojson = {"type": "FeatureCollection", "features": features}
with open("trips-v7.geojson", "w") as f:
json.dump(geojson, f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment