Skip to content

Instantly share code, notes, and snippets.

@mdbenito
Last active September 18, 2025 13:27
Show Gist options
  • Select an option

  • Save mdbenito/8fc5a3d40337fcf3fa8aac8807486d67 to your computer and use it in GitHub Desktop.

Select an option

Save mdbenito/8fc5a3d40337fcf3fa8aac8807486d67 to your computer and use it in GitHub Desktop.
kuzu STRUCT(...)[] bug
import itertools
import logging
import shutil
import tempfile
from pathlib import Path
import kuzu
# Log bare messages (no level/timestamp prefix) at DEBUG and above.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(message)s",
)
logger = logging.getLogger(__name__)
def test_struct_array_json_corruption():
    """Reproduce corruption of a ``STRUCT(...)[]`` column with a JSON field.

    The function creates a throwaway kuzu database in a temporary directory,
    defines a ``TestNode`` table whose ``grids`` column is a list of structs
    (``type``, ``display``, ``params`` as JSON), and inserts one node for
    every ``(r, g, b)`` combination in ``{0, 1}^3``, alternating between an
    empty ``grids`` list and a single-element list. After each insert it
    checkpoints, reads the node back and warns when the stored ``type`` is
    not ``"column"``. Finally it counts nodes whose first grid has an empty
    ``type`` and exports the database to a fresh directory.

    All errors are logged rather than raised; the temporary database
    directory is always removed. The export directory is left in place.
    """
    temp_dir = Path(tempfile.mkdtemp())
    db_path = temp_dir / "test_db"
    db = None
    conn = None
    try:
        db = kuzu.Database(db_path)
        conn = kuzu.Connection(db)
        conn.execute("INSTALL json;")
        conn.execute("LOAD json;")

        create_table_query = (
            "\n"
            "CREATE NODE TABLE TestNode (\n"
            "id STRING,\n"
            "grids STRUCT(`type` STRING, `display` BOOL, `params` JSON)[],\n"
            "PRIMARY KEY (id)\n"
            ");\n"
        )
        logger.debug(create_table_query)
        conn.execute(create_table_query)

        # Templates; the "$name" placeholders are substituted textually
        # below (the statements are deliberately not parameterized).
        grid_query = (
            "\n"
            "CREATE (n:TestNode { `id`: $node_id, `grids`: "
            '[{ `type`: "column", `display`: true, '
            "`params`: to_json({color: {r: $r, g: $g, b: $b}}) }] });\n"
        )
        no_grid_query = (
            "\n"
            "CREATE (n:TestNode { `id`: $node_id, `grids`: [] });\n"
        )

        for r, g, b in itertools.product(range(0, 2), repeat=3):
            node_id = f"test_{r}_{g}_{b}"
            empty = (r + g + b) % 2 == 0  # Alternate

            # Build the statement outside the ``try`` so the failure handler
            # can always report exactly what was sent to the database.
            if empty:
                prepared_query = no_grid_query.replace(
                    "$node_id", f"'{node_id}'"
                )
            else:
                prepared_query = (
                    grid_query.replace("$node_id", f"'{node_id}'")
                    .replace("$r", str(r))
                    .replace("$g", str(g))
                    .replace("$b", str(b))
                )

            try:
                conn.execute(prepared_query)
                logger.info(f"✓ Inserted {node_id}")
                logger.debug(" with" + prepared_query)
                conn.execute("CHECKPOINT")
                read_query = (
                    f"MATCH (n:TestNode) WHERE n.id = '{node_id}' RETURN n.grids;"
                )
                result = conn.execute(read_query).get_all()
                if not empty and result[0][0][0]["`type`"] != "column":
                    logger.warning(f" ⚠️ CORRUPTION DETECTED in {node_id}!")
                    logger.info(f" Grid data immediately after insert: {result[0][0]}")
            except Exception as e:
                # Best effort: report the failure and move on to the next node.
                logger.info(f"✗ Failed to insert {node_id}: {e}")
                # Log the statement that was actually executed, not the raw
                # grid template (which is wrong for empty-grid inserts).
                logger.info(f"Query: {prepared_query}")

        logger.info("\n" + "=" * 60)
        count_corruptions_query = 'MATCH (n:TestNode) WITH n, size(n.grids) AS k WHERE k > 0 WITH n WHERE struct_extract(n.grids[1], "`type`") = \'\' RETURN count(n);'
        logger.debug(count_corruptions_query)
        result = conn.execute(count_corruptions_query).get_all()
        logger.info(f"Total nodes with corrupted grids: {result[0][0]}")

        # mkdtemp() is used only to obtain a unique path; the directory it
        # creates is removed again before exporting.
        # NOTE(review): presumably EXPORT DATABASE requires a target that
        # does not exist yet - confirm against the kuzu documentation.
        export_dir = Path(tempfile.mkdtemp())
        if export_dir.exists():
            shutil.rmtree(export_dir)
        conn.execute(f"EXPORT DATABASE '{export_dir}';")
        logger.info(f"Database exported to {export_dir}")
    except Exception as e:
        # logger.exception logs the message at ERROR level plus the traceback.
        logger.exception(f"Test failed with error: {e}")
    finally:
        try:
            # Release database handles before deleting the files under them.
            if conn is not None:
                conn.close()
            if db is not None:
                db.close()
        finally:
            if temp_dir.exists():
                shutil.rmtree(temp_dir)
                logger.info(f"\nCleaned up temporary directory: {temp_dir}")
# Script entry point: run the reproduction only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    test_struct_array_json_corruption()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment