Created
January 22, 2026 15:37
-
-
Save mypy-play/0b409a081597752464043f4f845e1065 to your computer and use it in GitHub Desktop.
Shared via mypy Playground
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Protocol, Callable
# --- workflow contracts (the only supported implementation style) ---
class SayHelloWorkflow(Protocol):
    """Structural contract for the "say hello" workflow.

    A class satisfies this protocol by providing both methods below with
    compatible signatures; it does not need to inherit from this class.
    """

    def run(self, name: str) -> str:
        """Run the workflow for ``name`` and return its string result."""
        ...

    # A benefit to this class-based approach to defining workflows over the Go
    # SDK is that we can also raise type errors for missing query or signal
    # handlers. In Go we define signals in protobuf but have no way to assert
    # that the workflow code has been updated to handle them.
    def handle_greeting_response_signal(self, response: str) -> str:
        """Handle the greeting-response signal carrying ``response``."""
        ...
class SayGoodbyeWorkflow(Protocol):
    """Structural contract for the "say goodbye" workflow.

    A class satisfies this protocol by providing a compatible ``run``
    method; it does not need to inherit from this class.
    """

    def run(self, name: str) -> str:
        """Run the workflow for ``name`` and return its string result."""
        ...
# --- service stub contract: MUST return workflow instances ---
class HelloWorldStub(Protocol):
    """Structural contract for a hello-world service implementation.

    Each method takes no arguments and returns an object satisfying the
    corresponding workflow protocol.
    """

    def say_hello(self) -> SayHelloWorkflow:
        """Return an object implementing ``SayHelloWorkflow``."""
        ...

    def say_goodbye(self) -> SayGoodbyeWorkflow:
        """Return an object implementing ``SayGoodbyeWorkflow``."""
        ...
def register_hello_world_service(impl: HelloWorldStub) -> None:
    """Exercise both workflows exposed by ``impl`` and print their results.

    Args:
        impl: Service object providing ``say_hello()`` and ``say_goodbye()``,
            each returning a workflow with a ``run(name)`` method.
    """
    # Normally this would register the workflows and activities with temporal;
    # for now we'll just invoke them to demonstrate that invocation works at
    # runtime.
    print("[worker] say_hello:", impl.say_hello().run("Larry"))
    print("[worker] say_goodbye:", impl.say_goodbye().run("Larry"))
# --- user code: implement the workflows as classes ---
class MySayHello:
    """User-written "say hello" workflow, deliberately mis-typed.

    ``run`` annotates ``name`` as ``bool`` rather than ``str`` so that a type
    checker has a mismatch to report wherever this class is used as a
    ``SayHelloWorkflow``. The class also defines no
    ``handle_greeting_response_signal`` method, which that protocol declares.
    """

    # Hopefully this should raise a nice type error in mypy!
    def run(self, name: bool) -> str:
        """Return the greeting ``"Hello, <name>!"``."""
        return f"Hello, {name}!"
class MySayGoodbye:
    """User-written "say goodbye" workflow."""

    def run(self, name: str) -> str:
        """Return the farewell ``"Goodbye, <name>!"``."""
        return f"Goodbye, {name}!"
class MyHelloWorldService:
    """User-written service that hands out fresh workflow instances."""

    def say_hello(self) -> SayHelloWorkflow:
        """Return a new ``MySayHello`` instance."""
        return MySayHello()

    def say_goodbye(self) -> SayGoodbyeWorkflow:
        """Return a new ``MySayGoodbye`` instance."""
        return MySayGoodbye()
# Script entry point: run the demo only when executed directly, not on import.
if __name__ == "__main__":
    register_hello_world_service(MyHelloWorldService())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment