# Type-Safe Pipelines and Compose

When several transformations run sequentially, a compose function can be used to build pipelines. This article shows different ways to implement them in a type-safe way. All the code was written using pure Python 3.11 and checked with MyPy 1.5.1 and Pylance v2023.9.20.

## Function Composition

There are two notations for function composition: one with a right-to-left direction and one with a left-to-right direction. Namely:

Right to left:

$$(f \circ g)(x) = f(g(x))$$

Left to right:

$$(f \circ g)(x) = g(f(x))$$

The traditional notation in math is the right-to-left one. Some languages, like Haskell, have syntax for both. In Haskell, the usual way to compose is with `.`, which is right-to-left; it is also possible to use `>>>` (from Control.Arrow) for left-to-right composition.

On the other hand, one of the most common ways to compose functions in programming is through Unix pipes, which use the `|` operator and compose left to right.

This means that, when implementing a compose or a pipe function, it is not trivial to choose an orientation, as the two notations compete. In the context of programming, the left-to-right orientation is more common.

All the examples here will use the left-to-right orientation.
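To make the two orientations concrete, here is a minimal sketch. The names `compose_right_to_left`, `compose_left_to_right`, `double` and `increment` are illustrative only and are not used elsewhere in this article.

```python
from typing import Callable

IntFunction = Callable[[int], int]


def compose_right_to_left(f: IntFunction, g: IntFunction) -> IntFunction:
    # Mathematical convention: g runs first, then f.
    return lambda x: f(g(x))


def compose_left_to_right(f: IntFunction, g: IntFunction) -> IntFunction:
    # Pipe convention: f runs first, then g.
    return lambda x: g(f(x))


def double(x: int) -> int:
    return x * 2


def increment(x: int) -> int:
    return x + 1


right_to_left = compose_right_to_left(double, increment)  # double(increment(x))
left_to_right = compose_left_to_right(double, increment)  # increment(double(x))

print(right_to_left(3))  # double(4) -> 8
print(left_to_right(3))  # increment(6) -> 7
```

The same two arguments produce different functions depending on the convention, which is why the orientation has to be fixed up front.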

## The compose function

The compose function takes two functions f and g and returns a new function that performs f and then g. In Python, a generic implementation looks like this:

    def compose(f, g):
        # Returns a function that applies f first and then g to its result.
        def inner(*args, **kwargs):
            return g(f(*args, **kwargs))
        return inner


For this to work, the output of f must match the input of g; otherwise the execution may fail. To enforce this relationship, a typed version of compose can be written as follows:

    from typing import Callable, ParamSpec, TypeVar

    T = TypeVar("T")
    U = TypeVar("U")
    P = ParamSpec("P")

    def compose(f: Callable[P, T], g: Callable[[T], U]) -> Callable[P, U]:
        def inner(*args: P.args, **kwargs: P.kwargs) -> U:
            return g(f(*args, **kwargs))
        return inner


This implementation uses ParamSpec, introduced in Python 3.10, which allows typing `*args` and `**kwargs`.

Having a typed version matters because a typical use case is composing many functions, and if the types are not enforced, a mismatch only surfaces as unexpected behavior at runtime.

One example of using this compose implementation is the following:

    def add_one(x: int) -> int:
        return x + 1

    def convert_to_string(x: int) -> str:
        return str(x)

    composed_functions = compose(add_one, convert_to_string)


When using the typed version, both MyPy and Pyright can infer the resulting type:

Pyright (through Pylance)

MyPy

One important aspect of function composition is that it is not commutative, that is, in general

$$f \circ g \neq g \circ f$$

There may be cases where both are equivalent, but most of the time they are not.

Type checking helps here, as the IDE can automatically detect and flag incompatible types when functions are composed in the wrong order. Take, for example, trying to apply add_one to the result of convert_to_string:

    composed_incorrect = compose(convert_to_string, add_one)


Pyright (through Pylance)

MyPy

The error from MyPy may not be descriptive enough, but it at least points in the right direction. Pyright can identify the exact cause of the problem: add_one takes an int but it receives a str as the output of convert_to_string.

## Extension to multiple functions

With that generic compose function, it is possible to compose any number of functions; however, the syntax is a little convoluted. The three sample functions below map int to int, int to str, and str to str. The built-in functions len and str are also used, to show that composition is not limited to user-defined functions.

    def add_one(x: int) -> int:
        return x + 1

    def convert_to_string(x: int) -> str:
        return str(x)

    def sort_characters(x: str) -> str:
        return "".join(sorted(x))


The composition of interest is the one that would be equivalent to doing:

    def user_defined_composition(x: int) -> str:
        return str(len(sort_characters(convert_to_string(add_one(x)))))


It is valid Python to create a function like that, but it is difficult to read and to change, since the "application order" as well as the final "type mapping" are hardcoded. Any variation would require a different signature and invocation order.

To leverage the generic compose, one alternative would be to write:

    user_defined_composition = compose(
        compose(compose(compose(add_one, convert_to_string), sort_characters), len), str
    )
    assert user_defined_composition(2) == "1"


Or equivalently:

    user_defined_composition = (
        compose(
            compose(
                compose(
                    compose(add_one, convert_to_string),
                    sort_characters
                ),
                len
            ),
            str
        )
    )
    assert user_defined_composition(2) == "1"


Note: both approaches are valid and produce no errors when checked with MyPy; nevertheless, Pyright is not able to fully infer the types.

Pyright:

MyPy:

Neither of the previous versions is any easier to read than the original one. Some languages do have a compact syntax for this use case. Haskell's `.` is an example of clear and concise syntax (note that it composes right to left):

    user_defined_composition = str . len . sort_characters . convert_to_string . add_one


This by no means implies that Python is not a good fit for composing functions; it just lacks built-in syntax to make it more approachable. Even though the previous example works without issues, the following sections introduce easier and more practical alternatives.

## Naive Iterative Pipe

The simplest way to extend compose to multiple functions is to wrap it in a for-loop:

    from typing import Any, Callable

    def pipe_iterative(*functions: Callable[..., Any]) -> Callable[..., Any]:
        composed_function, *rest = functions
        for function in rest:
            composed_function = compose(composed_function, function)
        return composed_function

    user_defined_composition = pipe_iterative(
        add_one,
        convert_to_string,
        sort_characters,
        len,
        str
    )
    assert user_defined_composition(2) == "1"


Although this approach works, it loses all the type information.

Pyright:

MyPy:

At the moment, it is not possible to type `*functions` so that the type checker infers that each function's output type matches the next function's input type.
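One workaround sometimes used for this limitation is `typing.overload`, with one signature per supported number of functions. The sketch below is an illustration only: `pipe_overloaded` is a hypothetical name, it covers just two or three functions, and every extra arity needs another overload.

```python
from typing import Any, Callable, TypeVar, overload

A = TypeVar("A")
B = TypeVar("B")
C = TypeVar("C")
D = TypeVar("D")


@overload
def pipe_overloaded(
    f1: Callable[[A], B], f2: Callable[[B], C], /
) -> Callable[[A], C]: ...


@overload
def pipe_overloaded(
    f1: Callable[[A], B], f2: Callable[[B], C], f3: Callable[[C], D], /
) -> Callable[[A], D]: ...


def pipe_overloaded(*functions: Callable[[Any], Any]) -> Callable[[Any], Any]:
    # Runtime implementation: apply every function in order, left to right.
    def run(value: Any) -> Any:
        for function in functions:
            value = function(value)
        return value
    return run


def add_one(x: int) -> int:
    return x + 1


def convert_to_string(x: int) -> str:
    return str(x)


def sort_characters(x: str) -> str:
    return "".join(sorted(x))


pipeline = pipe_overloaded(add_one, convert_to_string, sort_characters)
print(pipeline(20))  # 20 -> 21 -> "21" -> "12"
```

The trade-off is verbosity: the chain of types is checked, but only for the arities that were written out by hand.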

It is possible though to type the pipe_iterative function if we assume that all input and output types will be the same:

    from typing import Callable, TypeVar

    T = TypeVar("T")

    def pipe_iterative(*functions: Callable[[T], T]) -> Callable[[T], T]:
        ...


This signature is only applicable to a small subset of the possible function compositions and will throw several errors with the functions used in the previous examples.

Pyright:

In this case, the types are compatible with the pipe_iterative signature but are not what the function will expect.

MyPy

MyPy detects the conflicting types and assumes Any will be the input and output types.

Having two type checkers disagree like this may be misleading and can cause issues down the line. Pylance's error message is much more detailed and helpful in this instance.

If this type of composition is required though, there is a cleaner way to write it using the functools module.

## Reduced Pipe

A set with an associative binary operation and an identity element is called a Monoid, a common term in Functional Programming jargon. Functions that take a type T and return the same type T form a monoid under composition: composing two of them yields another function from T to T. The last signature of pipe_iterative follows this pattern since it always keeps its inputs and outputs within the type T.

Note: The definition of a Monoid includes the identity element. Since pipes operate over functions, the identity element is a function that can be composed without altering the result. This is trivially doable in Python:

    from typing import TypeVar

    T = TypeVar("T")

    def identity(x: T) -> T:
        return x


The loop in pipe_iterative is a fold, and the standard library already provides one: functools.reduce. The pipe function can then be rewritten as follows:

    from functools import reduce
    from typing import Any, Callable

    def pipe(*functions: Callable[..., Any]) -> Callable[..., Any]:
        return reduce(compose, functions)

    user_defined_composition = pipe(
        add_one,
        convert_to_string,
        sort_characters,
        len,
        str
    )
    assert user_defined_composition(2) == "1"


The same considerations apply to the signature: this implementation can either be type-safe and only operate on a single type T, or it can be used more generically, like the previous pipe_iterative, while losing type information.

Since the signature is the same, MyPy and Pyright show the same messages.
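The identity element mentioned earlier has a practical use with reduce: without an initial value, reduce raises a TypeError on an empty sequence, whereas passing identity as the initial value makes an empty pipe valid. The following is a sketch of the runtime behavior for the single-type case; `compose_same_type` and `pipe_same_type` are names assumed for this example, and how strictly a type checker accepts the reduce call may vary.

```python
from functools import reduce
from typing import Callable, TypeVar

T = TypeVar("T")


def identity(x: T) -> T:
    return x


def compose_same_type(
    f: Callable[[T], T], g: Callable[[T], T]
) -> Callable[[T], T]:
    def inner(x: T) -> T:
        return g(f(x))
    return inner


def pipe_same_type(*functions: Callable[[T], T]) -> Callable[[T], T]:
    # identity is the starting accumulator, so zero functions is still a valid pipe.
    return reduce(compose_same_type, functions, identity)


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


print(pipe_same_type()(5))                 # 5, the empty pipe behaves like identity
print(pipe_same_type(add_one, double)(3))  # (3 + 1) * 2 = 8
```

Composing with identity on either side leaves a function unchanged, which is exactly the property the Monoid definition asks for.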

There is a way to circumvent this issue: Generics, where types become an additional parameter. In Python, a type parameter can be carried from one call to the next by a Generic class, so the next sections migrate to an object-oriented programming style.

## Why Types matter

If the previous is a working implementation, why bother with adding types? There is a rising number of articles and online material arguing in favor of adding types. A summary could be the following.

Reasons to add types:

• Typed code produces fewer bugs
• Typed code enables enforcement of input and output compatibility
• Typed code can be checked statically, i.e. without running the code
• Typed code is easier to test comprehensively
• Typed code is easier to validate. See Pydantic for example
• Typed code is less ambiguous

There are also some reasons not to add types, namely:

• Types make the code more verbose
• Types make the learning curve for a codebase steeper
• Types are not supported in every Python version, although all stable versions (3.8+) support them
• Some type constructs, like ParamSpec, are not available in older versions. Use typing_extensions if something is not supported

It is also important to clarify some myths:

• Types will not make the code slower, they are ignored at runtime
• Types are not checked at runtime, they are just used by tools like IDEs and Linters
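The last two points can be observed directly. In the sketch below, `double` is a hypothetical function annotated for int; calling it with a str is reported by a type checker, yet Python itself runs the call without complaint.

```python
def double(x: int) -> int:
    return x * 2


# A type checker flags this call, but at runtime the annotation is not enforced:
# the str is simply repeated twice.
result = double("ab")  # type: ignore[arg-type]

print(result)                  # abab
print(double.__annotations__)  # {'x': <class 'int'>, 'return': <class 'int'>}
```

The annotations are stored on the function object as plain metadata, which is what tools such as IDEs, linters and validation libraries read.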

## Eager Monoidal Pipe

Before diving too deep into different alternatives, the first step would be to replicate what the previous pipe function achieved but now supporting different types.

    from __future__ import annotations

    from dataclasses import dataclass
    from typing import Callable, Generic, TypeVar

    T = TypeVar("T")
    U = TypeVar("U")

    @dataclass
    class PipeEagerSameType(Generic[T]):
        x: T

        def then(self, f: Callable[[T], T]) -> PipeEagerSameType[T]:
            self.x = f(self.x)
            return self

        def __call__(self) -> T:
            return self.x

    user_defined_composition = (
        PipeEagerSameType(2)
        .then(add_one)
        .then(convert_to_string)
        .then(sort_characters)
        .then(len)
        .then(str)
    )
    assert user_defined_composition() == "1"
    assert user_defined_composition.x == "1"


The implementation is straightforward: the class stores the initial value x, and when then is called, that value is transformed and saved. This method returns self to provide a Fluent Interface that makes the code easier to read.

The result at the end could be obtained by either calling the instance, and thus executing `__call__`, or by simply accessing x. The former approach makes the instance behave like a function and the latter like a plain object.

This first implementation using object-oriented programming produces similar yet different messages from both Pyright and MyPy:

Pyright:

The errors are analogous to the implementation using reduce.

MyPy:

This time the errors are much more descriptive and resemble those produced by Pyright.

Now, whenever we inspect the type, both Pyright and MyPy will output the class name, and the type will be encoded in the Generic, i.e. the type now is PipeEagerSameType[int] and it is the [int] part which tells the actual type. In this case, the final return type is inaccurate because of the assumption that all functions share the same type, which is inferred as int from the initial value.

The next step would be to try to keep the same easy-to-read syntax while at the same time being able to pass functions with different signatures.

The Eager in the class name is no coincidence. The previous examples using compose were not eagerly evaluated; they had a lazy evaluation strategy. This implementation, though, transforms the value at each step. In the case of lazy evaluation, all the transformations are defined first and evaluated only at the end. Lazy implementations for this Pipe class will be introduced later on.
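The difference in timing can be made visible by recording when each step runs. This is a minimal sketch with a hypothetical `traced_add_one` helper and a simplified, int-only compose.

```python
from typing import Callable

calls: list[str] = []


def traced_add_one(x: int) -> int:
    # Record every invocation so the evaluation moment can be observed.
    calls.append(f"add_one({x})")
    return x + 1


def compose(
    f: Callable[[int], int], g: Callable[[int], int]
) -> Callable[[int], int]:
    def inner(x: int) -> int:
        return g(f(x))
    return inner


# Eager: every step runs as soon as it is written.
eager_value = traced_add_one(1)
eager_value = traced_add_one(eager_value)
calls_after_eager = list(calls)

# Lazy: defining the pipeline runs nothing; work happens on the final call.
calls.clear()
lazy_pipeline = compose(traced_add_one, traced_add_one)
calls_after_definition = list(calls)
lazy_value = lazy_pipeline(1)
calls_after_call = list(calls)

print(calls_after_eager)       # ['add_one(1)', 'add_one(2)']
print(calls_after_definition)  # []
print(calls_after_call)        # ['add_one(1)', 'add_one(2)']
```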

## Eager Multitype Pipe

Due to the way Generics work, each instance can only hold a reference to a single type T. To support multiple types there are two possible solutions, either use as many TypeVars as types needed or use as many instances as types needed. The latter is more straightforward. The implementation looks like the following:

    from __future__ import annotations

    from dataclasses import dataclass
    from typing import Callable, Generic, TypeVar

    T = TypeVar("T")
    U = TypeVar("U")

    @dataclass
    class PipeEager(Generic[T]):
        x: T

        def then(self, f: Callable[[T], U]) -> PipeEager[U]:
            return PipeEager(f(self.x))

        def __call__(self) -> T:
            return self.x

    user_defined_composition = (
        PipeEager(2)
        .then(add_one)
        .then(convert_to_string)
        .then(sort_characters)
        .then(len)
        .then(str)
    )

    assert user_defined_composition() == "1"
    assert user_defined_composition.x == "1"


This time, instead of returning self, a new instance is created with the updated value. Each instance now has information on the current step (through T) and the following step (through U). The following step is not defined at instantiation, so U does not need to be added to the Generic.

From the outside, the API is preserved, with the additional support for functions with different types.
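A side effect of returning a new instance is that intermediate pipes are left untouched, so one of them can be extended in several directions. The sketch below repeats the class so that it runs on its own; `double` is a helper assumed for this example, and the return annotation is quoted here instead of relying on the `__future__` import.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
U = TypeVar("U")


@dataclass
class PipeEager(Generic[T]):
    x: T

    def then(self, f: Callable[[T], U]) -> "PipeEager[U]":
        # A new instance is returned; the current one is never modified.
        return PipeEager(f(self.x))

    def __call__(self) -> T:
        return self.x


def add_one(x: int) -> int:
    return x + 1


def double(x: int) -> int:
    return x * 2


def convert_to_string(x: int) -> str:
    return str(x)


base = PipeEager(2).then(add_one)       # PipeEager[int] holding 3
as_text = base.then(convert_to_string)  # PipeEager[str] holding "3"
doubled = base.then(double)             # PipeEager[int] holding 6

print(base(), as_text(), doubled())  # 3 3 6
```

With PipeEagerSameType, which assigns to self.x, the second branch would have started from the value left behind by the first one.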

Both Pyright and MyPy properly infer the types:

Pyright:

MyPy:

Now, with a compatible implementation, the type has become PipeEager[str], correctly inferring that calling the instance will return a str object.

The next step is to make a Lazy implementation that resembles the first ones built with compose.

## Lazy Pipe with Start

To make a lazy implementation, the value should not be stored and transformed, it will be provided last. Therefore what needs to be stored and mapped are the transformations.

To achieve this, a similar approach to the Eager classes can be used, but replacing the value x with a function f:

    from __future__ import annotations

    from dataclasses import dataclass
    from typing import Callable, Generic, ParamSpec, TypeVar

    T = TypeVar("T")
    U = TypeVar("U")
    P = ParamSpec("P")

    @dataclass
    class PipeLazyWithStart(Generic[P, T]):
        f: Callable[P, T]

        def then(self, g: Callable[[T], U]) -> PipeLazyWithStart[P, U]:
            return PipeLazyWithStart(compose(self.f, g))

        def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
            return self.f(*args, **kwargs)

    user_defined_composition = (
        PipeLazyWithStart(add_one)
        .then(convert_to_string)
        .then(sort_characters)
        .then(len)
        .then(str)
    )

    assert user_defined_composition(2) == "1"


In this case, rather than applying a transformation to the value and storing the transformed result, an instance is created with a new transformation which is the current and the next one composed. This allows the creation of new instances each composing on the previous ones.

The API is kept somewhat similar; it now resembles the approach with pipe_iterative, where all the functions are passed first and then evaluated on a particular value.

Pyright:

MyPy:

With the lazy implementation, the types have changed slightly. Now the type is PipeLazyWithStart[(x: int), str], which means that it takes a single parameter called x of type int and returns a single value of type str. The x name comes from the parameter name in the signature of add_one, the first function in the pipeline.

In case the advantages of lazy evaluation are unclear, one of the following sections will explain the pros and cons succinctly.

The next step would be to simplify the class so that it can be instantiated without an initial function. This could be helpful for many reasons; one example is testing.

## Lazy Pipe

To remove the initial function, the first impression would be to mark f as Optional; however, that has many caveats, and MyPy and Pyright will complain. A simple alternative is to create a helper class that delegates the initial creation to PipeLazyWithStart.

    from dataclasses import dataclass
    from typing import Callable, Generic, ParamSpec, TypeVar

    T = TypeVar("T")
    P = ParamSpec("P")

    @dataclass
    class PipeLazy(Generic[P, T]):
        def then(self, f: Callable[P, T]) -> PipeLazyWithStart[P, T]:
            return PipeLazyWithStart(f)

    user_defined_composition = (
        PipeLazy[[int], int]()
        .then(add_one)
        .then(convert_to_string)
        .then(sort_characters)
        .then(len)
        .then(str)
    )
    assert user_defined_composition(2) == "1"


This modification makes the implementation easily generalizable, as pipelines that have no steps yet can be passed down the line. It also forces the user to specify the first input and output types of a given pipeline rather than inferring them, which could help prevent bugs as things are more explicit now.

Pyright:

MyPy:

Now the type is identical to the previous one but without the x. That is because, at the moment of instantiation, the PipeLazy object takes no function from which a parameter name could be captured; the parameters are given explicitly as [int]. And even after being initialized with [[int], int], the return type is accurately updated to str by following the chain of compositions.

So far this implementation checks all the boxes, however, there is yet another way to implement similar behavior leveraging less common Python features. The next section will evolve this implementation to add support for decorators and operator overloading.

## Composable Objects

This section is kept to showcase some of the features of the language, but it may result in code that is harder to read.

Using classes as decorators is an intermediate-to-advanced concept in Python. In fact, PipeLazyWithStart can already be used as a decorator since it only takes f, which is a function.

The other feature of the language that can be used is operator overloading, in this case, the >> operator (also known as right shift) which resembles the >>> in Haskell for left-to-right composition. Other valid alternatives could have been the | or the & (bitwise or and bitwise and respectively). Python does not support the definition of custom operators so having a new >>> operator is not feasible.

To overload the `>>` operator, one needs to define a `__rshift__` method, which in this case will have the same functionality as the then method. The only difference would be that it would need to handle functions and objects of the same class.

The implementation looks like the following:

    from __future__ import annotations

    from dataclasses import dataclass
    from typing import Callable, Generic, ParamSpec, TypeVar

    T = TypeVar("T")
    U = TypeVar("U")
    P = ParamSpec("P")
    Q = ParamSpec("Q")

    @dataclass
    class Composable(Generic[P, T]):
        f: Callable[P, T]

        def __rshift__(self, other: Callable[Q, U]) -> Composable[P, U]:
            if not isinstance(other, Composable):
                other = Composable(other)
            return Composable(compose(self.f, other.f))

        def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T:
            return self.f(*args, **kwargs)

    @Composable
    def convert_to_string_decorated(x: int) -> str:
        return str(x)

    user_defined_composition = (
        Composable(add_one)
        >> convert_to_string_decorated
        >> Composable(sort_characters)
        >> len
        >> str
    )
    assert user_defined_composition(2) == "1"


The resulting syntax with `>>` is something new. In some cases, like when building a framework or an SDK, introducing new syntax like this could have great benefit. One project that does this is Apache Airflow. However, most often it results in niche uses that, without proper documentation, could make the code harder to read and maintain.

Pyright:

MyPy:

The types are identical to those of PipeLazyWithStart; however, MyPy complains about not being able to infer the type of the first parameter of compose, which is unexpected. This may be due to an edge case in the interaction between ParamSpecs and TypeVars. One possible contributor is that other is annotated as Callable[Q, U], so nothing in the signature ties its input to T.

## Why does Lazy Evaluation matter?

Eager vs. Lazy evaluation is a topic that would deserve an article on its own. Tools like pandas use eager evaluation, other tools like pyspark use lazy evaluation, and polars offers both.

The following are some key criteria to help decide which is better for each case:

• Speed - Either
• Easy to read code - Either
• Easy to test code - Either
• Easy to debug code / interactive programming - Eager
• Optimized transformations - Lazy
• Flexibility - Lazy
• Separation of transformations from data - Lazy

## Conclusion

There are scenarios where certain operations need to be applied sequentially. That is a hint for the pipeline pattern, which can be implemented with either an Eager or a Lazy evaluation approach.

Knowing when to use which depends on the scenario but the key aspect is to structure the code so that the final result is easy to read, easy to test and easy to maintain.

On a personal note, I believe that, unless required otherwise, PipeLazy would suffice most of the time, and in case it doesn't, switching to the Eager version is a minor change that should not break anything down the line.

If dealing with a lot of error handling, these pipelines could be combined with Railway Oriented Programming to streamline processes end to end.