|
import os, glob |
|
import geopandas as gpd |
|
import pandas as pd |
|
from shapely.geometry import Polygon, Point, LineString |
|
from sqlalchemy import create_engine |
|
|
|
|
|
class CXFConverter: |
|
|
|
# Dizionario decodifica simboli da specifiche catastali |
|
# DIZIONARIO_SIMBOLI = { |
|
# '1': 'Punto di controllo', '2': 'Termine particellare', '3': 'Parametro', |
|
# '4': 'Osso di morto', '5': 'Grande flusso acque', '6': 'Medio flusso acque', |
|
# '7': 'Piccolo flusso acque', '8': 'Fiduciale trigonometrico', '9': 'Graffa piccola', |
|
# '10': 'Ancora', '11': 'Termine provinciale', '13': 'Croce su roccia', |
|
# '14': 'Graffa grande', '15': 'Baffettatura piccola', '16': 'Baffettatura grande', |
|
# '20': 'Fiduciale semplice' |
|
# } |
|
|
|
def __init__(self, file_path, exclude_types=None, input_epsg="EPSG:6707", extra_info=False): |
|
|
|
self.file_path = file_path |
|
# Determina la lista dei file da processare |
|
if os.path.isdir(file_path): |
|
self.files_to_process = glob.glob(os.path.join(file_path, "*.cxf")) |
|
# Gestione file SUP: se siamo in una cartella, leggeremo il SUP corrispondente a ogni file durante il parsing |
|
else: |
|
self.files_to_process = [file_path] |
|
|
|
self.exclude_types = exclude_types or [] |
|
self.input_epsg = input_epsg |
|
self.gdf_sup = None |
|
# Contenitori per i diversi tipi di geometria |
|
self.layers = { |
|
'BORDO': [], # Poligoni (Particelle, Fabbricati, ecc.) |
|
'TESTO': [], # Punti con attributo testo |
|
'SIMBOLO': [], # Punti con codice simbolo |
|
'FIDUCIALE': [], # Punti fiduciali |
|
'LINEA': [] # Linee (archi, bordi di foglio, ecc.) |
|
} |
|
|
|
# Caricamento opzionale della tabella comuni (CSV ISTAT) |
|
if extra_info is True: |
|
self.df_comuni = self.get_mappa_comuni() |
|
else: |
|
self.df_comuni = None |
|
|
|
def _decripta_nome_file(self, filename): |
|
"""Estrae Comune, Sezione, Foglio e Allegato dal nome standard C660A000100""" |
|
name = os.path.splitext(os.path.basename(filename))[0].upper() |
|
meta = { |
|
'comune': name[0:4], |
|
'sezione': name[4:5] if len(name) > 4 else '', |
|
'foglio': name[5:9] if len(name) >= 9 else '0', |
|
'allegato': name[9:11] if len(name) >= 11 else '00', |
|
'file_nome': filename |
|
} |
|
# Pulizia zeri iniziali per il foglio (es: 0001 -> 1) |
|
try: meta['foglio'] = str(int(meta['foglio'])) |
|
except: pass |
|
return meta |
|
|
|
def _parse(self): |
|
for file_cxf in self.files_to_process: |
|
|
|
meta = self._decripta_nome_file(file_cxf) |
|
df_sup = self._sup2gdf(file_cxf) |
|
|
|
with open(file_cxf, 'r', encoding='latin-1') as f: |
|
lines = [line.strip() for line in f if line.strip()] |
|
|
|
i = 0 |
|
while i < len(lines): |
|
tag = lines[i] |
|
|
|
# Salto l'elemento se incluso nella lista di esclusione |
|
if tag in self.exclude_types: |
|
i += 1 |
|
continue |
|
|
|
if tag == "BORDO": |
|
i = self._handle_bordo(lines, i, df_sup, meta) |
|
elif tag == "TESTO": |
|
i = self._handle_testo(lines, i, meta) |
|
elif tag == "SIMBOLO": |
|
i = self._handle_simbolo(lines, i, meta) |
|
elif tag == "FIDUCIALE": |
|
i = self._handle_fiduciale(lines, i, meta) |
|
elif tag == "LINEA": |
|
i = self._handle_linea(lines, i, meta) |
|
else: |
|
i += 1 |
|
|
|
def _handle_bordo(self, lines, i, df_sup, meta): |
|
codice = lines[i+1] |
|
num_isole = int(lines[i+8]) |
|
num_tot_v = int(lines[i+9]) |
|
cursor = i + 10 |
|
isole_v = [int(lines[cursor+j]) for j in range(num_isole)] |
|
cursor += num_isole |
|
coords = [(float(lines[cursor+j*2]), float(lines[cursor+j*2+1])) for j in range(num_tot_v)] |
|
cursor += (num_tot_v * 2) |
|
|
|
if num_isole == 0: |
|
geom = Polygon(coords) |
|
else: |
|
ext_idx = num_tot_v - sum(isole_v) |
|
geom = Polygon(shell=coords[:ext_idx], holes=[coords[ext_idx:ext_idx+v] for v in isole_v]) |
|
|
|
classe = "PARTICELLA" |
|
if codice.endswith('+'): classe = "FABBRICATO" |
|
elif "STRADA" in codice.upper(): classe = "STRADA" |
|
elif "ACQUA" in codice.upper(): classe = "ACQUA" |
|
|
|
data = { |
|
'geometry': geom, 'codice': codice, 'classe': classe, |
|
'comune': meta['comune'], 'foglio': meta['foglio'], |
|
'sezione': meta['sezione'], 'allegato': meta['allegato'] |
|
} |
|
|
|
if df_sup is not None: |
|
sup_match = df_sup[df_sup['codice'] == codice] |
|
if not sup_match.empty: |
|
data['area_nominale'] = sup_match.iloc[0]['area_nominale'] |
|
data['area_grafica'] = geom.area |
|
|
|
self.layers['BORDO'].append(data) |
|
return cursor |
|
|
|
def _handle_testo(self, lines, i, meta): |
|
self.layers['TESTO'].append({ |
|
'geometry': Point(float(lines[i+4]), float(lines[i+5])), |
|
'testo': lines[i+1], |
|
'angolo': float(lines[i+3]), |
|
'foglio': meta['foglio'], |
|
'comune': meta['comune'] # <-- AGGIUNGI QUESTO |
|
}) |
|
return i + 8 |
|
|
|
def _handle_simbolo(self, lines, i, meta): |
|
cod = lines[i+1] |
|
self.layers['SIMBOLO'].append({ |
|
'geometry': Point(float(lines[i+3]), float(lines[i+4])), |
|
'codice_simbolo': cod, |
|
'angolo': float(lines[i+2]), |
|
'foglio': meta['foglio'], |
|
'comune': meta['comune'] # <-- AGGIUNGI QUESTO |
|
}) |
|
return i + 6 |
|
|
|
def _handle_fiduciale(self, lines, i, meta): |
|
self.layers['FIDUCIALE'].append({ |
|
'geometry': Point(float(lines[i+3]), float(lines[i+4])), |
|
'id_fid': lines[i+1], 'foglio': meta['foglio'] |
|
}) |
|
return i + 5 |
|
|
|
def _handle_linea(self, lines, i, meta): |
|
num_v = int(lines[i+2]) |
|
cursor = i + 3 |
|
coords = [(float(lines[cursor+j*2]), float(lines[cursor+j*2+1])) for j in range(num_v)] |
|
self.layers['LINEA'].append({'geometry': LineString(coords), 'foglio': meta['foglio']}) |
|
return cursor + (num_v * 2) |
|
|
|
def _sup2gdf(self, file_path): |
|
""" |
|
Cerca e parsa il file .SUP associato al file .CXF. |
|
Ritorna un DataFrame con colonne ['codice', 'area_sup'] |
|
""" |
|
# Il file SUP ha solitamente lo stesso base-name del CXF |
|
base_path = os.path.splitext(file_path)[0] |
|
sup_path = base_path + ".SUP" |
|
|
|
if not os.path.exists(sup_path): |
|
# print(f"Nota: File SUP non trovato in {sup_path}. Procedo senza dati di superficie.") |
|
return None |
|
|
|
records = [] |
|
with open(sup_path, 'r', encoding='latin-1') as f: |
|
for line in f: |
|
parts = line.strip().split() |
|
if len(parts) >= 2: |
|
# parts[0] è l'identificativo (es. 101, STRADA, ACQUA) |
|
# parts[1] è l'area in mq |
|
try: |
|
records.append({ |
|
'codice': parts[0], |
|
'area_nominale': float(parts[1]) |
|
}) |
|
except ValueError: |
|
continue # Salta righe di intestazione non numeriche se presenti |
|
|
|
return pd.DataFrame(records) |
|
|
|
def get_geodataframe(self, layer_name, target_epsg=None): |
|
data = self.layers.get(layer_name.upper()) |
|
if not data: |
|
return None |
|
|
|
gdf = gpd.GeoDataFrame(data, crs=self.input_epsg) |
|
|
|
# JOIN CON ANAGRAFICA COMUNI |
|
if self.df_comuni is not None and 'comune' in gdf.columns: |
|
# Usiamo 'comune' (es. C660) per il join con 'codice_catastale' |
|
gdf = gdf.merge( |
|
self.df_comuni, |
|
left_on='comune', |
|
right_on='codice_catastale', |
|
how='left' |
|
) |
|
# Rimuoviamo la colonna duplicata dopo il join |
|
if 'codice_catastale' in gdf.columns: |
|
gdf = gdf.drop(columns=['codice_catastale']) |
|
|
|
# Calcolo sbilancio superfici (solo per BORDO) |
|
if layer_name.upper() == 'BORDO' and 'area_nominale' in gdf.columns: |
|
gdf['diff_area'] = gdf['area_nominale'] - gdf.geometry.area |
|
|
|
if target_epsg: |
|
gdf = gdf.to_crs(target_epsg) |
|
|
|
return gdf |
|
|
|
@staticmethod |
|
def get_mappa_comuni(): |
|
""" |
|
Recupera l'elenco dei comuni italiani da una sorgente open data |
|
e restituisce un DataFrame filtrato con i codici catastali. |
|
""" |
|
# URL di un dataset open source affidabile (es. quello di Matteo Contrini o ISTAT) |
|
url = "https://raw.githubusercontent.com/matteocontrini/comuni-json/refs/heads/master/comuni.json" |
|
|
|
try: |
|
df_raw = pd.read_json(url) |
|
except Exception as e: |
|
print(f"Errore nel recupero dei dati comuni: {e}") |
|
return None |
|
else: |
|
# Estraiamo le informazioni richieste: |
|
# Il dataset contiene oggetti annidati per provincia e regione |
|
df_comuni = pd.DataFrame({ |
|
'codice_catastale': df_raw['codiceCatastale'], |
|
'comune_nome': df_raw['nome'], |
|
'provincia_sigla': df_raw['sigla'], |
|
'provincia_nome': df_raw['provincia'].apply(lambda x: x['nome']), |
|
'regione_nome': df_raw['regione'].apply(lambda x: x['nome']) |
|
}) |
|
|
|
# Rimuoviamo eventuali duplicati o righe senza codice catastale |
|
df_comuni = df_comuni.dropna(subset=['codice_catastale']).drop_duplicates('codice_catastale') |
|
|
|
return df_comuni |
|
|
|
def export(self, output_path, format="gpkg", target_epsg=None, postgis_url=None): |
|
""" |
|
Esporta tutti i layer caricati nel formato scelto. |
|
""" |
|
self._parse() |
|
|
|
for layer_name in self.layers.keys(): |
|
gdf = self.get_geodataframe(layer_name, target_epsg) |
|
if gdf is not None: |
|
if format == "gpkg": |
|
gdf.to_file(output_path, layer=layer_name.lower(), driver="GPKG") |
|
elif format == "postgis" and postgis_url: |
|
engine = create_engine(postgis_url) |
|
gdf.to_postgis(layer_name.lower(), engine, if_exists='replace') |
|
elif format == "shp": |
|
gdf.to_file(f"{layer_name.lower()}.shp") |
|
|
|
if __name__ == "__main__": |
|
|
|
crs_in = "EPSG:6707" # CRS di input predefinito |
|
|
|
# --- ESEMPIO DI UTILIZZO --- |
|
# Se vuoi escludere ad esempio SIMBOLI e LINEE: |
|
esclusioni = [ |
|
# "SIMBOLO", "LINEA" |
|
] |
|
# path = "./C660_2949171_1/C660A000100.cxf" |
|
path = "./C660_2950073_1" |
|
converter = CXFConverter(path, exclude_types=esclusioni, input_epsg=crs_in, extra_info=False) |
|
|
|
# Salva in GeoPackage |
|
converter.export("mappa_catastale.gpkg") |
|
|
|
# Salva in PostGIS |
|
# converter.save_to_postgis("postgresql://user:password@localhost:5432/db_catasto") |