Last active
November 29, 2025 15:38
-
-
Save AkBKukU/35a1bdf2da0c74afcdd79a35a59ca6c0 to your computer and use it in GitHub Desktop.
host-this.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # Python System | |
| import argparse | |
| import datetime | |
| import sys | |
| import os | |
| import json | |
| from pprint import pprint | |
| import asyncio | |
| import signal | |
| from multiprocessing import Process | |
| # External Modules | |
| class WebInterface(object): | |
| try: | |
| # External Modules | |
| from flask import Flask | |
| from flask import Response | |
| from flask import request | |
| from flask import send_file | |
| from flask import redirect | |
| from flask import make_response | |
| from flask import send_from_directory | |
| except Exception as e: | |
| print("Need to install Python module [flask]") | |
| sys.exit(1) | |
| try: | |
| # External Modules | |
| from flask_autoindex import AutoIndex | |
| except Exception as e: | |
| print("Need to install Python module [flask_autoindex]") | |
| sys.exit(1) | |
| """Web interface for managing rips | |
| """ | |
| def __init__(self,args): | |
| self.host_dir=os.path.realpath(__file__).replace(os.path.basename(__file__),"") | |
| self.app = self.Flask("Web Host") | |
| self.app.logger.disabled = True | |
| #log = logging.getLogger('werkzeug') | |
| #log.disabled = True | |
| self.AutoIndex(self.app, browse_root=dirs[0]) | |
| # Static content | |
| # self.app.static_folder=dirs[0] | |
| # self.app.static_url_path='/' | |
| # Define routes in class to use with flask | |
| # self.app.add_url_rule('/','home', self.index) | |
| self.host = args.ip | |
| self.port = args.port | |
| self.dirs = args.dirs | |
| async def start(self): | |
| """ Run Flask in a process thread that is non-blocking """ | |
| print("Starting Flask") | |
| self.web_thread = Process(target=self.app.run, | |
| kwargs={ | |
| "host":self.host, | |
| "port":self.port, | |
| "debug":False, | |
| "use_reloader":False | |
| } | |
| ) | |
| self.web_thread.start() | |
| def stop(self): | |
| """ Send SIGKILL and join thread to end Flask server """ | |
| if hasattr(self, "web_thread") and self.web_thread is not None: | |
| self.web_thread.terminate() | |
| self.web_thread.join() | |
| if hasattr(self, "rip_thread"): | |
| self.rip_thread.terminate() | |
| self.rip_thread.join() | |
| # ------ Async Server Handler ------ | |
| global loop_state | |
| global server | |
| loop_state = True | |
| server = None | |
| async def asyncLoop(): | |
| """ Blocking main loop to provide time for async tasks to run""" | |
| print('Blocking main loop') | |
| global loop_state | |
| while loop_state: | |
| await asyncio.sleep(1) | |
| def exit_handler(sig, frame): | |
| """ Handle CTRL-C to gracefully end program and API connections """ | |
| global loop_state | |
| print('You pressed Ctrl+C!') | |
| loop_state = False | |
| server.stop() | |
| # ------ Async Server Handler ------ | |
| async def startWeb(args): | |
| # Internal Modules | |
| global server | |
| server = WebInterface(args) | |
| """ Start connections to async modules """ | |
| # Setup CTRL-C signal to end programm | |
| signal.signal(signal.SIGINT, exit_handler) | |
| print('Press Ctrl+C to exit program') | |
| # Start async modules | |
| L = await asyncio.gather( | |
| server.start(), | |
| asyncLoop() | |
| ) | |
| def main(): | |
| """ Execute as a CLI and process parameters | |
| """ | |
| # Setup CLI arguments | |
| parser = argparse.ArgumentParser( | |
| prog="host-this", | |
| description='Adhoc web server', | |
| epilog='') | |
| parser.add_argument('-i', '--ip', help="Web server listening IP", default="0.0.0.0") | |
| parser.add_argument('-p', '--port', help="Web server listening IP", default="5000") | |
| parser.add_argument('dirs', help="", default=None, nargs=argparse.REMAINDER) | |
| args = parser.parse_args() | |
| # Run web server | |
| asyncio.run(startWeb(args)) | |
| sys.exit(0) | |
| if __name__ == "__main__": | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment