“Python CLI script that wraps ripgrep (`rg --json`) to provide a fast, single-pass lexical search primitive for LLM agents, emitting compact JSON grouped by file and line ranges.”
#!/usr/bin/env python3
"""
chibi_grep.py — minimal local "WarpGrep-style" search primitive for LLM agents.
http://googleusercontent.com/image_generation_content/5
This script is intentionally *simpler* than Morph's WarpGrep and does not
implement an MCP server:
- It is a single-file CLI wrapper around ripgrep (`rg --json`).
- It runs ONE ripgrep process over a local workspace per invocation.
- It groups matches per file into contiguous line ranges (with optional context).
- It prints a single JSON object to stdout.
What it deliberately does NOT do:
- No internal "search strategy" or planning:
- The *agent* is responsible for choosing patterns, paths, and when to call it.
- No semantic / embedding / FAISS / tree-sitter indexing.
- No MCP server boilerplate, no remote execution providers, no pricing/quotas.
Use this script when:
- You control the agent/orchestrator loop, and
- You just want a fast, predictable lexical search primitive that returns
compact file+range JSON for the agent to rerank and reason over.
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Set
import typer
app = typer.Typer(add_completion=False)


def _build_rg_command(
    root: Path,
    patterns: List[str],
    include_hidden: bool,
    exclude: List[str],
    include_glob: List[str],
    threads: int,
) -> List[str]:
    """
    Build the ripgrep command line for a single search over the workspace.
    """
    cmd: List[str] = [
        "rg",
        "--json",
        "--line-number",
        "--no-messages",  # suppress summary chatter
    ]
    if include_hidden:
        cmd.append("--hidden")
    # Exclude patterns (directories / globs)
    for pat in exclude:
        # Allow the user to pass either "node_modules" or "!node_modules"
        normalized = pat[1:] if pat.startswith("!") else pat
        cmd.extend(["--glob", f"!{normalized}"])
    # Additional include globs
    for glob in include_glob:
        cmd.extend(["--glob", glob])
    if threads > 0:
        cmd.extend(["--threads", str(threads)])
    # All patterns go into a single rg invocation
    for pat in patterns:
        cmd.extend(["-e", pat])
    cmd.append(str(root))
    return cmd
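
# For example (a sketch, assuming ripgrep is on PATH), calling
# _build_rg_command(Path("src"), ["TODO"], False, ["node_modules"],
# ["*.py"], 4) returns the argv list:
#   rg --json --line-number --no-messages --glob !node_modules
#      --glob *.py --threads 4 -e TODO src
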

def _collect_matches(cmd: List[str]) -> Dict[str, Set[int]]:
    """
    Run ripgrep and collect a mapping:

        { "path/to/file": {line_numbers} }

    Treat exit code 1 ("no matches") as a non-error.
    """
    matches: Dict[str, Set[int]] = defaultdict(set)
    try:
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except FileNotFoundError:
        typer.echo(
            "Error: ripgrep (rg) is not installed or not on PATH.",
            err=True,
        )
        raise typer.Exit(code=2)
    assert proc.stdout is not None
    for line in proc.stdout:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            # Ignore any non-JSON noise
            continue
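        # A ripgrep --json "match" event looks roughly like this (fields
        # abbreviated; path.text and line_number are the documented fields
        # the code below relies on):
        #   {"type": "match",
        #    "data": {"path": {"text": "src/app.py"}, "line_number": 42, ...}}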
if event.get("type") != "match":
continue
data = event.get("data", {})
path_info = data.get("path", {})
file_path = path_info.get("text")
line_number = data.get("line_number")
if file_path and isinstance(line_number, int):
matches[file_path].add(line_number)
# Drain stderr to avoid zombie processes; ignore its content
if proc.stderr is not None:
_ = proc.stderr.read()
proc.wait()
# rg exits 1 when no matches are found; that's fine.
if proc.returncode not in (0, 1):
typer.echo(f"Warning: rg exited with code {proc.returncode}", err=True)
return matches


def _group_and_merge_ranges(
    lines: List[int],
    context: int,
) -> List[Dict[str, object]]:
    """
    Group sorted line numbers into contiguous ranges, add context on each side,
    then merge overlapping ranges that context expansion may have created.
    """
    if not lines:
        return []
    # Unique & sorted
    lines_sorted = sorted(set(lines))
    ranges: List[Dict[str, object]] = []
    start = prev = lines_sorted[0]
    current_matches = [lines_sorted[0]]
    # First pass: contiguous blocks (before context)
    for ln in lines_sorted[1:]:
        if ln == prev + 1:
            prev = ln
            current_matches.append(ln)
        else:
            ranges.append(
                {
                    "start_line": max(1, start - context),
                    "end_line": prev + context,
                    "match_lines": current_matches.copy(),
                }
            )
            start = prev = ln
            current_matches = [ln]
    # Close the last block
    ranges.append(
        {
            "start_line": max(1, start - context),
            "end_line": prev + context,
            "match_lines": current_matches.copy(),
        }
    )
    # Second pass: merge overlapping ranges caused by context expansion
    ranges.sort(key=lambda r: r["start_line"])  # type: ignore[arg-type]
    merged: List[Dict[str, object]] = [ranges[0]]
    for r in ranges[1:]:
        last = merged[-1]
        if r["start_line"] <= last["end_line"]:
            # Overlap: extend end_line and merge match_lines
            if r["end_line"] > last["end_line"]:
                last["end_line"] = r["end_line"]
            # Deduplicate and sort match_lines
            merged_lines = set(last["match_lines"])  # type: ignore[arg-type]
            merged_lines.update(r["match_lines"])  # type: ignore[arg-type]
            last["match_lines"] = sorted(merged_lines)
        else:
            merged.append(r)
    return merged


def _build_result_json(
    root: Path,
    patterns: List[str],
    matches: Dict[str, Set[int]],
    max_files: int | None,
    context: int,
) -> Dict[str, object]:
    """
    Turn {file_path -> {line_numbers}} into a JSON-serializable result.
    """
    root_abs = root.resolve()
    # Simple, mechanical ordering: files with more matches come first.
    scored_files = sorted(
        matches.items(),
        key=lambda kv: len(kv[1]),
        reverse=True,
    )
    if max_files is not None:
        scored_files = scored_files[:max_files]
    results = []
    for file_path, line_numbers in scored_files:
        ranges = _group_and_merge_ranges(list(line_numbers), context=context)
        results.append(
            {
                "file": os.path.relpath(file_path, root_abs),
                "total_matches": len(line_numbers),
                "ranges": ranges,
            }
        )
    return {
        "root": str(root_abs),
        "patterns": patterns,
        "total_files_with_matches": len(matches),
        "results": results,
    }


@app.command()
def search(
    patterns: List[str] = typer.Argument(
        ...,
        help="Lexical search patterns (each passed to ripgrep as -e PATTERN).",
    ),
    root: Path = typer.Option(
        Path("."),
        "--root",
        "-r",
        help="Root directory to search.",
    ),
    max_files: int = typer.Option(
        32,
        "--max-files",
        "-n",
        min=1,
        help="Maximum number of files to include in results.",
    ),
    no_limit: bool = typer.Option(
        False,
        "--no-limit",
        help="If set, do not cap the number of files (ignore --max-files).",
    ),
    context: int = typer.Option(
        0,
        "--context",
        "-c",
        min=0,
        help="Context lines before/after each contiguous block of matches.",
    ),
    include_hidden: bool = typer.Option(
        True,
        "--hidden/--no-hidden",
        help="Include hidden files and directories in the search.",
    ),
    exclude: List[str] = typer.Option(
        [".git", "node_modules"],
        "--exclude",
        help=(
            "Directory or glob to exclude (passed as !PATTERN to ripgrep). "
            "Can be supplied multiple times."
        ),
    ),
    include_glob: List[str] = typer.Option(
        [],
        "--glob",
        help=(
            "Additional glob patterns to include (passed as --glob to ripgrep). "
            "Can be supplied multiple times."
        ),
    ),
    threads: int = typer.Option(
        0,
        "--threads",
        help=(
            "Override ripgrep's thread count (passed as --threads). "
            "0 = let ripgrep decide."
        ),
    ),
) -> None:
"""
Run a fast, single-pass lexical search and emit compact JSON grouped
by file and line ranges.
This function is intentionally *policy-light*:
- It does NOT try to "plan" multi-step searches.
- It does NOT perform semantic or graph-based reranking.
- It does only a simple mechanical sort by total match count.
Callers (LLM agents, CLIs, MCP servers) are expected to handle:
- search planning (which patterns, which roots),
- higher-level reranking (embeddings, graphs, task context),
- and decisions about what to actually load into model context.
"""
effective_max_files: int | None = None if no_limit else max_files
cmd = _build_rg_command(
root=root,
patterns=patterns,
include_hidden=include_hidden,
exclude=exclude,
include_glob=include_glob,
threads=threads,
)
matches = _collect_matches(cmd)
result = _build_result_json(
root=root,
patterns=patterns,
matches=matches,
max_files=effective_max_files,
context=context,
)
json.dump(result, sys.stdout, indent=2)
sys.stdout.write("\n")


def main() -> None:
    app()


if __name__ == "__main__":
    main()
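
# Minimal agent-side usage sketch (a hypothetical caller, not part of this
# CLI), showing the kind of orchestrator loop the docstring above defers to:
#
#   import json, subprocess
#   out = subprocess.run(
#       ["python", "chibi_grep.py", "def main", "--root", "."],
#       capture_output=True, text=True, check=True,
#   ).stdout
#   hits = json.loads(out)["results"]  # rerank/filter these agent-side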