Created
April 27, 2023 01:05
-
-
Save zombie110year/5701044a4e510bda23ec8437ececd1e6 to your computer and use it in GitHub Desktop.
在异步代码中向 Runner 中添加任务,其保证按添加顺序执行
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import asyncio | |
| import logging | |
| from asyncio import Queue | |
| from dataclasses import dataclass | |
| from random import randint | |
| from uuid import UUID, uuid4 | |
| from sys import stdout | |
| logging.basicConfig(level=logging.DEBUG, filename="main.log") | |
def randn(length: int) -> list[int]:
    """Return a list of ``length`` random integers, each in 0..100 inclusive."""
    numbers = []
    for _ in range(length):
        numbers.append(randint(0, 100))
    return numbers
async def main():
    """Start the runner loop and drive 16 concurrent callers to completion."""
    # Hold a reference to the runner task: the event loop keeps only weak
    # references to tasks, so an unreferenced one may be garbage-collected
    # before it has finished its work.
    runner_task = asyncio.create_task(RUNNER.run())
    try:
        _tasks = [
            asyncio.create_task(caller(f"caller-{i:02}", randn(8)))
            for i in range(16)
        ]
        await asyncio.gather(*_tasks)
    finally:
        # RUNNER.run() loops forever; stop it explicitly once callers are done.
        runner_task.cancel()
async def caller(name, param):
    """Submit ``param`` to the shared runner and report the outcome.

    Queues the work on ``RUNNER``, waits for the returned ticket to be
    redeemed, then writes the outcome to the debug log and to standard output.
    """
    logging.debug(f"{name}: start {param!r}")
    receipt = await RUNNER.add(param)

    logging.debug(f"{name}: wait for {receipt!r}")
    total = await RUNNER.get_result(receipt)

    # The same line goes to the log file and to the console.
    summary = f"{name}: end sum({param!r}) = {total!r}"
    logging.debug(summary)
    print(summary)
    stdout.flush()
| @dataclass | |
| class MyTask: | |
| id: UUID | |
| param: list[int] | |
| result: int | None | |
class Runner:
    """Serial task runner backed by a FIFO wait queue.

    Tasks are queued with :meth:`add`, executed one at a time in insertion
    order by :meth:`run`, and collected with :meth:`get_result` using the
    ticket that :meth:`add` returned.
    """

    def __init__(self) -> None:
        # Pending tasks, consumed in FIFO order by run().
        self.wait_list = Queue()
        # Finished results keyed by ticket until get_result() collects them.
        self.done_list = dict()
        # Exceptions raised by work(), keyed by ticket until collected.
        self._errors = dict()
        # Notified every time a task finishes, so waiters wake up immediately
        # instead of polling on a timer.
        self._finished = asyncio.Condition()

    async def add(self, param) -> UUID:
        """Queue a task for ``param`` and return the ticket identifying it."""
        ticket = uuid4()
        task = MyTask(id=ticket, param=param, result=None)
        await self.wait_list.put(task)
        return ticket

    async def work(self, param) -> int:
        """Execute one task and return its result (the sum of ``param``)."""
        await asyncio.sleep(3)
        return sum(param)

    async def run(self):
        """Main loop: take queued tasks one by one and publish their outcome.

        A failure inside :meth:`work` is recorded for the waiting caller
        rather than terminating the loop, so later tasks are still processed.
        """
        while True:
            task = await self.wait_list.get()
            logging.debug(f"RUN: start {task!r}")
            try:
                result = await self.work(task.param)
            except Exception as exc:
                # Keep the loop alive; the error is re-raised in get_result().
                logging.exception(f"RUN: failed {task!r}")
                failure = exc
            else:
                logging.debug(f"RUN: end {task!r}")
                failure = None
            async with self._finished:
                if failure is None:
                    self.done_list[task.id] = result
                else:
                    self._errors[task.id] = failure
                self._finished.notify_all()
            self.wait_list.task_done()

    async def get_result(self, ticket: UUID):
        """Wait until the task for ``ticket`` has finished and return its result.

        The stored outcome is removed once it has been collected.

        Raises:
            Exception: whatever :meth:`work` raised while processing the task.
        """
        async with self._finished:
            await self._finished.wait_for(
                lambda: ticket in self.done_list or ticket in self._errors
            )
            if ticket in self._errors:
                raise self._errors.pop(ticket)
            return self.done_list.pop(ticket)
# Shared runner instance used by main() and caller().
RUNNER = Runner()

if __name__ == "__main__":
    # Start the event loop only when executed as a script, so that importing
    # this module does not trigger the demo run.
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment