Lonami / launchasync.py
Last active January 8, 2025 23:22
Wrapper to launch async tasks from threaded code
import threading
import asyncio
from queue import Queue as BlockingQueue


class TwoSidedQueue:
    """
    Behaves like an `asyncio.Queue`, but `get` and `put` act on different ends.
    """
    def __init__(self, queue_in, queue_out):