Skip to content

Instantly share code, notes, and snippets.

@daveleroy
Last active March 18, 2025 09:15
Show Gist options
  • Select an option

  • Save daveleroy/536b0a11280aa6c3ec668f4c980a866f to your computer and use it in GitHub Desktop.

Select an option

Save daveleroy/536b0a11280aa6c3ec668f4c980a866f to your computer and use it in GitHub Desktop.
import sublime
import asyncio
import threading
import sys
class Handle:
    """Cancellable callback wrapper, modelled on ``asyncio.Handle``.

    An instance is itself callable, so it can be handed straight to a
    scheduler. Calling it runs ``callback(*args)`` unless ``cancel()`` was
    called first.

    Args:
        callback: The callable to invoke.
        args: Tuple of positional arguments passed to ``callback``.
    """

    def __init__(self, callback, args):
        self.callback = callback
        self.args = args

    def __call__(self):
        """Invoke the wrapped callback unless the handle was cancelled."""
        # Compare against None instead of relying on truthiness: a callable
        # object may be falsy (it can define __bool__ or __len__) and must
        # still run.
        if self.callback is not None:
            self.callback(*self.args)

    def cancel(self):
        """Prevent the callback from running and drop the references to it."""
        self.callback = None
        self.args = None

    def cancelled(self):
        """Return True once ``cancel()`` has been called.

        Part of the ``asyncio.Handle`` API that code holding a handle may
        rely on.
        """
        return self.callback is None
# Pick the stock asyncio loop implementation to subclass on this platform.
# NOTE: the Windows (Proactor) branch has not been tested.
if sys.platform == 'win32':
    DefaultEventLoop = asyncio.ProactorEventLoop
else:
    DefaultEventLoop = asyncio.SelectorEventLoop
# Override the methods that are intended to return work to the event loop
# thread, so that callbacks are dispatched through sublime.set_timeout instead.
class SublimeEventLoop(DefaultEventLoop):
    """Event loop whose scheduled callbacks are run by ``sublime.set_timeout``."""

    def _sublime_schedule(self, delay_ms, callback, args, context):
        """Wrap ``callback`` in a ``Handle`` and queue it with Sublime.

        Args:
            delay_ms: Delay in milliseconds handed to ``sublime.set_timeout``.
            callback: Callable to run.
            args: Tuple of positional arguments for ``callback``.
            context: Optional ``contextvars.Context`` to run the callback in.

        Returns:
            The ``Handle``, which the caller may ``cancel()``.
        """
        if context is not None:
            # asyncio passes a context here (e.g. the one owned by a Task).
            # Run the callback inside it so context variables behave as they
            # do on a stock loop, instead of silently dropping the argument.
            handle = Handle(context.run, (callback,) + args)
        else:
            handle = Handle(callback, args)
        sublime.set_timeout(handle, delay_ms)
        return handle

    def call_soon(self, callback, *args, context=None):  # type: ignore
        """Schedule ``callback(*args)`` to run as soon as possible."""
        return self._sublime_schedule(0, callback, args, context)

    def call_later(self, delay, callback, *args, context=None):  # type: ignore
        """Schedule ``callback(*args)`` to run after ``delay`` seconds."""
        # asyncio delays are in seconds, set_timeout takes milliseconds.
        # A negative delay means "already due", so clamp it to zero.
        return self._sublime_schedule(max(0, delay * 1000), callback, args, context)

    # Methods for interacting with threads.
    def call_soon_threadsafe(self, callback, *args, context=None):  # type: ignore
        """Schedule ``callback(*args)`` from any thread.

        NOTE(review): relies on ``sublime.set_timeout`` being safe to call
        from other threads - confirm against the Sublime API docs.
        """
        return self._sublime_schedule(0, callback, args, context)

    def call_at(self, when, callback, *args, context=None):  # type: ignore
        """Schedule ``callback(*args)`` at absolute loop time ``when``.

        ``when`` uses the same clock as ``self.time()``, so it is converted
        to a relative delay and delegated to ``call_later``.
        """
        return self.call_later(when - self.time(), callback, *args, context=context)
