flowmium
This is the documentation for the Python client and flow definition framework for the Flowmium Kubernetes workflow orchestrator. See the repository documentation for deployment details.
Installation
python3 -m pip install flowmium
Defining flows
A flow is made of multiple tasks that depend on each other. Each task is a Python function and runs in its own Kubernetes pod. Two tasks are dependent on each other if one task uses the output of the other. A function can be turned into a task, and the dependencies between tasks marked, using the @flow.task decorator.
- class flowmium.Flow(name: str)
Defines a flow and provides a decorator to register Python functions as tasks in that flow.
- Parameters:
name – Name of the flow. This name will be used when you list flows using the flowctl CLI tool.
- task(inputs: dict[str, ~typing.Callable] = {}, serializer: ~flowmium.serializers.Serializer = Serializer(dump_func=<built-in function dump>, load_func=<built-in function load>, ext='pkl', write_mode='wb', read_mode='rb')) → Callable
Decorator to register a function as a task in the flow by marking it with @flow.task.
- Parameters:
inputs – Dictionary that holds the dependent tasks of this function. Each key should be the name of one of the function's arguments, and the corresponding value is another function that has been marked with the @flow.task decorator. The output of that function will be passed as an argument to this function.
serializer – Each task (a function marked with @flow.task) runs in its own pod. To pass the return value of one task to another, the orchestrator serializes the output and uploads it to S3-like storage (such as MinIO). This argument accepts the serializer to be used for the output of the task. The default serializer is pickle. Other serializers are available and you can also define your own. See the Serializers section for more details.
- run(secrets_refs: dict[str, str] = {}) → None
Runs the flow. Your module or package should call this function when executed using python3 or python3 -m to be able to run as a task from the orchestrator.
- Parameters:
secrets_refs – Dictionary that maps secrets held by the orchestrator to environment variables for the task. The key is the name of the environment variable for the task and the value is the name of the secret registered in the orchestrator.
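For instance, a task that needs an API key could map it from an orchestrator secret. A minimal sketch (the secret name api-key-secret and the variable name EXAMPLE_API_KEY are hypothetical, not names defined by flowmium):

```python
import os

# Hypothetical names: "api-key-secret" must already be registered
# with the orchestrator; EXAMPLE_API_KEY is the env var the task sees.
secrets = {"EXAMPLE_API_KEY": "api-key-secret"}

# Inside a task, the secret value would be read from the environment:
api_key = os.environ.get("EXAMPLE_API_KEY", "")

# flow.run(secrets_refs=secrets)  # requires a running orchestrator
```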
- get_dag_dict(image: str, cmd: list[str], secrets_refs: dict[str, str]) → dict[str, Any]
Constructs the flow definition that will be submitted to the orchestrator.
- Parameters:
image – All Python flows (packages and modules that call flow.run() when executed) are packaged into a Docker image and uploaded to a registry that is accessible to the Kubernetes cluster that the executor is running in. This argument is the name of the uploaded image, for example localhost:5180/py-flow-test:latest.
cmd – The command to execute the module or package, which in turn will run flow.run(). For example ['python3', 'test.py'].
secrets_refs – Dictionary that maps secrets held by the orchestrator to environment variables for the task. The key is the name of the environment variable for the task and the value is the name of the secret registered in the orchestrator.
Example
An example flow would look like this:
from flowmium import Flow, FlowContext
from flowmium.serializers import plain_text, json_text, pkl
flow = Flow("testing")
@flow.task(serializer=json_text)
def foo() -> str:
return "Hallo world"
@flow.task({"input_str": foo}, serializer=plain_text)
def replace_letter_a(input_str: str, flowctx: FlowContext) -> str:
return input_str.replace("a", "e") + str(flowctx.task_id)
@flow.task({"input_str": foo}, serializer=pkl)
def replace_letter_t(input_str: str) -> str:
return input_str.replace("t", "d")
@flow.task(
{"first": replace_letter_t, "second": replace_letter_a}, serializer=plain_text
)
def concat(first: str, second: str) -> str:
return f"{first} {second}"
if __name__ == "__main__":
flow.run()
To run this, first package it as a Docker image and upload it to a registry that is accessible from the Kubernetes cluster that the orchestrator is running on. Then run something like the following to submit and run the flow:
python3 flow.py --image registry:5000/py-flow-test:latest --cmd python3 flow.py --flowmium-server http://localhost:8080
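Packaging usually amounts to a small Dockerfile. The sketch below is an assumption about a typical setup (base image, install command, and file names are illustrative, not prescribed by flowmium):

```
FROM python:3.11-slim
RUN python3 -m pip install flowmium
COPY flow.py /flow.py
```

Build and push it with docker build and docker push so the pushed image name matches the --image argument.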
See the examples folder for more details.
Serializers
Each task (a function marked with @flow.task) runs in its own pod. To pass the return value of one task to another, the orchestrator serializes the output and uploads it to S3-like storage (such as MinIO). The default serializer is pickle. Other serializers are available and you can also define your own.
Included serializers
The following serializers are built in:
- flowmium.serializers.pkl
Uses Python pickle to serialize data.
- flowmium.serializers.plain_text
Uses Python's str() function to serialize to plain text.
- flowmium.serializers.json_text
Uses the json module to serialize data to a JSON string.
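Roughly, the three built-in serializers correspond to these standard-library operations. This is a sketch for intuition, not flowmium's exact internals:

```python
import json
import pickle

value = {"greeting": "Hallo world"}

# pkl: binary pickle round-trip (write_mode='wb')
assert pickle.loads(pickle.dumps(value)) == value

# plain_text: str() is one-way, human-readable text
as_text = str(value)
assert as_text == "{'greeting': 'Hallo world'}"

# json_text: JSON string round-trip
assert json.loads(json.dumps(value)) == value
```

Note that plain_text is lossy in the sense that there is no generic inverse of str(), which is why pickle or JSON is preferable when a downstream task needs to consume the value.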
Defining a custom serializer
To define a custom serializer, instantiate the Serializer class and pass it as an argument to the @flow.task decorator.
- class flowmium.serializers.Serializer(dump_func: Callable[[Any, IO[Any]], Any], load_func: Callable[[IO[Any]], Any], ext: str, write_mode: str, read_mode: str)
Define a custom serializer
- Parameters:
dump_func – A function that takes in any object and a file pointer and writes the serialized form of the object to the file pointer.
load_func – A function that takes in a file pointer and returns a Python object by deserializing the contents of the file pointer.
ext – Name of the file extension used by the serialization format, for example json.
write_mode – The mode of the file pointer for dump_func. For example 'wb' or 'w'.
read_mode – The mode of the file pointer for load_func. For example 'rb' or 'r'.
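As a sketch, a JSON-based custom serializer could be assembled like this. The dump_func/load_func pair is shown standalone with an in-memory round-trip so it runs without an orchestrator; the Serializer(...) call itself is commented out because it needs the flowmium package installed:

```python
import io
import json

# dump_func/load_func as they would be passed to
# flowmium.serializers.Serializer:
def dump_json(obj, fp):
    json.dump(obj, fp)

def load_json(fp):
    return json.load(fp)

# Round-trip check with an in-memory text buffer ('w'/'r' modes):
buf = io.StringIO()
dump_json({"answer": 42}, buf)
buf.seek(0)
assert load_json(buf) == {"answer": 42}

# With flowmium installed this becomes:
# from flowmium.serializers import Serializer
# json_serializer = Serializer(
#     dump_func=dump_json,
#     load_func=load_json,
#     ext="json",
#     write_mode="w",
#     read_mode="r",
# )
# and is then used as @flow.task(serializer=json_serializer)
```

Since JSON is a text format, write_mode and read_mode are 'w' and 'r'; a binary format such as pickle would use 'wb' and 'rb' instead.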