Last active
September 5, 2025 11:57
-
-
Save giladbarnea/ce4b52f731512e6f7b24c90e3a4a13a3 to your computer and use it in GitHub Desktop.
convert_aistudio_chats_to_text.py: iterates over a chat JSON file and prints user and assistant texts wrapped in XML tags
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3
"""
Export Google AI Studio chat as a readable transcript with filters.

Usage:
    python export-ai-studio.py CHAT.json
        [--tag {xml,markdown}]
        [--no-thoughts]
        [--no-files]
        [--no-code | --no-code-source | --no-code-output]
        [--no-code-errors]
        [--no-images]

Notes:
- Messages are picked from the longest "message-like" array (in GAIS exports usually
  `chunkedPrompt.chunks`).
- Thought messages: top-level `isThought==True` OR any part with `thought==True`.
- File messages: user messages with `driveImage` / `driveDocument` keys.
- Code source: messages/parts with `executableCode`.
- Code result: messages/parts with `codeExecutionResult`.
- `--no-code-errors` removes entire blocks of consecutive code sources directly followed
  by a single code result, **only** if that result has `outcome == "OUTCOME_FAILED"`.
- Image messages: messages that embed inline image data either as top-level `inlineImage`
  or as `parts[*].inlineData` with an image MIME type. `--no-images` drops those.
"""
| from __future__ import annotations | |
| import argparse, json, sys | |
| from pathlib import Path | |
| from typing import Any, List, Tuple, Literal, TypedDict | |
| # ---- Minimal TypedDicts (informative only) ---- | |
class UserTextMessage(TypedDict):
    """Informative shape of a plain user text message in a GAIS export (not enforced)."""
    role: Literal['user']
    text: str
    tokenCount: int
class TextPart(TypedDict):
    """A single text fragment inside a message's `parts` array."""
    text: str
class ModelTextMessageParts(TypedDict):
    """Model text message that carries both a flat `text` and structured `parts`."""
    role: Literal['model']
    text: str
    parts: List[TextPart]
    tokenCount: int
class InlineData(TypedDict):
    """Inline binary payload: MIME type plus (presumably base64) data — TODO confirm encoding."""
    mimeType: str
    data: str
class ModelImageMessageInline(TypedDict):
    """Model message whose payload is an inline image rather than text."""
    role: Literal['model']
    inlineImage: InlineData  # image bytes + MIME type
    finishReason: str
    tokenCount: int
    parts: List[dict]
| # -------------------- discovery -------------------- | |
def find_message_lists(node: Any, path: str="") -> List[Tuple[str, list]]:
    """Recursively collect (json-path, list) pairs for every list that looks like a message array.

    A list qualifies when any of its first 10 entries is a dict carrying one of
    the chat-message marker keys (role/parts/content/inlineImage).
    """
    results: List[Tuple[str, list]] = []
    if isinstance(node, list):
        marker_keys = ("role", "parts", "content", "inlineImage")

        def _msg_like(item: Any) -> bool:
            return isinstance(item, dict) and any(key in item for key in marker_keys)

        # Sample only the first 10 entries — a cheap heuristic, same as before.
        if any(_msg_like(item) for item in node[:10]):
            results.append((path, node))
    elif isinstance(node, dict):
        for key, value in node.items():
            sub_path = f"{path}.{key}" if path else key
            results.extend(find_message_lists(value, sub_path))
    return results
def get_messages(data: Any) -> Tuple[str, list]:
    """Return (path, messages) for the longest message-like array; exit(1) if none exist."""
    candidates = find_message_lists(data)
    if candidates:
        return max(candidates, key=lambda pair: len(pair[1]))
    print("No message arrays found.", file=sys.stderr)
    sys.exit(1)
| # -------------------- classification -------------------- | |
def is_thought_message(m: dict) -> bool:
    """True when the message is a model 'thought': top-level flag or any part flagged."""
    if m.get("isThought") is True:
        return True
    parts = m.get("parts")
    if not isinstance(parts, list):
        return False
    return any(isinstance(part, dict) and part.get("thought") is True for part in parts)
def is_file_message(m: dict) -> bool:
    """True when the message attaches a Google Drive file (image or document)."""
    return any(key in m for key in ("driveImage", "driveDocument"))
def has_executable_code(m: dict) -> bool:
    """True when the message (or any of its parts) carries an `executableCode` payload."""
    if "executableCode" in m:
        return True
    parts = m.get("parts")
    return isinstance(parts, list) and any(
        isinstance(part, dict) and "executableCode" in part for part in parts
    )
