Search Web Using SearXNG
| """ | |
| title: Web Search using SearXNG and Scrape first N Pages | |
| Usage: | |
| from web_search import Tools | |
| import asyncio | |
| async def main(): | |
| tools = Tools() | |
| # Search and scrape top results | |
| print(await tools.search_web("site:example.com privacy policy")) | |
| # Fetch a single page and normalize content | |
| print(await tools.get_website("https://example.com")) | |
| asyncio.run(main()) | |
| """ | |
| import re | |
| import json | |
| from urllib.parse import urlparse | |
| import concurrent.futures | |
| import unicodedata | |
| from dataclasses import dataclass | |
| from typing import Callable, Any, Optional | |
| import inspect | |
| from bs4 import BeautifulSoup | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
| import requests | |


class HelpFunctions:
    """Utility helpers for parsing and sanitizing web content.

    Centralizes reusable transformations:
    - HTML-to-text normalization with tag stripping and whitespace collapsing
    - Emoji filtering based on Unicode categories
    - Text truncation and base URL extraction

    Used by both the web search flow and direct website fetching.
    """

    def __init__(self):
        """Initialize helper utilities; currently stateless."""

    def get_base_url(self, url):
        """Return the scheme+hostname part of a URL.

        - Accepts any absolute URL and extracts `scheme://host`.
        - Useful for grouping citations by site or avoiding overlong labels.

        Example: for "https://example.com/path?a=1", returns "https://example.com".
        """
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        return base_url

    def generate_excerpt(self, content, max_length=200):
        """Create a short excerpt with an ellipsis if the content is long.

        - Keeps the first `max_length` characters and appends "..." if truncated.
        - Intended for quick previews; full content is separately normalized/truncated.
        """
        return content[:max_length] + "..." if len(content) > max_length else content

    def format_text(self, original_text):
        """Convert HTML to normalized plain text in a single pass.

        - Drops noisy elements like `script`, `style`, and `noscript` before extraction
        - Extracts visible text using BeautifulSoup with `separator=" "` to preserve spacing
        - Normalizes Unicode to NFKC and collapses runs of whitespace
        - Removes emoji-like symbols via `remove_emojis`

        Returns clean, single-line text suitable for indexing or display.
        """
        # Parse HTML to plain text once
        soup = BeautifulSoup(original_text, "html.parser")
        # Drop non-content elements to avoid noise in extracted text
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()
        formatted_text = soup.get_text(separator=" ", strip=True)
        formatted_text = unicodedata.normalize("NFKC", formatted_text)
        formatted_text = re.sub(r"\s+", " ", formatted_text)
        formatted_text = formatted_text.strip()
        formatted_text = self.remove_emojis(formatted_text)
        return formatted_text

    def remove_emojis(self, text):
        """Filter out characters classified as 'Symbol, other' (So).

        - Removes many pictographic/emoji characters to improve readability and matching.
        - Keeps alphanumeric characters and punctuation intact.
        """
        return "".join(c for c in text if not unicodedata.category(c).startswith("So"))

    def process_search_result(
        self,
        result,
        valves,
        session: requests.Session,
        headers: dict,
        ignored_hosts: list,
    ):
        """Fetch and process a single search result URL.

        Behavior:
        - Validates result fields and skips non-`http(s)` URLs
        - Applies domain ignore rules (exact host or subdomain match)
        - Uses the shared `requests.Session` with retries/backoff from the caller
        - Parses HTML only; non-HTML responses return a short skip message
        - Normalizes and truncates text to `valves.PAGE_CONTENT_WORDS_LIMIT`

        Returns a dict with `title`, `url`, `content`, and `snippet`, or `None` on failure.
        """
        # Validate presence of expected fields and filter invalid schemes
        title_raw = result.get("title")
        url_site = result.get("url")
        if not url_site or not isinstance(url_site, str):
            return None
        parsed = urlparse(url_site)
        if parsed.scheme.lower() not in {"http", "https"}:
            return None
        title_site = self.remove_emojis(title_raw or "")
        snippet = result.get("content", "")
        # Check if the website host is in the ignored list (exact or subdomain match)
        if ignored_hosts:
            host = urlparse(url_site).netloc.lower()
            if any(host == h or host.endswith("." + h) for h in ignored_hosts):
                return None
        try:
            response_site = session.get(
                url_site,
                headers=headers,
                timeout=(valves.CONNECT_TIMEOUT, valves.READ_TIMEOUT),
            )
            response_site.raise_for_status()
            html_content = response_site.text
            ctype = (response_site.headers.get("Content-Type", "") or "").lower()
            # Only attempt to parse HTML-like responses; skip others gracefully
            if "html" not in ctype:
                return {
                    "title": title_site,
                    "url": url_site,
                    "content": f"Non-HTML content skipped (Content-Type: {ctype})",
                    "snippet": self.remove_emojis(snippet),
                }
            content_site = self.format_text(html_content)
            truncated_content = self.truncate_to_n_words(
                content_site, valves.PAGE_CONTENT_WORDS_LIMIT
            )
            return {
                "title": title_site,
                "url": url_site,
                "content": truncated_content,
                "snippet": self.remove_emojis(snippet),
            }
        except requests.exceptions.RequestException:
            return None

    def truncate_to_n_words(self, text, token_limit):
        """Truncate `text` to at most `token_limit` whitespace-separated words.

        - Splits on whitespace; avoids mid-word cuts for cleaner excerpts.
        - Used after normalization to enforce content size limits.
        """
        tokens = text.split()
        truncated_tokens = tokens[:token_limit]
        return " ".join(truncated_tokens)


class EventEmitter:
    """Async-friendly status emitter that calls a provided callback.

    - Accepts either an async or sync callable and invokes it appropriately
    - Emits structured status events with `type="status"` and `data` payload
    - Used to report progress, errors, and completion without blocking the loop
    """

    def __init__(self, event_emitter: Optional[Callable[[dict], Any]] = None):
        """Store an optional callback to receive status events."""
        self.event_emitter = event_emitter

    async def emit(self, description="Unknown State", status="in_progress", done=False):
        """Emit a status event to the external callback, if available.

        Event payload:
        - `type`: always "status"
        - `data.status`: one of "in_progress", "error", "complete"
        - `data.description`: short human-readable message
        - `data.done`: boolean indicating whether the operation is finished
        """
        if self.event_emitter:
            event = {
                "type": "status",
                "data": {
                    "status": status,
                    "description": description,
                    "done": done,
                },
            }
            # Support both async and sync callbacks
            if inspect.iscoroutinefunction(self.event_emitter):
                await self.event_emitter(event)
            else:
                self.event_emitter(event)
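

# Sketch of wiring EventEmitter to a plain synchronous callback; `print_status`
# is a hypothetical helper used only for illustration:
#
#   def print_status(event: dict) -> None:
#       print(f"[{event['data']['status']}] {event['data']['description']}")
#
#   emitter = EventEmitter(print_status)
#   # Inside an async context:
#   await emitter.emit("Working", status="in_progress")
#   await emitter.emit("Done", status="complete", done=True)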


class Tools:
    """Core web search tools.

    - `search_web`: queries a SearXNG-compatible endpoint and scrapes top results
    - `get_website`: fetches and normalizes content for a single URL

    Shares a resilient `requests.Session` configured with retry/backoff.
    """

    @dataclass
    class Valves:
        """Configuration knobs for the tool's behavior.

        - `SEARXNG_ENGINE_API_BASE_URL`: SearXNG-compatible JSON search endpoint
        - `IGNORED_WEBSITES`: comma-separated hosts to skip (supports subdomains)
        - `RETURNED_SCRAPPED_PAGES_NO`: max results included in final output
        - `SCRAPPED_PAGES_NO`: number of search results to fetch/process
        - `PAGE_CONTENT_WORDS_LIMIT`: word cap per page after normalization
        - `CITATION_LINKS`: emit citation events with content and source metadata
        - `MAX_WORKERS`: upper bound for thread pool workers when scraping
        - `CONNECT_TIMEOUT`/`READ_TIMEOUT`: network timeouts in seconds for requests
        """

        SEARXNG_ENGINE_API_BASE_URL: str = "https://example.com/search"
        IGNORED_WEBSITES: str = ""
        RETURNED_SCRAPPED_PAGES_NO: int = 3
        SCRAPPED_PAGES_NO: int = 5
        PAGE_CONTENT_WORDS_LIMIT: int = 5000
        CITATION_LINKS: bool = False
        # Upper bound for thread pool workers when scraping pages
        MAX_WORKERS: int = 16
        # Network timeout settings (seconds)
        CONNECT_TIMEOUT: int = 5
        READ_TIMEOUT: int = 20
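
    # Configuration is typically adjusted after construction; the endpoint and
    # values below are hypothetical examples, not defaults:
    #   tools = Tools()
    #   tools.valves.SEARXNG_ENGINE_API_BASE_URL = "http://localhost:8888/search"
    #   tools.valves.IGNORED_WEBSITES = "pinterest.com, facebook.com"
    #   tools.valves.RETURNED_SCRAPPED_PAGES_NO = 2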

    def __init__(self):
        """Initialize configuration, HTTP headers, and a resilient Session.

        Behavior:
        - Creates a shared `requests.Session` for connection reuse
        - Configures `Retry` with conservative defaults and GET-only retries
        - Mounts the adapter on both `http://` and `https://`
        - Sets default headers for broad server compatibility
        """
        self.valves = self.Valves()
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/117.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Encoding": "gzip, deflate",
        }
        # Reuse connections and add retries for resilience
        self.session = requests.Session()
        # Configure conservative retries to handle transient errors and rate limits.
        # `allowed_methods` replaces `method_whitelist`, which was deprecated in
        # urllib3 1.26 and removed in 2.0.
        retry = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=frozenset(["GET"]),
        )
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)

    async def search_web(
        self,
        query: str,
        __event_emitter__: Optional[Callable[[dict], Any]] = None,
    ) -> str:
        """Search SearXNG and scrape the top N relevant pages.

        Behavior:
        - Calls the configured search endpoint and validates the JSON payload
        - Deduplicates and filters to `http(s)` URLs, then limits to `SCRAPPED_PAGES_NO`
        - Precomputes ignored hosts and skips matching domains/subdomains
        - Processes pages concurrently in a bounded thread pool (up to `MAX_WORKERS`)
        - Stops early once `RETURNED_SCRAPPED_PAGES_NO` valid pages are collected
        - Optionally emits citation events for each included result

        Returns a JSON string of normalized page objects.
        """
        functions = HelpFunctions()
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(f"Initiating web search for: {query}")

        search_engine_url = self.valves.SEARXNG_ENGINE_API_BASE_URL
        # Guard against default/placeholder misconfiguration
        if search_engine_url.startswith("https://example.com"):
            await emitter.emit(
                status="error",
                description="Search engine URL not configured",
                done=True,
            )
            return json.dumps({"error": "Search engine URL not configured"})

        # Ensure RETURNED_SCRAPPED_PAGES_NO does not exceed SCRAPPED_PAGES_NO
        if self.valves.RETURNED_SCRAPPED_PAGES_NO > self.valves.SCRAPPED_PAGES_NO:
            self.valves.RETURNED_SCRAPPED_PAGES_NO = self.valves.SCRAPPED_PAGES_NO
        params = {
            "q": query,
            "format": "json",
            # Ask the engine for enough results to cover the scrape budget
            # (SCRAPPED_PAGES_NO), not just the number eventually returned.
            "number_of_results": self.valves.SCRAPPED_PAGES_NO,
        }
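
        # For reference, a successful response is expected to look roughly like
        # this (shape inferred from the parsing below, not an exact schema):
        #   {"results": [{"title": "...", "url": "https://...", "content": "..."}, ...]}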
        try:
            await emitter.emit("Sending request to search engine")
            resp = self.session.get(
                search_engine_url, params=params, headers=self.headers, timeout=(5, 15)
            )
            resp.raise_for_status()

            # Validate JSON payload
            data = resp.json()
            if not isinstance(data, dict):
                raise ValueError("Unexpected search response format")
            results = data.get("results", [])
            if not isinstance(results, list):
                raise ValueError("Search results are not a list")

            # Deduplicate by URL before limiting, and keep only http(s)
            seen_urls = set()
            filtered = []
            for r in results:
                url_r = r.get("url")
                if not url_r or not isinstance(url_r, str):
                    continue
                p = urlparse(url_r)
                if p.scheme.lower() not in {"http", "https"}:
                    continue
                if url_r in seen_urls:
                    continue
                seen_urls.add(url_r)
                filtered.append(r)
            limited_results = filtered[: self.valves.SCRAPPED_PAGES_NO]
            await emitter.emit(f"Retrieved {len(limited_results)} search results")
        except (requests.exceptions.RequestException, ValueError) as e:
            await emitter.emit(
                status="error",
                description=f"Error during search: {str(e)}",
                done=True,
            )
            return json.dumps({"error": str(e)})

        results_json = []
        if limited_results:
            await emitter.emit("Processing search results")
            # Precompute normalized ignored host list once
            ignored_hosts: list[str] = []
            if self.valves.IGNORED_WEBSITES:
                ignored_hosts = [
                    s.strip().lower().lstrip(".")
                    for s in self.valves.IGNORED_WEBSITES.split(",")
                    if s.strip()
                ]
            # Bound concurrency to avoid oversubscription and reduce contention
            max_workers = max(
                1,
                min(
                    self.valves.MAX_WORKERS,
                    len(limited_results),
                    self.valves.SCRAPPED_PAGES_NO,
                ),
            )
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=max_workers
            ) as executor:
                futures = [
                    executor.submit(
                        functions.process_search_result,
                        result,
                        self.valves,
                        self.session,
                        self.headers,
                        ignored_hosts,
                    )
                    for result in limited_results
                ]
                for future in concurrent.futures.as_completed(futures):
                    result_json = future.result()
                    if result_json:
                        try:
                            json.dumps(result_json)
                            results_json.append(result_json)
                        except (TypeError, ValueError):
                            continue
                    # Early stop once we have the requested number of valid pages
                    if len(results_json) >= self.valves.RETURNED_SCRAPPED_PAGES_NO:
                        break

        results_json = results_json[: self.valves.RETURNED_SCRAPPED_PAGES_NO]

        if self.valves.CITATION_LINKS and __event_emitter__:
            for result in results_json:
                await __event_emitter__(
                    {
                        "type": "citation",
                        "data": {
                            "document": [result["content"]],
                            "metadata": [{"source": result["url"]}],
                            "source": {"name": result["title"]},
                        },
                    }
                )

        await emitter.emit(
            status="complete",
            description=f"Web search completed. Retrieved content from {len(results_json)} pages",
            done=True,
        )
        return json.dumps(results_json, ensure_ascii=False)
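
    # search_web returns a JSON array of page objects, roughly of the form
    # (keys follow process_search_result; values here are placeholders):
    #   [{"title": "...", "url": "https://...", "content": "...", "snippet": "..."}, ...]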

    async def get_website(
        self, url: str, __event_emitter__: Optional[Callable[[dict], Any]] = None
    ) -> str:
        """Fetch a single URL and return normalized content.

        Behavior:
        - Uses the shared session with retry/backoff and configured timeouts
        - Parses only HTML-like responses; non-HTML yields a clear skip message
        - Normalizes text via `format_text` and truncates to `PAGE_CONTENT_WORDS_LIMIT`
        - Optionally emits a citation event with source metadata

        Returns a JSON string containing one normalized result object.
        """
        functions = HelpFunctions()
        emitter = EventEmitter(__event_emitter__)
        await emitter.emit(f"Fetching content from URL: {url}")

        results_json = []
        try:
            response_site = self.session.get(
                url,
                headers=self.headers,
                timeout=(self.valves.CONNECT_TIMEOUT, self.valves.READ_TIMEOUT),
            )
            response_site.raise_for_status()
            html_content = response_site.text
            await emitter.emit("Parsing website content")

            # Lowercase the content type for a case-insensitive check, matching
            # the behavior in process_search_result
            ctype = (response_site.headers.get("Content-Type", "") or "").lower()
            if "html" in ctype:
                soup = BeautifulSoup(html_content, "html.parser")
                title_tag = soup.title
                page_title = (
                    title_tag.get_text(strip=True) if title_tag else "No title found"
                )
            else:
                page_title = "Non-HTML content"
            page_title = unicodedata.normalize("NFKC", page_title)
            page_title = functions.remove_emojis(page_title)
            title_site = page_title
            url_site = url

            if "html" in ctype:
                content_site = functions.format_text(html_content)
            else:
                content_site = f"Non-HTML content skipped (Content-Type: {ctype})"
            truncated_content = functions.truncate_to_n_words(
                content_site, self.valves.PAGE_CONTENT_WORDS_LIMIT
            )

            result_site = {
                "title": title_site,
                "url": url_site,
                "content": truncated_content,
                "excerpt": functions.generate_excerpt(content_site),
            }
            results_json.append(result_site)

            if self.valves.CITATION_LINKS and __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "citation",
                        "data": {
                            "document": [truncated_content],
                            "metadata": [{"source": url_site}],
                            "source": {"name": title_site},
                        },
                    }
                )
            await emitter.emit(
                status="complete",
                description="Website content retrieved and processed successfully",
                done=True,
            )
        except requests.exceptions.RequestException as e:
            results_json.append(
                {
                    "url": url,
                    "content": f"Failed to retrieve the page. Error: {str(e)}",
                }
            )
            await emitter.emit(
                status="error",
                description=f"Error fetching website content: {str(e)}",
                done=True,
            )
        return json.dumps(results_json, ensure_ascii=False)
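

if __name__ == "__main__":
    # Minimal local demo (a sketch, not part of the original tool). It assumes a
    # SearXNG instance is reachable at the hypothetical URL below; adjust the
    # endpoint and query, or remove this block, in your own deployment.
    import asyncio

    async def _demo() -> None:
        tools = Tools()
        tools.valves.SEARXNG_ENGINE_API_BASE_URL = "http://localhost:8888/search"  # assumed URL
        tools.valves.RETURNED_SCRAPPED_PAGES_NO = 2

        async def show(event: dict) -> None:
            # Print status events emitted while searching and fetching
            print(f"[{event['data']['status']}] {event['data']['description']}")

        print(await tools.search_web("open source metasearch engines", show))
        print(await tools.get_website("https://example.com", show))

    asyncio.run(_demo())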