Skip to content

Instantly share code, notes, and snippets.

@symbioquine
Created November 15, 2025 16:38
Show Gist options
  • Select an option

  • Save symbioquine/78004b277301f4d4efa88cb9b6c9dcd3 to your computer and use it in GitHub Desktop.

Select an option

Save symbioquine/78004b277301f4d4efa88cb9b6c9dcd3 to your computer and use it in GitHub Desktop.
# WIP: Lots of stuff copied from https://github.com/liamcottle/reticulum-meshchat so should also be MIT licensed
import asyncio
import html
import json
import logging
import re
import time
import traceback
from collections import defaultdict
from enum import IntFlag, auto
from typing import Callable, List

import RNS
from aiohttp import web
# Route table that collects the view handlers declared further down via the
# `@routes.view(...)` decorator; it is attached to the application at startup.
routes = web.RouteTableDef()
class ProgressStage(IntFlag):
    """Phase of a page download, passed to progress callbacks as ``stage``."""

    # Waiting for a path to the destination to become known.
    AWAITING_PATH = 1 << 0
    # Waiting for the link to the destination to finish establishing.
    AWAITING_LINK = 1 << 1
    # Waiting for the page request sent over the link to complete.
    AWAITING_REQUEST = 1 << 2
# Seconds to wait for a path to the destination before giving up.
PATH_LOOKUP_TIMEOUT = 15
# Seconds allowed for a link to become active before giving up.
LINK_ESTABLISHMENT_TIMEOUT = 15
# One asyncio.Lock per destination hash, created on first access; taken while
# looking up or creating the cached link for that destination.
nomadnet_link_locks = defaultdict(asyncio.Lock)
# destination hash -> (link, establishment deadline timestamp)
nomadnet_cached_links = {}
async def get_nomadnetwork_node_dest(destination_hash: bytes, on_progress: Callable):
    """Resolve a path to a NomadNetwork node and build its outbound destination.

    If no path to ``destination_hash`` is known, a path request is issued and
    the coroutine polls every 0.1 s for up to ``PATH_LOOKUP_TIMEOUT`` seconds,
    reporting ``ProgressStage.AWAITING_PATH`` through ``on_progress`` on each
    poll.

    Args:
        destination_hash: Hash of the destination to reach.
        on_progress: Async callable invoked as ``on_progress(stage=...)``.

    Returns:
        An outbound, single-type ``RNS.Destination`` for the
        ``nomadnetwork.node`` aspect of the recalled identity.

    Raises:
        Exception: If no path is found in time, or the identity for the
            destination cannot be recalled.
    """
    if not RNS.Transport.has_path(destination_hash):
        RNS.Transport.request_path(destination_hash)

        # A monotonic clock keeps the wait unaffected by wall-clock adjustments.
        deadline = time.monotonic() + PATH_LOOKUP_TIMEOUT
        while not RNS.Transport.has_path(destination_hash) and time.monotonic() < deadline:
            await on_progress(stage=ProgressStage.AWAITING_PATH)
            await asyncio.sleep(0.1)

    if not RNS.Transport.has_path(destination_hash):
        raise Exception("Could not find path to destination.")

    # recall() returns None for an unknown destination; building a Destination
    # from None would not address the intended node, so fail loudly instead.
    identity = RNS.Identity.recall(destination_hash)
    if identity is None:
        raise Exception("Could not recall identity for destination.")

    return RNS.Destination(
        identity,
        RNS.Destination.OUT,
        RNS.Destination.SINGLE,
        "nomadnetwork",
        'node',
    )
