# Source: GitHub Gist shiroyuki/9fd9923585f1f40f5f1b3cae6d96fc29 by @shiroyuki
# (last active July 20, 2025 13:58).
# Title: Prototype of Upcoming Wrapper for Iceberg
"""
Prototype Iceberg Wrapper
Copyright Juti Noppornpitak
Licensed with Apache 2
"""
from collections import defaultdict
from typing import Any
from pandas import DataFrame
import pyarrow
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import IcebergType
# TODO Implement the DBAPI 2 to use with SQLAlchemy 2
class UndefinedSchemaError(RuntimeError):
    """Raised when a table operation needs a schema but no field has been defined."""
class Table:
    """Convenience wrapper around one Iceberg table of a :class:`Catalog`.

    Columns are declared with :meth:`define`. The backing table is created
    lazily by :meth:`create_if_not_exists`, which :meth:`append` and
    :meth:`overwrite` call before writing.
    """

    def __init__(self, catalog: "Catalog", namespace: str, name: str):
        """Bind the wrapper to ``namespace.name`` in ``catalog`` (no I/O happens here)."""
        self._catalog = catalog
        self._namespace = namespace
        self._name = name
        # Declared columns, keyed by column name.
        self._fields: dict[str, NestedField] = dict()
        # Schema handed to the catalog; None until the table has been created
        # through this wrapper (or after it has been dropped via delete()).
        self._schema: Schema | None = None

    def _identifier(self) -> str:
        """Return the dotted ``namespace.name`` identifier used for catalog calls."""
        return f'{self._namespace}.{self._name}'

    def _ordered_fields(self) -> list[NestedField]:
        """Return the declared fields sorted by field id (i.e. declaration order)."""
        return sorted(self._fields.values(), key=lambda nf: nf.field_id)

    def define(self, name: str, kind: IcebergType, required: bool = False) -> "Table":
        """Declare (or re-declare) a column and return ``self`` for chaining.

        Re-declaring an existing column keeps its field id; a new column gets
        the next free id. Deriving the id from the number of fields would hand
        out an id that is already in use once a column has been re-declared.

        NOTE: columns declared after the schema has been built are not
        propagated to the already-created table.
        """
        existing = self._fields.get(name)
        if existing is not None:
            field_id = existing.field_id
        else:
            field_id = max((nf.field_id for nf in self._fields.values()), default=0) + 1
        self._fields[name] = NestedField(field_id, name, kind, required=required)
        return self

    def create_if_not_exists(self) -> None:
        """Create the table in the catalog unless this wrapper already did so.

        Raises:
            UndefinedSchemaError: if no column has been declared yet.
        """
        if not self._fields:
            raise UndefinedSchemaError('No schema defined')
        if self._schema is not None:
            return  # already created through this wrapper
        schema = Schema(*self._ordered_fields())
        self._catalog._api.create_table_if_not_exists(self._identifier(), schema)
        # Cache only after the catalog call succeeded, so that a failed
        # attempt is retried on the next call instead of being skipped.
        self._schema = schema

    def _convert_rows_to_table(self, rows: list[dict[str, Any]]) -> pyarrow.Table:
        """Pivot row dictionaries into a column-oriented ``pyarrow.Table``.

        Only declared columns are copied; extra keys in a row are ignored.
        A missing optional column is filled with ``None``. Column types are
        inferred by pyarrow from the values.

        Raises:
            KeyError: if a row lacks a column declared with ``required=True``.
        """
        ordered = self._ordered_fields()
        series = defaultdict(list)
        for row in rows:
            for nf in ordered:
                value = row[nf.name] if nf.required else row.get(nf.name)
                series[nf.name].append(value)
        return pyarrow.table(series)

    def append(self, rows: list[dict[str, Any]]) -> None:
        """Append ``rows`` to the table, creating the table first if needed."""
        self.create_if_not_exists()
        if not rows:
            return  # nothing to write
        self._get_api().append(self._convert_rows_to_table(rows))

    def overwrite(self, rows: list[dict[str, Any]]) -> None:
        """Pass ``rows`` to the underlying table's ``overwrite``, creating the table first if needed."""
        self.create_if_not_exists()
        self._get_api().overwrite(self._convert_rows_to_table(rows))

    def query(self) -> DataFrame:
        """Return an unfiltered scan of the table as a pandas ``DataFrame``."""
        # TODO Implement row_filter
        return self._get_api().scan().to_pandas()

    def delete(self) -> None:
        """Drop the table from the catalog."""
        self._catalog._api.drop_table(self._identifier())
        # Forget the cached schema so that a later append()/overwrite()
        # re-creates the table instead of failing to load a dropped one.
        self._schema = None

    def _get_api(self):
        """Load and return the underlying table object from the catalog."""
        return self._catalog._api.load_table(self._identifier())
class Catalog:
    """Wrapper around a catalog loaded with pyiceberg's ``load_catalog``."""

    def __init__(self, name: str, **config):
        """Load the catalog called ``name``; ``config`` is forwarded to ``load_catalog``."""
        self._name = name
        self._iceberg_config = config
        self._api = load_catalog(self._name, **self._iceberg_config)

    def get_namespaces(self) -> list[str]:
        """Return the first element of every namespace identifier in the catalog."""
        return [identifier[0] for identifier in self._api.list_namespaces()]

    def get_tables(self, namespace: str) -> list[str]:
        """Return the names of the tables in ``namespace``.

        The namespace is created first if it does not exist yet.
        """
        self._api.create_namespace_if_not_exists(namespace)
        # A table identifier is a tuple ending with the table name. Taking the
        # last element (rather than index 1) stays correct when the namespace
        # itself has more than one level.
        return [identifier[-1] for identifier in self._api.list_tables(namespace=namespace)]

    def table(self, namespace: str, name: str) -> "Table":
        """ Get a table by namespace and table name. """
        return Table(catalog=self, namespace=namespace, name=name)
# (GitHub page footer, not part of the module.) Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment