Created
January 19, 2026 15:20
-
-
Save thundergolfer/931f08687d6b1b215083dbeb56f29f50 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| safetykit is a mini-library to demonstrate safety features designed for use in human-in-the-loop scripts. | |
| Running scripts in production without safetykit features is like driving a car without a seatbelt. | |
| Fasten your seatbelt, put on your helmet. | |
| """ | |
| import argparse | |
| import hashlib | |
| import http.server | |
| import json | |
| import os | |
| import random | |
| import secrets | |
| import signal | |
| import socket | |
| import sys | |
| import tempfile | |
| import threading | |
| import time | |
| import urllib.error | |
| import urllib.request | |
| from typing import NoReturn | |
| def praise_dryrun(dryrun: bool = True) -> None: | |
| """ | |
| The first technique is the dry run mode: --dry-run | |
| Dry run allows an engineer to see what they would do before they actually do it. | |
| In other words, it **separates planning from execution**. | |
| The separation of planning from execution is a generally helpful in semi-automated systems. | |
| **Example** | |
| ```python | |
| def gather(paths): | |
| files = [] | |
| for pattern in paths: | |
| files.extend(glob.glob(pattern)) | |
| return files | |
| def execute(files): | |
| for f in files: | |
| os.remove(f) | |
| files = gather([os.path.join(tmp_dir, "*.txt")]) | |
| if dryrun: | |
| print(f"Would remove: {files}") | |
| else: | |
| files = gather([os.path.join(tmp_dir, "*.txt")]) | |
| execute(files) | |
| ``` | |
| **Claim:** I have not met a script which couldn't trivially be made safer by adding a dry run mode. | |
| read: https://www.gresearch.com/news/in-praise-of-dry-run | |
| """ | |
| tmp_dir = "/tmp" | |
| files = [os.path.join(tmp_dir, f) for f in os.listdir(tmp_dir) if os.path.isfile(os.path.join(tmp_dir, f))] | |
| file_to_remove = random.choice(files) | |
| if dryrun: | |
| print(f"🌵 Would remove: {file_to_remove}") | |
| else: | |
| os.remove(file_to_remove) | |
| print(f"🌊 Removed: {file_to_remove}") | |
| def ask_for_confirmation() -> bool: | |
| """ | |
| The second technique is asking for confirmation before a destructive action. | |
| You've used this hundreds of times on SaaS and CSP dashboards. Confirmation | |
| boxes exists because they work! | |
| Confirmation boxes help users avoid what's called a 'slip'. A slip is when a user | |
| accidentally performs an action they didn't intend to. Humans attention is flaky, especially | |
| these days, and the confirmation software feature significantly reduces regrettable actions. | |
| It's IMPORTANT to know that Y/N confirmation is significantly less effective than 'type this name' confirmation. | |
| If the user is doing something destructive and irreversible, Y/N confirmation is not effective enough. | |
| Encouraging mindless Y,Y,Y or tab-tab-tab tapping is a receipe for disaster. | |
| The Therac-25 accident involved software which trained operators to mindlessly press the P key (Proceed : continue). | |
| And they pressed it even when the machine was indirectly asking if they wanted to kill the patient. | |
| read: Nancy G. Leveson and Clark S. Turner, "An Investigation of the Therac-25 Accidents," | |
| IEEE Computer, Vol. 26, No. 7, July 1993, pp. 18-41. | |
| """ | |
| import os | |
| import random | |
| tmp_dir = "/tmp" | |
| files = [f for f in os.listdir(tmp_dir) if os.path.isfile(os.path.join(tmp_dir, f))] | |
| if not files: | |
| print("No files in /tmp to delete.") | |
| return False | |
| file_to_remove = random.choice(files) | |
| print(f"Selected file for deletion: {file_to_remove}") | |
| confirmation = input(f"Type the filename '{file_to_remove}' to confirm deletion: ") | |
| if confirmation == file_to_remove: | |
| os.remove(os.path.join(tmp_dir, file_to_remove)) | |
| print(f"✅ Removed: {file_to_remove}") | |
| return True | |
| else: | |
| print("❌ Confirmation failed. File not removed.") | |
| return False | |
| def pause_a_beat(): | |
| """ | |
| The third technique involves slowing down execution to give users a chance to review the action | |
| or rethink their intentions. | |
| You've surely in your life panic-mashed CTRL-C to stop a computer doing something you just realized | |
| you didn't want. If the computer was operating at full gallop, your 100+ms monkey reflexes were probably too slow. | |
| By adding pauses into execution, a human operator is able to: | |
| 1. Re-evaluate intentions *after* initiation. | |
| 2. Keep up with the computer's pace and intervene. | |
| This technique is best suited when a script's wall clock performance is not at all important. For example, | |
| if a dangerous script is run once a week and takes 10 seconds to run with pauses included, the gain in safety | |
| is worth the 10 seconds of extra operator time. | |
| """ | |
| print("🌊 Running rm -rf on system root (/).\nLet's free some disk space!") | |
| try: | |
| time.sleep(2) | |
| except KeyboardInterrupt: | |
| print("glad I paused?") | |
| return | |
| print("🌵 Just kidding!") | |
| def abort_safely(): | |
| """ | |
| The fourth technique is a demonstration of a classic safety technique, used in factories | |
| and airplanes all over the world: abort or fail into a safe state. "fail-safe" design. | |
| A most famous example is Otis's elevator: emergency brakes in elevators use springs that clamp | |
| onto guide rails when cable tension is lost. | |
| In the domain of scripts, it's more relevant to talk about abort-safety. | |
| Abort-safety is the presence of safety when a script is cancelled (SIGINT), terminated (SIGTERM), or killed (SIGKILL). | |
| **Example:** Safely writing to a file such that ctrl-C or kill won't corrupt the file. | |
| This function uses a temp file + an atomic move, and register handlers for SIGINT/SIGTERM to clean up. | |
| read: https://en.wikipedia.org/wiki/Fail-safe | |
| """ | |
| data_to_write = "Important data that shouldn't be partially saved!\n" | |
| output_path = os.path.join(tempfile.gettempdir(), "critical_output.txt") | |
| temp_path = output_path + ".tmp" | |
| def cleanup_temp(*_): | |
| try: | |
| os.remove(temp_path) | |
| except OSError: | |
| pass | |
| sys.exit(1) # exit unhappily, but safely | |
| # Register abort/termination signal handlers | |
| for sig in (signal.SIGINT, signal.SIGTERM): | |
| signal.signal(sig, cleanup_temp) | |
| print(f"Writing data to {output_path} safely (interrupt with Ctrl-C or kill)...") | |
| try: | |
| with open(temp_path, "w") as f: | |
| f.write(data_to_write) | |
| f.flush() | |
| os.fsync(f.fileno()) | |
| # Intentionally sleep to give user a chance to Ctrl+C | |
| print("...writing (simulate slow write, try interrupting now)...") | |
| time.sleep(5) | |
| # Perform atomic move only if everything completed | |
| os.replace(temp_path, output_path) | |
| print(f"✅ Safely wrote data to {output_path}") | |
| except Exception as exc: | |
| print(f"❌ Exception during write: {exc}") | |
| finally: | |
| cleanup_temp() | |
| def undo(): ... | |
| def feedback(): ... | |
| def audit(): ... | |
| class TwoPersonRuleError(Exception): | |
| pass | |
| def two_person_rule( | |
| secret: str | None = None, | |
| initiator_url: str | None = None, | |
| timeout_seconds: int = 300, | |
| ) -> bool: | |
| """ | |
| Per US Air Force Instruction (AFI) 91-104, "the two-person concept" is designed | |
| to prevent accidental or malicious launch of nuclear weapons by a single individual. | |
| We can do this with software scripts too! | |
| This function implements a coordination mechanism where two people must independently | |
| run the script and confirm each other before one script proceeds. | |
| Args: | |
| secret: A shared secret known to both persons. If None, reads from | |
| TWO_PERSON_SECRET env var or prompts for input. | |
| initiator_url: If provided, run as responder connecting to this URL. | |
| If None, run as initiator (start server). | |
| timeout_seconds: How long to wait for the second person (default 5 minutes). | |
| Returns: | |
| True if this script should proceed (initiator after confirmation). | |
| False and exits if this script is the responder (other script proceeds). | |
| Raises: | |
| TwoPersonRuleError: If coordination fails (timeout, wrong secret, etc.) | |
| ref: https://en.wikipedia.org/wiki/Two-person_rule | |
| """ | |
| def get_local_ip() -> str: | |
| """Get the local IP address that can be reached by other machines.""" | |
| try: | |
| with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: | |
| s.connect(("8.8.8.8", 80)) | |
| return s.getsockname()[0] | |
| except Exception: | |
| return "127.0.0.1" | |
| def run_as_initiator(secret_hash: str) -> bool: | |
| """Run as the initiator: start HTTP server and wait for responder.""" | |
| state = { | |
| "confirmed": False, | |
| "error": None, | |
| "responder_nonce": None, | |
| "initiator_nonce": secrets.token_hex(16), | |
| "ack_received": False, | |
| } | |
| state_lock = threading.Lock() | |
| confirmation_event = threading.Event() | |
| class TwoPersonHandler(http.server.BaseHTTPRequestHandler): | |
| def log_message(self, format, *args): | |
| pass | |
| def do_POST(self): | |
| if self.path == "/join": | |
| self._handle_join() | |
| elif self.path == "/ack": | |
| self._handle_ack() | |
| else: | |
| self.send_error(404) | |
| def _handle_join(self): | |
| try: | |
| content_length = int(self.headers.get("Content-Length", 0)) | |
| body = self.rfile.read(content_length).decode("utf-8") | |
| data = json.loads(body) | |
| responder_secret_hash = data.get("secret_hash", "") | |
| responder_nonce = data.get("nonce", "") | |
| if not secrets.compare_digest(responder_secret_hash, secret_hash): | |
| self.send_response(403) | |
| self.send_header("Content-Type", "application/json") | |
| self.end_headers() | |
| self.wfile.write(json.dumps({"error": "Secret mismatch"}).encode()) | |
| with state_lock: | |
| state["error"] = "Responder provided incorrect secret" | |
| return | |
| with state_lock: | |
| state["responder_nonce"] = responder_nonce | |
| state["confirmed"] = True | |
| self.send_response(200) | |
| self.send_header("Content-Type", "application/json") | |
| self.end_headers() | |
| response = { | |
| "status": "confirmed", | |
| "initiator_nonce": state["initiator_nonce"], | |
| } | |
| self.wfile.write(json.dumps(response).encode()) | |
| except Exception as e: | |
| self.send_error(400, str(e)) | |
| def _handle_ack(self): | |
| try: | |
| content_length = int(self.headers.get("Content-Length", 0)) | |
| body = self.rfile.read(content_length).decode("utf-8") | |
| data = json.loads(body) | |
| ack_nonce = data.get("initiator_nonce", "") | |
| if secrets.compare_digest(ack_nonce, state["initiator_nonce"]): | |
| with state_lock: | |
| state["ack_received"] = True | |
| confirmation_event.set() | |
| self.send_response(200) | |
| self.send_header("Content-Type", "application/json") | |
| self.end_headers() | |
| self.wfile.write(json.dumps({"status": "ok"}).encode()) | |
| except Exception as e: | |
| self.send_error(400, str(e)) | |
| server = http.server.HTTPServer(("0.0.0.0", 0), TwoPersonHandler) | |
| port = server.server_address[1] | |
| local_ip = get_local_ip() | |
| print("=" * 60) | |
| print("TWO-PERSON RULE: Initiator Mode") | |
| print("=" * 60 + "\n") | |
| print("Share the following with the second person:") | |
| print(f" URL: http://{local_ip}:{port}") | |
| print(" (They must also know the shared secret)\n") | |
| print(f"Waiting for second person to connect (timeout: {timeout_seconds}s)...") | |
| print("Press Ctrl+C to abort.\n") | |
| server_thread = threading.Thread(target=server.serve_forever) | |
| server_thread.daemon = True | |
| server_thread.start() | |
| try: | |
| if not confirmation_event.wait(timeout=timeout_seconds): | |
| server.shutdown() | |
| raise TwoPersonRuleError(f"Timeout: No second person connected within {timeout_seconds} seconds.") | |
| server.shutdown() | |
| with state_lock: | |
| if state["error"]: | |
| raise TwoPersonRuleError(state["error"]) | |
| if not state["ack_received"]: | |
| raise TwoPersonRuleError("Handshake incomplete: no acknowledgment received.") | |
| print("\n" + ("=" * 60)) | |
| print("Two-person confirmation COMPLETE!") | |
| print("Second person has confirmed. This script will proceed.") | |
| print(("=" * 60) + "\n") | |
| return True | |
| except KeyboardInterrupt: | |
| server.shutdown() | |
| raise TwoPersonRuleError("Aborted by user (Ctrl+C).") | |
| def run_as_responder(secret_hash: str) -> NoReturn: | |
| """Run as the responder: connect to initiator and confirm.""" | |
| print("=" * 60) | |
| print("TWO-PERSON RULE: Responder Mode") | |
| print("=" * 60) | |
| print(f"\nConnecting to initiator at: {initiator_url}\n") | |
| responder_nonce = secrets.token_hex(16) | |
| join_url = initiator_url.rstrip("/") + "/join" | |
| join_data = json.dumps( | |
| { | |
| "secret_hash": secret_hash, | |
| "nonce": responder_nonce, | |
| } | |
| ).encode("utf-8") | |
| try: | |
| req = urllib.request.Request( | |
| join_url, | |
| data=join_data, | |
| headers={"Content-Type": "application/json"}, | |
| method="POST", | |
| ) | |
| with urllib.request.urlopen(req, timeout=30) as response: | |
| response_data = json.loads(response.read().decode("utf-8")) | |
| except urllib.error.HTTPError as e: | |
| if e.code == 403: | |
| raise TwoPersonRuleError("Secret mismatch! Your secret does not match the initiator's secret.") | |
| raise TwoPersonRuleError(f"Connection failed: HTTP {e.code}") | |
| except urllib.error.URLError as e: | |
| raise TwoPersonRuleError(f"Connection failed: {e.reason}") | |
| except Exception as e: | |
| raise TwoPersonRuleError(f"Connection failed: {e}") | |
| if response_data.get("status") != "confirmed": | |
| raise TwoPersonRuleError(f"Unexpected response: {response_data}") | |
| initiator_nonce = response_data.get("initiator_nonce", "") | |
| if not initiator_nonce: | |
| raise TwoPersonRuleError("Invalid response: missing initiator nonce") | |
| print("Secret verified by initiator!") | |
| print() | |
| ack_url = initiator_url.rstrip("/") + "/ack" | |
| ack_data = json.dumps( | |
| { | |
| "initiator_nonce": initiator_nonce, | |
| } | |
| ).encode("utf-8") | |
| try: | |
| req = urllib.request.Request( | |
| ack_url, | |
| data=ack_data, | |
| headers={"Content-Type": "application/json"}, | |
| method="POST", | |
| ) | |
| with urllib.request.urlopen(req, timeout=30) as response: | |
| response.read() | |
| except Exception as e: | |
| raise TwoPersonRuleError(f"Acknowledgment failed: {e}") | |
| print("=" * 60) | |
| print("Two-person confirmation COMPLETE!") | |
| print("Initiator has been notified. THIS script will now EXIT.") | |
| print("(The initiator's script will proceed.)") | |
| print("=" * 60) | |
| print() | |
| sys.exit(0) | |
| # Get the shared secret | |
| if secret is None: | |
| secret = os.environ.get("TWO_PERSON_SECRET") | |
| if secret is None: | |
| print("Two-Person Rule: Enter the shared secret known to both persons.") | |
| secret = input("Secret: ").strip() | |
| if not secret: | |
| raise TwoPersonRuleError("A shared secret is required for two-person coordination.") | |
| # Hash the secret to avoid transmitting it in plaintext | |
| secret_hash = hashlib.sha256(secret.encode()).hexdigest() | |
| if initiator_url is None: | |
| confirmed = run_as_initiator(secret_hash) | |
| else: | |
| run_as_responder(secret_hash) | |
| if confirmed: | |
| print("Demo: The two-person rule was satisfied!") | |
| print("In a real script, dangerous operations would proceed here.") | |
| def main(): | |
| parser = argparse.ArgumentParser(description="SafetyKit Demos: Choose which safety technique to demo.") | |
| parser.add_argument( | |
| "demo", | |
| choices=["dryrun", "confirm", "pause", "abort", "two_person"], | |
| help=( | |
| "Which demo to run: 'dryrun' (dry run mode), 'confirm' (ask for confirmation), " | |
| "'pause' (pause before action), 'abort' (abort-safe writes), " | |
| "'two_person' (two-person rule coordination)." | |
| ), | |
| ) | |
| parser.add_argument( | |
| "--connect", | |
| metavar="URL", | |
| help="For 'two_person' demo: connect to an initiator at this URL (responder mode).", | |
| ) | |
| parser.add_argument( | |
| "--secret", | |
| help="For 'two_person' demo: the shared secret (can also use TWO_PERSON_SECRET env var).", | |
| ) | |
| parser.add_argument( | |
| "--timeout", | |
| type=int, | |
| default=300, | |
| help="For 'two_person' demo: timeout in seconds waiting for second person (default: 300).", | |
| ) | |
| args = parser.parse_args() | |
| if args.demo == "dryrun": | |
| praise_dryrun(dryrun=True) | |
| elif args.demo == "confirm": | |
| ask_for_confirmation() | |
| elif args.demo == "pause": | |
| pause_a_beat() | |
| elif args.demo == "abort": | |
| abort_safely() | |
| elif args.demo == "two_person": | |
| try: | |
| two_person_rule( | |
| secret=args.secret, | |
| initiator_url=args.connect, | |
| timeout_seconds=args.timeout, | |
| ) | |
| except TwoPersonRuleError as e: | |
| print(f"Two-person rule failed: {e}") | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment