Skip to content

Instantly share code, notes, and snippets.

@meepak
Last active March 6, 2026 08:08
Show Gist options
  • Select an option

  • Save meepak/b35e3d54e936604822c1d98945ff5a83 to your computer and use it in GitHub Desktop.

Select an option

Save meepak/b35e3d54e936604822c1d98945ff5a83 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Local Autonomous DeepSeek Sysadmin
Runs commands directly on the local machine with real-time output streaming
"""
import json
import time
import subprocess
import select
import os
import sys
import threading
import queue
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime
from openai import OpenAI
# ============================================================
# HARD-CODE YOUR API KEY HERE
# ============================================================
DEEPSEEK_API_KEY = "sk-.........MA"
# ============================================================
client = OpenAI(
api_key=DEEPSEEK_API_KEY,
base_url="https://api.deepseek.com/v1" # DeepSeek API endpoint
)
# ---------------- COLORS (ANSI) ----------------
# Disable colors if output is not a terminal
if not sys.stdout.isatty():
RESET = COL_SEP = COL_THOUGHT = COL_CMD = COL_INFO = ""
COL_WARN = COL_STDOUT_LABEL = COL_STDERR_LABEL = COL_EXIT_OK = ""
COL_EXIT_BAD = COL_INPUT = COL_RISK = COL_TIMESTAMP = ""
else:
RESET = "\033[0m"
COL_SEP = "\033[95m" # magenta
COL_THOUGHT = "\033[96m" # bright cyan
COL_CMD = "\033[93m" # bright yellow
COL_INFO = "\033[94m" # blue
COL_WARN = "\033[91m" # red
COL_STDOUT_LABEL = "\033[92m" # green
COL_STDERR_LABEL = "\033[91m" # red
COL_EXIT_OK = "\033[92m" # green
COL_EXIT_BAD = "\033[91m" # red
COL_INPUT = "\033[90m" # dim gray
COL_RISK = "\033[33m" # yellow-ish
COL_TIMESTAMP = "\033[90m" # dim gray for timestamps
# ---------------- DEEPSEEK CALL ----------------
def call_deepseek(messages: List[Dict[str, str]]) -> Dict[str, Any]:
"""
Call DeepSeek API with the given messages.
Uses the chat completions endpoint with DeepSeek's preferred model.
"""
try:
response = client.chat.completions.create(
model="deepseek-chat", # DeepSeek's main model
messages=messages,
max_tokens=600,
temperature=0.2,
response_format={"type": "json_object"} # Enforce JSON output
)
# Extract the response content
text = response.choices[0].message.content.strip()
# Handle possible ```json ... ``` wrapping
if text.startswith("```"):
text = text.strip("`")
if text.startswith("json"):
text = text[4:].lstrip()
# Parse JSON
try:
return json.loads(text)
except json.JSONDecodeError as e:
# If JSON parsing fails, try to extract JSON from the text
import re
json_match = re.search(r'\{.*\}', text, re.DOTALL)
if json_match:
return json.loads(json_match.group())
else:
raise RuntimeError(f"Model did not return valid JSON: {text[:200]}...")
except Exception as e:
raise RuntimeError(f"DeepSeek API call failed: {str(e)}")
# ---------------- PROMPT DETECTION ----------------
def is_password_prompt(line: str, username: Optional[str] = None) -> bool:
"""
Detect lines that are likely asking specifically for a password.
We handle these automatically with the stored password.
"""
lower = line.lower().strip()
if "password" not in lower and "passphrase" not in lower:
return False
# Typical sudo prompts
if "sudo" in lower:
return True
if username and username.lower() in lower:
return True
if lower.endswith(":"):
return True
return True
def is_interactive_prompt(line: str) -> bool:
"""
Detect lines that are likely asking for interactive *non-password* input:
confirmations, choices, etc.
"""
lower = line.lower().rstrip()
if not lower:
return False
# Don't treat password prompts as generic interactive prompts
if "password" in lower or "passphrase" in lower:
return False
looks_like_prompt = (
lower.endswith(":")
or lower.endswith("?")
or "[y/n]" in lower
or "[y/n]" in lower.replace(" ", "")
or "[Y/n]" in lower
or "[y/N]" in lower
)
if not looks_like_prompt:
return False
keywords = [
"do you want to continue",
"continue?",
"press enter",
"press return",
"enter choice",
"enter selection",
"enter new",
"are you sure",
"proceed?",
"confirm",
"choose",
"select",
]
return any(k in lower for k in keywords)
# ---------------- COMMAND SANITISER ----------------
def make_noninteractive(command: str) -> str:
"""
Rewrite certain commands to be non-interactive.
- For any 'systemctl' invocation, ensure '--no-pager' is present.
"""
parts = command.strip().split()
if not parts:
return command
# Find 'systemctl' in the token list
try:
idx = parts.index("systemctl")
except ValueError:
return command # nothing to change
# If '--no-pager' is not already present, insert it after 'systemctl'
if "--no-pager" not in parts[idx + 1:]:
parts.insert(idx + 1, "--no-pager")
return " ".join(parts)
def make_apt_noninteractive(cmd: str) -> str:
"""Add non-interactive flags to apt commands"""
if "apt-get" in cmd or " apt " in cmd or cmd.startswith("apt "):
# prepend DEBIAN_FRONTEND if not present
if "DEBIAN_FRONTEND=" not in cmd:
cmd = "DEBIAN_FRONTEND=noninteractive " + cmd
# ensure -y is present for apt-get/apt install
if ("apt-get" in cmd or " apt " in cmd) and " install " in cmd and " -y" not in cmd:
cmd = cmd.replace(" install ", " install -y ")
# ensure -qq for less output during updates
if "apt-get" in cmd and " -qq" not in cmd and " update" in cmd:
cmd = cmd.replace("apt-get", "apt-get -qq")
return cmd
# ---------------- LOCAL EXECUTOR ----------------
class LocalExecutor:
"""
Executes commands directly on the local machine with real-time output streaming.
Handles interactive prompts, sudo passwords, and timeouts.
"""
def __init__(self, sudo_password: Optional[str] = None):
self.sudo_password = sudo_password
self.username = os.environ.get("USER", os.environ.get("USERNAME", "unknown"))
self.process = None
self.stdout_thread = None
self.stderr_thread = None
self.running = False
self.stdout_lines = []
self.stderr_lines = []
self.password_sent = False
def connect(self):
"""No-op for local execution"""
pass
def close(self):
"""Clean up any running processes"""
self.running = False
if self.process and self.process.poll() is None:
try:
self.process.terminate()
time.sleep(0.5)
if self.process.poll() is None:
self.process.kill()
except:
pass
def _send_password(self):
"""Send sudo password to the process"""
if not self.password_sent and self.process and self.process.stdin and not self.process.stdin.closed:
try:
self.process.stdin.write(self.sudo_password + "\n")
self.process.stdin.flush()
self.password_sent = True
print(f"{COL_INFO}[AUTO] Sent sudo password{RESET}")
except (BrokenPipeError, OSError):
pass
def _handle_prompt(self, line: str):
"""Handle interactive prompts by asking the user"""
if self.process and self.process.stdin and not self.process.stdin.closed:
try:
user_val = input(f"{COL_INPUT}[INPUT REQUIRED]{RESET} {line.strip()} ")
self.process.stdin.write(user_val + "\n")
self.process.stdin.flush()
except (BrokenPipeError, OSError):
pass
def _reader_thread(self, pipe, is_stderr=False):
"""Thread function to continuously read from a pipe"""
try:
for line in iter(pipe.readline, ''):
if line:
if is_stderr:
self.stderr_lines.append(line)
# Print immediately with label
if not hasattr(self, 'stderr_started'):
self.stderr_started = True
print(f"{COL_STDERR_LABEL}[STDERR]{RESET} ", end="")
print(line, end="", flush=True)
# Check for prompts
if is_password_prompt(line, self.username) and self.sudo_password:
self._send_password()
elif is_interactive_prompt(line):
self._handle_prompt(line)
else:
self.stdout_lines.append(line)
# Print immediately with label
if not hasattr(self, 'stdout_started'):
self.stdout_started = True
print(f"{COL_STDOUT_LABEL}[STDOUT]{RESET} ", end="")
print(line, end="", flush=True)
# Check for prompts
if is_password_prompt(line, self.username) and self.sudo_password:
self._send_password()
elif is_interactive_prompt(line):
self._handle_prompt(line)
else:
break
except (IOError, OSError, ValueError) as e:
# Pipe was closed or other error
pass
finally:
try:
pipe.close()
except:
pass
def run(self, command: str, idle_timeout: int = 300) -> Dict[str, Any]:
"""
Run a command locally with real-time output streaming.
Args:
command: The command to execute
idle_timeout: Seconds of no output before killing the process
Returns:
Dict with stdout, stderr, exit_status, timed_out, idle_timeout
"""
# Reset state
self.stdout_lines = []
self.stderr_lines = []
self.password_sent = False
self.stdout_started = False
self.stderr_started = False
# Sanitize the command
cmd_stripped = command.strip()
if not cmd_stripped:
return {
"stdout": "",
"stderr": "Empty command",
"exit_status": -1,
"timed_out": False,
"idle_timeout": idle_timeout
}
cmd_sanitized = make_noninteractive(cmd_stripped)
cmd_sanitized = make_apt_noninteractive(cmd_sanitized)
uses_sudo = cmd_sanitized.startswith("sudo ")
# For sudo commands, ensure we can feed password
effective_cmd = cmd_sanitized
if uses_sudo and self.sudo_password:
# Add -S to read password from stdin and -k to ignore cached credentials
parts = cmd_sanitized.split()
if len(parts) >= 1 and parts[0] == "sudo":
# Add -S and -k flags if not present
flags_to_add = []
if "-S" not in parts[1:3]: # Check first few args
flags_to_add.append("-S")
if "-k" not in parts[1:3]:
flags_to_add.append("-k")
if flags_to_add:
# Insert flags after 'sudo'
for flag in reversed(flags_to_add):
parts.insert(1, flag)
effective_cmd = " ".join(parts)
print(f"{COL_CMD}[EXECUTING]{RESET} {effective_cmd}")
# Start the process with pipes for real-time output
try:
self.process = subprocess.Popen(
effective_cmd,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1, # Line buffered
universal_newlines=True
)
except Exception as e:
return {
"stdout": "",
"stderr": f"Failed to start process: {str(e)}",
"exit_status": -1,
"timed_out": False,
"idle_timeout": idle_timeout
}
# Start reader threads for continuous output
self.stdout_thread = threading.Thread(
target=self._reader_thread,
args=(self.process.stdout, False)
)
self.stderr_thread = threading.Thread(
target=self._reader_thread,
args=(self.process.stderr, True)
)
self.stdout_thread.daemon = True
self.stderr_thread.daemon = True
self.stdout_thread.start()
self.stderr_thread.start()
# Send initial sudo password if needed
if uses_sudo and self.sudo_password:
self._send_password()
# Monitor process with timeout
timed_out = False
last_output_time = time.time()
# Main monitoring loop
while True:
# Check if process has finished
exit_code = self.process.poll()
# Update last output time based on new lines
if self.stdout_lines or self.stderr_lines:
# Check the last few lines to see if any are new
current_stdout_len = len(self.stdout_lines)
current_stderr_len = len(self.stderr_lines)
# Simple heuristic - if we have lines, there was output
if current_stdout_len > 0 or current_stderr_len > 0:
last_output_time = time.time()
# Check for idle timeout (only if process is still running)
if exit_code is None: # Process still running
if time.time() - last_output_time > idle_timeout:
timed_out = True
timeout_msg = f"\n[CLIENT NOTICE] No output for {idle_timeout} seconds. Killing process.\n"
print(f"{COL_WARN}{timeout_msg}{RESET}", end="", flush=True)
self.stderr_lines.append(timeout_msg)
# Kill the process
try:
self.process.terminate()
time.sleep(0.5)
if self.process.poll() is None:
self.process.kill()
except Exception:
pass
break
else:
# Process finished, wait a moment for threads to finish reading
time.sleep(0.2)
break
# Brief sleep to prevent CPU spinning
time.sleep(0.1)
# Wait for reader threads to finish (with timeout)
if self.stdout_thread and self.stdout_thread.is_alive():
self.stdout_thread.join(timeout=1)
if self.stderr_thread and self.stderr_thread.is_alive():
self.stderr_thread.join(timeout=1)
# Close stdin to avoid broken pipe errors
if self.process.stdin and not self.process.stdin.closed:
try:
self.process.stdin.close()
except Exception:
pass
# Get final exit status
if timed_out:
exit_status = -1
else:
exit_status = self.process.returncode if self.process.returncode is not None else -1
# Combine all output
stdout = "".join(self.stdout_lines)
stderr = "".join(self.stderr_lines)
# Check for common sudo errors
if uses_sudo and "not in the sudoers file" in stderr:
print(f"\n{COL_WARN}[SUDO ERROR]{RESET} User '{self.username}' is not allowed to use sudo.")
elif uses_sudo and "no password was provided" in stderr.lower():
print(f"\n{COL_WARN}[SUDO ERROR]{RESET} Sudo password incorrect or not provided.")
return {
"stdout": stdout,
"stderr": stderr,
"exit_status": exit_status,
"timed_out": timed_out,
"idle_timeout": idle_timeout
}
# ---------------- SYSTEM PROMPT ----------------
SYSTEM_PROMPT = """
You are an autonomous Linux sysadmin assistant running directly on the local machine.
You MUST respond with STRICT JSON only.
IMPORTANT RULES ABOUT OUTPUT:
- Your ENTIRE reply MUST be exactly ONE JSON object.
- Do NOT include multiple JSON objects.
- Do NOT include any extra text, comments, or markdown.
- Do NOT wrap the JSON in ``` fences.
INTERACTION MODEL:
- You can propose commands to be executed directly on this machine.
- The client runs your commands and streams back stdout/stderr in real-time.
- The client supports answering line-based prompts (like 'Do you want to continue?'),
but it CANNOT reliably drive interactive TUIs or full-screen console menus.
ABSOLUTE RESTRICTIONS:
- NEVER use interactive console/TUI tools like: whiptail, dialog, nmtui, top, htop,
alsamixer, raspi-config, menuconfig, or dpkg-reconfigure without a fully
non-interactive mode.
- NEVER rely on curses-style full-screen UIs, wizard menus, or arrow-key navigation.
- ALWAYS use non-interactive configuration methods:
- edit config files directly with sed/awk/cat/tee,
- or use tools/flags that are specifically documented as non-interactive.
APT / PACKAGE MANAGEMENT:
- When using apt-get/apt:
- Always include: `DEBIAN_FRONTEND=noninteractive` in the environment.
- Use: `apt-get -y` (assume yes to prompts).
Example:
DEBIAN_FRONTEND=noninteractive sudo apt-get update
DEBIAN_FRONTEND=noninteractive sudo apt-get -y install rkhunter
LONG-RUNNING COMMANDS:
- The client shows output in REAL TIME as it arrives from the command.
- If a command produces no output for the idle timeout period, it will be killed.
- Do NOT use the 'timeout' utility - the client handles this automatically.
COMMAND RESPONSE FORMAT:
At each step you MUST output JSON ONLY, in one of these forms:
1) To run a command:
{
"type": "command",
"thought": "<brief reasoning>",
"risk": "<brief risk description>",
"command": "<bash command>"
}
2) When finished:
{
"type": "done",
"thought": "<why>",
"summary": "<final result for the user>"
}
The "risk" field is REQUIRED for every command.
General behavior:
- Keep commands simple and robust.
- Prefer non-destructive checks first (ls, cat, grep, systemctl status).
- Avoid obviously destructive commands (rm -rf /, dropping entire databases, etc.)
unless the user explicitly requested such destructive action.
- Use previous command outputs to adjust your next step.
"""
def truncate(s: str, limit: int = 2000) -> str:
"""Truncate long strings for API calls"""
s = s or ""
if len(s) <= limit:
return s
return s[:limit] + f"\n...[truncated {len(s)-limit} chars]"
def compute_effective_idle_timeout(command: str, default_idle: int) -> int:
"""
If command starts with 'timeout N ...', use max(default_idle, N+30) as idle timeout,
so we don't kill it earlier than the command's own timeout.
"""
cmd = command.strip()
if not cmd.startswith("timeout "):
return default_idle
parts = cmd.split()
if len(parts) < 2:
return default_idle
try:
secs = int(parts[1])
return max(default_idle, secs + 30)
except ValueError:
return max(default_idle, 600)
def run_agent(goal: str, executor: LocalExecutor, max_steps: int = 50, idle_timeout: int = 300):
"""Main agent loop"""
history = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": f"GOAL: {goal}"},
]
allow_all = False # When True, auto-run all commands without confirmation
for step in range(1, max_steps + 1):
print(f"\n\n{COL_SEP}==================== STEP {step} ===================={RESET}\n")
history.append({"role": "user", "content": f"STEP {step}: Decide next action."})
try:
action = call_deepseek(history)
except Exception as e:
print(f"{COL_WARN}[ERROR]{RESET} Failed to get response from DeepSeek: {e}")
retry = input(f"{COL_INPUT}Retry? [y/n]: {RESET}").strip().lower()
if retry == 'y':
continue
else:
break
action_type = action.get("type")
thought = action.get("thought", "")
risk = action.get("risk", "").strip()
thought_text = thought or "(no thought provided)"
print(f"{COL_THOUGHT}[MODEL THOUGHT]{RESET} {COL_THOUGHT}{thought_text}{RESET}")
if not risk:
risk = "Risk not specified (assume minimal, but review command carefully)."
print(f"{COL_RISK}[RISK]{RESET} {COL_RISK}{risk}{RESET}")
if action_type == "done":
summary = action.get("summary", "") or "(no summary provided)"
print(f"{COL_INFO}[SUMMARY]{RESET} {COL_INFO}{summary}{RESET}")
# Ask if user wants to continue or finish
while True:
follow = input(f"\n{COL_INPUT}Task completed? Is there anything else? [y/n]: {RESET}").strip().lower()
if follow in ("y", "n"):
break
print(f"{COL_WARN}Please enter 'y' or 'n'.{RESET}")
if follow == "n":
print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
break
else:
extra = input(f"{COL_INPUT}What else would you like me to do? {RESET}").strip()
if not extra:
print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
break
history.append({
"role": "user",
"content": f"New request: {extra}\n\nPlease continue from the current state."
})
continue
command = action.get("command", "").strip()
if not command:
print(f"{COL_WARN}[WARN]{RESET} No command returned by model. Aborting.")
print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
break
print(f"{COL_CMD}[PROPOSED COMMAND]{RESET} {COL_CMD}{command}{RESET}")
# Guardrail: block known interactive TUI tools
TUI_BLOCKLIST = [
"whiptail", "dialog ", "nmtui", "raspi-config", "menuconfig",
"htop", "top ", "alsamixer", "dpkg-reconfigure", "vim", "nano",
"emacs", "less", "more", "man "
]
if any(bad in command for bad in TUI_BLOCKLIST) and not any(flag in command for flag in ["-y", "--no-pager", "-n"]):
print(f"{COL_WARN}[BLOCKED]{RESET} Command appears to use an interactive tool.")
if not allow_all:
force = input(f"{COL_INPUT}Force run anyway? [y/N]: {RESET}").strip().lower()
if force != 'y':
history.append({
"role": "user",
"content": f"Command '{command}' was blocked for being interactive. Please propose a non-interactive alternative."
})
continue
# Decide whether to prompt or auto-run
if not allow_all:
while True:
choice = input(f"\n{COL_INPUT}Allow this command? [y=run, s=skip, n=terminate, e=edit thought, a=allow all]: {RESET}").strip().lower()
if choice in ("y", "s", "n", "e", "a"):
break
print(f"{COL_WARN}Please enter 'y', 's', 'n', 'e', or 'a'.{RESET}")
if choice == "n":
print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!")
break
if choice == "s":
history.append({
"role": "user",
"content": f"I refused to run this command:\n{command}\n\nRisk: {risk}\n\nPlease propose an alternative."
})
continue
if choice == "e":
user_edit = input(f"{COL_INPUT}Enter your guidance: {RESET}").strip()
if not user_edit:
user_edit = "Please propose a better approach."
history.append({
"role": "user",
"content": f"Instead of the previous command, follow this guidance:\n{user_edit}"
})
continue
if choice == "a":
allow_all = True
print(f"{COL_INFO}[INFO]{RESET} All future commands will be auto-run.")
# Execute the command with real-time output
effective_idle = compute_effective_idle_timeout(command, idle_timeout)
print(f"{COL_INFO}[INFO]{RESET} Executing with idle timeout = {effective_idle}s...\n")
# Add a small separator before command output
print(f"{COL_SEP}--- COMMAND OUTPUT ---{RESET}")
result = executor.run(command, idle_timeout=effective_idle)
# Show exit status
if result["timed_out"] or result["exit_status"] != 0:
col = COL_EXIT_BAD
else:
col = COL_EXIT_OK
print(f"\n{col}--- COMMAND COMPLETED ---{RESET}")
if result["timed_out"]:
print(f"{col}[EXIT STATUS]{RESET} TIMEOUT (idle timeout: {result['idle_timeout']}s)")
else:
print(f"{col}[EXIT STATUS]{RESET} {result['exit_status']}")
# Add result to history (truncated for API)
feedback = (
f"Command: {command}\n"
f"Exit status: {result['exit_status']}\n"
f"Timed out: {result['timed_out']}\n"
f"STDOUT:\n{truncate(result['stdout'])}\n\n"
f"STDERR:\n{truncate(result['stderr'])}\n"
)
history.append({"role": "user", "content": "RESULT:\n" + feedback})
def main():
"""Main entry point"""
import argparse
parser = argparse.ArgumentParser(description="Local Autonomous DeepSeek Sysadmin with Real-time Output")
parser.add_argument("--sudo-password", help="Sudo password (if needed)")
parser.add_argument("--goal", help="Task/goal for the agent to perform")
parser.add_argument("--idle-timeout", type=int, default=300, help="Idle timeout in seconds")
parser.add_argument("--max-steps", type=int, default=50, help="Maximum number of steps")
args = parser.parse_args()
print(f"{COL_SEP}=== Local Autonomous DeepSeek Sysadmin (Real-time Output) ==={RESET}\n")
print(f"{COL_INFO}Running on: {os.uname().nodename}{RESET}")
print(f"{COL_INFO}User: {os.environ.get('USER', os.environ.get('USERNAME', 'unknown'))}{RESET}\n")
# Get sudo password if needed
sudo_password = args.sudo_password
if sudo_password is None:
need_sudo = input(f"{COL_INPUT}Will you need sudo access? [y/N]: {RESET}").strip().lower()
if need_sudo == 'y':
import getpass
sudo_password = getpass.getpass(f"{COL_INPUT}Enter sudo password: {RESET}")
# Get goal
goal = args.goal
if not goal:
goal = input(f"{COL_INPUT}Enter goal (task to perform): {RESET}").strip()
if not goal:
print(f"{COL_WARN}No goal provided. Exiting.{RESET}")
return
# Create executor
executor = LocalExecutor(sudo_password=sudo_password)
try:
run_agent(goal, executor, max_steps=args.max_steps, idle_timeout=args.idle_timeout)
except KeyboardInterrupt:
print(f"\n\n{COL_WARN}[INTERRUPTED]{RESET} Session terminated by user.")
except Exception as e:
print(f"\n{COL_WARN}[ERROR]{RESET} {e}")
finally:
executor.close()
print(f"\n{COL_INFO}[SESSION ENDED]{RESET}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment