Skip to content

Instantly share code, notes, and snippets.

@akiraaisha
Last active December 11, 2025 14:38
Show Gist options
  • Select an option

  • Save akiraaisha/34574625b65fcc51f4793991c0f1652a to your computer and use it in GitHub Desktop.

Select an option

Save akiraaisha/34574625b65fcc51f4793991c0f1652a to your computer and use it in GitHub Desktop.
"""
title: Paperless-ngx RAG Filter
author: Sherlock Think Alpha
author_url: https://github.com/open-webui/pipelines/
description: RAG Filter for Paperless-ngx. Retrieves relevant docs, injects them into the prompt, and lets the LLM answer.
required_open_webui_version: 0.4.3
requirements: requests, pydantic
version: 1.2
license: MIT
"""
import asyncio
import os
import time
from datetime import datetime
from logging import getLogger
from typing import List, Union, Generator, Iterator, Dict, Any, Optional, Callable, Awaitable

import requests
from pydantic import BaseModel, Field
# Module-level logger, named after this module so log records can be filtered by source.
logger = getLogger(__name__)
# Emit INFO and above from this logger regardless of the root logger's level.
logger.setLevel("INFO")
class Pipeline:
    """Filter that augments the latest user message with Paperless-ngx documents.

    On every request (``inlet``) the last user message is used as a search
    query against the Paperless-ngx REST API. The content of the matching
    documents is formatted into a context block and prepended to that
    message before the request is passed on to the LLM.
    """

    class Valves(BaseModel):
        # Core Config
        PAPERLESS_URL: str = Field(default="http://192.168.0.156:8000", description="Paperless-ngx URL")
        # SECURITY: no credential is shipped as a default. Supply the token via
        # the PAPERLESS_TOKEN environment variable or the valve settings; while
        # it is empty, retrieval is skipped (see ``_search_paperless``).
        PAPERLESS_TOKEN: str = Field(default="", description="Paperless-ngx API Token")
        # Retrieval Settings
        TOP_K: int = Field(default=5, description="Number of documents to retrieve")
        MAX_CONTENT_LEN: int = Field(default=6000, description="Max characters per document to prevent context overflow")
        ENABLED: bool = Field(default=True, description="Enable RAG injection")
        # Behavior
        CITATION_MODE: bool = Field(default=True, description="Append instructions to cite Document IDs")
        DEBUG_MODE: bool = Field(default=False, description="Log detailed retrieval info to console")

    def __init__(self):
        self.name = "Paperless-ngx RAG"
        self._update_valves()

    def _update_valves(self) -> None:
        """(Re)build ``self.valves`` from environment variables.

        Each valve is read from the environment variable of the same name,
        falling back to the field default. Values arrive as strings and are
        validated/coerced by the ``Valves`` model.
        """
        self.valves = self.Valves(
            **{k: os.getenv(k, v.default) for k, v in self.Valves.model_fields.items()}
        )
        # Normalise so URLs can be built with a single "/" separator.
        self.valves.PAPERLESS_URL = self.valves.PAPERLESS_URL.rstrip('/')

    def _get_headers(self) -> Dict[str, str]:
        """Return the HTTP headers carrying the Paperless-ngx API token."""
        return {'Authorization': f'Token {self.valves.PAPERLESS_TOKEN}'}

    def _fetch_document(self, doc_id: Any, headers: Dict[str, str]) -> Optional[Dict]:
        """Fetch one document's details; return ``None`` if unusable.

        A document is unusable when the request fails, the response is not a
        JSON object, or the document has no text content.
        """
        detail_url = f"{self.valves.PAPERLESS_URL}/api/documents/{doc_id}/"
        try:
            detail_resp = requests.get(detail_url, headers=headers, timeout=5)
            if detail_resp.status_code != 200:
                logger.warning(
                    "Paperless returned HTTP %s for doc %s", detail_resp.status_code, doc_id
                )
                return None
            data = detail_resp.json()
        except (requests.RequestException, ValueError) as e:
            logger.error("Error fetching doc %s: %s", doc_id, e)
            return None

        if not isinstance(data, dict):
            return None

        # The API may return null for these fields; treat null like "missing".
        content = str(data.get("content") or "").strip()
        if not content:
            return None

        return {
            "id": doc_id,
            "title": data.get("title") or f"Doc {doc_id}",
            # Keep only the YYYY-MM-DD part of the timestamp.
            "created": str(data.get("created") or "")[:10],
            "content": content[:self.valves.MAX_CONTENT_LEN],
            "url": f"{self.valves.PAPERLESS_URL}/documents/{doc_id}",
        }

    def _search_paperless(self, query: str) -> List[Dict]:
        """
        Searches for documents and retrieves their content.

        Returns a list of dicts with the keys ``id``, ``title``, ``created``,
        ``content`` and ``url``. Any failure is logged and results in fewer
        (possibly zero) documents rather than an exception, so a Paperless
        outage never blocks the chat request.
        """
        if not self.valves.PAPERLESS_TOKEN:
            return []

        top_k = self.valves.TOP_K
        if top_k <= 0:
            return []

        headers = self._get_headers()

        # 1. Search for IDs
        # NOTE(review): "ordering" makes the newest matches win over the most
        # relevant ones - confirm this is intended for retrieval.
        search_params = {
            "query": query,
            "page_size": top_k,
            "ordering": "-created",
        }
        try:
            search_url = f"{self.valves.PAPERLESS_URL}/api/documents/"
            resp = requests.get(search_url, headers=headers, params=search_params, timeout=5)
            resp.raise_for_status()
            payload = resp.json()
        except (requests.RequestException, ValueError) as e:
            logger.error("Paperless Search Error: %s", e)
            return []

        results = payload.get("results") if isinstance(payload, dict) else None
        if not isinstance(results, list):
            return []

        if self.valves.DEBUG_MODE:
            logger.info("Paperless search %r returned %d hit(s)", query, len(results))

        # 2. Fetch content for results.
        # One request per hit, bounded by TOP_K even if the server ignores
        # the requested page size.
        documents = []
        for hit in results[:top_k]:
            doc_id = hit.get("id") if isinstance(hit, dict) else None
            if doc_id is None:
                continue
            document = self._fetch_document(doc_id, headers)
            if document is not None:
                documents.append(document)

        if self.valves.DEBUG_MODE:
            logger.info("Paperless retrieval kept %d document(s) with content", len(documents))
        return documents

    def _format_injection(self, docs: List[Dict]) -> str:
        """
        Formats the docs into a context block with explicit Link instructions.

        Returns an empty string when ``docs`` is empty. The citation
        instructions are included only while the ``CITATION_MODE`` valve is on.
        """
        if not docs:
            return ""

        context_body = "".join(
            f"\n---\n"
            f"**Source ID:** [{doc_ref}]\n"
            f"**Title:** {doc['title']}\n"
            f"**Date:** {doc['created']}\n"
            f"**Direct Link:** {doc['url']}\n"
            f"**Content snippet:**\n{doc['content']}\n"
            for doc_ref, doc in enumerate(docs, start=1)
        )

        instructions = (
            "### INSTRUCTIONS:\n"
            "1. Answer the user's question using ONLY the context provided below.\n"
        )
        if self.valves.CITATION_MODE:
            instructions += (
                "2. **CITATIONS ARE MANDATORY:** When you reference information, you MUST create a Markdown link using the Title and Direct Link provided.\n"
                "3. Format: `[Document Title](Direct Link)`.\n"
                "4. Example: 'According to the [Meralco Invoice](http://192.x.x.x...), the total is...'\n"
            )
        return f"{instructions}\n\n### RETRIEVED DOCUMENTS:\n{context_body}\n\n---\n"

    @staticmethod
    def _is_text_part(part: Any) -> bool:
        """Return True if ``part`` is a ``{"type": "text", ...}`` content part."""
        return isinstance(part, dict) and part.get("type") == "text"

    def _extract_query(self, content: Any) -> str:
        """Return the plain text of a message ``content`` value.

        ``content`` may be a string or a list of content parts (multimodal
        messages); for a list, the text parts are joined with newlines.
        Anything else yields an empty string.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            return "\n".join(
                str(part.get("text", "")) for part in content if self._is_text_part(part)
            )
        return ""

    @staticmethod
    async def _emit_status(emitter: Optional[Callable[[Any], Awaitable[None]]], description: str, done: bool) -> None:
        """Send a status event to the UI if an emitter was provided."""
        if emitter:
            await emitter(
                {
                    "type": "status",
                    "data": {"description": description, "done": done},
                }
            )

    async def inlet(
        self,
        body: dict,
        user: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[Any], Awaitable[None]]] = None,
    ) -> dict:
        """
        The Filter Hook. Runs BEFORE the request is sent to the LLM.

        Args:
            body: Chat request; ``body["messages"]`` is a list of message
                dicts with ``role`` and ``content`` keys.
            user: Unused; accepted for interface compatibility.
            __event_emitter__: Optional async callable used for status events.

        Returns:
            ``body``, with retrieved context prepended to the last user
            message when documents were found; otherwise unchanged.
        """
        if not self.valves.ENABLED:
            return body

        messages = body.get("messages") or []
        if not messages:
            return body

        # Only the most recent message is augmented, and only if it is the user's.
        last_message = messages[-1]
        if not isinstance(last_message, dict) or last_message.get("role") != "user":
            return body

        original_content = last_message.get("content")
        query = self._extract_query(original_content)
        if not query.strip():
            # Nothing to search for (e.g. an image-only message).
            return body

        # 1. Emit Status Update (UI Spinner)
        await self._emit_status(__event_emitter__, "🔍 Searching Paperless docs...", done=False)

        # 2. Perform Retrieval
        # The search uses blocking HTTP calls; run it in the default executor
        # so the event loop keeps serving other requests meanwhile.
        start_time = time.perf_counter()
        loop = asyncio.get_running_loop()
        docs = await loop.run_in_executor(None, self._search_paperless, query)
        duration = time.perf_counter() - start_time

        # 3. Inject Context
        if docs:
            context_block = self._format_injection(docs)
            # Prepend context to the user's message (Standard RAG pattern)
            augmented = f"{context_block}\n\n### User Question:\n{query}"
            if isinstance(original_content, list):
                # Keep non-text parts (e.g. images) and replace the text parts
                # with a single augmented one.
                other_parts = [p for p in original_content if not self._is_text_part(p)]
                last_message["content"] = [{"type": "text", "text": augmented}, *other_parts]
            else:
                last_message["content"] = augmented
            status_msg = f"Found {len(docs)} docs in {duration:.2f}s"
        else:
            status_msg = "No relevant docs found in Paperless."

        # 4. Emit Final Status
        await self._emit_status(__event_emitter__, status_msg, done=True)

        # 5. Return modified body to Open WebUI
        messages[-1] = last_message
        body["messages"] = messages
        return body
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment