Last active
August 10, 2025 15:31
-
-
Save IlyaGulya/17f881adc06e7d80e26bdea774f0f13c to your computer and use it in GitHub Desktop.
Bitwarden Ansible vault password daemon
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/bin/bash | |
| # Set required Bitwarden item name | |
| export VAULT_SECRETD_BW_ITEM="Ansible vault key" # CHANGE THIS VALUE TO NAME OF YOUR BITWARDEN PASSWORD ITEM | |
| # Ansible Vault password script using vault_secretd.py daemon | |
| set -euo pipefail | |
| die(){ echo "ERROR: $*" >&2; exit 1; } | |
| # Get the directory where this script lives | |
| SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" | |
| VAULT_SECRETD="$SCRIPT_DIR/vault_secretd.py" | |
| # Check if vault_secretd.py exists and is executable | |
| if [[ ! -f "$VAULT_SECRETD" ]]; then | |
| die "vault_secretd.py not found at $VAULT_SECRETD" | |
| fi | |
| if [[ ! -x "$VAULT_SECRETD" ]]; then | |
| chmod +x "$VAULT_SECRETD" 2>/dev/null || die "vault_secretd.py is not executable" | |
| fi | |
| # Handle command line arguments | |
| verbose_flag="" | |
| if [[ $# -gt 0 && "$1" == "--verbose" ]]; then | |
| verbose_flag="--verbose" | |
| shift | |
| fi | |
| if [[ $# -gt 0 && "$1" == "--clear-cache" ]]; then | |
| "$VAULT_SECRETD" $verbose_flag clear 2>/dev/null || true | |
| exit 0 | |
| fi | |
| # Try to get password from environment first | |
| if [[ -n "${ANSIBLE_VAULT_KEY:-}" ]]; then | |
| printf '%s' "$ANSIBLE_VAULT_KEY" | |
| exit 0 | |
| fi | |
| # Get the password from vault_secretd.py | |
| if vault_password="$("$VAULT_SECRETD" $verbose_flag get)"; then | |
| if [[ -n "$vault_password" ]]; then | |
| printf '%s' "$vault_password" | |
| exit 0 | |
| fi | |
| fi | |
| die "Could not get vault password" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| vault_secretd.py — Memory-only secret daemon with GUI/TTY prompt and TTL | |
| - Uses stdlib (tkinter for GUI) and Bitwarden CLI (`bw`) for secret retrieval. | |
| - Caches Ansible Vault password in-memory with configurable TTL. | |
| - Prompts for Bitwarden master password via TTY or GUI (Tkinter). | |
| - Socket: ~/Library/Caches/ansible-vault-secretd/secretd.sock | |
| - API (newline-delimited JSON arrays): | |
| ["ping"] -> ["ok"] or ["error", "message"] | |
| ["status"] -> ["ok", present, ttl_seconds, seconds_remaining] | |
| ["clear"] -> ["ok"] or ["error", "message"] | |
| ["get", allow_prompt, ttl_seconds] -> ["ok", secret] or ["error", "message"] | |
| - CLI: | |
| vault_secretd.py get [--no-prompt] [--ttl-seconds N] [--verbose] | |
| vault_secretd.py clear [--verbose] | |
| vault_secretd.py status [--verbose] | |
| vault_secretd.py serve [--ttl-seconds N] [--socket PATH] [--verbose] | |
| - Environment: | |
| SECRET_TTL_SECONDS (default: 3600) | |
| VAULT_SECRETD_SOCKET (override socket path) | |
| VAULT_SECRETD_PROMPT_TITLE (GUI title, default: "Bitwarden") | |
| VAULT_SECRETD_PROMPT_TEXT (GUI/TTY text, default: "Enter Bitwarden Master Password:") | |
| VAULT_SECRETD_BW (Bitwarden CLI path, default: "bw") | |
| VAULT_SECRETD_BW_ITEM (required: item name or ID) | |
| VAULT_SECRETD_BW_FIELD (field to read: password|notes|<custom-field>, default: "password") | |
| """ | |
| import argparse | |
| import json | |
| import logging | |
| import os | |
| import signal | |
| import socket | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| from typing import List, Optional, Union, Tuple | |
| # ---------------- Configuration ---------------- | |
| DEFAULT_TTL = int(os.environ.get("SECRET_TTL_SECONDS", "3600")) | |
| DEFAULT_TITLE = os.environ.get("VAULT_SECRETD_PROMPT_TITLE", "Bitwarden") | |
| DEFAULT_TEXT = os.environ.get("VAULT_SECRETD_PROMPT_TEXT", "Enter Bitwarden Master Password:") | |
| BW_CMD = os.environ.get("VAULT_SECRETD_BW", "bw") | |
| BW_ITEM = os.environ.get("VAULT_SECRETD_BW_ITEM", "").strip() | |
| BW_FIELD = os.environ.get("VAULT_SECRETD_BW_FIELD", "password").strip() or "password" | |
| SOCKET_PATH = os.environ.get("VAULT_SECRETD_SOCKET", None) | |
| def get_default_socket_path() -> str: | |
| base = os.path.join(os.path.expanduser("~"), "Library", "Caches", "ansible-vault-secretd") | |
| os.makedirs(base, exist_ok=True) | |
| os.chmod(base, 0o700) | |
| return os.path.join(base, "secretd.sock") | |
| # ---------------- Logging Setup ---------------- | |
| def setup_logging(verbose: bool) -> None: | |
| if verbose: | |
| logging.basicConfig( | |
| level=logging.DEBUG, | |
| format="%(asctime)s [%(levelname)s] %(message)s", | |
| handlers=[logging.StreamHandler(sys.stderr)] | |
| ) | |
| else: | |
| # Quiet by default - suppress all logging output | |
| logging.basicConfig(level=logging.CRITICAL + 1) | |
| # ---------------- Secret Cache ---------------- | |
| class SecretCache: | |
| def __init__(self, ttl_seconds: int): | |
| self._lock = threading.RLock() | |
| self._secret: bytes = b"" | |
| self._expires_at: float = 0.0 | |
| self._ttl: int = ttl_seconds | |
| self._timer: Optional[threading.Timer] = None | |
| def _schedule_expiry(self) -> None: | |
| if self._timer: | |
| self._timer.cancel() | |
| remaining = max(0.0, self._expires_at - time.monotonic()) | |
| if remaining <= 0: | |
| self.clear() | |
| return | |
| self._timer = threading.Timer(remaining, self.clear) | |
| self._timer.daemon = True | |
| self._timer.start() | |
| def set(self, secret: bytes, ttl_seconds: Optional[int] = None) -> None: | |
| with self._lock: | |
| self._secret = secret[:] | |
| self._ttl = ttl_seconds if ttl_seconds is not None else self._ttl | |
| self._expires_at = time.monotonic() + self._ttl | |
| self._schedule_expiry() | |
| def get(self) -> bytes: | |
| with self._lock: | |
| if not self._secret or time.monotonic() >= self._expires_at: | |
| self.clear() | |
| return b"" | |
| return self._secret[:] | |
| def clear(self) -> None: | |
| with self._lock: | |
| self._secret = b"" | |
| self._expires_at = 0.0 | |
| if self._timer: | |
| self._timer.cancel() | |
| self._timer = None | |
| def status(self) -> Tuple[bool, int, int]: | |
| with self._lock: | |
| remaining = max(0.0, self._expires_at - time.monotonic()) if self._secret else 0.0 | |
| return (bool(self._secret), self._ttl, int(remaining)) | |
| # ---------------- Bitwarden Prompt ---------------- | |
| def prompt_tty(prompt: str) -> str: | |
| if not os.path.exists("/dev/tty"): | |
| logging.debug("/dev/tty not available") | |
| return "" | |
| try: | |
| import getpass | |
| # Use getpass for secure password input | |
| return getpass.getpass(prompt) | |
| except (KeyboardInterrupt, EOFError): | |
| logging.debug("TTY prompt cancelled by user") | |
| return "" | |
| except Exception as e: | |
| logging.debug(f"TTY prompt failed: {e}") | |
| return "" | |
| def prompt_gui(title, text): | |
| try: | |
| import tkinter as tk | |
| from tkinter import ttk | |
| root = tk.Tk() | |
| root.withdraw() | |
| dialog = tk.Toplevel(root) | |
| dialog.title(title) | |
| dialog.resizable(False, False) | |
| dialog.withdraw() | |
| dialog.transient(root) | |
| frame = ttk.Frame(dialog, padding="20") | |
| frame.grid(row=0, column=0, sticky="ew") | |
| label = ttk.Label(frame, text=text) | |
| label.grid(row=0, column=0, columnspan=2, pady=(0, 10)) | |
| password_var = tk.StringVar() | |
| entry = ttk.Entry(frame, textvariable=password_var, show="*", width=30) | |
| entry.grid(row=1, column=0, columnspan=2, pady=(0, 10)) | |
| result = [None] | |
| def close_with(value): | |
| result[0] = value | |
| try: | |
| dialog.grab_release() | |
| except Exception: | |
| pass | |
| dialog.withdraw() | |
| dialog.update_idletasks() | |
| dialog.destroy() | |
| def on_ok(): | |
| close_with(password_var.get()) | |
| def on_cancel(): | |
| close_with(None) | |
| def on_enter(event): | |
| on_ok() | |
| button_frame = ttk.Frame(frame) | |
| button_frame.grid(row=2, column=0, columnspan=2) | |
| ok_button = ttk.Button(button_frame, text="OK", command=on_ok) | |
| ok_button.grid(row=0, column=0, padx=(0, 5)) | |
| cancel_button = ttk.Button(button_frame, text="Cancel", command=on_cancel) | |
| cancel_button.grid(row=0, column=1, padx=(5, 0)) | |
| entry.bind("<Return>", on_enter) | |
| dialog.bind("<Return>", on_enter) | |
| dialog.protocol("WM_DELETE_WINDOW", on_cancel) | |
| dialog.update_idletasks() | |
| width = dialog.winfo_reqwidth() | |
| height = dialog.winfo_reqheight() | |
| screen_width = dialog.winfo_screenwidth() | |
| screen_height = dialog.winfo_screenheight() | |
| x = (screen_width - width) // 2 | |
| y = (screen_height - height) // 2 | |
| dialog.geometry(f"{width}x{height}+{x}+{y}") | |
| dialog.deiconify() | |
| dialog.grab_set() | |
| dialog.focus_set() | |
| entry.focus_set() | |
| dialog.wait_window() | |
| try: | |
| root.update() | |
| except Exception: | |
| pass | |
| root.destroy() | |
| return result[0] if result[0] is not None else "" | |
| except Exception as e: | |
| logging.debug(f"GUI prompt failed: {e}") | |
| return "" | |
| def prompt_bitwarden_master() -> str: | |
| logging.debug("Attempting to prompt for Bitwarden master password...") | |
| # First try GUI | |
| logging.debug("Trying GUI prompt first") | |
| result = prompt_gui(DEFAULT_TITLE, DEFAULT_TEXT) | |
| if result: | |
| logging.debug("GUI prompt succeeded") | |
| return result | |
| logging.debug("GUI prompt failed or empty") | |
| # Fallback to TTY if available | |
| if os.isatty(sys.stdin.fileno()) and os.isatty(sys.stderr.fileno()): | |
| logging.debug("GUI failed, trying TTY prompt as fallback") | |
| result = prompt_tty(f"{DEFAULT_TEXT} ") | |
| if result: | |
| logging.debug("TTY prompt succeeded") | |
| return result | |
| logging.debug("TTY prompt failed or empty") | |
| else: | |
| logging.debug("No TTY available for fallback") | |
| return "" | |
| # ---------------- Bitwarden Integration ---------------- | |
| def run_bw(args: List[str], input_text: Optional[str] = None, session_token: Optional[str] = None) -> str: | |
| env = os.environ.copy() | |
| if session_token is not None: | |
| env["BW_SESSION"] = session_token | |
| else: | |
| env["BW_SESSION"] = "" | |
| cmd = [BW_CMD] + args | |
| logging.debug(f"Running Bitwarden command: {' '.join(cmd)} (session: {'***' if session_token else 'none'})") | |
| try: | |
| result = subprocess.run( | |
| cmd, | |
| input=input_text, | |
| text=True, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| env=env, | |
| check=True, | |
| timeout=30 # Add 30 second timeout | |
| ) | |
| logging.debug(f"Bitwarden command succeeded, output length: {len(result.stdout)}") | |
| return result.stdout.strip() | |
| except FileNotFoundError: | |
| raise RuntimeError(f"Bitwarden CLI '{BW_CMD}' not found") | |
| except subprocess.TimeoutExpired: | |
| raise RuntimeError(f"Bitwarden command timed out after 30 seconds: {' '.join(cmd)}") | |
| except subprocess.CalledProcessError as e: | |
| logging.debug(f"Bitwarden command failed with exit code {e.returncode}") | |
| logging.debug(f"Bitwarden stderr: {e.stderr}") | |
| raise RuntimeError(f"Bitwarden command failed: {e.stderr}") | |
| def get_bitwarden_token(master_password: str) -> str: | |
| return run_bw(["unlock", "--raw"], input_text=master_password + "\n") | |
| def get_vault_password(token: str) -> str: | |
| if not BW_ITEM: | |
| raise RuntimeError("VAULT_SECRETD_BW_ITEM is not set") | |
| if BW_FIELD == "password": | |
| return run_bw(["get", "password", BW_ITEM], session_token=token) | |
| item_json = run_bw(["get", "item", BW_ITEM], session_token=token) | |
| try: | |
| item = json.loads(item_json) | |
| except json.JSONDecodeError: | |
| raise RuntimeError("Bitwarden returned invalid JSON") | |
| if BW_FIELD == "notes": | |
| value = item.get("notes", "") | |
| if not isinstance(value, str): | |
| raise RuntimeError("Item notes is missing or not a string") | |
| return value | |
| for field in item.get("fields", []): | |
| if field.get("name") == BW_FIELD: | |
| value = field.get("value", "") | |
| if not isinstance(value, str): | |
| raise RuntimeError(f"Custom field '{BW_FIELD}' is not a string") | |
| return value | |
| raise RuntimeError(f"Custom field '{BW_FIELD}' not found") | |
| def acquire_vault_secret() -> str: | |
| if not BW_ITEM: | |
| raise RuntimeError("VAULT_SECRETD_BW_ITEM must be set") | |
| logging.info("Prompting for Bitwarden master password...") | |
| master_password = prompt_bitwarden_master() | |
| if not master_password: | |
| raise RuntimeError("No Bitwarden master password provided (prompt cancelled or failed)") | |
| logging.info("Unlocking Bitwarden vault...") | |
| try: | |
| token = get_bitwarden_token(master_password) | |
| except Exception as e: | |
| raise RuntimeError(f"Bitwarden unlock failed: {e}") | |
| if not token: | |
| raise RuntimeError("Bitwarden unlock failed: empty token") | |
| logging.info(f"Retrieving vault password from Bitwarden item '{BW_ITEM}'...") | |
| try: | |
| return get_vault_password(token) | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to retrieve vault password: {e}") | |
| # ---------------- Command Handler ---------------- | |
| class CommandHandler: | |
| def __init__(self, cache: SecretCache): | |
| self.cache = cache | |
| def handle(self, request: List[Union[str, bool, int]]) -> List[Union[str, bool, int]]: | |
| if not request or not isinstance(request, list) or not request[0]: | |
| return ["error", "Invalid or empty request"] | |
| cmd = request[0] | |
| if cmd == "ping": | |
| return ["ok"] | |
| if cmd == "status": | |
| present, ttl, remaining = self.cache.status() | |
| return ["ok", present, ttl, remaining] | |
| if cmd == "clear": | |
| self.cache.clear() | |
| return ["ok"] | |
| if cmd == "get": | |
| # Daemon only returns cached secrets, no prompting | |
| secret = self.cache.get() | |
| if secret: | |
| return ["ok", secret.decode("utf-8")] | |
| else: | |
| return ["error", "No secret cached"] | |
| if cmd == "store": | |
| # Store a secret passed from the client | |
| if len(request) < 2: | |
| return ["error", "No secret provided to store"] | |
| secret = request[1] | |
| if isinstance(secret, str): | |
| secret = secret.encode("utf-8") | |
| ttl = int(request[2]) if len(request) > 2 and request[2] is not None else None | |
| self.cache.set(secret, ttl) | |
| return ["ok", "Secret stored"] | |
| return ["error", f"Unknown command: {cmd}"] | |
| # ---------------- Secret Server ---------------- | |
| class SecretServer: | |
| def __init__(self, socket_path: str, ttl_seconds: int, verbose: bool = False): | |
| self.socket_path = socket_path or get_default_socket_path() | |
| self.cache = SecretCache(ttl_seconds) | |
| self.handler = CommandHandler(self.cache) | |
| self.verbose = verbose | |
| self._server_socket: Optional[socket.socket] = None | |
| self._stop_event = threading.Event() | |
| def start(self) -> None: | |
| logging.info(f"Starting server on {self.socket_path} with TTL {self.cache._ttl}s") | |
| if os.path.exists(self.socket_path): | |
| try: | |
| test_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
| test_socket.settimeout(1.0) # Quick timeout for testing | |
| test_socket.connect(self.socket_path) | |
| test_socket.close() | |
| raise RuntimeError(f"Socket {self.socket_path} is already in use") | |
| except (FileNotFoundError, ConnectionRefusedError, socket.timeout): | |
| # Socket file exists but no server listening - remove stale socket | |
| logging.info(f"Removing stale socket file {self.socket_path}") | |
| os.unlink(self.socket_path) | |
| except OSError as e: | |
| # General connection error - likely stale socket | |
| logging.info(f"Removing stale socket file {self.socket_path} (OSError: {e})") | |
| os.unlink(self.socket_path) | |
| os.makedirs(os.path.dirname(self.socket_path), exist_ok=True) | |
| self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) | |
| self._server_socket.bind(self.socket_path) | |
| os.chmod(self.socket_path, 0o600) | |
| self._server_socket.listen(10) | |
| signal.signal(signal.SIGTERM, self._handle_signal) | |
| signal.signal(signal.SIGINT, self._handle_signal) | |
| logging.info("Server started successfully") | |
| try: | |
| while not self._stop_event.is_set(): | |
| self._server_socket.settimeout(0.5) | |
| try: | |
| conn, _ = self._server_socket.accept() | |
| threading.Thread(target=self._handle_client, args=(conn,), daemon=True).start() | |
| except socket.timeout: | |
| continue | |
| finally: | |
| self._cleanup() | |
| def _handle_signal(self, signum: int, frame) -> None: | |
| logging.info(f"Received signal {signum}, shutting down") | |
| self._stop_event.set() | |
| def _cleanup(self) -> None: | |
| logging.info("Cleaning up server") | |
| self.cache.clear() | |
| if self._server_socket: | |
| self._server_socket.close() | |
| if os.path.exists(self.socket_path): | |
| os.unlink(self.socket_path) | |
| def _handle_client(self, conn: socket.socket) -> None: | |
| try: | |
| data = conn.recv(4096).decode("utf-8").split("\n", 1)[0] | |
| if not data: | |
| self._send_response(conn, ["error", "Empty request"]) | |
| return | |
| try: | |
| request = json.loads(data) | |
| response = self.handler.handle(request) | |
| except json.JSONDecodeError: | |
| response = ["error", "Invalid JSON"] | |
| except Exception as e: | |
| response = ["error", str(e)] | |
| self._send_response(conn, response) | |
| except Exception as e: | |
| logging.error(f"Client error: {e}") | |
| self._send_response(conn, ["error", str(e)]) | |
| finally: | |
| conn.close() | |
| def _send_response(self, conn: socket.socket, response: List) -> None: | |
| conn.sendall((json.dumps(response, separators=(",", ":")) + "\n").encode("utf-8")) | |
| # ---------------- Client Functions ---------------- | |
| def client_request(request: List, socket_path: str) -> List: | |
| with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s: | |
| s.connect(socket_path) | |
| s.sendall((json.dumps(request) + "\n").encode("utf-8")) | |
| data = s.recv(4096).decode("utf-8").split("\n", 1)[0] | |
| return json.loads(data) | |
| def start_daemon(socket_path: str, ttl: int, verbose: bool) -> None: | |
| # Clear any threading state that might interfere with fork() | |
| try: | |
| # Force garbage collection before fork to clean up any threading locks | |
| import gc | |
| gc.collect() | |
| except Exception: | |
| pass | |
| if verbose: | |
| # In verbose mode, log daemon output to a temp file | |
| log_file_path = f"/tmp/vault_secretd_daemon_{os.getpid()}.log" | |
| try: | |
| pid = os.fork() | |
| except OSError as e: | |
| logging.error(f"Fork failed: {e}") | |
| raise RuntimeError(f"Cannot start daemon: fork failed ({e})") | |
| if pid > 0: | |
| # Parent waits a bit then prints daemon log | |
| time.sleep(1.0) | |
| try: | |
| if os.path.exists(log_file_path): | |
| with open(log_file_path, 'r') as f: | |
| daemon_output = f.read().strip() | |
| if daemon_output: | |
| print(f"DAEMON OUTPUT:\n{daemon_output}", file=sys.stderr) | |
| os.unlink(log_file_path) | |
| except Exception: | |
| pass | |
| return | |
| # Child process - first fork | |
| try: | |
| os.setsid() | |
| pid = os.fork() | |
| if pid > 0: | |
| os._exit(0) | |
| except OSError as e: | |
| try: | |
| with open(log_file_path, 'w') as f: | |
| f.write(f"Second fork failed: {e}\n") | |
| except: | |
| pass | |
| os._exit(1) | |
| # Grandchild process - actual daemon | |
| os.umask(0o077) | |
| try: | |
| os.chdir("/") | |
| # Redirect all output to log file in verbose mode | |
| with open(log_file_path, 'w') as log_file: | |
| os.dup2(log_file.fileno(), 1) # stdout to log file | |
| os.dup2(log_file.fileno(), 2) # stderr to log file | |
| with open("/dev/null", "r") as devnull: | |
| os.dup2(devnull.fileno(), 0) # stdin to /dev/null | |
| except Exception as e: | |
| try: | |
| with open(log_file_path, 'a') as f: | |
| f.write(f"Daemon setup failed: {e}\n") | |
| except: | |
| pass | |
| os._exit(1) | |
| try: | |
| # Reinitialize logging for the daemon process | |
| setup_logging(verbose) | |
| logging.info(f"Starting daemon with socket {socket_path} and TTL {ttl}") | |
| SecretServer(socket_path, ttl, verbose).start() | |
| except Exception as e: | |
| logging.error(f"Server start failed: {e}") | |
| os._exit(1) | |
| os._exit(0) | |
| else: | |
| # Standard daemon behavior for non-verbose mode | |
| try: | |
| pid = os.fork() | |
| except OSError as e: | |
| raise RuntimeError(f"Cannot start daemon: fork failed ({e})") | |
| if pid > 0: | |
| return | |
| os.setsid() | |
| pid = os.fork() | |
| if pid > 0: | |
| os._exit(0) | |
| os.umask(0o077) | |
| try: | |
| os.chdir("/") | |
| with open("/dev/null", "r+") as devnull: | |
| for fd in (0, 1, 2): | |
| os.dup2(devnull.fileno(), fd) | |
| except Exception as e: | |
| os._exit(1) | |
| try: | |
| SecretServer(socket_path, ttl, verbose).start() | |
| except Exception as e: | |
| os._exit(1) | |
| os._exit(0) | |
| def ensure_daemon(socket_path: str, ttl: int, verbose: bool) -> None: | |
| try: | |
| client_request(["ping"], socket_path) | |
| if verbose: | |
| logging.info("Daemon already running") | |
| return | |
| except Exception as e: | |
| if verbose: | |
| logging.info(f"Daemon not running ({e}), starting new daemon") | |
| start_daemon(socket_path, ttl, verbose) | |
| deadline = time.time() + 10.0 # Increased timeout to 10 seconds | |
| last_error = None | |
| while time.time() < deadline: | |
| try: | |
| client_request(["ping"], socket_path) | |
| if verbose: | |
| logging.info("Daemon started successfully") | |
| return | |
| except Exception as e: | |
| last_error = e | |
| time.sleep(0.2) # Slightly longer sleep between attempts | |
| error_msg = f"Failed to start daemon after 10 seconds" | |
| if last_error: | |
| error_msg += f". Last error: {last_error}" | |
| if verbose: | |
| logging.error(error_msg) | |
| raise RuntimeError(error_msg) | |
| # ---------------- CLI Commands ---------------- | |
| def cmd_serve(args: argparse.Namespace) -> None: | |
| setup_logging(args.verbose) | |
| SecretServer(args.socket, args.ttl_seconds, args.verbose).start() | |
| def cmd_get(args: argparse.Namespace) -> None: | |
| setup_logging(args.verbose) | |
| ensure_daemon(args.socket, DEFAULT_TTL, args.verbose) | |
| # First try to get cached secret from daemon | |
| response = client_request(["get"], args.socket) | |
| if response[0] == "ok": | |
| print(response[1], end="") | |
| return | |
| # If no cached secret and prompting is disabled, fail | |
| if args.no_prompt: | |
| print(response[1], file=sys.stderr) | |
| sys.exit(1) | |
| # Client-side prompting and storing | |
| if not BW_ITEM: | |
| print("VAULT_SECRETD_BW_ITEM must be set", file=sys.stderr) | |
| sys.exit(1) | |
| try: | |
| logging.info("Prompting for Bitwarden master password...") | |
| master_password = prompt_bitwarden_master() | |
| if not master_password: | |
| print("No Bitwarden master password provided", file=sys.stderr) | |
| sys.exit(1) | |
| logging.info("Unlocking Bitwarden vault...") | |
| token = get_bitwarden_token(master_password) | |
| if not token: | |
| print("Bitwarden unlock failed", file=sys.stderr) | |
| sys.exit(1) | |
| logging.info(f"Retrieving vault password from Bitwarden item '{BW_ITEM}'...") | |
| vault_secret = get_vault_password(token) | |
| # Store the secret in daemon | |
| store_response = client_request(["store", vault_secret, args.ttl_seconds], args.socket) | |
| if store_response[0] != "ok": | |
| print(f"Failed to store secret: {store_response[1]}", file=sys.stderr) | |
| sys.exit(1) | |
| # Return the secret | |
| print(vault_secret, end="") | |
| except Exception as e: | |
| print(f"Failed to acquire secret: {e}", file=sys.stderr) | |
| sys.exit(1) | |
| def cmd_clear(args: argparse.Namespace) -> None: | |
| setup_logging(args.verbose) | |
| ensure_daemon(args.socket, DEFAULT_TTL, args.verbose) | |
| response = client_request(["clear"], args.socket) | |
| if response[0] != "ok": | |
| print(response[1], file=sys.stderr) | |
| sys.exit(1) | |
| def cmd_store(args: argparse.Namespace) -> None: | |
| setup_logging(args.verbose) | |
| ensure_daemon(args.socket, DEFAULT_TTL, args.verbose) | |
| # Read secret from stdin | |
| if args.secret: | |
| secret = args.secret | |
| else: | |
| secret = sys.stdin.read().strip() | |
| if not secret: | |
| print("No secret provided", file=sys.stderr) | |
| sys.exit(1) | |
| response = client_request(["store", secret, args.ttl_seconds], args.socket) | |
| if response[0] != "ok": | |
| print(response[1], file=sys.stderr) | |
| sys.exit(1) | |
| if args.verbose: | |
| print("Secret stored successfully") | |
| def cmd_status(args: argparse.Namespace) -> None: | |
| setup_logging(args.verbose) | |
| ensure_daemon(args.socket, DEFAULT_TTL, args.verbose) | |
| response = client_request(["status"], args.socket) | |
| if response[0] != "ok": | |
| print(response[1], file=sys.stderr) | |
| sys.exit(1) | |
| present, ttl, remaining = response[1:4] | |
| print(f"present={present} ttl={ttl}s remaining={remaining}s") | |
| # ---------------- Main ---------------- | |
| def main() -> None: | |
| parser = argparse.ArgumentParser(description="Ansible Vault secret daemon") | |
| parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") | |
| parser.add_argument("--socket", default=SOCKET_PATH or get_default_socket_path(), help="Socket path") | |
| subparsers = parser.add_subparsers(dest="command", required=True) | |
| serve_parser = subparsers.add_parser("serve", help="Run daemon in foreground") | |
| serve_parser.add_argument("--ttl-seconds", type=int, default=DEFAULT_TTL, help="Secret TTL in seconds") | |
| get_parser = subparsers.add_parser("get", help="Get secret") | |
| get_parser.add_argument("--no-prompt", action="store_true", help="Disable prompting") | |
| get_parser.add_argument("--ttl-seconds", type=int, help="Override TTL in seconds") | |
| store_parser = subparsers.add_parser("store", help="Store secret") | |
| store_parser.add_argument("--secret", help="Secret to store (reads from stdin if not provided)") | |
| store_parser.add_argument("--ttl-seconds", type=int, help="Override TTL in seconds") | |
| subparsers.add_parser("clear", help="Clear cached secret") | |
| subparsers.add_parser("status", help="Show cache status") | |
| args = parser.parse_args() | |
| commands = { | |
| "serve": cmd_serve, | |
| "get": cmd_get, | |
| "store": cmd_store, | |
| "clear": cmd_clear, | |
| "status": cmd_status, | |
| } | |
| commands[args.command](args) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment