|
import contextlib
import os
import socket
from pathlib import Path

import aiohttp_jinja2
import jinja2
import psutil
from aiohttp import web
|
|
|
# Directory that uploaded files are written to and served from.
# It is a relative path, so it resolves against the current working directory.
UPLOAD_DIR = "uploads"
# Absolute path of the directory that contains this file.
BASE_DIR = Path(__file__).resolve().parent
|
|
|
# --------------------------------------------------
# Get a list of IP addresses for all network interfaces
# --------------------------------------------------
|
def list_all_interface_ips():
    """Collect the IPv4 and IPv6 addresses of every network interface.

    Returns:
        A dict keyed by interface name. Each value is a dict with two keys,
        "ipv4" and "ipv6", holding lists of address strings. Interfaces that
        have neither kind of address are left out.
    """
    interfaces = {}

    for name, entries in psutil.net_if_addrs().items():
        v4_addresses = [
            entry.address for entry in entries if entry.family == socket.AF_INET
        ]
        # Keep only the part before any '%' (e.g. an interface scope suffix).
        v6_addresses = [
            entry.address.split('%')[0]
            for entry in entries
            if entry.family == socket.AF_INET6
        ]

        if not (v4_addresses or v6_addresses):
            continue

        interfaces[name] = {
            "ipv4": v4_addresses,
            "ipv6": v6_addresses,
        }

    return interfaces
|
|
|
|
|
def print_interface_ips():
    """Print the addresses of every network interface to standard output.

    Emits a header line, then one "[name]" section per interface listing its
    IPv4 addresses followed by its IPv6 addresses, then a closing rule. When
    no interface has an address, a single notice is printed instead.
    """
    ip_map = list_all_interface_ips()
    print("=== Interface IP addresses ===")

    if ip_map:
        for name, addresses in ip_map.items():
            section = [f"[{name}]"]
            section += [f" IPv4: {address}" for address in addresses["ipv4"]]
            section += [f" IPv6: {address}" for address in addresses["ipv6"]]
            for line in section:
                print(line)
        print("================================")
    else:
        print("No interfaces found.")
|
|
|
|
|
# --------------------------------------------------
# Handlers
# --------------------------------------------------
|
|
|
@aiohttp_jinja2.template("template.html")
async def index(request: web.Request):
    """Serve the upload page in its initial state.

    The returned dict is the template context: there is nothing to report
    yet, so the message is empty and the error flag is off.
    """
    initial_context = dict(message=None, is_error=False)
    return initial_context
|
|
|
|
|
def _sanitize_upload_filename(raw_name):
    """Reduce a client-supplied filename to a bare, usable name ("" if none is left)."""
    if not raw_name or "\x00" in raw_name:
        # Missing name, or a NUL byte that open() would reject with ValueError.
        return ""

    # os.path.basename only splits on "/" on POSIX, so a Windows-style path
    # such as "C:\\dir\\photo.png" must be normalised before taking the tail.
    name = os.path.basename(raw_name.replace("\\", "/"))

    # "", "." and ".." name directories, not files; refuse them.
    if name in ("", ".", ".."):
        return ""
    return name


async def handle_upload(request: web.Request) -> web.Response:
    """Save every uploaded file (multipart fields named "files") and report the result.

    Each accepted part is streamed to UPLOAD_DIR under a non-conflicting name
    chosen by get_unique_filepath. Parts with another field name, or whose
    filename is empty or unusable once reduced to a bare name, are skipped.

    Args:
        request: The incoming multipart/form-data POST request.

    Returns:
        The rendered "template.html" response. Its context carries "message"
        (a summary of the saved files, or an error text when nothing was
        saved) and "is_error".

    Raises:
        Any exception raised while reading or writing a part is propagated
        after the partially written file has been removed.
    """
    reader = await request.multipart()
    saved_files: list[str] = []

    # Loop-invariant: ensure the target directory exists once per request.
    os.makedirs(UPLOAD_DIR, exist_ok=True)

    # Process each multipart field in sequence.
    while (field := await reader.next()) is not None:
        # Only handle the field with name="files".
        if field.name != "files":
            continue

        # Protect against directory traversal and unusable names.
        filename = _sanitize_upload_filename(field.filename)
        if not filename:
            continue

        unique_path = get_unique_filepath(UPLOAD_DIR, filename)

        completed = False
        try:
            with open(unique_path, "wb") as f:
                while chunk := await field.read_chunk():
                    f.write(chunk)
            completed = True
        finally:
            if not completed:
                # Do not leave a truncated file behind in the served directory
                # when the client disconnects or a write fails.
                with contextlib.suppress(OSError):
                    os.remove(unique_path)

        saved_files.append(os.path.basename(unique_path))

    if not saved_files:
        # If no files were saved at all, treat as an error.
        message = "No files were uploaded or all filenames were empty."
        is_error = True
    elif len(saved_files) == 1:
        message = f"File saved: {saved_files[0]}"
        is_error = False
    else:
        file_list_text = ", ".join(saved_files)
        message = f"{len(saved_files)} files saved: {file_list_text}"
        is_error = False

    context = {
        "message": message,
        "is_error": is_error,
    }

    # Render and return the Jinja2 template.
    return aiohttp_jinja2.render_template("template.html", request, context)
|
|
|
|
|
# --------------------------------------------------
# Prevent filename conflicts
# --------------------------------------------------
|
def get_unique_filepath(dirpath: str, filename: str) -> str:
    """Return a path inside dirpath that does not exist yet.

    The plain name is used when it is free. Otherwise a numeric suffix is
    inserted before the extension and incremented until an unused name is
    found: foo.png, foo_1.png, foo_2.png ...
    """
    path = os.path.join(dirpath, filename)
    if not os.path.exists(path):
        return path

    stem, suffix = os.path.splitext(filename)
    attempt = 1
    while True:
        path = os.path.join(dirpath, f"{stem}_{attempt}{suffix}")
        if not os.path.exists(path):
            return path
        attempt += 1
|
|
|
|
|
# --------------------------------------------------
# App factory
# --------------------------------------------------
|
def create_app() -> web.Application:
    """Build the aiohttp application.

    Configures Jinja2 templating, registers the index ("/") and upload
    ("/upload") routes, makes sure UPLOAD_DIR exists, and exposes that
    directory under "/static" with a browsable index.

    Returns:
        The configured web.Application.
    """
    app = web.Application()

    # Set up Jinja2. Templates are looked up relative to the current working
    # directory.
    # NOTE(review): BASE_DIR is defined at module level but not used here;
    # confirm whether templates should be loaded from it instead of ".".
    aiohttp_jinja2.setup(
        app,
        loader=jinja2.FileSystemLoader("."),
    )

    app.add_routes([
        web.get("/", index),
        web.post("/upload", handle_upload),
    ])

    # Create the directory before pointing the static route at it.
    # exist_ok=True avoids the check-then-create race of exists() + mkdir()
    # and matches how handle_upload creates the same directory.
    os.makedirs(UPLOAD_DIR, exist_ok=True)

    # Static file listing. show_index=True publishes a directory index of
    # everything that has been uploaded.
    app.router.add_static('/static', UPLOAD_DIR, show_index=True)

    return app
|
|
|
|
|
if __name__ == "__main__":
    # Show the machine's addresses first, then serve on all interfaces.
    print_interface_ips()
    web.run_app(create_app(), host="0.0.0.0", port=8080)