def has_code_result(m: dict) -> bool:
    """True when the message (or any of its parts) carries a `codeExecutionResult`."""
    if "codeExecutionResult" in m:
        return True
    parts = m.get("parts")
    return isinstance(parts, list) and any(
        isinstance(part, dict) and "codeExecutionResult" in part for part in parts
    )
def code_result_failed(m: dict) -> bool:
    """Only treat outcome=='OUTCOME_FAILED' as failure (case-sensitive)."""
    # Gather every candidate result object: top-level first, then one per part.
    candidates = [m.get("codeExecutionResult")]
    parts = m.get("parts")
    if isinstance(parts, list):
        candidates.extend(
            part.get("codeExecutionResult") for part in parts if isinstance(part, dict)
        )
    return any(
        isinstance(result, dict) and result.get("outcome") == "OUTCOME_FAILED"
        for result in candidates
    )
def is_image_message(m: dict) -> bool:
    """True when a message embeds inline image bytes, top-level or inside `parts`.

    A blob counts as an image when its mimeType starts with "image/" (any case)
    OR when it merely has a "data" key — matching the original permissive check.
    """
    def _looks_like_image(blob: dict) -> bool:
        mime = blob.get("mimeType")
        if isinstance(mime, str) and mime.lower().startswith("image/"):
            return True
        return "data" in blob

    top_level = m.get("inlineImage")
    if isinstance(top_level, dict) and _looks_like_image(top_level):
        return True
    parts = m.get("parts")
    if isinstance(parts, list):
        for part in parts:
            if not isinstance(part, dict):
                continue
            inline = part.get("inlineData")
            if isinstance(inline, dict) and _looks_like_image(inline):
                return True
    return False
| # -------------------- text extraction -------------------- | |
def extract_text(m: dict) -> str:
    """Join all textual parts with newlines; fall back to top-level `text`, else ''."""
    parts = m.get("parts")
    if isinstance(parts, list):
        pieces = [
            part["text"]
            for part in parts
            if isinstance(part, dict) and isinstance(part.get("text"), str)
        ]
        if pieces:
            return "\n".join(pieces)
    fallback = m.get("text")
    return fallback if isinstance(fallback, str) else ""
| # -------------------- main -------------------- | |
| # ---- short output helpers ---- | |
def compute_short_limit() -> int:
    """Line-length cap for --short: two thirds of $COLUMNS, never below 80."""
    import os
    raw = os.environ.get("COLUMNS", "0")
    try:
        columns = int(raw)
    except Exception:
        columns = 0
    scaled = (2 * columns) // 3 if columns > 0 else 0
    return scaled if scaled > 80 else 80
def maybe_shorten(s: str, enable: bool) -> str:
    """Shorten each line of `s` independently when `enable` is set.

    Splits on single '\n', clips lines longer than max(2/3*$COLUMNS, 80) with a
    trailing ellipsis, and rejoins with '\n'. Empty lines pass through, so
    paragraph breaks (double newlines) are preserved. Non-strings pass through.
    """
    if not enable or not isinstance(s, str):
        return s
    limit = compute_short_limit()
    keep = max(limit - 3, 0)

    def _clip(line: str) -> str:
        if len(line) <= limit:
            return line
        return (line[:keep] + '...') if keep > 0 else '...'

    return '\n'.join(_clip(line) for line in s.split('\n'))
| # ---- analyze mode helpers ---- | |
# NOTE(review): `typing` is already imported at the top of the file; this
# duplicate import is harmless but redundant.
from typing import Any, List, Tuple
# String-valued keys whose contents are heavy payloads; the analyze renderer
# collapses their values to "..." (see _is_censor_key).
_HEAVY_KEYS = {"text","inlineData","data","code","stdout","stderr","content"}
| def _is_message_like(x: Any) -> bool: | |
| return isinstance(x, dict) and (("role" in x) or ("parts" in x) or ("content" in x) or ("inlineImage" in x)) | |
| def _find_message_lists(node: Any, path: str="") -> List[Tuple[str, list]]: | |
| found: List[Tuple[str, list]] = [] | |
| if isinstance(node, list): | |
| if any(_is_message_like(x) for x in node[:10]): | |
| found.append((path, node)) | |
| elif isinstance(node, dict): | |
| for k, v in node.items(): | |
| child = f"{path}.{k}" if path else k | |
| found.extend(_find_message_lists(v, child)) | |
| return found | |
def _get_messages_for_analyze(data: Any) -> Tuple[str, list]:
    """Like get_messages, but returns ('', []) instead of exiting when nothing is found."""
    candidates = _find_message_lists(data)
    if not candidates:
        return "", []
    return max(candidates, key=lambda pair: len(pair[1]))
def _is_censor_key(key: str) -> bool:
    """True for keys whose string values should be censored ('...') in analyze output."""
    lowered = (key or "").lower()
    # endswith accepts a tuple of suffixes — one call covers both blob suffixes.
    return lowered in _HEAVY_KEYS or lowered.endswith(("base64", "bytes"))
| def _is_metadata_like_string(s: str) -> bool: | |
| return len(s) <= 80 | |
| def _type_name(v): | |
| if v is None: return "null" | |
| if isinstance(v, bool): return "bool" | |
| if isinstance(v, int): return "int" | |
| if isinstance(v, float): return "float" | |
| if isinstance(v, str): return "str" | |
| if isinstance(v, list): return "list" | |
| if isinstance(v, dict): return "dict" | |
| return type(v).__name__ | |
| def _repr_value(key: str, v): | |
| if isinstance(v, str): | |
| if _is_censor_key(key) or not _is_metadata_like_string(v): | |
| return "..." | |
| import json as _json | |
| return _json.dumps(v) | |
| if isinstance(v, bool): | |
| return "True" if v else "False" | |
| if v is None: | |
| return "None" | |
| return repr(v) | |
def _analyze_emit_kv(lines, key, v, indent=0):
    """Append indented `key: type = value` lines for `v` (recursing into containers).

    Dicts emit well-known metadata keys first, then the rest in original order.
    List items are numbered from 1. Scalars are rendered via _repr_value, which
    censors heavy payloads.
    """
    pad = " " * indent
    t = _type_name(v)
    if isinstance(v, dict):
        lines.append(f"{pad}{key}: {t} =")
        keys = list(v.keys())
        # Preferred ordering puts common chat metadata at the top of each dict.
        preferred = ["role","finishReason","tokenCount","isThought","isJson","mimeType","outcome"]
        keys_sorted = [k for k in preferred if k in v] + [k for k in keys if k not in preferred]
        for k in keys_sorted:
            _analyze_emit_kv(lines, k, v[k], indent+1)
    elif isinstance(v, list):
        lines.append(f"{pad}{key}: {t} =")
        for idx, item in enumerate(v, start=1):
            if isinstance(item, (dict,list)):
                # Header line for the nested container, then recurse two levels deeper.
                lines.append(f"{pad} {idx}: {_type_name(item)} =")
                _analyze_emit_kv(lines, f"[{idx}]", item, indent+2)
            else:
                # NOTE(review): scalar list items are censored using the PARENT key name.
                lines.append(f"{pad} {idx}: {_type_name(item)} = {_repr_value(key, item)}")
    else:
        lines.append(f"{pad}{key}: {t} = {_repr_value(key, v)}")
def analyze_file(json_path: str) -> str:
    """Build a censored structural report of every message in the chat JSON file.

    Heavy values (message text, base64 blobs) are collapsed to "..." by
    _repr_value. Returns the full report as a single newline-joined string.
    """
    import json as _json
    data = _json.loads(Path(json_path).read_text(encoding="utf-8"))
    mpath, msgs = _get_messages_for_analyze(data)
    out = []
    out.append(f"# analyze: messages at: {mpath}; count={len(msgs)}")
    for i, m in enumerate(msgs):
        if not isinstance(m, dict):
            # Non-dict entries get a single summary line instead of a key dump.
            out.append(f"- message {i}: {type(m).__name__}")
            continue
        role = m.get("role")
        out.append(f"message {i}:")
        # Emit role first, then the remaining keys, so role always leads.
        _analyze_emit_kv(out, "role", role, indent=1)
        for k in [kk for kk in m.keys() if kk != "role"]:
            _analyze_emit_kv(out, k, m[k], indent=1)
        out.append("")  # blank separator line between messages
    return "\n".join(out)
def main(argv: List[str]) -> int:
    """CLI entry point: parse flags, load the chat JSON, filter, print the transcript.

    Returns 0 on success; exits via get_messages() when no message array exists.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("json_file")
    ap.add_argument("--tag", choices=["xml","markdown"], default="xml")
    ap.add_argument("--no-thoughts", action="store_true")
    ap.add_argument("--no-files", action="store_true")
    ap.add_argument("--no-code", action="store_true")
    ap.add_argument("--no-code-source", action="store_true")
    ap.add_argument("--no-code-output", action="store_true")
    ap.add_argument("--no-code-errors", action="store_true",
        help="Drop consecutive code-source block(s) + their immediate result if the result outcome=='OUTCOME_FAILED'")
    ap.add_argument("--no-images", action="store_true",
        help="Drop messages that embed inline images (inlineImage/inlineData)")
    ap.add_argument("--short", action="store_true",
        help="Limit message content per line (paragraph mode) to max(2/3*$COLUMNS, 80) with ellipsis")
    args = ap.parse_args(argv)
    data = json.loads(Path(args.json_file).read_text(encoding="utf-8"))
    # messages_path is unused below; kept for symmetry with get_messages' return.
    messages_path, messages = get_messages(data)
    # Identify flush-style blocks: <codesrc>...<codesrc><coderesult>
    code_blocks: List[Tuple[List[int], int]] = []  # ([source_idx...], result_idx)
    i = 0
    n = len(messages)
    while i < n:
        m = messages[i] if isinstance(messages[i], dict) else None
        if m and has_executable_code(m):
            # Greedily absorb the run of consecutive code-source messages.
            src_idxs = [i]
            j = i + 1
            while j < n:
                mj = messages[j] if isinstance(messages[j], dict) else None
                if mj and has_executable_code(mj):
                    src_idxs.append(j)
                    j += 1
                    continue
                break
            if j < n:
                mr = messages[j] if isinstance(messages[j], dict) else None
                if mr and has_code_result(mr):
                    # Sources + their immediate result form one block; resume after it.
                    code_blocks.append((src_idxs, j))
                    i = j + 1
                    continue
            # No result directly follows the run: advance one and rescan.
            i += 1
        else:
            i += 1
    # Compute exclusions for --no-code-errors
    exclude_due_to_errors: set[int] = set()
    if args.no_code_errors:
        for src_idxs, res_idx in code_blocks:
            res_msg = messages[res_idx] if (0 <= res_idx < n) else None
            # Only failed executions (outcome == "OUTCOME_FAILED") are dropped.
            if res_msg and code_result_failed(res_msg):
                exclude_due_to_errors.update(src_idxs)
                exclude_due_to_errors.add(res_idx)
    # Render
    out_lines: List[str] = []
    for idx, m in enumerate(messages):
        if not isinstance(m, dict):
            continue
        if idx in exclude_due_to_errors:
            continue
        if args.no_files and is_file_message(m):
            continue
        if args.no_thoughts and is_thought_message(m):
            continue
        # --no-code implies both --no-code-source and --no-code-output.
        if args.no_code or args.no_code_source:
            if has_executable_code(m):
                continue
        if args.no_code or args.no_code_output:
            if has_code_result(m):
                continue
        if args.no_images and is_image_message(m):
            continue
        role = m.get("role","?")
        text = extract_text(m)
        text = maybe_shorten(text, getattr(args, 'short', False))
        # Skip emitting if empty after filters
        if not isinstance(text, str) or text.strip() == "":
            continue
        if args.tag == "xml":
            # GAIS uses role "model"; the transcript labels it "assistant".
            tag = "user" if role == "user" else ("assistant" if role == "model" else role)
            out_lines.append(f"<{tag}>\n{text}\n</{tag}>")
        else:
            label = "User" if role == "user" else ("Assistant" if role == "model" else role.title())
            out_lines.append(f"### {label}\n\n{text}\n")
    print("\n".join(out_lines))
    return 0
if __name__ == "__main__":
    # ## PRE-ARGPARSE ANALYZE SHIM ##
    # `script.py analyze FILE.json` bypasses argparse entirely and prints a
    # censored structural dump of the chat instead of a transcript.
    import sys as _sys
    av = list(_sys.argv[1:])
    if "analyze" in av:
        try:
            ix = av.index("analyze")
            jpath = av[ix+1]  # IndexError here falls through to the usage message
        except Exception:
            print("Usage: export-ai-studio.py analyze <file.json>", file=sys.stderr)
            raise SystemExit(2)
        print(analyze_file(jpath))
        raise SystemExit(0)
    raise SystemExit(main(sys.argv[1:]))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment