Search Web Using SearXNG
"""
title: Web Search using SearXNG and Scrape first N Pages
Usage:
from web_search import Tools
import asyncio
async def main():
tools = Tools()
# Search and scrape top results
print(await tools.search_web("site:example.com privacy policy"))
# Fetch a single page and normalize content
print(await tools.get_website("https://example.com"))
asyncio.run(main())
"""
import re
import json
from urllib.parse import urlparse
import concurrent.futures
import unicodedata
from dataclasses import dataclass
from typing import Callable, Any, Optional
import inspect
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests


class HelpFunctions:
    """Utility helpers for parsing and sanitizing web content.

    Centralizes reusable transformations:
    - HTML-to-text normalization with tag stripping and whitespace collapsing
    - Emoji filtering based on Unicode categories
    - Text truncation and base URL extraction

    Used by both the web search flow and direct website fetching.
    """

    def __init__(self):
        """Initialize helper utilities; currently stateless."""

    def get_base_url(self, url):
        """Return the scheme+hostname part of a URL.

        - Accepts any absolute URL and extracts `scheme://host`.
        - Useful for grouping citations by site or avoiding overlong labels.

        Example: for "https://example.com/path?a=1", returns "https://example.com".
        """
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        return base_url

    def generate_excerpt(self, content, max_length=200):
        """Create a short excerpt with an ellipsis if content is long.

        - Extracts the first `max_length` characters and appends "..." if truncated.
        - Intended for quick previews; full content is separately normalized/truncated.
        """
        return content[:max_length] + "..." if len(content) > max_length else content

    def format_text(self, original_text):
        """Convert HTML to normalized plain text in a single pass.

        - Drops noisy elements like `script`, `style`, and `noscript` before extraction
        - Extracts visible text using BeautifulSoup with `separator=" "` to preserve spacing
        - Normalizes Unicode to NFKC and collapses runs of whitespace
        - Removes emoji-like symbols via `remove_emojis`

        Returns clean, single-line text suitable for indexing or display.
        """
        # Parse HTML to plain text once
        soup = BeautifulSoup(original_text, "html.parser")
        # Drop non-content elements to avoid noise in extracted text
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        formatted_text = soup.get_text(separator=" ", strip=True)
        formatted_text = unicodedata.normalize("NFKC", formatted_text)
        formatted_text = re.sub(r"\s+", " ", formatted_text)
        formatted_text = formatted_text.strip()
        formatted_text = self.remove_emojis(formatted_text)
        return formatted_text

    def remove_emojis(self, text):
        """Filter out characters classified as 'Symbol, other' (So).

        - Removes many pictographic/emoji characters to improve readability and matching.
        - Keeps alphanumeric and punctuation intact.
        """
        return "".join(c for c in text if not unicodedata.category(c).startswith("So"))

    def process_search_result(
        self,
        result,
        valves,
        session: requests.Session,
        headers: dict,
        ignored_hosts: list,
    ):
        """Fetch and process a single search result URL.

        Behavior:
        - Validates result fields and skips non-`http(s)` URLs
        - Applies domain ignore rules (exact host or subdomain match)
        - Uses the shared `requests.Session` with retries/backoff from the caller
        - Parses HTML only; non-HTML returns a short skip message
        - Normalizes and truncates text to `valves.PAGE_CONTENT_WORDS_LIMIT`

        Returns a dict with `title`, `url`, `content`, and `snippet`, or `None` on failure.
        """
        # Validate presence of expected fields and filter invalid schemes
        title_raw = result.get("title")
        url_site = result.get("url")
        if not url_site or not isinstance(url_site, str):
            return None
        parsed = urlparse(url_site)
        if parsed.scheme.lower() not in {"http", "https"}:
            return None
        title_site = self.remove_emojis(title_raw or "")
        snippet = result.get("content", "")
        # Check if the website host is in the ignored list (exact or subdomain)
        if ignored_hosts:
            host = urlparse(url_site).netloc.lower()
            if any(host == h or host.endswith("." + h) for h in ignored_hosts):
                return None
        try:
            response_site = session.get(
                url_site,
                headers=headers,
                timeout=(valves.CONNECT_TIMEOUT, valves.READ_TIMEOUT),
            )
            response_site.raise_for_status()
            html_content = response_site.text
            ctype = (response_site.headers.get("Content-Type", "") or "").lower()
            # Only attempt to parse HTML-like responses; skip others gracefully
            if "html" not in ctype:
                return {
                    "title": title_site,
                    "url": url_site,
                    "content": f"Non-HTML content skipped (Content-Type: {ctype})",
                    "snippet": self.remove_emojis(snippet),
                }
            content_site = self.format_text(html_content)
            truncated_content = self.truncate_to_n_words(
                content_site, valves.PAGE_CONTENT_WORDS_LIMIT
            )
            return {
                "title": title_site,
                "url": url_site,
                "content": truncated_content,
                "snippet": self.remove_emojis(snippet),
            }
        except requests.exceptions.RequestException:
            return None

    def truncate_to_n_words(self, text, token_limit):
        """Truncate `text` to at most `token_limit` whitespace-separated words.

        - Splits on whitespace; avoids mid-word cuts for cleaner excerpts.
        - Used after normalization to enforce content size limits.
        """
        tokens = text.split()
        truncated_tokens = tokens[:token_limit]
        return " ".join(truncated_tokens)


class EventEmitter:
    """Async-friendly status emitter that calls a provided callback.

    - Accepts either an async or sync callable and invokes it appropriately
    - Emits structured status events with `type="status"` and `data` payload
    - Used to report progress, errors, and completion without blocking the loop
    """

    def __init__(self, event_emitter: Optional[Callable[[dict], Any]] = None):
        """Store an optional callback to receive status events."""
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Emit a status event to the external callback, if available.

        Event payload:
        - `type`: always "status"
        - `data.status`: one of "in_progress", "error", "complete"
        - `data.description`: short human-readable message
        - `data.done`: boolean indicating whether the operation is finished
        """
        if self.event_emitter:
            event = {
                "type": "status",
                "data": {
                    "status": status,
                    "description": description,
                    "done": done,
                },
            }
            # Support both async and sync callbacks
            if inspect.iscoroutinefunction(self.event_emitter):
                await self.event_emitter(event)
            else:
                self.event_emitter(event)
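
# Illustrative sketch (not part of the original gist): any callable accepting a dict
# works as the callback; the function name below is assumed for the example:
#
#     def print_status(event: dict) -> None:
#         data = event["data"]
#         print(f'[{data["status"]}] {data["description"]}')
#
#     emitter = EventEmitter(print_status)
#     # In an async context: await emitter.emit("Fetching page", done=False)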


class Tools:
    """Core web search tools.

    - `search_web`: queries a SearXNG-compatible endpoint and scrapes top results
    - `get_website`: fetches and normalizes content for a single URL

    Shares a resilient `requests.Session` configured with retry/backoff.
    """

    @dataclass
    class Valves:
        """Configuration knobs for the tool's behavior.

        - `SEARXNG_ENGINE_API_BASE_URL`: SearXNG-compatible JSON search endpoint
        - `IGNORED_WEBSITES`: comma-separated hosts to skip (supports subdomains)
        - `RETURNED_SCRAPPED_PAGES_NO`: max results included in the final output
        - `SCRAPPED_PAGES_NO`: number of search results to fetch/process
        - `PAGE_CONTENT_WORDS_LIMIT`: word cap per page after normalization
        - `CITATION_LINKS`: emit citation events with content and source metadata
        - `MAX_WORKERS`: upper bound for thread pool workers when scraping
        - `CONNECT_TIMEOUT`/`READ_TIMEOUT`: network timeouts in seconds for requests
        """

        SEARXNG_ENGINE_API_BASE_URL: str = "https://example.com/search"
        IGNORED_WEBSITES: str = ""
        RETURNED_SCRAPPED_PAGES_NO: int = 3
        SCRAPPED_PAGES_NO: int = 5
        PAGE_CONTENT_WORDS_LIMIT: int = 5000
        CITATION_LINKS: bool = False
        # Upper bound for thread pool workers when scraping pages
        MAX_WORKERS: int = 16
        # Network timeout settings (seconds)
        CONNECT_TIMEOUT: int = 5
        READ_TIMEOUT: int = 20

    def __init__(self):
        """Initialize configuration, HTTP headers, and a resilient Session.

        Behavior:
        - Creates a shared `requests.Session` for connection reuse
        - Configures `Retry` with conservative defaults and GET-only retries
        - Mounts the adapter on both `http://` and `https://`
        - Sets default headers for broad server compatibility
        """
        self.valves = self.Valves()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/117.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate",
        }
        # Reuse connections and add retries for resilience
        self.session = requests.Session()
        # Configure conservative retries to handle transient errors and rate limits.
        # Note: urllib3 renamed `method_whitelist` to `allowed_methods` and removed the
        # old name in urllib3 2.0, so the current parameter name is used here.
        retry = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=frozenset(["GET"]),
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    async def search_web(
        self,
        query: str,
        __event_emitter__: Optional[Callable[[dict], Any]] = None,
    ) -> str:
        """Search SearXNG and scrape the top N relevant pages.

        Behavior:
        - Calls the configured search endpoint and validates the JSON payload
        - Deduplicates and filters to `http(s)` URLs, then limits to `SCRAPPED_PAGES_NO`
        - Precomputes ignored hosts and skips matching domains/subdomains
        - Processes pages concurrently in a bounded thread pool (up to `MAX_WORKERS`)
        - Stops early once `RETURNED_SCRAPPED_PAGES_NO` valid pages are collected
        - Optionally emits citation events for each included result

        Returns a JSON string of normalized page objects.
        """
        functions = HelpFunctions()
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(f"Initiating web search for: {query}")
        search_engine_url = self.valves.SEARXNG_ENGINE_API_BASE_URL
        # Guard against default/placeholder misconfiguration
        if search_engine_url.startswith("https://example.com"):
            await emitter.emit(
                status="error",
                description="Search engine URL not configured",
                done=True,
            )
            return json.dumps({"error": "Search engine URL not configured"})
        # Ensure RETURNED_SCRAPPED_PAGES_NO does not exceed SCRAPPED_PAGES_NO
        if self.valves.RETURNED_SCRAPPED_PAGES_NO > self.valves.SCRAPPED_PAGES_NO:
            self.valves.RETURNED_SCRAPPED_PAGES_NO = self.valves.SCRAPPED_PAGES_NO
        params = {
            "q": query,
            "format": "json",
            "number_of_results": self.valves.RETURNED_SCRAPPED_PAGES_NO,
        }
        try:
            await emitter.emit("Sending request to search engine")
            resp = self.session.get(
                search_engine_url, params=params, headers=self.headers, timeout=(5, 15)
            )
            resp.raise_for_status()
            # Validate JSON payload
            data = resp.json()
            if not isinstance(data, dict):
                raise ValueError("Unexpected search response format")
            results = data.get("results", [])
            if not isinstance(results, list):
                raise ValueError("Search results are not a list")
            # Deduplicate by URL before limiting, and keep only http(s)
            seen_urls = set()
            filtered = []
            for r in results:
                url_r = r.get("url")
                if not url_r or not isinstance(url_r, str):
                    continue
                p = urlparse(url_r)
                if p.scheme.lower() not in {"http", "https"}:
                    continue
                if url_r in seen_urls:
                    continue
                seen_urls.add(url_r)
                filtered.append(r)
            limited_results = filtered[: self.valves.SCRAPPED_PAGES_NO]
            await emitter.emit(f"Retrieved {len(limited_results)} search results")
        except (requests.exceptions.RequestException, ValueError) as e:
            await emitter.emit(
                status="error",
                description=f"Error during search: {str(e)}",
                done=True,
            )
            return json.dumps({"error": str(e)})
        results_json = []
        if limited_results:
            await emitter.emit("Processing search results")
            # Precompute normalized ignored host list once
            ignored_hosts: list[str] = []
            if self.valves.IGNORED_WEBSITES:
                ignored_hosts = [
                    s.strip().lower().lstrip(".")
                    for s in self.valves.IGNORED_WEBSITES.split(",")
                    if s.strip()
                ]
            # Bound concurrency to avoid oversubscription and reduce contention
            max_workers = max(
                1,
                min(
                    self.valves.MAX_WORKERS,
                    len(limited_results),
                    self.valves.SCRAPPED_PAGES_NO,
                ),
            )
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=max_workers
            ) as executor:
                futures = [
                    executor.submit(
                        functions.process_search_result,
                        result,
                        self.valves,
                        self.session,
                        self.headers,
                        ignored_hosts,
                    )
                    for result in limited_results
                ]
                for future in concurrent.futures.as_completed(futures):
                    result_json = future.result()
                    if result_json:
                        try:
                            json.dumps(result_json)
                            results_json.append(result_json)
                        except (TypeError, ValueError):
                            continue
                    # Early stop once we have the requested number of valid pages
                    if len(results_json) >= self.valves.RETURNED_SCRAPPED_PAGES_NO:
                        break
        results_json = results_json[: self.valves.RETURNED_SCRAPPED_PAGES_NO]
        if self.valves.CITATION_LINKS and __event_emitter__:
            for result in results_json:
                await __event_emitter__(
                    {
                        "type": "citation",
                        "data": {
                            "document": [result["content"]],
                            "metadata": [{"source": result["url"]}],
                            "source": {"name": result["title"]},
                        },
                    }
                )
        await emitter.emit(
            status="complete",
            description=f"Web search completed. Retrieved content from {len(results_json)} pages",
            done=True,
        )
        return json.dumps(results_json, ensure_ascii=False)
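
    # Illustrative shape of the JSON string returned by `search_web`; the field values
    # below are assumed examples, not real results:
    #
    #     [
    #         {
    #             "title": "Example Domain",
    #             "url": "https://example.com/",
    #             "content": "Example Domain This domain is for use in ...",
    #             "snippet": "This domain is for use in illustrative examples ..."
    #         }
    #     ]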

    async def get_website(
        self, url: str, __event_emitter__: Optional[Callable[[dict], Any]] = None
    ) -> str:
        """Fetch a single URL and return normalized content.

        Behavior:
        - Uses the shared session with retry/backoff and configured timeouts
        - Parses only HTML-like responses; non-HTML emits a clear skip message
        - Normalizes text via `format_text` and truncates to `PAGE_CONTENT_WORDS_LIMIT`
        - Optionally emits a citation event with source metadata

        Returns a JSON string containing one normalized result object.
        """
        functions = HelpFunctions()
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(f"Fetching content from URL: {url}")
        results_json = []
        try:
            response_site = self.session.get(
                url,
                headers=self.headers,
                timeout=(self.valves.CONNECT_TIMEOUT, self.valves.READ_TIMEOUT),
            )
            response_site.raise_for_status()
            html_content = response_site.text
            await emitter.emit("Parsing website content")
            ctype = response_site.headers.get("Content-Type", "")
            if "html" in ctype:
                soup = BeautifulSoup(html_content, "html.parser")
                title_tag = soup.title
                page_title = (
                    title_tag.get_text(strip=True) if title_tag else "No title found"
                )
            else:
                page_title = "Non-HTML content"
            page_title = unicodedata.normalize("NFKC", page_title)
            page_title = functions.remove_emojis(page_title)
            title_site = page_title
            url_site = url
            if "html" in ctype:
                content_site = functions.format_text(html_content)
            else:
                content_site = f"Non-HTML content skipped (Content-Type: {ctype})"
            truncated_content = functions.truncate_to_n_words(
                content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
            )
            result_site = {
                "title": title_site,
                "url": url_site,
                "content": truncated_content,
                "excerpt": functions.generate_excerpt(content_site),
            }
            results_json.append(result_site)
            if self.valves.CITATION_LINKS and __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "citation",
                        "data": {
                            "document": [truncated_content],
                            "metadata": [{"source": url_site}],
                            "source": {"name": title_site},
                        },
                    }
                )
            await emitter.emit(
                status="complete",
                description="Website content retrieved and processed successfully",
                done=True,
            )
        except requests.exceptions.RequestException as e:
            results_json.append(
                {
                    "url": url,
                    "content": f"Failed to retrieve the page. Error: {str(e)}",
                }
            )
            await emitter.emit(
                status="error",
                description=f"Error fetching website content: {str(e)}",
                done=True,
            )
        return json.dumps(results_json, ensure_ascii=False)
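

# Minimal local-run sketch (not part of the original gist). It assumes a reachable
# SearXNG instance; the endpoint URL and query below are placeholders.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        tools = Tools()
        # Point the tool at your own SearXNG deployment (assumed URL).
        tools.valves.SEARXNG_ENGINE_API_BASE_URL = "https://searxng.example.org/search"
        print(await tools.search_web("site:example.com privacy policy"))
        print(await tools.get_website("https://example.com"))

    asyncio.run(_demo())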