Last active
March 6, 2026 08:08
-
-
Save meepak/b35e3d54e936604822c1d98945ff5a83 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """ | |
| Local Autonomous DeepSeek Sysadmin | |
| Runs commands directly on the local machine with real-time output streaming | |
| """ | |
| import json | |
| import time | |
| import subprocess | |
| import select | |
| import os | |
| import sys | |
| import threading | |
| import queue | |
| from typing import List, Dict, Any, Optional, Tuple | |
| from datetime import datetime | |
| from openai import OpenAI | |
| # ============================================================ | |
| # HARD-CODE YOUR API KEY HERE | |
| # ============================================================ | |
| DEEPSEEK_API_KEY = "sk-.........MA" | |
| # ============================================================ | |
| client = OpenAI( | |
| api_key=DEEPSEEK_API_KEY, | |
| base_url="https://api.deepseek.com/v1" # DeepSeek API endpoint | |
| ) | |
| # ---------------- COLORS (ANSI) ---------------- | |
| # Disable colors if output is not a terminal | |
| if not sys.stdout.isatty(): | |
| RESET = COL_SEP = COL_THOUGHT = COL_CMD = COL_INFO = "" | |
| COL_WARN = COL_STDOUT_LABEL = COL_STDERR_LABEL = COL_EXIT_OK = "" | |
| COL_EXIT_BAD = COL_INPUT = COL_RISK = COL_TIMESTAMP = "" | |
| else: | |
| RESET = "\033[0m" | |
| COL_SEP = "\033[95m" # magenta | |
| COL_THOUGHT = "\033[96m" # bright cyan | |
| COL_CMD = "\033[93m" # bright yellow | |
| COL_INFO = "\033[94m" # blue | |
| COL_WARN = "\033[91m" # red | |
| COL_STDOUT_LABEL = "\033[92m" # green | |
| COL_STDERR_LABEL = "\033[91m" # red | |
| COL_EXIT_OK = "\033[92m" # green | |
| COL_EXIT_BAD = "\033[91m" # red | |
| COL_INPUT = "\033[90m" # dim gray | |
| COL_RISK = "\033[33m" # yellow-ish | |
| COL_TIMESTAMP = "\033[90m" # dim gray for timestamps | |
| # ---------------- DEEPSEEK CALL ---------------- | |
| def call_deepseek(messages: List[Dict[str, str]]) -> Dict[str, Any]: | |
| """ | |
| Call DeepSeek API with the given messages. | |
| Uses the chat completions endpoint with DeepSeek's preferred model. | |
| """ | |
| try: | |
| response = client.chat.completions.create( | |
| model="deepseek-chat", # DeepSeek's main model | |
| messages=messages, | |
| max_tokens=600, | |
| temperature=0.2, | |
| response_format={"type": "json_object"} # Enforce JSON output | |
| ) | |
| # Extract the response content | |
| text = response.choices[0].message.content.strip() | |
| # Handle possible ```json ... ``` wrapping | |
| if text.startswith("```"): | |
| text = text.strip("`") | |
| if text.startswith("json"): | |
| text = text[4:].lstrip() | |
| # Parse JSON | |
| try: | |
| return json.loads(text) | |
| except json.JSONDecodeError as e: | |
| # If JSON parsing fails, try to extract JSON from the text | |
| import re | |
| json_match = re.search(r'\{.*\}', text, re.DOTALL) | |
| if json_match: | |
| return json.loads(json_match.group()) | |
| else: | |
| raise RuntimeError(f"Model did not return valid JSON: {text[:200]}...") | |
| except Exception as e: | |
| raise RuntimeError(f"DeepSeek API call failed: {str(e)}") | |
| # ---------------- PROMPT DETECTION ---------------- | |
| def is_password_prompt(line: str, username: Optional[str] = None) -> bool: | |
| """ | |
| Detect lines that are likely asking specifically for a password. | |
| We handle these automatically with the stored password. | |
| """ | |
| lower = line.lower().strip() | |
| if "password" not in lower and "passphrase" not in lower: | |
| return False | |
| # Typical sudo prompts | |
| if "sudo" in lower: | |
| return True | |
| if username and username.lower() in lower: | |
| return True | |
| if lower.endswith(":"): | |
| return True | |
| return True | |
| def is_interactive_prompt(line: str) -> bool: | |
| """ | |
| Detect lines that are likely asking for interactive *non-password* input: | |
| confirmations, choices, etc. | |
| """ | |
| lower = line.lower().rstrip() | |
| if not lower: | |
| return False | |
| # Don't treat password prompts as generic interactive prompts | |
| if "password" in lower or "passphrase" in lower: | |
| return False | |
| looks_like_prompt = ( | |
| lower.endswith(":") | |
| or lower.endswith("?") | |
| or "[y/n]" in lower | |
| or "[y/n]" in lower.replace(" ", "") | |
| or "[Y/n]" in lower | |
| or "[y/N]" in lower | |
| ) | |
| if not looks_like_prompt: | |
| return False | |
| keywords = [ | |
| "do you want to continue", | |
| "continue?", | |
| "press enter", | |
| "press return", | |
| "enter choice", | |
| "enter selection", | |
| "enter new", | |
| "are you sure", | |
| "proceed?", | |
| "confirm", | |
| "choose", | |
| "select", | |
| ] | |
| return any(k in lower for k in keywords) | |
| # ---------------- COMMAND SANITISER ---------------- | |
| def make_noninteractive(command: str) -> str: | |
| """ | |
| Rewrite certain commands to be non-interactive. | |
| - For any 'systemctl' invocation, ensure '--no-pager' is present. | |
| """ | |
| parts = command.strip().split() | |
| if not parts: | |
| return command | |
| # Find 'systemctl' in the token list | |
| try: | |
| idx = parts.index("systemctl") | |
| except ValueError: | |
| return command # nothing to change | |
| # If '--no-pager' is not already present, insert it after 'systemctl' | |
| if "--no-pager" not in parts[idx + 1:]: | |
| parts.insert(idx + 1, "--no-pager") | |
| return " ".join(parts) | |
| def make_apt_noninteractive(cmd: str) -> str: | |
| """Add non-interactive flags to apt commands""" | |
| if "apt-get" in cmd or " apt " in cmd or cmd.startswith("apt "): | |
| # prepend DEBIAN_FRONTEND if not present | |
| if "DEBIAN_FRONTEND=" not in cmd: | |
| cmd = "DEBIAN_FRONTEND=noninteractive " + cmd | |
| # ensure -y is present for apt-get/apt install | |
| if ("apt-get" in cmd or " apt " in cmd) and " install " in cmd and " -y" not in cmd: | |
| cmd = cmd.replace(" install ", " install -y ") | |
| # ensure -qq for less output during updates | |
| if "apt-get" in cmd and " -qq" not in cmd and " update" in cmd: | |
| cmd = cmd.replace("apt-get", "apt-get -qq") | |
| return cmd | |
| # ---------------- LOCAL EXECUTOR ---------------- | |
| class LocalExecutor: | |
| """ | |
| Executes commands directly on the local machine with real-time output streaming. | |
| Handles interactive prompts, sudo passwords, and timeouts. | |
| """ | |
| def __init__(self, sudo_password: Optional[str] = None): | |
| self.sudo_password = sudo_password | |
| self.username = os.environ.get("USER", os.environ.get("USERNAME", "unknown")) | |
| self.process = None | |
| self.stdout_thread = None | |
| self.stderr_thread = None | |
| self.running = False | |
| self.stdout_lines = [] | |
| self.stderr_lines = [] | |
| self.password_sent = False | |
| def connect(self): | |
| """No-op for local execution""" | |
| pass | |
| def close(self): | |
| """Clean up any running processes""" | |
| self.running = False | |
| if self.process and self.process.poll() is None: | |
| try: | |
| self.process.terminate() | |
| time.sleep(0.5) | |
| if self.process.poll() is None: | |
| self.process.kill() | |
| except: | |
| pass | |
| def _send_password(self): | |
| """Send sudo password to the process""" | |
| if not self.password_sent and self.process and self.process.stdin and not self.process.stdin.closed: | |
| try: | |
| self.process.stdin.write(self.sudo_password + "\n") | |
| self.process.stdin.flush() | |
| self.password_sent = True | |
| print(f"{COL_INFO}[AUTO] Sent sudo password{RESET}") | |
| except (BrokenPipeError, OSError): | |
| pass | |
| def _handle_prompt(self, line: str): | |
| """Handle interactive prompts by asking the user""" | |
| if self.process and self.process.stdin and not self.process.stdin.closed: | |
| try: | |
| user_val = input(f"{COL_INPUT}[INPUT REQUIRED]{RESET} {line.strip()} ") | |
| self.process.stdin.write(user_val + "\n") | |
| self.process.stdin.flush() | |
| except (BrokenPipeError, OSError): | |
| pass | |
| def _reader_thread(self, pipe, is_stderr=False): | |
| """Thread function to continuously read from a pipe""" | |
| try: | |
| for line in iter(pipe.readline, ''): | |
| if line: | |
| if is_stderr: | |
| self.stderr_lines.append(line) | |
| # Print immediately with label | |
| if not hasattr(self, 'stderr_started'): | |
| self.stderr_started = True | |
| print(f"{COL_STDERR_LABEL}[STDERR]{RESET} ", end="") | |
| print(line, end="", flush=True) | |
| # Check for prompts | |
| if is_password_prompt(line, self.username) and self.sudo_password: | |
| self._send_password() | |
| elif is_interactive_prompt(line): | |
| self._handle_prompt(line) | |
| else: | |
| self.stdout_lines.append(line) | |
| # Print immediately with label | |
| if not hasattr(self, 'stdout_started'): | |
| self.stdout_started = True | |
| print(f"{COL_STDOUT_LABEL}[STDOUT]{RESET} ", end="") | |
| print(line, end="", flush=True) | |
| # Check for prompts | |
| if is_password_prompt(line, self.username) and self.sudo_password: | |
| self._send_password() | |
| elif is_interactive_prompt(line): | |
| self._handle_prompt(line) | |
| else: | |
| break | |
| except (IOError, OSError, ValueError) as e: | |
| # Pipe was closed or other error | |
| pass | |
| finally: | |
| try: | |
| pipe.close() | |
| except: | |
| pass | |
| def run(self, command: str, idle_timeout: int = 300) -> Dict[str, Any]: | |
| """ | |
| Run a command locally with real-time output streaming. | |
| Args: | |
| command: The command to execute | |
| idle_timeout: Seconds of no output before killing the process | |
| Returns: | |
| Dict with stdout, stderr, exit_status, timed_out, idle_timeout | |
| """ | |
| # Reset state | |
| self.stdout_lines = [] | |
| self.stderr_lines = [] | |
| self.password_sent = False | |
| self.stdout_started = False | |
| self.stderr_started = False | |
| # Sanitize the command | |
| cmd_stripped = command.strip() | |
| if not cmd_stripped: | |
| return { | |
| "stdout": "", | |
| "stderr": "Empty command", | |
| "exit_status": -1, | |
| "timed_out": False, | |
| "idle_timeout": idle_timeout | |
| } | |
| cmd_sanitized = make_noninteractive(cmd_stripped) | |
| cmd_sanitized = make_apt_noninteractive(cmd_sanitized) | |
| uses_sudo = cmd_sanitized.startswith("sudo ") | |
| # For sudo commands, ensure we can feed password | |
| effective_cmd = cmd_sanitized | |
| if uses_sudo and self.sudo_password: | |
| # Add -S to read password from stdin and -k to ignore cached credentials | |
| parts = cmd_sanitized.split() | |
| if len(parts) >= 1 and parts[0] == "sudo": | |
| # Add -S and -k flags if not present | |
| flags_to_add = [] | |
| if "-S" not in parts[1:3]: # Check first few args | |
| flags_to_add.append("-S") | |
| if "-k" not in parts[1:3]: | |
| flags_to_add.append("-k") | |
| if flags_to_add: | |
| # Insert flags after 'sudo' | |
| for flag in reversed(flags_to_add): | |
| parts.insert(1, flag) | |
| effective_cmd = " ".join(parts) | |
| print(f"{COL_CMD}[EXECUTING]{RESET} {effective_cmd}") | |
| # Start the process with pipes for real-time output | |
| try: | |
| self.process = subprocess.Popen( | |
| effective_cmd, | |
| shell=True, | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| text=True, | |
| bufsize=1, # Line buffered | |
| universal_newlines=True | |
| ) | |
| except Exception as e: | |
| return { | |
| "stdout": "", | |
| "stderr": f"Failed to start process: {str(e)}", | |
| "exit_status": -1, | |
| "timed_out": False, | |
| "idle_timeout": idle_timeout | |
| } | |
| # Start reader threads for continuous output | |
| self.stdout_thread = threading.Thread( | |
| target=self._reader_thread, | |
| args=(self.process.stdout, False) | |
| ) | |
| self.stderr_thread = threading.Thread( | |
| target=self._reader_thread, | |
| args=(self.process.stderr, True) | |
| ) | |
| self.stdout_thread.daemon = True | |
| self.stderr_thread.daemon = True | |
| self.stdout_thread.start() | |
| self.stderr_thread.start() | |
| # Send initial sudo password if needed | |
| if uses_sudo and self.sudo_password: | |
| self._send_password() | |
| # Monitor process with timeout | |
| timed_out = False | |
| last_output_time = time.time() | |
| # Main monitoring loop | |
| while True: | |
| # Check if process has finished | |
| exit_code = self.process.poll() | |
| # Update last output time based on new lines | |
| if self.stdout_lines or self.stderr_lines: | |
| # Check the last few lines to see if any are new | |
| current_stdout_len = len(self.stdout_lines) | |
| current_stderr_len = len(self.stderr_lines) | |
| # Simple heuristic - if we have lines, there was output | |
| if current_stdout_len > 0 or current_stderr_len > 0: | |
| last_output_time = time.time() | |
| # Check for idle timeout (only if process is still running) | |
| if exit_code is None: # Process still running | |
| if time.time() - last_output_time > idle_timeout: | |
| timed_out = True | |
| timeout_msg = f"\n[CLIENT NOTICE] No output for {idle_timeout} seconds. Killing process.\n" | |
| print(f"{COL_WARN}{timeout_msg}{RESET}", end="", flush=True) | |
| self.stderr_lines.append(timeout_msg) | |
| # Kill the process | |
| try: | |
| self.process.terminate() | |
| time.sleep(0.5) | |
| if self.process.poll() is None: | |
| self.process.kill() | |
| except Exception: | |
| pass | |
| break | |
| else: | |
| # Process finished, wait a moment for threads to finish reading | |
| time.sleep(0.2) | |
| break | |
| # Brief sleep to prevent CPU spinning | |
| time.sleep(0.1) | |
| # Wait for reader threads to finish (with timeout) | |
| if self.stdout_thread and self.stdout_thread.is_alive(): | |
| self.stdout_thread.join(timeout=1) | |
| if self.stderr_thread and self.stderr_thread.is_alive(): | |
| self.stderr_thread.join(timeout=1) | |
| # Close stdin to avoid broken pipe errors | |
| if self.process.stdin and not self.process.stdin.closed: | |
| try: | |
| self.process.stdin.close() | |
| except Exception: | |
| pass | |
| # Get final exit status | |
| if timed_out: | |
| exit_status = -1 | |
| else: | |
| exit_status = self.process.returncode if self.process.returncode is not None else -1 | |
| # Combine all output | |
| stdout = "".join(self.stdout_lines) | |
| stderr = "".join(self.stderr_lines) | |
| # Check for common sudo errors | |
| if uses_sudo and "not in the sudoers file" in stderr: | |
| print(f"\n{COL_WARN}[SUDO ERROR]{RESET} User '{self.username}' is not allowed to use sudo.") | |
| elif uses_sudo and "no password was provided" in stderr.lower(): | |
| print(f"\n{COL_WARN}[SUDO ERROR]{RESET} Sudo password incorrect or not provided.") | |
| return { | |
| "stdout": stdout, | |
| "stderr": stderr, | |
| "exit_status": exit_status, | |
| "timed_out": timed_out, | |
| "idle_timeout": idle_timeout | |
| } | |
| # ---------------- SYSTEM PROMPT ---------------- | |
| SYSTEM_PROMPT = """ | |
| You are an autonomous Linux sysadmin assistant running directly on the local machine. | |
| You MUST respond with STRICT JSON only. | |
| IMPORTANT RULES ABOUT OUTPUT: | |
| - Your ENTIRE reply MUST be exactly ONE JSON object. | |
| - Do NOT include multiple JSON objects. | |
| - Do NOT include any extra text, comments, or markdown. | |
| - Do NOT wrap the JSON in ``` fences. | |
| INTERACTION MODEL: | |
| - You can propose commands to be executed directly on this machine. | |
| - The client runs your commands and streams back stdout/stderr in real-time. | |
| - The client supports answering line-based prompts (like 'Do you want to continue?'), | |
| but it CANNOT reliably drive interactive TUIs or full-screen console menus. | |
| ABSOLUTE RESTRICTIONS: | |
| - NEVER use interactive console/TUI tools like: whiptail, dialog, nmtui, top, htop, | |
| alsamixer, raspi-config, menuconfig, or dpkg-reconfigure without a fully | |
| non-interactive mode. | |
| - NEVER rely on curses-style full-screen UIs, wizard menus, or arrow-key navigation. | |
| - ALWAYS use non-interactive configuration methods: | |
| - edit config files directly with sed/awk/cat/tee, | |
| - or use tools/flags that are specifically documented as non-interactive. | |
| APT / PACKAGE MANAGEMENT: | |
| - When using apt-get/apt: | |
| - Always include: `DEBIAN_FRONTEND=noninteractive` in the environment. | |
| - Use: `apt-get -y` (assume yes to prompts). | |
| Example: | |
| DEBIAN_FRONTEND=noninteractive sudo apt-get update | |
| DEBIAN_FRONTEND=noninteractive sudo apt-get -y install rkhunter | |
| LONG-RUNNING COMMANDS: | |
| - The client shows output in REAL TIME as it arrives from the command. | |
| - If a command produces no output for the idle timeout period, it will be killed. | |
| - Do NOT use the 'timeout' utility - the client handles this automatically. | |
| COMMAND RESPONSE FORMAT: | |
| At each step you MUST output JSON ONLY, in one of these forms: | |
| 1) To run a command: | |
| { | |
| "type": "command", | |
| "thought": "<brief reasoning>", | |
| "risk": "<brief risk description>", | |
| "command": "<bash command>" | |
| } | |
| 2) When finished: | |
| { | |
| "type": "done", | |
| "thought": "<why>", | |
| "summary": "<final result for the user>" | |
| } | |
| The "risk" field is REQUIRED for every command. | |
| General behavior: | |
| - Keep commands simple and robust. | |
| - Prefer non-destructive checks first (ls, cat, grep, systemctl status). | |
| - Avoid obviously destructive commands (rm -rf /, dropping entire databases, etc.) | |
| unless the user explicitly requested such destructive action. | |
| - Use previous command outputs to adjust your next step. | |
| """ | |
| def truncate(s: str, limit: int = 2000) -> str: | |
| """Truncate long strings for API calls""" | |
| s = s or "" | |
| if len(s) <= limit: | |
| return s | |
| return s[:limit] + f"\n...[truncated {len(s)-limit} chars]" | |
| def compute_effective_idle_timeout(command: str, default_idle: int) -> int: | |
| """ | |
| If command starts with 'timeout N ...', use max(default_idle, N+30) as idle timeout, | |
| so we don't kill it earlier than the command's own timeout. | |
| """ | |
| cmd = command.strip() | |
| if not cmd.startswith("timeout "): | |
| return default_idle | |
| parts = cmd.split() | |
| if len(parts) < 2: | |
| return default_idle | |
| try: | |
| secs = int(parts[1]) | |
| return max(default_idle, secs + 30) | |
| except ValueError: | |
| return max(default_idle, 600) | |
| def run_agent(goal: str, executor: LocalExecutor, max_steps: int = 50, idle_timeout: int = 300): | |
| """Main agent loop""" | |
| history = [ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": f"GOAL: {goal}"}, | |
| ] | |
| allow_all = False # When True, auto-run all commands without confirmation | |
| for step in range(1, max_steps + 1): | |
| print(f"\n\n{COL_SEP}==================== STEP {step} ===================={RESET}\n") | |
| history.append({"role": "user", "content": f"STEP {step}: Decide next action."}) | |
| try: | |
| action = call_deepseek(history) | |
| except Exception as e: | |
| print(f"{COL_WARN}[ERROR]{RESET} Failed to get response from DeepSeek: {e}") | |
| retry = input(f"{COL_INPUT}Retry? [y/n]: {RESET}").strip().lower() | |
| if retry == 'y': | |
| continue | |
| else: | |
| break | |
| action_type = action.get("type") | |
| thought = action.get("thought", "") | |
| risk = action.get("risk", "").strip() | |
| thought_text = thought or "(no thought provided)" | |
| print(f"{COL_THOUGHT}[MODEL THOUGHT]{RESET} {COL_THOUGHT}{thought_text}{RESET}") | |
| if not risk: | |
| risk = "Risk not specified (assume minimal, but review command carefully)." | |
| print(f"{COL_RISK}[RISK]{RESET} {COL_RISK}{risk}{RESET}") | |
| if action_type == "done": | |
| summary = action.get("summary", "") or "(no summary provided)" | |
| print(f"{COL_INFO}[SUMMARY]{RESET} {COL_INFO}{summary}{RESET}") | |
| # Ask if user wants to continue or finish | |
| while True: | |
| follow = input(f"\n{COL_INPUT}Task completed? Is there anything else? [y/n]: {RESET}").strip().lower() | |
| if follow in ("y", "n"): | |
| break | |
| print(f"{COL_WARN}Please enter 'y' or 'n'.{RESET}") | |
| if follow == "n": | |
| print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!") | |
| break | |
| else: | |
| extra = input(f"{COL_INPUT}What else would you like me to do? {RESET}").strip() | |
| if not extra: | |
| print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!") | |
| break | |
| history.append({ | |
| "role": "user", | |
| "content": f"New request: {extra}\n\nPlease continue from the current state." | |
| }) | |
| continue | |
| command = action.get("command", "").strip() | |
| if not command: | |
| print(f"{COL_WARN}[WARN]{RESET} No command returned by model. Aborting.") | |
| print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!") | |
| break | |
| print(f"{COL_CMD}[PROPOSED COMMAND]{RESET} {COL_CMD}{command}{RESET}") | |
| # Guardrail: block known interactive TUI tools | |
| TUI_BLOCKLIST = [ | |
| "whiptail", "dialog ", "nmtui", "raspi-config", "menuconfig", | |
| "htop", "top ", "alsamixer", "dpkg-reconfigure", "vim", "nano", | |
| "emacs", "less", "more", "man " | |
| ] | |
| if any(bad in command for bad in TUI_BLOCKLIST) and not any(flag in command for flag in ["-y", "--no-pager", "-n"]): | |
| print(f"{COL_WARN}[BLOCKED]{RESET} Command appears to use an interactive tool.") | |
| if not allow_all: | |
| force = input(f"{COL_INPUT}Force run anyway? [y/N]: {RESET}").strip().lower() | |
| if force != 'y': | |
| history.append({ | |
| "role": "user", | |
| "content": f"Command '{command}' was blocked for being interactive. Please propose a non-interactive alternative." | |
| }) | |
| continue | |
| # Decide whether to prompt or auto-run | |
| if not allow_all: | |
| while True: | |
| choice = input(f"\n{COL_INPUT}Allow this command? [y=run, s=skip, n=terminate, e=edit thought, a=allow all]: {RESET}").strip().lower() | |
| if choice in ("y", "s", "n", "e", "a"): | |
| break | |
| print(f"{COL_WARN}Please enter 'y', 's', 'n', 'e', or 'a'.{RESET}") | |
| if choice == "n": | |
| print(f"\n{COL_INFO}[SESSION CLOSED]{RESET} Bye!") | |
| break | |
| if choice == "s": | |
| history.append({ | |
| "role": "user", | |
| "content": f"I refused to run this command:\n{command}\n\nRisk: {risk}\n\nPlease propose an alternative." | |
| }) | |
| continue | |
| if choice == "e": | |
| user_edit = input(f"{COL_INPUT}Enter your guidance: {RESET}").strip() | |
| if not user_edit: | |
| user_edit = "Please propose a better approach." | |
| history.append({ | |
| "role": "user", | |
| "content": f"Instead of the previous command, follow this guidance:\n{user_edit}" | |
| }) | |
| continue | |
| if choice == "a": | |
| allow_all = True | |
| print(f"{COL_INFO}[INFO]{RESET} All future commands will be auto-run.") | |
| # Execute the command with real-time output | |
| effective_idle = compute_effective_idle_timeout(command, idle_timeout) | |
| print(f"{COL_INFO}[INFO]{RESET} Executing with idle timeout = {effective_idle}s...\n") | |
| # Add a small separator before command output | |
| print(f"{COL_SEP}--- COMMAND OUTPUT ---{RESET}") | |
| result = executor.run(command, idle_timeout=effective_idle) | |
| # Show exit status | |
| if result["timed_out"] or result["exit_status"] != 0: | |
| col = COL_EXIT_BAD | |
| else: | |
| col = COL_EXIT_OK | |
| print(f"\n{col}--- COMMAND COMPLETED ---{RESET}") | |
| if result["timed_out"]: | |
| print(f"{col}[EXIT STATUS]{RESET} TIMEOUT (idle timeout: {result['idle_timeout']}s)") | |
| else: | |
| print(f"{col}[EXIT STATUS]{RESET} {result['exit_status']}") | |
| # Add result to history (truncated for API) | |
| feedback = ( | |
| f"Command: {command}\n" | |
| f"Exit status: {result['exit_status']}\n" | |
| f"Timed out: {result['timed_out']}\n" | |
| f"STDOUT:\n{truncate(result['stdout'])}\n\n" | |
| f"STDERR:\n{truncate(result['stderr'])}\n" | |
| ) | |
| history.append({"role": "user", "content": "RESULT:\n" + feedback}) | |
| def main(): | |
| """Main entry point""" | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Local Autonomous DeepSeek Sysadmin with Real-time Output") | |
| parser.add_argument("--sudo-password", help="Sudo password (if needed)") | |
| parser.add_argument("--goal", help="Task/goal for the agent to perform") | |
| parser.add_argument("--idle-timeout", type=int, default=300, help="Idle timeout in seconds") | |
| parser.add_argument("--max-steps", type=int, default=50, help="Maximum number of steps") | |
| args = parser.parse_args() | |
| print(f"{COL_SEP}=== Local Autonomous DeepSeek Sysadmin (Real-time Output) ==={RESET}\n") | |
| print(f"{COL_INFO}Running on: {os.uname().nodename}{RESET}") | |
| print(f"{COL_INFO}User: {os.environ.get('USER', os.environ.get('USERNAME', 'unknown'))}{RESET}\n") | |
| # Get sudo password if needed | |
| sudo_password = args.sudo_password | |
| if sudo_password is None: | |
| need_sudo = input(f"{COL_INPUT}Will you need sudo access? [y/N]: {RESET}").strip().lower() | |
| if need_sudo == 'y': | |
| import getpass | |
| sudo_password = getpass.getpass(f"{COL_INPUT}Enter sudo password: {RESET}") | |
| # Get goal | |
| goal = args.goal | |
| if not goal: | |
| goal = input(f"{COL_INPUT}Enter goal (task to perform): {RESET}").strip() | |
| if not goal: | |
| print(f"{COL_WARN}No goal provided. Exiting.{RESET}") | |
| return | |
| # Create executor | |
| executor = LocalExecutor(sudo_password=sudo_password) | |
| try: | |
| run_agent(goal, executor, max_steps=args.max_steps, idle_timeout=args.idle_timeout) | |
| except KeyboardInterrupt: | |
| print(f"\n\n{COL_WARN}[INTERRUPTED]{RESET} Session terminated by user.") | |
| except Exception as e: | |
| print(f"\n{COL_WARN}[ERROR]{RESET} {e}") | |
| finally: | |
| executor.close() | |
| print(f"\n{COL_INFO}[SESSION ENDED]{RESET}") | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment