Monitoring daemon on Asyncio + Dependency Injector - dependency injection tutorial

Hi,



I am the creator of Dependency Injector . This is a dependency injection framework for Python.



This is another tutorial for building applications with the Dependency Injector.



Today I want to show how you can build an asynchronous daemon based on a module asyncio.



The manual consists of the following parts:



  1. What are we going to build?
  2. Tool check
  3. Project structure
  4. Preparing the environment
  5. Logging and configuration
  6. Dispatcher
  7. Monitoring example.com
  8. Monitoring httpbin.org
  9. Tests
  10. Conclusion


The completed project can be found on Github .



To start, it is desirable to have:



  • Initial knowledge of asyncio
  • Understanding the principle of dependency injection


What are we going to build?



We will be building a monitoring daemon that will monitor access to web services.



The daemon will send requests to example.com and httpbin.org every few seconds. When receiving a response, it will write the following data to the log:



  • Response code
  • Number of bytes in response
  • Time taken to complete the request






Tool check



We will be using Docker and docker-compose . Let's check that they are installed:



docker --version
docker-compose --version


The output should look something like this:



Docker version 19.03.12, build 48a66213fe
docker-compose version 1.26.2, build eefe0d31


If Docker or docker-compose are not installed, they need to be installed before proceeding. Follow these guides:





The tools are ready. Let's move on to the structure of the project.



Project structure



Create a project folder and go to it:



mkdir monitoring-daemon-tutorial
cd monitoring-daemon-tutorial


Now we need to create an initial project structure. Create files and folders following the structure below. All files will be empty for now. We'll fill them in later.



Initial project structure:



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   └── containers.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


The initial project structure is ready. We will expand it in the following sections.



Next, we are waiting for the preparation of the environment.



Preparing the environment



In this section, we will prepare the environment for starting our daemon.



First you need to define dependencies. We will use packages like this:



  • dependency-injector - dependency injection framework
  • aiohttp - web framework (we only need an http client)
  • pyyaml - library for parsing YAML files, used to read the config
  • pytest - testing framework
  • pytest-asyncio- helper library for testing asyncioapplications
  • pytest-cov - helper library for measuring code coverage by tests


Let's add the following lines to the file requirements.txt:



dependency-injector
aiohttp
pyyaml
pytest
pytest-asyncio
pytest-cov


And execute in the terminal:



pip install -r requirements.txt


Next, we create Dockerfile. It will describe the process of building and starting our daemon. We will use it python:3.8-busteras a base image.



Let's add the following lines to the file Dockerfile:



FROM python:3.8-buster

ENV PYTHONUNBUFFERED=1

WORKDIR /code
COPY . /code/

RUN apt-get install openssl \
 && pip install --upgrade pip \
 && pip install -r requirements.txt \
 && rm -rf ~/.cache

CMD ["python", "-m", "monitoringdaemon"]


The last step is to define the settings docker-compose.



Let's add the following lines to the file docker-compose.yml:



version: "3.7"

services:

  monitor:
    build: ./
    image: monitoring-daemon
    volumes:
      - "./:/code"


Everything is ready. Let's start building the image and check that the environment is configured correctly.



Let's execute in the terminal:



docker-compose build


The build process can take several minutes. At the end, you should see:



Successfully built 5b4ee5e76e35
Successfully tagged monitoring-daemon:latest


After the build process is complete, start the container:



docker-compose up


You will see:



Creating network "monitoring-daemon-tutorial_default" with the default driver
Creating monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitoring-daemon-tutorial_monitor_1 exited with code 0


The environment is ready. The container starts and exits with code 0.



The next step is to set up logging and reading the configuration file.



Logging and configuration



In this section, we will configure logging and reading the configuration file.



Let's start by adding the main part of our application - the dependency container (further just the container). The container will contain all the components of the application.



Let's add the first two components. This is a configuration object and a function for configuring logging.



Let's edit containers.py:



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )


We used the configuration parameters before setting their values. This is the principle by which the provider works Configuration.



First we use, then we set the values.



Logging settings will be contained in the configuration file.



Let's edit config.yml:



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"


Now let's define a function that will start our daemon. She is usually called main(). It will create a container. The container will be used to read the configuration file and call the logging settings function.



