Skip to content

Instantly share code, notes, and snippets.

@zombie110year
Created April 27, 2023 01:05
Show Gist options
  • Select an option

  • Save zombie110year/5701044a4e510bda23ec8437ececd1e6 to your computer and use it in GitHub Desktop.

Select an option

Save zombie110year/5701044a4e510bda23ec8437ececd1e6 to your computer and use it in GitHub Desktop.
在异步代码中向 Runner 中添加任务,其保证按添加顺序执行
import asyncio
import logging
from asyncio import Queue
from dataclasses import dataclass
from random import randint
from uuid import UUID, uuid4
from sys import stdout
logging.basicConfig(level=logging.DEBUG, filename="main.log")
def randn(length: int) -> list[int]:
return [randint(0, 100) for _ in range(length)]
async def main():
asyncio.create_task(RUNNER.run())
_tasks = [
asyncio.create_task(caller(f"caller-{i:02}", randn(8)))
for i in range(16)
]
await asyncio.gather(*_tasks)
async def caller(name, param):
logging.debug(f"{name}: start {param!r}")
ticket = await RUNNER.add(param)
logging.debug(f"{name}: wait for {ticket!r}")
result = await RUNNER.get_result(ticket)
logging.debug(f"{name}: end sum({param!r}) = {result!r}")
print(f"{name}: end sum({param!r}) = {result!r}")
stdout.flush()
@dataclass
class MyTask:
id: UUID
param: list[int]
result: int | None
class Runner:
"""包含一个等待队列,按照添加顺序执行任务
"""
def __init__(self) -> None:
self.wait_list = Queue()
self.done_list = dict()
async def add(self, param) -> UUID:
"""添加一个任务,返回该任务的小票"""
ticket = uuid4()
task = MyTask(id=ticket, param=param, result=None)
await self.wait_list.put(task)
return ticket
async def work(self, param) -> int:
"""执行一个任务,返回其结果"""
await asyncio.sleep(3)
return sum(param)
async def run(self):
"""主循环"""
while True:
task = await self.wait_list.get()
logging.debug(f"RUN: start {task!r}")
result = await self.work(task.param)
logging.debug(f"RUN: end {task!r}")
self.done_list[task.id] = result
self.wait_list.task_done()
async def get_result(self, ticket: UUID):
"""通过小票检查某任务是否完成"""
while True:
if ticket not in self.done_list:
await asyncio.sleep(3)
continue
result = self.done_list[ticket]
del self.done_list[ticket]
return result
RUNNER = Runner()
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment