Awesome
NOTE: I'm no longer working on this SDK. I will be refocusing my efforts on downstream tools.
Unofficial Python SDK for the Temporal Workflow Engine
Status
This should be considered EXPERIMENTAL at the moment. At the moment, all I can say is that the test cases currently pass. I have not tested this for any real world use cases yet.
Installation
pip install temporal-python-sdk
Sample Code
Sample code for using this library can be found in Workflows in Python Using Temporal.
Hello World
import asyncio
import logging
from datetime import timedelta
from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
logging.basicConfig(level=logging.INFO)
TASK_QUEUE = "HelloActivity-python-tq"
NAMESPACE = "default"
# Activities Interface
class GreetingActivities:
@activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=1000))
async def compose_greeting(self, greeting: str, name: str) -> str:
raise NotImplementedError
# Activities Implementation
class GreetingActivitiesImpl:
async def compose_greeting(self, greeting: str, name: str):
return greeting + " " + name + "!"
# Workflow Interface
class GreetingWorkflow:
@workflow_method(task_queue=TASK_QUEUE)
async def get_greeting(self, name: str) -> str:
raise NotImplementedError
# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):
def __init__(self):
self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)
pass
async def get_greeting(self, name):
return await self.greeting_activities.compose_greeting("Hello", name)
async def client_main():
client = WorkflowClient.new_client(namespace=NAMESPACE)
factory = WorkerFactory(client, NAMESPACE)
worker = factory.new_worker(TASK_QUEUE)
worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
worker.register_workflow_implementation_type(GreetingWorkflowImpl)
factory.start()
greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
result = await greeting_workflow.get_greeting("Python")
print(result)
print("Stopping workers.....")
await worker.stop()
print("Workers stopped......")
if __name__ == '__main__':
asyncio.run(client_main())
Roadmap
1.0
- Workflow argument passing and return values
- Activity invocation
- Activity heartbeat and Activity.getHeartbeatDetails()
- doNotCompleteOnReturn
- ActivityCompletionClient
- complete
- complete_exceptionally
- Activity get_namespace(), get_task_token() get_workflow_execution()
- Activity Retry
- Activity Failure Exceptions
- workflow_execution_timeout / workflow_run_timeout / workflow_task_timeout
- Workflow exceptions
- Cron workflows
- Workflow static methods:
- await_till()
- sleep()
- current_time_millis()
- now()
- random_uuid()
- new_random()
- get_workflow_id()
- get_run_id()
- get_version()
- get_logger()
- Activity invocation parameters
- Query method
- Signal methods
- Workflow start parameters - workflow_id etc...
- Workflow client - starting workflows synchronously
- Workflow client - starting workflows asynchronously (WorkflowClient.start)
- Get workflow result after async execution (client.wait_for_close)
- Workflow client - invoking signals
- Workflow client - invoking queries
1.1
- ActivityStub and Workflow.newUntypedActivityStub
- Remove threading, use coroutines for everything all concurrency
- Classes as arguments and return values to/from activity and workflow methods (DataConverter)
- Type hints for DataConverter
- Parallel activity execution (STATUS: there's a working but not finalized API).
1.2
- Timers
- Custom workflow ids through start() and new_workflow_stub()
Other:
- WorkflowStub and WorkflowClient.newUntypedWorkflowStub
- ContinueAsNew
- Sticky workflows
- Child Workflows
- Support for keyword arguments
- Compatibility with Java client
- Compatibility with Golang client
- Upgrade python-betterproto
- sideEffect/mutableSideEffect
- Local activity
- Cancellation Scopes
- Explicit activity ids for activity invocations