Let's edit __main__.py:



"""Main module."""

from .containers import ApplicationContainer


def main() -> None:
    """Run the application."""
    container = ApplicationContainer()

    container.config.from_yaml('config.yml')
    container.configure_logging()


if __name__ == '__main__':
    main()


The container is the first object in the application. It is used to get all other objects.


Configuration logging and reading is configured. In the next section, we will create a monitoring task manager.



Dispatcher



It's time to add a monitoring task manager.



The dispatcher will contain a list of monitoring tasks and control their execution. He will carry out each task according to the schedule. Class Monitor- base class for monitoring tasks. To create specific tasks, you need to add child classes and implement the method check().





Let's add a dispatcher and a base class for the monitoring task.



Let's create dispatcher.pyand monitors.pyin the package monitoringdaemon:



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


Let's add the following lines to the file monitors.py:



"""Monitors module."""

import logging


class Monitor:

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


and to the file dispatcher.py:



""""Dispatcher module."""

import asyncio
import logging
import signal
import time
from typing import List

from .monitors import Monitor


class Dispatcher:

    def __init__(self, monitors: List[Monitor]) -> None:
        self._monitors = monitors
        self._monitor_tasks: List[asyncio.Task] = []
        self._logger = logging.getLogger(self.__class__.__name__)
        self._stopping = False

    def run(self) -> None:
        asyncio.run(self.start())

    async def start(self) -> None:
        self._logger.info('Starting up')

        for monitor in self._monitors:
            self._monitor_tasks.append(
                asyncio.create_task(self._run_monitor(monitor)),
            )

        asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.stop)
        asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.stop)

        await asyncio.gather(*self._monitor_tasks, return_exceptions=True)

        self.stop()

    def stop(self) -> None:
        if self._stopping:
            return

        self._stopping = True

        self._logger.info('Shutting down')
        for task, monitor in zip(self._monitor_tasks, self._monitors):
            task.cancel()
        self._logger.info('Shutdown finished successfully')

    @staticmethod
    async def _run_monitor(monitor: Monitor) -> None:
        def _until_next(last: float) -> float:
            time_took = time.time() - last
            return monitor.check_every - time_took

        while True:
            time_start = time.time()

            try:
                await monitor.check()
            except asyncio.CancelledError:
                break
            except Exception:
                monitor.logger.exception('Error executing monitor check')

            await asyncio.sleep(_until_next(last=time_start))


The dispatcher needs to be added to the container.



Let's edit containers.py:



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )


Each component is added to the container.


Finally, we need to update the function main(). We will get the dispatcher from the container and call its method run().



Let's edit __main__.py:



"""Main module."""

from .containers import ApplicationContainer


def main() -> None:
    """Run the application."""
    container = ApplicationContainer()

    container.config.from_yaml('config.yml')
    container.configure_logging()

    dispatcher = container.dispatcher()
    dispatcher.run()


if __name__ == '__main__':
    main()


Now let's start the daemon and test its work.



Let's execute in the terminal:



docker-compose up


The output should look like this:



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 16:12:35,772] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutting down
monitor_1  | [2020-08-08 16:12:35,774] [INFO] [Dispatcher]: Shutdown finished successfully
monitoring-daemon-tutorial_monitor_1 exited with code 0


Everything works correctly. The dispatcher starts and stops since there are no monitoring tasks.



By the end of this section, the skeleton of our demon is ready. In the next section, we will add the first monitoring task.



Monitoring example.com



In this section, we will add a monitoring task that will monitor access to http://example.com .



We'll start by extending our class model with a new type of monitoring task HttpMonitor.



HttpMonitorit is a child class Monitor. We will implement the check () method. It will send an HTTP request and log the received response. The details of the HTTP request will be delegated to the class HttpClient.





Let's add first HttpClient.



Let's create a file http.pyin a package monitoringdaemon:



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   └── monitors.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


And add the following lines to it:



"""Http client module."""

from aiohttp import ClientSession, ClientTimeout, ClientResponse


class HttpClient:

    async def request(self, method: str, url: str, timeout: int) -> ClientResponse:
        async with ClientSession(timeout=ClientTimeout(timeout)) as session:
            async with session.request(method, url) as response:
                return response


