Skip to content

Instantly share code, notes, and snippets.

@thomasht86
Forked from jeffreytolar/test-requests.py
Last active February 21, 2025 08:58
Show Gist options
  • Select an option

  • Save thomasht86/8033c296e12418662f281a9599e9ebdb to your computer and use it in GitHub Desktop.

Select an option

Save thomasht86/8033c296e12418662f281a9599e9ebdb to your computer and use it in GitHub Desktop.
# /// script
# dependencies = [
# "certifi==2024.2.2",
# "cffi==1.16.0",
# "charset-normalizer==3.3.2",
# "cryptography==42.0.7",
# "idna==3.7",
# "pycparser==2.22",
# "pyOpenSSL==24.1.0",
# "requests==2.32.3",
# "urllib3==2.2.1",
# ]
# ///
from concurrent.futures import ThreadPoolExecutor, wait
import http.server
import random
import socketserver
import ssl
import time
from OpenSSL import crypto
def generate_certificate(subj, key_file, cert_file):
# Create a key pair
key = crypto.PKey()
key.generate_key(crypto.TYPE_RSA, 2048)
cert = crypto.X509()
cert.get_subject().CN = subj
cert.set_serial_number(random.randrange(10, 10000))
cert.gmtime_adj_notBefore(0)
cert.gmtime_adj_notAfter(24*60*60)
cert.set_issuer(cert.get_subject())
cert.set_pubkey(key)
if subj == "localhost":
cert.add_extensions([
crypto.X509Extension(b"subjectAltName", False, b"DNS:localhost, IP:127.0.0.1")
])
cert.sign(key, 'sha256')
with open(cert_file, "wb") as fh:
fh.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
with open(key_file, "wb") as fh:
fh.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, key))
class MyHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
cert = self.connection.getpeercert()
subject = dict(x[0] for x in cert['subject'])
subject_name = subject.get('commonName', 'Unknown')
self.wfile.write(f'CN: {subject_name} / URL: {self.path}'.encode())
def log_message(self, format, *args):
pass
class HTTPSServer(socketserver.TCPServer):
def __init__(self, server_address, RequestHandlerClass):
super().__init__(server_address, RequestHandlerClass, bind_and_activate=True)
self.ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
self.ssl_context.load_cert_chain("server.crt", "server.key")
self.ssl_context.set_ciphers("@SECLEVEL=1:ALL")
self.ssl_context.verify_mode = ssl.CERT_REQUIRED
self.ssl_context.load_verify_locations("client1.crt")
self.ssl_context.load_verify_locations("client2.crt")
self.ssl_context.load_verify_locations("client3.crt")
self.socket = self.ssl_context.wrap_socket(self.socket, server_side=True)
generate_certificate("localhost", "server.key", "server.crt")
generate_certificate("client1", "client1.key", "client1.crt")
generate_certificate("client2", "client2.key", "client2.crt")
generate_certificate("client3", "client3.key", "client3.crt")
# hack to add our self-signed cert to the default context; `verify=server.crt` will use a dedicated SSLContext
import certifi.core
certifi.core._CACERT_PATH = "server.crt"
import requests
stop_server = None
def run_server():
global stop_server
with HTTPSServer(('localhost', 8443), MyHTTPRequestHandler) as httpsd:
print("Server started on port 8443...")
stop_server = httpsd.shutdown
httpsd.serve_forever()
def run_client(name):
session = requests.Session()
session.cert = (f"{name}.crt", f"{name}.key")
time.sleep(0.5) # let the server start up
try:
resp = session.request("GET", f"https://127.0.0.1:8443/{name}")
expected = f"CN: {name} / URL: /{name}"
if resp.text != expected:
print(f"FAIL {name}: {resp.text}")
else:
print(f" OK {name}: {resp.text}")
except Exception as e:
print(f"FAIL {name}: {e}")
ex = ThreadPoolExecutor()
ex.submit(run_server)
client_futures = []
client_futures.append(ex.submit(run_client, "client1"))
client_futures.append(ex.submit(run_client, "client2"))
client_futures.append(ex.submit(run_client, "client3"))
wait(client_futures)
if stop_server:
stop_server()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment