|
#!/usr/bin/env python3 |
|
""" |
|
vault_audit.py — HCP Vault Comprehensive Audit Script |
|
====================================================== |
|
Collects and reports: |
|
• All secrets (names, metadata, last-accessed time) across all namespaces |
|
• All users/entities (aliases, policies, last-login time, originating IPs) |
|
|
|
"Last accessed" and "last login" require an audit log file (--audit-log). |
|
Without it, the script still collects all structural data from the Vault API. |
|
|
|
Requirements: |
|
pip install requests urllib3 |
|
pip install rich # optional, for nicer console output |
|
|
|
Usage: |
|
export VAULT_ADDR="https://your-cluster.hashicorp.cloud:8200" |
|
export VAULT_TOKEN="hvs.your-token" |
|
|
|
# Basic scan (API data only): |
|
python3 vault_audit.py |
|
|
|
# With audit log for last-access timestamps: |
|
python3 vault_audit.py --audit-log /var/log/vault/audit.log |
|
|
|
# Limit to one namespace and its children: |
|
python3 vault_audit.py --namespace admin |
|
|
|
# Full options: |
|
python3 vault_audit.py --help |
|
""" |
|
|
|
# ── Imports ─────────────────────────────────────────────────────────────────── |
|
import argparse |
|
import csv |
|
import json |
|
import logging |
|
import os |
|
import re |
|
import sys |
|
import time |
|
import threading |
|
from collections import defaultdict |
|
from concurrent.futures import ThreadPoolExecutor, as_completed |
|
from dataclasses import dataclass, field, asdict |
|
from datetime import datetime, timezone |
|
from pathlib import Path |
|
from typing import Dict, List, Optional, Tuple, Any |
|
|
|
try: |
|
import requests |
|
import urllib3 |
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) |
|
except ImportError: |
|
print("Error: 'requests' is required. Install with: pip install requests urllib3") |
|
sys.exit(1) |
|
|
|
try: |
|
from rich.console import Console as RichConsole |
|
from rich.table import Table as RichTable |
|
from rich.text import Text as RichText |
|
from rich import box as rich_box |
|
RICH = True |
|
except ImportError: |
|
RICH = False |
|
|
|
# ── Logging ─────────────────────────────────────────────────────────────────── |
|
# Module-level logger; handlers/level are expected to be configured by the CLI entry point.
log = logging.getLogger("vault_audit")
|
|
|
|
|
# ── Data classes ───────────────────────────────────────────────────────────── |
|
|
|
@dataclass
class NamespaceInfo:
    """One Vault namespace discovered during traversal (Enterprise feature)."""
    path: str  # full path from root, e.g. "team-a/prod/" (root = "")
    ns_id: str = ""  # Vault-assigned namespace ID from sys/namespaces key_info
    custom_metadata: Dict[str, str] = field(default_factory=dict)  # namespace custom_metadata, if any
|
|
|
|
|
@dataclass
class SecretVersionInfo:
    """A single KV v2 secret version, as reported by the metadata endpoint."""
    version: int  # numeric version identifier
    created_time: Optional[str] = None  # ISO8601 timestamp from Vault, if present
    deletion_time: Optional[str] = None  # set when the version is soft-deleted
    destroyed: bool = False  # True when the version data is permanently destroyed
|
|
|
|
|
@dataclass
class EntityAliasRecord:
    """An identity entity alias (one per auth-method login identity)."""
    alias_id: str  # alias UUID
    name: str  # login name as seen by the auth method
    mount_accessor: str  # accessor of the auth mount that created the alias
    mount_path: str  # e.g. "auth/userpass/"
    mount_type: str  # e.g. "userpass", "ldap"
    custom_metadata: Dict[str, str] = field(default_factory=dict)
    creation_time: Optional[str] = None  # ISO8601, if reported
    last_update_time: Optional[str] = None  # ISO8601, if reported
|
|
|
|
|
@dataclass
class TokenProxyRecord:
    """A live token that proxies as a last-login indicator when no audit log exists."""
    accessor: str  # token accessor (the token itself is never stored)
    display_name: str
    auth_path: str  # e.g. "auth/userpass/login/alice"
    policies: List[str]
    creation_time: Optional[str] = None  # ISO8601 from epoch int
    expire_time: Optional[str] = None
    last_renewal_time: Optional[str] = None  # ISO8601; updated on token renew
    issue_time: Optional[str] = None  # ISO8601; same as creation for new tokens
    ttl: int = 0  # remaining TTL in seconds at lookup time
    meta: Dict[str, str] = field(default_factory=dict)  # token metadata map
    token_type: str = ""  # e.g. "service", "batch"
    renewable: bool = False
    num_uses: int = 0  # 0 = unlimited uses
|
|
|
|
|
@dataclass
class SecretRecord:
    """One secret (KV path) or dynamic-credential role discovered on a mount."""
    namespace_path: str  # namespace label, e.g. "team-a/" or "(root)"
    mount_path: str  # mount path without trailing slash
    secret_path: str  # role name (aws/tf/kmip) or KV path
    engine_type: str = "kv"  # kv, aws, terraform, kmip
    kv_version: Optional[int] = None  # 1 or 2 for KV only
    # KV v2 metadata
    created_time: Optional[str] = None
    updated_time: Optional[str] = None
    current_version: Optional[int] = None
    oldest_version: Optional[int] = None
    max_versions: Optional[int] = None
    custom_metadata: Dict[str, Any] = field(default_factory=dict)
    versions: List[SecretVersionInfo] = field(default_factory=list)
    # Engine-specific raw config (aws role config, terraform role, kmip role ops…)
    engine_data: Dict[str, Any] = field(default_factory=dict)
    # Audit-log enrichment (None = no log provided or no match)
    last_accessed_time: Optional[str] = None
    last_accessed_by_entity_id: Optional[str] = None
    last_accessed_by_display_name: Optional[str] = None
    last_accessed_from_ip: Optional[str] = None
    last_accessed_operation: Optional[str] = None
    access_count: int = 0
    access_history: List[Dict] = field(default_factory=list)
    # Error
    metadata_error: Optional[str] = None

    @property
    def full_path(self) -> str:
        """Join namespace label, mount, and secret path with "/" (leading slashes stripped)."""
        ns = self.namespace_path.rstrip("/")
        return f"{ns}/{self.mount_path}/{self.secret_path}".lstrip("/")
|
|
|
|
|
@dataclass
class EntityRecord:
    """An identity entity (or orphan-token pseudo-entity) plus enrichment fields."""
    namespace_path: str  # namespace label, e.g. "team-a/" or "(root)"
    entity_id: str  # empty string for orphan-token pseudo-entities
    name: str
    disabled: bool = False
    policies: List[str] = field(default_factory=list)
    metadata: Dict[str, str] = field(default_factory=dict)
    creation_time: Optional[str] = None
    last_update_time: Optional[str] = None
    aliases: List[EntityAliasRecord] = field(default_factory=list)
    groups: List[str] = field(default_factory=list)  # resolved group names
    group_ids: List[str] = field(default_factory=list)
    # Token proxy (best effort "last active" when no audit log)
    latest_token: Optional[TokenProxyRecord] = None
    # Auth-method specific info (from collect_auth_method_users)
    auth_method_extra: Dict[str, Any] = field(default_factory=dict)
    # Audit-log enrichment — last login (authentication event)
    last_login_time: Optional[str] = None
    last_login_from_ip: Optional[str] = None
    last_login_auth_method: Optional[str] = None
    last_login_auth_path: Optional[str] = None
    last_login_namespace: Optional[str] = None
    login_count: int = 0
    login_history: List[Dict] = field(default_factory=list)
    # Audit-log enrichment — last activity (ANY request: read, list, create…)
    last_activity_time: Optional[str] = None
    last_activity_path: Optional[str] = None
    last_activity_ip: Optional[str] = None
    last_activity_operation: Optional[str] = None
    last_activity_mount_type: Optional[str] = None
    activity_count: int = 0
|
|
|
|
|
@dataclass
class VaultAuditReport:
    """Top-level container for everything the audit collected (serialized for output)."""
    generated_at: str  # timestamp of report generation
    vault_addr: str
    vault_version: str
    cluster_name: str
    namespaces: List[NamespaceInfo] = field(default_factory=list)
    secrets: List[SecretRecord] = field(default_factory=list)
    entities: List[EntityRecord] = field(default_factory=list)
    audit_log_path: Optional[str] = None  # None when no --audit-log was given
    audit_log_entries_parsed: int = 0
    access_errors: List[Dict] = field(default_factory=list)  # permission/metadata failures seen during scan
    stats: Dict = field(default_factory=dict)
|
|
|
|
|
# ── Rate limiter ────────────────────────────────────────────────────────────── |
|
|
|
class TokenBucket:
    """Simple thread-safe token bucket rate limiter.

    Tokens refill continuously at ``rate`` per second up to a capacity of
    ``burst``. ``acquire()`` blocks until one token can be consumed.
    """

    def __init__(self, rate: float = 50.0, burst: float = 50.0):
        self.rate = rate          # tokens added per second
        self.burst = burst        # maximum bucket capacity
        self._tokens = burst      # start full so initial calls are unthrottled
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self) -> None:
        """Block until one token is consumed.

        Fix vs. the original: after sleeping, the bucket was decremented
        without re-crediting tokens accrued during the sleep and was clamped
        at zero, which lost fractional tokens and let concurrent sleepers
        overdraw the bucket. We now loop: refill, try to take a token, else
        compute the remaining wait and sleep *outside* the lock, then retry.
        """
        while True:
            with self._lock:
                now = time.monotonic()
                # Credit tokens accrued since the last refill, capped at burst.
                self._tokens = min(self.burst, self._tokens + (now - self._last) * self.rate)
                self._last = now
                if self._tokens >= 1.0:
                    self._tokens -= 1.0
                    return
                # Time until one whole token will be available.
                wait = (1.0 - self._tokens) / self.rate
            # Sleep without the lock so other threads can refill/consume.
            time.sleep(wait)
|
|
|
|
|
# ── Vault API client ────────────────────────────────────────────────────────── |
|
|
|
class VaultClient:
    """Thin wrapper over the Vault HTTP API.

    Thread-safety: one requests.Session per thread (thread-local), a shared
    TokenBucket rate limiter in front of every call, and a lock around the
    API call counter. Errors are normalized into dicts carrying an
    "__error__" key (plus optional "__detail__") instead of raising;
    404 responses are returned as None so callers can decide.
    """

    def __init__(
        self,
        addr: str,
        token: str,
        tls_verify: bool = True,
        ca_cert: Optional[str] = None,  # path to CA bundle (on-prem internal PKI)
        timeout: int = 15,
        rate_limiter: Optional[TokenBucket] = None,
    ):
        self.addr = addr.rstrip("/")
        self.token = token
        # tls_verify can be False (skip), True (system CAs), or a CA bundle path
        self.tls_verify: Any = ca_cert if ca_cert else tls_verify
        self.timeout = timeout
        self.rate_limiter = rate_limiter or TokenBucket(rate=50)
        self._call_count = 0
        self._lock = threading.Lock()
        # Use a session per thread via thread-local storage
        self._tls = threading.local()

    def _session(self) -> requests.Session:
        """Return this thread's Session, creating and configuring it on first use."""
        if not hasattr(self._tls, "session"):
            s = requests.Session()
            s.headers.update({
                "X-Vault-Token": self.token,
                "Content-Type": "application/json",
            })
            s.verify = self.tls_verify
            self._tls.session = s
        return self._tls.session

    def _request(
        self,
        method: str,
        path: str,
        namespace: str = "",
        max_retries: int = 4,
        **kwargs,
    ) -> Optional[Dict]:
        """Issue one API call with rate limiting and exponential-backoff retries.

        Returns:
            Parsed JSON dict on 200 ({} if the body is not JSON);
            {} on 204/205; None on 404; an {"__error__": ...} dict for
            connection errors, timeouts, 403, retryable-status exhaustion,
            other HTTP errors, and retry exhaustion.
        """
        url = f"{self.addr}/v1/{path.lstrip('/')}"
        headers = {}
        if namespace:
            # Per-request namespace header overlays the session defaults.
            headers["X-Vault-Namespace"] = namespace.strip("/")

        backoff = 1.0
        for attempt in range(max_retries + 1):
            self.rate_limiter.acquire()
            with self._lock:
                self._call_count += 1

            try:
                r = self._session().request(
                    method, url,
                    headers=headers,
                    timeout=self.timeout,
                    **kwargs,
                )
            except requests.exceptions.ConnectionError as exc:
                log.warning("Connection error [%s %s]: %s", method, url, exc)
                if attempt < max_retries:
                    time.sleep(backoff)
                    backoff = min(backoff * 2, 30)  # capped exponential backoff
                    continue
                return {"__error__": "connection_error", "__detail__": str(exc)}
            except requests.exceptions.Timeout:
                log.warning("Timeout [%s %s]", method, url)
                if attempt < max_retries:
                    time.sleep(backoff)
                    backoff = min(backoff * 2, 30)
                    continue
                return {"__error__": "timeout"}

            if r.status_code == 200:
                try:
                    return r.json()
                except ValueError:
                    # 200 with a non-JSON body still counts as success.
                    return {}

            if r.status_code in (204, 205):
                return {}

            if r.status_code == 404:
                return None  # not found — caller decides

            if r.status_code == 403:
                # Permission denied is not retried; surface Vault's error list.
                try:
                    detail = r.json().get("errors", ["permission denied"])
                except Exception:
                    detail = ["permission denied"]
                return {"__error__": "permission_denied", "__detail__": detail}

            # Throttling / transient server errors are retried with jitter.
            if r.status_code in (429, 500, 502, 503):
                if attempt < max_retries:
                    jitter = 0.5 * backoff
                    time.sleep(backoff + jitter)
                    backoff = min(backoff * 2, 30)
                    continue
                return {"__error__": f"http_{r.status_code}"}

            # Other 4xx/5xx
            try:
                errs = r.json().get("errors", [str(r.status_code)])
            except Exception:
                errs = [str(r.status_code)]
            return {"__error__": f"http_{r.status_code}", "__detail__": errs}

        return {"__error__": "max_retries_exceeded"}

    def get(self, path: str, ns: str = "") -> Optional[Dict]:
        """HTTP GET; see _request for the return-shape contract."""
        return self._request("GET", path, namespace=ns)

    def list(self, path: str, ns: str = "") -> Optional[Dict]:
        """Vault LIST verb; see _request for the return-shape contract."""
        return self._request("LIST", path, namespace=ns)

    def post(self, path: str, ns: str = "", data: Optional[Dict] = None) -> Optional[Dict]:
        """HTTP POST with a JSON body (empty object when data is None)."""
        return self._request("POST", path, namespace=ns, json=data or {})

    @property
    def call_count(self) -> int:
        # Total HTTP requests attempted so far (retries included).
        return self._call_count

    # ── Convenience helpers ────────────────────────────────────────────

    def health(self) -> Dict:
        """GET sys/health; {} on error/no response."""
        r = self.get("sys/health")
        return r or {}

    def token_lookup_self(self) -> Dict:
        """Return the "data" section of auth/token/lookup-self, or {}."""
        r = self.get("auth/token/lookup-self")
        return (r or {}).get("data", {})

    def list_namespaces(self, ns: str = "") -> Dict:
        """Returns key_info dict or empty dict."""
        r = self.list("sys/namespaces", ns=ns)
        if r and "data" in r:
            return r["data"].get("key_info", {})
        return {}

    def list_mounts(self, ns: str = "") -> Dict:
        """Return {mount_path: mount_info} for sys/mounts, {} on error.

        Filters to dict values that carry a "type" key, dropping the
        request metadata Vault mixes into the response body.
        """
        r = self.get("sys/mounts", ns=ns)
        if not r or "__error__" in r:
            return {}
        data = r.get("data", r)
        return {k: v for k, v in data.items() if isinstance(v, dict) and "type" in v}

    def kv_list(self, mount: str, path: str, ns: str = "") -> List[str]:
        """LIST a KV v1 path; returns child keys (folders end with "/")."""
        p = f"{mount}/{path}".rstrip("/")
        r = self.list(p, ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def kv2_list_meta(self, mount: str, path: str, ns: str = "") -> List[str]:
        """LIST a KV v2 metadata path; returns child keys (folders end with "/")."""
        p = f"{mount}/metadata/{path}".rstrip("/")
        r = self.list(p, ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def kv2_get_meta(self, mount: str, path: str, ns: str = "") -> Optional[Dict]:
        """GET KV v2 metadata for one secret.

        Returns the metadata dict, an {"__error__": ...} dict (passed through
        so the caller can record it), or None when not found.
        """
        p = f"{mount}/metadata/{path}"
        r = self.get(p, ns=ns)
        if r and "__error__" in r:
            return r
        if r and "data" in r:
            return r["data"]
        return None

    def list_entity_ids(self, ns: str = "") -> List[str]:
        """LIST identity/entity/id; returns entity UUIDs or []."""
        r = self.list("identity/entity/id", ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def get_entity(self, eid: str, ns: str = "") -> Optional[Dict]:
        """GET one identity entity by ID; None when missing/error."""
        r = self.get(f"identity/entity/id/{eid}", ns=ns)
        if r and "data" in r:
            return r["data"]
        return None

    def list_group_ids(self, ns: str = "") -> List[str]:
        """LIST identity/group/id; returns group UUIDs or []."""
        r = self.list("identity/group/id", ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def get_group(self, gid: str, ns: str = "") -> Optional[Dict]:
        """GET one identity group by ID; None when missing/error."""
        r = self.get(f"identity/group/id/{gid}", ns=ns)
        if r and "data" in r:
            return r["data"]
        return None

    def list_token_accessors(self, ns: str = "") -> List[str]:
        """LIST auth/token/accessors; returns accessor strings or []."""
        r = self.list("auth/token/accessors", ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def lookup_accessor(self, accessor: str, ns: str = "") -> Optional[Dict]:
        """POST auth/token/lookup-accessor; returns token data or None."""
        r = self.post("auth/token/lookup-accessor", ns=ns, data={"accessor": accessor})
        if r and "data" in r:
            return r["data"]
        return None

    def list_auth_methods(self, ns: str = "") -> Dict:
        """Return {mount_path: mount_info} for sys/auth, {} on error."""
        r = self.get("sys/auth", ns=ns)
        if not r or "__error__" in r:
            return {}
        data = r.get("data", r)
        return {k: v for k, v in data.items() if isinstance(v, dict) and "type" in v}

    def list_keys(self, path: str, ns: str = "") -> List[str]:
        """Generic LIST returning keys array."""
        r = self.list(path, ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def get_data(self, path: str, ns: str = "") -> Optional[Dict]:
        """Generic GET returning data dict."""
        r = self.get(path, ns=ns)
        if r and "data" in r:
            return r["data"]
        return None
|
|
|
|
|
# ── Namespace collection ────────────────────────────────────────────────────── |
|
|
|
def collect_namespaces(client: VaultClient, start_ns: str = "") -> List[NamespaceInfo]:
    """
    BFS traversal of all namespaces from start_ns (default root).

    Namespaces are a Vault Enterprise feature. On Vault OSS (Community Edition)
    the sys/namespaces endpoint returns 403 or 404; both cases are detected up
    front and logged as warnings, after which only the root namespace is scanned.
    """
    found: List[NamespaceInfo] = []

    # One probe call distinguishes OSS / missing-permission setups before the walk.
    probe = client._request("LIST", "sys/namespaces", namespace=start_ns)
    if probe is not None and probe.get("__error__") == "permission_denied":
        log.warning(
            "sys/namespaces returned 403 — this is normal for Vault OSS (Community "
            "Edition) which does not support namespaces. Scanning root namespace only.\n"
            "If you are running Vault Enterprise and expected namespaces, check that "
            "your token has 'list' capability on sys/namespaces."
        )
        return found  # empty — caller will still scan root as ""
    if probe is None:
        log.warning(
            "sys/namespaces returned 404 — namespace feature not available. "
            "Scanning root namespace only."
        )
        return found

    # Breadth-first walk: each discovered child is queued for its own listing.
    pending = [start_ns]
    while pending:
        current = pending.pop(0)
        log.debug("Listing namespaces under: %s", current or "(root)")

        for child_key, child_data in client.list_namespaces(ns=current).items():
            info = child_data or {}
            child_path = (
                f"{current.rstrip('/')}/{child_key.rstrip('/')}".lstrip("/") + "/"
            )
            found.append(NamespaceInfo(
                path=child_path,
                ns_id=info.get("id", ""),
                custom_metadata=info.get("custom_metadata") or {},
            ))
            pending.append(child_path)

    return found
|
|
|
|
|
# ── Secret collection ───────────────────────────────────────────────────────── |
|
|
|
def _list_kv1_recursive( |
|
client: VaultClient, mount: str, path: str, ns: str, max_depth: int, depth: int = 0 |
|
) -> List[str]: |
|
if depth > max_depth: |
|
return [] |
|
keys = client.kv_list(mount, path, ns=ns) |
|
secrets = [] |
|
for k in keys: |
|
full = f"{path}/{k}".lstrip("/") |
|
if k.endswith("/"): |
|
secrets.extend(_list_kv1_recursive(client, mount, full, ns, max_depth, depth + 1)) |
|
else: |
|
secrets.append(full) |
|
return secrets |
|
|
|
|
|
def _list_kv2_recursive( |
|
client: VaultClient, mount: str, path: str, ns: str, max_depth: int, depth: int = 0 |
|
) -> List[str]: |
|
if depth > max_depth: |
|
return [] |
|
keys = client.kv2_list_meta(mount, path, ns=ns) |
|
secrets = [] |
|
for k in keys: |
|
full = f"{path}/{k}".lstrip("/") |
|
if k.endswith("/"): |
|
secrets.extend(_list_kv2_recursive(client, mount, full, ns, max_depth, depth + 1)) |
|
else: |
|
secrets.append(full) |
|
return secrets |
|
|
|
|
|
# Secret engine types the scanner knows how to enumerate; all other mounts are skipped.
SCANNED_ENGINE_TYPES = {"kv", "aws", "terraform", "kmip"}
|
|
|
|
|
def collect_secrets(
    client: VaultClient,
    all_namespaces: List[NamespaceInfo],
    max_depth: int = 12,
    workers: int = 10,
    access_errors: Optional[List[Dict]] = None,
) -> List[SecretRecord]:
    """Collect secrets/roles from KV v1, KV v2, AWS, Terraform, and KMIP engines.

    Scans the root namespace plus every namespace in *all_namespaces*: lists
    mounts, keeps those whose type is in SCANNED_ENGINE_TYPES, and fans the
    per-mount scans out to a thread pool.

    Args:
        client: authenticated VaultClient.
        all_namespaces: namespaces from collect_namespaces (root added here).
        max_depth: recursion limit for KV folder traversal.
        workers: thread-pool size for concurrent mount scans.
        access_errors: optional shared list; KV v2 metadata errors are appended
            to it in place so the caller sees them.

    Returns:
        All SecretRecord objects found (order follows scan completion).
    """
    if access_errors is None:
        access_errors = []

    scan_jobs = []
    # Root namespace is always scanned, in addition to the discovered ones.
    namespaces_to_scan = [NamespaceInfo(path="")] + all_namespaces

    for ns_info in namespaces_to_scan:
        ns = ns_info.path.rstrip("/")
        mounts = client.list_mounts(ns=ns)
        for mount_raw, mount_info in mounts.items():
            if not isinstance(mount_info, dict):
                continue
            mtype = mount_info.get("type", "")
            if mtype not in SCANNED_ENGINE_TYPES:
                continue
            opts = mount_info.get("options") or {}
            # KV version defaults to 1 when the mount option is absent.
            kv_ver = int(opts.get("version", "1")) if mtype == "kv" else 0
            mount = mount_raw.rstrip("/")
            scan_jobs.append((ns, mount, mtype, kv_ver, mount_info))

    log.info("Found %d secret engine mount(s) to scan across %d namespace(s) "
             "(types: kv, aws, terraform, kmip)",
             len(scan_jobs), len(namespaces_to_scan))

    all_secrets: List[SecretRecord] = []
    lock = threading.Lock()  # guards access_errors appends from worker threads

    def scan_mount(args):
        # Worker: enumerate one mount; returns its SecretRecord list.
        ns, mount, mtype, kv_ver, mount_info = args
        ns_label = ns + "/" if ns else "(root)"
        log.debug("Scanning mount: [%s] %s (%s)", ns_label, mount, mtype)
        records = []

        # ── KV v2 ─────────────────────────────────────────────────────
        if mtype == "kv" and kv_ver == 2:
            paths = _list_kv2_recursive(client, mount, "", ns, max_depth)
            for p in paths:
                rec = SecretRecord(
                    namespace_path=ns_label,
                    mount_path=mount,
                    secret_path=p,
                    engine_type="kv",
                    kv_version=2,
                )
                meta = client.kv2_get_meta(mount, p, ns=ns)
                if meta and "__error__" in meta:
                    # Metadata fetch failed (e.g. permission): record and continue.
                    rec.metadata_error = str(meta.get("__detail__", meta["__error__"]))
                    with lock:
                        access_errors.append({
                            "namespace": ns_label, "path": f"{mount}/metadata/{p}",
                            "error": rec.metadata_error,
                        })
                elif meta:
                    rec.created_time = meta.get("created_time")
                    rec.updated_time = meta.get("updated_time")
                    rec.current_version = meta.get("current_version")
                    rec.oldest_version = meta.get("oldest_version")
                    rec.max_versions = meta.get("max_versions")
                    rec.custom_metadata = meta.get("custom_metadata") or {}
                    for ver_num, ver_data in (meta.get("versions") or {}).items():
                        rec.versions.append(SecretVersionInfo(
                            version=int(ver_num),
                            created_time=ver_data.get("created_time"),
                            deletion_time=ver_data.get("deletion_time"),
                            destroyed=ver_data.get("destroyed", False),
                        ))
                    rec.versions.sort(key=lambda v: v.version)
                records.append(rec)

        # ── KV v1 ─────────────────────────────────────────────────────
        # KV v1 has no metadata endpoint; only paths are recorded.
        elif mtype == "kv":
            paths = _list_kv1_recursive(client, mount, "", ns, max_depth)
            for p in paths:
                records.append(SecretRecord(
                    namespace_path=ns_label,
                    mount_path=mount,
                    secret_path=p,
                    engine_type="kv",
                    kv_version=1,
                ))

        # ── AWS secrets engine ─────────────────────────────────────────
        # Lists configured roles. In audit logs, credential access appears as
        # GET /{mount}/creds/{role} or GET /{mount}/sts/{role}.
        elif mtype == "aws":
            roles = client.list_keys(f"{mount}/roles", ns=ns)
            for role in roles:
                rdata = client.get_data(f"{mount}/roles/{role}", ns=ns) or {}
                records.append(SecretRecord(
                    namespace_path=ns_label,
                    mount_path=mount,
                    secret_path=role,
                    engine_type="aws",
                    engine_data={
                        "credential_type": rdata.get("credential_type", ""),
                        "role_arns": rdata.get("role_arns") or [],
                        "policy_arns": rdata.get("policy_arns") or [],
                        "iam_tags": rdata.get("iam_tags") or [],
                        "default_sts_ttl": rdata.get("default_sts_ttl"),
                        "max_sts_ttl": rdata.get("max_sts_ttl"),
                        "permissions_boundary_arn": rdata.get("permissions_boundary_arn"),
                    },
                ))

        # ── Terraform Cloud secrets engine ─────────────────────────────
        # Lists configured roles. Credential access: GET /{mount}/creds/{role}.
        elif mtype == "terraform":
            roles = client.list_keys(f"{mount}/role", ns=ns)
            for role in roles:
                rdata = client.get_data(f"{mount}/role/{role}", ns=ns) or {}
                records.append(SecretRecord(
                    namespace_path=ns_label,
                    mount_path=mount,
                    secret_path=role,
                    engine_type="terraform",
                    engine_data={
                        "organization": rdata.get("organization", ""),
                        "team_id": rdata.get("team_id", ""),
                        "user_id": rdata.get("user_id", ""),
                        "ttl": rdata.get("ttl"),
                        "max_ttl": rdata.get("max_ttl"),
                        "token_account_type": rdata.get("token_account_type", ""),
                    },
                ))

        # ── KMIP secrets engine ────────────────────────────────────────
        # Structure: scopes → roles. Credential access logged as KMIP operations.
        elif mtype == "kmip":
            scopes = client.list_keys(f"{mount}/scope", ns=ns)
            for scope in scopes:
                roles = client.list_keys(f"{mount}/scope/{scope}/role", ns=ns)
                for role in roles:
                    rdata = client.get_data(f"{mount}/scope/{scope}/role/{role}", ns=ns) or {}
                    # Collect only the operations that are enabled
                    ops = {k: v for k, v in rdata.items()
                           if k.startswith("operation_") and v is True}
                    records.append(SecretRecord(
                        namespace_path=ns_label,
                        mount_path=mount,
                        secret_path=f"{scope}/{role}",
                        engine_type="kmip",
                        engine_data={
                            "scope": scope,
                            "role": role,
                            "operations": list(ops.keys()),
                            "tls_client_key_type": rdata.get("tls_client_key_type"),
                            "tls_client_key_bits": rdata.get("tls_client_key_bits"),
                        },
                    ))

        log.debug("  → %d item(s) in [%s] %s (%s)", len(records), ns_label, mount, mtype)
        return records

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(scan_mount, job): job for job in scan_jobs}
        for future in as_completed(futures):
            try:
                all_secrets.extend(future.result())
            except Exception as exc:
                # One failed mount must not abort the whole scan.
                job = futures[future]
                log.error("Error scanning mount %s: %s", job, exc)

    return all_secrets
|
|
|
|
|
# ── Entity/user collection ──────────────────────────────────────────────────── |
|
|
|
def collect_entities(
    client: VaultClient,
    all_namespaces: List[NamespaceInfo],
    workers: int = 10,
    access_errors: Optional[List[Dict]] = None,
) -> Tuple[List[EntityRecord], Dict[str, str]]:
    """
    Collect all identity entities across all namespaces.
    Returns (entities, group_name_map) where group_name_map is {group_id -> group_name}.

    Entity IDs and groups are listed sequentially per namespace; the per-entity
    detail fetches are fanned out to a thread pool of *workers*. Entities seen
    in multiple namespaces are de-duplicated by entity_id.

    Note: access_errors is accepted for interface symmetry with collect_secrets
    but is not currently appended to by this function.
    """
    if access_errors is None:
        access_errors = []

    # Root namespace is always scanned, in addition to the discovered ones.
    namespaces_to_scan = [NamespaceInfo(path="")] + all_namespaces
    all_entity_ids: List[Tuple[str, str]] = []  # (entity_id, ns)
    group_name_map: Dict[str, str] = {}

    for ns_info in namespaces_to_scan:
        ns = ns_info.path.rstrip("/")
        ns_label = ns + "/" if ns else "(root)"

        eids = client.list_entity_ids(ns=ns)
        log.debug("[%s] %d entity ID(s)", ns_label, len(eids))
        all_entity_ids.extend((eid, ns) for eid in eids)

        # Collect groups while we're here
        gids = client.list_group_ids(ns=ns)
        for gid in gids:
            g = client.get_group(gid, ns=ns)
            if g:
                # Fall back to the raw ID when a group has no name.
                group_name_map[gid] = g.get("name", gid)

    log.info("Total entity IDs to fetch: %d", len(all_entity_ids))

    entities: List[EntityRecord] = []
    seen: set = set()  # entity IDs already accepted (dedup across namespaces)
    lock = threading.Lock()  # guards entities appends

    def fetch_entity(args):
        # Worker: fetch one entity's detail and build its record (or None).
        eid, ns = args
        if eid in seen:
            return None
        data = client.get_entity(eid, ns=ns)
        if not data:
            return None
        ns_label = ns + "/" if ns else "(root)"

        aliases = []
        for a in data.get("aliases") or []:
            aliases.append(EntityAliasRecord(
                alias_id=a.get("id", ""),
                name=a.get("name", ""),
                mount_accessor=a.get("mount_accessor", ""),
                mount_path=a.get("mount_path", ""),
                mount_type=a.get("mount_type", ""),
                custom_metadata=a.get("custom_metadata") or {},
                creation_time=a.get("creation_time"),
                last_update_time=a.get("last_update_time"),
            ))

        return EntityRecord(
            namespace_path=ns_label,
            entity_id=eid,
            name=data.get("name", ""),
            disabled=data.get("disabled", False),
            policies=data.get("policies") or [],
            metadata=data.get("metadata") or {},
            creation_time=data.get("creation_time"),
            last_update_time=data.get("last_update_time"),
            aliases=aliases,
            group_ids=data.get("group_ids") or [],
        )

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(fetch_entity, args): args for args in all_entity_ids}
        for future in as_completed(futures):
            try:
                rec = future.result()
                # Dedup happens here, in the single consumer loop.
                if rec and rec.entity_id not in seen:
                    seen.add(rec.entity_id)
                    with lock:
                        entities.append(rec)
            except Exception as exc:
                log.error("Error fetching entity: %s", exc)

    # Resolve group names
    for e in entities:
        e.groups = [group_name_map.get(gid, gid) for gid in e.group_ids]

    return entities, group_name_map
|
|
|
|
|
def scan_token_accessors(
    client: VaultClient,
    entities: List[EntityRecord],
    all_namespaces: List[NamespaceInfo],
    max_accessors: int = 2000,
    workers: int = 5,
) -> List[EntityRecord]:
    """
    Scan token accessors across namespaces to find latest token per entity.
    The most recent token creation_time is a proxy for "last active" when no
    audit log is available.
    Returns a list of orphan-token pseudo-entities (no identity entity).

    Mutates the passed-in EntityRecord objects (sets latest_token). At most
    max_accessors accessors are looked up per namespace; lookups are fanned
    out to a thread pool of *workers* per namespace.
    """
    entity_by_id: Dict[str, EntityRecord] = {e.entity_id: e for e in entities}
    orphan_entities: List[EntityRecord] = []
    lock = threading.Lock()  # guards latest_token updates and orphan appends

    # Root namespace is always scanned, in addition to the discovered ones.
    namespaces_to_scan = [NamespaceInfo(path="")] + all_namespaces

    for ns_info in namespaces_to_scan:
        ns = ns_info.path.rstrip("/")
        ns_label = ns + "/" if ns else "(root)"
        accessors = client.list_token_accessors(ns=ns)
        if not accessors:
            continue
        log.debug("[%s] %d token accessor(s)", ns_label, len(accessors))
        # Cap per-namespace work on very large token populations.
        accessors = accessors[:max_accessors]

        # _ns/_ns_label defaults bind the loop variables at definition time
        # (avoids the late-binding-closure pitfall inside the thread pool).
        def lookup(accessor, _ns=ns, _ns_label=ns_label):
            td = client.lookup_accessor(accessor, ns=_ns)
            if not td:
                return
            eid = td.get("entity_id", "")

            def _epoch_to_iso(val) -> Optional[str]:
                # Vault reports some token times as epoch seconds; normalize
                # to ISO8601 UTC. Falls back to str(val) on anything unparsable.
                if not val:
                    return None
                try:
                    return datetime.fromtimestamp(int(val), tz=timezone.utc).isoformat()
                except Exception:
                    return str(val)

            tr = TokenProxyRecord(
                accessor=accessor,
                display_name=td.get("display_name", ""),
                auth_path=td.get("path", ""),
                policies=td.get("policies") or [],
                creation_time=_epoch_to_iso(td.get("creation_time")),
                expire_time=td.get("expire_time"),
                # last_renewal_time: updated whenever the token is renewed via
                # auth/token/renew or auth/token/renew-self — key proxy for
                # "was active recently without re-authenticating"
                last_renewal_time=_epoch_to_iso(td.get("last_renewal_time")),
                issue_time=td.get("issue_time"),
                ttl=td.get("ttl") or 0,
                meta=td.get("meta") or {},
                token_type=td.get("type", ""),
                renewable=td.get("renewable", False),
                num_uses=td.get("num_uses", 0),
            )
            with lock:
                if eid and eid in entity_by_id:
                    e = entity_by_id[eid]
                    # Keep the most recently created token. Comparison is on
                    # ISO strings produced by _epoch_to_iso; both sides use the
                    # same UTC isoformat, so lexical order tracks time order.
                    if (e.latest_token is None or
                            (tr.creation_time and e.latest_token.creation_time and
                             tr.creation_time > e.latest_token.creation_time)):
                        e.latest_token = tr
                elif not eid:
                    # Token with no backing identity entity → pseudo-entity.
                    orphan_entities.append(EntityRecord(
                        namespace_path=_ns_label,
                        entity_id="",
                        name=td.get("display_name", "(orphan token)"),
                        disabled=False,
                        policies=td.get("policies") or [],
                        metadata=td.get("meta") or {},
                        latest_token=tr,
                    ))

        with ThreadPoolExecutor(max_workers=workers) as pool:
            # list() drains the iterator so all lookups finish before moving on.
            list(pool.map(lookup, accessors))

    return orphan_entities
|
|
|
|
|
# ── Auth method user discovery ─────────────────────────────────────────────── |
|
|
|
def collect_auth_method_users(
    client: VaultClient,
    all_namespaces: List[NamespaceInfo],
    existing_entities: List[EntityRecord],
) -> List[EntityRecord]:
    """
    Scan every auth method mount across all namespaces and list configured
    users/roles that may not yet have an identity entity (because they never
    logged in) or that carry extra info not visible in the identity API.

    Supported backends:
        userpass   — lists usernames + token_policies, token_ttl, etc.
        ldap       — lists explicitly configured LDAP user overrides + groups
        github     — lists mapped GitHub usernames/teams
        approle    — lists role names
        cert       — lists certificate roles
        oidc/jwt   — lists roles
        radius     — lists configured user overrides
        aws        — lists roles
        azure      — lists roles
        gcp        — lists roles
        kubernetes — lists roles

    Returns a list of NEW EntityRecord objects for users not already in
    existing_entities. Also enriches existing entities' auth_method_extra
    in place.
    """
    # Case-insensitive alias-name -> entity map so a configured username can
    # be matched to the identity entity created when it first logged in.
    alias_lookup: Dict[str, EntityRecord] = {}
    for e in existing_entities:
        for a in e.aliases:
            alias_lookup[a.name.lower()] = e

    new_entities: List[EntityRecord] = []
    # Root namespace (empty path) first, then every child namespace.
    namespaces_to_scan = [NamespaceInfo(path="")] + all_namespaces

    for ns_info in namespaces_to_scan:
        ns = ns_info.path.rstrip("/")
        ns_label = ns + "/" if ns else "(root)"

        auth_mounts = client.list_auth_methods(ns=ns)

        for mount_raw, mount_info in auth_mounts.items():
            # Mount listings can contain non-dict bookkeeping keys; skip them.
            if not isinstance(mount_info, dict):
                continue
            mount = mount_raw.rstrip("/")
            mtype = mount_info.get("type", "")

            # ── userpass ──────────────────────────────────────────────
            if mtype == "userpass":
                for username in client.list_keys(f"{mount}/users", ns=ns):
                    udata = client.get_data(f"{mount}/users/{username}", ns=ns) or {}
                    extra = {
                        "auth_mount": mount,
                        "auth_type": "userpass",
                        "token_policies": udata.get("token_policies") or [],
                        "token_ttl": udata.get("token_ttl"),
                        "token_max_ttl": udata.get("token_max_ttl"),
                        "token_bound_cidrs": udata.get("token_bound_cidrs") or [],
                    }
                    existing = alias_lookup.get(username.lower())
                    if existing:
                        existing.auth_method_extra.update(extra)
                    else:
                        new_entities.append(EntityRecord(
                            namespace_path=ns_label,
                            entity_id="",
                            name=username,
                            policies=udata.get("token_policies") or [],
                            auth_method_extra=extra,
                        ))

            # ── ldap ──────────────────────────────────────────────────
            elif mtype == "ldap":
                for username in client.list_keys(f"{mount}/users", ns=ns):
                    udata = client.get_data(f"{mount}/users/{username}", ns=ns) or {}
                    extra = {
                        "auth_mount": mount,
                        "auth_type": "ldap",
                        "ldap_groups": udata.get("groups") or [],
                        # Older Vault versions return "policies" rather than
                        # "token_policies"; accept either spelling.
                        "token_policies": udata.get("token_policies") or udata.get("policies") or [],
                    }
                    existing = alias_lookup.get(username.lower())
                    if existing:
                        existing.auth_method_extra.update(extra)
                    else:
                        new_entities.append(EntityRecord(
                            namespace_path=ns_label,
                            entity_id="",
                            name=username,
                            policies=extra["token_policies"],
                            auth_method_extra=extra,
                        ))
                # Also list LDAP groups configured in Vault (debug visibility only).
                for grp in client.list_keys(f"{mount}/groups", ns=ns):
                    gdata = client.get_data(f"{mount}/groups/{grp}", ns=ns) or {}
                    log.debug(" LDAP group [%s] %s: policies=%s", ns_label, grp,
                              gdata.get("token_policies") or gdata.get("policies"))

            # ── github ────────────────────────────────────────────────
            elif mtype == "github":
                for gh_user in client.list_keys(f"{mount}/map/users", ns=ns):
                    udata = client.get_data(f"{mount}/map/users/{gh_user}", ns=ns) or {}
                    # "value" is a comma-separated policy string. Filter out
                    # empties so an unset mapping yields [] rather than [""],
                    # and strip whitespace around the commas.
                    gh_policies = [p.strip()
                                   for p in (udata.get("value") or "").split(",")
                                   if p.strip()]
                    extra = {
                        "auth_mount": mount,
                        "auth_type": "github",
                        "token_policies": gh_policies,
                    }
                    existing = alias_lookup.get(gh_user.lower())
                    if existing:
                        existing.auth_method_extra.update(extra)
                    else:
                        new_entities.append(EntityRecord(
                            namespace_path=ns_label,
                            entity_id="",
                            name=gh_user,
                            # Populate policies like the userpass/ldap branches do.
                            policies=gh_policies,
                            auth_method_extra=extra,
                        ))

            # ── approle / cert / oidc / jwt / radius / aws / azure / gcp / k8s ──
            # For these, list role names — they don't map 1:1 to users, but
            # it's still useful to know what roles/principals are configured.
            elif mtype in ("approle", "cert", "oidc", "jwt", "radius",
                           "aws", "azure", "gcp", "kubernetes"):
                role_path_map = {
                    "approle": f"{mount}/role",
                    "cert": f"{mount}/certs",
                    "oidc": f"{mount}/role",
                    "jwt": f"{mount}/role",
                    "radius": f"{mount}/users",
                    "aws": f"{mount}/role",
                    "azure": f"{mount}/role",
                    "gcp": f"{mount}/role",
                    "kubernetes": f"{mount}/role",
                }
                role_path = role_path_map.get(mtype, f"{mount}/role")
                for role in client.list_keys(role_path, ns=ns):
                    existing = alias_lookup.get(role.lower())
                    extra = {
                        "auth_mount": mount,
                        "auth_type": mtype,
                        "role_name": role,
                    }
                    if existing:
                        # One entity may match several roles; accumulate them.
                        existing.auth_method_extra.setdefault("roles", []).append(extra)
                    else:
                        new_entities.append(EntityRecord(
                            namespace_path=ns_label,
                            entity_id="",
                            name=f"{role} ({mtype})",
                            auth_method_extra=extra,
                        ))

    log.info("Auth method scan found %d additional principal(s) not in identity store",
             len(new_entities))
    return new_entities
|
|
|
|
|
# ── Audit log parser ────────────────────────────────────────────────────────── |
|
|
|
class AuditIndex:
    """
    Parses a Vault audit JSONL log and builds three indexes:
        secret_access[ns_path|mount/secret_path] -> list of events (sorted desc by time)
        entity_login[entity_id]                  -> list of login events (sorted desc by time)
        entity_activity[entity_id]               -> list of any-API-call events (sorted desc by time)

    Secret-access keys are stored with the namespace stripped of its trailing
    slash, so lookups through get_secret()/get_all_secret_events() (which also
    strip the slash) match regardless of whether the audit log recorded the
    namespace as "ns1/" or "ns1".
    """
    # Per-key cap: only the most recent MAX_HISTORY events are retained.
    MAX_HISTORY = 20

    def __init__(self):
        # {path_key -> [event_dict, ...]} (latest first, max MAX_HISTORY)
        self.secret_access: Dict[str, List[Dict]] = defaultdict(list)
        # {entity_id -> [login_event_dict, ...]}
        self.entity_login: Dict[str, List[Dict]] = defaultdict(list)
        # {entity_id -> [any_activity_event_dict, ...]}
        self.entity_activity: Dict[str, List[Dict]] = defaultdict(list)
        self.entries_parsed = 0
        self.parse_errors = 0

    def parse_file(self, path: str) -> None:
        """Stream-parse an audit log (one JSON object per line) into the indexes."""
        log.info("Parsing audit log: %s", path)
        count = 0
        # Audit logs are JSON, hence UTF-8; replace undecodable bytes instead
        # of aborting a potentially multi-GB parse on a single corrupt line.
        with open(path, "r", encoding="utf-8", errors="replace") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                self.entries_parsed += 1
                count += 1
                if count % 100_000 == 0:
                    log.info(" ... %d log entries parsed", count)
                try:
                    entry = json.loads(raw)
                except json.JSONDecodeError:
                    self.parse_errors += 1
                    continue
                self._process(entry)

    @staticmethod
    def _resolve_ip(req: Dict) -> str:
        """
        Return the most accurate client IP from an audit log request object.

        On on-prem deployments behind a load balancer or reverse proxy, Vault's
        remote_address may contain the proxy IP rather than the real client IP.

        Vault can be configured with `x_forwarded_for_authorized_addrs` in its
        listener config to trust X-Forwarded-For headers from certain proxy CIDRs.
        When configured, Vault already writes the real client IP into remote_address.

        When NOT configured that way, the real IP may still be preserved in the
        request headers that Vault logs (if header logging is enabled). This method
        prefers the leftmost X-Forwarded-For / X-Real-IP entry (the original
        client in the proxy chain), falling back to remote_address if no such
        header is present. Any port suffix is stripped from the result.
        """
        remote = req.get("remote_address", "")

        # Vault can log request headers; X-Forwarded-For is the standard proxy header.
        # Header logging must be enabled in the audit device config:
        #   vault audit enable file path=/var/log/vault/audit.log log_raw=true
        # or set VAULT_AUDIT_LOG_REQUESTS_HEADERS env var.
        headers = req.get("headers") or {}
        xff = (
            headers.get("X-Forwarded-For")
            or headers.get("x-forwarded-for")
            or headers.get("X-Real-IP")
            or headers.get("x-real-ip")
            or ""
        )
        if isinstance(xff, list):
            xff = xff[0] if xff else ""

        if xff:
            # XFF is a comma-separated list; leftmost is the original client.
            # Example: "203.0.113.5, 10.0.0.1, 10.0.0.2"
            client_ip = xff.split(",")[0].strip()
            # Strip port if present (IPv4 with port, or [IPv6]:port).
            if client_ip.startswith("["):
                # IPv6 with port: [::1]:PORT
                client_ip = client_ip.split("]")[0].lstrip("[")
            elif client_ip.count(":") == 1:
                # IPv4 with port: 1.2.3.4:PORT
                client_ip = client_ip.split(":")[0]
            if client_ip:
                return client_ip

        # Strip port from remote_address if present
        if remote.startswith("["):
            remote = remote.split("]")[0].lstrip("[")
        elif remote.count(":") == 1:
            remote = remote.split(":")[0]

        return remote

    @classmethod
    def _record(cls, lst: List[Dict], event: Dict) -> None:
        """Append *event* to *lst*, keep latest-first order, cap at MAX_HISTORY."""
        lst.append(event)
        lst.sort(key=lambda x: x["time"], reverse=True)
        if len(lst) > cls.MAX_HISTORY:
            del lst[cls.MAX_HISTORY:]

    def _process(self, e: Dict) -> None:
        """Index a single audit entry into the three indexes."""
        if e.get("type") != "response":
            return  # only process response entries (avoids double counting)

        ts = e.get("time", "")
        req = e.get("request") or {}
        auth = e.get("auth") or {}
        resp = e.get("response") or {}

        req_path = req.get("path", "")
        req_op = req.get("operation", "")
        # Use _resolve_ip to get real client IP even behind a proxy
        remote_ip = self._resolve_ip(req)
        ns_path = (req.get("namespace") or {}).get("path", "")
        mount_type = req.get("mount_type", "")

        entity_id = auth.get("entity_id", "")
        display_name = auth.get("display_name", "")
        token_policies = auth.get("token_policies") or []

        # ── Secret access event ──
        # Capture reads from KV, AWS, Terraform, KMIP and any other secret engine.
        # We exclude system paths (sys/, auth/, identity/) to avoid false matches.
        if (req_op in ("read", "create", "update", "delete", "list") and
                mount_type not in ("", "token", "system", "identity") and
                not req_path.startswith(("sys/", "auth/", "identity/"))):
            # Normalize the namespace (no trailing slash) so the key matches
            # what get_secret()/get_all_secret_events() compute at lookup time;
            # audit logs typically record namespaces as "ns1/".
            key = f"{ns_path.rstrip('/')}|{req_path}"
            self._record(self.secret_access[key], {
                "time": ts,
                "operation": req_op,
                "entity_id": entity_id,
                "display_name": display_name,
                "remote_ip": remote_ip,
                "namespace": ns_path,
            })

        # ── Login event ──
        resp_auth = resp.get("auth") or {}
        if resp_auth and entity_id and req_path.startswith("auth/"):
            method_parts = req_path.split("/")
            auth_method = method_parts[1] if len(method_parts) > 1 else req_path
            self._record(self.entity_login[entity_id], {
                "time": ts,
                "auth_path": req_path,
                "auth_method": auth_method,
                "display_name": display_name,
                "remote_ip": remote_ip,
                "namespace": ns_path,
                "policies": token_policies,
            })

        # ── Any activity by entity (login + all subsequent API calls) ──
        # This distinguishes "last login" from "last activity":
        # a service may log in once and keep using its token for weeks.
        if entity_id:
            self._record(self.entity_activity[entity_id], {
                "time": ts,
                "path": req_path,
                "operation": req_op,
                "remote_ip": remote_ip,
                "namespace": ns_path,
                "mount_type": mount_type,
                "display_name": display_name,
            })

    def get_secret(self, ns_path: str, req_path: str) -> Optional[Dict]:
        """Return latest access event for a secret path, or None."""
        ns = ns_path.rstrip("/")
        key = f"{ns}|{req_path}"
        events = self.secret_access.get(key)
        if not events:
            # Try without namespace prefix (root)
            key2 = f"|{req_path}"
            events = self.secret_access.get(key2)
        return events[0] if events else None

    def get_all_secret_events(self, ns_path: str, req_path: str) -> List[Dict]:
        """Return all retained access events for a secret path (latest first)."""
        ns = ns_path.rstrip("/")
        key = f"{ns}|{req_path}"
        return self.secret_access.get(key) or self.secret_access.get(f"|{req_path}") or []

    def get_login(self, entity_id: str) -> Optional[Dict]:
        """Return the most recent login event for an entity, or None."""
        events = self.entity_login.get(entity_id)
        return events[0] if events else None

    def get_all_logins(self, entity_id: str) -> List[Dict]:
        """Return all retained login events for an entity (latest first)."""
        return self.entity_login.get(entity_id) or []

    def count_secret_accesses(self, ns_path: str, req_path: str) -> int:
        """Number of retained access events for a secret path (capped at MAX_HISTORY)."""
        return len(self.get_all_secret_events(ns_path, req_path))

    def get_activity(self, entity_id: str) -> Optional[Dict]:
        """Return the most recent activity event for an entity (any operation)."""
        events = self.entity_activity.get(entity_id)
        return events[0] if events else None

    def get_all_activities(self, entity_id: str) -> List[Dict]:
        """Return all retained activity events for an entity (latest first)."""
        return self.entity_activity.get(entity_id) or []
|
|
|
|
|
# ── Enrichment ──────────────────────────────────────────────────────────────── |
|
|
|
def enrich_secrets(secrets: List["SecretRecord"], audit: "AuditIndex") -> None:
    """Attach last-access data from the audit index to each secret record.

    Mutates each SecretRecord in place (last-accessed fields, access history
    and access count); returns None. Records with no matching audit event
    are left untouched.
    """
    for rec in secrets:
        ns_key = rec.namespace_path.replace("(root)", "").rstrip("/")

        # Reconstruct the audit-log request.path for this secret/role;
        # the shape differs per engine type.
        engine = rec.engine_type
        if engine == "kv":
            # KV v2 data reads go through the /data/ prefix; metadata reads through /metadata/
            middle = "data/" if rec.kv_version == 2 else ""
            audit_path = f"{rec.mount_path}/{middle}{rec.secret_path}"
        elif engine in ("aws", "terraform"):
            # Dynamic credential generation: GET /{mount}/creds/{role}
            # (role definition at /roles/ or /role/ is an admin read, less common in logs)
            audit_path = f"{rec.mount_path}/creds/{rec.secret_path}"
        elif engine == "kmip":
            # KMIP scope/role operations: /{mount}/scope/{scope}/role/{role}
            audit_path = f"{rec.mount_path}/scope/{rec.secret_path}"
        else:
            audit_path = f"{rec.mount_path}/{rec.secret_path}"

        latest = audit.get_secret(ns_key, audit_path)
        if latest is None:
            continue
        rec.last_accessed_time = latest["time"]
        rec.last_accessed_by_entity_id = latest.get("entity_id")
        rec.last_accessed_by_display_name = latest.get("display_name")
        rec.last_accessed_from_ip = latest.get("remote_ip")
        rec.last_accessed_operation = latest.get("operation")
        rec.access_history = audit.get_all_secret_events(ns_key, audit_path)
        rec.access_count = len(rec.access_history)
|
|
|
|
|
def enrich_entities(entities: List["EntityRecord"], audit: "AuditIndex") -> None:
    """Attach last-login and last-activity data from the audit index.

    Mutates each EntityRecord in place; entities without an entity_id
    cannot be correlated to audit events and are skipped.
    """
    for ent in entities:
        eid = ent.entity_id
        if not eid:
            continue

        # Last login (authentication event)
        login = audit.get_login(eid)
        if login:
            ent.last_login_time = login["time"]
            ent.last_login_from_ip = login.get("remote_ip")
            ent.last_login_auth_method = login.get("auth_method")
            ent.last_login_auth_path = login.get("auth_path")
            ent.last_login_namespace = login.get("namespace")
            ent.login_history = audit.get_all_logins(eid)
            ent.login_count = len(ent.login_history)

        # Last activity (any API call made while authenticated as this entity).
        # Different from last login: a service may authenticate once and then
        # keep making API calls for weeks without re-authenticating.
        activity = audit.get_activity(eid)
        if activity:
            ent.last_activity_time = activity["time"]
            ent.last_activity_path = activity.get("path")
            ent.last_activity_ip = activity.get("remote_ip")
            ent.last_activity_operation = activity.get("operation")
            ent.last_activity_mount_type = activity.get("mount_type")
            ent.activity_count = len(audit.get_all_activities(eid))
|
|
|
|
|
# ── Console output ───────────────────────────────────────────────────────────── |
|
|
|
def _fmt_ts(ts: Optional[str]) -> str: |
|
if not ts: |
|
return "N/A" |
|
return str(ts)[:19].replace("T", " ") |
|
|
|
|
|
def _trunc(s: str, n: int) -> str: |
|
return s[:n - 1] + "…" if len(s) > n else s |
|
|
|
|
|
def _engine_label(s: "SecretRecord") -> str: |
|
"""Short display label for the engine type of a secret record.""" |
|
if s.engine_type == "kv": |
|
return f"kv_v{s.kv_version}" |
|
return s.engine_type |
|
|
|
|
|
def print_user_summary_table(entities: List["EntityRecord"]) -> None:
    """Print a compact one-row-per-user activity summary table to stdout."""
    width = 150
    print(f"\n{'─'*width}")
    print(f" USER ACTIVITY SUMMARY ({len(entities)} total)")
    print(f"{'─'*width}")
    print(
        f" {'NAME':<28} {'NAMESPACE':<22} {'AUTH TYPE':<14}"
        f" {'LAST LOGIN':<20} {'LAST ACTIVITY':<20} {'IP (login)':<18}"
        f" {'STATUS':<9} {'LOGINS'}"
    )
    print(f" {'─'*145}")

    def _order(ent):
        # Disabled entities sink to the bottom; within each group a longer
        # timestamp string (≈ most recent/complete data) sorts first.
        stamp = ent.last_activity_time or ent.last_login_time or ""
        if ent.latest_token:
            stamp = stamp or ent.latest_token.last_renewal_time or ent.latest_token.creation_time or ""
        return (ent.disabled, -len(stamp))

    for ent in sorted(entities, key=_order):
        display = _trunc(ent.name or "(unnamed)", 26)
        ns_col = _trunc(ent.namespace_path, 20)

        kinds = ", ".join(dict.fromkeys(
            al.mount_type for al in ent.aliases if al.mount_type
        ))
        if not kinds:
            kinds = ent.auth_method_extra.get("auth_type", "-") if ent.auth_method_extra else "-"
        kinds = _trunc(kinds, 12)

        tok = ent.latest_token

        # Best login timestamp: audit log first, token creation as a proxy.
        if ent.last_login_time:
            login_col = _fmt_ts(ent.last_login_time)
        elif tok:
            login_col = f"~{_fmt_ts(tok.creation_time)} (tkn)"
        else:
            login_col = "unknown"

        # Best activity timestamp: audit log first, token renewal/creation proxy.
        if ent.last_activity_time:
            act_col = _fmt_ts(ent.last_activity_time)
        elif tok:
            proxy = tok.last_renewal_time or tok.creation_time
            act_col = f"~{_fmt_ts(proxy)} (tkn)" if proxy else "unknown"
        else:
            act_col = "unknown"

        ip_col = _trunc(ent.last_login_from_ip or ent.last_activity_ip or "-", 16)
        state = "DISABLED" if ent.disabled else "active"
        login_total = str(ent.login_count) if ent.login_count else "-"

        print(
            f" {display:<28} {ns_col:<22} {kinds:<14}"
            f" {login_col:<20} {act_col:<20} {ip_col:<18}"
            f" {state:<9} {login_total}"
        )
|
|
|
|
|
def print_report_plain(report: VaultAuditReport) -> None:
    """Fallback plain-text report.

    Writes the full audit report to stdout when `rich` is not available:
    header, namespace tree, per-namespace secrets tables, user activity
    summary, per-entity detail blocks, permission errors and summary stats.
    """
    W = 100  # report width in characters
    sep = "═" * W

    print(f"\n{sep}")
    print(f" HCP VAULT AUDIT REPORT")
    print(f" Generated : {report.generated_at}")
    print(f" Cluster : {report.vault_addr} (version {report.vault_version}, {report.cluster_name})")
    print(f" Namespaces: {len(report.namespaces)} | Secrets: {len(report.secrets)} | Entities: {len(report.entities)}")
    audit_note = f"{report.audit_log_entries_parsed:,} entries from {report.audit_log_path}" if report.audit_log_path else "NOT PROVIDED (no last-access data)"
    print(f" Audit log : {audit_note}")
    print(sep)

    # ── Namespace tree ──────────────────────────────────────────────────
    print(f"\n{'─'*W}")
    print(" NAMESPACE TREE")
    print(f"{'─'*W}")
    print(" (root)")
    for ns in sorted(report.namespaces, key=lambda x: x.path):
        # Indent grows with namespace depth (number of "/" in the path).
        indent = " " + " " * (ns.path.count("/") - 1)
        print(f"{indent}└─ {ns.path}")

    # ── Secrets ─────────────────────────────────────────────────────────
    print(f"\n{'─'*W}")
    print(f" SECRETS ({len(report.secrets)} total)")
    print(f"{'─'*W}")

    # Group secrets by namespace so each namespace gets its own table.
    by_ns: Dict[str, List[SecretRecord]] = defaultdict(list)
    for s in report.secrets:
        by_ns[s.namespace_path].append(s)

    for ns_label in sorted(by_ns):
        items = by_ns[ns_label]
        print(f"\n Namespace: {ns_label} ({len(items)} secrets)")
        hdr = f" {'MOUNT':<22} {'PATH':<40} {'ENGINE':<8} {'CREATED':<20} {'UPDATED':<20} {'VER':<4} {'LAST READ':<20} {'BY':<25} {'IP'}"
        print(hdr)
        print(f" {'─'*99}")
        for s in sorted(items, key=lambda x: (x.mount_path, x.secret_path)):
            # Flag secrets whose newest KV version has been destroyed.
            destroyed = ""
            if s.versions:
                latest = max(s.versions, key=lambda v: v.version)
                if latest.destroyed:
                    destroyed = " [DESTROYED]"
            eng = _engine_label(s)
            print(
                f" {_trunc(s.mount_path,20):<22}"
                f" {_trunc(s.secret_path,38):<40}"
                f" {eng:<8}"
                f" {_fmt_ts(s.created_time):<20}"
                f" {_fmt_ts(s.updated_time):<20}"
                f" {str(s.current_version or '-'):<4}"
                f" {_fmt_ts(s.last_accessed_time):<20}"
                f" {_trunc(s.last_accessed_by_display_name or '-',23):<25}"
                f" {s.last_accessed_from_ip or '-'}"
                f"{destroyed}"
            )
            if s.custom_metadata:
                print(f" ↳ kv metadata : {s.custom_metadata}")
            if s.engine_data:
                # Show key fields per engine type, skip empty values
                ed = {k: v for k, v in s.engine_data.items() if v not in (None, "", [], {})}
                if ed:
                    print(f" ↳ engine config: {ed}")
            if s.metadata_error:
                print(f" ↳ [ERROR] {s.metadata_error}")

    # ── User summary table ──────────────────────────────────────────────
    if report.entities:
        print_user_summary_table(report.entities)

    # ── Detailed entity blocks ───────────────────────────────────────────
    print(f"\n{'─'*W}")
    print(f" USERS / ENTITIES — DETAIL ({len(report.entities)} total)")
    print(f"{'─'*W}")

    for e in sorted(report.entities, key=lambda x: (x.namespace_path, x.name)):
        status = " [DISABLED]" if e.disabled else ""
        print(f"\n {e.name or '(unnamed)'}{status} [ns: {e.namespace_path}]")
        print(f" Entity ID : {e.entity_id or '(none)'}")
        print(f" Created : {_fmt_ts(e.creation_time)}")
        print(f" Last updated : {_fmt_ts(e.last_update_time)}")
        print(f" Policies : {', '.join(e.policies) or '-'}")
        print(f" Groups : {', '.join(e.groups) or '-'}")
        if e.metadata:
            print(f" Metadata : {e.metadata}")

        if e.aliases:
            print(f" Auth aliases ({len(e.aliases)}):")
            for a in e.aliases:
                print(f" • {a.name} [{a.mount_type} @ {a.mount_path or a.mount_accessor}]"
                      f" created={_fmt_ts(a.creation_time)}")

        if e.auth_method_extra:
            print(f" Auth config : {e.auth_method_extra}")

        # ── Last login (authentication) ──────────────────────────────
        if e.last_login_time:
            print(f" Last login : {_fmt_ts(e.last_login_time)}")
            print(f" Login IP : {e.last_login_from_ip or '-'}")
            print(f" Auth method : {e.last_login_auth_method or '-'} ({e.last_login_auth_path or '-'})")
            print(f" Total logins : {e.login_count}")
        elif e.latest_token:
            # No audit-log evidence — use token creation time as a proxy.
            t = e.latest_token
            print(f" Last login : ~{_fmt_ts(t.creation_time)} (token creation proxy)")
            print(f" Login IP : -")
            print(f" Auth path : {t.auth_path or '-'}")
        else:
            print(" Last login : unknown (no audit log / no token scan)")

        # ── Last activity (any API call) ──────────────────────────────
        if e.last_activity_time:
            print(f" Last activity : {_fmt_ts(e.last_activity_time)}")
            print(f" Activity IP : {e.last_activity_ip or '-'}")
            print(f" Activity op : {e.last_activity_operation or '-'} → {e.last_activity_path or '-'}")
            print(f" Total ops : {e.activity_count}")
        elif e.latest_token and e.latest_token.last_renewal_time:
            # Token renewal implies activity without re-authentication.
            t = e.latest_token
            print(f" Last activity : ~{_fmt_ts(t.last_renewal_time)} (token renewal proxy)")
        elif e.latest_token:
            t = e.latest_token
            tok_best = t.last_renewal_time or t.creation_time
            print(f" Last activity : ~{_fmt_ts(tok_best)} (token creation/renewal proxy)")
            print(f" Token expires : {_fmt_ts(t.expire_time)} renewable={t.renewable}")
        else:
            print(" Last activity : unknown (no audit log / no token scan)")

        # ── Active tokens summary ────────────────────────────────────
        if e.latest_token:
            t = e.latest_token
            renewal_note = f" last-renewed={_fmt_ts(t.last_renewal_time)}" if t.last_renewal_time else ""
            print(f" Latest token : type={t.token_type} created={_fmt_ts(t.creation_time)}"
                  f"{renewal_note} expires={_fmt_ts(t.expire_time)}"
                  f" ttl={t.ttl}s renewable={t.renewable}")

    # ── Errors ──────────────────────────────────────────────────────────
    if report.access_errors:
        print(f"\n{'─'*W}")
        print(f" PERMISSION ERRORS ({len(report.access_errors)} path(s) skipped)")
        print(f"{'─'*W}")
        # Cap console output; the JSON export contains the full list.
        for err in report.access_errors[:50]:
            print(f" [{err.get('namespace','-')}] {err.get('path','-')} → {err.get('error','-')}")
        if len(report.access_errors) > 50:
            print(f" ... and {len(report.access_errors)-50} more (see JSON output)")

    # ── Stats ────────────────────────────────────────────────────────────
    print(f"\n{'─'*W}")
    print(" SUMMARY STATISTICS")
    print(f"{'─'*W}")
    stats = report.stats
    print(f" API calls made : {stats.get('api_calls', '-')}")
    print(f" Elapsed time : {stats.get('elapsed_s', '-'):.1f}s")
    # Coverage gaps: items with no audit-log (or token-scan) evidence at all.
    no_access = sum(1 for s in report.secrets if not s.last_accessed_time)
    print(f" Secrets without access: {no_access} / {len(report.secrets)}")
    no_login = sum(1 for e in report.entities if not e.last_login_time and not e.latest_token)
    no_activity = sum(1 for e in report.entities
                      if not e.last_activity_time
                      and not (e.latest_token and (e.latest_token.last_renewal_time
                                                   or e.latest_token.creation_time)))
    print(f" Entities no login data: {no_login} / {len(report.entities)}")
    print(f" Entities no activity : {no_activity} / {len(report.entities)}")
    print()
|
|
|
|
|
def print_report_rich(report: VaultAuditReport) -> None: |
|
"""Rich-formatted report (tables, colors).""" |
|
console = RichConsole() |
|
|
|
console.rule("[bold cyan]HCP Vault Audit Report[/bold cyan]") |
|
console.print(f"Generated : [dim]{report.generated_at}[/dim]") |
|
console.print(f"Cluster : [bold]{report.vault_addr}[/bold] v{report.vault_version} {report.cluster_name}") |
|
ns_count = len(report.namespaces) |
|
audit_note = (f"[green]{report.audit_log_entries_parsed:,} entries[/green] from {report.audit_log_path}" |
|
if report.audit_log_path |
|
else "[yellow]NOT PROVIDED[/yellow] — no last-access timestamps") |
|
console.print(f"Audit log : {audit_note}") |
|
console.print() |
|
|
|
# ── Namespaces ──────────────────────────────────────────────────────── |
|
ns_table = RichTable(title=f"Namespaces ({ns_count + 1})", box=rich_box.SIMPLE) |
|
ns_table.add_column("Path") |
|
ns_table.add_column("ID") |
|
ns_table.add_row("[dim](root)[/dim]", "") |
|
for ns in sorted(report.namespaces, key=lambda x: x.path): |
|
ns_table.add_row(ns.path, ns.ns_id) |
|
console.print(ns_table) |
|
|
|
# ── Secrets ─────────────────────────────────────────────────────────── |
|
s_table = RichTable(title=f"Secrets ({len(report.secrets)})", box=rich_box.SIMPLE, show_lines=False) |
|
s_table.add_column("Namespace", style="dim") |
|
s_table.add_column("Mount") |
|
s_table.add_column("Secret Path") |
|
s_table.add_column("Engine") |
|
s_table.add_column("Created") |
|
s_table.add_column("Updated") |
|
s_table.add_column("Last Read / Cred Gen", style="green") |
|
s_table.add_column("By") |
|
s_table.add_column("From IP") |
|
s_table.add_column("# Hits") |
|
|
|
for s in sorted(report.secrets, key=lambda x: (x.namespace_path, x.mount_path, x.secret_path)): |
|
last_read = _fmt_ts(s.last_accessed_time) if s.last_accessed_time else "[dim]unknown[/dim]" |
|
read_by = s.last_accessed_by_display_name or "[dim]-[/dim]" |
|
from_ip = s.last_accessed_from_ip or "[dim]-[/dim]" |
|
latest_v = max(s.versions, key=lambda v: v.version) if s.versions else None |
|
path_str = s.secret_path |
|
if latest_v and latest_v.destroyed: |
|
path_str = f"[red]{path_str} ✗DESTROYED[/red]" |
|
|
|
# Engine config summary for non-KV types |
|
eng_note = "" |
|
if s.engine_type == "aws": |
|
eng_note = s.engine_data.get("credential_type", "") |
|
elif s.engine_type == "terraform": |
|
eng_note = s.engine_data.get("organization", "") |
|
elif s.engine_type == "kmip": |
|
ops = s.engine_data.get("operations", []) |
|
eng_note = ",".join(o.replace("operation_", "") for o in ops[:3]) |
|
if len(ops) > 3: |
|
eng_note += f"+{len(ops)-3}" |
|
|
|
s_table.add_row( |
|
s.namespace_path, |
|
s.mount_path, |
|
path_str, |
|
_engine_label(s) + (f"\n[dim]{eng_note}[/dim]" if eng_note else ""), |
|
_fmt_ts(s.created_time), |
|
_fmt_ts(s.updated_time), |
|
last_read, |
|
read_by, |
|
from_ip, |
|
str(s.access_count) if s.access_count else "[dim]0[/dim]", |
|
) |
|
|
|
console.print(s_table) |
|
|
|
# ── User summary table (rich) ────────────────────────────────────────── |
|
u_sum = RichTable( |
|
title=f"User Activity Summary ({len(report.entities)} total)", |
|
box=rich_box.SIMPLE, show_lines=False, |
|
) |
|
u_sum.add_column("Name") |
|
u_sum.add_column("Namespace", style="dim") |
|
u_sum.add_column("Auth Type") |
|
u_sum.add_column("Last Login", style="green") |
|
u_sum.add_column("Last Activity", style="cyan") |
|
u_sum.add_column("Login IP") |
|
u_sum.add_column("Status") |
|
u_sum.add_column("Logins") |
|
|
|
def _usort(e): |
|
act = e.last_activity_time or e.last_login_time or "" |
|
if e.latest_token: |
|
act = act or e.latest_token.last_renewal_time or e.latest_token.creation_time or "" |
|
return (e.disabled, -(len(act))) |
|
|
|
for e in sorted(report.entities, key=_usort): |
|
auth_types = ", ".join(dict.fromkeys( |
|
a.mount_type for a in e.aliases if a.mount_type |
|
)) or e.auth_method_extra.get("auth_type", "[dim]-[/dim]") |
|
|
|
if e.last_login_time: |
|
ll = _fmt_ts(e.last_login_time) |
|
elif e.latest_token: |
|
ll = f"[dim]~{_fmt_ts(e.latest_token.creation_time)} (tkn)[/dim]" |
|
else: |
|
ll = "[dim]unknown[/dim]" |
|
|
|
if e.last_activity_time: |
|
la = _fmt_ts(e.last_activity_time) |
|
elif e.latest_token: |
|
best = e.latest_token.last_renewal_time or e.latest_token.creation_time |
|
la = f"[dim]~{_fmt_ts(best)} (tkn)[/dim]" if best else "[dim]unknown[/dim]" |
|
else: |
|
la = "[dim]unknown[/dim]" |
|
|
|
ip = e.last_login_from_ip or e.last_activity_ip or "[dim]-[/dim]" |
|
status = "[red]DISABLED[/red]" if e.disabled else "[green]active[/green]" |
|
logins = str(e.login_count) if e.login_count else "[dim]-[/dim]" |
|
|
|
u_sum.add_row( |
|
e.name or "[dim](unnamed)[/dim]", |
|
e.namespace_path, |
|
auth_types or "[dim]-[/dim]", |
|
ll, la, ip, status, logins, |
|
) |
|
|
|
console.print(u_sum) |
|
|
|
# ── Entities ────────────────────────────────────────────────────────── |
|
e_table = RichTable( |
|
title=f"Users / Entities ({len(report.entities)})", |
|
box=rich_box.SIMPLE, show_lines=True, |
|
) |
|
e_table.add_column("Namespace", style="dim") |
|
e_table.add_column("Name") |
|
e_table.add_column("Status") |
|
e_table.add_column("Auth Aliases") |
|
e_table.add_column("Policies / Groups") |
|
e_table.add_column("Last Login", style="green") |
|
e_table.add_column("Login IP") |
|
e_table.add_column("Auth Method") |
|
e_table.add_column("# Logins") |
|
e_table.add_column("Last Activity", style="cyan") |
|
e_table.add_column("Activity IP") |
|
e_table.add_column("Activity Op → Path") |
|
e_table.add_column("# Ops") |
|
|
|
for e in sorted(report.entities, key=lambda x: (x.namespace_path, x.name)): |
|
status = "[red]DISABLED[/red]" if e.disabled else "[green]active[/green]" |
|
alias_str = "\n".join( |
|
f"{a.name} ({a.mount_type})" for a in e.aliases |
|
) or "[dim]-[/dim]" |
|
pol_grp = ", ".join(e.policies) |
|
if e.groups: |
|
pol_grp += ("\n" if pol_grp else "") + "grp: " + ", ".join(e.groups) |
|
pol_grp = pol_grp or "[dim]-[/dim]" |
|
|
|
# Last login |
|
if e.last_login_time: |
|
last_login = _fmt_ts(e.last_login_time) |
|
login_ip = e.last_login_from_ip or "-" |
|
auth_method = e.last_login_auth_method or "-" |
|
login_count = str(e.login_count) |
|
elif e.latest_token: |
|
t = e.latest_token |
|
last_login = f"[dim]~{_fmt_ts(t.creation_time)} (token)[/dim]" |
|
login_ip = "[dim]-[/dim]" |
|
auth_method = f"[dim]{t.auth_path or '-'}[/dim]" |
|
login_count = "[dim]-[/dim]" |
|
else: |
|
last_login = "[dim]unknown[/dim]" |
|
login_ip = "[dim]-[/dim]" |
|
auth_method = "[dim]-[/dim]" |
|
login_count = "[dim]-[/dim]" |
|
|
|
# Last activity |
|
if e.last_activity_time: |
|
last_act = _fmt_ts(e.last_activity_time) |
|
act_ip = e.last_activity_ip or "-" |
|
act_op = f"{e.last_activity_operation or '-'} → {_trunc(e.last_activity_path or '-', 40)}" |
|
act_count = str(e.activity_count) |
|
elif e.latest_token: |
|
t = e.latest_token |
|
best = t.last_renewal_time or t.creation_time |
|
renewal_note = "(renewal)" if t.last_renewal_time else "(token)" |
|
last_act = f"[dim]~{_fmt_ts(best)} {renewal_note}[/dim]" |
|
act_ip = "[dim]-[/dim]" |
|
act_op = f"[dim]ttl={t.ttl}s renewable={t.renewable}[/dim]" |
|
act_count = "[dim]-[/dim]" |
|
else: |
|
last_act = "[dim]unknown[/dim]" |
|
act_ip = "[dim]-[/dim]" |
|
act_op = "[dim]-[/dim]" |
|
act_count = "[dim]-[/dim]" |
|
|
|
e_table.add_row( |
|
e.namespace_path, |
|
e.name or "[dim](unnamed)[/dim]", |
|
status, |
|
alias_str, |
|
pol_grp, |
|
last_login, |
|
login_ip, |
|
auth_method, |
|
login_count, |
|
last_act, |
|
act_ip, |
|
act_op, |
|
act_count, |
|
) |
|
|
|
console.print(e_table) |
|
|
|
# ── Errors ──────────────────────────────────────────────────────────── |
|
if report.access_errors: |
|
console.print(f"\n[bold red]Permission Errors[/bold red] ({len(report.access_errors)} paths skipped)") |
|
for err in report.access_errors[:30]: |
|
console.print(f" [dim][{err.get('namespace','')}][/dim] {err.get('path','')} → [red]{err.get('error','')}[/red]") |
|
|
|
# ── Stats ────────────────────────────────────────────────────────────── |
|
stats = report.stats |
|
no_access = sum(1 for s in report.secrets if not s.last_accessed_time) |
|
no_login = sum(1 for e in report.entities if not e.last_login_time and not e.latest_token) |
|
no_activity = sum(1 for e in report.entities |
|
if not e.last_activity_time |
|
and not (e.latest_token and (e.latest_token.last_renewal_time |
|
or e.latest_token.creation_time))) |
|
console.print( |
|
f"\n[bold]Stats:[/bold] API calls={stats.get('api_calls', '-')} " |
|
f"elapsed={stats.get('elapsed_s', 0):.1f}s " |
|
f"secrets_without_access={no_access}/{len(report.secrets)} " |
|
f"entities_no_login={no_login}/{len(report.entities)} " |
|
f"entities_no_activity={no_activity}/{len(report.entities)}" |
|
) |
|
|
|
|
|
# ── Export functions ────────────────────────────────────────────────────────── |
|
|
|
def _serialize(obj: Any) -> Any: |
|
if isinstance(obj, list): |
|
return [_serialize(i) for i in obj] |
|
if isinstance(obj, dict): |
|
return {k: _serialize(v) for k, v in obj.items()} |
|
return obj |
|
|
|
|
|
def export_json(report: VaultAuditReport, path: Path) -> None:
    """Write the complete audit report to *path* as pretty-printed JSON.

    Dataclass instances anywhere in the tree are expanded with asdict();
    any other value json can't encode natively falls back to str().
    """
    def _default(o):
        # Called by json.dumps for objects it cannot serialize directly.
        if hasattr(o, "__dataclass_fields__"):
            return asdict(o)
        return str(o)

    meta = {
        "generated_at": report.generated_at,
        "vault_addr": report.vault_addr,
        "vault_version": report.vault_version,
        "cluster_name": report.cluster_name,
        "audit_log_path": report.audit_log_path,
        "audit_log_entries_parsed": report.audit_log_entries_parsed,
        "stats": report.stats,
        "access_errors": report.access_errors,
    }
    payload = {
        "meta": meta,
        "namespaces": [asdict(ns) for ns in report.namespaces],
        "secrets": [asdict(sec) for sec in report.secrets],
        "entities": [asdict(ent) for ent in report.entities],
    }
    path.write_text(json.dumps(payload, indent=2, default=_default))
    log.info("JSON report saved: %s", path)
|
|
|
|
|
def export_secrets_csv(secrets: List[SecretRecord], path: Path) -> None:
    """Write one CSV row per secret to *path*.

    List-valued attributes (version history, access history) are dropped;
    dict-valued attributes are JSON-encoded so the CSV stays flat.
    """
    columns = [
        "namespace_path", "mount_path", "secret_path", "full_path",
        "engine_type", "kv_version",
        "created_time", "updated_time",
        "current_version", "oldest_version", "max_versions", "total_versions",
        "custom_metadata", "engine_data",
        "last_accessed_time", "access_count",
        "last_accessed_by_entity_id", "last_accessed_by_display_name",
        "last_accessed_from_ip", "last_accessed_operation",
        "metadata_error",
    ]
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=columns, extrasaction="ignore")
        writer.writeheader()
        for record in secrets:
            row = asdict(record)
            # Nested lists don't fit a flat CSV row.
            row.pop("versions", None)
            row.pop("access_history", None)
            # Derived / flattened columns.
            row["full_path"] = record.full_path
            row["total_versions"] = len(record.versions)
            row["custom_metadata"] = json.dumps(record.custom_metadata)
            row["engine_data"] = json.dumps(record.engine_data)
            writer.writerow(row)
    log.info("Secrets CSV saved: %s", path)
|
|
|
|
|
def export_entities_csv(entities: List[EntityRecord], path: Path) -> None:
    """Write one CSV row per entity/user to *path*.

    Structured sub-records (aliases, metadata, auth method extras) are
    flattened to strings so the output remains one row per principal.
    """
    fields = [
        "namespace_path", "entity_id", "name", "disabled",
        "policies", "groups", "metadata",
        "creation_time", "last_update_time",
        "aliases_summary", "alias_count",
        # Token proxy columns
        "latest_token_created", "latest_token_last_renewal",
        "latest_token_auth_path", "latest_token_expires",
        "latest_token_ttl", "latest_token_renewable",
        # Last login (authentication event)
        "last_login_time", "last_login_from_ip",
        "last_login_auth_method", "last_login_auth_path", "last_login_namespace",
        "login_count",
        # Last activity (any API call made with the token)
        "last_activity_time", "last_activity_ip",
        "last_activity_operation", "last_activity_path", "last_activity_mount_type",
        "activity_count",
        # Best estimate for "last seen" (most recent of login/activity/renewal)
        "last_seen_time",
        # Auth method config
        "auth_method_extra",
    ]
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore")
        writer.writeheader()
        for ent in entities:
            token = ent.latest_token
            alias_text = "; ".join(
                f"{al.name} ({al.mount_type}@{al.mount_path or al.mount_accessor})"
                for al in ent.aliases
            )

            # Best-effort "last seen" = lexicographic max of every known
            # ISO timestamp (activity, login, token renewal, token creation).
            stamps = [
                ent.last_activity_time,
                ent.last_login_time,
                token.last_renewal_time if token else None,
                token.creation_time if token else None,
            ]
            known = [s for s in stamps if s]
            last_seen = max(known) if known else ""

            writer.writerow({
                "namespace_path": ent.namespace_path,
                "entity_id": ent.entity_id,
                "name": ent.name,
                "disabled": ent.disabled,
                "policies": "; ".join(ent.policies),
                "groups": "; ".join(ent.groups),
                "metadata": json.dumps(ent.metadata),
                "creation_time": ent.creation_time or "",
                "last_update_time": ent.last_update_time or "",
                "aliases_summary": alias_text,
                "alias_count": len(ent.aliases),
                "latest_token_created": token.creation_time if token else "",
                "latest_token_last_renewal": token.last_renewal_time if token else "",
                "latest_token_auth_path": token.auth_path if token else "",
                "latest_token_expires": token.expire_time if token else "",
                "latest_token_ttl": token.ttl if token else "",
                "latest_token_renewable": token.renewable if token else "",
                "last_login_time": ent.last_login_time or "",
                "last_login_from_ip": ent.last_login_from_ip or "",
                "last_login_auth_method": ent.last_login_auth_method or "",
                "last_login_auth_path": ent.last_login_auth_path or "",
                "last_login_namespace": ent.last_login_namespace or "",
                "login_count": ent.login_count,
                "last_activity_time": ent.last_activity_time or "",
                "last_activity_ip": ent.last_activity_ip or "",
                "last_activity_operation": ent.last_activity_operation or "",
                "last_activity_path": ent.last_activity_path or "",
                "last_activity_mount_type": ent.last_activity_mount_type or "",
                "activity_count": ent.activity_count,
                "last_seen_time": last_seen,
                "auth_method_extra": json.dumps(ent.auth_method_extra),
            })
    log.info("Entities CSV saved: %s", path)
|
|
|
|
|
# ── CLI / Main ──────────────────────────────────────────────────────────────── |
|
|
|
def parse_args() -> argparse.Namespace:
    """Build the CLI and parse sys.argv into a Namespace."""
    parser = argparse.ArgumentParser(
        prog="vault_audit.py",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    conn = parser.add_argument_group("Connection")
    conn.add_argument("--addr", metavar="URL",
                      help="Vault address (default: $VAULT_ADDR)")
    conn.add_argument("--token", metavar="TOKEN",
                      help="Vault token (default: $VAULT_TOKEN)")
    conn.add_argument("--no-tls-verify", action="store_true",
                      help="Disable TLS certificate verification")
    conn.add_argument("--ca-cert", metavar="PATH",
                      help="Path to CA certificate bundle for on-prem TLS "
                           "(default: $VAULT_CACERT)")
    conn.add_argument("--timeout", type=int, default=15, metavar="SEC",
                      help="Per-request timeout in seconds (default: 15)")
    conn.add_argument("--namespace", metavar="NS", default="",
                      help="Start namespace scan from this namespace (default: root)")

    scope = parser.add_argument_group("Scope")
    scope.add_argument("--no-secrets", action="store_true",
                       help="Skip secret collection")
    scope.add_argument("--no-users", action="store_true",
                       help="Skip user/entity collection")
    scope.add_argument("--no-token-scan", action="store_true",
                       help="Skip token accessor scan (faster, but loses last-login proxy data)")
    scope.add_argument("--no-auth-method-scan", action="store_true",
                       help="Skip scanning auth method mounts for configured users/roles")
    scope.add_argument("--max-accessors", type=int, default=2000, metavar="N",
                       help="Max token accessors to look up per namespace (default: 2000)")
    scope.add_argument("--max-depth", type=int, default=12, metavar="N",
                       help="Max KV directory depth to recurse (default: 12)")

    audit = parser.add_argument_group("Audit Log")
    audit.add_argument("--audit-log", metavar="PATH",
                       help="Path to Vault audit JSONL log file for last-access data")

    out = parser.add_argument_group("Output")
    out.add_argument("--output-dir", metavar="DIR", default="./vault_audit_output",
                     help="Directory for output files (default: ./vault_audit_output)")
    out.add_argument("--no-save", action="store_true",
                     help="Print to console only, do not save files")
    out.add_argument("--no-color", action="store_true",
                     help="Disable rich/color output")

    perf = parser.add_argument_group("Performance")
    perf.add_argument("--workers", type=int, default=10, metavar="N",
                      help="Thread pool size for parallel API calls (default: 10)")
    perf.add_argument("--rate-limit", type=float, default=50.0, metavar="RPS",
                      help="Max API requests per second (default: 50)")

    debug = parser.add_argument_group("Debug")
    debug.add_argument("-v", "--verbose", action="store_true",
                       help="Enable verbose/debug logging")

    return parser.parse_args()
|
|
|
|
|
def main() -> None:
    """Entry point: connect to Vault, collect data, print and export the report.

    Phases:
      1. Namespace discovery
      2. Audit log parsing (optional, enables last-access data)
      3. Secrets collection
      4. Identity store (entities/users)
      5. Token accessor scan + auth-method user scan (last-activity proxies)
      6. Console report + JSON/CSV export

    Exits with status 1 on missing credentials or an unreachable/sealed Vault.
    """
    args = parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s %(levelname)-7s %(message)s",
        datefmt="%H:%M:%S",
    )

    # ── Credentials ──────────────────────────────────────────────────────
    # Fix: strip a trailing "/" from the address regardless of whether it
    # came from --addr or $VAULT_ADDR (previously only the env var was
    # normalized, so "--addr https://x/" produced double-slash URLs).
    vault_addr = (args.addr or os.environ.get("VAULT_ADDR", "")).rstrip("/")
    vault_token = args.token or os.environ.get("VAULT_TOKEN", "")

    if not vault_addr:
        log.error("VAULT_ADDR is not set. Use --addr or export VAULT_ADDR=https://...")
        sys.exit(1)
    if not vault_token:
        log.error("VAULT_TOKEN is not set. Use --token or export VAULT_TOKEN=hvs....")
        sys.exit(1)

    tls_verify = not args.no_tls_verify
    ca_cert = args.ca_cert or os.environ.get("VAULT_CACERT", "") or None

    if ca_cert:
        log.info("TLS CA cert: %s", ca_cert)
    elif not tls_verify:
        log.warning("TLS verification disabled — do not use in production")

    log.info("Connecting to: %s (TLS verify=%s)", vault_addr, tls_verify)

    limiter = TokenBucket(rate=args.rate_limit, burst=args.rate_limit)
    client = VaultClient(vault_addr, vault_token, tls_verify=tls_verify,
                         ca_cert=ca_cert, timeout=args.timeout, rate_limiter=limiter)

    # ── Health check ─────────────────────────────────────────────────────
    t0 = time.monotonic()
    health = client.health()
    # Fix: the original evaluated health.get("sealed") even when health was
    # None (unreachable cluster), raising AttributeError instead of exiting
    # with the intended error message. Handle the two failure modes apart.
    if not health:
        log.error("Cannot connect to Vault or Vault is sealed. Check VAULT_ADDR.")
        sys.exit(1)
    if health.get("sealed"):
        log.error("Cannot connect to Vault or Vault is sealed. Check VAULT_ADDR.")
        log.error("Vault reports sealed=true. Unseal before running this script.")
        sys.exit(1)

    vault_version = health.get("version", "unknown")
    cluster_name = health.get("cluster_name", "unknown")
    log.info("Vault version: %s cluster: %s", vault_version, cluster_name)

    self_info = client.token_lookup_self()
    if self_info:
        log.info(
            "Token owner: %s | policies: %s | ttl: %ss",
            self_info.get("display_name", "?"),
            ", ".join(self_info.get("policies", [])),
            self_info.get("ttl", "?"),
        )

    # Shared error list: collectors append permission failures here so the
    # final report can show which paths were skipped.
    access_errors: List[Dict] = []
    report = VaultAuditReport(
        generated_at=datetime.now(tz=timezone.utc).isoformat(),
        vault_addr=vault_addr,
        vault_version=vault_version,
        cluster_name=cluster_name,
        access_errors=access_errors,
    )

    # ── HCP Vault auto-detection ─────────────────────────────────────────
    # HCP Vault clusters always use "admin" as the root namespace.
    # If no --namespace flag was given and the address matches HCP's domain,
    # default to "admin" automatically.
    start_namespace = args.namespace
    if not start_namespace and ".hashicorp.cloud" in vault_addr:
        start_namespace = "admin"
        log.info("HCP Vault detected — defaulting to --namespace admin")

    # ── Namespace discovery ──────────────────────────────────────────────
    log.info("Phase 1/6: Discovering namespaces ...")
    ns_list = collect_namespaces(client, start_ns=start_namespace)
    report.namespaces = ns_list
    log.info("Found %d namespace(s) (plus root)", len(ns_list))

    # ── Interactive audit log prompt ─────────────────────────────────────
    # If --audit-log was not given on the command line and we are running
    # interactively (not piped), ask the user now.
    if not args.audit_log and sys.stdin.isatty():
        print()
        print(" ┌─ Audit log ──────────────────────────────────────────────────────┐")
        print(" │ Provide the path to your Vault audit log (JSON/JSONL format). │")
        print(" │ This enables last-access and last-login timestamps for all │")
        print(" │ secrets and users. │")
        print(" │ │")
        print(" │ HCP Vault: portal → cluster → Audit → enable + download logs │")
        print(" │ On-prem : check your audit device path (vault audit list) │")
        print(" │ Leave empty to continue without last-access data. │")
        print(" └───────────────────────────────────────────────────────────────────┘")
        try:
            audit_input = input(" Audit log path > ").strip()
            if audit_input:
                args.audit_log = audit_input
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C here just means "no audit log" — keep going.
            pass
        print()

    # ── Audit log ────────────────────────────────────────────────────────
    audit = AuditIndex()
    if args.audit_log:
        log.info("Phase 2/6: Parsing audit log: %s", args.audit_log)
        try:
            audit.parse_file(args.audit_log)
            # report.audit_log_path doubles as the "audit data available"
            # flag for the enrichment steps below; only set it on success.
            report.audit_log_path = args.audit_log
            report.audit_log_entries_parsed = audit.entries_parsed
            log.info(
                "Audit log: %d entries parsed, %d secret paths, "
                "%d entity logins, %d entity activity records",
                audit.entries_parsed,
                len(audit.secret_access),
                len(audit.entity_login),
                len(audit.entity_activity),
            )
        except FileNotFoundError:
            log.error("Audit log not found: %s", args.audit_log)
        except Exception as exc:
            log.error("Failed to parse audit log: %s", exc)
    else:
        log.info(
            "Phase 2/6: No audit log provided — last-access and last-activity data "
            "will be unavailable.\n"
            "           Use --audit-log <path> to include access timestamps.\n"
            "           Token scan (enabled by default) will provide a creation-time proxy."
        )

    # ── Secrets ──────────────────────────────────────────────────────────
    if not args.no_secrets:
        log.info("Phase 3/6: Collecting secrets (kv, kv_v2, aws, terraform, kmip) ...")
        secrets = collect_secrets(
            client, ns_list,
            max_depth=args.max_depth,
            workers=args.workers,
            access_errors=access_errors,
        )
        report.secrets = secrets
        log.info("Collected %d secret(s)", len(secrets))

        if report.audit_log_path:
            log.info("Enriching secrets with audit log data ...")
            enrich_secrets(secrets, audit)
    else:
        log.info("Phase 3/6: Secrets — skipped (--no-secrets)")

    # ── Users / entities ─────────────────────────────────────────────────
    if not args.no_users:
        log.info("Phase 4/6: Collecting users/entities from identity store ...")
        entities, group_name_map = collect_entities(
            client, ns_list,
            workers=args.workers,
            access_errors=access_errors,
        )
        log.info("Identity store: %d entity/user record(s)", len(entities))

        # Token accessor scan (on by default — gives last-renewal proxy)
        if not args.no_token_scan:
            log.info("Phase 5/6: Scanning token accessors for last-activity proxy ...")
            orphans = scan_token_accessors(
                client, entities, ns_list,
                max_accessors=args.max_accessors,
                workers=min(5, args.workers),
            )
            entities.extend(orphans)
            log.info(
                "Token scan complete. %d entity token(s) updated, %d orphan token(s).",
                # Orphan tokens have an empty entity_id; count only tokens
                # attached to real entities.
                sum(1 for e in entities if e.latest_token and e.entity_id != ""),
                len(orphans),
            )
        else:
            log.info("Phase 5/6: Token scan — skipped (--no-token-scan)")

        # Auth method user scan (on by default — finds users never seen before)
        if not args.no_auth_method_scan:
            log.info("Phase 5b/6: Scanning auth method mounts for configured users ...")
            auth_extras = collect_auth_method_users(client, ns_list, entities)
            entities.extend(auth_extras)
            log.info(
                "Auth method scan complete. %d additional principal(s) discovered.",
                len(auth_extras),
            )
        else:
            log.info("Phase 5b/6: Auth method scan — skipped (--no-auth-method-scan)")

        report.entities = entities
        log.info("Total users/entities collected: %d", len(entities))

        if report.audit_log_path:
            log.info("Enriching entities with audit log data (login + activity) ...")
            enrich_entities(entities, audit)
    else:
        log.info("Phase 4-5/6: Users — skipped (--no-users)")

    # ── Output ───────────────────────────────────────────────────────────
    log.info("Phase 6/6: Generating output ...")

    elapsed = time.monotonic() - t0
    report.stats = {
        "api_calls": client.call_count,
        "elapsed_s": round(elapsed, 2),
        "namespace_count": len(ns_list),
        "secret_count": len(report.secrets),
        "entity_count": len(report.entities),
        "access_error_count": len(access_errors),
    }

    # RICH is the module-level flag set by the optional `rich` import.
    use_rich = RICH and not args.no_color
    if use_rich:
        print_report_rich(report)
    else:
        print_report_plain(report)

    if not args.no_save:
        out_dir = Path(args.output_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")

        export_json(report, out_dir / f"vault_audit_{ts}.json")
        if report.secrets:
            export_secrets_csv(report.secrets, out_dir / f"vault_secrets_{ts}.csv")
        if report.entities:
            export_entities_csv(report.entities, out_dir / f"vault_entities_{ts}.csv")

        log.info("Output saved to: %s", out_dir.resolve())

    log.info(
        "Done. %d API calls in %.1fs. Secrets: %d. Entities: %d.",
        client.call_count, elapsed, len(report.secrets), len(report.entities),
    )
|
|
|
|
|
# Run the audit only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
# ── Appendix: expected audit-log entry shape (JSONL, one object per line) ──
# {
#   "type": "response",              // "request" entries are skipped (avoids double-counting)
#   "time": "2025-03-01T14:23:11.123456Z",
#   "auth": {
#     "entity_id": "abc-123",        // NOT hashed — used to correlate with identity store
#     "display_name": "alice",
#     "token_type": "service",
#     "policies": ["default", "kv-read"]
#   },
#   "request": {
#     "operation": "read",
#     "path": "secret/data/myapp/db-creds",
#     "namespace": { "id": "...", "path": "team-a/" },
#     "remote_address": "10.0.1.5"   // NOT hashed — real IP (or proxy IP if behind LB)
#   },
#   "response": {
#     "auth": { ... }                // present only for login events
#   }
# }