Next, you need to add HttpClientto the container.



Let's edit containers.py:



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            # TODO: add monitors
        ),
    )


We are now ready to add HttpMonitor. Let's add it to the module monitors.



Let's edit monitors.py:



"""Monitors module."""

import logging
import time
from typing import Dict, Any

from .http import HttpClient


class Monitor:

    def __init__(self, check_every: int) -> None:
        self.check_every = check_every
        self.logger = logging.getLogger(self.__class__.__name__)

    async def check(self) -> None:
        raise NotImplementedError()


class HttpMonitor(Monitor):

    def __init__(
            self,
            http_client: HttpClient,
            options: Dict[str, Any],
    ) -> None:
        self._client = http_client
        self._method = options.pop('method')
        self._url = options.pop('url')
        self._timeout = options.pop('timeout')
        super().__init__(check_every=options.pop('check_every'))

    @property
    def full_name(self) -> str:
        return '{0}.{1}(url="{2}")'.format(__name__, self.__class__.__name__, self._url)

    async def check(self) -> None:
        time_start = time.time()

        response = await self._client.request(
            method=self._method,
            url=self._url,
            timeout=self._timeout,
        )

        time_end = time.time()
        time_took = time_end - time_start

        self.logger.info(
            'Response code: %s, content length: %s, request took: %s seconds',
            response.status,
            response.content_length,
            round(time_took, 3)
        )


We are all set to add the check for http://example.com . We need to make two changes to the container:



  • Add a factory example_monitor.
  • Transfer example_monitorto the dispatcher.


Let's edit containers.py:



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
        ),
    )


The provider example_monitoris dependent on configuration values. Let's add these values:



Edit config.yml:



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5


Everything is ready. We start the daemon and check the work.



We execute in the terminal:



docker-compose up


And we see a similar conclusion:



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 17:06:41,965] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 17:06:42,033] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.067 seconds
monitor_1  |
monitor_1  | [2020-08-08 17:06:47,040] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.073 seconds


Our daemon can monitor the availability of access to http://example.com .



Let's add monitoring https://httpbin.org .



Monitoring httpbin.org



In this section, we will add a monitoring task that will monitor access to http://example.com .



Adding a monitoring task for https://httpbin.org will be easier because all the components are ready. We just need to add a new provider to the container and update the configuration.



Let's edit containers.py:



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )


Let's edit config.yml:



log:
  level: "INFO"
  format: "[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s"

monitors:

  example:
    method: "GET"
    url: "http://example.com"
    timeout: 5
    check_every: 5

  httpbin:
    method: "GET"
    url: "https://httpbin.org/get"
    timeout: 5
    check_every: 5


Let's start the daemon and check the logs.



Let's execute in the terminal:



docker-compose up


And we see a similar conclusion:



Starting monitoring-daemon-tutorial_monitor_1 ... done
Attaching to monitoring-daemon-tutorial_monitor_1
monitor_1  | [2020-08-08 18:09:08,540] [INFO] [Dispatcher]: Starting up
monitor_1  | [2020-08-08 18:09:08,618] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.077 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:08,722] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.18 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:13,619] [INFO] [HttpMonitor]: Check
monitor_1  |     GET http://example.com
monitor_1  |     response code: 200
monitor_1  |     content length: 648
monitor_1  |     request took: 0.066 seconds
monitor_1  |
monitor_1  | [2020-08-08 18:09:13,681] [INFO] [HttpMonitor]: Check
monitor_1  |     GET https://httpbin.org/get
monitor_1  |     response code: 200
monitor_1  |     content length: 310
monitor_1  |     request took: 0.126 seconds


The functional part is completed. The daemon monitors the availability of access to http://example.com and https://httpbin.org .



In the next section, we'll add some tests.



Tests



It would be nice to add some tests. Let's do that.



Create a file tests.pyin a package monitoringdaemon:



./
├── monitoringdaemon/
│   ├── __init__.py
│   ├── __main__.py
│   ├── containers.py
│   ├── dispatcher.py
│   ├── http.py
│   ├── monitors.py
│   └── tests.py
├── config.yml
├── docker-compose.yml
├── Dockerfile
└── requirements.txt


and add the following lines to it:



"""Tests module."""

import asyncio
import dataclasses
from unittest import mock

import pytest

from .containers import ApplicationContainer


@dataclasses.dataclass
class RequestStub:
    status: int
    content_length: int


@pytest.fixture
def container():
    container = ApplicationContainer()
    container.config.from_dict({
        'log': {
            'level': 'INFO',
            'formant': '[%(asctime)s] [%(levelname)s] [%(name)s]: %(message)s',
        },
        'monitors': {
            'example': {
                'method': 'GET',
                'url': 'http://fake-example.com',
                'timeout': 1,
                'check_every': 1,
            },
            'httpbin': {
                'method': 'GET',
                'url': 'https://fake-httpbin.org/get',
                'timeout': 1,
                'check_every': 1,
            },
        },
    })
    return container


@pytest.mark.asyncio
async def test_example_monitor(container, caplog):
    caplog.set_level('INFO')

    http_client_mock = mock.AsyncMock()
    http_client_mock.request.return_value = RequestStub(
        status=200,
        content_length=635,
    )

    with container.http_client.override(http_client_mock):
        example_monitor = container.example_monitor()
        await example_monitor.check()

    assert 'http://fake-example.com' in caplog.text
    assert 'response code: 200' in caplog.text
    assert 'content length: 635' in caplog.text


@pytest.mark.asyncio
async def test_dispatcher(container, caplog, event_loop):
    caplog.set_level('INFO')

    example_monitor_mock = mock.AsyncMock()
    httpbin_monitor_mock = mock.AsyncMock()

    with container.example_monitor.override(example_monitor_mock), \
            container.httpbin_monitor.override(httpbin_monitor_mock):

        dispatcher = container.dispatcher()
        event_loop.create_task(dispatcher.start())
        await asyncio.sleep(0.1)
        dispatcher.stop()

    assert example_monitor_mock.check.called
    assert httpbin_monitor_mock.check.called


To run the tests, run in the terminal:



docker-compose run --rm monitor py.test monitoringdaemon/tests.py --cov=monitoringdaemon


You should get a similar result:



platform linux -- Python 3.8.3, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /code
plugins: asyncio-0.14.0, cov-2.10.0
collected 2 items

monitoringdaemon/tests.py ..                                    [100%]

----------- coverage: platform linux, python 3.8.3-final-0 -----------
Name                             Stmts   Miss  Cover
----------------------------------------------------
monitoringdaemon/__init__.py         0      0   100%
monitoringdaemon/__main__.py         9      9     0%
monitoringdaemon/containers.py      11      0   100%
monitoringdaemon/dispatcher.py      43      5    88%
monitoringdaemon/http.py             6      3    50%
monitoringdaemon/monitors.py        23      1    96%
monitoringdaemon/tests.py           37      0   100%
----------------------------------------------------
TOTAL                              129     18    86%


Notice how in the test test_example_monitorwe substitute HttpClientmock using the method .override(). In this way, you can override the return value of any provider.



The same actions are performed in the test test_dispatcherto replace monitoring tasks with mocks.





Conclusion



We built a monitoring daemon based on the asyncioprinciple of dependency injection. We used Dependency Injector as a dependency injection framework.



The advantage you get with Dependency Injector is container.



The container starts to pay off when you need to understand or change the structure of your application. With a container, it's easy because all the components of the application and their dependencies are in one place:



"""Application containers module."""

import logging
import sys

from dependency_injector import containers, providers

from . import http, monitors, dispatcher


class ApplicationContainer(containers.DeclarativeContainer):
    """Application container."""

    config = providers.Configuration()

    configure_logging = providers.Callable(
        logging.basicConfig,
        stream=sys.stdout,
        level=config.log.level,
        format=config.log.format,
    )

    http_client = providers.Factory(http.HttpClient)

    example_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.example,
    )

    httpbin_monitor = providers.Factory(
        monitors.HttpMonitor,
        http_client=http_client,
        options=config.monitors.httpbin,
    )

    dispatcher = providers.Factory(
        dispatcher.Dispatcher,
        monitors=providers.List(
            example_monitor,
            httpbin_monitor,
        ),
    )




A container as a map of your application. You always know what depends on what.



What's next?






All Articles