Skip to content

Instantly share code, notes, and snippets.

@herczy
Last active June 29, 2022 07:38
Show Gist options
  • Select an option

  • Save herczy/ddc1f53d04157427f9dcf9da16ba8d91 to your computer and use it in GitHub Desktop.

Select an option

Save herczy/ddc1f53d04157427f9dcf9da16ba8d91 to your computer and use it in GitHub Desktop.
Small man in the middle script
#!/usr/bin/env python3
import argparse
import socket
import select
def main():
parser = argparse.ArgumentParser(description="MITM proxy")
parser.add_argument(
"--bind-host", type=str, default="localhost", help="Bind address"
)
parser.add_argument("-p", "--bind-port", type=int, required=True, help="Bind port")
parser.add_argument(
"-H", "--connect-host", type=str, required=True, help="Connect host"
)
parser.add_argument(
"--connect-port", type=int, help="Bind port (same as bind port by default)"
)
ns = parser.parse_args()
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((ns.bind_host, ns.bind_port))
server.listen(10)
connect_port = ns.connect_port or ns.bind_port
connections = {}
print(f"Server is {ns.bind_host}:{ns.bind_port}")
print(f"Client is {ns.connect_host}:{connect_port}")
try:
while True:
r, _w, _x = select.select([server, *connections], [], [], 100.0)
for soc in r:
if soc == server:
client, addr = server.accept()
cli_host, cli_port = addr
cli_fd = client.fileno()
remote = socket.socket(
socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP
)
remote.connect((ns.connect_host, connect_port))
remote_fd = remote.fileno()
connections[remote] = client
connections[client] = remote
print(
f"[local={cli_fd} remote={remote_fd}] Connection received from {cli_host}:{cli_port}"
)
else:
assert soc in connections
data = soc.recv(4096)
if not data:
remote = connections.pop(soc)
del connections[remote]
print(
f"[socket={soc.fileno()} remote={remote.fileno()}] Connection ended"
)
soc.close()
remote.close()
continue
print(
f"[socket={soc.fileno()}] Received {len(data)} byte(s) of data"
)
connections[soc].send(data)
finally:
socket.shutdown(socket.SHUT_RDWR)
socket.close()
for soc in connections:
soc.shutdown(socket.SHUT_RDWR)
soc.close()
if __name__ == "__main__":
exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment