@hp0404
Created November 17, 2025 15:29
import json
import logging
import os
from datetime import datetime
from typing import Any

from sqlalchemy import create_engine, text
logger = logging.getLogger("llm_logger")

DEFAULT_DATABASE_URL = "CONNECTION_STRING"
def llm_call(
    *,
    system_prompt: str | None = None,
    user_message: str | None = None,
    model: str | None = None,
    provider: str | None = None,
    temperature: float = 1.0,
    completion: Any | None = None,
    task: str | None = None,
    completion_type: str | None = None,
    set_daily_max_tokens: int | None = None,  # kept for backwards compatibility, ignored
    is_batched: bool = False,
    database_url: str | None = None,
) -> None:
    """
    Minimal standalone logger for LLM calls.

    Inserts new rows into the existing `llm_interactions` table (no daily limit checks).
    """
    if completion is None:
        raise ValueError("completion must not be None")

    completion_id = getattr(completion, "id", None)
    if completion_id is None:
        raise ValueError("completion object must have an 'id' attribute")

    created_ts = getattr(completion, "created", None)
    if created_ts:
        created_at = datetime.fromtimestamp(created_ts)
    else:
        created_at = datetime.now()
    # Extract token usage
    token_fields = {
        "prompt_tokens": 0,
        "completion_tokens": 0,
        "total_tokens": 0,
        "prompt_tokens_cached": 0,
        "prompt_tokens_audio": 0,
        "completion_tokens_reasoning": 0,
        "completion_tokens_audio": 0,
        "completion_tokens_accepted": 0,
        "completion_tokens_rejected": 0,
    }
    usage = getattr(completion, "usage", None)
    if usage is not None:
        token_fields["prompt_tokens"] = getattr(usage, "prompt_tokens", 0) or 0
        token_fields["completion_tokens"] = getattr(usage, "completion_tokens", 0) or 0
        token_fields["total_tokens"] = getattr(usage, "total_tokens", 0) or 0

        prompt_details = getattr(usage, "prompt_tokens_details", None)
        if prompt_details is not None:
            token_fields["prompt_tokens_cached"] = (
                getattr(prompt_details, "cached_tokens", 0) or 0
            )
            token_fields["prompt_tokens_audio"] = (
                getattr(prompt_details, "audio_tokens", 0) or 0
            )

        completion_details = getattr(usage, "completion_tokens_details", None)
        if completion_details is not None:
            token_fields["completion_tokens_reasoning"] = (
                getattr(completion_details, "reasoning_tokens", 0) or 0
            )
            token_fields["completion_tokens_audio"] = (
                getattr(completion_details, "audio_tokens", 0) or 0
            )
            token_fields["completion_tokens_accepted"] = (
                getattr(completion_details, "accepted_prediction_tokens", 0) or 0
            )
            token_fields["completion_tokens_rejected"] = (
                getattr(completion_details, "rejected_prediction_tokens", 0) or 0
            )
    # Provider / model / task / completion_type defaults
    if provider is None:
        provider = "kaggle"
    if completion_type is None:
        completion_type = getattr(completion, "object", "chat")
    if model is None:
        model_val = getattr(completion, "model", None)
        if model_val is None:
            raise ValueError("Either 'model' argument or completion.model must be set")
        model = str(model_val)
    if "/" not in model:
        model = f"openai/{model}"
    if task is None:
        task = "chat_completion"
    user_message_db = user_message or ""

    # Convert completion object to JSON-serialisable dict for JSONB
    if hasattr(completion, "model_dump"):
        completion_dict = completion.model_dump()
    elif hasattr(completion, "dict"):
        completion_dict = completion.dict()
    elif isinstance(completion, dict):
        completion_dict = completion
    else:
        completion_dict = {
            k: getattr(completion, k)
            for k in dir(completion)
            if not k.startswith("_") and not callable(getattr(completion, k))
        }
    completion_json = json.dumps(completion_dict)
    db_url = database_url or os.getenv("LLMLOGER_DATABASE_URL", DEFAULT_DATABASE_URL)
    engine = create_engine(db_url)
    try:
        with engine.begin() as conn:
            # Duplicate check
            exists = conn.execute(
                text(
                    "SELECT 1 FROM llm_interactions "
                    "WHERE completion_id = :cid LIMIT 1"
                ),
                {"cid": completion_id},
            ).scalar()
            if exists:
                logger.debug(
                    "Skipping duplicate LLM interaction with ID %s", completion_id
                )
                return

            params = {
                "completion_id": completion_id,
                "completion_type": completion_type,
                "task": task,
                "model": model,
                "provider": provider,
                "temperature": float(temperature),
                "system_message": system_prompt,
                "user_message": user_message_db,
                "created_at": created_at,
                "completion": completion_json,
                "is_batched": bool(is_batched),
                "error": None,
                **token_fields,
            }
            conn.execute(
                text(
                    """
                    INSERT INTO llm_interactions (
                        completion_id,
                        completion_type,
                        task,
                        model,
                        provider,
                        temperature,
                        system_message,
                        user_message,
                        created_at,
                        completion,
                        prompt_tokens,
                        completion_tokens,
                        total_tokens,
                        prompt_tokens_cached,
                        prompt_tokens_audio,
                        completion_tokens_reasoning,
                        completion_tokens_audio,
                        completion_tokens_accepted,
                        completion_tokens_rejected,
                        error,
                        is_batched
                    )
                    VALUES (
                        :completion_id,
                        :completion_type,
                        :task,
                        :model,
                        :provider,
                        :temperature,
                        :system_message,
                        :user_message,
                        :created_at,
                        CAST(:completion AS jsonb),
                        :prompt_tokens,
                        :completion_tokens,
                        :total_tokens,
                        :prompt_tokens_cached,
                        :prompt_tokens_audio,
                        :completion_tokens_reasoning,
                        :completion_tokens_audio,
                        :completion_tokens_accepted,
                        :completion_tokens_rejected,
                        :error,
                        :is_batched
                    )
                    """
                ),
                params,
            )
        logger.debug("Logged LLM interaction with ID %s", completion_id)
    except Exception:
        logger.error("Failed to log LLM interaction", exc_info=True)
        raise
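
The function assumes an `llm_interactions` table already exists; its schema is not part of this gist. The sketch below is a plausible Postgres DDL inferred from the column list in the INSERT and the CAST(:completion AS jsonb), so the column types, the primary-key choice, and the helper itself are assumptions rather than the original schema.

# Hypothetical schema sketch, inferred from the INSERT statement above.
from sqlalchemy import create_engine, text

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS llm_interactions (
    completion_id                TEXT PRIMARY KEY,
    completion_type              TEXT,
    task                         TEXT,
    model                        TEXT,
    provider                     TEXT,
    temperature                  DOUBLE PRECISION,
    system_message               TEXT,
    user_message                 TEXT,
    created_at                   TIMESTAMP,
    completion                   JSONB,
    prompt_tokens                INTEGER,
    completion_tokens            INTEGER,
    total_tokens                 INTEGER,
    prompt_tokens_cached         INTEGER,
    prompt_tokens_audio          INTEGER,
    completion_tokens_reasoning  INTEGER,
    completion_tokens_audio      INTEGER,
    completion_tokens_accepted   INTEGER,
    completion_tokens_rejected   INTEGER,
    error                        TEXT,
    is_batched                   BOOLEAN
);
"""


def create_schema(database_url: str) -> None:
    """One-off helper: create the table if it does not already exist."""
    engine = create_engine(database_url)
    with engine.begin() as conn:
        conn.execute(text(SCHEMA_SQL))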
hp0404 commented Nov 17, 2025

usage:

# NOTE: `Classification` is the caller's Pydantic model (defined elsewhere);
# the imports below are added here for completeness.
from pathlib import Path

from openai import OpenAI
from openai.types.chat import ChatCompletion


def call_api(
    client: OpenAI,
    model: str,
    system_prompt: str,
    user_message: str,
    cache_file: Path,
) -> Classification | None:

    # we want to avoid making the second call and guarantee a ChatCompletion object
    if cache_file.exists():
        with cache_file.open("r", encoding="utf-8") as f:
            cached_response = f.read()
        completion = ChatCompletion.model_validate_json(cached_response)
        completion.choices[0].message.parsed = Classification(
            **completion.choices[0].message.parsed
        )
        return completion.choices[0].message.parsed

    completion = client.chat.completions.parse(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        response_format=Classification,
    )

    with cache_file.open("w", encoding="utf-8") as f:
        f.write(completion.to_json())

    llm_call(
        system_prompt=system_prompt,
        user_message=user_message,
        model="oss:20b",
        provider="local",
        completion=completion,
        task="wacko - annotation",
        completion_type="chat.completions.parse",
        set_daily_max_tokens=False,
    )

    return completion.choices[0].message.parsed
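
For completeness, a minimal driver under stated assumptions: `Classification` here is a made-up Pydantic schema (the real one is whatever structure you want the model to return), and the client points at a local OpenAI-compatible endpoint to match `provider="local"` above; both the base URL and the field names are placeholders.

from pathlib import Path

from openai import OpenAI
from pydantic import BaseModel


class Classification(BaseModel):
    # Placeholder fields; replace with the actual annotation schema.
    label: str
    confidence: float


# Assumed local endpoint; any OpenAI-compatible server works here.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="not-needed")

result = call_api(
    client=client,
    model="oss:20b",
    system_prompt="You are an annotation assistant.",
    user_message="Classify this sentence: ...",
    cache_file=Path("example_cache.json"),
)
print(result)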