async def get_established_nomadnetwork_node_link(destination_hash: bytes, on_progress: Callable):
    """Return an active link to a NomadNetwork node, reusing a cached one if possible.

    A cached link for ``destination_hash`` is reused unless it is stale or
    closed, in which case a new link is created and cached. The per-destination
    lock guards only this get-or-create step, so concurrent callers share one
    link and wait for it together.

    Args:
        destination_hash: Hash of the destination to link to.
        on_progress: Async callable; invoked with
            ``stage=ProgressStage.AWAITING_LINK, link=link`` while waiting.

    Returns:
        The ``RNS.Link`` once its status is active.

    Raises:
        Exception: If the path lookup fails or the link is not active by its
            establishment deadline.
    """
    async with nomadnet_link_locks[destination_hash]:
        cached = nomadnet_cached_links.get(destination_hash)
        if cached is None or cached[0].status in {RNS.Link.STALE, RNS.Link.CLOSED}:
            destination = await get_nomadnetwork_node_dest(destination_hash, on_progress)
            link = RNS.Link(destination)
            # Start the establishment clock only once the link exists. The path
            # lookup above can take as long as the whole link timeout, which
            # would otherwise leave a new link no time to come up.
            cached = (link, time.monotonic() + LINK_ESTABLISHMENT_TIMEOUT)
            nomadnet_cached_links[destination_hash] = cached
        link, establishment_deadline = cached

    while link.status in {RNS.Link.PENDING, RNS.Link.HANDSHAKE} and time.monotonic() < establishment_deadline:
        await on_progress(stage=ProgressStage.AWAITING_LINK, link=link)
        # The status may have changed while the callback was awaited.
        if link.status == RNS.Link.CLOSED:
            break
        await asyncio.sleep(0.1)

    if link.status != RNS.Link.ACTIVE:
        raise Exception(f"Link not active after timeout of {LINK_ESTABLISHMENT_TIMEOUT} seconds.")
    return link
async def download_nomadnetwork_page(destination_hash: bytes, path: str, data: str|None, on_progress: Callable):
    """Request a page from a NomadNetwork node and return it as text.

    Args:
        destination_hash: Hash of the node to fetch from.
        path: Request path on the node.
        data: Request payload, passed unchanged to ``link.request``.
        on_progress: Async callable; invoked with
            ``stage=ProgressStage.AWAITING_REQUEST, receipt=receipt`` every
            0.1 s while the request is outstanding (and by the link setup
            with its own stages).

    Returns:
        The response body decoded as UTF-8.

    Raises:
        Exception: If the link cannot be established, the request cannot be
            sent, or the request fails.
    """
    link = await get_established_nomadnetwork_node_link(destination_hash, on_progress)

    receipt = link.request(
        path=path,
        data=data,
    )
    # link.request() hands back False instead of a receipt when the request
    # could not be sent; polling `.status` on that would raise AttributeError.
    if receipt is False or receipt is None:
        raise Exception("Request could not be sent")

    finished_states = {RNS.RequestReceipt.FAILED, RNS.RequestReceipt.READY}
    while receipt.status not in finished_states:
        await on_progress(stage=ProgressStage.AWAITING_REQUEST, receipt=receipt)
        await asyncio.sleep(0.1)

    # The loop only exits on FAILED or READY, so anything but FAILED is READY.
    if receipt.status == RNS.RequestReceipt.FAILED:
        raise Exception("Request failed")

    return receipt.response.decode("utf-8")
# Parses the URL path of an incoming request: a 32-hex-digit destination hash,
# optionally followed by ":<page_path>", optionally followed by "`<page_data>".
# Optional groups that do not match are reported as None by groupdict().
request_path_pattern = re.compile(r'(?P<destination_hash>[0-9A-Fa-f]{32})(:(?P<page_path>[^`]+))?(`(?P<page_data>[^`]*))?')
@routes.view('/{args:.*}')
class MyView(web.View):
    """Catch-all view that fetches a NomadNetwork page and streams it as HTML."""

    async def get(self):
        """Handle a GET whose path is ``<destination_hash>[:<page_path>][`<page_data>]``.

        Responds 400 if the path does not match ``request_path_pattern``.
        Otherwise starts a chunked HTML response, writes progress messages
        while the page downloads, then writes the converted page (or the
        failure message) and closes the body.

        Returns:
            The ``web.Response`` (400) or the finished ``web.StreamResponse``.
        """
        logging.debug('request match_info: %r', self.request.match_info)

        args = self.request.match_info.get('args')
        m = request_path_pattern.match(args)
        if m is None:
            return web.Response(status=400, text=f"Invalid request path format: {args!r}")

        request_path_parts = m.groupdict()

        res = web.StreamResponse()
        res.content_type = "text/html; charset=utf-8"
        # Note that the actual outcome isn't known yet but we have to
        # set a status before the headers can be sent so we'll send a 202
        res.set_status(202)
        res.enable_chunked_encoding()
        await res.prepare(self.request)

        await res.write('<head></head><body>'.encode('utf-8'))

        destination_hash = bytes.fromhex(request_path_parts['destination_hash'])
        # groupdict() always contains the key (with None when the optional
        # group did not match), so a .get() default would never be used.
        page_path = request_path_parts['page_path'] or '/page/index.mu'

        # TODO: populate this...
        combined_data = {}

        progress_state = {}

        async def on_progress(stage, link=None, receipt=None):
            """Write a progress line to the response, throttled and de-duplicated."""
            timeof_last_msg = progress_state.get('timeof_last_msg', None)

            # Don't spam multiple progress messages per second
            if timeof_last_msg is not None and (time.time() - timeof_last_msg) < 1.0:
                return

            progress_msg = None
            if stage is ProgressStage.AWAITING_PATH:
                progress_msg = 'Requesting Destination Path...'
            elif stage is ProgressStage.AWAITING_LINK:
                if link.status == RNS.Link.HANDSHAKE:
                    progress_msg = 'Link Handshake...'
                else:
                    progress_msg = 'Requesting Link...'
            elif stage is ProgressStage.AWAITING_REQUEST:
                if receipt.status == RNS.RequestReceipt.SENT:
                    progress_msg = 'Request Sent...'
                elif receipt.status == RNS.RequestReceipt.DELIVERED:
                    progress_msg = 'Request Delivered...'
                elif receipt.status == RNS.RequestReceipt.RECEIVING:
                    percent_progress = receipt.progress * 100
                    progress_msg = f'Receiving Response... {percent_progress:.1f}%'

            # Nothing to report for this state; never render the text "None".
            if progress_msg is None:
                return

            # Don't spam the same progress message multiple times in a row
            if progress_msg == progress_state.get('last_msg', None):
                return

            progress_state['last_msg'] = progress_msg
            progress_state['timeof_last_msg'] = time.time()

            await res.write(f"<div class='progress'>{progress_msg}<br></div>\n".encode())

        try:
            page_content = await download_nomadnetwork_page(destination_hash, page_path, combined_data, on_progress)

            logging.debug('got page content: ' + repr(page_content))

            raw_page_content_lines = page_content.split('\n')

            page_state = {
                'source_ident': destination_hash.hex(),
            }

            html_content_lines = [
                convert_micron_line(line_idx, raw_page_content_line, page_state)
                for line_idx, raw_page_content_line in enumerate(raw_page_content_lines)
            ]

            await res.write("\n".join(html_content_lines).encode())

            # Hide the progress now that we have page content
            await res.write(
                '\n<style>\n.progress {\n    display: none;\n}\n</style>'.encode('utf-8')
            )
        except Exception as e:
            traceback.print_exc()
            # The message may contain arbitrary text; escape it before it is
            # embedded in the HTML body.
            await res.write(f"<div class='failure'>{html.escape(str(e))}<br></div>\n".encode())

        await res.write('</body>'.encode('utf-8'))
        await res.write_eof()
        return res
class MicronState(IntFlag):
    """Parser state used while scanning one line of micron markup."""

    # Ordinary text.
    TEXT = 1 << 0
    # The previous character was a backslash.
    ESCAPED = 1 << 1
    # The previous character was a backtick.
    FORMATTING = 1 << 2
    LINK = 1 << 3
