@jpapazian2000
Last active March 9, 2026 09:52
vault_audit.py — HashiCorp Vault comprehensive audit script (secrets, users, last-access via audit log)
#!/usr/bin/env python3
"""
vault_audit.py — HCP Vault Comprehensive Audit Script
======================================================

Collects and reports:
  • All secrets (names, metadata, last-accessed time) across all namespaces
  • All users/entities (aliases, policies, last-login time, originating IPs)

"Last accessed" and "last login" require an audit log file (--audit-log).
Without it, the script still collects all structural data from the Vault API.

Requirements:
    pip install requests urllib3
    pip install rich        # optional, for nicer console output

Usage:
    export VAULT_ADDR="https://your-cluster.hashicorp.cloud:8200"
    export VAULT_TOKEN="hvs.your-token"

    # Basic scan (API data only):
    python3 vault_audit.py

    # With audit log for last-access timestamps:
    python3 vault_audit.py --audit-log /var/log/vault/audit.log

    # Limit to one namespace and its children:
    python3 vault_audit.py --namespace admin

    # Full options:
    python3 vault_audit.py --help
"""
# ── Imports ───────────────────────────────────────────────────────────────────
import argparse
import csv
import json
import logging
import os
import re
import sys
import time
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
try:
    import requests
    import urllib3

    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
except ImportError:
    print("Error: 'requests' is required. Install with: pip install requests urllib3")
    sys.exit(1)

try:
    from rich.console import Console as RichConsole
    from rich.table import Table as RichTable
    from rich.text import Text as RichText
    from rich import box as rich_box
    RICH = True
except ImportError:
    RICH = False
# ── Logging ───────────────────────────────────────────────────────────────────
log = logging.getLogger("vault_audit")


# ── Data classes ─────────────────────────────────────────────────────────────
@dataclass
class NamespaceInfo:
    path: str                                # full path from root, e.g. "team-a/prod/" (root = "")
    ns_id: str = ""
    custom_metadata: Dict[str, str] = field(default_factory=dict)


@dataclass
class SecretVersionInfo:
    version: int
    created_time: Optional[str] = None
    deletion_time: Optional[str] = None
    destroyed: bool = False


@dataclass
class EntityAliasRecord:
    alias_id: str
    name: str
    mount_accessor: str
    mount_path: str
    mount_type: str
    custom_metadata: Dict[str, str] = field(default_factory=dict)
    creation_time: Optional[str] = None
    last_update_time: Optional[str] = None


@dataclass
class TokenProxyRecord:
    """A live token that proxies as a last-login indicator when no audit log exists."""
    accessor: str
    display_name: str
    auth_path: str                           # e.g. "auth/userpass/login/alice"
    policies: List[str]
    creation_time: Optional[str] = None      # ISO 8601, from epoch int
    expire_time: Optional[str] = None
    last_renewal_time: Optional[str] = None  # ISO 8601; updated on token renew
    issue_time: Optional[str] = None         # ISO 8601; same as creation for new tokens
    ttl: int = 0
    meta: Dict[str, str] = field(default_factory=dict)
    token_type: str = ""
    renewable: bool = False
    num_uses: int = 0


@dataclass
class SecretRecord:
    namespace_path: str
    mount_path: str
    secret_path: str                         # role name (aws/tf/kmip) or KV path
    engine_type: str = "kv"                  # kv, aws, terraform, kmip
    kv_version: Optional[int] = None         # 1 or 2, KV only
    # KV v2 metadata
    created_time: Optional[str] = None
    updated_time: Optional[str] = None
    current_version: Optional[int] = None
    oldest_version: Optional[int] = None
    max_versions: Optional[int] = None
    custom_metadata: Dict[str, Any] = field(default_factory=dict)
    versions: List[SecretVersionInfo] = field(default_factory=list)
    # Engine-specific raw config (AWS role config, Terraform role, KMIP role ops…)
    engine_data: Dict[str, Any] = field(default_factory=dict)
    # Audit-log enrichment (None = no log provided or no match)
    last_accessed_time: Optional[str] = None
    last_accessed_by_entity_id: Optional[str] = None
    last_accessed_by_display_name: Optional[str] = None
    last_accessed_from_ip: Optional[str] = None
    last_accessed_operation: Optional[str] = None
    access_count: int = 0
    access_history: List[Dict] = field(default_factory=list)
    # Error
    metadata_error: Optional[str] = None

    @property
    def full_path(self) -> str:
        ns = self.namespace_path.rstrip("/")
        return f"{ns}/{self.mount_path}/{self.secret_path}".lstrip("/")


@dataclass
class EntityRecord:
    namespace_path: str
    entity_id: str
    name: str
    disabled: bool = False
    policies: List[str] = field(default_factory=list)
    metadata: Dict[str, str] = field(default_factory=dict)
    creation_time: Optional[str] = None
    last_update_time: Optional[str] = None
    aliases: List[EntityAliasRecord] = field(default_factory=list)
    groups: List[str] = field(default_factory=list)      # resolved group names
    group_ids: List[str] = field(default_factory=list)
    # Token proxy (best-effort "last active" when no audit log)
    latest_token: Optional[TokenProxyRecord] = None
    # Auth-method-specific info (from collect_auth_method_users)
    auth_method_extra: Dict[str, Any] = field(default_factory=dict)
    # Audit-log enrichment — last login (authentication event)
    last_login_time: Optional[str] = None
    last_login_from_ip: Optional[str] = None
    last_login_auth_method: Optional[str] = None
    last_login_auth_path: Optional[str] = None
    last_login_namespace: Optional[str] = None
    login_count: int = 0
    login_history: List[Dict] = field(default_factory=list)
    # Audit-log enrichment — last activity (ANY request: read, list, create…)
    last_activity_time: Optional[str] = None
    last_activity_path: Optional[str] = None
    last_activity_ip: Optional[str] = None
    last_activity_operation: Optional[str] = None
    last_activity_mount_type: Optional[str] = None
    activity_count: int = 0


@dataclass
class VaultAuditReport:
    generated_at: str
    vault_addr: str
    vault_version: str
    cluster_name: str
    namespaces: List[NamespaceInfo] = field(default_factory=list)
    secrets: List[SecretRecord] = field(default_factory=list)
    entities: List[EntityRecord] = field(default_factory=list)
    audit_log_path: Optional[str] = None
    audit_log_entries_parsed: int = 0
    access_errors: List[Dict] = field(default_factory=list)
    stats: Dict = field(default_factory=dict)
# ── Rate limiter ──────────────────────────────────────────────────────────────
class TokenBucket:
    """Simple thread-safe token bucket rate limiter."""

    def __init__(self, rate: float = 50.0, burst: float = 50.0):
        self.rate = rate
        self.burst = burst
        self._tokens = burst
        self._last = time.monotonic()
        self._lock = threading.Lock()

    def acquire(self):
        with self._lock:
            now = time.monotonic()
            elapsed = now - self._last
            self._tokens = min(self.burst, self._tokens + elapsed * self.rate)
            self._last = now
            if self._tokens >= 1.0:
                self._tokens -= 1.0
                return
            wait = (1.0 - self._tokens) / self.rate
        # Sleep outside the lock so other threads aren't blocked while waiting
        time.sleep(wait)
        with self._lock:
            self._tokens = max(0.0, self._tokens - 1.0)
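# Behavior sketch (assuming the defaults rate=50.0, burst=50.0): the first 50
# acquire() calls return immediately; once the bucket is drained, each further
# call sleeps roughly 1/50 s, smoothing sustained throughput toward `rate`
# requests per second across all worker threads sharing the bucket.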
# ── Vault API client ──────────────────────────────────────────────────────────
class VaultClient:
    def __init__(
        self,
        addr: str,
        token: str,
        tls_verify: bool = True,
        ca_cert: Optional[str] = None,   # path to CA bundle (on-prem internal PKI)
        timeout: int = 15,
        rate_limiter: Optional[TokenBucket] = None,
    ):
        self.addr = addr.rstrip("/")
        self.token = token
        # tls_verify can be False (skip), True (system CAs), or a CA bundle path
        self.tls_verify: Any = ca_cert if ca_cert else tls_verify
        self.timeout = timeout
        self.rate_limiter = rate_limiter or TokenBucket(rate=50)
        self._call_count = 0
        self._lock = threading.Lock()
        # Use a session per thread via thread-local storage
        self._tls = threading.local()

    def _session(self) -> requests.Session:
        if not hasattr(self._tls, "session"):
            s = requests.Session()
            s.headers.update({
                "X-Vault-Token": self.token,
                "Content-Type": "application/json",
            })
            s.verify = self.tls_verify
            self._tls.session = s
        return self._tls.session

    def _request(
        self,
        method: str,
        path: str,
        namespace: str = "",
        max_retries: int = 4,
        **kwargs,
    ) -> Optional[Dict]:
        url = f"{self.addr}/v1/{path.lstrip('/')}"
        headers = {}
        if namespace:
            headers["X-Vault-Namespace"] = namespace.strip("/")
        backoff = 1.0
        for attempt in range(max_retries + 1):
            self.rate_limiter.acquire()
            with self._lock:
                self._call_count += 1
            try:
                r = self._session().request(
                    method, url,
                    headers=headers,
                    timeout=self.timeout,
                    **kwargs,
                )
            except requests.exceptions.ConnectionError as exc:
                log.warning("Connection error [%s %s]: %s", method, url, exc)
                if attempt < max_retries:
                    time.sleep(backoff)
                    backoff = min(backoff * 2, 30)
                    continue
                return {"__error__": "connection_error", "__detail__": str(exc)}
            except requests.exceptions.Timeout:
                log.warning("Timeout [%s %s]", method, url)
                if attempt < max_retries:
                    time.sleep(backoff)
                    backoff = min(backoff * 2, 30)
                    continue
                return {"__error__": "timeout"}
            if r.status_code == 200:
                try:
                    return r.json()
                except ValueError:
                    return {}
            if r.status_code in (204, 205):
                return {}
            if r.status_code == 404:
                return None  # not found — caller decides
            if r.status_code == 403:
                try:
                    detail = r.json().get("errors", ["permission denied"])
                except Exception:
                    detail = ["permission denied"]
                return {"__error__": "permission_denied", "__detail__": detail}
            if r.status_code in (429, 500, 502, 503):
                if attempt < max_retries:
                    jitter = 0.5 * backoff
                    time.sleep(backoff + jitter)
                    backoff = min(backoff * 2, 30)
                    continue
                return {"__error__": f"http_{r.status_code}"}
            # Other 4xx/5xx
            try:
                errs = r.json().get("errors", [str(r.status_code)])
            except Exception:
                errs = [str(r.status_code)]
            return {"__error__": f"http_{r.status_code}", "__detail__": errs}
        return {"__error__": "max_retries_exceeded"}

    def get(self, path: str, ns: str = "") -> Optional[Dict]:
        return self._request("GET", path, namespace=ns)

    def list(self, path: str, ns: str = "") -> Optional[Dict]:
        return self._request("LIST", path, namespace=ns)

    def post(self, path: str, ns: str = "", data: Optional[Dict] = None) -> Optional[Dict]:
        return self._request("POST", path, namespace=ns, json=data or {})

    @property
    def call_count(self) -> int:
        return self._call_count
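    # Retry-timing sketch for _request (with the default max_retries=4): a
    # persistent 429/5xx response is retried after roughly 1.5 s, 3 s, 6 s and
    # 12 s (backoff plus 0.5*backoff "jitter"), after which a dict such as
    # {"__error__": "http_503"} is returned so callers can record the failure
    # instead of aborting the whole scan.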
    # ── Convenience helpers ──────────────────────────────────────────────
    def health(self) -> Dict:
        r = self.get("sys/health")
        return r or {}

    def token_lookup_self(self) -> Dict:
        r = self.get("auth/token/lookup-self")
        return (r or {}).get("data", {})

    def list_namespaces(self, ns: str = "") -> Dict:
        """Returns the key_info dict, or an empty dict."""
        r = self.list("sys/namespaces", ns=ns)
        if r and "data" in r:
            return r["data"].get("key_info", {})
        return {}

    def list_mounts(self, ns: str = "") -> Dict:
        r = self.get("sys/mounts", ns=ns)
        if not r or "__error__" in r:
            return {}
        data = r.get("data", r)
        return {k: v for k, v in data.items() if isinstance(v, dict) and "type" in v}

    def kv_list(self, mount: str, path: str, ns: str = "") -> List[str]:
        p = f"{mount}/{path}".rstrip("/")
        r = self.list(p, ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def kv2_list_meta(self, mount: str, path: str, ns: str = "") -> List[str]:
        p = f"{mount}/metadata/{path}".rstrip("/")
        r = self.list(p, ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def kv2_get_meta(self, mount: str, path: str, ns: str = "") -> Optional[Dict]:
        p = f"{mount}/metadata/{path}"
        r = self.get(p, ns=ns)
        if r and "__error__" in r:
            return r
        if r and "data" in r:
            return r["data"]
        return None

    def list_entity_ids(self, ns: str = "") -> List[str]:
        r = self.list("identity/entity/id", ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def get_entity(self, eid: str, ns: str = "") -> Optional[Dict]:
        r = self.get(f"identity/entity/id/{eid}", ns=ns)
        if r and "data" in r:
            return r["data"]
        return None

    def list_group_ids(self, ns: str = "") -> List[str]:
        r = self.list("identity/group/id", ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def get_group(self, gid: str, ns: str = "") -> Optional[Dict]:
        r = self.get(f"identity/group/id/{gid}", ns=ns)
        if r and "data" in r:
            return r["data"]
        return None

    def list_token_accessors(self, ns: str = "") -> List[str]:
        r = self.list("auth/token/accessors", ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def lookup_accessor(self, accessor: str, ns: str = "") -> Optional[Dict]:
        r = self.post("auth/token/lookup-accessor", ns=ns, data={"accessor": accessor})
        if r and "data" in r:
            return r["data"]
        return None

    def list_auth_methods(self, ns: str = "") -> Dict:
        r = self.get("sys/auth", ns=ns)
        if not r or "__error__" in r:
            return {}
        data = r.get("data", r)
        return {k: v for k, v in data.items() if isinstance(v, dict) and "type" in v}

    def list_keys(self, path: str, ns: str = "") -> List[str]:
        """Generic LIST returning the keys array."""
        r = self.list(path, ns=ns)
        if r and "data" in r:
            return r["data"].get("keys", [])
        return []

    def get_data(self, path: str, ns: str = "") -> Optional[Dict]:
        """Generic GET returning the data dict."""
        r = self.get(path, ns=ns)
        if r and "data" in r:
            return r["data"]
        return None
# ── Namespace collection ──────────────────────────────────────────────────────
def collect_namespaces(client: VaultClient, start_ns: str = "") -> List[NamespaceInfo]:
    """
    BFS traversal of all namespaces from start_ns (default: root).

    Namespaces are a Vault Enterprise feature. On Vault OSS (Community Edition)
    the sys/namespaces endpoint returns 403 or 404. This is detected and logged
    as a warning; the script then continues scanning only the root namespace.
    """
    result: List[NamespaceInfo] = []
    queue = [start_ns]
    # Probe once to detect Vault OSS before iterating
    probe = client._request("LIST", "sys/namespaces", namespace=start_ns)
    if probe and probe.get("__error__") == "permission_denied":
        log.warning(
            "sys/namespaces returned 403 — this is normal for Vault OSS (Community "
            "Edition), which does not support namespaces. Scanning root namespace only.\n"
            "If you are running Vault Enterprise and expected namespaces, check that "
            "your token has 'list' capability on sys/namespaces."
        )
        return result  # empty — caller will still scan root as ""
    if probe is None:
        log.warning(
            "sys/namespaces returned 404 — namespace feature not available. "
            "Scanning root namespace only."
        )
        return result
    while queue:
        parent = queue.pop(0)
        ns_label = parent or "(root)"
        log.debug("Listing namespaces under: %s", ns_label)
        key_info = client.list_namespaces(ns=parent)
        if not key_info:
            continue
        for child_key, child_data in key_info.items():
            child_name = child_key.rstrip("/")
            full_path = f"{parent.rstrip('/')}/{child_name}".lstrip("/") + "/"
            result.append(NamespaceInfo(
                path=full_path,
                ns_id=(child_data or {}).get("id", ""),
                custom_metadata=(child_data or {}).get("custom_metadata") or {},
            ))
            queue.append(full_path)
    return result
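# Shape sketch (hypothetical namespace tree, for illustration): with child
# namespaces team-a/ and team-a/prod/ under root, collect_namespaces(client)
# yields
#     [NamespaceInfo(path="team-a/"), NamespaceInfo(path="team-a/prod/")]
# The root namespace itself is not in the list — callers represent it as
# NamespaceInfo(path="").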
# ── Secret collection ─────────────────────────────────────────────────────────
def _list_kv1_recursive(
    client: VaultClient, mount: str, path: str, ns: str, max_depth: int, depth: int = 0
) -> List[str]:
    if depth > max_depth:
        return []
    keys = client.kv_list(mount, path, ns=ns)
    secrets = []
    for k in keys:
        # rstrip the parent path so recursing into "a/b/" doesn't build "a/b//c"
        full = f"{path.rstrip('/')}/{k}".lstrip("/")
        if k.endswith("/"):
            secrets.extend(_list_kv1_recursive(client, mount, full, ns, max_depth, depth + 1))
        else:
            secrets.append(full)
    return secrets


def _list_kv2_recursive(
    client: VaultClient, mount: str, path: str, ns: str, max_depth: int, depth: int = 0
) -> List[str]:
    if depth > max_depth:
        return []
    keys = client.kv2_list_meta(mount, path, ns=ns)
    secrets = []
    for k in keys:
        # rstrip the parent path so recursing into "a/b/" doesn't build "a/b//c"
        full = f"{path.rstrip('/')}/{k}".lstrip("/")
        if k.endswith("/"):
            secrets.extend(_list_kv2_recursive(client, mount, full, ns, max_depth, depth + 1))
        else:
            secrets.append(full)
    return secrets


SCANNED_ENGINE_TYPES = {"kv", "aws", "terraform", "kmip"}
def collect_secrets(
    client: VaultClient,
    all_namespaces: List[NamespaceInfo],
    max_depth: int = 12,
    workers: int = 10,
    access_errors: Optional[List[Dict]] = None,
) -> List[SecretRecord]:
    """Collect secrets/roles from KV v1, KV v2, AWS, Terraform, and KMIP engines."""
    if access_errors is None:
        access_errors = []
    scan_jobs = []
    namespaces_to_scan = [NamespaceInfo(path="")] + all_namespaces
    for ns_info in namespaces_to_scan:
        ns = ns_info.path.rstrip("/")
        mounts = client.list_mounts(ns=ns)
        for mount_raw, mount_info in mounts.items():
            if not isinstance(mount_info, dict):
                continue
            mtype = mount_info.get("type", "")
            if mtype not in SCANNED_ENGINE_TYPES:
                continue
            opts = mount_info.get("options") or {}
            kv_ver = int(opts.get("version", "1")) if mtype == "kv" else 0
            mount = mount_raw.rstrip("/")
            scan_jobs.append((ns, mount, mtype, kv_ver, mount_info))
    log.info("Found %d secret engine mount(s) to scan across %d namespace(s) "
             "(types: kv, aws, terraform, kmip)",
             len(scan_jobs), len(namespaces_to_scan))
    all_secrets: List[SecretRecord] = []
    lock = threading.Lock()

    def scan_mount(args):
        ns, mount, mtype, kv_ver, mount_info = args
        ns_label = ns + "/" if ns else "(root)"
        log.debug("Scanning mount: [%s] %s (%s)", ns_label, mount, mtype)
        records = []
        # ── KV v2 ───────────────────────────────────────────────────────
        if mtype == "kv" and kv_ver == 2:
            paths = _list_kv2_recursive(client, mount, "", ns, max_depth)
            for p in paths:
                rec = SecretRecord(
                    namespace_path=ns_label,
                    mount_path=mount,
                    secret_path=p,
                    engine_type="kv",
                    kv_version=2,
                )
                meta = client.kv2_get_meta(mount, p, ns=ns)
                if meta and "__error__" in meta:
                    rec.metadata_error = str(meta.get("__detail__", meta["__error__"]))
                    with lock:
                        access_errors.append({
                            "namespace": ns_label, "path": f"{mount}/metadata/{p}",
                            "error": rec.metadata_error,
                        })
                elif meta:
                    rec.created_time = meta.get("created_time")
                    rec.updated_time = meta.get("updated_time")
                    rec.current_version = meta.get("current_version")
                    rec.oldest_version = meta.get("oldest_version")
                    rec.max_versions = meta.get("max_versions")
                    rec.custom_metadata = meta.get("custom_metadata") or {}
                    for ver_num, ver_data in (meta.get("versions") or {}).items():
                        rec.versions.append(SecretVersionInfo(
                            version=int(ver_num),
                            created_time=ver_data.get("created_time"),
                            deletion_time=ver_data.get("deletion_time"),
                            destroyed=ver_data.get("destroyed", False),
                        ))
                    rec.versions.sort(key=lambda v: v.version)
                records.append(rec)
        # ── KV v1 ───────────────────────────────────────────────────────
        elif mtype == "kv":
            paths = _list_kv1_recursive(client, mount, "", ns, max_depth)
            for p in paths:
                records.append(SecretRecord(
                    namespace_path=ns_label,
                    mount_path=mount,
                    secret_path=p,
                    engine_type="kv",
                    kv_version=1,
                ))
        # ── AWS secrets engine ───────────────────────────────────────────
        # Lists configured roles. In audit logs, credential access appears as
        # GET /{mount}/creds/{role} or GET /{mount}/sts/{role}.
        elif mtype == "aws":
            roles = client.list_keys(f"{mount}/roles", ns=ns)
            for role in roles:
                rdata = client.get_data(f"{mount}/roles/{role}", ns=ns) or {}
                records.append(SecretRecord(
                    namespace_path=ns_label,
                    mount_path=mount,
                    secret_path=role,
                    engine_type="aws",
                    engine_data={
                        "credential_type": rdata.get("credential_type", ""),
                        "role_arns": rdata.get("role_arns") or [],
                        "policy_arns": rdata.get("policy_arns") or [],
                        "iam_tags": rdata.get("iam_tags") or [],
                        "default_sts_ttl": rdata.get("default_sts_ttl"),
                        "max_sts_ttl": rdata.get("max_sts_ttl"),
                        "permissions_boundary_arn": rdata.get("permissions_boundary_arn"),
                    },
                ))
        # ── Terraform Cloud secrets engine ───────────────────────────────
        # Lists configured roles. Credential access: GET /{mount}/creds/{role}.
        elif mtype == "terraform":
            roles = client.list_keys(f"{mount}/role", ns=ns)
            for role in roles:
                rdata = client.get_data(f"{mount}/role/{role}", ns=ns) or {}
                records.append(SecretRecord(
                    namespace_path=ns_label,
                    mount_path=mount,
                    secret_path=role,
                    engine_type="terraform",
                    engine_data={
                        "organization": rdata.get("organization", ""),
                        "team_id": rdata.get("team_id", ""),
                        "user_id": rdata.get("user_id", ""),
                        "ttl": rdata.get("ttl"),
                        "max_ttl": rdata.get("max_ttl"),
                        "token_account_type": rdata.get("token_account_type", ""),
                    },
                ))
        # ── KMIP secrets engine ──────────────────────────────────────────
        # Structure: scopes → roles. Credential access logged as KMIP operations.
        elif mtype == "kmip":
            scopes = client.list_keys(f"{mount}/scope", ns=ns)
            for scope in scopes:
                roles = client.list_keys(f"{mount}/scope/{scope}/role", ns=ns)
                for role in roles:
                    rdata = client.get_data(f"{mount}/scope/{scope}/role/{role}", ns=ns) or {}
                    # Collect only the operations that are enabled
                    ops = {k: v for k, v in rdata.items()
                           if k.startswith("operation_") and v is True}
                    records.append(SecretRecord(
                        namespace_path=ns_label,
                        mount_path=mount,
                        secret_path=f"{scope}/{role}",
                        engine_type="kmip",
                        engine_data={
                            "scope": scope,
                            "role": role,
                            "operations": list(ops.keys()),
                            "tls_client_key_type": rdata.get("tls_client_key_type"),
                            "tls_client_key_bits": rdata.get("tls_client_key_bits"),
                        },
                    ))
        log.debug("  → %d item(s) in [%s] %s (%s)", len(records), ns_label, mount, mtype)
        return records

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(scan_mount, job): job for job in scan_jobs}
        for future in as_completed(futures):
            try:
                all_secrets.extend(future.result())
            except Exception as exc:
                job = futures[future]
                log.error("Error scanning mount %s: %s", job, exc)
    return all_secrets
# ── Entity/user collection ────────────────────────────────────────────────────
def collect_entities(
    client: VaultClient,
    all_namespaces: List[NamespaceInfo],
    workers: int = 10,
    access_errors: Optional[List[Dict]] = None,
) -> Tuple[List[EntityRecord], Dict[str, str]]:
    """
    Collect all identity entities across all namespaces.
    Returns (entities, group_name_map) where group_name_map is {group_id -> group_name}.
    """
    if access_errors is None:
        access_errors = []
    namespaces_to_scan = [NamespaceInfo(path="")] + all_namespaces
    all_entity_ids: List[Tuple[str, str]] = []  # (entity_id, ns)
    group_name_map: Dict[str, str] = {}
    for ns_info in namespaces_to_scan:
        ns = ns_info.path.rstrip("/")
        ns_label = ns + "/" if ns else "(root)"
        eids = client.list_entity_ids(ns=ns)
        log.debug("[%s] %d entity ID(s)", ns_label, len(eids))
        all_entity_ids.extend((eid, ns) for eid in eids)
        # Collect groups while we're here
        gids = client.list_group_ids(ns=ns)
        for gid in gids:
            g = client.get_group(gid, ns=ns)
            if g:
                group_name_map[gid] = g.get("name", gid)
    log.info("Total entity IDs to fetch: %d", len(all_entity_ids))
    entities: List[EntityRecord] = []
    seen: set = set()
    lock = threading.Lock()

    def fetch_entity(args):
        eid, ns = args
        if eid in seen:
            return None
        data = client.get_entity(eid, ns=ns)
        if not data:
            return None
        ns_label = ns + "/" if ns else "(root)"
        aliases = []
        for a in data.get("aliases") or []:
            aliases.append(EntityAliasRecord(
                alias_id=a.get("id", ""),
                name=a.get("name", ""),
                mount_accessor=a.get("mount_accessor", ""),
                mount_path=a.get("mount_path", ""),
                mount_type=a.get("mount_type", ""),
                custom_metadata=a.get("custom_metadata") or {},
                creation_time=a.get("creation_time"),
                last_update_time=a.get("last_update_time"),
            ))
        return EntityRecord(
            namespace_path=ns_label,
            entity_id=eid,
            name=data.get("name", ""),
            disabled=data.get("disabled", False),
            policies=data.get("policies") or [],
            metadata=data.get("metadata") or {},
            creation_time=data.get("creation_time"),
            last_update_time=data.get("last_update_time"),
            aliases=aliases,
            group_ids=data.get("group_ids") or [],
        )

    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = {pool.submit(fetch_entity, args): args for args in all_entity_ids}
        for future in as_completed(futures):
            try:
                rec = future.result()
                if rec and rec.entity_id not in seen:
                    seen.add(rec.entity_id)
                    with lock:
                        entities.append(rec)
            except Exception as exc:
                log.error("Error fetching entity: %s", exc)
    # Resolve group names
    for e in entities:
        e.groups = [group_name_map.get(gid, gid) for gid in e.group_ids]
    return entities, group_name_map
def scan_token_accessors(
    client: VaultClient,
    entities: List[EntityRecord],
    all_namespaces: List[NamespaceInfo],
    max_accessors: int = 2000,
    workers: int = 5,
) -> List[EntityRecord]:
    """
    Scan token accessors across namespaces to find the latest token per entity.
    The most recent token creation_time is a proxy for "last active" when no
    audit log is available.
    Returns a list of orphan-token pseudo-entities (no identity entity).
    """
    entity_by_id: Dict[str, EntityRecord] = {e.entity_id: e for e in entities}
    orphan_entities: List[EntityRecord] = []
    lock = threading.Lock()
    namespaces_to_scan = [NamespaceInfo(path="")] + all_namespaces
    for ns_info in namespaces_to_scan:
        ns = ns_info.path.rstrip("/")
        ns_label = ns + "/" if ns else "(root)"
        accessors = client.list_token_accessors(ns=ns)
        if not accessors:
            continue
        log.debug("[%s] %d token accessor(s)", ns_label, len(accessors))
        accessors = accessors[:max_accessors]

        def lookup(accessor, _ns=ns, _ns_label=ns_label):
            td = client.lookup_accessor(accessor, ns=_ns)
            if not td:
                return
            eid = td.get("entity_id", "")

            def _epoch_to_iso(val) -> Optional[str]:
                if not val:
                    return None
                try:
                    return datetime.fromtimestamp(int(val), tz=timezone.utc).isoformat()
                except Exception:
                    return str(val)

            tr = TokenProxyRecord(
                accessor=accessor,
                display_name=td.get("display_name", ""),
                auth_path=td.get("path", ""),
                policies=td.get("policies") or [],
                creation_time=_epoch_to_iso(td.get("creation_time")),
                expire_time=td.get("expire_time"),
                # last_renewal_time: updated whenever the token is renewed via
                # auth/token/renew or auth/token/renew-self — key proxy for
                # "was active recently without re-authenticating"
                last_renewal_time=_epoch_to_iso(td.get("last_renewal_time")),
                issue_time=td.get("issue_time"),
                ttl=td.get("ttl") or 0,
                meta=td.get("meta") or {},
                token_type=td.get("type", ""),
                renewable=td.get("renewable", False),
                num_uses=td.get("num_uses", 0),
            )
            with lock:
                if eid and eid in entity_by_id:
                    e = entity_by_id[eid]
                    # Keep the most recently created token
                    if (e.latest_token is None or
                            (tr.creation_time and e.latest_token.creation_time and
                             tr.creation_time > e.latest_token.creation_time)):
                        e.latest_token = tr
                elif not eid:
                    orphan_entities.append(EntityRecord(
                        namespace_path=_ns_label,
                        entity_id="",
                        name=td.get("display_name", "(orphan token)"),
                        disabled=False,
                        policies=td.get("policies") or [],
                        metadata=td.get("meta") or {},
                        latest_token=tr,
                    ))

        with ThreadPoolExecutor(max_workers=workers) as pool:
            list(pool.map(lookup, accessors))
    return orphan_entities
# ── Auth method user discovery ───────────────────────────────────────────────
def collect_auth_method_users(
    client: VaultClient,
    all_namespaces: List[NamespaceInfo],
    existing_entities: List[EntityRecord],
) -> List[EntityRecord]:
    """
    Scan every auth method mount across all namespaces and list configured
    users/roles that may not yet have an identity entity (because they never
    logged in), or that carry extra info not visible in the identity API.

    Supported backends:
        userpass   — lists usernames + token_policies, token_ttl, etc.
        ldap       — lists explicitly configured LDAP user overrides + groups
        github     — lists mapped GitHub usernames/teams
        approle    — lists role names
        cert       — lists certificate roles
        oidc/jwt   — lists roles
        radius     — lists configured user overrides
        aws        — lists roles
        azure      — lists roles
        gcp        — lists roles
        kubernetes — lists roles

    Returns a list of NEW EntityRecord objects for users not already in
    existing_entities. Also enriches existing entities' auth_method_extra.
    """
    # Build lookup by alias name -> entity for matching
    alias_lookup: Dict[str, EntityRecord] = {}
    for e in existing_entities:
        for a in e.aliases:
            alias_lookup[a.name.lower()] = e
    new_entities: List[EntityRecord] = []
    namespaces_to_scan = [NamespaceInfo(path="")] + all_namespaces
    for ns_info in namespaces_to_scan:
        ns = ns_info.path.rstrip("/")
        ns_label = ns + "/" if ns else "(root)"
        auth_mounts = client.list_auth_methods(ns=ns)
        for mount_raw, mount_info in auth_mounts.items():
            if not isinstance(mount_info, dict):
                continue
            mount = mount_raw.rstrip("/")
            mtype = mount_info.get("type", "")
            accessor = mount_info.get("accessor", "")
            description = mount_info.get("description", "")
            # ── userpass ────────────────────────────────────────────────
            if mtype == "userpass":
                usernames = client.list_keys(f"{mount}/users", ns=ns)
                for username in usernames:
                    udata = client.get_data(f"{mount}/users/{username}", ns=ns) or {}
                    extra = {
                        "auth_mount": mount,
                        "auth_type": "userpass",
                        "token_policies": udata.get("token_policies") or [],
                        "token_ttl": udata.get("token_ttl"),
                        "token_max_ttl": udata.get("token_max_ttl"),
                        "token_bound_cidrs": udata.get("token_bound_cidrs") or [],
                    }
                    existing = alias_lookup.get(username.lower())
                    if existing:
                        existing.auth_method_extra.update(extra)
                    else:
                        new_entities.append(EntityRecord(
                            namespace_path=ns_label,
                            entity_id="",
                            name=username,
                            policies=udata.get("token_policies") or [],
                            auth_method_extra=extra,
                        ))
            # ── ldap ────────────────────────────────────────────────────
            elif mtype == "ldap":
                usernames = client.list_keys(f"{mount}/users", ns=ns)
                for username in usernames:
                    udata = client.get_data(f"{mount}/users/{username}", ns=ns) or {}
                    extra = {
                        "auth_mount": mount,
                        "auth_type": "ldap",
                        "ldap_groups": udata.get("groups") or [],
                        "token_policies": udata.get("token_policies") or udata.get("policies") or [],
                    }
                    existing = alias_lookup.get(username.lower())
                    if existing:
                        existing.auth_method_extra.update(extra)
                    else:
                        new_entities.append(EntityRecord(
                            namespace_path=ns_label,
                            entity_id="",
                            name=username,
                            policies=extra["token_policies"],
                            auth_method_extra=extra,
                        ))
                # Also list LDAP groups configured in Vault
                groups = client.list_keys(f"{mount}/groups", ns=ns)
                for grp in groups:
                    gdata = client.get_data(f"{mount}/groups/{grp}", ns=ns) or {}
                    log.debug("  LDAP group [%s] %s: policies=%s", ns_label, grp,
                              gdata.get("token_policies") or gdata.get("policies"))
            # ── github ──────────────────────────────────────────────────
            elif mtype == "github":
                gh_users = client.list_keys(f"{mount}/map/users", ns=ns)
                for gh_user in gh_users:
                    udata = client.get_data(f"{mount}/map/users/{gh_user}", ns=ns) or {}
                    extra = {
                        "auth_mount": mount,
                        "auth_type": "github",
                        "token_policies": (udata.get("value") or "").split(","),
                    }
                    existing = alias_lookup.get(gh_user.lower())
                    if existing:
                        existing.auth_method_extra.update(extra)
                    else:
                        new_entities.append(EntityRecord(
                            namespace_path=ns_label,
                            entity_id="",
                            name=gh_user,
                            auth_method_extra=extra,
                        ))
            # ── approle / cert / oidc / jwt / radius / aws / azure / gcp / k8s ──
            # For these, list role names — they don't map 1:1 to users, but
            # it's still useful to know what roles/principals are configured.
            elif mtype in ("approle", "cert", "oidc", "jwt", "radius",
                           "aws", "azure", "gcp", "kubernetes"):
                role_path_map = {
                    "approle": f"{mount}/role",
                    "cert": f"{mount}/certs",
                    "oidc": f"{mount}/role",
                    "jwt": f"{mount}/role",
                    "radius": f"{mount}/users",
                    "aws": f"{mount}/role",
                    "azure": f"{mount}/role",
                    "gcp": f"{mount}/role",
                    "kubernetes": f"{mount}/role",
                }
                role_path = role_path_map.get(mtype, f"{mount}/role")
                roles = client.list_keys(role_path, ns=ns)
                for role in roles:
                    existing = alias_lookup.get(role.lower())
                    extra = {
                        "auth_mount": mount,
                        "auth_type": mtype,
                        "role_name": role,
                    }
                    if existing:
                        if "roles" not in existing.auth_method_extra:
                            existing.auth_method_extra["roles"] = []
                        existing.auth_method_extra["roles"].append(extra)
                    else:
                        new_entities.append(EntityRecord(
                            namespace_path=ns_label,
                            entity_id="",
                            name=f"{role} ({mtype})",
                            auth_method_extra=extra,
                        ))
    log.info("Auth method scan found %d additional principal(s) not in identity store",
             len(new_entities))
    return new_entities
# ── Audit log parser ──────────────────────────────────────────────────────────
class AuditIndex:
"""
Parses a Vault audit JSONL log and builds two indexes:
secret_access[ns_path|mount/secret_path] -> list of events (sorted desc by time)
entity_login[entity_id] -> list of login events (sorted desc by time)
"""
MAX_HISTORY = 20
def __init__(self):
# {path_key -> [event_dict, ...]} (latest first, max MAX_HISTORY)
self.secret_access: Dict[str, List[Dict]] = defaultdict(list)
# {entity_id -> [login_event_dict, ...]}
self.entity_login: Dict[str, List[Dict]] = defaultdict(list)
# {entity_id -> [any_activity_event_dict, ...]}
self.entity_activity: Dict[str, List[Dict]] = defaultdict(list)
self.entries_parsed = 0
self.parse_errors = 0
def parse_file(self, path: str) -> None:
log.info("Parsing audit log: %s", path)
count = 0
with open(path, "r", errors="replace") as fh:
for raw in fh:
raw = raw.strip()
if not raw:
continue
self.entries_parsed += 1
count += 1
if count % 100_000 == 0:
log.info(" ... %d log entries parsed", count)
try:
entry = json.loads(raw)
except json.JSONDecodeError:
self.parse_errors += 1
continue
self._process(entry)
@staticmethod
def _resolve_ip(req: Dict) -> str:
"""
Return the most accurate client IP from an audit log request object.
On on-prem deployments behind a load balancer or reverse proxy, Vault's
remote_address may contain the proxy IP rather than the real client IP.
Vault can be configured with `x_forwarded_for_authorized_addrs` in its
listener config to trust X-Forwarded-For headers from certain proxy CIDRs.
When configured, Vault already writes the real client IP into remote_address.
        When NOT configured that way, the real IP may still be preserved in the
        request headers that Vault logs (if header auditing is enabled). This
        method checks both locations and returns the leftmost X-Forwarded-For
        entry (the original client) when present, falling back to remote_address.
"""
remote = req.get("remote_address", "")
        # Vault can log request headers; X-Forwarded-For is the standard proxy header.
        # Headers appear in the audit log only if registered as audited request
        # headers, e.g.:
        #   vault write sys/config/auditing/request-headers/X-Forwarded-For hmac=false
headers = req.get("headers") or {}
xff = (
headers.get("X-Forwarded-For")
or headers.get("x-forwarded-for")
or headers.get("X-Real-IP")
or headers.get("x-real-ip")
or ""
)
if isinstance(xff, list):
xff = xff[0] if xff else ""
if xff:
# XFF is a comma-separated list; leftmost is the original client.
# Example: "203.0.113.5, 10.0.0.1, 10.0.0.2"
# Strip port if present (IPv4 with port, or [IPv6]:port).
client_ip = xff.split(",")[0].strip()
# Remove port suffix if present
if client_ip.startswith("["):
# IPv6 with port: [::1]:PORT
client_ip = client_ip.split("]")[0].lstrip("[")
elif client_ip.count(":") == 1:
# IPv4 with port: 1.2.3.4:PORT
client_ip = client_ip.split(":")[0]
if client_ip:
return client_ip
# Strip port from remote_address if present
if remote.startswith("["):
remote = remote.split("]")[0].lstrip("[")
elif remote.count(":") == 1:
remote = remote.split(":")[0]
return remote
def _process(self, e: Dict) -> None:
if e.get("type") != "response":
return # only process response entries (avoids double counting)
ts = e.get("time", "")
req = e.get("request") or {}
auth = e.get("auth") or {}
resp = e.get("response") or {}
req_path = req.get("path", "")
req_op = req.get("operation", "")
# Use _resolve_ip to get real client IP even behind a proxy
remote_ip = self._resolve_ip(req)
ns_path = (req.get("namespace") or {}).get("path", "")
mount_type = req.get("mount_type", "")
entity_id = auth.get("entity_id", "")
display_name = auth.get("display_name", "")
token_policies = auth.get("token_policies") or []
# ── Secret access event ──
# Capture reads from KV, AWS, Terraform, KMIP and any other secret engine.
# We exclude system paths (sys/, auth/, identity/) to avoid false matches.
if (req_op in ("read", "create", "update", "delete", "list") and
mount_type not in ("", "token", "system", "identity") and
not req_path.startswith(("sys/", "auth/", "identity/"))):
key = f"{ns_path}|{req_path}"
event = {
"time": ts,
"operation": req_op,
"entity_id": entity_id,
"display_name": display_name,
"remote_ip": remote_ip,
"namespace": ns_path,
}
lst = self.secret_access[key]
lst.append(event)
# Keep sorted descending by time (latest first), cap at MAX_HISTORY
lst.sort(key=lambda x: x["time"], reverse=True)
if len(lst) > self.MAX_HISTORY:
del lst[self.MAX_HISTORY:]
# ── Login event ──
resp_auth = resp.get("auth") or {}
if resp_auth and entity_id and req_path.startswith("auth/"):
method_parts = req_path.split("/")
auth_method = method_parts[1] if len(method_parts) > 1 else req_path
event = {
"time": ts,
"auth_path": req_path,
"auth_method": auth_method,
"display_name": display_name,
"remote_ip": remote_ip,
"namespace": ns_path,
"policies": token_policies,
}
lst = self.entity_login[entity_id]
lst.append(event)
lst.sort(key=lambda x: x["time"], reverse=True)
if len(lst) > self.MAX_HISTORY:
del lst[self.MAX_HISTORY:]
# ── Any activity by entity (login + all subsequent API calls) ──
# This distinguishes "last login" from "last activity":
# a service may log in once and keep using its token for weeks.
if entity_id:
event = {
"time": ts,
"path": req_path,
"operation": req_op,
"remote_ip": remote_ip,
"namespace": ns_path,
"mount_type": mount_type,
"display_name": display_name,
}
lst = self.entity_activity[entity_id]
lst.append(event)
lst.sort(key=lambda x: x["time"], reverse=True)
if len(lst) > self.MAX_HISTORY:
del lst[self.MAX_HISTORY:]
def get_secret(self, ns_path: str, req_path: str) -> Optional[Dict]:
"""Return latest access event for a secret path, or None."""
ns = ns_path.rstrip("/")
key = f"{ns}|{req_path}"
events = self.secret_access.get(key)
if not events:
# Try without namespace prefix (root)
key2 = f"|{req_path}"
events = self.secret_access.get(key2)
return events[0] if events else None
def get_all_secret_events(self, ns_path: str, req_path: str) -> List[Dict]:
ns = ns_path.rstrip("/")
key = f"{ns}|{req_path}"
return self.secret_access.get(key) or self.secret_access.get(f"|{req_path}") or []
def get_login(self, entity_id: str) -> Optional[Dict]:
events = self.entity_login.get(entity_id)
return events[0] if events else None
def get_all_logins(self, entity_id: str) -> List[Dict]:
return self.entity_login.get(entity_id) or []
def count_secret_accesses(self, ns_path: str, req_path: str) -> int:
return len(self.get_all_secret_events(ns_path, req_path))
def get_activity(self, entity_id: str) -> Optional[Dict]:
"""Return the most recent activity event for an entity (any operation)."""
events = self.entity_activity.get(entity_id)
return events[0] if events else None
def get_all_activities(self, entity_id: str) -> List[Dict]:
return self.entity_activity.get(entity_id) or []
# ── Enrichment ────────────────────────────────────────────────────────────────
def enrich_secrets(secrets: List[SecretRecord], audit: AuditIndex) -> None:
for s in secrets:
ns_for_lookup = s.namespace_path.replace("(root)", "").rstrip("/")
# Build the audit log request.path that matches this secret/role.
if s.engine_type == "kv":
# KV v2 data reads go through the /data/ prefix; metadata reads through /metadata/
prefix = "data/" if s.kv_version == 2 else ""
audit_path = f"{s.mount_path}/{prefix}{s.secret_path}"
elif s.engine_type in ("aws", "terraform"):
# Dynamic credential generation: GET /{mount}/creds/{role}
# (role definition at /roles/ or /role/ is an admin read, less common in logs)
audit_path = f"{s.mount_path}/creds/{s.secret_path}"
elif s.engine_type == "kmip":
# KMIP scope/role operations: /{mount}/scope/{scope}/role/{role}
audit_path = f"{s.mount_path}/scope/{s.secret_path}"
else:
audit_path = f"{s.mount_path}/{s.secret_path}"
evt = audit.get_secret(ns_for_lookup, audit_path)
if evt:
s.last_accessed_time = evt["time"]
s.last_accessed_by_entity_id = evt.get("entity_id")
s.last_accessed_by_display_name = evt.get("display_name")
s.last_accessed_from_ip = evt.get("remote_ip")
s.last_accessed_operation = evt.get("operation")
s.access_history = audit.get_all_secret_events(ns_for_lookup, audit_path)
s.access_count = len(s.access_history)
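Matching a KV secret to its audit-log entries hinges on the path rewrite used in `enrich_secrets`: KV v2 reads are logged under the mount's `data/` prefix, while v1 reads are not. A minimal sketch of that mapping (mount and secret names are illustrative):

```python
def kv_audit_path(mount: str, secret_path: str, kv_version: int) -> str:
    """Request path as it appears in the audit log for a KV data read."""
    prefix = "data/" if kv_version == 2 else ""
    return f"{mount}/{prefix}{secret_path}"

print(kv_audit_path("secret", "app/db-creds", 2))     # → secret/data/app/db-creds
print(kv_audit_path("kv-legacy", "app/db-creds", 1))  # → kv-legacy/app/db-creds
```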
def enrich_entities(entities: List[EntityRecord], audit: AuditIndex) -> None:
for e in entities:
if not e.entity_id:
continue
# Last login (authentication event)
login_evt = audit.get_login(e.entity_id)
if login_evt:
e.last_login_time = login_evt["time"]
e.last_login_from_ip = login_evt.get("remote_ip")
e.last_login_auth_method = login_evt.get("auth_method")
e.last_login_auth_path = login_evt.get("auth_path")
e.last_login_namespace = login_evt.get("namespace")
e.login_history = audit.get_all_logins(e.entity_id)
e.login_count = len(e.login_history)
# Last activity (any API call made while authenticated as this entity)
# This is different from last login: a service may authenticate once and
# then continue making API calls for weeks without re-authenticating.
act_evt = audit.get_activity(e.entity_id)
if act_evt:
e.last_activity_time = act_evt["time"]
e.last_activity_path = act_evt.get("path")
e.last_activity_ip = act_evt.get("remote_ip")
e.last_activity_operation = act_evt.get("operation")
e.last_activity_mount_type = act_evt.get("mount_type")
e.activity_count = len(audit.get_all_activities(e.entity_id))
# ── Console output ─────────────────────────────────────────────────────────────
def _fmt_ts(ts: Optional[str]) -> str:
if not ts:
return "N/A"
return str(ts)[:19].replace("T", " ")
def _trunc(s: str, n: int) -> str:
return s[:n - 1] + "…" if len(s) > n else s
def _engine_label(s: "SecretRecord") -> str:
"""Short display label for the engine type of a secret record."""
if s.engine_type == "kv":
return f"kv_v{s.kv_version}"
return s.engine_type
def print_user_summary_table(entities: List["EntityRecord"]) -> None:
"""Print a compact one-row-per-user summary table."""
W = 150
print(f"\n{'─'*W}")
print(f" USER ACTIVITY SUMMARY ({len(entities)} total)")
print(f"{'─'*W}")
hdr = (
f" {'NAME':<28} {'NAMESPACE':<22} {'AUTH TYPE':<14}"
f" {'LAST LOGIN':<20} {'LAST ACTIVITY':<20} {'IP (login)':<18}"
f" {'STATUS':<9} {'LOGINS'}"
)
print(hdr)
print(f" {'─'*145}")
    # Sort: active first, then entities with a timestamp before those without,
    # then most recent first (ISO-8601 strings compare lexicographically)
    def _sort_key(e):
        act = e.last_activity_time or e.last_login_time or ""
        if e.latest_token:
            act = act or e.latest_token.last_renewal_time or e.latest_token.creation_time or ""
        return (e.disabled, act == "", tuple(-ord(c) for c in act))
for e in sorted(entities, key=_sort_key):
name = _trunc(e.name or "(unnamed)", 26)
ns = _trunc(e.namespace_path, 20)
auth_types = ", ".join(dict.fromkeys(
a.mount_type for a in e.aliases if a.mount_type
)) or ("-" if not e.auth_method_extra else e.auth_method_extra.get("auth_type", "-"))
auth_types = _trunc(auth_types, 12)
# Best login timestamp
if e.last_login_time:
last_login = _fmt_ts(e.last_login_time)
elif e.latest_token:
last_login = f"~{_fmt_ts(e.latest_token.creation_time)} (tkn)"
else:
last_login = "unknown"
# Best activity timestamp
if e.last_activity_time:
last_act = _fmt_ts(e.last_activity_time)
elif e.latest_token:
best = e.latest_token.last_renewal_time or e.latest_token.creation_time
last_act = f"~{_fmt_ts(best)} (tkn)" if best else "unknown"
else:
last_act = "unknown"
ip = _trunc(e.last_login_from_ip or e.last_activity_ip or "-", 16)
status = "DISABLED" if e.disabled else "active"
logins = str(e.login_count) if e.login_count else "-"
print(
f" {name:<28} {ns:<22} {auth_types:<14}"
f" {last_login:<20} {last_act:<20} {ip:<18}"
f" {status:<9} {logins}"
)
def print_report_plain(report: VaultAuditReport) -> None:
"""Fallback plain-text report."""
W = 100
sep = "═" * W
print(f"\n{sep}")
print(f" HCP VAULT AUDIT REPORT")
print(f" Generated : {report.generated_at}")
print(f" Cluster : {report.vault_addr} (version {report.vault_version}, {report.cluster_name})")
print(f" Namespaces: {len(report.namespaces)} | Secrets: {len(report.secrets)} | Entities: {len(report.entities)}")
audit_note = f"{report.audit_log_entries_parsed:,} entries from {report.audit_log_path}" if report.audit_log_path else "NOT PROVIDED (no last-access data)"
print(f" Audit log : {audit_note}")
print(sep)
# ── Namespace tree ────────────────────────────────────────────────────
print(f"\n{'─'*W}")
print(" NAMESPACE TREE")
print(f"{'─'*W}")
print(" (root)")
for ns in sorted(report.namespaces, key=lambda x: x.path):
indent = " " + " " * (ns.path.count("/") - 1)
print(f"{indent}└─ {ns.path}")
# ── Secrets ───────────────────────────────────────────────────────────
print(f"\n{'─'*W}")
print(f" SECRETS ({len(report.secrets)} total)")
print(f"{'─'*W}")
by_ns: Dict[str, List[SecretRecord]] = defaultdict(list)
for s in report.secrets:
by_ns[s.namespace_path].append(s)
for ns_label in sorted(by_ns):
items = by_ns[ns_label]
print(f"\n Namespace: {ns_label} ({len(items)} secrets)")
hdr = f" {'MOUNT':<22} {'PATH':<40} {'ENGINE':<8} {'CREATED':<20} {'UPDATED':<20} {'VER':<4} {'LAST READ':<20} {'BY':<25} {'IP'}"
print(hdr)
print(f" {'─'*99}")
for s in sorted(items, key=lambda x: (x.mount_path, x.secret_path)):
destroyed = ""
if s.versions:
latest = max(s.versions, key=lambda v: v.version)
if latest.destroyed:
destroyed = " [DESTROYED]"
eng = _engine_label(s)
print(
f" {_trunc(s.mount_path,20):<22}"
f" {_trunc(s.secret_path,38):<40}"
f" {eng:<8}"
f" {_fmt_ts(s.created_time):<20}"
f" {_fmt_ts(s.updated_time):<20}"
f" {str(s.current_version or '-'):<4}"
f" {_fmt_ts(s.last_accessed_time):<20}"
f" {_trunc(s.last_accessed_by_display_name or '-',23):<25}"
f" {s.last_accessed_from_ip or '-'}"
f"{destroyed}"
)
if s.custom_metadata:
print(f" ↳ kv metadata : {s.custom_metadata}")
if s.engine_data:
# Show key fields per engine type, skip empty values
ed = {k: v for k, v in s.engine_data.items() if v not in (None, "", [], {})}
if ed:
print(f" ↳ engine config: {ed}")
if s.metadata_error:
print(f" ↳ [ERROR] {s.metadata_error}")
# ── User summary table ────────────────────────────────────────────────
if report.entities:
print_user_summary_table(report.entities)
# ── Detailed entity blocks ─────────────────────────────────────────────
print(f"\n{'─'*W}")
print(f" USERS / ENTITIES — DETAIL ({len(report.entities)} total)")
print(f"{'─'*W}")
for e in sorted(report.entities, key=lambda x: (x.namespace_path, x.name)):
status = " [DISABLED]" if e.disabled else ""
print(f"\n {e.name or '(unnamed)'}{status} [ns: {e.namespace_path}]")
print(f" Entity ID : {e.entity_id or '(none)'}")
print(f" Created : {_fmt_ts(e.creation_time)}")
print(f" Last updated : {_fmt_ts(e.last_update_time)}")
print(f" Policies : {', '.join(e.policies) or '-'}")
print(f" Groups : {', '.join(e.groups) or '-'}")
if e.metadata:
print(f" Metadata : {e.metadata}")
if e.aliases:
print(f" Auth aliases ({len(e.aliases)}):")
for a in e.aliases:
print(f" • {a.name} [{a.mount_type} @ {a.mount_path or a.mount_accessor}]"
f" created={_fmt_ts(a.creation_time)}")
if e.auth_method_extra:
print(f" Auth config : {e.auth_method_extra}")
# ── Last login (authentication) ──────────────────────────────────
if e.last_login_time:
print(f" Last login : {_fmt_ts(e.last_login_time)}")
print(f" Login IP : {e.last_login_from_ip or '-'}")
print(f" Auth method : {e.last_login_auth_method or '-'} ({e.last_login_auth_path or '-'})")
print(f" Total logins : {e.login_count}")
elif e.latest_token:
t = e.latest_token
print(f" Last login : ~{_fmt_ts(t.creation_time)} (token creation proxy)")
print(f" Login IP : -")
print(f" Auth path : {t.auth_path or '-'}")
else:
print(" Last login : unknown (no audit log / no token scan)")
# ── Last activity (any API call) ──────────────────────────────────
if e.last_activity_time:
print(f" Last activity : {_fmt_ts(e.last_activity_time)}")
print(f" Activity IP : {e.last_activity_ip or '-'}")
print(f" Activity op : {e.last_activity_operation or '-'} → {e.last_activity_path or '-'}")
print(f" Total ops : {e.activity_count}")
elif e.latest_token and e.latest_token.last_renewal_time:
t = e.latest_token
print(f" Last activity : ~{_fmt_ts(t.last_renewal_time)} (token renewal proxy)")
elif e.latest_token:
t = e.latest_token
tok_best = t.last_renewal_time or t.creation_time
print(f" Last activity : ~{_fmt_ts(tok_best)} (token creation/renewal proxy)")
print(f" Token expires : {_fmt_ts(t.expire_time)} renewable={t.renewable}")
else:
print(" Last activity : unknown (no audit log / no token scan)")
# ── Active tokens summary ────────────────────────────────────────
if e.latest_token:
t = e.latest_token
renewal_note = f" last-renewed={_fmt_ts(t.last_renewal_time)}" if t.last_renewal_time else ""
print(f" Latest token : type={t.token_type} created={_fmt_ts(t.creation_time)}"
f"{renewal_note} expires={_fmt_ts(t.expire_time)}"
f" ttl={t.ttl}s renewable={t.renewable}")
# ── Errors ────────────────────────────────────────────────────────────
if report.access_errors:
print(f"\n{'─'*W}")
print(f" PERMISSION ERRORS ({len(report.access_errors)} path(s) skipped)")
print(f"{'─'*W}")
for err in report.access_errors[:50]:
print(f" [{err.get('namespace','-')}] {err.get('path','-')} → {err.get('error','-')}")
if len(report.access_errors) > 50:
print(f" ... and {len(report.access_errors)-50} more (see JSON output)")
# ── Stats ──────────────────────────────────────────────────────────────
print(f"\n{'─'*W}")
print(" SUMMARY STATISTICS")
print(f"{'─'*W}")
stats = report.stats
print(f" API calls made : {stats.get('api_calls', '-')}")
print(f" Elapsed time : {stats.get('elapsed_s', '-'):.1f}s")
no_access = sum(1 for s in report.secrets if not s.last_accessed_time)
print(f" Secrets without access: {no_access} / {len(report.secrets)}")
no_login = sum(1 for e in report.entities if not e.last_login_time and not e.latest_token)
no_activity = sum(1 for e in report.entities
if not e.last_activity_time
and not (e.latest_token and (e.latest_token.last_renewal_time
or e.latest_token.creation_time)))
print(f" Entities no login data: {no_login} / {len(report.entities)}")
print(f" Entities no activity : {no_activity} / {len(report.entities)}")
print()
def print_report_rich(report: VaultAuditReport) -> None:
"""Rich-formatted report (tables, colors)."""
console = RichConsole()
console.rule("[bold cyan]HCP Vault Audit Report[/bold cyan]")
console.print(f"Generated : [dim]{report.generated_at}[/dim]")
console.print(f"Cluster : [bold]{report.vault_addr}[/bold] v{report.vault_version} {report.cluster_name}")
ns_count = len(report.namespaces)
audit_note = (f"[green]{report.audit_log_entries_parsed:,} entries[/green] from {report.audit_log_path}"
if report.audit_log_path
else "[yellow]NOT PROVIDED[/yellow] — no last-access timestamps")
console.print(f"Audit log : {audit_note}")
console.print()
# ── Namespaces ────────────────────────────────────────────────────────
ns_table = RichTable(title=f"Namespaces ({ns_count + 1})", box=rich_box.SIMPLE)
ns_table.add_column("Path")
ns_table.add_column("ID")
ns_table.add_row("[dim](root)[/dim]", "")
for ns in sorted(report.namespaces, key=lambda x: x.path):
ns_table.add_row(ns.path, ns.ns_id)
console.print(ns_table)
# ── Secrets ───────────────────────────────────────────────────────────
s_table = RichTable(title=f"Secrets ({len(report.secrets)})", box=rich_box.SIMPLE, show_lines=False)
s_table.add_column("Namespace", style="dim")
s_table.add_column("Mount")
s_table.add_column("Secret Path")
s_table.add_column("Engine")
s_table.add_column("Created")
s_table.add_column("Updated")
s_table.add_column("Last Read / Cred Gen", style="green")
s_table.add_column("By")
s_table.add_column("From IP")
s_table.add_column("# Hits")
for s in sorted(report.secrets, key=lambda x: (x.namespace_path, x.mount_path, x.secret_path)):
last_read = _fmt_ts(s.last_accessed_time) if s.last_accessed_time else "[dim]unknown[/dim]"
read_by = s.last_accessed_by_display_name or "[dim]-[/dim]"
from_ip = s.last_accessed_from_ip or "[dim]-[/dim]"
latest_v = max(s.versions, key=lambda v: v.version) if s.versions else None
path_str = s.secret_path
if latest_v and latest_v.destroyed:
path_str = f"[red]{path_str} ✗DESTROYED[/red]"
# Engine config summary for non-KV types
eng_note = ""
if s.engine_type == "aws":
eng_note = s.engine_data.get("credential_type", "")
elif s.engine_type == "terraform":
eng_note = s.engine_data.get("organization", "")
elif s.engine_type == "kmip":
ops = s.engine_data.get("operations", [])
eng_note = ",".join(o.replace("operation_", "") for o in ops[:3])
if len(ops) > 3:
eng_note += f"+{len(ops)-3}"
s_table.add_row(
s.namespace_path,
s.mount_path,
path_str,
_engine_label(s) + (f"\n[dim]{eng_note}[/dim]" if eng_note else ""),
_fmt_ts(s.created_time),
_fmt_ts(s.updated_time),
last_read,
read_by,
from_ip,
str(s.access_count) if s.access_count else "[dim]0[/dim]",
)
console.print(s_table)
# ── User summary table (rich) ──────────────────────────────────────────
u_sum = RichTable(
title=f"User Activity Summary ({len(report.entities)} total)",
box=rich_box.SIMPLE, show_lines=False,
)
u_sum.add_column("Name")
u_sum.add_column("Namespace", style="dim")
u_sum.add_column("Auth Type")
u_sum.add_column("Last Login", style="green")
u_sum.add_column("Last Activity", style="cyan")
u_sum.add_column("Login IP")
u_sum.add_column("Status")
u_sum.add_column("Logins")
    def _usort(e):
        act = e.last_activity_time or e.last_login_time or ""
        if e.latest_token:
            act = act or e.latest_token.last_renewal_time or e.latest_token.creation_time or ""
        # disabled last; missing timestamps after present; most recent first
        return (e.disabled, act == "", tuple(-ord(c) for c in act))
for e in sorted(report.entities, key=_usort):
auth_types = ", ".join(dict.fromkeys(
a.mount_type for a in e.aliases if a.mount_type
)) or e.auth_method_extra.get("auth_type", "[dim]-[/dim]")
if e.last_login_time:
ll = _fmt_ts(e.last_login_time)
elif e.latest_token:
ll = f"[dim]~{_fmt_ts(e.latest_token.creation_time)} (tkn)[/dim]"
else:
ll = "[dim]unknown[/dim]"
if e.last_activity_time:
la = _fmt_ts(e.last_activity_time)
elif e.latest_token:
best = e.latest_token.last_renewal_time or e.latest_token.creation_time
la = f"[dim]~{_fmt_ts(best)} (tkn)[/dim]" if best else "[dim]unknown[/dim]"
else:
la = "[dim]unknown[/dim]"
ip = e.last_login_from_ip or e.last_activity_ip or "[dim]-[/dim]"
status = "[red]DISABLED[/red]" if e.disabled else "[green]active[/green]"
logins = str(e.login_count) if e.login_count else "[dim]-[/dim]"
u_sum.add_row(
e.name or "[dim](unnamed)[/dim]",
e.namespace_path,
auth_types or "[dim]-[/dim]",
ll, la, ip, status, logins,
)
console.print(u_sum)
# ── Entities ──────────────────────────────────────────────────────────
e_table = RichTable(
title=f"Users / Entities ({len(report.entities)})",
box=rich_box.SIMPLE, show_lines=True,
)
e_table.add_column("Namespace", style="dim")
e_table.add_column("Name")
e_table.add_column("Status")
e_table.add_column("Auth Aliases")
e_table.add_column("Policies / Groups")
e_table.add_column("Last Login", style="green")
e_table.add_column("Login IP")
e_table.add_column("Auth Method")
e_table.add_column("# Logins")
e_table.add_column("Last Activity", style="cyan")
e_table.add_column("Activity IP")
e_table.add_column("Activity Op → Path")
e_table.add_column("# Ops")
for e in sorted(report.entities, key=lambda x: (x.namespace_path, x.name)):
status = "[red]DISABLED[/red]" if e.disabled else "[green]active[/green]"
alias_str = "\n".join(
f"{a.name} ({a.mount_type})" for a in e.aliases
) or "[dim]-[/dim]"
pol_grp = ", ".join(e.policies)
if e.groups:
pol_grp += ("\n" if pol_grp else "") + "grp: " + ", ".join(e.groups)
pol_grp = pol_grp or "[dim]-[/dim]"
# Last login
if e.last_login_time:
last_login = _fmt_ts(e.last_login_time)
login_ip = e.last_login_from_ip or "-"
auth_method = e.last_login_auth_method or "-"
login_count = str(e.login_count)
elif e.latest_token:
t = e.latest_token
last_login = f"[dim]~{_fmt_ts(t.creation_time)} (token)[/dim]"
login_ip = "[dim]-[/dim]"
auth_method = f"[dim]{t.auth_path or '-'}[/dim]"
login_count = "[dim]-[/dim]"
else:
last_login = "[dim]unknown[/dim]"
login_ip = "[dim]-[/dim]"
auth_method = "[dim]-[/dim]"
login_count = "[dim]-[/dim]"
# Last activity
if e.last_activity_time:
last_act = _fmt_ts(e.last_activity_time)
act_ip = e.last_activity_ip or "-"
act_op = f"{e.last_activity_operation or '-'} → {_trunc(e.last_activity_path or '-', 40)}"
act_count = str(e.activity_count)
elif e.latest_token:
t = e.latest_token
best = t.last_renewal_time or t.creation_time
renewal_note = "(renewal)" if t.last_renewal_time else "(token)"
last_act = f"[dim]~{_fmt_ts(best)} {renewal_note}[/dim]"
act_ip = "[dim]-[/dim]"
act_op = f"[dim]ttl={t.ttl}s renewable={t.renewable}[/dim]"
act_count = "[dim]-[/dim]"
else:
last_act = "[dim]unknown[/dim]"
act_ip = "[dim]-[/dim]"
act_op = "[dim]-[/dim]"
act_count = "[dim]-[/dim]"
e_table.add_row(
e.namespace_path,
e.name or "[dim](unnamed)[/dim]",
status,
alias_str,
pol_grp,
last_login,
login_ip,
auth_method,
login_count,
last_act,
act_ip,
act_op,
act_count,
)
console.print(e_table)
# ── Errors ────────────────────────────────────────────────────────────
if report.access_errors:
console.print(f"\n[bold red]Permission Errors[/bold red] ({len(report.access_errors)} paths skipped)")
for err in report.access_errors[:30]:
console.print(f" [dim][{err.get('namespace','')}][/dim] {err.get('path','')} → [red]{err.get('error','')}[/red]")
# ── Stats ──────────────────────────────────────────────────────────────
stats = report.stats
no_access = sum(1 for s in report.secrets if not s.last_accessed_time)
no_login = sum(1 for e in report.entities if not e.last_login_time and not e.latest_token)
no_activity = sum(1 for e in report.entities
if not e.last_activity_time
and not (e.latest_token and (e.latest_token.last_renewal_time
or e.latest_token.creation_time)))
console.print(
f"\n[bold]Stats:[/bold] API calls={stats.get('api_calls', '-')} "
f"elapsed={stats.get('elapsed_s', 0):.1f}s "
f"secrets_without_access={no_access}/{len(report.secrets)} "
f"entities_no_login={no_login}/{len(report.entities)} "
f"entities_no_activity={no_activity}/{len(report.entities)}"
)
# ── Export functions ──────────────────────────────────────────────────────────
def _serialize(obj: Any) -> Any:
if isinstance(obj, list):
return [_serialize(i) for i in obj]
if isinstance(obj, dict):
return {k: _serialize(v) for k, v in obj.items()}
return obj
def export_json(report: VaultAuditReport, path: Path) -> None:
def _default(o):
if hasattr(o, "__dataclass_fields__"):
return asdict(o)
return str(o)
data = {
"meta": {
"generated_at": report.generated_at,
"vault_addr": report.vault_addr,
"vault_version": report.vault_version,
"cluster_name": report.cluster_name,
"audit_log_path": report.audit_log_path,
"audit_log_entries_parsed": report.audit_log_entries_parsed,
"stats": report.stats,
"access_errors": report.access_errors,
},
"namespaces": [asdict(n) for n in report.namespaces],
"secrets": [asdict(s) for s in report.secrets],
"entities": [asdict(e) for e in report.entities],
}
path.write_text(json.dumps(data, indent=2, default=_default))
log.info("JSON report saved: %s", path)
def export_secrets_csv(secrets: List[SecretRecord], path: Path) -> None:
fields = [
"namespace_path", "mount_path", "secret_path", "full_path",
"engine_type", "kv_version",
"created_time", "updated_time",
"current_version", "oldest_version", "max_versions", "total_versions",
"custom_metadata", "engine_data",
"last_accessed_time", "access_count",
"last_accessed_by_entity_id", "last_accessed_by_display_name",
"last_accessed_from_ip", "last_accessed_operation",
"metadata_error",
]
with path.open("w", newline="", encoding="utf-8") as fh:
w = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore")
w.writeheader()
for s in secrets:
row = asdict(s)
row["full_path"] = s.full_path
row["total_versions"] = len(s.versions)
row["custom_metadata"] = json.dumps(s.custom_metadata)
row["engine_data"] = json.dumps(s.engine_data)
for key in ("versions", "access_history"):
row.pop(key, None)
w.writerow(row)
log.info("Secrets CSV saved: %s", path)
def export_entities_csv(entities: List[EntityRecord], path: Path) -> None:
fields = [
"namespace_path", "entity_id", "name", "disabled",
"policies", "groups", "metadata",
"creation_time", "last_update_time",
"aliases_summary", "alias_count",
# Token proxy columns
"latest_token_created", "latest_token_last_renewal",
"latest_token_auth_path", "latest_token_expires",
"latest_token_ttl", "latest_token_renewable",
# Last login (authentication event)
"last_login_time", "last_login_from_ip",
"last_login_auth_method", "last_login_auth_path", "last_login_namespace",
"login_count",
# Last activity (any API call made with the token)
"last_activity_time", "last_activity_ip",
"last_activity_operation", "last_activity_path", "last_activity_mount_type",
"activity_count",
# Best estimate for "last seen" (most recent of login/activity/renewal)
"last_seen_time",
# Auth method config
"auth_method_extra",
]
with path.open("w", newline="", encoding="utf-8") as fh:
w = csv.DictWriter(fh, fieldnames=fields, extrasaction="ignore")
w.writeheader()
for e in entities:
aliases_summary = "; ".join(
f"{a.name} ({a.mount_type}@{a.mount_path or a.mount_accessor})"
for a in e.aliases
)
tok = e.latest_token
# Best-effort "last seen" = most recent of all known timestamps
candidates = [
t for t in [
e.last_activity_time,
e.last_login_time,
tok.last_renewal_time if tok else None,
tok.creation_time if tok else None,
] if t
]
last_seen = max(candidates) if candidates else ""
row = {
"namespace_path": e.namespace_path,
"entity_id": e.entity_id,
"name": e.name,
"disabled": e.disabled,
"policies": "; ".join(e.policies),
"groups": "; ".join(e.groups),
"metadata": json.dumps(e.metadata),
"creation_time": e.creation_time or "",
"last_update_time": e.last_update_time or "",
"aliases_summary": aliases_summary,
"alias_count": len(e.aliases),
"latest_token_created": tok.creation_time if tok else "",
"latest_token_last_renewal": tok.last_renewal_time if tok else "",
"latest_token_auth_path": tok.auth_path if tok else "",
"latest_token_expires": tok.expire_time if tok else "",
"latest_token_ttl": tok.ttl if tok else "",
"latest_token_renewable": tok.renewable if tok else "",
"last_login_time": e.last_login_time or "",
"last_login_from_ip": e.last_login_from_ip or "",
"last_login_auth_method": e.last_login_auth_method or "",
"last_login_auth_path": e.last_login_auth_path or "",
"last_login_namespace": e.last_login_namespace or "",
"login_count": e.login_count,
"last_activity_time": e.last_activity_time or "",
"last_activity_ip": e.last_activity_ip or "",
"last_activity_operation": e.last_activity_operation or "",
"last_activity_path": e.last_activity_path or "",
"last_activity_mount_type": e.last_activity_mount_type or "",
"activity_count": e.activity_count,
"last_seen_time": last_seen,
"auth_method_extra": json.dumps(e.auth_method_extra),
}
w.writerow(row)
log.info("Entities CSV saved: %s", path)
# ── CLI / Main ────────────────────────────────────────────────────────────────
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
prog="vault_audit.py",
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
grp_conn = p.add_argument_group("Connection")
grp_conn.add_argument("--addr", metavar="URL",
help="Vault address (default: $VAULT_ADDR)")
grp_conn.add_argument("--token", metavar="TOKEN",
help="Vault token (default: $VAULT_TOKEN)")
grp_conn.add_argument("--no-tls-verify", action="store_true",
help="Disable TLS certificate verification")
grp_conn.add_argument("--ca-cert", metavar="PATH",
help="Path to CA certificate bundle for on-prem TLS "
"(default: $VAULT_CACERT)")
grp_conn.add_argument("--timeout", type=int, default=15, metavar="SEC",
help="Per-request timeout in seconds (default: 15)")
grp_conn.add_argument("--namespace", metavar="NS", default="",
help="Start namespace scan from this namespace (default: root)")
grp_scope = p.add_argument_group("Scope")
grp_scope.add_argument("--no-secrets", action="store_true",
help="Skip secret collection")
grp_scope.add_argument("--no-users", action="store_true",
help="Skip user/entity collection")
grp_scope.add_argument("--no-token-scan", action="store_true",
help="Skip token accessor scan (faster, but loses last-login proxy data)")
grp_scope.add_argument("--no-auth-method-scan", action="store_true",
help="Skip scanning auth method mounts for configured users/roles")
grp_scope.add_argument("--max-accessors", type=int, default=2000, metavar="N",
help="Max token accessors to look up per namespace (default: 2000)")
grp_scope.add_argument("--max-depth", type=int, default=12, metavar="N",
help="Max KV directory depth to recurse (default: 12)")
grp_audit = p.add_argument_group("Audit Log")
grp_audit.add_argument("--audit-log", metavar="PATH",
help="Path to Vault audit JSONL log file for last-access data")
grp_out = p.add_argument_group("Output")
grp_out.add_argument("--output-dir", metavar="DIR", default="./vault_audit_output",
help="Directory for output files (default: ./vault_audit_output)")
grp_out.add_argument("--no-save", action="store_true",
help="Print to console only, do not save files")
grp_out.add_argument("--no-color", action="store_true",
help="Disable rich/color output")
grp_perf = p.add_argument_group("Performance")
grp_perf.add_argument("--workers", type=int, default=10, metavar="N",
help="Thread pool size for parallel API calls (default: 10)")
grp_perf.add_argument("--rate-limit", type=float, default=50.0, metavar="RPS",
help="Max API requests per second (default: 50)")
grp_debug = p.add_argument_group("Debug")
grp_debug.add_argument("-v", "--verbose", action="store_true",
help="Enable verbose/debug logging")
return p.parse_args()
def main() -> None:
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s %(levelname)-7s %(message)s",
datefmt="%H:%M:%S",
)
# ── Credentials ──────────────────────────────────────────────────────
vault_addr = args.addr or os.environ.get("VAULT_ADDR", "").rstrip("/")
vault_token = args.token or os.environ.get("VAULT_TOKEN", "")
if not vault_addr:
log.error("VAULT_ADDR is not set. Use --addr or export VAULT_ADDR=https://...")
sys.exit(1)
if not vault_token:
log.error("VAULT_TOKEN is not set. Use --token or export VAULT_TOKEN=hvs....")
sys.exit(1)
tls_verify = not args.no_tls_verify
ca_cert = args.ca_cert or os.environ.get("VAULT_CACERT", "") or None
if ca_cert:
log.info("TLS CA cert: %s", ca_cert)
elif not tls_verify:
log.warning("TLS verification disabled — do not use in production")
log.info("Connecting to: %s (TLS verify=%s)", vault_addr, tls_verify)
limiter = TokenBucket(rate=args.rate_limit, burst=args.rate_limit)
client = VaultClient(vault_addr, vault_token, tls_verify=tls_verify,
ca_cert=ca_cert, timeout=args.timeout, rate_limiter=limiter)
# ── Health check ──────────────────────────────────────────────────────
t0 = time.monotonic()
health = client.health()
    if not health:
        log.error("Cannot connect to Vault. Check VAULT_ADDR, network, and TLS settings.")
        sys.exit(1)
    if health.get("sealed"):
        log.error("Vault reports sealed=true. Unseal before running this script.")
        sys.exit(1)
vault_version = health.get("version", "unknown")
cluster_name = health.get("cluster_name", "unknown")
log.info("Vault version: %s cluster: %s", vault_version, cluster_name)
self_info = client.token_lookup_self()
if self_info:
log.info(
"Token owner: %s | policies: %s | ttl: %ss",
self_info.get("display_name", "?"),
", ".join(self_info.get("policies", [])),
self_info.get("ttl", "?"),
)
access_errors: List[Dict] = []
report = VaultAuditReport(
generated_at=datetime.now(tz=timezone.utc).isoformat(),
vault_addr=vault_addr,
vault_version=vault_version,
cluster_name=cluster_name,
access_errors=access_errors,
)
# ── HCP Vault auto-detection ──────────────────────────────────────────
# HCP Vault clusters always use "admin" as the root namespace.
# If no --namespace flag was given and the address matches HCP's domain,
# default to "admin" automatically.
start_namespace = args.namespace
if not start_namespace and ".hashicorp.cloud" in vault_addr:
start_namespace = "admin"
log.info("HCP Vault detected — defaulting to --namespace admin")
# ── Namespace discovery ───────────────────────────────────────────────
log.info("Phase 1/6: Discovering namespaces ...")
ns_list = collect_namespaces(client, start_ns=start_namespace)
report.namespaces = ns_list
log.info("Found %d namespace(s) (plus root)", len(ns_list))
# ── Interactive audit log prompt ──────────────────────────────────────
# If --audit-log was not given on the command line and we are running
# interactively (not piped), ask the user now.
if not args.audit_log and sys.stdin.isatty():
print()
print(" ┌─ Audit log ──────────────────────────────────────────────────────┐")
print(" │ Provide the path to your Vault audit log (JSON/JSONL format). │")
print(" │ This enables last-access and last-login timestamps for all │")
print(" │ secrets and users. │")
print(" │ │")
print(" │ HCP Vault: portal → cluster → Audit → enable + download logs │")
print(" │ On-prem : check your audit device path (vault audit list) │")
print(" │ Leave empty to continue without last-access data. │")
print(" └───────────────────────────────────────────────────────────────────┘")
try:
audit_input = input(" Audit log path > ").strip()
if audit_input:
args.audit_log = audit_input
except (EOFError, KeyboardInterrupt):
pass
print()
# ── Audit log ─────────────────────────────────────────────────────────
audit = AuditIndex()
if args.audit_log:
log.info("Phase 2/6: Parsing audit log: %s", args.audit_log)
try:
audit.parse_file(args.audit_log)
report.audit_log_path = args.audit_log
report.audit_log_entries_parsed = audit.entries_parsed
log.info(
"Audit log: %d entries parsed, %d secret paths, "
"%d entity logins, %d entity activity records",
audit.entries_parsed,
len(audit.secret_access),
len(audit.entity_login),
len(audit.entity_activity),
)
except FileNotFoundError:
log.error("Audit log not found: %s", args.audit_log)
except Exception as exc:
log.error("Failed to parse audit log: %s", exc)
else:
log.info(
"Phase 2/6: No audit log provided — last-access and last-activity data "
"will be unavailable.\n"
" Use --audit-log <path> to include access timestamps.\n"
" Token scan (enabled by default) will provide a creation-time proxy."
)
# ── Secrets ───────────────────────────────────────────────────────────
if not args.no_secrets:
log.info("Phase 3/6: Collecting secrets (kv, kv_v2, aws, terraform, kmip) ...")
secrets = collect_secrets(
client, ns_list,
max_depth=args.max_depth,
workers=args.workers,
access_errors=access_errors,
)
report.secrets = secrets
log.info("Collected %d secret(s)", len(secrets))
if report.audit_log_path:
log.info("Enriching secrets with audit log data ...")
enrich_secrets(secrets, audit)
else:
log.info("Phase 3/6: Secrets — skipped (--no-secrets)")
# ── Users / entities ──────────────────────────────────────────────────
if not args.no_users:
log.info("Phase 4/6: Collecting users/entities from identity store ...")
entities, group_name_map = collect_entities(
client, ns_list,
workers=args.workers,
access_errors=access_errors,
)
log.info("Identity store: %d entity/user record(s)", len(entities))
# Token accessor scan (on by default — gives last-renewal proxy)
if not args.no_token_scan:
log.info("Phase 5/6: Scanning token accessors for last-activity proxy ...")
orphans = scan_token_accessors(
client, entities, ns_list,
max_accessors=args.max_accessors,
workers=min(5, args.workers),
)
entities.extend(orphans)
log.info(
"Token scan complete. %d entity token(s) updated, %d orphan token(s).",
                sum(1 for e in entities if e.latest_token and e.entity_id),
len(orphans),
)
else:
log.info("Phase 5/6: Token scan — skipped (--no-token-scan)")
# Auth method user scan (on by default — finds users never seen before)
if not args.no_auth_method_scan:
log.info("Phase 5b/6: Scanning auth method mounts for configured users ...")
auth_extras = collect_auth_method_users(client, ns_list, entities)
entities.extend(auth_extras)
log.info(
"Auth method scan complete. %d additional principal(s) discovered.",
len(auth_extras),
)
else:
log.info("Phase 5b/6: Auth method scan — skipped (--no-auth-method-scan)")
report.entities = entities
log.info("Total users/entities collected: %d", len(entities))
if report.audit_log_path:
log.info("Enriching entities with audit log data (login + activity) ...")
enrich_entities(entities, audit)
else:
log.info("Phase 4-5/6: Users — skipped (--no-users)")
# ── Output ────────────────────────────────────────────────────────────
log.info("Phase 6/6: Generating output ...")
elapsed = time.monotonic() - t0
report.stats = {
"api_calls": client.call_count,
"elapsed_s": round(elapsed, 2),
"namespace_count": len(ns_list),
"secret_count": len(report.secrets),
"entity_count": len(report.entities),
"access_error_count": len(access_errors),
}
use_rich = RICH and not args.no_color
if use_rich:
print_report_rich(report)
else:
print_report_plain(report)
if not args.no_save:
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
export_json(report, out_dir / f"vault_audit_{ts}.json")
if report.secrets:
export_secrets_csv(report.secrets, out_dir / f"vault_secrets_{ts}.csv")
if report.entities:
export_entities_csv(report.entities, out_dir / f"vault_entities_{ts}.csv")
log.info("Output saved to: %s", out_dir.resolve())
log.info(
"Done. %d API calls in %.1fs. Secrets: %d. Entities: %d.",
client.call_count, elapsed, len(report.secrets), len(report.entities),
)
if __name__ == "__main__":
main()

vault_audit.py — HashiCorp Vault Comprehensive Audit Script

A Python 3 script that performs a full audit of a HashiCorp Vault cluster: secrets inventory across all namespaces, user/entity activity, and last-access timestamps sourced from the Vault audit log.

Works with HCP Vault (managed), Vault Enterprise (on-prem), and Vault OSS (namespaces are an Enterprise feature, so on OSS only the root namespace is scanned).


Features

Area                       What it collects
Namespaces                 Full recursive namespace tree via sys/namespaces
Secrets — KV v1            All secret paths (metadata not available in v1)
Secrets — KV v2            All secret paths + created_time, updated_time, version history, custom metadata
Secrets — AWS              All configured roles (credential_type, role ARNs, policy ARNs)
Secrets — Terraform Cloud  All configured roles (org, team_id, token_account_type)
Secrets — KMIP             All scopes → roles (enabled operations, key type/bits)
Last access                Exact timestamp, entity name, originating IP — requires audit log
Users/Entities             Full identity store scan: aliases, policies, groups, auth method
Last login                 Exact timestamp, auth path, IP — requires audit log
Last activity              Any API call by the entity (not just logins) — requires audit log
Token proxy                Without audit log: token creation_time / last_renewal_time as a proxy
Auth method users          Discovers users configured in userpass, ldap, github, approle, cert, oidc, jwt, k8s, etc.

Outputs

  • Console — coloured tables (requires rich) or plain text fallback
  • JSON — full structured report: vault_audit_YYYYMMDD_HHMMSS.json
  • Secrets CSV — one row per secret: vault_secrets_YYYYMMDD_HHMMSS.csv
  • Entities CSV — one row per user/entity: vault_entities_YYYYMMDD_HHMMSS.csv

Requirements

pip install requests urllib3        # required
pip install rich                    # optional — coloured console output

Python 3.8+. No other dependencies.


Setup

Environment variables

export VAULT_ADDR="https://your-cluster.hashicorp.cloud:8200"
export VAULT_TOKEN="hvs.your-audit-token"

# On-prem with internal CA:
export VAULT_CACERT="/etc/ssl/certs/internal-ca.pem"

Token requirements

If you have an audit log (recommended): run with --no-token-scan. The policy then requires only pure read/list capabilities: no sudo, no update.

Without an audit log: the token scan (auth/token/accessors) acts as a last-activity proxy, but that endpoint requires sudo. Avoid this by enabling audit logging instead.

Path                                        Capability   Why
sys/namespaces, sys/namespaces/*            list         Namespace discovery
sys/mounts, sys/auth                        read         Engine/auth mount enumeration
+/metadata, +/metadata/*                    read, list   KV v2 secret metadata (no secret values read)
+/*                                         list         KV v1 path listing
+/roles, +/roles/*                          list, read   AWS roles
+/role, +/role/*                            list, read   Terraform / approle / oidc / etc. roles
+/scope, +/scope/*/role, +/scope/*/role/*   list, read   KMIP scopes and roles
identity/entity/id, identity/entity/id/*    list, read   User/entity collection
identity/group/id, identity/group/id/*      list, read   Group membership
auth/+/users, auth/+/users/*                list, read   userpass / ldap users
auth/+/groups, auth/+/groups/*              list, read   ldap groups
auth/+/role, auth/+/role/*                  list, read   approle / oidc / jwt / k8s roles
auth/+/roles, auth/+/roles/*                list, read   aws / gcp / azure auth roles
auth/+/certs, auth/+/certs/*                list, read   cert auth roles
auth/+/map/users, auth/+/map/users/*        list, read   github auth users
auth/token/accessors                        sudo, list   Token scan only — skip with --no-token-scan
auth/token/lookup-accessor                  update       Token scan only — skip with --no-token-scan
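Before a run, you can check a token against this table with Vault's sys/capabilities-self endpoint. A minimal sketch of the comparison (the REQUIRED map below is an illustrative subset of the table, not the full list):

```python
# Illustrative subset of the capability table above — extend as needed.
REQUIRED = {
    "sys/namespaces": {"list"},
    "sys/mounts": {"read"},
    "identity/entity/id": {"list"},
}

def missing_capabilities(granted: dict) -> dict:
    """Compare per-path capabilities against REQUIRED.

    `granted` maps path -> list of capability strings, i.e. the "data"
    object returned by POST v1/sys/capabilities-self with a body of
    {"paths": [...]}. Returns {path: missing_caps} for every path that
    falls short; an empty dict means the token covers all listed paths.
    """
    gaps = {}
    for path, needed in REQUIRED.items():
        have = set(granted.get(path, []))
        if "root" in have:            # a root token implicitly holds every capability
            continue
        missing = sorted(needed) if "deny" in have else sorted(needed - have)
        if missing:
            gaps[path] = missing
    return gaps
```

Feed it the "data" object from sys/capabilities-self; any non-empty result tells you exactly which policy blocks are still missing.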

Minimal read-only Vault policy

Save as vault-audit-readonly.hcl and apply:

# For HCP Vault or namespace-rooted clusters:
vault policy write -namespace=admin vault-audit-readonly vault-audit-readonly.hcl
vault token create -namespace=admin \
  -policy=vault-audit-readonly \
  -ttl=1h \
  -display-name="vault-audit-run"
# vault-audit-readonly.hcl
# Pure read/list — no sudo, no write access.
# Use with: python3 vault_audit.py --no-token-scan --audit-log /path/to/audit.log

# ── Namespace discovery ───────────────────────────────────────────────────────
path "sys/namespaces" {
  capabilities = ["list"]
}
path "sys/namespaces/*" {
  capabilities = ["list"]
}

# ── Mount enumeration ─────────────────────────────────────────────────────────
path "sys/mounts" {
  capabilities = ["read"]
}
path "sys/auth" {
  capabilities = ["read"]
}

# ── KV v2 (metadata only — secret values are never read) ──────────────────────
path "+/metadata" {
  capabilities = ["list"]
}
path "+/metadata/*" {
  capabilities = ["read", "list"]
}

# ── KV v1 (path listing only — secret values are never read) ──────────────────
path "+/*" {
  capabilities = ["list"]
}

# ── AWS secrets engine ────────────────────────────────────────────────────────
path "+/roles" {
  capabilities = ["list"]
}
path "+/roles/*" {
  capabilities = ["read"]
}

# ── Terraform Cloud / approle / oidc / jwt / k8s / aws-auth / azure / gcp ────
path "+/role" {
  capabilities = ["list"]
}
path "+/role/*" {
  capabilities = ["read"]
}

# ── KMIP secrets engine ───────────────────────────────────────────────────────
path "+/scope" {
  capabilities = ["list"]
}
path "+/scope/*/role" {
  capabilities = ["list"]
}
path "+/scope/*/role/*" {
  capabilities = ["read"]
}

# ── Identity store ────────────────────────────────────────────────────────────
path "identity/entity/id" {
  capabilities = ["list"]
}
path "identity/entity/id/*" {
  capabilities = ["read"]
}
path "identity/group/id" {
  capabilities = ["list"]
}
path "identity/group/id/*" {
  capabilities = ["read"]
}

# ── Auth method user/role discovery ───────────────────────────────────────────
path "auth/+/users" {
  capabilities = ["list"]
}
path "auth/+/users/*" {
  capabilities = ["read"]
}
path "auth/+/groups" {
  capabilities = ["list"]
}
path "auth/+/groups/*" {
  capabilities = ["read"]
}
path "auth/+/map/users" {
  capabilities = ["list"]
}
path "auth/+/map/users/*" {
  capabilities = ["read"]
}
path "auth/+/role" {
  capabilities = ["list"]
}
path "auth/+/role/*" {
  capabilities = ["read"]
}
path "auth/+/roles" {
  capabilities = ["list"]
}
path "auth/+/roles/*" {
  capabilities = ["read"]
}
path "auth/+/certs" {
  capabilities = ["list"]
}
path "auth/+/certs/*" {
  capabilities = ["read"]
}

# ── Token scan (only needed WITHOUT --no-token-scan) ──────────────────────────
# If you have an audit log, use --no-token-scan and omit these two blocks.
# auth/token/accessors requires the "sudo" capability — avoid if possible.
#
# path "auth/token/accessors" {
#   capabilities = ["sudo", "list"]
# }
# path "auth/token/lookup-accessor" {
#   capabilities = ["update"]   # POST endpoint, but it's a read operation
# }

+ vs * in path globs: + matches exactly one path segment (e.g. the mount name), while * matches the rest of the path, including slashes. Using + is more precise and avoids overly broad grants.


Usage

Quick start

# HCP Vault — root namespace is auto-detected, starts in "admin"
# With audit log (recommended — enables exact timestamps, no sudo needed)
python3 vault_audit.py --no-token-scan --audit-log /path/to/audit.log

# HCP Vault — interactive: script will prompt for the audit log path
python3 vault_audit.py --no-token-scan

# On-prem Enterprise
python3 vault_audit.py --no-token-scan --audit-log /var/log/vault/audit.log

# Scan a specific namespace subtree only
python3 vault_audit.py --no-token-scan --namespace team-a/prod

# Save nothing to disk (console only)
python3 vault_audit.py --no-token-scan --no-save

# Verbose/debug logging
python3 vault_audit.py --no-token-scan -v

All options

Connection:
  --addr URL          Vault address (default: $VAULT_ADDR)
  --token TOKEN       Vault token (default: $VAULT_TOKEN)
  --no-tls-verify     Disable TLS certificate verification
  --ca-cert PATH      CA bundle for on-prem TLS ($VAULT_CACERT)
  --timeout SEC       Per-request timeout, seconds (default: 15)
  --namespace NS      Start scan from this namespace (default: root)

Scope:
  --no-secrets               Skip secret collection
  --no-users                 Skip user/entity collection
  --no-token-scan            Skip token accessor scan (use this when you have an audit log)
  --no-auth-method-scan      Skip auth method user discovery
  --max-accessors N          Max token accessors per namespace (default: 2000)
  --max-depth N              Max KV directory recursion depth (default: 12)

Audit Log:
  --audit-log PATH    Path to Vault audit JSONL log (enables timestamps)

Output:
  --output-dir DIR    Output directory (default: ./vault_audit_output)
  --no-save           Console only, no files
  --no-color          Disable rich/colour output

Performance:
  --workers N         Thread pool size (default: 10)
  --rate-limit RPS    Max API requests/second (default: 50)

Debug:
  -v, --verbose       Debug logging

About audit logs

"Last accessed" and "last login" are not stored natively in Vault. They only become available by parsing the Vault audit log.

Enabling audit logging

# File-based audit device
vault audit enable file file_path=/var/log/vault/audit.log

# Verify
vault audit list

HCP Vault: Portal → your cluster → Observability → Audit Logging → enable, then stream/export to an S3 bucket, Datadog, Splunk, etc. and download the JSONL file.

Audit log format

Vault writes one JSON object per line (JSONL). Relevant fields the script uses:

{
  "type": "response",                     // "request" entries are skipped (avoids double-counting)
  "time": "2025-03-01T14:23:11.123456Z",
  "auth": {
    "entity_id": "abc-123",               // NOT hashed — used to correlate with identity store
    "display_name": "alice",
    "token_type": "service",
    "policies": ["default", "kv-read"]
  },
  "request": {
    "operation": "read",
    "path": "secret/data/myapp/db-creds",
    "namespace": { "id": "...", "path": "team-a/" },
    "remote_address": "10.0.1.5"         // NOT hashed — real IP (or proxy IP if behind LB)
  },
  "response": {
    "auth": { ... }                       // present only for login events
  }
}

Note on IPs behind a load balancer: if Vault sits behind a proxy/LB, remote_address will show the proxy IP. To record the original client IP, have the audit device log the X-Forwarded-For header un-hashed:

vault write sys/config/auditing/request-headers/X-Forwarded-For hmac=false

The script automatically extracts the real client IP from X-Forwarded-For when present.
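Put together, one line of that log reduces to a handful of fields. A minimal sketch of such a reducer (field names follow the example entry above; where X-Forwarded-For appears in the entry depends on the audit device configuration, so that part is an assumption):

```python
import json
from typing import Optional

def parse_audit_line(line: str) -> Optional[dict]:
    """Reduce one audit JSONL entry to the fields discussed above.

    Returns None for unparseable lines and for non-"response" entries
    ("request" entries would double-count every call). The location of
    the X-Forwarded-For header is an assumption — it depends on how the
    audit device is configured.
    """
    try:
        entry = json.loads(line)
    except json.JSONDecodeError:
        return None
    if entry.get("type") != "response":
        return None
    req = entry.get("request") or {}
    auth = entry.get("auth") or {}
    headers = {k.lower(): v for k, v in (req.get("headers") or {}).items()}
    xff = headers.get("x-forwarded-for") or []
    # Prefer the first hop of X-Forwarded-For; fall back to remote_address.
    ip = xff[0].split(",")[0].strip() if xff else (req.get("remote_address") or "")
    return {
        "time": entry.get("time", ""),
        "entity_id": auth.get("entity_id", ""),
        "display_name": auth.get("display_name", ""),
        "operation": req.get("operation", ""),
        "path": req.get("path", ""),
        "namespace": (req.get("namespace") or {}).get("path", ""),
        "ip": ip,
        "is_login": "auth" in (entry.get("response") or {}),  # login responses carry auth
    }
```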


Architecture

main()
 ├─ Phase 1 — Namespace discovery      sys/namespaces (recursive)
 ├─ (interactive prompt for audit log if TTY and --audit-log not given)
 ├─ Phase 2 — Audit log parsing        JSONL → AuditIndex in-memory map
 ├─ Phase 3 — Secret collection        ThreadPoolExecutor over all mounts
 │   ├─ KV v1  list_kv1_recursive()
 │   ├─ KV v2  list_kv2_recursive() + kv2_get_meta()
 │   ├─ AWS    LIST {mount}/roles → GET {mount}/roles/{role}
 │   ├─ TF     LIST {mount}/role  → GET {mount}/role/{role}
 │   └─ KMIP   LIST {mount}/scope → LIST scope/{s}/role → GET scope/{s}/role/{r}
 ├─ Phase 3b — Audit enrichment        enrich_secrets() matches paths in AuditIndex
 ├─ Phase 4 — Entity collection        LIST + GET identity/entity/id/*
 ├─ Phase 5 — Token accessor scan      LIST auth/token/accessors → lookup-accessor
 │                                     (skipped with --no-token-scan)
 ├─ Phase 5b — Auth method user scan   collect_auth_method_users()
 ├─ Phase 5c — Audit enrichment        enrich_entities() (login + activity events)
 └─ Phase 6 — Output
     ├─ Console (rich or plain)
     ├─ vault_audit_<ts>.json
     ├─ vault_secrets_<ts>.csv
     └─ vault_entities_<ts>.csv
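The KV recursion in Phase 3 boils down to: LIST a directory, descend into keys ending in "/", record the rest. A sketch with the HTTP call abstracted away (list_kv2_recursive is the script's name for this; list_fn here stands in for a LIST request against {mount}/metadata/{prefix}):

```python
from typing import Callable, List

def walk_kv2(list_fn: Callable[[str], List[str]],
             prefix: str = "", depth: int = 0, max_depth: int = 12) -> List[str]:
    """Recursively expand a KV v2 tree into full secret paths.

    `list_fn(prefix)` must return the "keys" array from a LIST call on
    {mount}/metadata/{prefix} (directory keys end with "/"). Depth is
    capped the same way the script's --max-depth option does.
    """
    if depth > max_depth:
        return []
    paths: List[str] = []
    for key in list_fn(prefix) or []:
        if key.endswith("/"):                       # directory → recurse
            paths.extend(walk_kv2(list_fn, prefix + key, depth + 1, max_depth))
        else:                                       # leaf → full secret path
            paths.append(prefix + key)
    return paths
```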

Key classes

Class             Purpose
VaultClient       HTTP wrapper with rate-limiting, retry, thread-local sessions, namespace header
TokenBucket       Thread-safe token-bucket rate limiter
AuditIndex        Parses JSONL audit log into three in-memory dicts: secret_access, entity_login, entity_activity
SecretRecord      One per discovered secret/role — engine type, metadata, last-access from audit log
EntityRecord      One per user/entity — aliases, policies, last login, last activity from audit log
TokenProxyRecord  One per token accessor — last_renewal_time used as activity proxy without audit log
VaultAuditReport  Top-level container passed to all output functions
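The TokenBucket idea fits in a few lines. A minimal illustration of the same pattern (the script's actual class may differ in detail):

```python
import threading
import time

class TokenBucket:
    """Thread-safe token-bucket rate limiter: allows `burst` immediate
    requests, then refills at `rate` tokens per second."""

    def __init__(self, rate: float, burst: float):
        self.rate = rate                 # tokens replenished per second
        self.capacity = burst            # maximum tokens held at once
        self.tokens = burst
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self) -> None:
        """Block until one token is available, then consume it."""
        while True:
            with self.lock:
                now = time.monotonic()
                # Refill based on elapsed time, capped at capacity.
                self.tokens = min(self.capacity,
                                  self.tokens + (now - self.last) * self.rate)
                self.last = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            time.sleep(wait)             # sleep outside the lock
```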

Example outputs

Console — startup log

10:42:01  INFO     Connecting to: https://myvault.example.com:8200  (TLS verify=True)
10:42:01  INFO     Vault version: 1.17.3+ent, cluster: vault-cluster-prod
10:42:01  INFO     Phase 1/6: Discovering namespaces from root ...
10:42:02  INFO     Found 13 namespace(s) (plus root)

  ┌─ Audit log ──────────────────────────────────────────────────────┐
  │ Provide the path to your Vault audit log (JSON/JSONL format).    │
  │ This enables last-access and last-login timestamps for all       │
  │ secrets and users.                                                │
  │                                                                   │
  │ HCP Vault: portal → cluster → Audit → enable + download logs     │
  │ On-prem  : check your audit device path (vault audit list)        │
  │ Leave empty to continue without last-access data.                 │
  └───────────────────────────────────────────────────────────────────┘
  Audit log path > /var/log/vault/audit.log

10:42:05  INFO     Phase 2/6: Parsing audit log: /var/log/vault/audit.log
10:42:07  INFO     Audit log: 142,831 entries parsed, 48 secret paths, 312 entity logins, 1,205 entity activity records
10:42:07  INFO     Phase 3/6: Collecting secrets from all engine mounts ...
10:42:07  INFO     Found 9 secret engine mount(s) to scan across 14 namespace(s) (types: kv, aws, terraform, kmip)
10:42:09  INFO     Collected 34 secret(s)
10:42:09  INFO     Phase 4/6: Collecting users/entities from identity store ...
10:42:10  INFO     Identity store: 28 entity/user record(s)
10:42:11  INFO     Phase 5/6: Token scan — skipped (--no-token-scan)
10:42:11  INFO     Phase 5b/6: Scanning auth method mounts for configured users ...
10:42:12  INFO     Auth method scan complete. 5 additional principal(s) discovered.
10:42:12  INFO     Phase 6/6: Generating output ...
10:42:12  INFO     Done. 194 API calls in 11.1s. Secrets: 34. Entities: 33.

Console — namespace tree

══════════════════════════════════════════════════════════════════════════════════════
  HCP VAULT AUDIT REPORT
  Generated : 2025-03-01T10:42:12Z
  Cluster   : https://myvault.example.com:8200  (version 1.17.3+ent, vault-cluster-prod)
  Namespaces: 13  |  Secrets: 34  |  Entities: 33
  Audit log : 142,831 entries from /var/log/vault/audit.log
══════════════════════════════════════════════════════════════════════════════════════

──────────────────────────────────────────────────────────────────────────────────────
  NAMESPACE TREE
──────────────────────────────────────────────────────────────────────────────────────
  (root)
  └─ admin/
    └─ team-a/
      └─ team-a/prod/
      └─ team-a/staging/
    └─ team-b/
    └─ platform/
      └─ platform/aws/
      └─ platform/kmip/

Console — secrets table

──────────────────────────────────────────────────────────────────────────────────────
  SECRETS  (34 total)
──────────────────────────────────────────────────────────────────────────────────────

  Namespace: admin/team-a/prod/  (11 secrets)
  MOUNT                  PATH                                     ENGINE   CREATED              UPDATED              VER  LAST READ            BY                        IP
  ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
  secret                 myapp/db-creds                           kv_v2    2024-08-10 09:12:44  2025-01-15 16:33:01   3   2025-02-28 14:23:11  alice (alice@ldap)         10.0.1.42
  secret                 myapp/api-keys                           kv_v2    2024-08-10 09:13:22  2024-11-20 10:01:55   1   2025-02-20 08:55:30  svc-backend (approle)     10.0.2.100
  secret                 shared/tls-cert                          kv_v2    2024-06-01 00:00:00  2025-01-01 00:00:00   5   never                -                         -
  legacy                 old-config/settings                      kv_v1    -                    -                         2025-01-10 11:20:00  bob (bob@ldap)             10.0.1.55
  aws                    prod-role-readonly                        aws      -                    -                         2025-02-25 09:00:12  svc-infra (approle)        10.0.2.101
  aws                    prod-role-admin                           aws      -                    -                         2025-02-01 17:44:03  charlie (charlie@ldap)     10.0.1.88
  terraform              tfc-workspace-deploy                      terraform -                   -                         2025-02-19 14:05:55  svc-ci (approle)           10.0.2.105

  Namespace: admin/platform/kmip/  (4 secrets)
  MOUNT                  PATH                                     ENGINE   ...
  kmip                   acme-corp/db-encrypt                     kmip     ...   2025-02-27 22:10:03  svc-db (token)            10.0.3.10
  kmip                   acme-corp/backup-keys                    kmip     ...   never                -                         -

Console — user summary table

──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
  USER ACTIVITY SUMMARY  (33 total)
──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
  NAME                         NAMESPACE              AUTH TYPE      LAST LOGIN           LAST ACTIVITY        IP (login)         STATUS    LOGINS
  ─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────
  alice                        admin/                 ldap           2025-03-01 08:44:02  2025-03-01 14:23:11  10.0.1.42          active    142
  svc-backend                  admin/team-a/prod/     approle        2025-02-28 01:00:00  2025-03-01 12:10:55  10.0.2.100         active    8,312
  svc-infra                    admin/team-a/prod/     approle        2025-02-25 09:00:08  2025-02-25 09:00:12  10.0.2.101         active    44
  bob                          admin/                 ldap           2025-01-10 11:18:44  2025-01-10 11:20:00  10.0.1.55          active    23
  charlie                      admin/                 ldap           2025-02-01 17:43:50  2025-02-01 17:44:03  10.0.1.88          active    11
  svc-ci                       admin/platform/        approle        2025-02-19 14:05:50  2025-02-19 14:05:55  10.0.2.105         active    189
  old-service-account          admin/                 approle        2024-06-30 23:59:00  2024-06-30 23:59:00  10.0.2.200         DISABLED  1,044

Secrets CSV columns

namespace_path, mount_path, secret_path, engine_type, kv_version,
created_time, updated_time, current_version, oldest_version, max_versions,
last_accessed_time, last_accessed_by_entity_id, last_accessed_by_display_name,
last_accessed_from_ip, last_accessed_operation, access_count,
metadata_error, custom_metadata, engine_data

Entities CSV columns

namespace_path, entity_id, name, disabled, policies, groups,
creation_time, last_update_time,
aliases_summary,
token_accessor, token_display_name, token_auth_path,
token_creation_time, token_expire_time, token_last_renewal_time,
last_login_time, last_login_from_ip, last_login_auth_method,
last_login_auth_path, last_login_namespace, login_count,
last_activity_time, last_activity_ip, last_activity_operation,
last_activity_path, last_activity_mount_type, activity_count,
last_seen_time, auth_method_extra
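The CSV outputs feed naturally into follow-up checks. For example, a sketch that flags entities not seen for N days (column names come from the layout above; the timestamp format is assumed to be ISO 8601, matching what the script emits elsewhere):

```python
import csv
from datetime import datetime, timedelta, timezone
from typing import List

def stale_entities(csv_path: str, days: int = 90) -> List[str]:
    """Names whose last_seen_time is empty or older than `days` days."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    flagged: List[str] = []
    with open(csv_path, newline="") as fh:
        for row in csv.DictReader(fh):
            seen = (row.get("last_seen_time") or "").strip()
            if not seen:
                flagged.append(row["name"])        # never observed at all
                continue
            try:
                ts = datetime.fromisoformat(seen.replace("Z", "+00:00"))
            except ValueError:
                continue                           # unparseable → skip
            if ts.tzinfo is None:
                ts = ts.replace(tzinfo=timezone.utc)
            if ts < cutoff:
                flagged.append(row["name"])
    return flagged
```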

Limitations & notes

Limitation                          Detail
Vault OSS                           No namespace support — sys/namespaces returns 403; script warns and continues
HCP Vault                           Audit logs are not queryable via API — export from portal first
HCP Vault root                      All real resources live under admin/ — auto-detected from .hashicorp.cloud in URL
Audit log required for timestamps   Without it, only token creation_time / last_renewal_time is available as a proxy
HMAC                                Secret values and some metadata fields are HMAC-hashed in audit logs (by design); entity_id, request.path, and remote_address are NOT hashed
Proxy IPs                           If Vault is behind a load balancer, remote_address shows the proxy IP unless X-Forwarded-For is configured on the audit device
AWS STS roles                       Access via {mount}/sts/{role} is also captured (in addition to /creds/)
KMIP                                The script lists roles (configs); actual KMIP protocol operations are logged differently
Rate limiting                       Default 50 RPS; tune with --rate-limit to avoid overwhelming Vault

Security recommendations

  1. Use a dedicated audit token — apply the minimal policy above; do not use root.
  2. Always use --no-token-scan when you have an audit log — it removes the only sudo requirement from the policy.
  3. Never paste tokens in chat or logs — use environment variables only.
  4. Rotate the token immediately after the audit run (or use short-TTL tokens: -ttl=1h).
  5. Protect the output files — they contain your full secret inventory. Store them in a restricted location and delete when done.
  6. Keep audit logging enabled — without it, there is no reliable record of who accessed what and when.

License

MIT. Use at your own risk. Not an official HashiCorp product.
