Home

Awesome

iwf-python-samples

samples for iWF Python SDK

Running Requirements

Case1: Money transfer workflow/SAGA Patten

This example shows how to transfer money from one account to another account. The transfer involves multiple steps. When any step fails, the whole transfer is canceled with some compensation steps.

Case 2: User sign-up workflow

A common use case that is almost everywhere -- new user sign-up/register a new account in a website/system. E.g. Amazon/Linkedin/Google/etc...

Use case requirements

<img width="303" alt="user case requirements" src="https://github.com/indeedeng/iwf-python-sdk/assets/4523955/356a4284-b816-42d3-9e44-b371a91834e4">

Some old solution

With some other existing technologies, you solve it using message queue(like SQS which has timer) + Database like below:

<img width="309" alt="old solution" src="https://github.com/indeedeng/iwf-python-sdk/assets/4523955/49ef8846-9589-4a28-91bd-c575daf37dcf">

It's complicated and hard to maintain and extend.

New solution with iWF

The solution with iWF: <img width="752" alt="iwf solution" src="https://github.com/indeedeng/iwf-python-sdk/assets/4523955/4cec7742-a965-4a2d-868b-693ffba372fa">

It's so simple & easy to do that the business logic code like this can be shown here!

class SubmitState(WorkflowState[Form]):
    def execute(self, ctx: WorkflowContext, input: Form, command_results: CommandResults, persistence: Persistence,
                communication: Communication,
                ) -> StateDecision:
        persistence.set_data_attribute(data_attribute_form, input)
        persistence.set_data_attribute(data_attribute_status, "waiting")
        print(f"API to send verification email to {input.email}")
        return StateDecision.single_next_state(VerifyState)


class VerifyState(WorkflowState[None]):
    def wait_until(self, ctx: WorkflowContext, input: T, persistence: Persistence, communication: Communication,
                   ) -> CommandRequest:
        return CommandRequest.for_any_command_completed(
            TimerCommand.timer_command_by_duration(
                timedelta(seconds=10)
            ),  # use 10 seconds for demo
            InternalChannelCommand.by_name(verify_channel),
        )

    def execute(self, ctx: WorkflowContext, input: T, command_results: CommandResults, persistence: Persistence,
                communication: Communication,
                ) -> StateDecision:
        form = persistence.get_data_attribute(data_attribute_form)
        if (
                command_results.internal_channel_commands[0].status
                == ChannelRequestStatus.RECEIVED
        ):
            print(f"API to send welcome email to {form.email}")
            return StateDecision.graceful_complete_workflow("done")
        else:
            print(f"API to send the a reminder email to {form.email}")
            return StateDecision.single_next_state(VerifyState)


class UserSignupWorkflow(ObjectWorkflow):
    def get_workflow_states(self) -> StateSchema:
        return StateSchema.with_starting_state(SubmitState(), VerifyState())

    def get_persistence_schema(self) -> PersistenceSchema:
        return PersistenceSchema.create(
            PersistenceField.data_attribute_def(data_attribute_form, Form),
            PersistenceField.data_attribute_def(data_attribute_status, str),
            PersistenceField.data_attribute_def(data_attribute_verified_source, str),
        )

    def get_communication_schema(self) -> CommunicationSchema:
        return CommunicationSchema.create(
            CommunicationMethod.internal_channel_def(verify_channel, None)
        )

    @rpc()
    def verify(
            self, source: str, persistence: Persistence, communication: Communication
    ) -> str:
        status = persistence.get_data_attribute(data_attribute_status)
        if status == "verified":
            return "already verified"
        persistence.set_data_attribute(data_attribute_status, "verified")
        persistence.set_data_attribute(data_attribute_verified_source, source)
        communication.publish_to_internal_channel(verify_channel)
        return "done"

And the application code will be simply interacting with the workflow like below:

@flask_app.route("/signup/submit")
def signup_submit():
    username = request.args["username"]
    form = Form(
        ...
    )
    client.start_workflow(UserSignupWorkflow, username, 3600, form)
    return "workflow started"


@flask_app.route("/signup/verify")
def signup_verify():
    username = request.args["username"]
    source = request.args["source"]
    return client.invoke_rpc(username, UserSignupWorkflow.verify, source)

development tips

When update iwf-python-sdk if the dependency is not updated:

poetry cache clear pypi --all && poetry update