I am the creator of Dependency Injector, a dependency injection framework for Python.
This is another tutorial on building applications with Dependency Injector. Today I want to show how you can build an asynchronous daemon based on the asyncio module.
The tutorial consists of the following parts:
- What are we going to build?
- Tool check
- Project structure
- Preparing the environment
- Logging and configuration
- Dispatcher
- Monitoring example.com
- Monitoring httpbin.org
- Tests
- Conclusion
The completed project can be found on GitHub.
Before starting, it is desirable to have:
- Basic knowledge of asyncio
- An understanding of the dependency injection principle
What are we going to build?
We will build a monitoring daemon that checks the availability of web services.
The daemon will send requests to example.com and httpbin.org every few seconds. When receiving a response, it will write the following data to the log:
- Response code
- Number of bytes in response
- Time taken to complete the request
Tool check
We will be using Docker and docker-compose. Let's check that they are installed:
docker --version
docker-compose --version
The output should look something like this:
Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31
If Docker or docker-compose is not installed, install them before proceeding by following the official installation guides.
The tools are ready. Let's move on to the structure of the project.
Project structure
Create a project folder and go to it:
mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial
Now we need to create an initial project structure. Create files and folders following the structure below. All files will be empty for now. We'll fill them in later.
Initial project structure:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
The initial project structure is ready. We will expand it in the following sections.
Next up is preparing the environment.
Preparing the environment
In this section, we will prepare the environment for starting our daemon.
First we need to define the dependencies. We will use the following packages:
- dependency-injector - dependency injection framework
- aiohttp - web framework (we only need the HTTP client)
- pyyaml - library for parsing YAML files, used to read the config
- pytest - testing framework
- pytest-asyncio - helper library for testing asyncio applications
- pytest-cov - helper library for measuring code coverage
Let's add the following lines to the file requirements.txt:
dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov
And execute in the terminal:
pip install -r requirements.txt
Next, we create the Dockerfile. It describes the process of building and starting our daemon. We will use python:3.8-buster as the base image.
Let's add the following lines to the Dockerfile:
FROM python:3.8-buster
ENV PYTHONUNBUFFERED=1
WORKDIR /code
COPY . /code/
RUN apt-get install openssl \
&& pip install --upgrade pip \
&& pip install -r requirements.txt \
&& rm -rf ~/.cache
CMD ["python", "-m", "monitoringdaemon"]
The last step is to define the docker-compose settings.
Let's add the following lines to the file docker-compose.yml:
version: "3.7"

services:

  monitor:
    build: ./
    image: monitoring-daemon
    volumes:
      - "./:/code"
Everything is ready. Let's start building the image and check that the environment is configured correctly.
Let's execute in the terminal:
docker-compose build
The build process can take several minutes. At the end, you should see:
Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest
After the build process is complete, start the container:
docker-compose up
You will see:
Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitoring-daemon-tutorial_monitor_1 exited with code 0
The environment is ready. The container starts and exits with code 0.
The next step is to set up logging and reading the configuration file.
Logging and configuration
In this section, we will set up logging and reading of the configuration file.
Let's start by adding the main part of our application: the dependency injection container (hereafter simply the container). The container will hold all the components of the application.
Let's add the first two components: a configuration object and a function for configuring logging.
Let's edit containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
We used the configuration parameters before setting their values. That is the principle the Configuration provider works by: use first, set the values later.
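To make this concrete, here is a small sketch (an illustration only, not part of the project files). The config.log.level and config.log.format used above stay unresolved links until values are provided, and they are read only when configure_logging() is called:
from monitoringdaemon.containers import ApplicationContainer

container = ApplicationContainer()

# The values are set after the providers were defined...
container.config.from_dict({
    'log': {
        'level': 'DEBUG',
        'format': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
    },
})

# ...and are only read here, when the Callable provider is invoked.
container.configure_logging()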
Logging settings will be contained in the configuration file.
Let's edit config.yml:
log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"
Now let's define the function that starts our daemon. It is usually called main(). It will create the container, use it to read the configuration file, and call the logging configuration function.
Let's edit __main__.py:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
"""Run the application."""
container = ApplicationContainer()
container.config.from_yaml('config.yml')
container.configure_logging()
if __name__ == '__main__':
main()
The container is the first object in the application. It is used to get all other objects.
Logging and configuration reading are set up. In the next section, we will create the monitoring task dispatcher.
Dispatcher
It's time to add the monitoring task dispatcher.
The dispatcher will hold a list of monitoring tasks and control their execution. It will run each task on its schedule. The Monitor class is the base class for monitoring tasks. To create concrete tasks, you add child classes and implement the check() method.
Let's add a dispatcher and a base class for the monitoring task.
Let's create dispatcher.py and monitors.py in the monitoringdaemon package:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
Let's add the following lines to the file monitors.py:
"""Monitors module."""
import logging
class Monitor:
def __init__(self, check_every: int) -> None:
self.check_every = check_every
self.logger = logging.getLogger(self.__class__.__name__)
async def check(self) -> None:
raise NotImplementedError()
and to the file dispatcher.py:
""""Dispatcher module."""
import asyncio
import logging
import signal
import time
from typing import List
from .monitors import Monitor
class Dispatcher:
def __init__(self, monitors: List[Monitor]) -> None:
self._monitors = monitors
self._monitor_tasks: List[asyncio.Task] = []
self._logger = logging.getLogger(self.__class__.__name__)
self._stopping = False
def run(self) -> None:
asyncio.run(self.start())
async def start(self) -> None:
self._logger.info('Starting up')
for monitor in self._monitors:
self._monitor_tasks.append(
asyncio.create_task(self._run_monitor(monitor)),
)
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)
await asyncio.gather(*self._monitor_tasks, return_exceptions=True)
self.stop()
def stop(self) -> None:
if self._stopping:
return
self._stopping = True
self._logger.info('Shutting down')
for task, monitor in zip(self._monitor_tasks, self._monitors):
task.cancel()
self._logger.info('Shutdown finished successfully')
@staticmethod
async def _run_monitor(monitor: Monitor) -> None:
def _until_next(last: float) -> float:
time_took = time.time() - last
return monitor.check_every - time_took
while True:
time_start = time.time()
try:
await monitor.check()
except asyncio.CancelledError:
break
except Exception:
monitor.logger.exception('Error executing monitor check')
await asyncio.sleep(_until_next(last=time_start))
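Note the scheduling logic in _run_monitor(): the sleep time is check_every minus the time the check took. For example, if check_every is 5 seconds and a check takes 2 seconds, the dispatcher waits the remaining 3 seconds before the next run.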
The dispatcher needs to be added to the container.
Let's edit containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
# TODO: add monitors
),
)
Each component is added to the container.
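As a side note, dispatcher is a Factory provider, so each call assembles a fresh object graph. A minimal sketch (an illustration only, not part of the project files):
from monitoringdaemon.containers import ApplicationContainer

container = ApplicationContainer()
container.config.from_yaml('config.yml')

# Each call builds a new Dispatcher, with its monitors built by providers.List
assert container.dispatcher() is not container.dispatcher()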
Finally, we need to update the main() function. We will get the dispatcher from the container and call its run() method.
Let's edit __main__.py:
"""Main module."""
from .containers import ApplicationContainer
def main() -> None:
"""Run the application."""
container = ApplicationContainer()
container.config.from_yaml('config.yml')
container.configure_logging()
dispatcher = container.dispatcher()
dispatcher.run()
if __name__ == '__main__':
main()
Now let's start the daemon and check that it works.
Let's execute in the terminal:
docker-compose up
The output should look like this:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1 | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
monitoring-daemon-tutorial_monitor_1 exited with code 0
Everything works correctly. The dispatcher starts and stops since there are no monitoring tasks.
By the end of this section, the skeleton of our daemon is ready. In the next section, we will add the first monitoring task.
Monitoring example.com
In this section, we will add a monitoring task that will monitor access to http://example.com .
We'll start by extending our class model with a new monitoring task type, HttpMonitor.
HttpMonitor is a child class of Monitor. We will implement the check() method: it will send an HTTP request and log the received response. The details of making the HTTP request will be delegated to the HttpClient class.
Let's add HttpClient first.
Let's create the file http.py in the monitoringdaemon package:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ ├── http.py
│ └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
And add the following lines to it:
"""Http client module."""
from aiohttp import ClientSession, ClientTimeout, ClientResponse
class HttpClient:
async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
async with ClientSession(timeout=ClientTimeout(timeout)) as session:
async with session.request(method, url) as response:
return response
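If you want to try the client on its own, here is a minimal sketch (an illustration only, not part of the project files):
import asyncio

from monitoringdaemon.http import HttpClient


async def demo() -> None:
    # A single GET request with a 5-second total timeout
    response = await HttpClient().request('GET', 'http://example.com', timeout=5)
    print(response.status, response.content_length)


asyncio.run(demo())
Note that the response object is returned after the session has been closed, so only response metadata (the status code and the content length from the headers) is used; the monitors never read the response body.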
Next, we need to add HttpClient to the container.
Let's edit containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
# TODO: add monitors
),
)
We are now ready to add HttpMonitor. Let's add it to the monitors module.
Let's edit monitors.py:
"""Monitors module."""
import logging
import time
from typing import Dict, Any
from .http import HttpClient
class Monitor:
def __init__(self, check_every: int) -> None:
self.check_every = check_every
self.logger = logging.getLogger(self.__class__.__name__)
async def check(self) -> None:
raise NotImplementedError()
class HttpMonitor(Monitor):
def __init__(
self,
http_client: HttpClient,
options: Dict[str, Any],
) -> None:
self._client = http_client
self._method = options.pop('method')
self._url = options.pop('url')
self._timeout = options.pop('timeout')
super().__init__(check_every=options.pop('check_every'))
@property
def full_name(self) -> str:
return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)
async def check(self) -> None:
time_start = time.time()
response = await self._client.request(
method=self._method,
url=self._url,
timeout=self._timeout,
)
time_end = time.time()
time_took = time_end - time_start
self.logger.info(
'Response code: %s, content length: %s, request took: %s seconds',
response.status,
response.content_length,
round(time_took, 3)
)
We are all set to add the check for http://example.com. We need to make two changes to the container:
- Add an example_monitor factory.
- Pass example_monitor to the dispatcher.
Let's edit containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
),
)
The example_monitor provider depends on configuration values. Let's add them.
Edit config.yml:
log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5
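If it helps to see what gets injected, here is a small sketch (an illustration only, not part of the project files). The nested configuration option resolves to a plain dictionary when the factory is called, and that dictionary is what HttpMonitor receives as options:
from monitoringdaemon.containers import ApplicationContainer

container = ApplicationContainer()
container.config.from_yaml('config.yml')

# The dict that providers.Factory passes to HttpMonitor as `options`
assert container.config.monitors.example() == {
    'method': 'GET',
    'url': 'http://example.com',
    'timeout': 5,
    'check_every': 5,
}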
Everything is ready. Let's start the daemon and check how it works.
Execute in the terminal:
docker-compose up
You should see output similar to this:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.067 seconds
monitor_1 |
monitor_1 | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.073 seconds
Our daemon can now monitor the availability of http://example.com.
Let's add monitoring for https://httpbin.org.
Monitoring httpbin.org
In this section, we will add a monitoring task that will monitor access to https://httpbin.org.
Adding a monitoring task for https://httpbin.org will be easier because all the components are ready. We just need to add a new provider to the container and update the configuration.
Let's edit containers.py:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
httpbin_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.httpbin,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
httpbin_monitor,
),
)
Let's edit config.yml:
log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5

  httpbin:
    method: "GET"
    url: "https://httpbin.org/get"
    timeout: 5
    check_every: 5
Let's start the daemon and check the logs.
Let's execute in the terminal:
docker-compose up
You should see output similar to this:
Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1 | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1 | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.077 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1 | GET https://httpbin.org/get
monitor_1 | response code: 200
monitor_1 | content length: 310
monitor_1 | request took: 0.18 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1 | GET http://example.com
monitor_1 | response code: 200
monitor_1 | content length: 648
monitor_1 | request took: 0.066 seconds
monitor_1 |
monitor_1 | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1 | GET https://httpbin.org/get
monitor_1 | response code: 200
monitor_1 | content length: 310
monitor_1 | request took: 0.126 seconds
The functional part is complete. The daemon monitors the availability of http://example.com and https://httpbin.org.
In the next section, we'll add some tests.
Tests
It would be nice to add some tests. Let's do that.
Create the file tests.py in the monitoringdaemon package:
./
├── monitoringdaemon/
│ ├── __init__.py
│ ├── __main__.py
│ ├── containers.py
│ ├── dispatcher.py
│ ├── http.py
│ ├── monitors.py
│ └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt
and add the following lines to it:
"""Tests module."""
import asyncio
import dataclasses
from unittest import mock
import pytest
from .containers import ApplicationContainer
@dataclasses.dataclass
class RequestStub:
status: int
content_length: int
@pytest.fixture
def container():
container = ApplicationContainer()
container.config.from_dict({
'log': {
'level': 'INFO',
'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
},
'monitors': {
'example': {
'method': 'GET',
'url': 'http://fake-example.com',
'timeout': 1,
'check_every': 1,
},
'httpbin': {
'method': 'GET',
'url': 'https://fake-httpbin.org/get',
'timeout': 1,
'check_every': 1,
},
},
})
return container
@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
caplog.set_level('INFO')
http_client_mock = mock.AsyncMock()
http_client_mock.request.return_value = RequestStub(
status=200,
content_length=635,
)
with container.http_client.override(http_client_mock):
example_monitor = container.example_monitor()
await example_monitor.check()
assert 'http://fake-example.com' in caplog.text
assert 'response code: 200' in caplog.text
assert 'content length: 635' in caplog.text
@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
caplog.set_level('INFO')
example_monitor_mock = mock.AsyncMock()
httpbin_monitor_mock = mock.AsyncMock()
with container.example_monitor.override(example_monitor_mock), \
container.httpbin_monitor.override(httpbin_monitor_mock):
dispatcher = container.dispatcher()
event_loop.create_task(dispatcher.start())
await asyncio.sleep(0.1)
dispatcher.stop()
assert example_monitor_mock.check.called
assert httpbin_monitor_mock.check.called
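Note that mock.AsyncMock is available starting with Python 3.8, and the event_loop fixture used in test_dispatcher is provided by pytest-asyncio.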
To run the tests, run in the terminal:
docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon
You should get a similar result:
platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code
plugins: asyncio-0.14.0, cov-2.10.0
collected 2 items

monitoringdaemon/tests.py ..                                    [100%]

----------- coverage: platform linux, python 3.8.3-final-0 -----------
Name                             Stmts   Miss  Cover
----------------------------------------------------
monitoringdaemon/__init__.py         0      0   100%
monitoringdaemon/__main__.py         9      9     0%
monitoringdaemon/containers.py      11      0   100%
monitoringdaemon/dispatcher.py      43      5    88%
monitoringdaemon/http.py             6      3    50%
monitoringdaemon/monitors.py        23      1    96%
monitoringdaemon/tests.py           37      0   100%
----------------------------------------------------
TOTAL                              129     18    86%
Notice how in test_example_monitor we substitute a mock for HttpClient using the .override() method. In this way, you can override the return value of any provider.
The same is done in test_dispatcher to replace the monitoring tasks with mocks.
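The context manager form restores the original provider automatically when the block exits. Outside of a with block you can do the same thing manually; a small sketch (an illustration only, not part of the project files):
from unittest import mock

from monitoringdaemon.containers import ApplicationContainer

container = ApplicationContainer()

container.http_client.override(mock.AsyncMock())  # from now on, container.http_client() returns the mock
container.http_client.reset_override()            # restore the original HttpClient factory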
Conclusion
We built an asyncio-based monitoring daemon following the dependency injection principle. We used Dependency Injector as the dependency injection framework.
The advantage you get with Dependency Injector is the container.
The container starts to pay off when you need to understand or change the structure of your application. With a container, this is easy because all the components of the application and their dependencies are defined in one place:
"""Application containers module."""
import logging
import sys
from dependency_injector import containers, providers
from . import http, monitors, dispatcher
class ApplicationContainer(containers.DeclarativeContainer):
"""Application container."""
config = providers.Configuration()
configure_logging = providers.Callable(
logging.basicConfig,
stream=sys.stdout,
level=config.log.level,
format=config.log.format,
)
http_client = providers.Factory(http.HttpClient)
example_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.example,
)
httpbin_monitor = providers.Factory(
monitors.HttpMonitor,
http_client=http_client,
options=config.monitors.httpbin,
)
dispatcher = providers.Factory(
dispatcher.Dispatcher,
monitors=providers.List(
example_monitor,
httpbin_monitor,
),
)
The container is like a map of your application. You always know what depends on what.
What's next?
- Learn more about Dependency Injector on GitHub
- Check out the documentation at Read the Docs
- Have a question or find a bug? Open an issue on GitHub