Pipelines

The main idea behind functional programming is functional composition.

We provide several tools to make composition easy, readable, pythonic, and useful.

Let’s start with the first one.

flow

flow allows to easily compose multiple functions together. It is useful when you already have an instance to compose functions with.

Let’s see an example.

>>> from returns.pipeline import flow
>>> assert flow(
...     [1, 2, 3],
...     lambda collection: max(collection),
...     lambda max_number: -max_number,
... ) == -3

Use it when you need to compose a lot of functions together.

And now let’s get to know pipe, it is very similar, but has different usage pattern.

pipe

pipe is an easy way to compose functions together. It is useful when you don’t have an instance to compose functions with yet.

Let’s see an example.

>>> from returns.pipeline import pipe

>>> pipeline = pipe(str, lambda x: x + 'b', str.upper)
>>> assert pipeline(1) == '1B'

It might be later used with multiple values:

>>> assert pipeline(2) == '2B'

It is also might be useful to compose containers together:

>>> from returns.pipeline import pipe
>>> from returns.result import Result, Success, Failure
>>> from returns.pointfree import bind

>>> def regular_function(arg: int) -> float:
...     return float(arg)
...

>>> def returns_container(arg: float) -> Result[str, ValueError]:
...     if arg != 0:
...          return Success(str(arg))
...     return Failure(ValueError())
...

>>> def also_returns_container(arg: str) -> Result[str, ValueError]:
...     return Success(arg + '!')
...

>>> transaction = pipe(
...     regular_function,  # composes easily
...     returns_container,  # also composes easily, but returns a container
...     # So we need to `bind` the next function to allow it to consume
...     # the container from the previous step.
...     bind(also_returns_container),
... )
>>> result = transaction(1)  # running the pipeline
>>> assert result == Success('1.0!')

You might consider pipe() as returns.functions.compose() on steroids. The main difference is that compose takes strictly two arguments (or you might say that it has an arity of two), while pipe has infinite possible arguments.

Limitations

But, composition with pipe is limited to two things:

  1. It only allows to pipe up to 7 functions. If you need more - send a PR with the type annotations. Python cannot figure things out by itself.

  2. It is flexible. Sometimes you might need more power. Use @pipeline in this case!

In contrast flow does not have these problems.

pipeline

What is a @pipeline? It is a more user-friendly syntax to work with containers that support both async and regular functions.

@pipeline decorator allows you to .unwrap container values from containers and work with them as with regular values (which they are in this context).

It is something like do-notation if you wish.

Works with both Maybe and Result container.

Consider this task. We were asked to create a method that will connect together a simple pipeline of three steps:

  1. We validate passed username and email

  2. We create a new Account with this data, if it does not exists

  3. We create a new User associated with the Account

And we know that this pipeline can fail in several places:

  1. Wrong username or email might be passed, so the validation will fail

  2. Account with this username or email might already exist

  3. User creation might fail as well, since it also makes an HTTP request to another micro-service deep inside

Here’s the code to illustrate the task.

from returns.result import Result, ResultE, Success, Failure, safe
from returns.pipeline import pipeline

def create_account_and_user(
    username: str,
    email: str,
) -> Result['User', str]:
    # TODO: we need to create a pipeline of these functions somehow...

# Protected functions:

def _validate_user(
    username: str, email: str,
) -> ResultE['UserSchema']:
    """Returns an UserSchema for valid input, otherwise a Failure."""
    if username and '@' in email:
        return Success({'username': username, 'email': email})
    return Failure(ValueError('Not valid!'))

def _create_account(
    user_schema: 'UserSchema',
) -> ResultE['Account']:
    """Creates an Account for valid UserSchema's. Or returns a Failure."""
    return safe(Accounts.save)(user_schema)

def _create_user(
    account: 'Account',
) -> ResultE['User']:
    """Create an User instance. If user already exists returns Failure."""
    return safe(User.objects.create)(
        username=account.username,
        account=account,
    )

Using bind technique

We can implement this feature using a traditional bind method.

def create_account_and_user(
    username: str,
    email: str,
) -> Result['User', Exception]:
    """Can return a Success(user) or Failure(exception)."""
    return _validate_user(username, email).bind(
        _create_account,
    ).bind(
        _create_user,
    )

# Protected functions:
# ...

And this will work without any problems. But, is it easy to read a code like this? No, it is not.

What alternative we can provide? pipe and @pipeline! Read more about them if you want to compose your containers easily.

Using @pipeline

@pipeline is a very powerful tool to compose things. Let’s see an example.

@pipeline(Result)
def create_account_and_user(
    username: str,
    email: str,
) -> Result['User', Exception]:
    """Can return a Success(user) or Failure(exception)."""
    user_schema = _validate_user(username, email).unwrap()
    account = _create_account(user_schema).unwrap()
    return _create_user(account)

# Protected functions:
# ...

Let’s see how this new .unwrap() method works:

  • if you result is Success it will return its inner value

  • if your result is Failure it will raise a UnwrapFailedError

And that’s where @pipeline decorator becomes in handy. It will catch any UnwrapFailedError during the pipeline and then return a simple Failure result.

sequenceDiagram participant pipeline participant validation participant account creation participant user creation pipeline->>validation: runs the first step validation-->>pipeline: returns Failure(validation message) if fails validation->>account creation: passes Success(UserSchema) if valid account creation-->>pipeline: return Failure(account exists) if fails account creation->>user creation: passes Success(Account) if valid user creation-->>pipeline: returns Failure(http status) if fails user creation-->>pipeline: returns Success(user) if user is created

Pipeline execution.

See, do notation allows you to write simple yet powerful pipelines with multiple and complex steps. And at the same time the produced code is simple and readable.

Limitations

There’s currently a typing-related issue with Result: you can unwrap wrong failure instance. And the returning value will be different.

from returns.result import Result
from returns.pipeline import pipeline

@pipeline(Result)
def example() -> Result[int, str]:
    other: Result[int, Exception]
    new_value = other.unwrap() + 1  # hidden boom!
    return Success(new_value)

Since mypy cannot know the context of .unwrap() method - it cannot really tell is it allowed to unwrap a value or not.

In this case other might fail and Result[int, Exception] might be returned.

What to do to minimize the effect?

  1. Always stick to the same error type in your @pipeline results

  2. Unit test things

  3. Write a custom mypy plugin to check that and submit a PR :)

is_successful

is_successful is used to tell whether or not your result is a success. We treat only treat types that does not throw as a successful ones, basically: Success.

>>> from returns.result import Success, Failure
>>> from returns.pipeline import is_successful

>>> is_successful(Success(1))
True

>>> is_successful(Failure('text'))
False

API Reference

flow(instance, *functions)

Allows to compose a value and up to multiple functions that use this value.

All starts with the value itself. Each next function uses the previous result as an input parameter.

This function is closely related to pipe and solves several typing related issues.

Here’s how it should be used:

>>> from returns.pipeline import flow

# => executes: str(float(int('1')))
>>> assert flow('1', int, float, str) == '1.0'
pipe(*functions)

Allows to compose a value and up to 7 functions that use this value.

Each next function uses the previous result as an input parameter. Here’s how it should be used:

>>> from returns.pipeline import pipe

# => executes: str(float(int('1')))
>>> assert pipe(int, float, str)('1') == '1.0'

A friendly hint: do not start pipe definition with lambda function. mypy will complain: error: Cannot infer type argument 1 of "_pipe". The same might happen with regular generics. It might be a good idea to start with a function with concrete types.

To fix it there are two options:

  1. Use regular annotated functions

  2. Type the variable itself: user: Callable[[int], float] = pipe(...)

  3. Use flow function

pipeline(container_type)

Decorator to enable do-notation context.

Should be used for series of computations that rely on .unwrap method. Supports both async and regular functions.

Works with both Maybe and Result containers.

Example: .. code:: python

>>> from typing import Optional
>>> from returns.pipeline import pipeline
>>> from returns.maybe import Maybe
>>> @pipeline(Maybe)
... def test(one: Optional[int], two: Optional[int]) -> Maybe[int]:
...      first = Maybe.new(one).unwrap()
...      second = Maybe.new(two).unwrap()
...      return Maybe.new(first + second)
...
>>> str(test(1, 2))
'<Some: 3>'
>>> str(test(2, None))
'<Nothing>'

Make sure to supply the correct container type when creating a pipeline.

is_successful(container)[source]

Determins if a container was successful or not.

We treat container that raise UnwrapFailedError on .unwrap() not successful.

>>> from returns.maybe import Some, Nothing
>>> from returns.result import Failure, Success
>>> is_successful(Some(1))
True
>>> is_successful(Nothing)
False
>>> is_successful(Success(1))
True
>>> is_successful(Failure(1))
False

This function can work with containers that support returns.primitives.interfaces.Unwrapable protocol. But only non-lazy containers are supported.

Return type

bool