What Asynchrony Should Be In Python

In the past few years, the async keyword and the semantics of asynchronous programming have permeated many popular programming languages: JavaScript, Rust, C#, and many others. Of course, Python has async/await too; it was introduced in Python 3.5.



In this article I want to discuss the problems of asynchronous code, speculate about alternatives, and propose a new approach to support both synchronous and asynchronous applications at the same time.



Function color



When asynchronous functions are added to a programming language, the language essentially splits in two. Red (asynchronous) functions appear, while the rest remain blue (synchronous).



The main problem is that blue functions cannot call red ones, but red ones can potentially call blue ones. In Python, for example, this is only partially true: asynchronous functions can only call synchronous functions that do not block. But it is impossible to tell from a function's definition whether it blocks or not. Python is a scripting language, after all.
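To make the color rule concrete, here is a minimal sketch using only the standard library. The function names red, blue, red_calls_blue, and blue_calls_red are made up for this illustration. It shows that a red function can call a blue one directly, while a blue function that calls a red one only gets an unstarted coroutine object back.

```python
import asyncio


async def red() -> int:
    """An asynchronous ("red") function."""
    await asyncio.sleep(0)
    return 42


def blue() -> int:
    """A synchronous ("blue") function."""
    return 1


async def red_calls_blue() -> int:
    # Red can call blue directly and can await another red function.
    return blue() + await red()


def blue_calls_red() -> bool:
    # Without `await`, blue only receives a coroutine object, not 42.
    coroutine = red()
    is_coroutine = asyncio.iscoroutine(coroutine)
    coroutine.close()  # avoid the "never awaited" warning
    return is_coroutine


print(asyncio.run(red_calls_blue()))  # => 43
print(blue_calls_red())  # => True
```

The only way for blue code to get the actual value is to start an event loop, which is what asyncio.run does here.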



This split divides the language into two subsets: synchronous and asynchronous. Python 3.5 was released over five years ago, but async is still not nearly as well supported as Python's synchronous capabilities.



You can read more about function colors in this great article.



Duplicate code



Different colors of functions mean code duplication in practice.



Imagine you are developing a CLI tool for retrieving the size of a web page and you want to maintain both synchronous and asynchronous ways of doing it. For example, this is necessary if you are writing a library and do not know how your code will be used. And it's not only about PyPI libraries, but also about our own libraries with common logic for various services, written, for example, in Django and aiohttp. Although, of course, independent applications are mostly written either only synchronously or only asynchronously.



Let's start with the synchronous pseudocode:



def fetch_resource_size(url: str) -> int:
    response = client_get(url)
    return len(response.content)


Looks nice. Now let's look at the asynchronous analogue:



async def fetch_resource_size(url: str) -> int:
    response = await client_get(url)
    return len(response.content)


In general, this is the same code, but with the words async and await added. And I didn't make it up: compare the synchronous and asynchronous code examples in the httpx tutorial, and you will see exactly the same picture.



Abstraction and composition



It turns out that, to make a program asynchronous, you need to rewrite all the synchronous code and sprinkle async and await here and there.



Two principles can help solve this problem: abstraction and composition. First, let's rewrite the imperative pseudocode in a functional style. This will make the picture clearer.



def fetch_resource_size(url: str) -> Abstraction[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


You may ask what this .map method is and what it does. This is how complex abstractions are composed with pure functions in a functional style: .map creates a new abstraction with a new state from an existing one. Suppose client_get(url) initially returns Abstraction[Response]; the call .map(lambda response: len(response.content)) then converts it into the required Abstraction[int] instance.



It becomes clear what to do next. Notice how easily we moved from a few independent steps to sequential function calls. In addition, we changed the return type: the function now returns some abstraction.
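For readers who want to run the pseudocode, here is a minimal sketch of what such an Abstraction could look like. Everything in it is an assumption made for illustration: the Abstraction and Response classes are hypothetical, and client_get is a fake that performs no network request and simply uses the URL bytes as the page content.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

_T = TypeVar('_T')
_U = TypeVar('_U')


@dataclass(frozen=True)
class Abstraction(Generic[_T]):
    """Hypothetical container holding one already computed value."""

    value: _T

    def map(self, function: Callable[[_T], _U]) -> 'Abstraction[_U]':
        # Apply a pure function to the inner value and wrap the result again.
        return Abstraction(function(self.value))


@dataclass(frozen=True)
class Response:
    """Hypothetical stand-in for an HTTP response."""

    content: bytes


def client_get(url: str) -> Abstraction[Response]:
    # Fake client: no network access, the "page" is just the URL in bytes.
    return Abstraction(Response(content=url.encode()))


def fetch_resource_size(url: str) -> Abstraction[int]:
    return client_get(url).map(lambda response: len(response.content))


print(fetch_resource_size('https://example.com'))
# => Abstraction(value=19)
```

The point of the sketch is only that .map never unwraps the value for the caller: it always hands back another container.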



Let's rewrite the code to work with the asynchronous version:



def fetch_resource_size(url: str) -> AsyncAbstraction[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


The only difference is the return type: AsyncAbstraction. The rest of the code is exactly the same. You no longer need the async and await keywords: await is not used at all (which was the whole point), and without it there is no point in async.



The last thing is to decide which client we need: asynchronous or synchronous.



def fetch_resource_size(
    client_get: Callable[[str], AbstractionType[Response]],
    url: str,
) -> AbstractionType[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


client_get is now an argument of a callable type that takes a URL string and returns some AbstractionType over a Response object. AbstractionType is either Abstraction or AsyncAbstraction from the previous examples.



When we pass a client that returns Abstraction, the code runs synchronously; when the client returns AsyncAbstraction, the same code automatically runs asynchronously.
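The idea can be tried out with a rough, untyped sketch built on the standard library alone. Abstraction, AsyncAbstraction, sync_get, and async_get are all hypothetical names invented for this illustration, the clients are fakes that return the URL bytes instead of a real response, and the mapped function is simplified to len.

```python
import asyncio
from typing import Any, Awaitable, Callable


class Abstraction:
    """Hypothetical synchronous container."""

    def __init__(self, value: Any) -> None:
        self.value = value

    def map(self, function: Callable[[Any], Any]) -> 'Abstraction':
        return Abstraction(function(self.value))


class AsyncAbstraction:
    """Hypothetical asynchronous container wrapping an awaitable."""

    def __init__(self, awaitable: Awaitable[Any]) -> None:
        self._awaitable = awaitable

    def map(self, function: Callable[[Any], Any]) -> 'AsyncAbstraction':
        async def mapped() -> Any:
            # The only `await` lives inside the container itself.
            return function(await self._awaitable)
        return AsyncAbstraction(mapped())

    async def awaitable(self) -> Any:
        return await self._awaitable


def fetch_resource_size(client_get, url: str):
    # Same body for both containers; no `async` or `await` here.
    return client_get(url).map(len)


def sync_get(url: str) -> Abstraction:
    return Abstraction(url.encode())  # fake response body


def async_get(url: str) -> AsyncAbstraction:
    async def request() -> bytes:
        await asyncio.sleep(0)  # pretend to wait for the network
        return url.encode()
    return AsyncAbstraction(request())


sync_size = fetch_resource_size(sync_get, 'https://example.com').value
async_size = asyncio.run(
    fetch_resource_size(async_get, 'https://example.com').awaitable(),
)
print(sync_size, async_size)  # => 19 19
```

In this sketch the choice between synchronous and asynchronous execution is made entirely by the client that is passed in; fetch_resource_size itself has no color.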



IOResult and FutureResult



Fortunately, dry-python/returns already has the right abstractions.



Let me introduce you to a type-safe, mypy-friendly, framework-agnostic tool written entirely in Python. It has amazing, handy, wonderful abstractions that can be used in absolutely any project.



Synchronous option



First, we'll add dependencies to get a reproducible example.



pip install returns httpx anyio


Next, let's turn the pseudocode into working Python code. Let's start with the synchronous option.



from typing import Callable
 
import httpx
 
from returns.io import IOResultE, impure_safe
 
def fetch_resource_size(
    client_get: Callable[[str], IOResultE[httpx.Response]],
    url: str,
) -> IOResultE[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )
 
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>


It took a couple of changes to get working code:



  • Use IOResultE, a functional way of handling synchronous IO errors (exceptions don't always work). Result-based types let you model exceptions as separate Failure() values, while successful results are wrapped in Success. Usually nobody cares about exceptions, but we do.
  • Use httpx, which can handle both synchronous and asynchronous requests.
  • Use the impure_safe function to convert the return type of httpx.get into the IOResultE abstraction.
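To show the idea behind Result-based types without the library, here is a simplified sketch. It is not how dry-python/returns implements them: the Success and Failure classes, the safe decorator, and parse_int below are hypothetical stand-ins written for this illustration only.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Union


@dataclass(frozen=True)
class Success:
    """Hypothetical wrapper for a successful result."""

    value: Any

    def map(self, function: Callable[[Any], Any]) -> 'Success':
        return Success(function(self.value))


@dataclass(frozen=True)
class Failure:
    """Hypothetical wrapper for an error stored as a plain value."""

    error: Exception

    def map(self, function: Callable[[Any], Any]) -> 'Failure':
        # A failure skips the function and carries the error forward.
        return self


def safe(function: Callable[..., Any]) -> Callable[..., Union[Success, Failure]]:
    """Hypothetical helper: turn raised exceptions into Failure values."""
    @wraps(function)
    def decorator(*args: Any, **kwargs: Any) -> Union[Success, Failure]:
        try:
            return Success(function(*args, **kwargs))
        except Exception as error:
            return Failure(error)
    return decorator


@safe
def parse_int(text: str) -> int:
    return int(text)


print(parse_int('42').map(lambda number: number + 1))
# => Success(value=43)
print(type(parse_int('oops').map(lambda number: number + 1)).__name__)
# => Failure
```

Because the error is an ordinary return value, the caller cannot forget about it: the .map chain simply passes the Failure along untouched.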


Asynchronous option



Let's try to do the same in asynchronous code.



from typing import Callable
 
import anyio
import httpx
 
from returns.future import FutureResultE, future_safe
 
def fetch_resource_size(
    client_get: Callable[[str], FutureResultE[httpx.Response]],
    url: str,
) -> FutureResultE[int]:
    return client_get(url).map(
        lambda response: len(response.content),
    )
 
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


As you can see, the result is exactly the same, but now the code runs asynchronously, while its main part has not changed. Still, pay attention to the following:



  • The synchronous IOResultE was replaced with the asynchronous FutureResultE, and impure_safe with future_safe. It works the same way but returns a different abstraction: FutureResultE.
  • AsyncClient from httpx is used.
  • The resulting FutureResult value needs to be run explicitly, because red functions cannot run themselves.
  • The anyio utility is used to show that this approach works with any asynchronous library: asyncio, trio, curio.


Two in one



I'll show you how to combine the synchronous and asynchronous versions in one type-safe API.



Higher Kinded Types and the type classes for working with IO have not been released yet (they will appear in 0.15.0), so I will illustrate the idea with the usual @overload:



from typing import Callable, Union, overload
 
import anyio
import httpx
 
from returns.future import FutureResultE, future_safe
from returns.io import IOResultE, impure_safe
 
@overload
def fetch_resource_size(
    client_get: Callable[[str], IOResultE[httpx.Response]],
    url: str,
) -> IOResultE[int]:
    """Sync case."""
 
@overload
def fetch_resource_size(
    client_get: Callable[[str], FutureResultE[httpx.Response]],
    url: str,
) -> FutureResultE[int]:
    """Async case."""
 
def fetch_resource_size(
    client_get: Union[
        Callable[[str], IOResultE[httpx.Response]],
        Callable[[str], FutureResultE[httpx.Response]],
    ],
    url: str,
) -> Union[IOResultE[int], FutureResultE[int]]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


We use @overload decorators to describe which inputs are allowed and what the return type will be. You can read more about the @overload decorator in my other article.
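If @overload is new to you, here is a smaller, self-contained sketch of the same pattern. The double function is a made-up example: the two decorated signatures exist only for the type checker, and the last definition is the one that actually runs.

```python
from typing import Union, overload


@overload
def double(value: int) -> int:
    """Integers are doubled arithmetically."""


@overload
def double(value: str) -> str:
    """Strings are repeated twice."""


def double(value: Union[int, str]) -> Union[int, str]:
    # Only this implementation exists at runtime; the overloads above
    # are signatures for the type checker.
    return value * 2


print(double(21))  # => 42
print(double('ab'))  # => abab
```

With these overloads, a type checker infers int for double(21) and str for double('ab') instead of the wider Union.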



A function call with a synchronous or asynchronous client looks like this:



# Sync:
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>
 
# Async:
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


As you can see, in the synchronous variant fetch_resource_size immediately returns an IOResult and executes it, whereas the asynchronous version requires an event loop, as for a regular coroutine. anyio is used here to run the coroutine and display the result.



mypy has no complaints about this code:



» mypy async_and_sync.py
Success: no issues found in 1 source file


Let's see what happens if something is messed up.



---lambda response: len(response.content),
+++lambda response: response.content,


mypy easily finds new errors:



» mypy async_and_sync.py
async_and_sync.py:33: error: Argument 1 to "map" of "IOResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"
async_and_sync.py:33: error: Argument 1 to "map" of "FutureResult" has incompatible type "Callable[[Response], bytes]"; expected "Callable[[Response], int]"
async_and_sync.py:33: error: Incompatible return value type (got "bytes", expected "int")


Sleight of hand and no magic: writing asynchronous code with the correct abstractions requires only good old fashioned composition. But the fact that we get the same API for different types is really great. For example, it allows you to abstract from how HTTP requests work: synchronously or asynchronously.



Hopefully this example has shown how awesome asynchronous programs can really be. And if you try dry-python/returns, you will find many more interesting things. In the new version, we have already added the primitives for working with Higher Kinded Types and all the necessary interfaces. The code above can now be rewritten like this:



from typing import Callable, TypeVar

import anyio
import httpx

from returns.future import future_safe
from returns.interfaces.specific.ioresult import IOResultLike2
from returns.io import impure_safe
from returns.primitives.hkt import Kind2, kinded

_IOKind = TypeVar('_IOKind', bound=IOResultLike2)

@kinded
def fetch_resource_size(
    client_get: Callable[[str], Kind2[_IOKind, httpx.Response, Exception]],
    url: str,
) -> Kind2[_IOKind, int, Exception]:
    return client_get(url).map(
        lambda response: len(response.content),
    )


# Sync:
print(fetch_resource_size(
    impure_safe(httpx.get),
    'https://sobolevn.me',
))
# => <IOResult: <Success: 27972>>

# Async:
page_size = fetch_resource_size(
    future_safe(httpx.AsyncClient().get),
    'https://sobolevn.me',
)
print(page_size)
print(anyio.run(page_size.awaitable))
# => <FutureResult: <coroutine object async_map at 0x10b17c320>>
# => <IOResult: <Success: 27972>>


See the `master` branch; it already works there.



More dry-python features



Here are a few other useful dry-python features that I'm most proud of.





from returns.curry import curry, partial
 
def example(a: int, b: str) -> float:
    ...
 
reveal_type(partial(example, 1))
# note: Revealed type is 'def (b: builtins.str) -> builtins.float'
 
reveal_type(curry(example))
# note: Revealed type is 'Overload(def (a: builtins.int) -> def (b: builtins.str) -> builtins.float, def (a: builtins.int, b: builtins.str) -> builtins.float)'


This allows you to use @curry, for example, like this:



@curry
def example(a: int, b: str) -> float:
    return float(a + len(b))
 
assert example(1, 'abc') == 4.0
assert example(1)('abc') == 4.0




Using a custom mypy plugin, you can build functional pipelines with inferred types.



from returns.pipeline import flow
assert flow(
    [1, 2, 3],
    lambda collection: max(collection),
    lambda max_number: -max_number,
) == -3


Lambdas are usually very inconvenient in typed code, because their arguments are always of type Any. Type inference from the mypy plugin solves this problem.



With its help, we now know that lambda collection: max(collection) has the type Callable[[List[int]], int], while lambda max_number: -max_number is simply Callable[[int], int]. You can pass any number of arguments to flow, and they will all work fine. All thanks to the plugin.
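At runtime, a pipeline like this can be approximated with a simple left fold. The flow_sketch function below is a hypothetical, untyped stand-in written for illustration, not the library's implementation, and it provides none of the type inference, which comes from the mypy plugin.

```python
from functools import reduce
from typing import Any, Callable


def flow_sketch(instance: Any, *functions: Callable[[Any], Any]) -> Any:
    """Pass a value through the given functions from left to right."""
    return reduce(lambda value, function: function(value), functions, instance)


assert flow_sketch(
    [1, 2, 3],
    lambda collection: max(collection),
    lambda max_number: -max_number,
) == -3
```

Each function receives the result of the previous one, which is why the order of the arguments matters.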





The abstraction over FutureResult, which we talked about earlier, can be used to explicitly pass dependencies to asynchronous programs in a functional style.



Plans for the future



Before we finally release version 1.0, we have to solve several important tasks:



  • Implement Higher Kinded Types or their emulation (issue).
  • Add proper type classes to implement the required abstractions (issue).
  • Maybe try the mypyc compiler, which can potentially compile type-annotated Python programs into binaries. The dry-python/returns code could then run several times faster (issue).
  • Explore new ways to write functional code in Python, such as "do-notation".


Conclusions



Any problem can be solved with composition and abstraction. In this article, we looked at how to solve the function color problem and write simple, readable, and flexible code that works and passes type checking.



Try dry-python/returns and join Russian Python Week: at the conference, dry-python core developer Pablo Aguilar will hold a workshop on using dry-python to write business logic.