def convert_micron_link(source_ident, raw_link):
    """Render the inside of a micron link (the text between ``[`` and ``]``) as an HTML anchor.

    ``raw_link`` has the form ``label`path`fields`` where the label and fields
    are optional; the path may be prefixed with ``<ident>:``. A missing or
    empty ident falls back to ``source_ident``, and a missing or empty label
    falls back to the path as written. Fields are currently ignored.

    Args:
        source_ident: Hex identifier of the node the page came from.
        raw_link: Link text taken from the page, without the brackets.

    Returns:
        An ``<a>`` element string pointing at ``http://nomadnet/<ident>:<path>``.
    """
    logging.debug('convert_micron_link %s %s', source_ident, raw_link)

    label = ''
    raw_link_path = raw_link
    if '`' in raw_link:
        # A split on a present separator always yields at least two parts;
        # anything after the path (the fields) is not used yet.
        label, raw_link_path, *_unused_fields = raw_link.split('`')

    link_ident, colon, link_path = raw_link_path.partition(':')
    if not colon:
        link_ident, link_path = source_ident, raw_link_path
    elif not link_ident:
        link_ident = source_ident

    # A link without a label is shown as its target rather than as "None".
    if not label:
        label = raw_link_path

    # The link text comes from a remote page, so escape everything that is
    # placed into the attribute value or the element content.
    href = f'http://nomadnet/{html.escape(link_ident)}:{html.escape(link_path)}'

    # TODO: Include fields in link
    return f'<a target="_self" href="{href}">{html.escape(label)}</a>'
def convert_micron_line(line_idx, raw_page_content_line, page_state):
    """Convert one line of micron markup into a line of HTML.

    Only a subset of micron is handled: backslash escapes, links
    (a backtick followed by ``[...]``), and consuming formatting commands
    without applying them. Text is HTML-escaped.

    Args:
        line_idx: Index of the line within the page (currently unused).
        raw_page_content_line: The raw micron line.
        page_state: Dict with at least ``'source_ident'``, used to resolve
            links that do not name a node.

    Returns:
        ``'<br>'`` for an empty line, ``''`` for a comment line (one starting
        with ``#``), otherwise the converted text followed by ``'<br>'``.
    """
    if not raw_page_content_line:
        return '<br>'
    # Comment lines are not meant to be displayed, so they emit nothing at all.
    if raw_page_content_line[0] == '#':
        return ''

    pieces = []
    state = MicronState.TEXT
    line_length = len(raw_page_content_line)
    i = 0

    while i < line_length:
        c = raw_page_content_line[i]
        i += 1

        if state is MicronState.FORMATTING:
            state = MicronState.TEXT
            # TODO: Implement formatting. Until then the command character is
            # consumed so it does not leak into the rendered text. Commands
            # without arguments: _ ! * f b ` c l r a <
            if c in ('F', 'B'):
                # Foreground/background colour: the next 3 chars are the colour.
                i += 3
            elif c == '[':
                link_end_idx = raw_page_content_line.find(']', i)
                if link_end_idx == -1:
                    # Unterminated link: keep the bracket as plain text.
                    pieces.append(c)
                else:
                    pieces.append(
                        convert_micron_link(
                            page_state['source_ident'],
                            raw_page_content_line[i:link_end_idx],
                        )
                    )
                    i = link_end_idx + 1
            continue

        if state is MicronState.ESCAPED:
            # Escaped for micron, but still needs escaping for HTML.
            pieces.append(html.escape(c, quote=False))
            state = MicronState.TEXT
            continue

        if c == '\\':
            state = MicronState.ESCAPED
            continue

        if c == '`':
            state = MicronState.FORMATTING
            continue

        pieces.append(html.escape(c, quote=False))

    return ''.join(pieces) + '<br>'
# Bring up Reticulum before serving requests. None is passed as the only
# argument (presumably selecting the default configuration location; confirm
# against the RNS documentation).
reticulum = RNS.Reticulum(None)
# aiohttp application that will serve the routes collected in `routes`.
app = web.Application()
logging.basicConfig(level=logging.DEBUG)
app.add_routes(routes)
# Serve on the loopback interface only, port 8888.
web.run_app(app, host="127.0.0.1", port=8888)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment