Last active
November 25, 2025 05:57
-
-
Save josephlou5/0cec3c759a0582639c748e21a512b0b1 to your computer and use it in GitHub Desktop.
Fetch data from a public Google Sheet using the Graph Visualization Query Language
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Fetches data from a public Google Sheet using the Graph Visualization | |
| Query Language. | |
| See: https://developers.google.com/chart/interactive/docs/querylanguage | |
| ======================================================================== | |
| Changelog | |
| ======================================================================== | |
| v1.0 (2025-10-07) | |
| - Initial version | |
| """ | |
| import dataclasses | |
| import datetime | |
| import enum | |
| import functools | |
| import json | |
| import re | |
| import sys | |
| from typing import Self | |
| import requests | |
# Matches a date literal such as "Date(2025,9,7)". The month group is named
# `month0` because the value is 0-indexed (JavaScript convention).
DATE_RE = re.compile(r"Date\((?P<year>\d+),(?P<month0>\d+),(?P<day>\d+)\)")
# Matches a datetime literal such as "Date(2025,9,7,13,5,0)": the same
# year / 0-indexed month / day fields, followed by hour, minute, and second.
DATETIME_RE = re.compile(
    r"Date\("
    r"(?P<year>\d+),(?P<month0>\d+),(?P<day>\d+),"
    r"(?P<hour>\d+),(?P<minute>\d+),(?P<second>\d+)"
    r"\)"
)
# Matches "HH:mm:ss" with an optional ".SSS" millisecond suffix. Anchored at
# both ends, so the whole string must be a time of day.
TIME_OF_DAY_RE = re.compile(
    r"^(?P<hour>\d\d):(?P<minute>\d\d):(?P<second>\d\d)"
    r"(?:\.(?P<millisecond>\d\d\d))?$"
)
# Matches the response body: a "/*O_o*/" comment line followed by a
# `google.visualization.Query.setResponse(<payload>);` call. The payload
# between the parentheses is captured as `data`.
RESPONSE_RE = re.compile(
    r"^/\*O_o\*/\ngoogle\.visualization\.Query\.setResponse\((?P<data>.*)\);$"
)
class ColumnType(enum.Enum):
    """Type of a Column.

    Each member's value is the string found in the "type" field of a
    column description, so `ColumnType(raw_type)` converts a raw type
    string into a member.
    """

    BOOLEAN = "boolean"
    NUMBER = "number"
    STRING = "string"
    DATE = "date"
    # Note that datetimes are precise to the second only.
    DATETIME = "datetime"
    TIME_OF_DAY = "timeofday"
| @functools.total_ordering | |
| @dataclasses.dataclass() | |
| class TimeOfDay: | |
| """Represents a time of day. | |
| Similar to `datetime.time`, but only goes up to milliseconds. | |
| """ | |
| hour: int | |
| minute: int | |
| second: int | |
| millisecond: int | None | |
| def __str__(self) -> str: | |
| time_of_day = f"{self.hour:02d}:{self.minute:02d}:{self.second:02d}" | |
| if self.millisecond is not None: | |
| time_of_day += f".{self.millisecond:03d}" | |
| return time_of_day | |
| @classmethod | |
| def parse(cls, time_of_day: str) -> Self: | |
| # Unused. | |
| m = TIME_OF_DAY_RE.match(time_of_day) | |
| if m is None: | |
| raise ValueError( | |
| "Failed to parse time of day string. Expected format " | |
| f"'HH:mm:ss[.SSS]', but got: {time_of_day}" | |
| ) | |
| millisecond = m.group("millisecond") | |
| if millisecond is not None: | |
| millisecond = int(millisecond) | |
| return cls(*map(int, m.group("hour", "minute", "second")), millisecond) | |
| def __lt__(self, other) -> bool: | |
| if not isinstance(other, TimeOfDay): | |
| return False | |
| def get_tuple(tod: TimeOfDay): | |
| return ( | |
| tod.hour, | |
| tod.minute, | |
| tod.second, | |
| tod.millisecond or 0, | |
| ) | |
| return get_tuple(self) < get_tuple(other) | |
@dataclasses.dataclass(frozen=True)
class Column:
    """Describes one column of the table: its type, ID, and header label."""

    type: ColumnType
    # The column label.
    id: str
    # The header name.
    label: str | None

    @classmethod
    def from_col(cls, column):
        """Builds a Column from a raw column mapping.

        Args:
            column: The raw column mapping. Its "type", "id", and
                "label" keys are read; any other keys (such as
                'pattern' and 'p') are ignored.

        Returns:
            Column: The column description.

        Raises:
            ValueError: If "type" or "id" is missing or empty, or if
                "type" is not a valid `ColumnType` value.
        """
        if not (raw_type := column.get("type")):
            raise ValueError('Column is missing "type" field')
        # The ID is technically optional, but it seems to always be
        # returned, so it is simply required here.
        if not (identifier := column.get("id", "")):
            raise ValueError('Column is missing "id" field')
        return cls(
            type=ColumnType(raw_type),
            id=identifier,
            label=column.get("label"),
        )
@dataclasses.dataclass(frozen=True)
class Cell:
    """A single table cell: its column, parsed value, and display text."""

    header: Column
    value: bool | int | float | str | datetime.date | TimeOfDay | None
    formatted: str | None

    @classmethod
    def from_cell(cls, header: Column, cell):
        """Builds a Cell from a raw cell mapping.

        Args:
            header (Column): The column this cell belongs to.
            cell: The raw cell mapping, or None for an empty cell. Its
                "v" key is the raw value and its "f" key is the
                formatted string.

        Returns:
            Cell: The cell, with the raw value converted according to
                the column's type.
        """
        if cell is not None:
            parsed = _value_from_column_type(header.type, cell.get("v"))
            return cls(header=header, value=parsed, formatted=cell.get("f"))
        return cls(header=header, value=None, formatted=None)
class DataTable:
    """Represents a two-dimensional, immutable table of values.

    Note: Google has the `gviz-api` package which also defines this, but
    it's a little hard to use.

    Properties:
        headers (tuple[Column, ...]): The column headers.
        data (tuple[tuple[Cell, ...], ...]): The rows of data. Each row
            has the same length as `headers`.
        json (Any): The raw JSON object from the response.

    See:
        https://developers.google.com/chart/interactive/docs/reference#DataTable
        https://developers.google.com/chart/interactive/docs/dev/gviz_api_lib
    """

    def __init__(self, table):
        """Builds the table from the raw "table" object of a response.

        Args:
            table: The raw table mapping. Must contain a "cols" list of
                column mappings and a "rows" list of row mappings, each
                row having a "c" list of cell mappings (or None for
                empty cells).

        Raises:
            ValueError: If the table, a column, a row, or a cell does
                not have the expected structure.
        """
        self._json = table

        if not table:
            raise ValueError('"table" is not populated')
        if not isinstance(table, dict):
            raise ValueError('"table" must be a mapping')

        cols = table.get("cols")
        rows = table.get("rows")
        # There is also a 'parsedNumHeaders' key, but it seems to not be
        # important.
        if cols is None:
            raise ValueError('"table" mapping is missing "cols" key')
        if rows is None:
            raise ValueError('"table" mapping is missing "rows" key')
        if not isinstance(cols, list):
            raise ValueError('"table.cols" must be a list')
        if not isinstance(rows, list):
            raise ValueError('"table.rows" must be a list')

        headers = []
        for i, col in enumerate(cols):
            # Checked here so that a malformed column is reported as a
            # ValueError like every other structural problem.
            if not isinstance(col, dict):
                raise ValueError(f'"table.cols[{i}]" must be a mapping')
            headers.append(Column.from_col(col))
        self._headers = tuple(headers)

        data_rows = []
        for i, row in enumerate(rows):
            if not isinstance(row, dict):
                raise ValueError(f'"table.rows[{i}]" must be a mapping')
            cells = row.get("c")
            if cells is None:
                raise ValueError(f'"table.rows[{i}]" is missing "c" key')
            if not isinstance(cells, list):
                raise ValueError(f'"table.rows[{i}].c" must be a list')

            row_cells = []
            for j, header in enumerate(self._headers):
                # A row may have fewer cells than there are headers; the
                # missing trailing cells are treated as empty. Cells
                # beyond the last header are ignored.
                cell = cells[j] if j < len(cells) else None
                if cell is not None and not isinstance(cell, dict):
                    raise ValueError(
                        f'"table.rows[{i}].c[{j}]" must be a mapping'
                    )
                row_cells.append(Cell.from_cell(header, cell))
            data_rows.append(tuple(row_cells))
        self._data = tuple(data_rows)

    @property
    def headers(self) -> "tuple[Column, ...]":
        """The column headers."""
        return self._headers

    @property
    def data(self) -> "tuple[tuple[Cell, ...], ...]":
        """The rows of data; each row has the same length as `headers`."""
        return self._data

    @property
    def json(self):
        """The raw table object this instance was built from."""
        return self._json

    def print_table(self):
        """Prints the data as a table.

        The first column is the 1-based row number. Each cell shows its
        formatted string if it has one, otherwise `str()` of its value,
        otherwise an empty string.
        """
        headers = ["#"]
        for col in self._headers:
            headers.append(f"{col.id}: {col.label}" if col.label else col.id)
        # Each column is as wide as its widest entry, header included.
        widths = [len(header) for header in headers]

        rows = []
        for number, row in enumerate(self._data, start=1):
            row_strs = [str(number)]
            for cell in row:
                if cell.formatted is not None:
                    row_strs.append(cell.formatted)
                elif cell.value is not None:
                    row_strs.append(str(cell.value))
                else:
                    row_strs.append("")
            rows.append(row_strs)
            for i, value in enumerate(row_strs):
                widths[i] = max(widths[i], len(value))

        # Joining with empty strings at both ends adds the outer borders.
        separator = "+".join(("", *("-" * (width + 2) for width in widths), ""))

        def make_row_str(row):
            padded = (
                f" {value.ljust(width)} " for width, value in zip(widths, row)
            )
            return "|".join(("", *padded, ""))

        print(separator)
        print(make_row_str(headers))
        print(separator)
        if not rows:
            print("No rows")
        else:
            for row in rows:
                print(make_row_str(row))
        print(separator)
def _value_from_column_type(col_type: ColumnType, value):
    """Converts a raw cell value into the Python value for its column type.

    Args:
        col_type (ColumnType): The type of the column the value is in.
        value: The raw value. It is not modified.

    Returns:
        None if `value` is None. Boolean, number, and string values are
        returned unchanged. Date and datetime values are parsed into
        `datetime.date` and `datetime.datetime`. Time of day values
        (a list of 3 or 4 integers) become a `TimeOfDay`.

    Raises:
        TypeError: If a time of day value is not a list of 3 or 4
            integers.
        ValueError: If a date or datetime value cannot be parsed.
        RuntimeError: If `col_type` is not a known column type.
    """
    if value is None:
        return None
    # Booleans, numbers, and strings are expected to already have the
    # proper type.
    if col_type in (ColumnType.BOOLEAN, ColumnType.NUMBER, ColumnType.STRING):
        return value
    if col_type is ColumnType.DATE:
        return _parse_date(value)
    if col_type is ColumnType.DATETIME:
        return _parse_datetime(value)
    if col_type is ColumnType.TIME_OF_DAY:
        if not isinstance(value, list):
            raise TypeError(
                f"Expected list for {col_type!r} value, but got "
                f"{type(value).__name__}: {value}"
            )
        if not all(isinstance(x, int) for x in value):
            raise TypeError(
                f"Expected list of integers for {col_type!r} value, but "
                f"got: {value}"
            )
        if len(value) not in (3, 4):
            raise TypeError(
                f"Expected tuple of 3 or 4 integers for {col_type!r} value, "
                f"but got: {value}"
            )
        # Unpack instead of appending to `value`: the list belongs to the
        # raw JSON that `DataTable.json` exposes, so it must not be
        # mutated.
        hour, minute, second, *rest = value
        return TimeOfDay(hour, minute, second, rest[0] if rest else None)
    raise RuntimeError(f"Unexpected header type: {col_type}")
def _parse_date(date_str: str) -> datetime.date:
    """Parses a "Date(year,month,day)" string into a `datetime.date`.

    Args:
        date_str (str): The string to parse. Its month is 0-indexed.

    Returns:
        datetime.date: The parsed date.

    Raises:
        ValueError: If the string does not match the expected pattern
            or does not describe a valid date.
    """
    match = DATE_RE.match(date_str)
    if match is None:
        raise ValueError(f"Failed to parse date: {date_str}")
    year = int(match["year"])
    # The month follows the JavaScript convention of counting from 0,
    # whereas Python counts from 1.
    month = int(match["month0"]) + 1
    day = int(match["day"])
    try:
        return datetime.date(year, month, day)
    except ValueError as e:
        raise ValueError(f"Failed to parse date: {date_str}") from e
def _parse_datetime(datetime_str: str) -> datetime.datetime:
    """Parses a "Date(year,month,day,hour,minute,second)" string.

    Args:
        datetime_str (str): The string to parse. Its month is 0-indexed.

    Returns:
        datetime.datetime: The parsed (naive) datetime.

    Raises:
        ValueError: If the string does not match the expected pattern
            or does not describe a valid datetime.
    """
    match = DATETIME_RE.match(datetime_str)
    if match is None:
        raise ValueError(f"Failed to parse datetime: {datetime_str}")
    field_names = ("year", "month0", "day", "hour", "minute", "second")
    year, month0, day, hour, minute, second = (
        int(match[name]) for name in field_names
    )
    try:
        # The month follows the JavaScript convention of counting from
        # 0, whereas Python counts from 1.
        return datetime.datetime(year, month0 + 1, day, hour, minute, second)
    except ValueError as e:
        raise ValueError(f"Failed to parse datetime: {datetime_str}") from e
def query_spreadsheet(
    spreadsheet_id: str,
    sheet_id: int | str | None = None,
    *,
    query: str = "",
    headers: int = 1,
    timeout_seconds: int = 60,
) -> DataTable:
    """Queries the given spreadsheet.

    Args:
        spreadsheet_id (str): The spreadsheet ID.
        sheet_id (int | str | None): The sheet ID. If None, the first
            sheet in the spreadsheet will be queried (even if hidden).
        query (str): The query to execute. Note that the column labels
            (e.g., "A", "B", "AC", etc.) must be used, not numbered
            columns (e.g., "Col1", "Col10", etc.).
            See
            https://developers.google.com/chart/interactive/docs/querylanguage.
        headers (int): The number of headers.
        timeout_seconds (int): The timeout for the request in seconds.

    Returns:
        DataTable: The resulting data.

    Raises:
        requests.RequestException: If the request could not be completed
            (e.g., connection error or timeout), or if the server
            responded with a 4xx/5xx status (`requests.HTTPError`).
        RuntimeError: If the response has any other non-200 status.
        ValueError: If the response could not be parsed, or if it
            reports a status other than "ok".
    """
    url = f"https://docs.google.com/spreadsheets/d/{spreadsheet_id}/gviz/tq"
    params = {
        "tq": query,
        "tqx": "out:json",
        "headers": headers,
        # `requests` leaves out parameters whose value is None, so no
        # "gid" is sent when `sheet_id` is None.
        "gid": sheet_id,
    }
    r = requests.get(url, params=params, timeout=timeout_seconds)
    r.raise_for_status()
    # `raise_for_status()` only covers 4xx/5xx; reject anything else that
    # is not a plain 200 as well.
    if r.status_code != requests.codes.ok:  # pylint: disable=no-member
        raise RuntimeError(f"Request failed with status {r.status_code}")

    res_text = r.text
    m = RESPONSE_RE.fullmatch(res_text)
    if not m:
        raise ValueError(f"Failed to parse response:\n{res_text}")
    try:
        json_data = json.loads(m.group("data"))
    except ValueError as e:
        raise ValueError(
            f"Failed to parse response JSON data:\n{res_text}"
        ) from e
    # Valid JSON is not necessarily an object; without this check the
    # `.get()` calls below would raise AttributeError instead of the
    # documented ValueError.
    if not isinstance(json_data, dict):
        raise ValueError(f"Failed to parse response JSON data:\n{res_text}")

    status = json_data.get("status")
    if status != "ok":
        errors = json_data.get("errors")
        if not isinstance(errors, list):
            errors = []
        error_messages = [
            msg
            for err in errors
            # Skip entries that are not mappings (e.g., null).
            if isinstance(err, dict) and (msg := err.get("detailed_message"))
        ]
        if not error_messages:
            # Fall back to the raw response so the error is not empty.
            error_messages.append(res_text)
        raise ValueError(f"Failed with status {status!r}: {error_messages}")

    try:
        return DataTable(json_data.get("table"))
    except Exception as e:
        # Any failure while building the table means the payload did not
        # have the expected structure.
        raise ValueError(f"Invalid JSON data:\n{res_text}") from e
| def main() -> int: | |
| script, *args = sys.argv | |
| USAGE = ( | |
| f"Usage: python {script} " | |
| "SPREADSHEET_ID [SHEET_ID] [QUERY] [HEADERS] [TIMEOUT_SECONDS]" | |
| ) | |
| if len(args) == 0: | |
| print("Must provide spreadsheet ID") | |
| print(USAGE) | |
| return 1 | |
| spreadsheet_id = args[0] | |
| sheet_id = None | |
| query = "" | |
| headers = 1 | |
| timeout_seconds = 60 | |
| if len(args) >= 2: | |
| sheet_id = args[1] | |
| if len(args) >= 3: | |
| query = args[2] | |
| if len(args) >= 4: | |
| try: | |
| headers = int(args[3]) | |
| except ValueError: | |
| print("`headers` must be a positive integer:", args[3]) | |
| print(USAGE) | |
| return 1 | |
| if headers <= 0: | |
| print("`headers` must be a positive integer:", args[3]) | |
| print(USAGE) | |
| return 1 | |
| if len(args) >= 5: | |
| try: | |
| timeout_seconds = int(args[4]) | |
| except ValueError: | |
| print("`timeout_seconds` must be a positive integer:", args[4]) | |
| print(USAGE) | |
| return 1 | |
| if timeout_seconds <= 0: | |
| print("`timeout_seconds` must be a positive integer:", args[4]) | |
| print(USAGE) | |
| return 1 | |
| query_spreadsheet( | |
| spreadsheet_id, | |
| sheet_id, | |
| query=query, | |
| headers=headers, | |
| timeout_seconds=timeout_seconds, | |
| ).print_table() | |
| return 0 | |
| if __name__ == "__main__": | |
| sys.exit(main() or 0) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /** | |
| * @fileoverview | |
| * Fetches data from a public Google Sheet using the Graph Visualization Query | |
| * Language. | |
| * | |
| * See: https://developers.google.com/chart/interactive/docs/querylanguage | |
| * | |
| * Hint: Run with: `npx tsx publicSheetFetch.ts` | |
| * | |
| * ============================================================================= | |
| * Changelog | |
| * ============================================================================= | |
| * | |
| * v1.0 (2025-11-24) | |
| * - Initial version | |
| */ | |
/**
 * Matches the response body: a "/*O_o*" + "/" comment line followed by a
 * `google.visualization.Query.setResponse(<payload>);` call. The payload
 * between the parentheses is captured in the named group `data`.
 */
const RESPONSE_RE =
  /^\/\*O_o\*\/\ngoogle\.visualization\.Query\.setResponse\((?<data>.*)\);$/;
/**
 * Type of a column. Each member's value is the corresponding raw type string
 * (see `RawColumnType`).
 */
enum ColumnType {
  BOOLEAN = "boolean",
  NUMBER = "number",
  STRING = "string",
  DATE = "date",
  DATETIME = "datetime",
  TIME_OF_DAY = "timeofday",
}
/** Represents a column in the table. */
type Column = {
  type: ColumnType;
  /** The column ID. */
  id: string;
  /** The column label, or `null` if the raw column has none. */
  label: string | null;
};

/** Represents a time of day, with an optional millisecond component. */
type TimeOfDay = {
  hour: number;
  minute: number;
  second: number;
  /** Left `undefined` when the raw value has only three components. */
  millisecond?: number;
};

/** The value of a cell after conversion. */
type CellValue = boolean | number | string | Date | TimeOfDay | null;

/** Represents a cell in the table. */
type Cell = {
  /** The column this cell belongs to. */
  header: Column;
  value: CellValue;
  /** The formatted string of the raw cell, or `null` if it has none. */
  formatted: string | null;
};

/** The column type strings as they appear in the raw table. */
type RawColumnType =
  | "boolean"
  | "number"
  | "string"
  | "date"
  | "datetime"
  | "timeofday";

/**
 * The value of a cell as it appears in the raw table. A `number[]` is a time
 * of day given as 3 or 4 integers.
 *
 * NOTE(review): the raw table comes from `JSON.parse`, which never produces
 * `Date` objects; presumably date values arrive as strings. Confirm.
 */
type RawCellValue = boolean | number | string | Date | number[] | null;

/** The shape of the raw "table" object that `DataTable` is built from. */
type RawTable = {
  cols: Array<{ type: RawColumnType; id: string; label?: string | null }>;
  rows: Array<{ c: Array<{ v: RawCellValue; f?: string | null } | null> }>;
};
/**
 * Converts a raw column type string into the corresponding `ColumnType`.
 *
 * @param raw - The raw column type string.
 * @returns The matching `ColumnType` member.
 * @throws {Error} If `raw` is not one of the known type strings. The static
 *   type rules this out, but the value comes from parsed JSON at runtime.
 */
function rawColumnTypeToColumnType(raw: RawColumnType): ColumnType {
  switch (raw) {
    case "boolean":
      return ColumnType.BOOLEAN;
    case "number":
      return ColumnType.NUMBER;
    case "string":
      return ColumnType.STRING;
    case "date":
      return ColumnType.DATE;
    case "datetime":
      return ColumnType.DATETIME;
    case "timeofday":
      return ColumnType.TIME_OF_DAY;
    default:
      // Without this branch an unexpected string would fall out of the
      // switch and the function would silently return `undefined`.
      throw new Error(`Unknown column type: ${JSON.stringify(raw)}`);
  }
}
/**
 * Converts a raw cell value into a `CellValue`.
 *
 * `null` and `undefined` become `null`. An array is only accepted for a
 * "timeofday" column, where it must hold 3 or 4 integers and is converted
 * into a `TimeOfDay`. Every other value is returned unchanged.
 *
 * NOTE(review): values of "date" and "datetime" columns are returned as
 * received, with no conversion to `Date`. If they arrive as strings, callers
 * get strings. Confirm whether that is intended.
 *
 * @param columnType - The type of the column the value is in.
 * @param raw - The raw value.
 * @returns The converted value.
 * @throws {Error} If the value is an array in a non-"timeofday" column, or if
 *   a "timeofday" value is not an array of 3 or 4 integers.
 */
function rawCellValueToCellValue(
  columnType: ColumnType,
  raw: RawCellValue | undefined
): CellValue {
  if (raw == null) return null;

  const isTimeOfDayColumn = columnType === ColumnType.TIME_OF_DAY;

  if (!Array.isArray(raw)) {
    if (isTimeOfDayColumn) {
      throw new Error(
        'Expected list of integers for "timeofday" column, but got: ' +
          JSON.stringify(raw)
      );
    }
    // Values of every other type are expected to already have the proper
    // type.
    return raw;
  }

  if (!isTimeOfDayColumn) {
    throw new Error('Value is an array, but column type is not "timeofday"');
  }
  for (let i = 0; i < raw.length; i++) {
    if (!Number.isInteger(raw[i])) {
      throw new Error(
        'Expected list of integers for "timeofday" column, but got: ' +
          JSON.stringify(raw)
      );
    }
  }
  if (raw.length !== 3 && raw.length !== 4) {
    throw new Error(
      'Expected list of 3 or 4 integers for "timeofday" column, but got: ' +
        JSON.stringify(raw)
    );
  }

  // The fourth component (milliseconds) is optional.
  const millisecond: number | undefined =
    raw.length === 4 ? raw[3] : undefined;
  return { hour: raw[0], minute: raw[1], second: raw[2], millisecond };
}
/**
 * A two-dimensional table of values built from the raw "table" object of a
 * response. All properties are read-only.
 *
 * Note: Google has `google.visualization.*` classes, but they're a little hard
 * to use.
 *
 * See:
 *   https://developers.google.com/chart/interactive/docs/reference#DataTable
 *   https://developers.google.com/chart/interactive/docs/dev/gviz_api_lib
 */
class DataTable {
  /** The column headers. */
  public readonly headers: ReadonlyArray<Column>;
  /** The rows of data. Each row has the same length as `headers`. */
  public readonly data: ReadonlyArray<ReadonlyArray<Cell>>;
  /** The raw JSON object from the response. */
  public readonly json: RawTable;

  /**
   * @param table - The raw table object, with a `cols` array and a `rows`
   *   array.
   * @throws {Error} If the table, a column, a row, or a cell value does not
   *   have the expected structure.
   */
  constructor(table: unknown | undefined) {
    if (!table) {
      throw new Error('"table" is not populated');
    }
    const rawTable = table as RawTable;
    this.json = rawTable;

    const { cols, rows } = rawTable;
    if (cols == null) {
      throw new Error('"table" mapping is missing "cols" key');
    }
    if (rows == null) {
      throw new Error('"table" mapping is missing "rows" key');
    }
    if (!Array.isArray(cols)) {
      throw new Error('"table.cols" must be a list');
    }
    if (!Array.isArray(rows)) {
      throw new Error('"table.rows" must be a list');
    }

    // Validates one raw column and converts it into a `Column`.
    const parseColumn = (
      col: RawTable["cols"][number],
      index: number
    ): Column => {
      const colName = `table.cols[${index}]`;
      if (!col) {
        throw new Error(`"${colName}" is not populated`);
      }
      const { type: colType, id: colId } = col;
      if (!colType) {
        throw new Error(`"${colName}" is missing "type" field`);
      }
      if (!colId) {
        // The ID is technically optional, but it seems to always be returned,
        // so it is simply required here.
        throw new Error(`"${colName}" is missing "id" field`);
      }
      return {
        type: rawColumnTypeToColumnType(colType),
        id: colId,
        label: col.label ?? null,
      };
    };
    const headers: ReadonlyArray<Column> = cols.map(parseColumn);
    this.headers = headers;

    // Validates one raw row and converts it into one `Cell` per header.
    const parseRow = (row: RawTable["rows"][number], index: number): Cell[] => {
      const rowName = `table.rows[${index}]`;
      const rawCells = row?.c;
      if (!rawCells) {
        throw new Error(`"${rowName}" is not populated`);
      }
      if (!Array.isArray(rawCells)) {
        throw new Error(`"${rowName}.c" must be a list`);
      }
      return headers.map((header, j): Cell => {
        // A row with fewer raw cells than headers is fine: the missing cells
        // read as `undefined` and become empty cells.
        const rawCell = rawCells[j];
        let value: CellValue;
        try {
          value = rawCellValueToCellValue(header.type, rawCell?.v);
        } catch (err) {
          throw new Error(`"${rowName}.c[${j}].v": ${err}`);
        }
        return { header, value, formatted: rawCell?.f ?? null };
      });
    };
    this.data = rows.map(parseRow);
  }
}
/**
 * Queries the given spreadsheet.
 *
 * @param spreadsheetId - The spreadsheet ID.
 * @param sheetId - The sheet ID. If `null`, the first sheet in the spreadsheet
 *   will be queried (even if hidden).
 * @param options - Options.
 * @param options.query - The query to execute. Note that the column labels
 *   (e.g., "A", "B", "AC", etc.) must be used, not numbered columns (e.g.,
 *   "Col1", "Col10", etc.).
 *
 *   See https://developers.google.com/chart/interactive/docs/querylanguage.
 * @param options.headers - The number of headers.
 * @param options.timeoutSeconds - The timeout for the request in seconds. If
 *   omitted, no timeout is applied.
 * @returns {Promise<DataTable>} The resulting data.
 * @throws {Error} If the request failed or timed out.
 * @throws {Error} If the response could not be parsed.
 */
async function querySpreadsheet(
  spreadsheetId: string,
  sheetId: number | string | null = null,
  {
    query = "",
    headers = 1,
    timeoutSeconds,
  }: Partial<{ query: string; headers: number; timeoutSeconds: number }> = {}
): Promise<DataTable> {
  const url = new URL(
    `https://docs.google.com/spreadsheets/d/${spreadsheetId}/gviz/tq`
  );
  if (query) {
    url.searchParams.set("tq", query);
  }
  url.searchParams.set("tqx", "out:json");
  url.searchParams.set("headers", String(headers));
  if (sheetId !== null) {
    url.searchParams.set("gid", String(sheetId));
  }

  const init: RequestInit = {};
  if (timeoutSeconds != null) {
    // Abort the request once the timeout elapses; `fetch` then rejects.
    init.signal = AbortSignal.timeout(timeoutSeconds * 1000);
  }
  const res = await fetch(url, init);
  if (!res.ok || res.status !== 200) {
    throw new Error(`Request failed with status: ${res.status}`);
  }

  const resText = await res.text();
  const match = RESPONSE_RE.exec(resText);
  if (!match) {
    throw new Error(`Failed to parse response:\n${resText}`);
  }
  let jsonData;
  try {
    jsonData = JSON.parse(match.groups!["data"]);
  } catch (err) {
    throw new Error(
      `Failed to parse response JSON data:\n${err}\n\n${resText}`
    );
  }
  // Valid JSON is not necessarily an object (e.g., `null` or a number).
  if (jsonData === null || typeof jsonData !== "object") {
    throw new Error(
      `Failed to parse response JSON data:\nexpected an object\n\n${resText}`
    );
  }

  const status = jsonData.status;
  // Strict comparison: with `!=`, a value such as `["ok"]` would be coerced
  // to "ok" and accepted.
  if (status !== "ok") {
    const errorMessages = [];
    const errors = Array.isArray(jsonData.errors) ? jsonData.errors : [];
    for (const err of errors) {
      if (!err) continue;
      const msg = err.detailed_message;
      if (!msg) continue;
      errorMessages.push(msg);
    }
    if (errorMessages.length === 0) {
      // Fall back to the raw response so the error is not empty.
      errorMessages.push(resText);
    }
    throw new Error(
      `Failed with status ${JSON.stringify(status)}: ` +
        `${JSON.stringify(errorMessages)}`
    );
  }

  try {
    return new DataTable(jsonData.table);
  } catch (err) {
    throw new Error(`Invalid JSON data:\n${err}\n\n${resText}`);
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment