Last active
November 9, 2025 12:01
-
-
Save deton/9eb25f9b938ba805842a6bb0bb63b6c9 to your computer and use it in GitHub Desktop.
lines intersect polygon
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Count trip entries into a polygon in 10-minute bins. | |
| # (Generated using GitHub Copilot) | |
| import argparse | |
| import json | |
| from datetime import datetime | |
| from shapely.geometry import shape, Point, LineString | |
| import pandas as pd | |
| from typing import NamedTuple, List | |
| from geopy.distance import geodesic | |
| TIMEFMT = '%Y-%m-%dT%H:%M' | |
| class EntryEvent(NamedTuple): | |
| """Represents a polygon entry event with time and speed information""" | |
| time: datetime | |
| trip_id: str | |
| speed_kmh: float | |
| def calculate_speed(coord1: List[float], coord2: List[float], | |
| time1_ms: int, time2_ms: int) -> float: | |
| """Calculate speed between two points in kilometers per hour using geopy's geodesic distance""" | |
| # Extract coordinates (geopy expects (lat, lon) order) | |
| point1 = (coord1[1], coord1[0]) # Convert from (lon, lat) to (lat, lon) | |
| point2 = (coord2[1], coord2[0]) | |
| distance_km = geodesic(point1, point2).kilometers | |
| time_diff_hours = (time2_ms - time1_ms) / (1000.0 * 3600.0) | |
| if time_diff_hours <= 0: | |
| return 0.0 | |
| return distance_km / time_diff_hours | |
| def load_geojson(file_path): | |
| with open(file_path, 'r') as f: | |
| return json.load(f) | |
| def main(polygon_path: str, trips_path: str, output_csv: str): | |
| polygon_data = load_geojson(polygon_path) | |
| polygon = shape(polygon_data['features'][0]['geometry']) | |
| trips_data = load_geojson(trips_path) | |
| # Store entry events with time and speed | |
| entry_events: List[EntryEvent] = [] | |
| for i, feature in enumerate(trips_data['features'], 1): | |
| coordinates = feature['geometry']['coordinates'] | |
| times = feature['properties']['times'] | |
| trip_id = f"trip_{i}" | |
| # Process coordinate points and their timestamps | |
| # Track whether the previous point was inside the polygon so we only count entries | |
| if len(coordinates) < 2: | |
| continue | |
| prev_inside = polygon.contains(Point(coordinates[0])) | |
| for i in range(len(coordinates) - 1): | |
| coord1 = coordinates[i] | |
| coord2 = coordinates[i + 1] | |
| t1 = times[i] | |
| t2 = times[i + 1] | |
| point1 = Point(coord1) | |
| point2 = Point(coord2) | |
| line = LineString([coord1, coord2]) | |
| event_time_ms = None | |
| # Condition to detect an entry: previous point is outside, and the segment | |
| # either intersects the polygon or the end point is inside | |
| is_point2_inside = polygon.contains(point2) | |
| intersects = polygon.intersects(line) | |
| if (not prev_inside) and (intersects or is_point2_inside): | |
| # Compute intersection between the segment and the polygon | |
| inter = polygon.intersection(line) | |
| pts = [] | |
| if inter.is_empty: | |
| pts = [] | |
| elif inter.geom_type == 'Point': | |
| pts = [inter] | |
| elif inter.geom_type in ('MultiPoint', 'GeometryCollection'): | |
| for g in inter: | |
| if g.geom_type == 'Point': | |
| pts.append(g) | |
| elif inter.geom_type == 'LineString': | |
| try: | |
| pts = [Point(inter.coords[0])] | |
| except Exception: | |
| pts = [] | |
| else: | |
| try: | |
| for g in inter: | |
| if hasattr(g, 'coords') and len(g.coords) > 0: | |
| pts.append(Point(g.coords[0])) | |
| except Exception: | |
| pts = [] | |
| earliest_proj = None | |
| for p in pts: | |
| try: | |
| proj = line.project(p) | |
| if earliest_proj is None or proj < earliest_proj: | |
| earliest_proj = proj | |
| except Exception: | |
| continue | |
| if earliest_proj is not None and line.length > 0: | |
| frac = earliest_proj / line.length | |
| event_time_ms = int(t1 + frac * (t2 - t1)) | |
| # Calculate entry speed using the segment's points | |
| entry_speed = calculate_speed(coord1, coord2, t1, t2) | |
| else: | |
| # If no intersection point could be extracted, fall back to the end-point time | |
| event_time_ms = t2 if is_point2_inside else None | |
| entry_speed = calculate_speed(coord1, coord2, t1, t2) if is_point2_inside else None | |
| # If an event time was found, store the entry event | |
| if event_time_ms is not None: | |
| dt = datetime.fromtimestamp(event_time_ms / 1000.0) | |
| entry_events.append(EntryEvent( | |
| time=dt, | |
| trip_id=trip_id, | |
| speed_kmh=entry_speed if entry_speed is not None else 0.0 | |
| )) | |
| # Update prev_inside for the next segment (based on whether end point is inside) | |
| prev_inside = is_point2_inside | |
| # Convert entry events to DataFrame | |
| df = pd.DataFrame([ | |
| { | |
| 'Time': evt.time.strftime(TIMEFMT), | |
| 'TripID': evt.trip_id, | |
| 'Speed_kmh': round(evt.speed_kmh, 1) | |
| } | |
| for evt in sorted(entry_events, key=lambda x: x.time) | |
| ]) | |
| # Add count per 10-minute bin | |
| df['Time_10min'] = df['Time'].apply( | |
| lambda x: datetime.strptime(x, TIMEFMT).replace( | |
| minute=(datetime.strptime(x, TIMEFMT).minute // 10 * 10), | |
| second=0, | |
| microsecond=0 | |
| ).strftime(TIMEFMT) | |
| ) | |
| # Calculate statistics per 10-minute bin | |
| stats_df = df.groupby('Time_10min').agg({ | |
| 'TripID': 'count', | |
| 'Speed_kmh': ['mean', 'min', 'max'] | |
| }).round(1) | |
| stats_df.columns = ['Count', 'Avg_Speed_kmh', 'Min_Speed_kmh', 'Max_Speed_kmh'] | |
| stats_df = stats_df.reset_index() | |
| # Save detailed events to CSV | |
| df.to_csv(output_csv, index=False) | |
| # Save statistics to a separate CSV | |
| stats_csv = output_csv.replace('.csv', '_stats.csv') | |
| stats_df.to_csv(stats_csv, index=False) | |
| def parse_args(): | |
| p = argparse.ArgumentParser(description='Count trip entries into a polygon in 10-minute bins') | |
| p.add_argument('--polygon', '-p', default='polygon.geojson', help='Path to polygon GeoJSON (default: polygon.geojson)') | |
| p.add_argument('--trips', '-t', default='trips-v7.geojson', help='Path to trips TimestampedGeoJSON (default: trips-v7.geojson)') | |
| p.add_argument('--output', '-o', default='polygon_intersection_counts.csv', help='Output CSV path (default: polygon_intersection_counts.csv)') | |
| return p.parse_args() | |
| if __name__ == "__main__": | |
| args = parse_args() | |
| main(args.polygon, args.trips, args.output) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # lines intersect polygon | |
| # https://qiita.com/nabe189/items/dde80b8494456737f42f | |
| import argparse | |
| import geopandas as gpd | |
| import shapely | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument("lines_file", help="GeoJSON file of LineString FeatureCollection") | |
| parser.add_argument("polygon_file", help="GeoJSON file of a Polygon") | |
| parser.add_argument("-o", "--output_file", help="output filename", default="intersects.geojson") | |
| args = parser.parse_args() | |
| lines_gdf = gpd.read_file(args.lines_file, encoding="utf-8") | |
| with open(args.polygon_file) as f: | |
| polygon = shapely.from_geojson(f.read()) | |
| lines_gdf["intersects"] = lines_gdf["geometry"].intersects(polygon) | |
| lines_gdf[lines_gdf["intersects"]].drop(columns=["intersects"]).to_file(args.output_file, driver="GeoJSON") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import datetime | |
| import json | |
| import sys | |
| fname = sys.argv[1] | |
| outfname = f"keplerglTrip_{fname}" | |
| with open(fname) as f: | |
| geojson = json.load(f) | |
| for feature in geojson["features"]: | |
| if feature["geometry"]["type"] != "LineString": | |
| continue | |
| times = feature["properties"]["times"] | |
| coords = feature["geometry"]["coordinates"] | |
| for i, lonlat in enumerate(coords): | |
| if len(lonlat) > 4: # already has timestamp? | |
| continue | |
| if len(lonlat) == 2: | |
| lonlat.append(0) # altitude | |
| if len(lonlat) == 3: | |
| ts = times[i] | |
| if type(ts) is str: | |
| ts = datetime.datetime.fromisoformat(ts).timestamp() # TODO: other format | |
| lonlat.append(ts) | |
| del feature["properties"]["times"] | |
| with open(outfname, "w", encoding="utf-8") as f: | |
| json.dump(geojson, f) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import datetime | |
| import json | |
| # https://deck.gl/examples/trips-layer | |
| # Trips are taken from June 16, 2016 21:00 to 21:30 | |
| basets = datetime.datetime(2016, 6, 16, 21, 00).timestamp() | |
| # https://github.com/visgl/deck.gl-data/blob/master/examples/trips/trips-v7.json | |
| with open("trips-v7.json") as f: | |
| obj = json.load(f) | |
| features = [{ | |
| "type": "Feature", | |
| "properties": { | |
| "vendor": x["vendor"], | |
| "times": [(basets + t) * 1000 for t in x["timestamps"]], # [ms] | |
| }, | |
| "geometry": { | |
| "type": "LineString", | |
| "coordinates": x["path"], | |
| } | |
| } for x in obj] | |
| geojson = {"type": "FeatureCollection", "features": features} | |
| with open("trips-v7.geojson", "w") as f: | |
| json.dump(geojson, f) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment