Pipelines

The main idea behind functional programming is functional composition.

We provide several tools to make composition easy, readable, pythonic, and useful.

Note

Make sure you are familiar with our Pointfree tools, because pipelines and pointfree functions are best friends!

flow

flow allows to easily compose multiple functions together into a pipeline. It is useful when you already have an instance to compose functions with.

Note

flow is the recommended way to write your code with returns!

Let’s see an example:

>>> from returns.pipeline import flow
>>> assert flow(
...     [1, 2, 3],
...     lambda collection: max(collection),
...     lambda max_number: -max_number,
... ) == -3

This allows you to write declarative steps that should be performed on an existing value.

Note

Technical note: flow has the best type inference mechanism among all other tools we provide here. This happens due to our mypy plugins.

You can also use flow with pointfree functions and containers:

>>> from returns.result import Result, Success, Failure
>>> from returns.pointfree import bind
>>> from returns.pipeline import flow

>>> def regular_function(arg: int) -> float:
...     return float(arg)

>>> def returns_container(arg: float) -> Result[str, ValueError]:
...     if arg != 0:
...         return Success(str(arg))
...     return Failure(ValueError('Wrong arg'))

>>> def also_returns_container(arg: str) -> Result[str, ValueError]:
...     return Success(arg + '!')

>>> assert flow(
...     1,  # initial value
...     regular_function,  # composes easily
...     returns_container,  # also composes easily, but returns a container
...     # So we need to `bind` the next function to allow it to consume
...     # the container from the previous step.
...     bind(also_returns_container),
... ) == Success('1.0!')

>>> # And this will fail:
>>> assert flow(
...     0,  # initial value
...     regular_function,  # composes easily
...     returns_container,  # also composes easily, but returns a container
...     # So we need to `bind` the next function to allow it to consume
...     # the container from the previous step.
...     bind(also_returns_container),
... ).failure().args == ('Wrong arg', )

And now let’s get to know pipe, it is very similar, but has different usage pattern.

pipe

pipe is an easy way to compose functions together. It is useful when you don’t have an instance to compose functions with yet.

Note

pipe requires to use our mypy plugins.

Let’s see an example.

>>> from returns.pipeline import pipe

>>> pipeline = pipe(str, lambda x: x + 'b', str.upper)
>>> assert pipeline(1) == '1B'

It might be later used with multiple values:

>>> assert pipeline(2) == '2B'

It is also might be useful to compose containers together:

>>> from returns.pipeline import pipe
>>> from returns.result import Result, Success, Failure
>>> from returns.pointfree import bind

>>> def regular_function(arg: int) -> float:
...     return float(arg)

>>> def returns_container(arg: float) -> Result[str, ValueError]:
...     if arg != 0:
...         return Success(str(arg))
...     return Failure(ValueError('Wrong arg'))

>>> def also_returns_container(arg: str) -> Result[str, ValueError]:
...     return Success(arg + '!')

>>> transaction = pipe(
...     regular_function,  # composes easily
...     returns_container,  # also composes easily, but returns a container
...     # So we need to `bind` the next function to allow it to consume
...     # the container from the previous step.
...     bind(also_returns_container),
... )
>>> result = transaction(1)  # running the pipeline
>>> assert result == Success('1.0!')

You might consider pipe() as returns.functions.compose() on steroids. The main difference is that compose takes strictly two arguments (or you might say that it has an arity of two), while pipe has infinite possible arguments.

managed

A really common task is to work with something stateful, like database connections or files.

First, you need to acquire some resource, then use it and do your thing, and clear things up and release the acquired resource.

There are several rules here:

  1. If the acquiring failed, then do nothing: do not try to use the resource or release it

  2. If the resource is acquired, then try to use it and then release it despite of the usage result

In other words, if you cannot open a file, then do nothing. If you opened it, then try to read it. And then always close it.

Let’s say you have to read a file’s contents:

>>> from typing import TextIO
>>> from returns.pipeline import managed, is_successful
>>> from returns.result import ResultE
>>> from returns.io import IOResultE, impure_safe

>>> def read_file(file_obj: TextIO) -> IOResultE[str]:
...     return impure_safe(file_obj.read)()  # this will be the final result

>>> def close_file(
...     file_obj: TextIO,
...     file_contents: ResultE[str],
... ) -> IOResultE[None]:  # sometimes might require to use `untap`
...     return impure_safe(file_obj.close)()  # this value will be dropped

>>> managed_read = managed(read_file, close_file)

>>> read_result = managed_read(
...     impure_safe(lambda filename: open(filename, 'r'))('pyproject.toml'),
... )
>>> assert is_successful(read_result)  # file content is inside `IOSuccess`

And here’s how we recommend to combine managed with other pipeline functions:

>>> import tomlkit
>>> from returns.pipeline import flow
>>> from returns.pointfree import bind_result
>>> from returns.result import safe
>>> from returns.io import IOSuccess

>>> @safe
... def parse_toml(file_contents: str) -> dict:
...     return tomlkit.parse(file_contents)

>>> @safe
... def get_project_name(parsed: dict) -> str:
...     return parsed['tool']['poetry']['name']

>>> pipeline_result = flow(
...     'pyproject.toml',  # filename we work with
...     impure_safe(lambda filename: open(filename, 'r')),
...     managed_read,
...     bind_result(parse_toml),
...     bind_result(get_project_name),
... )
>>> assert pipeline_result == IOSuccess('returns')

Notice a few tricks here:

  1. We use managed with and without flow here, both are fine!

  2. We have created a managed_read managed function, so we don’t need to specify it every time we want to read a file in a functional way

  3. We are using impure and pure operations inside the pipeline: this helps us to understand how our app works. Which parts do access the file system and which just work

However, you can still use the imperative approach with with: or try/finally wrapped into @impure_safe decorator, your choice! We don’t recommend to mix these two. Stick to one you like the most.

managed can be used with:

  • IOResult

  • FutureResult

  • RequiresContextIOResult

  • RequiresContextFutureResult

is_successful

is_successful is used to tell whether or not your result is a success. We treat only three types that do not throw as successful ones, basically: Success, IOSuccess, and Some

>>> from returns.result import Success, Failure
>>> from returns.pipeline import is_successful

>>> assert is_successful(Success(1)) is True
>>> assert is_successful(Failure('text')) is False

Further reading

API Reference

flow(instance, *functions)[source]

Allows to compose a value and up to multiple functions that use this value.

All starts with the value itself. Each next function uses the previous result as an input parameter.

We use a custom mypy plugin to make sure types are correct. Otherwise, it is currently impossible to properly type this function.

Currently, flow has a hard limit of 21 steps. Because, it is not possible to type it otherwise. We need a hard limit. See: https://github.com/dry-python/returns/issues/461

Here’s how it should be used:

>>> from returns.pipeline import flow

>>> # => executes: str(float(int('1')))
>>> assert flow('1', int, float, str) == '1.0'

This function is closely related to pipe:

>>> from returns.pipeline import pipe
>>> assert flow('1', int, float, str) == pipe(int, float, str)('1')

Requires our mypy plugin.

Parameters:
  • instance (TypeVar(_InstanceType)) –

  • functions (TypeVar(_PipelineStepType)) –

Return type:

TypeVar(_ReturnType)

pipe(*functions)[source]

Allows to compose a value and up to 7 functions that use this value.

We use a custom mypy plugin to make sure types are correct. Otherwise, it is currently impossible to properly type this function.

Each next function uses the previous result as an input parameter. Here’s how it should be used:

>>> from returns.pipeline import pipe

>>> # => executes: str(float(int('1')))
>>> assert pipe(int, float, str)('1') == '1.0'

This function is closely related to pipe:

>>> from returns.pipeline import flow
>>> assert pipe(int, float, str)('1') == flow('1', int, float, str)
managed(use, release)[source]

Allows to run managed computation.

Managed computations consist of three steps:

  1. acquire when we get some initial resource to work with

  2. use when the main logic is done

  3. release when we release acquired resource

Let’s look at the example:

  1. We need to acquire an opened file to read it later

  2. We need to use acquired file to read its content

  3. We need to release the acquired file in the end

Here’s a code example:

>>> from returns.pipeline import managed
>>> from returns.io import IOSuccess, IOFailure, impure_safe

>>> class Lock(object):
...     '''Example class to emulate state to acquire and release.'''
...     def __init__(self, default: bool = False) -> None:
...         self.set = default
...     def __eq__(self, lock) -> bool:  # we need this for testing
...         return self.set == lock.set
...     def release(self) -> None:
...         self.set = False

>>> pipeline = managed(
...     lambda lock: IOSuccess(lock) if lock.set else IOFailure(False),
...     lambda lock, use_result: impure_safe(lock.release)(),
... )

>>> assert pipeline(IOSuccess(Lock(True))) == IOSuccess(Lock(False))
>>> assert pipeline(IOSuccess(Lock())) == IOFailure(False)
>>> assert pipeline(IOFailure('no lock')) == IOFailure('no lock')

Implementation

This class requires some explanation.

First of all, we modeled this function as a class, so it can be partially applied easily.

Secondly, we used imperative approach of programming inside this class. Functional approached was 2 times slower. And way more complex to read and understand.

Lastly, we try to hide these two things for the end user. We pretend that this is not a class, but a function. We also do not break a functional abstraction for the end user. It is just an implementation detail.

Type inference does not work so well with lambda functions. But, we do not recommend to use this function with lambda functions.

Parameters:
  • use (Callable[[TypeVar(_FirstType)], KindN[TypeVar(_IOResultLikeType, bound= IOResultLikeN), TypeVar(_UpdatedType), TypeVar(_SecondType), TypeVar(_ThirdType)]]) –

  • release (Callable[[TypeVar(_FirstType), Result[TypeVar(_UpdatedType), TypeVar(_SecondType)]], KindN[TypeVar(_IOResultLikeType, bound= IOResultLikeN), None, TypeVar(_SecondType), TypeVar(_ThirdType)]]) –

Return type:

Kinded[Callable[[KindN[TypeVar(_IOResultLikeType, bound= IOResultLikeN), TypeVar(_FirstType), TypeVar(_SecondType), TypeVar(_ThirdType)]], KindN[TypeVar(_IOResultLikeType, bound= IOResultLikeN), TypeVar(_UpdatedType), TypeVar(_SecondType), TypeVar(_ThirdType)]]]

is_successful(container)[source]

Determines if a container was successful or not.

>>> from returns.maybe import Some, Nothing
>>> from returns.result import Failure, Success
>>> from returns.io import IOSuccess, IOFailure

>>> assert is_successful(Some(1))
>>> assert not is_successful(Nothing)

>>> assert is_successful(Success(1))
>>> assert not is_successful(Failure(1))

>>> assert is_successful(IOSuccess(1))
>>> assert not is_successful(IOFailure(1))

This function can work with containers that are instance of returns.interfaces.unwrappable.Unwrappable.

Parameters:

container (Unwrappable) –

Return type:

bool