class EventLoopPolicy(asyncio.DefaultEventLoopPolicy):
    """Event loop policy whose new loops are ``SublimeEventLoop`` instances."""

    def new_event_loop(self):
        """Create and return a fresh ``SublimeEventLoop``."""
        return SublimeEventLoop()
asyncio.set_event_loop_policy(EventLoopPolicy())

# Create the loop that everyone can share
loop = asyncio.new_event_loop()

# Register the shared loop as the running loop for *this* thread (asyncio
# tracks the running loop per thread). run_forever() executes on the
# background thread started below, but callbacks are dispatched through
# sublime.set_timeout, so code running here still needs
# asyncio.get_running_loop() / asyncio.create_task() to find the loop.
# NOTE: _set_running_loop is a private asyncio API.
asyncio._set_running_loop(loop)

# Thread.start() returns None, so keep the Thread object itself in `thread`
# rather than the result of start().
thread = threading.Thread(target=loop.run_forever)
thread.start()
# Below are some ad-hoc tests ensuring the event loop returns work on the correct thread.
# A lot more would need to be tested (including libraries that use asyncio).
def ensure_main_thread():
    """Raise if the caller is not executing on the interpreter's main thread.

    Raises:
        RuntimeError: When called from any thread other than the main one.
    """
    # current_thread() replaces the camelCase currentThread() alias, which is
    # deprecated since Python 3.10.
    if threading.current_thread() is not threading.main_thread():
        raise RuntimeError('We should be in the main thread...')
async def handle_client_test(reader, writer):
    """Echo a client's data back to it until EOF or a 'quit' message.

    Passed to ``asyncio.start_server`` as the per-connection callback. After
    every await it checks that execution resumed on the main thread.

    Args:
        reader: Stream reader for the connection.
        writer: Stream writer for the connection.
    """
    ensure_main_thread()
    addr = writer.get_extra_info('peername')
    print(f'Connection from {addr!r}', threading.current_thread())
    try:
        while True:
            data = await reader.read(1024)
            ensure_main_thread()
            if not data:  # empty read means the peer closed its end
                break
            # A 1024-byte read can split a multi-byte character, and the peer
            # may send non-UTF-8 bytes; do not let that kill the handler.
            message = data.decode(errors='replace')
            print(f'Received {message!r} from {addr!r}', threading.current_thread())
            writer.write(data)
            await writer.drain()
            ensure_main_thread()
            # Line-based clients send 'quit\n', so ignore surrounding
            # whitespace when looking for the quit command.
            if message.strip() == 'quit':
                break
    finally:
        # Close the transport even when the loop above raised.
        print(f'Close the connection from {addr!r}', threading.current_thread())
        writer.close()
    # Wait for the close to complete, then verify the thread once more.
    await writer.wait_closed()
    ensure_main_thread()
async def server_test(host='127.0.0.1', port=8888):
    """Run the echo server forever, checking the thread at each step.

    Args:
        host: Interface to bind to. Defaults to the loopback address.
        port: TCP port to listen on.
    """
    ensure_main_thread()
    server = await asyncio.start_server(handle_client_test, host, port)
    ensure_main_thread()
    async with server:
        ensure_main_thread()
        print(f'Serving on {server.sockets[0].getsockname()}', threading.current_thread())
        await server.serve_forever()
async def sleep_test():
    """Sleep for one second and check execution stays on the main thread."""
    ensure_main_thread()
    await asyncio.sleep(1)
    ensure_main_thread()
    print('done sleep test', threading.current_thread())
# Start the self-tests. Keep references to the tasks: asyncio only holds weak
# references to them, so an unreferenced task may be garbage collected before
# it finishes.
_test_tasks = [
    asyncio.create_task(sleep_test()),
    asyncio.create_task(server_test()),
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment