Skip to content

Instantly share code, notes, and snippets.

@pringlized
Last active July 28, 2025 15:21
Show Gist options
  • Select an option

  • Save pringlized/b4a30676d8fb03449a8fdf3e52fa59b4 to your computer and use it in GitHub Desktop.

Select an option

Save pringlized/b4a30676d8fb03449a8fdf3e52fa59b4 to your computer and use it in GitHub Desktop.
An OpenWebUI pipe function that connects to a local FastAPI PydanticAI server.
"""
title: FastAPI Razor Pipe
author: Jim Pringle
version: 0.0.3
This function connects to a local FastAPI server running an Ollama model.
"""
from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import time
import requests
class Pipe:
    """OpenWebUI pipe that forwards the latest user message to a local
    FastAPI server exposing an OpenAI-compatible ``/v1/chat/completions``
    endpoint, and streams status updates back to the OpenWebUI front end.
    """

    class Valves(BaseModel):
        # User-configurable settings surfaced in the OpenWebUI admin panel.
        fastapi_url: str = Field(default="http://localhost:8000/v1/chat/completions")
        model: str = Field(default="llama3.2:latest")
        emit_interval: float = Field(
            default=2.0, description="Interval in seconds between status emissions"
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        self.type = "pipe"
        self.id = "fastapi_razor_pipe"
        self.name = "FastAPI Razor Pipe"
        self.valves = self.Valves()
        # Timestamp (epoch seconds) of the last status emission; used by
        # emit_status to throttle updates to at most one per emit_interval.
        self.last_emit_time = 0

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        """Send a throttled "status" event to the OpenWebUI UI.

        Emits only when an emitter is provided, the status indicator valve is
        enabled, and either emit_interval seconds have elapsed since the last
        emission or ``done`` is True (terminal updates are never dropped).

        :param __event_emitter__: OpenWebUI-supplied async event callback.
        :param level: Severity label, e.g. "info" or "error".
        :param message: Human-readable status description.
        :param done: True for the final status of the request.
        """
        current_time = time.time()
        if (
            __event_emitter__
            and self.valves.enable_status_indicator
            and (
                current_time - self.last_emit_time >= self.valves.emit_interval or done
            )
        ):
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )
            self.last_emit_time = current_time

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
        __event_call__: Callable[[dict], Awaitable[dict]] = None,
    ) -> Optional[dict]:
        """Relay the most recent user message to the FastAPI server.

        Appends the assistant's reply (or an error message) to
        ``body["messages"]`` and returns the reply text.

        :param body: OpenWebUI request body; expected to contain a
            "messages" list of {"role", "content"} dicts.
        :param __user__: OpenWebUI user info (unused here).
        :param __event_emitter__: async status-event callback, may be None.
        :param __event_call__: async event-call callback (unused here).
        :return: The assistant's reply text, or the error message on failure.
        """
        await self.emit_status(
            __event_emitter__, "info", "Calling FastAPI Ollama server...", False
        )
        messages = body.get("messages", [])
        if messages:
            user_message = messages[-1]["content"]
            try:
                # Construct an OpenAI-style request payload.
                payload = {
                    "model": self.valves.model,
                    "messages": [{"role": "user", "content": user_message}],
                }
                headers = {"Content-Type": "application/json"}
                # BUG FIX: without a timeout a hung server blocks this
                # coroutine forever; 60s is generous for local inference.
                response = requests.post(
                    self.valves.fastapi_url,
                    json=payload,
                    headers=headers,
                    timeout=60,
                )
                if response.status_code == 200:
                    api_response = response.json()
                    assistant_reply = api_response["choices"][0]["message"]["content"]
                else:
                    raise Exception(f"Error: {response.status_code} - {response.text}")
                # Append assistant's response to the message list.
                body["messages"].append(
                    {"role": "assistant", "content": assistant_reply}
                )
            except Exception as e:
                error_msg = f"Error during request: {str(e)}"
                await self.emit_status(__event_emitter__, "error", error_msg, True)
                # Ensure the error is appended to messages, so it's visible
                # in Open WebUI.
                body["messages"].append({"role": "assistant", "content": error_msg})
                # BUG FIX: assistant_reply was unbound on this path, making
                # the return statement below raise NameError.
                assistant_reply = error_msg
        else:
            error_msg = "No messages found in the request body"
            await self.emit_status(__event_emitter__, "error", error_msg, True)
            # BUG FIX: use setdefault so the append cannot KeyError when the
            # "messages" key is missing entirely (the case this branch handles).
            body.setdefault("messages", []).append(
                {"role": "assistant", "content": error_msg}
            )
            # BUG FIX: bind the return value on this path too (was NameError).
            assistant_reply = error_msg
        await self.emit_status(__event_emitter__, "info", "Request complete", True)
        # NOTE(review): the original comment claimed this returns the modified
        # body, but the code returned the reply string; the string return is
        # kept for backward compatibility with existing callers.
        return assistant_reply
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment