Skip to content

Instantly share code, notes, and snippets.

@manuelep
Last active January 3, 2026 23:17
Show Gist options
  • Select an option

  • Save manuelep/0dbba21a7911a28d84174992bbb8b85b to your computer and use it in GitHub Desktop.

Select an option

Save manuelep/0dbba21a7911a28d84174992bbb8b85b to your computer and use it in GitHub Desktop.
Convertitore Batch da formati Catastali CXF/SUP

cxf2gis.py - Convertitore Batch da Catasto CXF/SUP a formati GIS standard

Descrizione

Python tool per la conversione massiva di file cartografici catastali (.CXF) in formato OGC GeoPackage.

Caratteristiche principali:

  • Elaborazione Batch: Supporta sia file singoli che intere cartelle di fogli di mappa.

  • Integrazione Metadati: Estrae automaticamente codice Comune (Belfiore), Foglio, Sezione e Allegato dalla nomenclatura dei file (es. C660A000100.cxf).

  • Join Anagrafico: Integra opzionalmente i nomi dei comuni, province (nomi e sigle) e regioni tramite lookup online dei dati ISTAT.

  • Dati Superficie: Associa le aree nominali leggendo i file .SUP corrispondenti.

  • Multi-layer: Genera layer separati per BORDO, TESTO, SIMBOLO, FIDUCIALE e LINEA.

  • Ready-to-use: Gestisce automaticamente i buchi (isole) nelle particelle e la riproiezione CRS tramite GeoPandas.

import os, glob
import geopandas as gpd
import pandas as pd
from shapely.geometry import Polygon, Point, LineString
from sqlalchemy import create_engine
class CXFConverter:
# Dizionario decodifica simboli da specifiche catastali
# DIZIONARIO_SIMBOLI = {
# '1': 'Punto di controllo', '2': 'Termine particellare', '3': 'Parametro',
# '4': 'Osso di morto', '5': 'Grande flusso acque', '6': 'Medio flusso acque',
# '7': 'Piccolo flusso acque', '8': 'Fiduciale trigonometrico', '9': 'Graffa piccola',
# '10': 'Ancora', '11': 'Termine provinciale', '13': 'Croce su roccia',
# '14': 'Graffa grande', '15': 'Baffettatura piccola', '16': 'Baffettatura grande',
# '20': 'Fiduciale semplice'
# }
def __init__(self, file_path, exclude_types=None, input_epsg="EPSG:6707", extra_info=False):
self.file_path = file_path
# Determina la lista dei file da processare
if os.path.isdir(file_path):
self.files_to_process = glob.glob(os.path.join(file_path, "*.cxf"))
# Gestione file SUP: se siamo in una cartella, leggeremo il SUP corrispondente a ogni file durante il parsing
else:
self.files_to_process = [file_path]
self.exclude_types = exclude_types or []
self.input_epsg = input_epsg
self.gdf_sup = None
# Contenitori per i diversi tipi di geometria
self.layers = {
'BORDO': [], # Poligoni (Particelle, Fabbricati, ecc.)
'TESTO': [], # Punti con attributo testo
'SIMBOLO': [], # Punti con codice simbolo
'FIDUCIALE': [], # Punti fiduciali
'LINEA': [] # Linee (archi, bordi di foglio, ecc.)
}
# Caricamento opzionale della tabella comuni (CSV ISTAT)
if extra_info is True:
self.df_comuni = self.get_mappa_comuni()
else:
self.df_comuni = None
def _decripta_nome_file(self, filename):
"""Estrae Comune, Sezione, Foglio e Allegato dal nome standard C660A000100"""
name = os.path.splitext(os.path.basename(filename))[0].upper()
meta = {
'comune': name[0:4],
'sezione': name[4:5] if len(name) > 4 else '',
'foglio': name[5:9] if len(name) >= 9 else '0',
'allegato': name[9:11] if len(name) >= 11 else '00',
'file_nome': filename
}
# Pulizia zeri iniziali per il foglio (es: 0001 -> 1)
try: meta['foglio'] = str(int(meta['foglio']))
except: pass
return meta
def _parse(self):
for file_cxf in self.files_to_process:
meta = self._decripta_nome_file(file_cxf)
df_sup = self._sup2gdf(file_cxf)
with open(file_cxf, 'r', encoding='latin-1') as f:
lines = [line.strip() for line in f if line.strip()]
i = 0
while i < len(lines):
tag = lines[i]
# Salto l'elemento se incluso nella lista di esclusione
if tag in self.exclude_types:
i += 1
continue
if tag == "BORDO":
i = self._handle_bordo(lines, i, df_sup, meta)
elif tag == "TESTO":
i = self._handle_testo(lines, i, meta)
elif tag == "SIMBOLO":
i = self._handle_simbolo(lines, i, meta)
elif tag == "FIDUCIALE":
i = self._handle_fiduciale(lines, i, meta)
elif tag == "LINEA":
i = self._handle_linea(lines, i, meta)
else:
i += 1
def _handle_bordo(self, lines, i, df_sup, meta):
codice = lines[i+1]
num_isole = int(lines[i+8])
num_tot_v = int(lines[i+9])
cursor = i + 10
isole_v = [int(lines[cursor+j]) for j in range(num_isole)]
cursor += num_isole
coords = [(float(lines[cursor+j*2]), float(lines[cursor+j*2+1])) for j in range(num_tot_v)]
cursor += (num_tot_v * 2)
if num_isole == 0:
geom = Polygon(coords)
else:
ext_idx = num_tot_v - sum(isole_v)
geom = Polygon(shell=coords[:ext_idx], holes=[coords[ext_idx:ext_idx+v] for v in isole_v])
classe = "PARTICELLA"
if codice.endswith('+'): classe = "FABBRICATO"
elif "STRADA" in codice.upper(): classe = "STRADA"
elif "ACQUA" in codice.upper(): classe = "ACQUA"
data = {
'geometry': geom, 'codice': codice, 'classe': classe,
'comune': meta['comune'], 'foglio': meta['foglio'],
'sezione': meta['sezione'], 'allegato': meta['allegato']
}
if df_sup is not None:
sup_match = df_sup[df_sup['codice'] == codice]
if not sup_match.empty:
data['area_nominale'] = sup_match.iloc[0]['area_nominale']
data['area_grafica'] = geom.area
self.layers['BORDO'].append(data)
return cursor
def _handle_testo(self, lines, i, meta):
self.layers['TESTO'].append({
'geometry': Point(float(lines[i+4]), float(lines[i+5])),
'testo': lines[i+1],
'angolo': float(lines[i+3]),
'foglio': meta['foglio'],
'comune': meta['comune'] # <-- AGGIUNGI QUESTO
})
return i + 8
def _handle_simbolo(self, lines, i, meta):
cod = lines[i+1]
self.layers['SIMBOLO'].append({
'geometry': Point(float(lines[i+3]), float(lines[i+4])),
'codice_simbolo': cod,
'angolo': float(lines[i+2]),
'foglio': meta['foglio'],
'comune': meta['comune'] # <-- AGGIUNGI QUESTO
})
return i + 6
def _handle_fiduciale(self, lines, i, meta):
self.layers['FIDUCIALE'].append({
'geometry': Point(float(lines[i+3]), float(lines[i+4])),
'id_fid': lines[i+1], 'foglio': meta['foglio']
})
return i + 5
def _handle_linea(self, lines, i, meta):
num_v = int(lines[i+2])
cursor = i + 3
coords = [(float(lines[cursor+j*2]), float(lines[cursor+j*2+1])) for j in range(num_v)]
self.layers['LINEA'].append({'geometry': LineString(coords), 'foglio': meta['foglio']})
return cursor + (num_v * 2)
def _sup2gdf(self, file_path):
"""
Cerca e parsa il file .SUP associato al file .CXF.
Ritorna un DataFrame con colonne ['codice', 'area_sup']
"""
# Il file SUP ha solitamente lo stesso base-name del CXF
base_path = os.path.splitext(file_path)[0]
sup_path = base_path + ".SUP"
if not os.path.exists(sup_path):
# print(f"Nota: File SUP non trovato in {sup_path}. Procedo senza dati di superficie.")
return None
records = []
with open(sup_path, 'r', encoding='latin-1') as f:
for line in f:
parts = line.strip().split()
if len(parts) >= 2:
# parts[0] è l'identificativo (es. 101, STRADA, ACQUA)
# parts[1] è l'area in mq
try:
records.append({
'codice': parts[0],
'area_nominale': float(parts[1])
})
except ValueError:
continue # Salta righe di intestazione non numeriche se presenti
return pd.DataFrame(records)
def get_geodataframe(self, layer_name, target_epsg=None):
data = self.layers.get(layer_name.upper())
if not data:
return None
gdf = gpd.GeoDataFrame(data, crs=self.input_epsg)
# JOIN CON ANAGRAFICA COMUNI
if self.df_comuni is not None and 'comune' in gdf.columns:
# Usiamo 'comune' (es. C660) per il join con 'codice_catastale'
gdf = gdf.merge(
self.df_comuni,
left_on='comune',
right_on='codice_catastale',
how='left'
)
# Rimuoviamo la colonna duplicata dopo il join
if 'codice_catastale' in gdf.columns:
gdf = gdf.drop(columns=['codice_catastale'])
# Calcolo sbilancio superfici (solo per BORDO)
if layer_name.upper() == 'BORDO' and 'area_nominale' in gdf.columns:
gdf['diff_area'] = gdf['area_nominale'] - gdf.geometry.area
if target_epsg:
gdf = gdf.to_crs(target_epsg)
return gdf
@staticmethod
def get_mappa_comuni():
"""
Recupera l'elenco dei comuni italiani da una sorgente open data
e restituisce un DataFrame filtrato con i codici catastali.
"""
# URL di un dataset open source affidabile (es. quello di Matteo Contrini o ISTAT)
url = "https://raw.githubusercontent.com/matteocontrini/comuni-json/refs/heads/master/comuni.json"
try:
df_raw = pd.read_json(url)
except Exception as e:
print(f"Errore nel recupero dei dati comuni: {e}")
return None
else:
# Estraiamo le informazioni richieste:
# Il dataset contiene oggetti annidati per provincia e regione
df_comuni = pd.DataFrame({
'codice_catastale': df_raw['codiceCatastale'],
'comune_nome': df_raw['nome'],
'provincia_sigla': df_raw['sigla'],
'provincia_nome': df_raw['provincia'].apply(lambda x: x['nome']),
'regione_nome': df_raw['regione'].apply(lambda x: x['nome'])
})
# Rimuoviamo eventuali duplicati o righe senza codice catastale
df_comuni = df_comuni.dropna(subset=['codice_catastale']).drop_duplicates('codice_catastale')
return df_comuni
def export(self, output_path, format="gpkg", target_epsg=None, postgis_url=None):
"""
Esporta tutti i layer caricati nel formato scelto.
"""
self._parse()
for layer_name in self.layers.keys():
gdf = self.get_geodataframe(layer_name, target_epsg)
if gdf is not None:
if format == "gpkg":
gdf.to_file(output_path, layer=layer_name.lower(), driver="GPKG")
elif format == "postgis" and postgis_url:
engine = create_engine(postgis_url)
gdf.to_postgis(layer_name.lower(), engine, if_exists='replace')
elif format == "shp":
gdf.to_file(f"{layer_name.lower()}.shp")
if __name__ == "__main__":
crs_in = "EPSG:6707" # CRS di input predefinito
# --- ESEMPIO DI UTILIZZO ---
# Se vuoi escludere ad esempio SIMBOLI e LINEE:
esclusioni = [
# "SIMBOLO", "LINEA"
]
# path = "./C660_2949171_1/C660A000100.cxf"
path = "./C660_2950073_1"
converter = CXFConverter(path, exclude_types=esclusioni, input_epsg=crs_in, extra_info=False)
# Salva in GeoPackage
converter.export("mappa_catastale.gpkg")
# Salva in PostGIS
# converter.save_to_postgis("postgresql://user:password@localhost:5432/db_catasto")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment