Skip to content

Instantly share code, notes, and snippets.

@titaneric
Created October 19, 2025 13:25
Show Gist options
  • Select an option

  • Save titaneric/e1b35ccf37f3907ee9accb426d8c2d48 to your computer and use it in GitHub Desktop.

Select an option

Save titaneric/e1b35ccf37f3907ee9accb426d8c2d48 to your computer and use it in GitHub Desktop.
docker_proxy.py
#!/usr/bin/env python3
"""
HTTP proxy that adds delays to Docker API requests
"""
import asyncio
import aiohttp
from aiohttp import web
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DockerProxy:
    """Forwarding proxy for the Docker API that injects artificial delays.

    Supports both unix-socket (``unix:///...``) and TCP Docker hosts.
    Log-follow streams (``/logs?...follow=true``) can be slowed with a
    separate delay via ``logs_delay``; all other requests use ``delay``.
    """

    def __init__(self, docker_host="unix:///var/run/docker.sock", delay=30, logs_delay=None):
        self.docker_host = docker_host
        self.delay = delay
        # Fall back to the regular delay when no logs-specific one is given.
        self.logs_delay = logs_delay or delay
        self.session = None  # created lazily; see _ensure_session
        # Determine connection type but don't create the connector yet:
        # aiohttp connectors must be built inside a running event loop.
        if docker_host.startswith('unix://'):
            self.socket_path = docker_host[7:]  # strip 'unix://' prefix
            self.base_url = 'http://localhost'
            self.connection_type = 'unix'
        else:
            self.base_url = docker_host
            self.connection_type = 'tcp'

    async def _ensure_session(self):
        """Create the client session on first use, inside the event loop."""
        if self.session is None:
            if self.connection_type == 'unix':
                connector = aiohttp.UnixConnector(path=self.socket_path)
            else:
                connector = aiohttp.TCPConnector()
            # total=None: log-follow streams have unbounded duration.
            timeout = aiohttp.ClientTimeout(total=None, connect=60)
            self.session = aiohttp.ClientSession(
                connector=connector,
                timeout=timeout,
                # Pass bodies through untouched to avoid chunking issues.
                auto_decompress=False,
            )

    async def proxy_request(self, request):
        """Proxy *request* to the real Docker API after an artificial delay.

        Streaming endpoints (log follows, event/stats streams) are forwarded
        chunk-by-chunk; everything else is buffered and returned whole.
        Returns 408 on upstream timeout and 500 on any other proxy error.
        """
        # Ensure session is created in async context.
        await self._ensure_session()
        path = request.path_qs
        method = request.method
        # Log follows get their own delay so they can be slowed independently.
        is_logs_stream = '/logs?' in path and 'follow=true' in path
        current_delay = self.logs_delay if is_logs_stream else self.delay
        if is_logs_stream:
            logger.info(f"Proxying LOGS STREAM {method} {path} with {current_delay}s delay")
        else:
            logger.info(f"Proxying {method} {path} with {current_delay}s delay")
        # Add artificial delay.
        await asyncio.sleep(current_delay)
        # Drop headers that must be recomputed for the upstream request.
        # BUG FIX: request headers keep their original capitalization
        # (e.g. 'Host', 'Content-Length'), so a case-sensitive
        # dict.pop('host') silently missed them; filter case-insensitively.
        headers = {
            name: value
            for name, value in request.headers.items()
            if name.lower() not in ('host', 'content-length')
        }
        # Forward request to the real Docker API.
        try:
            async with self.session.request(
                method,
                f"{self.base_url}{path}",
                headers=headers,
                data=await request.read(),
                allow_redirects=False,
            ) as resp:
                # Skip headers that aiohttp will set automatically.
                response_headers = {
                    name: value
                    for name, value in resp.headers.items()
                    if name.lower() not in ('content-encoding', 'transfer-encoding', 'content-length')
                }
                # BUG FIX: log follows are unbounded, so they must be
                # streamed — the old check ('stream' in path) sent them
                # to resp.read(), which never returns on follow=true.
                if is_logs_stream or (
                    resp.headers.get('content-type', '').startswith('application/')
                    and 'stream' in path
                ):
                    response = web.StreamResponse(
                        status=resp.status,
                        headers=response_headers,
                    )
                    await response.prepare(request)
                    async for chunk in resp.content.iter_chunked(8192):
                        await response.write(chunk)
                    await response.write_eof()
                    return response
                # Regular response — read all content at once.
                body = await resp.read()
                return web.Response(
                    body=body,
                    status=resp.status,
                    headers=response_headers,
                )
        except asyncio.TimeoutError:
            logger.error(f"Timeout proxying {method} {path}")
            return web.Response(status=408, text="Request Timeout")
        except Exception as e:
            logger.error(f"Proxy error for {method} {path}: {e}")
            return web.Response(status=500, text=f"Proxy error: {e}")

    async def cleanup(self):
        """Close the underlying client session, if one was created."""
        if self.session:
            await self.session.close()
def create_proxy_app(docker_host="unix:///var/run/docker.sock", delay=30, logs_delay=None):
    """Build an aiohttp application that routes everything through a DockerProxy."""
    proxy = DockerProxy(docker_host, delay, logs_delay)
    app = web.Application()
    # Keep a handle on the proxy so the cleanup context can close it later.
    app['proxy'] = proxy

    async def cleanup_context(app):
        # Nothing to do at startup; close the proxy session on shutdown.
        yield
        await app['proxy'].cleanup()

    app.cleanup_ctx.append(cleanup_context)
    # Catch-all: forward every method on every path to the proxy.
    app.router.add_route('*', '/{path:.*}', proxy.proxy_request)
    return app
if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Docker API proxy with delays')
    parser.add_argument('--delay', type=int, default=30,
                        help='Delay in seconds for regular requests')
    parser.add_argument('--logs-delay', type=int,
                        help='Delay in seconds for log stream requests (defaults to --delay)')
    parser.add_argument('--port', type=int, default=2376, help='Port to listen on')
    parser.add_argument('--docker-host', default='unix:///var/run/docker.sock',
                        help='Real Docker host to proxy to')
    args = parser.parse_args()

    app = create_proxy_app(args.docker_host, args.delay, args.logs_delay)
    print(f"Starting Docker proxy on port {args.port}")
    print(f"Proxying to: {args.docker_host}")
    print(f"Regular delay: {args.delay}s")
    print(f"Logs stream delay: {args.logs_delay or args.delay}s")
    # BUG FIX: reflect the actual listen port instead of hard-coding 2376,
    # which was wrong whenever --port was overridden.
    print(f"\nTo use with Vector, set docker_host to: http://localhost:{args.port}")
    print("\nExample usage:")
    print("  # Slow everything:")
    print("  python main.py --delay 30")
    print("  # Only slow log streams:")
    print("  python main.py --delay 0 --logs-delay 30")
    print("  # Different delays:")
    print("  python main.py --delay 5 --logs-delay 30")
    web.run_app(app, host='0.0.0.0', port=args.port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment