|
import json |
|
import sys |
|
from collections import defaultdict |
|
from typing import Any, NewType |
|
|
|
GeoJSONGeometry = NewType('GeoJSONGeometry', dict[str, Any]) |
|
|
|
Node = NewType('Node', tuple[float, float]) |
|
Edge = NewType('Edge', tuple[Node, Node]) |
|
Way = NewType('Way', list[Node]) # Like OSM Way |
|
|
|
|
|
def load_geometries_from_geojson(filename: str) -> list[GeoJSONGeometry]: |
|
data = json.load(open(filename, 'r')) |
|
|
|
geojson_geometries = [] |
|
for feature in data['features']: |
|
if 'geometries' in feature['geometry']: |
|
for geometry in feature['geometry']['geometries']: |
|
geojson_geometries.append(geometry) |
|
else: |
|
geojson_geometries.append(feature['geometry']) |
|
|
|
return geojson_geometries |
|
|
|
|
|
def create_edges( |
|
geometries: list[GeoJSONGeometry] |
|
) -> tuple[set[Edge], dict[Node, set[Node]]]: |
|
""" |
|
:return: tuple of splitted_edges and edges_by_nodes. |
|
|
|
splitted_edges – unique Edge type create between each to connected |
|
nodes in the geometries. |
|
|
|
edges_by_nodes – allows to get all connected edges to specific node: |
|
- Key is node which create edges |
|
- Values are second (end) nodes of edges – len(values) == number of edges |
|
""" |
|
splitted_edges: set[Edge] = set() |
|
edges_by_nodes: dict[Node, set[Node]] = defaultdict(set) |
|
|
|
for geometry in geometries: |
|
ways: list[Way] = [] |
|
if geometry['type'] == 'Polygon': |
|
for ring in geometry['coordinates']: |
|
ways.append(ring) |
|
elif geometry['type'] == 'MultiPolygon': |
|
for polygon in geometry['coordinates']: |
|
for ring in polygon: |
|
ways.append(ring) |
|
else: |
|
raise NotImplementedError( |
|
f'Unsupported geometry type {geometry["type"]}' |
|
) |
|
|
|
for way in ways: |
|
for i in range(len(way) - 1): |
|
node1 = Node(tuple(way[i])) |
|
node2 = Node(tuple(way[i + 1])) |
|
|
|
# keep sorted to avoid duplicaton from reversed geometries |
|
if node1[0] < node2[0]: |
|
node1, node2 = node2, node1 |
|
|
|
splitted_edges.add(Edge((node1, node2))) |
|
|
|
# fill edge by nodes to merge step |
|
edges_by_nodes[node1].add(node2) |
|
edges_by_nodes[node2].add(node1) |
|
|
|
return splitted_edges, edges_by_nodes |
|
|
|
|
|
def merge_edges_to_unique_ways( |
|
splitted_edges: set[Edge], |
|
edges_by_nodes: dict[Node, set[Node]] |
|
) -> list[Way]: |
|
def get_forward_nodes(way: Way) -> list[Node]: |
|
return [node for node in edges_by_nodes[way[-1]] if node != way[-2]] |
|
|
|
def get_backward_nodes(way: Way) -> list[Node]: |
|
return [node for node in edges_by_nodes[way[0]] if node != way[1]] |
|
|
|
unique_ways: list[Way] = [] |
|
current_way = Way((list(splitted_edges.pop()))) |
|
while True: |
|
if current_way[0] == current_way[-1]: # cycle completed |
|
unique_ways.append(current_way) |
|
if splitted_edges: |
|
current_way = Way((list(splitted_edges.pop()))) |
|
else: |
|
break |
|
|
|
forward_nodes = get_forward_nodes(current_way) |
|
if len(forward_nodes) == 1: |
|
last_node = current_way[-1] |
|
current_way.append(forward_nodes[0]) |
|
# Remove in 2 variants to include reversed edges |
|
splitted_edges.discard(Edge((forward_nodes[0], last_node))) |
|
splitted_edges.discard(Edge((last_node, forward_nodes[0]))) |
|
continue |
|
|
|
backward_nodes = get_backward_nodes(current_way) |
|
if len(backward_nodes) == 1: |
|
first_node = current_way[0] |
|
current_way.insert(0, backward_nodes[0]) |
|
# Remove in 2 variants to include reversed edges |
|
splitted_edges.discard(Edge((backward_nodes[0], first_node))) |
|
splitted_edges.discard(Edge((first_node, backward_nodes[0]))) |
|
continue |
|
|
|
if splitted_edges: |
|
unique_ways.append(current_way) |
|
current_way = Way((list(splitted_edges.pop()))) |
|
else: |
|
break |
|
|
|
return unique_ways |
|
|
|
|
|
def export_to_geojson_file(filename: str, unique_ways: list[Way]) -> None: |
|
features = [] |
|
for unique_way in unique_ways: |
|
if unique_way[0] != unique_way[-1]: |
|
features.append({ |
|
'type': 'Feature', |
|
'properties': {}, |
|
'geometry': { |
|
'type': "LineString", |
|
'coordinates': unique_way |
|
} |
|
}) |
|
else: |
|
features.append({ |
|
'type': 'Feature', |
|
'properties': {}, |
|
'geometry': { |
|
'type': "Polygon", |
|
'coordinates': [unique_way] |
|
} |
|
}) |
|
|
|
output_data = {'type': 'FeatureCollection', 'features': features} |
|
json.dump(output_data, open(filename, 'w'), indent=2) |
|
|
|
|
|
def main(): |
|
input_filename = sys.argv[1] |
|
output_filename = f'result_{input_filename}' |
|
|
|
geojson_geometries = load_geometries_from_geojson(input_filename) |
|
splitted_edges, edges_by_node = create_edges(geojson_geometries) |
|
unique_ways = merge_edges_to_unique_ways(splitted_edges, edges_by_node) |
|
export_to_geojson_file(output_filename, unique_ways) |
|
|
|
|
|
if __name__ == '__main__': |
|
main() |