flowmium

This is the documentation for Python client and flow definition framework for the Flowmium Kubernetes Workflow orchestrator. See repository documentation for deployment details.

Installation

python3 -m pip install flowmium

Defining flows

A flow is made of multiple tasks that are dependent on each other. Each task is a Python function and runs in its own Kubernetes pod. Two tasks are dependent on each other if one tasks uses the output of another task. A function can be turned into a task and the dependencies between tasks can also be marked using the @flow.task decorator.

class flowmium.Flow(name: str)

Defines a flow and provides decorator to register python functions as a task in that flow.

Parameters:

name – Name of the flow, this name will be used when you list flows using the flowctl CLI tool.

task(inputs: dict[str, ~typing.Callable] = {}, serializer: ~flowmium.serializers.Serializer = Serializer(dump_func=<built-in function dump>, load_func=<built-in function load>, ext='pkl', write_mode='wb', read_mode='rb')) Callable

Decorator to register a function as a task in the flow by marking them as @flow.task

Parameters:
  • inputs – Dictionary the holds dependent tasks of this function. The keys of the dictionary should the name of one of the function’s argument and the value is another function that has been marked with this decorator as @flow.task. The output of the other function will be passed as an argument to this function.

  • serializer – Each task (function marked as @flow.task) runs in its own pod. To pass the return output of one task to another, the orchestrator serializes the output and uploads it to s3 like storage (like MinIO). This argument accepts the serializer to be used for the output of the task. The default serializer is pickle. There are other serializers available and you can also define your own. See Serializers section for more details.

run(secrets_refs: dict[str, str] = {}) None

Runs the flow. Your module or package should call this function when executed using python3 or python3 -m to be able to run as a task from the orchestrator.

Parameters:

secrets_refs – Dictionary that maps secrets held by the orchestrator to environment variables for the task. The key is the name of the environment variable for the task and the value if the name of the secret registered in the orchestrator.

get_dag_dict(image: str, cmd: list[str], secrets_refs: dict[str, str]) dict[str, Any]

Construct the flow definition that will be submitted to the orchestrator.

Parameters:
  • image – All python flows (packages and modules that call flow.run() when executed) are packaged into a docker image and uploaded to registry that is accessible to the Kubernetes cluster that the executor is running in. This argument is the name of the uploaded image, example localhost:5180/py-flow-test:latest.

  • cmd – This command to execute the module or package which inturn will run flow.run(). For example ['python3' 'test.py'].

  • secrets_refs – Dictionary that maps secrets held by the orchestrator to environment variables for the task. The key is the name of the environment variable for the task and the value if the name of the secret registered in the orchestrator.

Example

An example flow would look like this.

from flowmium import Flow, FlowContext
from flowmium.serializers import plain_text, json_text, pkl


flow = Flow("testing")


@flow.task(serializer=json_text)
def foo() -> str:
    return "Hallo world"


@flow.task({"input_str": foo}, serializer=plain_text)
def replace_letter_a(input_str: str, flowctx: FlowContext) -> str:
    return input_str.replace("a", "e") + str(flowctx.task_id)


@flow.task({"input_str": foo}, serializer=pkl)
def replace_letter_t(input_str: str) -> str:
    return input_str.replace("t", "d")


@flow.task(
    {"first": replace_letter_t, "second": replace_letter_a}, serializer=plain_text
)
def concat(first: str, second: str) -> str:
    return f"{first} {second}"


if __name__ == "__main__":
    flow.run()

To run this, first you would package it as a docker image and upload it to a registry that is accessible from the Kubernetes cluster that the orchestrator is running on. Then you can run something like below to submit and run the flow

python3 flow.py --image registry:5000/py-flow-test:latest --cmd python3 flow.py --flowmium-server http://localhost:8080

See the examples folder for more details.

Serializers

Each task (function marked as @flow.task) runs in its own pod. To pass the return output of one task to another, the orchestrator serializes the output and uploads it to s3 like storage (like MinIO). The default serializer is pickle. There are other serializers available and you can also define your own.

Included serializers

Contains built in serializers

flowmium.serializers.pkl

Uses python pickle to serialize data.

flowmium.serializers.plain_text

Uses the str() function in python to serialize to plain text.

flowmium.serializers.json_text

Uses the json module to serialize data to json string.

Defining a custom serializer

To define a custom serializer, you instantiate the Serializer class and pass it as an argument to the @flow.task decorator.

class flowmium.serializers.Serializer(dump_func: Callable[[Any, IO[Any]], Any], load_func: Callable[[IO[Any]], Any], ext: str, write_mode: str, read_mode: str)

Define a custom serializer

Parameters:
  • dump_func – A functions that takes in any object and a file pointer and writes the serialized form of the object to the file pointer.

  • load_func – A function that takes in a file pointer and returns a python object or type by deserializing the contents of the file pointer.

  • ext – Name of the file extension used by the serialization format, example json

  • write_mode – The mode of the file pointer for dump_func. For example 'wb' or 'w'.

  • read_mode – The mode of the file pointer for load_func. For example 'rb' or 'r'.