Skip to content

Instantly share code, notes, and snippets.

@evalstate
Last active January 19, 2026 10:45
Show Gist options
  • Select an option

  • Save evalstate/658963ebdbacbad73df929bfed8ceb5c to your computer and use it in GitHub Desktop.

Select an option

Save evalstate/658963ebdbacbad73df929bfed8ceb5c to your computer and use it in GitHub Desktop.
name shell agents function_tools tool_hooks
dev
true
ripgrep_search
multilspy_tools.py:lsp_hover
multilspy_tools.py:lsp_definition
multilspy_tools.py:lsp_references
multilspy_tools.py:lsp_document_symbols
multilspy_tools.py:lsp_workspace_symbols
multilspy_tools.py:lsp_diagnostics
before_llm_call
dev_hooks.py:before_llm_call

(your system prompt here etc)

You are a helpful AI Agent.

{{serverInstructions}} {{agentSkills}} {{file_silent:AGENTS.md}} {{env}}

The current date is {{currentDate}}.

"""Function tools for ty-backed MultiLSPy queries."""
from __future__ import annotations
import asyncio
import json
import logging
import os
from contextlib import AsyncExitStack, asynccontextmanager
from pathlib import Path
from shutil import which
from typing import Any, AsyncIterator
from urllib.parse import urlparse
from multilspy.language_server import LanguageServer
from multilspy.lsp_protocol_handler.server import ProcessLaunchInfo
from multilspy.multilspy_config import Language, MultilspyConfig
from multilspy.multilspy_exceptions import MultilspyException
from multilspy.multilspy_logger import MultilspyLogger
_REPO_ROOT = Path(__file__).resolve().parents[2]
_ALLOWED_DIRS = {"src", "tests", "test", "examples"}
_server_lock = asyncio.Lock()
_server_stack: AsyncExitStack | None = None
_server: "TyServer" | None = None
class TyServer(LanguageServer):
"""Language server wrapper for ty language server."""
def __init__(self, config: MultilspyConfig, logger: MultilspyLogger, repository_root_path: str):
ty_cmd = _resolve_ty_cmd()
super().__init__(
config,
logger,
repository_root_path,
ProcessLaunchInfo(cmd=ty_cmd, cwd=repository_root_path),
"python",
)
self.diagnostics: dict[str, list[dict[str, Any]]] = {}
def _get_initialize_params(self, repository_absolute_path: str) -> dict[str, Any]:
root_uri = Path(repository_absolute_path).as_uri()
return {
"processId": os.getpid(),
"rootPath": repository_absolute_path,
"rootUri": root_uri,
"workspaceFolders": [
{
"uri": root_uri,
"name": Path(repository_absolute_path).name,
}
],
"capabilities": {
"workspace": {"workspaceFolders": True},
"textDocument": {"hover": {"contentFormat": ["markdown", "plaintext"]}},
},
}
@asynccontextmanager
async def start_server(self) -> "AsyncIterator[TyServer]":
async def do_nothing(params: Any) -> None:
return None
async def window_log_message(msg: Any) -> None:
self.logger.log(f"LSP: window/logMessage: {msg}", logging.INFO)
async def publish_diagnostics(params: dict[str, Any]) -> None:
uri = params.get("uri")
if not uri:
return
self.diagnostics[uri] = params.get("diagnostics", [])
self.server.on_notification("window/logMessage", window_log_message)
self.server.on_request("workspace/executeClientCommand", do_nothing)
self.server.on_notification("$/progress", do_nothing)
self.server.on_notification("textDocument/publishDiagnostics", publish_diagnostics)
async with super().start_server():
self.logger.log("Starting ty language server process", logging.INFO)
await self.server.start()
initialize_params = self._get_initialize_params(self.repository_root_path)
self.logger.log(
"Sending initialize request from LSP client to ty language server", logging.INFO
)
await self.server.send.initialize(initialize_params)
self.server.notify.initialized({})
yield self
await self.server.shutdown()
await self.server.stop()
def _resolve_ty_cmd() -> str:
executable = which("ty")
if executable is None:
raise MultilspyException(
"ty is not available on PATH. Install the 'ty' package."
)
return f"{executable} server"
def _resolve_relative_path(file_path: str) -> str:
path = Path(file_path)
if not path.is_absolute():
path = (_REPO_ROOT / path).resolve()
else:
path = path.resolve()
try:
relative_path = path.relative_to(_REPO_ROOT)
except ValueError as exc: # pragma: no cover - defensive guard
raise ValueError("Path is outside the repository root.") from exc
if not relative_path.parts:
raise ValueError("Path must point to a file within the repository.")
if relative_path.parts[0] not in _ALLOWED_DIRS:
allowed = ", ".join(sorted(_ALLOWED_DIRS))
raise ValueError(f"Path must live under {allowed}.")
if not path.exists():
raise ValueError(f"File not found: {path}")
return str(relative_path)
async def _ensure_server() -> TyServer:
global _server_stack, _server
if _server is not None and _server.server_started:
return _server
async with _server_lock:
if _server is not None and _server.server_started:
return _server
config = MultilspyConfig(code_language=Language.PYTHON)
logger = MultilspyLogger()
server = TyServer(config, logger, str(_REPO_ROOT))
stack = AsyncExitStack()
await stack.enter_async_context(server.start_server())
_server = server
_server_stack = stack
return server
def _format_range(range_data: dict[str, Any] | None) -> str:
if not range_data:
return ""
start = range_data.get("start", {})
line = start.get("line")
character = start.get("character")
if line is None or character is None:
return ""
return f"{line + 1}:{character + 1}"
def _uri_to_relative(uri: str | None) -> str:
if not uri:
return ""
if uri.startswith("file:"):
parsed = urlparse(uri)
path = Path(parsed.path)
try:
return str(path.relative_to(_REPO_ROOT))
except ValueError:
return str(path)
return uri
def _format_locations(locations: list[dict[str, Any]]) -> str:
if not locations:
return "No locations returned."
lines = ["| path | line |", "| --- | --- |"]
for location in locations:
path = (
location.get("relativePath")
or location.get("absolutePath")
or _uri_to_relative(location.get("uri"))
)
line = _format_range(location.get("range"))
lines.append(f"| {path} | {line} |")
return "\n".join(lines)
def _format_hover_contents(contents: Any) -> str:
if contents is None:
return "No hover contents returned."
if isinstance(contents, str):
return contents
if isinstance(contents, list):
return "\n\n".join(_format_hover_contents(item) for item in contents)
if isinstance(contents, dict):
value = contents.get("value")
if isinstance(value, str):
return value
return json.dumps(contents, indent=2)
return str(contents)
def _format_symbol_kind(kind: Any) -> str:
if not isinstance(kind, int):
return str(kind or "")
return {
1: "File",
2: "Module",
3: "Namespace",
4: "Package",
5: "Class",
6: "Method",
7: "Property",
8: "Field",
9: "Constructor",
10: "Enum",
11: "Interface",
12: "Function",
13: "Variable",
14: "Constant",
15: "String",
16: "Number",
17: "Boolean",
18: "Array",
19: "Object",
20: "Key",
21: "Null",
22: "EnumMember",
23: "Struct",
24: "Event",
25: "Operator",
26: "TypeParameter",
}.get(kind, str(kind))
def _format_symbols(symbols: list[dict[str, Any]], default_path: str | None = None) -> str:
if not symbols:
return "No symbols returned."
lines = ["| name | kind | location | detail |", "| --- | --- | --- | --- |"]
for symbol in symbols:
location = symbol.get("location") or {}
path = (
location.get("relativePath")
or location.get("absolutePath")
or _uri_to_relative(location.get("uri"))
or ""
)
if not path and default_path:
path = default_path
range_data = location.get("range") or symbol.get("range") or symbol.get("selectionRange")
line = _format_range(range_data)
if path and line:
location_display = f"{path} ({line})"
else:
location_display = path
lines.append(
"| {name} | {kind} | {location} | {detail} |".format(
name=symbol.get("name", ""),
kind=_format_symbol_kind(symbol.get("kind")),
location=location_display,
detail=symbol.get("detail", "") or "",
)
)
return "\n".join(lines)
async def lsp_hover(file_path: str, line: int, character: int) -> str:
"""Return hover information for a symbol at the given location."""
try:
relative_path = _resolve_relative_path(file_path)
server = await _ensure_server()
hover = await server.request_hover(relative_path, line, character)
if not hover:
return "No hover information returned."
contents = hover.get("contents")
return _format_hover_contents(contents)
except (ValueError, MultilspyException) as exc:
return f"Error: {exc}"
async def lsp_definition(file_path: str, line: int, character: int) -> str:
"""Return definition locations for a symbol at the given location."""
try:
relative_path = _resolve_relative_path(file_path)
server = await _ensure_server()
locations = await server.request_definition(relative_path, line, character)
if not locations:
return "No locations returned."
return _format_locations([dict(location) for location in locations])
except (ValueError, MultilspyException) as exc:
message = str(exc)
if "Unexpected response from Language Server" in message:
return "No locations returned."
return f"Error: {exc}"
except Exception as exc: # pragma: no cover - defensive guard
message = str(exc)
if "Unexpected response from Language Server" in message:
return "No locations returned."
return f"Error: {exc}"
async def lsp_references(file_path: str, line: int, character: int) -> str:
"""Return reference locations for a symbol at the given location."""
try:
relative_path = _resolve_relative_path(file_path)
server = await _ensure_server()
locations = await server.request_references(relative_path, line, character)
if not locations:
return "No locations returned."
return _format_locations([dict(location) for location in locations])
except (ValueError, MultilspyException) as exc:
message = str(exc)
if "Unexpected response from Language Server" in message:
return "No locations returned."
return f"Error: {exc}"
except Exception as exc: # pragma: no cover - defensive guard
message = str(exc)
if "Unexpected response from Language Server" in message:
return "No locations returned."
return f"Error: {exc}"
async def lsp_document_symbols(file_path: str) -> str:
"""Return document symbols for a file."""
try:
relative_path = _resolve_relative_path(file_path)
server = await _ensure_server()
symbols, _ = await server.request_document_symbols(relative_path)
return _format_symbols([dict(symbol) for symbol in symbols], default_path=relative_path)
except (ValueError, MultilspyException) as exc:
return f"Error: {exc}"
async def lsp_workspace_symbols(query: str) -> str:
"""Return workspace symbols matching a query string."""
try:
server = await _ensure_server()
symbols = await server.request_workspace_symbol(query)
if symbols is None:
return "No symbols returned."
return _format_symbols([dict(symbol) for symbol in symbols])
except (ValueError, MultilspyException) as exc:
return f"Error: {exc}"
async def lsp_diagnostics(file_path: str | None = None) -> str:
"""Return cached diagnostics from ty server."""
try:
server = await _ensure_server()
if file_path is None:
diagnostics = server.diagnostics
else:
relative_path = _resolve_relative_path(file_path)
uri = Path(_REPO_ROOT / relative_path).as_uri()
diagnostics = {uri: server.diagnostics.get(uri, [])}
if not diagnostics:
return "No diagnostics cached."
return json.dumps(diagnostics, indent=2)
except (ValueError, MultilspyException) as exc:
return f"Error: {exc}"
"""Function tools for typescript-language-server-backed MultiLSPy queries."""
from __future__ import annotations
import asyncio
import json
import logging
import os
from contextlib import AsyncExitStack, asynccontextmanager
from pathlib import Path
from shutil import which
from typing import Any, AsyncIterator
from urllib.parse import urlparse
from multilspy.language_server import LanguageServer
from multilspy.lsp_protocol_handler.server import ProcessLaunchInfo
from multilspy.multilspy_config import Language, MultilspyConfig
from multilspy.multilspy_exceptions import MultilspyException
from multilspy.multilspy_logger import MultilspyLogger
_REPO_ROOT = Path(__file__).resolve().parents[2]
_ALLOWED_DIRS = {"investigate-openai-int"}
_server_lock = asyncio.Lock()
_server_stack: AsyncExitStack | None = None
_server: "TypeScriptServer" | None = None
class TypeScriptServer(LanguageServer):
"""Language server wrapper for typescript-language-server."""
def __init__(self, config: MultilspyConfig, logger: MultilspyLogger, repository_root_path: str):
ts_cmd = _resolve_typescript_server_cmd()
super().__init__(
config,
logger,
repository_root_path,
ProcessLaunchInfo(cmd=ts_cmd, cwd=repository_root_path),
"typescript",
)
self.diagnostics: dict[str, list[dict[str, Any]]] = {}
def _get_initialize_params(self, repository_absolute_path: str) -> dict[str, Any]:
root_uri = Path(repository_absolute_path).as_uri()
return {
"processId": os.getpid(),
"rootPath": repository_absolute_path,
"rootUri": root_uri,
"workspaceFolders": [
{
"uri": root_uri,
"name": Path(repository_absolute_path).name,
}
],
"capabilities": {
"workspace": {"workspaceFolders": True},
"textDocument": {"hover": {"contentFormat": ["markdown", "plaintext"]}},
},
}
@asynccontextmanager
async def start_server(self) -> "AsyncIterator[TypeScriptServer]":
async def do_nothing(params: Any) -> None:
return None
async def window_log_message(msg: Any) -> None:
self.logger.log(f"LSP: window/logMessage: {msg}", logging.INFO)
async def publish_diagnostics(params: dict[str, Any]) -> None:
uri = params.get("uri")
if not uri:
return
self.diagnostics[uri] = params.get("diagnostics", [])
self.server.on_notification("window/logMessage", window_log_message)
self.server.on_request("workspace/executeClientCommand", do_nothing)
self.server.on_notification("$/progress", do_nothing)
self.server.on_notification("textDocument/publishDiagnostics", publish_diagnostics)
async with super().start_server():
self.logger.log("Starting typescript-language-server process", logging.INFO)
await self.server.start()
initialize_params = self._get_initialize_params(self.repository_root_path)
self.logger.log(
"Sending initialize request from LSP client to typescript-language-server",
logging.INFO,
)
await self.server.send.initialize(initialize_params)
self.server.notify.initialized({})
yield self
await self.server.shutdown()
await self.server.stop()
def _resolve_typescript_server_cmd() -> str:
executable = which("typescript-language-server")
if executable is None:
raise MultilspyException(
"typescript-language-server is not available on PATH. Install it via npm." # noqa: EM102
)
return f"{executable} --stdio"
def _resolve_relative_path(file_path: str) -> str:
path = Path(file_path)
if not path.is_absolute():
path = (_REPO_ROOT / path).resolve()
else:
path = path.resolve()
try:
relative_path = path.relative_to(_REPO_ROOT)
except ValueError as exc: # pragma: no cover - defensive guard
raise ValueError("Path is outside the repository root.") from exc
if not relative_path.parts:
raise ValueError("Path must point to a file within the repository.")
if relative_path.parts[0] not in _ALLOWED_DIRS:
allowed = ", ".join(sorted(_ALLOWED_DIRS))
raise ValueError(f"Path must live under {allowed}.")
if not path.exists():
raise ValueError(f"File not found: {path}")
return str(relative_path)
async def _ensure_server() -> TypeScriptServer:
global _server_stack, _server
if _server is not None and _server.server_started:
return _server
async with _server_lock:
if _server is not None and _server.server_started:
return _server
config = MultilspyConfig(code_language=Language.TYPESCRIPT)
logger = MultilspyLogger()
server = TypeScriptServer(config, logger, str(_REPO_ROOT))
stack = AsyncExitStack()
await stack.enter_async_context(server.start_server())
_server = server
_server_stack = stack
return server
def _format_range(range_data: dict[str, Any] | None) -> str:
if not range_data:
return ""
start = range_data.get("start", {})
line = start.get("line")
character = start.get("character")
if line is None or character is None:
return ""
return f"{line + 1}:{character + 1}"
def _uri_to_relative(uri: str | None) -> str:
if not uri:
return ""
if uri.startswith("file:"):
parsed = urlparse(uri)
path = Path(parsed.path)
try:
return str(path.relative_to(_REPO_ROOT))
except ValueError:
return str(path)
return uri
def _format_locations(locations: list[dict[str, Any]]) -> str:
if not locations:
return "No locations returned."
lines = ["| path | line |", "| --- | --- |"]
for location in locations:
path = (
location.get("relativePath")
or location.get("absolutePath")
or _uri_to_relative(location.get("uri"))
)
line = _format_range(location.get("range"))
lines.append(f"| {path} | {line} |")
return "\n".join(lines)
def _format_hover_contents(contents: Any) -> str:
if contents is None:
return "No hover contents returned."
if isinstance(contents, str):
return contents
if isinstance(contents, list):
return "\n\n".join(_format_hover_contents(item) for item in contents)
if isinstance(contents, dict):
value = contents.get("value")
if isinstance(value, str):
return value
return json.dumps(contents, indent=2)
return str(contents)
def _format_symbol_kind(kind: Any) -> str:
if not isinstance(kind, int):
return str(kind or "")
return {
1: "File",
2: "Module",
3: "Namespace",
4: "Package",
5: "Class",
6: "Method",
7: "Property",
8: "Field",
9: "Constructor",
10: "Enum",
11: "Interface",
12: "Function",
13: "Variable",
14: "Constant",
15: "String",
16: "Number",
17: "Boolean",
18: "Array",
19: "Object",
20: "Key",
21: "Null",
22: "EnumMember",
23: "Struct",
24: "Event",
25: "Operator",
26: "TypeParameter",
}.get(kind, str(kind))
def _format_symbols(symbols: list[dict[str, Any]], default_path: str | None = None) -> str:
if not symbols:
return "No symbols returned."
lines = ["| name | kind | location | detail |", "| --- | --- | --- | --- |"]
for symbol in symbols:
location = symbol.get("location") or {}
path = (
location.get("relativePath")
or location.get("absolutePath")
or _uri_to_relative(location.get("uri"))
or ""
)
if not path and default_path:
path = default_path
range_data = location.get("range") or symbol.get("range") or symbol.get("selectionRange")
line = _format_range(range_data)
if path and line:
location_display = f"{path} ({line})"
else:
location_display = path
lines.append(
"| {name} | {kind} | {location} | {detail} |".format(
name=symbol.get("name", ""),
kind=_format_symbol_kind(symbol.get("kind")),
location=location_display,
detail=symbol.get("detail", "") or "",
)
)
return "\n".join(lines)
async def lsp_hover(file_path: str, line: int, character: int) -> str:
"""Return hover information for a symbol at the given location."""
try:
relative_path = _resolve_relative_path(file_path)
server = await _ensure_server()
hover = await server.request_hover(relative_path, line, character)
if not hover:
return "No hover information returned."
contents = hover.get("contents")
return _format_hover_contents(contents)
except (ValueError, MultilspyException) as exc:
return f"Error: {exc}"
async def lsp_definition(file_path: str, line: int, character: int) -> str:
"""Return definition locations for a symbol at the given location."""
try:
relative_path = _resolve_relative_path(file_path)
server = await _ensure_server()
locations = await server.request_definition(relative_path, line, character)
if not locations:
return "No locations returned."
return _format_locations([dict(location) for location in locations])
except (ValueError, MultilspyException) as exc:
message = str(exc)
if "Unexpected response from Language Server" in message:
return "No locations returned."
return f"Error: {exc}"
except Exception as exc: # pragma: no cover - defensive guard
message = str(exc)
if "Unexpected response from Language Server" in message:
return "No locations returned."
return f"Error: {exc}"
async def lsp_references(file_path: str, line: int, character: int) -> str:
"""Return reference locations for a symbol at the given location."""
try:
relative_path = _resolve_relative_path(file_path)
server = await _ensure_server()
locations = await server.request_references(relative_path, line, character)
if not locations:
return "No locations returned."
return _format_locations([dict(location) for location in locations])
except (ValueError, MultilspyException) as exc:
message = str(exc)
if "Unexpected response from Language Server" in message:
return "No locations returned."
return f"Error: {exc}"
except Exception as exc: # pragma: no cover - defensive guard
message = str(exc)
if "Unexpected response from Language Server" in message:
return "No locations returned."
return f"Error: {exc}"
async def lsp_document_symbols(file_path: str) -> str:
"""Return document symbols for a file."""
try:
relative_path = _resolve_relative_path(file_path)
server = await _ensure_server()
symbols, _ = await server.request_document_symbols(relative_path)
return _format_symbols([dict(symbol) for symbol in symbols], default_path=relative_path)
except (ValueError, MultilspyException) as exc:
return f"Error: {exc}"
async def lsp_workspace_symbols(query: str) -> str:
"""Return workspace symbols matching a query string."""
try:
server = await _ensure_server()
symbols = await server.request_workspace_symbol(query)
if symbols is None:
return "No symbols returned."
return _format_symbols([dict(symbol) for symbol in symbols])
except (ValueError, MultilspyException) as exc:
return f"Error: {exc}"
async def lsp_diagnostics(file_path: str | None = None) -> str:
"""Return cached diagnostics from typescript-language-server."""
try:
server = await _ensure_server()
if file_path is None:
diagnostics = server.diagnostics
else:
relative_path = _resolve_relative_path(file_path)
uri = Path(_REPO_ROOT / relative_path).as_uri()
diagnostics = {uri: server.diagnostics.get(uri, [])}
if not diagnostics:
return "No diagnostics cached."
return json.dumps(diagnostics, indent=2)
except (ValueError, MultilspyException) as exc:
return f"Error: {exc}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment