Skip to content

Instantly share code, notes, and snippets.

@IlyaGulya
Last active August 10, 2025 15:31
Show Gist options
  • Select an option

  • Save IlyaGulya/17f881adc06e7d80e26bdea774f0f13c to your computer and use it in GitHub Desktop.

Select an option

Save IlyaGulya/17f881adc06e7d80e26bdea774f0f13c to your computer and use it in GitHub Desktop.
Bitwarden Ansible vault password daemon
#!/bin/bash
# Ansible Vault password script backed by the vault_secretd.py daemon.
# Usage: <this script> [--verbose] [--clear-cache]
#   --verbose      forwarded to vault_secretd.py (must be the first argument)
#   --clear-cache  ask the daemon to drop the cached secret, then exit 0

# Name of the Bitwarden item that holds the vault password.
export VAULT_SECRETD_BW_ITEM="Ansible vault key" # CHANGE THIS VALUE TO NAME OF YOUR BITWARDEN PASSWORD ITEM

set -euo pipefail

# Print an error to stderr and abort.
die() {
    echo "ERROR: $*" >&2
    exit 1
}

# vault_secretd.py is expected to live next to this script.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
VAULT_SECRETD="$SCRIPT_DIR/vault_secretd.py"

[[ -f "$VAULT_SECRETD" ]] || die "vault_secretd.py not found at $VAULT_SECRETD"

if [[ ! -x "$VAULT_SECRETD" ]]; then
    chmod +x "$VAULT_SECRETD" 2>/dev/null || die "vault_secretd.py is not executable"
fi

# Optional leading --verbose flag.
verbose_flag=""
if [[ "${1:-}" == "--verbose" ]]; then
    verbose_flag="--verbose"
    shift
fi

# $verbose_flag is intentionally unquoted below: when empty it must expand to
# no argument at all rather than to an empty string.
if [[ "${1:-}" == "--clear-cache" ]]; then
    # Best effort: failures while clearing are ignored.
    "$VAULT_SECRETD" $verbose_flag clear 2>/dev/null || true
    exit 0
fi

# An explicit environment override wins over the daemon.
if [[ -n "${ANSIBLE_VAULT_KEY:-}" ]]; then
    printf '%s' "$ANSIBLE_VAULT_KEY"
    exit 0
fi

# Otherwise ask vault_secretd.py; both a failing exit status and an empty
# answer count as failure.
vault_password="$("$VAULT_SECRETD" $verbose_flag get)" || die "Could not get vault password"
[[ -n "$vault_password" ]] || die "Could not get vault password"

printf '%s' "$vault_password"
exit 0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
vault_secretd.py — Memory-only secret daemon with GUI/TTY prompt and TTL
- Uses stdlib (tkinter for GUI) and Bitwarden CLI (`bw`) for secret retrieval.
- Caches Ansible Vault password in-memory with configurable TTL.
- Prompts for Bitwarden master password via TTY or GUI (Tkinter).
- Socket: ~/Library/Caches/ansible-vault-secretd/secretd.sock
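- API (newline-delimited JSON arrays; the daemon itself never prompts):
["ping"] -> ["ok"]
["status"] -> ["ok", present, ttl_seconds, seconds_remaining]
["clear"] -> ["ok"]
["get"] -> ["ok", secret] or ["error", "No secret cached"]
["store", secret, ttl_seconds] -> ["ok", "Secret stored"] or ["error", "message"]
- CLI (the global options --verbose and --socket PATH go BEFORE the subcommand):
vault_secretd.py [--verbose] [--socket PATH] get [--no-prompt] [--ttl-seconds N]
vault_secretd.py [--verbose] [--socket PATH] store [--secret S] [--ttl-seconds N]
vault_secretd.py [--verbose] [--socket PATH] clear
vault_secretd.py [--verbose] [--socket PATH] status
vault_secretd.py [--verbose] [--socket PATH] serve [--ttl-seconds N]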
- Environment:
SECRET_TTL_SECONDS (default: 3600)
VAULT_SECRETD_SOCKET (override socket path)
VAULT_SECRETD_PROMPT_TITLE (GUI title, default: "Bitwarden")
VAULT_SECRETD_PROMPT_TEXT (GUI/TTY text, default: "Enter Bitwarden Master Password:")
VAULT_SECRETD_BW (Bitwarden CLI path, default: "bw")
VAULT_SECRETD_BW_ITEM (required: item name or ID)
VAULT_SECRETD_BW_FIELD (field to read: password|notes|<custom-field>, default: "password")
"""
import argparse
import json
import logging
import os
import signal
import socket
import subprocess
import sys
import threading
import time
from typing import List, Optional, Union, Tuple
# ---------------- Configuration ----------------
# All settings are read from the environment once, at import time.
# Cache lifetime in seconds.
DEFAULT_TTL = int(os.getenv("SECRET_TTL_SECONDS", "3600"))
# Window title and label text used by the master-password prompt.
DEFAULT_TITLE = os.getenv("VAULT_SECRETD_PROMPT_TITLE", "Bitwarden")
DEFAULT_TEXT = os.getenv("VAULT_SECRETD_PROMPT_TEXT", "Enter Bitwarden Master Password:")
# Bitwarden CLI executable, the item to read, and the field within that item.
BW_CMD = os.getenv("VAULT_SECRETD_BW", "bw")
BW_ITEM = os.getenv("VAULT_SECRETD_BW_ITEM", "").strip()
# A blank field name falls back to "password".
BW_FIELD = os.getenv("VAULT_SECRETD_BW_FIELD", "password").strip() or "password"
# Optional socket path override; None means "use the default location".
SOCKET_PATH = os.getenv("VAULT_SECRETD_SOCKET")
def get_default_socket_path() -> str:
    """Return the default socket path, creating its parent directory.

    The directory ``~/Library/Caches/ansible-vault-secretd`` is created when
    missing and its mode is set to 0700 on every call.
    """
    home = os.path.expanduser("~")
    cache_dir = os.path.join(home, "Library", "Caches", "ansible-vault-secretd")
    os.makedirs(cache_dir, exist_ok=True)
    os.chmod(cache_dir, 0o700)
    return os.path.join(cache_dir, "secretd.sock")
# ---------------- Logging Setup ----------------
def setup_logging(verbose: bool) -> None:
    """Configure the root logger.

    With ``verbose`` set, DEBUG and above are written to stderr with a
    timestamped format; otherwise the level is raised above CRITICAL so that
    nothing is emitted.
    """
    if not verbose:
        # Quiet by default - suppress all logging output
        logging.basicConfig(level=logging.CRITICAL + 1)
        return

    stderr_handler = logging.StreamHandler(sys.stderr)
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[stderr_handler],
    )
# ---------------- Secret Cache ----------------
class SecretCache:
    """Thread-safe in-memory holder for a single secret with a time-to-live.

    The secret is dropped either lazily (``get`` notices it has expired) or
    eagerly by a background ``threading.Timer``. Every ``set``/``clear`` bumps
    an internal generation counter so that a timer belonging to an older
    secret can never wipe a newer one.
    """

    def __init__(self, ttl_seconds: int):
        """Create an empty cache whose default TTL is ``ttl_seconds``."""
        self._lock = threading.RLock()
        self._secret: bytes = b""
        self._expires_at: float = 0.0  # time.monotonic() deadline
        self._ttl: int = ttl_seconds
        self._timer: Optional[threading.Timer] = None
        self._generation: int = 0  # incremented on every set() and clear()

    def _expire(self, generation: int) -> None:
        """Timer callback: clear the cache only if it was not changed since.

        A timer that already fired may be blocked on the lock while ``set``
        stores a new secret; comparing generations keeps such a stale timer
        from destroying the fresh value.
        """
        with self._lock:
            if generation == self._generation:
                self.clear()

    def _schedule_expiry(self) -> None:
        """(Re)arm the expiry timer for the current ``_expires_at`` deadline."""
        with self._lock:
            if self._timer:
                self._timer.cancel()
                self._timer = None
            remaining = max(0.0, self._expires_at - time.monotonic())
            if remaining <= 0:
                self.clear()
                return
            self._timer = threading.Timer(
                remaining, self._expire, args=(self._generation,)
            )
            self._timer.daemon = True
            self._timer.start()

    def set(self, secret: bytes, ttl_seconds: Optional[int] = None) -> None:
        """Store ``secret`` and restart the TTL countdown.

        When ``ttl_seconds`` is given it also becomes the TTL used by later
        calls that omit it.
        """
        with self._lock:
            self._generation += 1
            self._secret = secret[:]
            self._ttl = ttl_seconds if ttl_seconds is not None else self._ttl
            self._expires_at = time.monotonic() + self._ttl
            self._schedule_expiry()

    def get(self) -> bytes:
        """Return a copy of the secret, or ``b""`` if absent or expired."""
        with self._lock:
            if not self._secret or time.monotonic() >= self._expires_at:
                self.clear()
                return b""
            return self._secret[:]

    def clear(self) -> None:
        """Forget the secret and cancel any pending expiry timer."""
        with self._lock:
            self._generation += 1
            self._secret = b""
            self._expires_at = 0.0
            if self._timer:
                self._timer.cancel()
                self._timer = None

    def status(self) -> Tuple[bool, int, int]:
        """Return ``(present, ttl_seconds, whole_seconds_remaining)``."""
        with self._lock:
            remaining = max(0.0, self._expires_at - time.monotonic()) if self._secret else 0.0
            return (bool(self._secret), self._ttl, int(remaining))
# ---------------- Bitwarden Prompt ----------------
def prompt_tty(prompt: str) -> str:
    """Ask for a password on the terminal without echoing it.

    Returns the entered text, or an empty string when ``/dev/tty`` does not
    exist, the user cancels (Ctrl-C / EOF), or the prompt fails for any other
    reason.
    """
    answer = ""
    if os.path.exists("/dev/tty"):
        try:
            import getpass
            # Use getpass for secure password input
            answer = getpass.getpass(prompt)
        except (KeyboardInterrupt, EOFError):
            logging.debug("TTY prompt cancelled by user")
        except Exception as e:
            logging.debug(f"TTY prompt failed: {e}")
    else:
        logging.debug("/dev/tty not available")
    return answer
def prompt_gui(title: str, text: str) -> str:
    """Show a modal Tkinter password dialog and return what the user typed.

    Args:
        title: Window title of the dialog.
        text: Label shown above the masked entry field.

    Returns:
        The entered password, or an empty string when the dialog is cancelled
        or closed, or when Tkinter cannot be imported or started.
    """
    try:
        import tkinter as tk
        from tkinter import ttk

        root = tk.Tk()
    except Exception as e:
        logging.debug(f"GUI prompt failed: {e}")
        return ""

    # One-element list so the nested callbacks can write the outcome.
    result: List[Optional[str]] = [None]
    try:
        root.withdraw()
        dialog = tk.Toplevel(root)
        dialog.title(title)
        dialog.resizable(False, False)
        # Keep the dialog hidden until it has been laid out and centred.
        dialog.withdraw()
        dialog.transient(root)

        frame = ttk.Frame(dialog, padding="20")
        frame.grid(row=0, column=0, sticky="ew")
        label = ttk.Label(frame, text=text)
        label.grid(row=0, column=0, columnspan=2, pady=(0, 10))
        password_var = tk.StringVar()
        entry = ttk.Entry(frame, textvariable=password_var, show="*", width=30)
        entry.grid(row=1, column=0, columnspan=2, pady=(0, 10))

        def close_with(value):
            """Record the outcome and tear the dialog down."""
            result[0] = value
            try:
                dialog.grab_release()
            except Exception:
                pass
            dialog.withdraw()
            dialog.update_idletasks()
            dialog.destroy()

        def on_ok():
            close_with(password_var.get())

        def on_cancel():
            close_with(None)

        def on_enter(event):
            on_ok()

        button_frame = ttk.Frame(frame)
        button_frame.grid(row=2, column=0, columnspan=2)
        ok_button = ttk.Button(button_frame, text="OK", command=on_ok)
        ok_button.grid(row=0, column=0, padx=(0, 5))
        cancel_button = ttk.Button(button_frame, text="Cancel", command=on_cancel)
        cancel_button.grid(row=0, column=1, padx=(5, 0))

        entry.bind("<Return>", on_enter)
        dialog.bind("<Return>", on_enter)
        # Closing the window counts as a cancel.
        dialog.protocol("WM_DELETE_WINDOW", on_cancel)

        # Centre the dialog on the screen using its requested size.
        dialog.update_idletasks()
        width = dialog.winfo_reqwidth()
        height = dialog.winfo_reqheight()
        screen_width = dialog.winfo_screenwidth()
        screen_height = dialog.winfo_screenheight()
        x = (screen_width - width) // 2
        y = (screen_height - height) // 2
        dialog.geometry(f"{width}x{height}+{x}+{y}")

        dialog.deiconify()
        dialog.grab_set()
        dialog.focus_set()
        entry.focus_set()
        # Blocks until close_with() destroys the dialog.
        dialog.wait_window()
    except Exception as e:
        logging.debug(f"GUI prompt failed: {e}")
        return ""
    finally:
        # Always release the Tk root, even when building the dialog failed;
        # a teardown error must not discard a password that was entered.
        try:
            root.update()
        except Exception:
            pass
        try:
            root.destroy()
        except Exception as e:
            logging.debug(f"GUI teardown failed: {e}")

    return result[0] if result[0] is not None else ""
def prompt_bitwarden_master() -> str:
    """Obtain the Bitwarden master password from the user.

    The GUI dialog is tried first. If it yields nothing and both stdin and
    stderr are terminals, a TTY prompt is used as a fallback. Returns an
    empty string when no password could be obtained.
    """
    logging.debug("Attempting to prompt for Bitwarden master password...")

    # First try GUI
    logging.debug("Trying GUI prompt first")
    gui_answer = prompt_gui(DEFAULT_TITLE, DEFAULT_TEXT)
    if gui_answer:
        logging.debug("GUI prompt succeeded")
        return gui_answer
    logging.debug("GUI prompt failed or empty")

    # Fallback to TTY if available
    has_tty = os.isatty(sys.stdin.fileno()) and os.isatty(sys.stderr.fileno())
    if not has_tty:
        logging.debug("No TTY available for fallback")
        return ""

    logging.debug("GUI failed, trying TTY prompt as fallback")
    tty_answer = prompt_tty(f"{DEFAULT_TEXT} ")
    if tty_answer:
        logging.debug("TTY prompt succeeded")
        return tty_answer
    logging.debug("TTY prompt failed or empty")
    return ""
# ---------------- Bitwarden Integration ----------------
def run_bw(
    args: List[str],
    input_text: Optional[str] = None,
    session_token: Optional[str] = None,
    timeout: float = 30,
) -> str:
    """Run the Bitwarden CLI and return its stripped standard output.

    Args:
        args: Arguments appended after the ``BW_CMD`` executable.
        input_text: Text fed to the command's stdin, if any.
        session_token: Value exported as ``BW_SESSION``; when ``None`` the
            variable is set to an empty string for the child process.
        timeout: Seconds to wait before giving up on the command.

    Raises:
        RuntimeError: The executable is missing, the command timed out, or it
            exited with a non-zero status (stderr is included in the message).
    """
    env = os.environ.copy()
    env["BW_SESSION"] = session_token if session_token is not None else ""
    cmd = [BW_CMD] + args
    logging.debug(f"Running Bitwarden command: {' '.join(cmd)} (session: {'***' if session_token else 'none'})")
    try:
        result = subprocess.run(
            cmd,
            input=input_text,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            check=True,
            timeout=timeout,
        )
    except FileNotFoundError as e:
        raise RuntimeError(f"Bitwarden CLI '{BW_CMD}' not found") from e
    except subprocess.TimeoutExpired as e:
        raise RuntimeError(
            f"Bitwarden command timed out after {timeout} seconds: {' '.join(cmd)}"
        ) from e
    except subprocess.CalledProcessError as e:
        logging.debug(f"Bitwarden command failed with exit code {e.returncode}")
        logging.debug(f"Bitwarden stderr: {e.stderr}")
        raise RuntimeError(f"Bitwarden command failed: {e.stderr}") from e
    logging.debug(f"Bitwarden command succeeded, output length: {len(result.stdout)}")
    return result.stdout.strip()
def get_bitwarden_token(master_password: str) -> str:
    """Run ``bw unlock --raw`` with the master password on stdin.

    Returns whatever ``run_bw`` returns for that command (its stripped
    stdout).
    """
    unlock_args = ["unlock", "--raw"]
    stdin_payload = master_password + "\n"
    return run_bw(unlock_args, input_text=stdin_payload)
def get_vault_password(token: str) -> str:
    """Read the configured field of the configured Bitwarden item.

    ``BW_FIELD`` selects what is returned: ``"password"`` uses
    ``bw get password``; ``"notes"`` and any other name are looked up in the
    JSON returned by ``bw get item`` (the latter among the item's custom
    fields).

    Args:
        token: Session token passed to the Bitwarden CLI.

    Raises:
        RuntimeError: ``BW_ITEM`` is unset, the CLI fails, the item JSON is
            malformed, or the requested field is missing or not a string.
    """
    if not BW_ITEM:
        raise RuntimeError("VAULT_SECRETD_BW_ITEM is not set")
    if BW_FIELD == "password":
        return run_bw(["get", "password", BW_ITEM], session_token=token)

    item_json = run_bw(["get", "item", BW_ITEM], session_token=token)
    try:
        item = json.loads(item_json)
    except json.JSONDecodeError as e:
        raise RuntimeError("Bitwarden returned invalid JSON") from e
    # Valid JSON that is not an object (e.g. a list or null) is just as unusable.
    if not isinstance(item, dict):
        raise RuntimeError("Bitwarden returned invalid JSON")

    if BW_FIELD == "notes":
        value = item.get("notes", "")
        if not isinstance(value, str):
            raise RuntimeError("Item notes is missing or not a string")
        return value

    # "fields" may be absent or null; treat both as "no custom fields".
    for field in item.get("fields") or []:
        if isinstance(field, dict) and field.get("name") == BW_FIELD:
            value = field.get("value", "")
            if not isinstance(value, str):
                raise RuntimeError(f"Custom field '{BW_FIELD}' is not a string")
            return value
    raise RuntimeError(f"Custom field '{BW_FIELD}' not found")
def acquire_vault_secret() -> str:
    """Prompt for the master password, unlock Bitwarden and fetch the secret.

    Raises:
        RuntimeError: ``BW_ITEM`` is unset, no master password was entered,
            unlocking failed or returned an empty token, or the item could
            not be read.
    """
    if not BW_ITEM:
        raise RuntimeError("VAULT_SECRETD_BW_ITEM must be set")

    logging.info("Prompting for Bitwarden master password...")
    entered_password = prompt_bitwarden_master()
    if not entered_password:
        raise RuntimeError("No Bitwarden master password provided (prompt cancelled or failed)")

    logging.info("Unlocking Bitwarden vault...")
    try:
        session = get_bitwarden_token(entered_password)
    except Exception as e:
        raise RuntimeError(f"Bitwarden unlock failed: {e}")
    if not session:
        raise RuntimeError("Bitwarden unlock failed: empty token")

    logging.info(f"Retrieving vault password from Bitwarden item '{BW_ITEM}'...")
    try:
        vault_password = get_vault_password(session)
    except Exception as e:
        raise RuntimeError(f"Failed to retrieve vault password: {e}")
    return vault_password
# ---------------- Command Handler ----------------
class CommandHandler:
    """Translate protocol requests into operations on a secret cache.

    A request is a list whose first element is the command name; the reply is
    a list starting with ``"ok"`` or ``"error"``.
    """

    def __init__(self, cache: "SecretCache"):
        """Bind the handler to the cache it reads from and writes to."""
        self.cache = cache

    def handle(self, request: List[Union[str, bool, int]]) -> List[Union[str, bool, int]]:
        """Execute one request and return its response.

        Supported commands: ``ping``, ``status``, ``clear``, ``get`` and
        ``store`` (``["store", secret, ttl_seconds]``, TTL optional).
        Malformed requests and unknown commands yield an ``["error", ...]``
        reply instead of raising.
        """
        if not isinstance(request, list) or not request or not request[0]:
            return ["error", "Invalid or empty request"]

        cmd = request[0]
        if cmd == "ping":
            return ["ok"]
        if cmd == "status":
            present, ttl, remaining = self.cache.status()
            return ["ok", present, ttl, remaining]
        if cmd == "clear":
            self.cache.clear()
            return ["ok"]
        if cmd == "get":
            # Daemon only returns cached secrets, no prompting
            secret = self.cache.get()
            if secret:
                return ["ok", secret.decode("utf-8")]
            return ["error", "No secret cached"]
        if cmd == "store":
            return self._store(request)
        return ["error", f"Unknown command: {cmd}"]

    def _store(self, request: List) -> List[Union[str, bool, int]]:
        """Validate and cache a secret passed from the client."""
        if len(request) < 2:
            return ["error", "No secret provided to store"]

        secret = request[1]
        if isinstance(secret, str):
            secret = secret.encode("utf-8")
        elif not isinstance(secret, (bytes, bytearray)):
            # Numbers, lists, null, ... cannot be cached as a secret.
            return ["error", "Secret must be a string"]

        ttl = None
        if len(request) > 2 and request[2] is not None:
            try:
                ttl = int(request[2])
            except (TypeError, ValueError):
                return ["error", "Invalid TTL"]

        self.cache.set(secret, ttl)
        return ["ok", "Secret stored"]
# ---------------- Secret Server ----------------
class SecretServer:
    """Unix-domain socket server that exposes a SecretCache over JSON.

    Each connection carries one newline-terminated JSON request and receives
    one newline-terminated JSON response produced by ``CommandHandler``.
    """

    # Upper bound for one request, so a client cannot make the daemon buffer
    # an unbounded amount of data.
    _MAX_REQUEST_BYTES = 1024 * 1024
    # Seconds a client may stall before its connection is dropped.
    _CLIENT_TIMEOUT = 5.0

    def __init__(self, socket_path: str, ttl_seconds: int, verbose: bool = False):
        """Prepare a server; nothing is bound until ``start()`` is called.

        An empty ``socket_path`` selects ``get_default_socket_path()``.
        """
        self.socket_path = socket_path or get_default_socket_path()
        self.cache = SecretCache(ttl_seconds)
        self.handler = CommandHandler(self.cache)
        self.verbose = verbose
        self._server_socket: Optional[socket.socket] = None
        self._stop_event = threading.Event()

    def start(self) -> None:
        """Bind the socket and serve until SIGTERM/SIGINT is received.

        Raises:
            RuntimeError: Another process is already accepting connections on
                the socket path.
        """
        logging.info(f"Starting server on {self.socket_path} with TTL {self.cache._ttl}s")
        if os.path.exists(self.socket_path):
            self._remove_stale_socket()

        parent_dir = os.path.dirname(self.socket_path)
        if parent_dir:  # a bare file name has no directory to create
            os.makedirs(parent_dir, exist_ok=True)

        self._server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # Create the socket file owner-only from the start instead of relying
        # solely on the chmod() that follows bind().
        previous_umask = os.umask(0o177)
        try:
            self._server_socket.bind(self.socket_path)
        finally:
            os.umask(previous_umask)
        os.chmod(self.socket_path, 0o600)
        self._server_socket.listen(10)

        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)
        logging.info("Server started successfully")
        try:
            # Short accept timeout so the stop event is polled regularly.
            self._server_socket.settimeout(0.5)
            while not self._stop_event.is_set():
                try:
                    conn, _ = self._server_socket.accept()
                except socket.timeout:
                    continue
                threading.Thread(target=self._handle_client, args=(conn,), daemon=True).start()
        finally:
            self._cleanup()

    def _remove_stale_socket(self) -> None:
        """Delete a leftover socket file, or fail if a server answers on it."""
        probe = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            probe.settimeout(1.0)  # Quick timeout for testing
            probe.connect(self.socket_path)
        except OSError as e:
            # Nobody is listening (refused, vanished, timed out, ...).
            logging.info(f"Removing stale socket file {self.socket_path} (OSError: {e})")
            try:
                os.unlink(self.socket_path)
            except FileNotFoundError:
                pass
            return
        finally:
            probe.close()
        raise RuntimeError(f"Socket {self.socket_path} is already in use")

    def _handle_signal(self, signum: int, frame) -> None:
        """Signal handler: ask the accept loop to stop."""
        logging.info(f"Received signal {signum}, shutting down")
        self._stop_event.set()

    def _cleanup(self) -> None:
        """Drop the cached secret, close the listener and remove the socket file."""
        logging.info("Cleaning up server")
        self.cache.clear()
        if self._server_socket:
            self._server_socket.close()
        if os.path.exists(self.socket_path):
            os.unlink(self.socket_path)

    def _recv_line(self, conn: socket.socket) -> str:
        """Read from ``conn`` up to the first newline (or EOF) and decode it.

        A stream socket may deliver a request in several pieces, so reading
        continues until the terminator has arrived.
        """
        buffer = bytearray()
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            buffer += chunk
            if b"\n" in chunk:
                break
            if len(buffer) > self._MAX_REQUEST_BYTES:
                raise ValueError("Request too large")
        return bytes(buffer).split(b"\n", 1)[0].decode("utf-8")

    def _handle_client(self, conn: socket.socket) -> None:
        """Serve exactly one request on ``conn`` and close it."""
        try:
            conn.settimeout(self._CLIENT_TIMEOUT)
            data = self._recv_line(conn)
            if not data:
                self._send_response(conn, ["error", "Empty request"])
                return
            try:
                request = json.loads(data)
                response = self.handler.handle(request)
            except json.JSONDecodeError:
                response = ["error", "Invalid JSON"]
            except Exception as e:
                response = ["error", str(e)]
            self._send_response(conn, response)
        except Exception as e:
            logging.error(f"Client error: {e}")
            try:
                self._send_response(conn, ["error", str(e)])
            except OSError:
                # The peer is gone; there is nobody left to notify.
                pass
        finally:
            conn.close()

    def _send_response(self, conn: socket.socket, response: List) -> None:
        """Send ``response`` as compact JSON followed by a newline."""
        conn.sendall((json.dumps(response, separators=(",", ":")) + "\n").encode("utf-8"))
# ---------------- Client Functions ----------------
def client_request(request: List, socket_path: str, timeout: Optional[float] = 5.0) -> List:
    """Send one JSON request to the daemon and return its decoded response.

    Args:
        request: JSON-serialisable request list, e.g. ``["ping"]``.
        socket_path: Path of the daemon's Unix-domain socket.
        timeout: Seconds allowed for each socket operation; ``None`` blocks
            indefinitely.

    Raises:
        OSError: Connecting, sending or receiving failed (including timeouts
            and an empty reply, reported as ``ConnectionError``).
        json.JSONDecodeError: The reply is not valid JSON.
    """
    received = bytearray()
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        s.connect(socket_path)
        s.sendall((json.dumps(request) + "\n").encode("utf-8"))
        # A reply can span several reads; keep going until the newline
        # terminator (or EOF) instead of trusting a single recv().
        while True:
            chunk = s.recv(4096)
            if not chunk:
                break
            received += chunk
            if b"\n" in chunk:
                break
    line = bytes(received).split(b"\n", 1)[0].decode("utf-8")
    if not line:
        raise ConnectionError("Empty response from daemon")
    return json.loads(line)
def start_daemon(socket_path: str, ttl: int, verbose: bool) -> None:
    """Fork a detached background process that runs ``SecretServer``.

    The calling process returns once the first fork has succeeded; it does
    not wait for the server to become reachable. In verbose mode the daemon's
    stdout/stderr go to a private temporary log file which the caller prints
    to stderr after about one second and then deletes.

    Raises:
        RuntimeError: The initial ``fork()`` failed.
    """
    import tempfile  # only needed here, for the verbose log file

    log_file_path: Optional[str] = None
    if verbose:
        # mkstemp() creates an owner-only file with an unpredictable name,
        # unlike a fixed /tmp/<name>_<pid>.log path that another local user
        # could pre-create or symlink.
        log_fd, log_file_path = tempfile.mkstemp(prefix="vault_secretd_daemon_", suffix=".log")
        os.close(log_fd)

    try:
        pid = os.fork()
    except OSError as e:
        if log_file_path is not None:
            try:
                os.unlink(log_file_path)
            except OSError:
                pass
        if verbose:
            logging.error(f"Fork failed: {e}")
        raise RuntimeError(f"Cannot start daemon: fork failed ({e})") from e

    if pid > 0:
        # Parent: optionally relay the daemon's early output, then carry on.
        if log_file_path is not None:
            _report_daemon_log(log_file_path)
        return

    # Child process - first fork. Never returns.
    _run_daemon_child(socket_path, ttl, verbose, log_file_path)


def _report_daemon_log(log_file_path: str) -> None:
    """Wait briefly, echo the daemon's log file to stderr and delete it."""
    time.sleep(1.0)
    try:
        with open(log_file_path, "r", errors="replace") as f:
            daemon_output = f.read().strip()
        if daemon_output:
            print(f"DAEMON OUTPUT:\n{daemon_output}", file=sys.stderr)
    except OSError:
        # Best effort: the log is purely diagnostic.
        pass
    finally:
        try:
            os.unlink(log_file_path)
        except OSError:
            pass


def _append_daemon_log(log_file_path: Optional[str], message: str) -> None:
    """Best-effort append of ``message`` to the daemon log, if there is one."""
    if log_file_path is None:
        return
    try:
        with open(log_file_path, "a") as f:
            f.write(message)
    except OSError:
        pass


def _run_daemon_child(socket_path: str, ttl: int, verbose: bool, log_file_path: Optional[str]) -> None:
    """Detach from the caller and run the server; always ends in ``os._exit``."""
    try:
        os.setsid()
        # Second fork: the session leader exits so the daemon can never
        # reacquire a controlling terminal.
        if os.fork() > 0:
            os._exit(0)
    except OSError as e:
        _append_daemon_log(log_file_path, f"Second fork failed: {e}\n")
        os._exit(1)

    # Grandchild process - actual daemon
    os.umask(0o077)
    try:
        os.chdir("/")
        # stdout/stderr go to the log file in verbose mode, else to /dev/null.
        output_target = log_file_path if log_file_path is not None else "/dev/null"
        with open(output_target, "w") as output:
            os.dup2(output.fileno(), 1)
            os.dup2(output.fileno(), 2)
        with open("/dev/null", "r") as devnull:
            os.dup2(devnull.fileno(), 0)
    except Exception as e:
        _append_daemon_log(log_file_path, f"Daemon setup failed: {e}\n")
        os._exit(1)

    try:
        # Reinitialize logging for the daemon process
        setup_logging(verbose)
        logging.info(f"Starting daemon with socket {socket_path} and TTL {ttl}")
        SecretServer(socket_path, ttl, verbose).start()
    except Exception as e:
        logging.error(f"Server start failed: {e}")
        os._exit(1)
    os._exit(0)
def ensure_daemon(socket_path: str, ttl: int, verbose: bool) -> None:
    """Make sure a daemon answers on ``socket_path``, starting one if needed.

    A ``ping`` is attempted first. If it fails, a daemon is started and
    pinged every 0.2 seconds for up to 10 seconds.

    Raises:
        RuntimeError: The daemon could not be started or did not answer
            within the deadline.
    """
    try:
        client_request(["ping"], socket_path)
    except Exception as e:
        if verbose:
            logging.info(f"Daemon not running ({e}), starting new daemon")
    else:
        if verbose:
            logging.info("Daemon already running")
        return

    start_daemon(socket_path, ttl, verbose)

    # Monotonic clock: the deadline must not move when the wall clock is
    # adjusted while we wait.
    deadline = time.monotonic() + 10.0
    last_error = None
    while time.monotonic() < deadline:
        try:
            client_request(["ping"], socket_path)
        except Exception as e:
            last_error = e
            time.sleep(0.2)
        else:
            if verbose:
                logging.info("Daemon started successfully")
            return

    error_msg = "Failed to start daemon after 10 seconds"
    if last_error:
        error_msg += f". Last error: {last_error}"
    if verbose:
        logging.error(error_msg)
    raise RuntimeError(error_msg) from last_error
# ---------------- CLI Commands ----------------
def cmd_serve(args: argparse.Namespace) -> None:
    """``serve`` subcommand: run the secret server in the foreground."""
    setup_logging(args.verbose)
    server = SecretServer(args.socket, args.ttl_seconds, args.verbose)
    server.start()
def cmd_get(args: argparse.Namespace) -> None:
    """``get`` subcommand: print the vault secret to stdout.

    A cached secret is returned directly. Otherwise, unless ``--no-prompt``
    was given, the master password is prompted for on the client side, the
    secret is read from Bitwarden, stored in the daemon and printed. Any
    failure prints a message to stderr and exits with status 1.
    """
    setup_logging(args.verbose)
    ensure_daemon(args.socket, DEFAULT_TTL, args.verbose)

    def fail(message) -> None:
        """Report ``message`` on stderr and terminate with exit status 1."""
        print(message, file=sys.stderr)
        sys.exit(1)

    # First try to get cached secret from daemon
    cached = client_request(["get"], args.socket)
    if cached[0] == "ok":
        print(cached[1], end="")
        return

    # If no cached secret and prompting is disabled, fail
    if args.no_prompt:
        fail(cached[1])

    # Client-side prompting and storing
    if not BW_ITEM:
        fail("VAULT_SECRETD_BW_ITEM must be set")

    try:
        logging.info("Prompting for Bitwarden master password...")
        master_password = prompt_bitwarden_master()
        if not master_password:
            fail("No Bitwarden master password provided")

        logging.info("Unlocking Bitwarden vault...")
        session = get_bitwarden_token(master_password)
        if not session:
            fail("Bitwarden unlock failed")

        logging.info(f"Retrieving vault password from Bitwarden item '{BW_ITEM}'...")
        vault_secret = get_vault_password(session)

        # Store the secret in daemon
        stored = client_request(["store", vault_secret, args.ttl_seconds], args.socket)
        if stored[0] != "ok":
            fail(f"Failed to store secret: {stored[1]}")

        # Return the secret
        print(vault_secret, end="")
    except Exception as e:
        # sys.exit() raises SystemExit, which is not caught here.
        fail(f"Failed to acquire secret: {e}")
def cmd_clear(args: argparse.Namespace) -> None:
    """``clear`` subcommand: ask the daemon to forget the cached secret.

    Exits with status 1 and the daemon's message on stderr if the daemon
    does not answer ``ok``.
    """
    setup_logging(args.verbose)
    ensure_daemon(args.socket, DEFAULT_TTL, args.verbose)
    reply = client_request(["clear"], args.socket)
    if reply[0] == "ok":
        return
    print(reply[1], file=sys.stderr)
    sys.exit(1)
def cmd_store(args: argparse.Namespace) -> None:
    """``store`` subcommand: cache a secret in the daemon.

    The secret comes from ``--secret`` or, when that is not given, from
    stdin (surrounding whitespace stripped). Exits with status 1 when no
    secret is provided or the daemon rejects it.
    """
    setup_logging(args.verbose)
    ensure_daemon(args.socket, DEFAULT_TTL, args.verbose)

    # Prefer the command-line value; otherwise read the secret from stdin.
    secret = args.secret if args.secret else sys.stdin.read().strip()
    if not secret:
        print("No secret provided", file=sys.stderr)
        sys.exit(1)

    reply = client_request(["store", secret, args.ttl_seconds], args.socket)
    if reply[0] != "ok":
        print(reply[1], file=sys.stderr)
        sys.exit(1)

    if args.verbose:
        print("Secret stored successfully")
def cmd_status(args: argparse.Namespace) -> None:
    """``status`` subcommand: print whether a secret is cached and its TTL.

    Exits with status 1 and the daemon's message on stderr if the daemon
    does not answer ``ok``.
    """
    setup_logging(args.verbose)
    ensure_daemon(args.socket, DEFAULT_TTL, args.verbose)
    reply = client_request(["status"], args.socket)
    if reply[0] != "ok":
        print(reply[1], file=sys.stderr)
        sys.exit(1)
    is_present, ttl_seconds, seconds_left = reply[1:4]
    print(f"present={is_present} ttl={ttl_seconds}s remaining={seconds_left}s")
# ---------------- Main ----------------
def main() -> None:
    """Parse the command line and run the selected subcommand.

    ``--verbose`` and ``--socket`` are options of the top-level parser, so
    they must appear before the subcommand name.
    """
    parser = argparse.ArgumentParser(description="Ansible Vault secret daemon")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    parser.add_argument("--socket", default=SOCKET_PATH or get_default_socket_path(), help="Socket path")
    subcommands = parser.add_subparsers(dest="command", required=True)

    serve_cmd = subcommands.add_parser("serve", help="Run daemon in foreground")
    serve_cmd.add_argument("--ttl-seconds", type=int, default=DEFAULT_TTL, help="Secret TTL in seconds")

    get_cmd = subcommands.add_parser("get", help="Get secret")
    get_cmd.add_argument("--no-prompt", action="store_true", help="Disable prompting")
    get_cmd.add_argument("--ttl-seconds", type=int, help="Override TTL in seconds")

    store_cmd = subcommands.add_parser("store", help="Store secret")
    store_cmd.add_argument("--secret", help="Secret to store (reads from stdin if not provided)")
    store_cmd.add_argument("--ttl-seconds", type=int, help="Override TTL in seconds")

    subcommands.add_parser("clear", help="Clear cached secret")
    subcommands.add_parser("status", help="Show cache status")

    args = parser.parse_args()

    # Map each subcommand name to the function implementing it.
    dispatch = dict(
        serve=cmd_serve,
        get=cmd_get,
        store=cmd_store,
        clear=cmd_clear,
        status=cmd_status,
    )
    run = dispatch[args.command]
    run(args)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment