@thundergolfer
Created January 19, 2026 15:20
"""
safetykit is a mini-library to demonstrate safety features designed for use in human-in-the-loop scripts.
Running scripts in production without safetykit features is like driving a car without a seatbelt.
Fasten your seatbelt, put on your helmet.
"""
import argparse
import hashlib
import http.server
import json
import os
import random
import secrets
import signal
import socket
import sys
import tempfile
import threading
import time
import urllib.error
import urllib.request
from typing import NoReturn


def praise_dryrun(dryrun: bool = True) -> None:
    """
The first technique is the dry run mode: --dry-run
Dry run allows an engineer to see what they would do before they actually do it.
In other words, it **separates planning from execution**.
The separation of planning from execution is generally helpful in semi-automated systems.
**Example**
```python
import glob
import os

def gather(paths):
    # Planning: expand the glob patterns into a concrete list of files.
    files = []
    for pattern in paths:
        files.extend(glob.glob(pattern))
    return files

def execute(files):
    # Execution: the only place where anything is deleted.
    for f in files:
        os.remove(f)

files = gather([os.path.join(tmp_dir, "*.txt")])
if dryrun:
    print(f"Would remove: {files}")
else:
    # Act on exactly the plan that a dry run would have printed.
    execute(files)
```
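The same separation can be wired to a command-line flag. The sketch below is only an
illustration and not part of safetykit: the names `plan`, `apply_plan`, and `run` are
hypothetical, and it assumes the script's job is deleting files that match glob patterns.

```python
import argparse
import glob
import os


def plan(patterns):
    # Planning: resolve glob patterns to a sorted list of files, changing nothing.
    files = set()
    for pattern in patterns:
        files.update(p for p in glob.glob(pattern) if os.path.isfile(p))
    return sorted(files)


def apply_plan(files, dry_run):
    # Execution: delete the planned files, or only report them when dry_run is set.
    removed = []
    for path in files:
        if dry_run:
            print(f"Would remove: {path}")
        else:
            os.remove(path)
            removed.append(path)
    return removed


def run(argv=None):
    parser = argparse.ArgumentParser(description="Remove files matching glob patterns.")
    parser.add_argument("patterns", nargs="+")
    parser.add_argument("--dry-run", action="store_true", help="Print the plan without deleting.")
    args = parser.parse_args(argv)
    return apply_plan(plan(args.patterns), dry_run=args.dry_run)
```

With `--dry-run` the plan is printed and nothing is removed; without it, the same plan is executed.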
**Claim:** I have not met a script which couldn't trivially be made safer by adding a dry run mode.
read: https://www.gresearch.com/news/in-praise-of-dry-run
"""
    tmp_dir = "/tmp"
    files = [os.path.join(tmp_dir, f) for f in os.listdir(tmp_dir) if os.path.isfile(os.path.join(tmp_dir, f))]
    if not files:
        # random.choice raises IndexError on an empty list.
        print(f"No files in {tmp_dir} to remove.")
        return
    file_to_remove = random.choice(files)
    if dryrun:
        print(f"🌵 Would remove: {file_to_remove}")
    else:
        os.remove(file_to_remove)
        print(f"🌊 Removed: {file_to_remove}")


def ask_for_confirmation() -> bool:
    """
The second technique is asking for confirmation before a destructive action.
You've used this hundreds of times on SaaS and CSP dashboards. Confirmation
boxes exist because they work!
Confirmation boxes help users avoid what's called a 'slip'. A slip is when a user
accidentally performs an action they didn't intend to. Human attention is flaky, especially
these days, and the confirmation feature significantly reduces regrettable actions.
It's IMPORTANT to know that Y/N confirmation is significantly less effective than 'type this name' confirmation.
If the user is doing something destructive and irreversible, Y/N confirmation is not effective enough.
Encouraging mindless Y,Y,Y or tab-tab-tab tapping is a recipe for disaster.
The Therac-25 accidents involved software which trained operators to mindlessly press the P key (Proceed : continue).
And they pressed it even when the machine was indirectly asking if they wanted to kill the patient.
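A 'type this name' check fits in a few lines. The helper below is a minimal sketch, not
safetykit's API: `confirm_by_typing` and its `read` parameter are hypothetical names, and
`read` exists only so the prompt can be replaced in tests.

```python
def confirm_by_typing(expected, read=input):
    # Ask the operator to retype `expected`. Only an exact match (ignoring
    # surrounding whitespace) confirms; "y", "yes", or Enter do not.
    typed = read(f"Type '{expected}' to confirm: ")
    return typed.strip() == expected
```

Because the operator has to read and reproduce the target's name, a reflexive keypress cannot confirm the action.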
read: Nancy G. Leveson and Clark S. Turner, "An Investigation of the Therac-25 Accidents,"
IEEE Computer, Vol. 26, No. 7, July 1993, pp. 18-41.
"""
    tmp_dir = "/tmp"
    files = [f for f in os.listdir(tmp_dir) if os.path.isfile(os.path.join(tmp_dir, f))]
    if not files:
        print("No files in /tmp to delete.")
        return False
    file_to_remove = random.choice(files)
    print(f"Selected file for deletion: {file_to_remove}")
    confirmation = input(f"Type the filename '{file_to_remove}' to confirm deletion: ")
    if confirmation == file_to_remove:
        os.remove(os.path.join(tmp_dir, file_to_remove))
        print(f"✅ Removed: {file_to_remove}")
        return True
    else:
        print("❌ Confirmation failed. File not removed.")
        return False


def pause_a_beat():
    """
The third technique involves slowing down execution to give users a chance to review the action
or rethink their intentions.
You have surely, at some point, panic-mashed CTRL-C to stop a computer doing something you just realized
you didn't want. If the computer was operating at full gallop, your 100+ms monkey reflexes were probably too slow.
By adding pauses into execution, a human operator is able to:
1. Re-evaluate intentions *after* initiation.
2. Keep up with the computer's pace and intervene.
This technique is best suited when a script's wall clock performance is not at all important. For example,
if a dangerous script is run once a week and takes 10 seconds to run with pauses included, the gain in safety
is worth the 10 seconds of extra operator time.
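A minimal sketch of pausing between steps follows. It is an illustration only:
`run_with_pauses` and its parameters are hypothetical names, and `sleep` is injectable
purely so the pause can be skipped or simulated in tests.

```python
import time


def run_with_pauses(steps, pause_seconds=1.0, sleep=time.sleep):
    # Announce each step, pause, then run it. Ctrl-C during a pause
    # (KeyboardInterrupt) stops before that step and all later ones.
    completed = []
    for description, action in steps:
        print(f"About to: {description} (Ctrl-C to abort)")
        try:
            sleep(pause_seconds)
        except KeyboardInterrupt:
            print(f"Aborted before: {description}")
            break
        action()
        completed.append(description)
    return completed
```

Each entry in `steps` is a `(description, callable)` pair; the return value lists the steps that actually ran.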
"""
    print("🌊 Running rm -rf on system root (/).\nLet's free some disk space!")
    try:
        time.sleep(2)
    except KeyboardInterrupt:
        print("glad I paused?")
        return
    print("🌵 Just kidding!")


def abort_safely():
    """
The fourth technique is a demonstration of a classic safety technique, used in factories
and airplanes all over the world: abort or fail into a safe state. This is "fail-safe" design.
A famous example is Otis's elevator: emergency brakes in elevators use springs that clamp
onto guide rails when cable tension is lost.
In the domain of scripts, it's more relevant to talk about abort-safety.
Abort-safety is the presence of safety when a script is cancelled (SIGINT), terminated (SIGTERM), or killed (SIGKILL).
SIGKILL cannot be caught, so no handler runs; the output file stays intact only because it is never written in place.
**Example:** Safely writing to a file such that ctrl-C or kill won't corrupt the file.
This function uses a temp file + an atomic move, and registers handlers for SIGINT/SIGTERM to clean up.
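The temp file + atomic move pattern can be factored into a reusable helper. This is a
sketch under assumptions, not safetykit's API: `atomic_write_text` is a hypothetical name,
and the atomicity of `os.replace` assumes the temp file and the target are on the same
filesystem, which is why the temp file is created next to the target.

```python
import os
import tempfile


def atomic_write_text(path, text):
    # Write to a temp file in the target's directory, then rename it over `path`.
    # Readers see either the old contents or the new, never a partial file.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, prefix=".tmp-")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Also covers KeyboardInterrupt: drop the temp file, keep the old target.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
```

If the write is interrupted, the previous contents of `path` remain untouched and the temp file is removed.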
read: https://en.wikipedia.org/wiki/Fail-safe
"""
    data_to_write = "Important data that shouldn't be partially saved!\n"
    output_path = os.path.join(tempfile.gettempdir(), "critical_output.txt")
    temp_path = output_path + ".tmp"

    def remove_temp() -> None:
        # Remove the temp file if it exists; after a successful os.replace it is already gone.
        try:
            os.remove(temp_path)
        except OSError:
            pass

    def handle_abort(*_):
        remove_temp()
        sys.exit(1)  # exit unhappily, but safely

    # Register abort/termination signal handlers
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, handle_abort)
    print(f"Writing data to {output_path} safely (interrupt with Ctrl-C or kill)...")
    try:
        with open(temp_path, "w") as f:
            f.write(data_to_write)
            f.flush()
            os.fsync(f.fileno())
            # Intentionally sleep to give user a chance to Ctrl+C
            print("...writing (simulate slow write, try interrupting now)...")
            time.sleep(5)
        # Perform atomic move only if everything completed
        os.replace(temp_path, output_path)
        print(f"✅ Safely wrote data to {output_path}")
    except Exception as exc:
        print(f"❌ Exception during write: {exc}")
    finally:
        # Clean up without exiting, so a successful run ends with status 0.
        remove_temp()


def undo(): ...


def feedback(): ...


def audit(): ...


class TwoPersonRuleError(Exception):
    pass


def two_person_rule(
    secret: str | None = None,
    initiator_url: str | None = None,
    timeout_seconds: int = 300,
) -> bool:
    """
Per US Air Force Instruction (AFI) 91-104, "the two-person concept" is designed
to prevent accidental or malicious launch of nuclear weapons by a single individual.
We can do this with software scripts too!
This function implements a coordination mechanism where two people must independently
run the script and confirm each other before one script proceeds.
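The secret check at the core of the handshake can be sketched in isolation. The helper
names `hash_secret` and `secrets_match` are hypothetical and exist only for this
illustration; the function below does the same comparison inline.

```python
import hashlib
import secrets


def hash_secret(secret):
    # SHA-256 hex digest of the shared secret, as sent by the responder.
    return hashlib.sha256(secret.encode()).hexdigest()


def secrets_match(initiator_secret, responder_hash):
    # Constant-time comparison avoids leaking how much of the digest matched.
    return secrets.compare_digest(hash_secret(initiator_secret), responder_hash)
```

Note that the digest itself acts as the credential on the wire: anyone who captures it over
plain HTTP could replay it, so this guards against slips rather than against an attacker on the network.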
Args:
    secret: A shared secret known to both persons. If None, reads from
        TWO_PERSON_SECRET env var or prompts for input.
    initiator_url: If provided, run as responder connecting to this URL.
        If None, run as initiator (start server).
    timeout_seconds: How long to wait for the second person (default 5 minutes).
Returns:
    True if this script should proceed (initiator after confirmation).
    Never returns when run as responder: the process exits with status 0
    and the initiator's script proceeds.
Raises:
    TwoPersonRuleError: If coordination fails (timeout, wrong secret, etc.)
ref: https://en.wikipedia.org/wiki/Two-person_rule
"""
    def get_local_ip() -> str:
        """Get the local IP address that can be reached by other machines."""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
                # Connecting a UDP socket sends no packets; it only selects the outbound interface.
                s.connect(("8.8.8.8", 80))
                return s.getsockname()[0]
        except Exception:
            return "127.0.0.1"

    def run_as_initiator(secret_hash: str) -> bool:
        """Run as the initiator: start HTTP server and wait for responder."""
        state = {
            "confirmed": False,
            "error": None,
            "responder_nonce": None,
            "initiator_nonce": secrets.token_hex(16),
            "ack_received": False,
        }
        state_lock = threading.Lock()
        confirmation_event = threading.Event()

        class TwoPersonHandler(http.server.BaseHTTPRequestHandler):
            def log_message(self, format, *args):
                pass  # silence the default per-request logging

            def do_POST(self):
                if self.path == "/join":
                    self._handle_join()
                elif self.path == "/ack":
                    self._handle_ack()
                else:
                    self.send_error(404)

            def _handle_join(self):
                try:
                    content_length = int(self.headers.get("Content-Length", 0))
                    body = self.rfile.read(content_length).decode("utf-8")
                    data = json.loads(body)
                    responder_secret_hash = data.get("secret_hash", "")
                    responder_nonce = data.get("nonce", "")
                    if not secrets.compare_digest(responder_secret_hash, secret_hash):
                        self.send_response(403)
                        self.send_header("Content-Type", "application/json")
                        self.end_headers()
                        self.wfile.write(json.dumps({"error": "Secret mismatch"}).encode())
                        with state_lock:
                            state["error"] = "Responder provided incorrect secret"
                        return
                    with state_lock:
                        state["responder_nonce"] = responder_nonce
                        state["confirmed"] = True
                    self.send_response(200)
                    self.send_header("Content-Type", "application/json")
                    self.end_headers()
                    response = {
                        "status": "confirmed",
                        "initiator_nonce": state["initiator_nonce"],
                    }
                    self.wfile.write(json.dumps(response).encode())
                except Exception as e:
                    self.send_error(400, str(e))

            def _handle_ack(self):
                try:
                    content_length = int(self.headers.get("Content-Length", 0))
                    body = self.rfile.read(content_length).decode("utf-8")
                    data = json.loads(body)
                    ack_nonce = data.get("initiator_nonce", "")
                    if not secrets.compare_digest(ack_nonce, state["initiator_nonce"]):
                        # Reject a wrong nonce instead of reporting success to the responder.
                        self.send_error(403, "Nonce mismatch")
                        return
                    with state_lock:
                        state["ack_received"] = True
                    confirmation_event.set()
                    self.send_response(200)
                    self.send_header("Content-Type", "application/json")
                    self.end_headers()
                    self.wfile.write(json.dumps({"status": "ok"}).encode())
                except Exception as e:
                    self.send_error(400, str(e))

        server = http.server.HTTPServer(("0.0.0.0", 0), TwoPersonHandler)
        port = server.server_address[1]
        local_ip = get_local_ip()
        print("=" * 60)
        print("TWO-PERSON RULE: Initiator Mode")
        print("=" * 60 + "\n")
        print("Share the following with the second person:")
        print(f" URL: http://{local_ip}:{port}")
        print(" (They must also know the shared secret)\n")
        print(f"Waiting for second person to connect (timeout: {timeout_seconds}s)...")
        print("Press Ctrl+C to abort.\n")
        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        server_thread.start()
        try:
            if not confirmation_event.wait(timeout=timeout_seconds):
                server.shutdown()
                raise TwoPersonRuleError(f"Timeout: No second person connected within {timeout_seconds} seconds.")
            server.shutdown()
            with state_lock:
                if state["error"]:
                    raise TwoPersonRuleError(state["error"])
                if not state["ack_received"]:
                    raise TwoPersonRuleError("Handshake incomplete: no acknowledgment received.")
            print("\n" + ("=" * 60))
            print("Two-person confirmation COMPLETE!")
            print("Second person has confirmed. This script will proceed.")
            print(("=" * 60) + "\n")
            return True
        except KeyboardInterrupt:
            server.shutdown()
            raise TwoPersonRuleError("Aborted by user (Ctrl+C).")

    def run_as_responder(secret_hash: str) -> NoReturn:
        """Run as the responder: connect to initiator and confirm."""
        print("=" * 60)
        print("TWO-PERSON RULE: Responder Mode")
        print("=" * 60)
        print(f"\nConnecting to initiator at: {initiator_url}\n")
        responder_nonce = secrets.token_hex(16)
        join_url = initiator_url.rstrip("/") + "/join"
        join_data = json.dumps(
            {
                "secret_hash": secret_hash,
                "nonce": responder_nonce,
            }
        ).encode("utf-8")
        try:
            req = urllib.request.Request(
                join_url,
                data=join_data,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=30) as response:
                response_data = json.loads(response.read().decode("utf-8"))
        except urllib.error.HTTPError as e:
            if e.code == 403:
                raise TwoPersonRuleError("Secret mismatch! Your secret does not match the initiator's secret.")
            raise TwoPersonRuleError(f"Connection failed: HTTP {e.code}")
        except urllib.error.URLError as e:
            raise TwoPersonRuleError(f"Connection failed: {e.reason}")
        except Exception as e:
            raise TwoPersonRuleError(f"Connection failed: {e}")
        if response_data.get("status") != "confirmed":
            raise TwoPersonRuleError(f"Unexpected response: {response_data}")
        initiator_nonce = response_data.get("initiator_nonce", "")
        if not initiator_nonce:
            raise TwoPersonRuleError("Invalid response: missing initiator nonce")
        print("Secret verified by initiator!")
        print()
        ack_url = initiator_url.rstrip("/") + "/ack"
        ack_data = json.dumps(
            {
                "initiator_nonce": initiator_nonce,
            }
        ).encode("utf-8")
        try:
            req = urllib.request.Request(
                ack_url,
                data=ack_data,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            with urllib.request.urlopen(req, timeout=30) as response:
                response.read()
        except Exception as e:
            raise TwoPersonRuleError(f"Acknowledgment failed: {e}")
        print("=" * 60)
        print("Two-person confirmation COMPLETE!")
        print("Initiator has been notified. THIS script will now EXIT.")
        print("(The initiator's script will proceed.)")
        print("=" * 60)
        print()
        sys.exit(0)

    # Get the shared secret
    if secret is None:
        secret = os.environ.get("TWO_PERSON_SECRET")
    if secret is None:
        print("Two-Person Rule: Enter the shared secret known to both persons.")
        secret = input("Secret: ").strip()
    if not secret:
        raise TwoPersonRuleError("A shared secret is required for two-person coordination.")
    # Hash the secret so the raw secret is not sent over the wire. The hash still
    # travels over plain HTTP, so an eavesdropper could capture and replay it.
    secret_hash = hashlib.sha256(secret.encode()).hexdigest()
    if initiator_url is None:
        confirmed = run_as_initiator(secret_hash)
    else:
        run_as_responder(secret_hash)  # never returns: exits the process
    if confirmed:
        print("Demo: The two-person rule was satisfied!")
        print("In a real script, dangerous operations would proceed here.")
    return confirmed


def main():
    parser = argparse.ArgumentParser(description="SafetyKit Demos: Choose which safety technique to demo.")
    parser.add_argument(
        "demo",
        choices=["dryrun", "confirm", "pause", "abort", "two_person"],
        help=(
            "Which demo to run: 'dryrun' (dry run mode), 'confirm' (ask for confirmation), "
            "'pause' (pause before action), 'abort' (abort-safe writes), "
            "'two_person' (two-person rule coordination)."
        ),
    )
    parser.add_argument(
        "--connect",
        metavar="URL",
        help="For 'two_person' demo: connect to an initiator at this URL (responder mode).",
    )
    parser.add_argument(
        "--secret",
        help="For 'two_person' demo: the shared secret (can also use TWO_PERSON_SECRET env var).",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=300,
        help="For 'two_person' demo: timeout in seconds waiting for second person (default: 300).",
    )
    args = parser.parse_args()
    if args.demo == "dryrun":
        praise_dryrun(dryrun=True)
    elif args.demo == "confirm":
        ask_for_confirmation()
    elif args.demo == "pause":
        pause_a_beat()
    elif args.demo == "abort":
        abort_safely()
    elif args.demo == "two_person":
        try:
            two_person_rule(
                secret=args.secret,
                initiator_url=args.connect,
                timeout_seconds=args.timeout,
            )
        except TwoPersonRuleError as e:
            print(f"Two-person rule failed: {e}")
            sys.exit(1)


if __name__ == "__main__":
    main()