Writing a simple ORM with the ability to change databases on the fly




Hello, Habr! My karma was drained by a careless comment under a holy-war article, which means I need to write an interesting (I hope) post and rehabilitate myself.



I have been using a Telegram client server written in PHP for several years. Like many users, I got tired of its constantly growing memory consumption. Some sessions can take from 1 to 8 gigabytes of RAM! Database support had been promised for a long time, but there was no progress in that direction, so I had to solve the problem myself :) The popularity of the open source project imposed some interesting requirements on the pull request:



  1. Backward compatibility. All existing sessions must continue to work in the new version (a session is a serialized instance of the application stored in a file);
  2. Freedom of choice of database. The storage type can be changed at any time without losing data, since users have different environment configurations;
  3. Extensibility. New database types should be easy to add;
  4. Preserved interface. Application code that manipulates the data should not change;
  5. Asynchrony. The project uses amphp, so all database operations must be non-blocking.


The details are below the cut.



What we are going to move



Most of MadelineProto's memory is occupied by chats, users and files. For example, my peer cache holds more than 20 thousand entries. These are all the users the account has ever seen (including members of all groups), as well as channels, bots and groups. The older and more active the account, the more data is kept in memory. This adds up to tens or hundreds of megabytes, and most of it is not used. But you cannot simply clear the whole cache, because Telegram severely restricts an account that requests the same data many times. For example, after I re-created the session on my public demo server, Telegram answered most requests with the FLOOD_WAIT error for a week and nothing really worked. After the cache warmed up, everything returned to normal.



From the code's point of view, this data is stored as arrays in the properties of a couple of classes.



Architecture



Based on the requirements, a scheme was born:



  • All "heavy" arrays are replaced with objects that implement ArrayAccess;
  • For each type of database, we create our own classes that inherit the base one;
  • Objects are created and written to properties during __construct and __wakeup;
  • The abstract factory selects the desired class for the object, depending on the selected database in the application settings;
  • If the application already has another type of storage, then we read all data from there and write the array to the new storage.
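
The scheme above can be sketched in plain, synchronous PHP. This is only an illustration under stated assumptions: the names BaseArray, MemoryArray, FakeSqlArray and DbArrayFactory are hypothetical and are not the classes from the pull request, and the "SQL" backend is a stand-in that keeps data in memory.

```php
<?php
// Hypothetical names throughout; not the classes from the pull request.
abstract class BaseArray implements ArrayAccess, Countable
{
    /** Return all data as a plain PHP array (used for migration). */
    abstract public function getArrayCopy(): array;
}

class MemoryArray extends BaseArray
{
    protected array $data = [];

    public function offsetExists($offset): bool
    {
        return isset($this->data[$offset]);
    }

    #[\ReturnTypeWillChange]
    public function offsetGet($offset)
    {
        return $this->data[$offset] ?? null;
    }

    public function offsetSet($offset, $value): void
    {
        $this->data[$offset] = $value;
    }

    public function offsetUnset($offset): void
    {
        unset($this->data[$offset]);
    }

    public function count(): int
    {
        return count($this->data);
    }

    public function getArrayCopy(): array
    {
        return $this->data;
    }
}

// Stand-in for a real database backend; here it differs only by class name.
class FakeSqlArray extends MemoryArray
{
}

final class DbArrayFactory
{
    private const TYPES = ['memory' => MemoryArray::class, 'sql' => FakeSqlArray::class];

    /** @param array|BaseArray|null $previous value currently held by the property */
    public static function create(string $type, $previous = null): BaseArray
    {
        $class = self::TYPES[$type] ?? null;
        if ($class === null) {
            throw new InvalidArgumentException("Unknown storage type: $type");
        }
        // Same storage type as before: nothing to migrate.
        if ($previous instanceof BaseArray && get_class($previous) === $class) {
            return $previous;
        }
        // Different storage (or a legacy plain array): copy everything over.
        $new = new $class();
        $old = $previous instanceof BaseArray ? $previous->getArrayCopy() : (array) $previous;
        foreach ($old as $key => $value) {
            $new[$key] = $value;
        }
        return $new;
    }
}

// A legacy session holds a plain array; each settings change swaps the backend.
$chats = [10 => ['id' => 10, 'title' => 'dev chat']];
$chats = DbArrayFactory::create('memory', $chats);
$chats = DbArrayFactory::create('sql', $chats);
echo get_class($chats), ' ', count($chats), "\n";
```

Because the calling code only ever uses array syntax on the property, it does not need to know which class the factory returned.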


Asynchronous world problems



The first thing I did was create the interfaces and a class for storing arrays in memory. This was the default option, identical in behavior to the older version of the program. On the first evening I was very excited about the success of the prototype. The code was nice and simple. That lasted until I discovered that generators cannot be used inside the methods of the Iterator interface or inside the methods responsible for unset and isset.



It should be clarified here that amphp uses generator syntax to implement asynchrony in PHP. yield becomes the analog of async ... await from JS. If a method is asynchronous, then to get its result the calling code has to wait for it with yield. For instance:



<?php

include 'vendor/autoload.php';

$MadelineProto = new \danog\MadelineProto\API('session.madeline');
$MadelineProto->async(true);

$MadelineProto->loop(function() use($MadelineProto) {
    $myAsyncFunction = function() use($MadelineProto): \Generator {
        $me = yield $MadelineProto->start();
        yield $MadelineProto->echo(json_encode($me, JSON_PRETTY_PRINT|JSON_UNESCAPED_UNICODE));
    };

    yield $myAsyncFunction();
});


If you remove yield from the line
yield $myAsyncFunction();
then the application will terminate before this code is executed, and we will not get the result.
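
To see why the missing yield matters, here is a minimal sketch in plain PHP, without amphp. The run() function is a hypothetical stand-in for the event loop and is not amphp's API: it resumes a generator and feeds every yielded value back into it. A generator that is created but never handed to the driver does not execute at all.

```php
<?php
// Hypothetical stand-in for an event loop: drives a generator to completion.
// Nested generators are run recursively; any other yielded value is sent back as-is.
function run(Generator $generator)
{
    while ($generator->valid()) {
        $yielded = $generator->current();
        $generator->send($yielded instanceof Generator ? run($yielded) : $yielded);
    }
    return $generator->getReturn();
}

$log = [];

$inner = function () use (&$log): Generator {
    $log[] = 'inner started';
    $x = yield 21;   // "await" a value; the driver sends 21 back
    return $x * 2;
};

$outer = function () use (&$log, $inner): Generator {
    $inner();                     // no yield: the generator is created, its body never runs
    $log[] = 'after unawaited call';
    $result = yield $inner();     // with yield: the driver runs it and sends back its result
    $log[] = "result: $result";
    return $result;
};

$value = run($outer());
print_r($log); // 'inner started' appears only once, for the yielded call
```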



Adding yield before method and function calls is not very difficult. But since the ArrayAccess interface is used, the methods are not called directly. For example, unset() calls offsetUnset(), and isset() calls offsetExists(). The situation is similar with foreach when the Iterator interface is used.



Adding yield in front of these built-in constructs raises an error, because they are not designed to work with generators. There is a bit more detail in the comments: here and here.
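
One way to see the underlying problem is the plain-PHP sketch below. The class AsyncLikeArray is hypothetical and only illustrates the mechanism: as soon as offsetExists() contains a yield, it becomes a generator function, so isset() receives a Generator object, casts it to bool, and never runs the actual check.

```php
<?php
// Hypothetical class for illustration; it is not part of MadelineProto.
final class AsyncLikeArray implements ArrayAccess
{
    public array $data = [];
    public bool $bodyRan = false;

    // The yield below turns this method into a generator function:
    // calling it only creates a Generator object and runs none of the body.
    #[\ReturnTypeWillChange]
    public function offsetExists($offset)
    {
        $this->bodyRan = true;
        yield;
        return isset($this->data[$offset]);
    }

    #[\ReturnTypeWillChange]
    public function offsetGet($offset)
    {
        return $this->data[$offset] ?? null;
    }

    public function offsetSet($offset, $value): void
    {
        $this->data[$offset] = $value;
    }

    public function offsetUnset($offset): void
    {
        unset($this->data[$offset]);
    }
}

$array = new AsyncLikeArray();
$exists = isset($array['missing']); // isset() casts the returned Generator to bool

var_dump($exists);          // bool(true), although the key is absent
var_dump($array->bodyRan);  // bool(false): the method body was never executed
```

This is why an explicit method call that can be yielded is needed instead of the built-in isset() and unset().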



I had to compromise and rewrite that code to call my own methods explicitly. Fortunately, there were very few such places. In most cases the arrays were only read or written by key, and that functionality works well with generators.



The resulting interface is:



<?php

use Amp\Producer;
use Amp\Promise;

interface DbArray extends DbType, \ArrayAccess, \Countable
{
    public function getArrayCopy(): Promise;
    public function isset($key): Promise;
    public function offsetGet($offset): Promise;
    public function offsetSet($offset, $value);
    public function offsetUnset($offset): Promise;
    public function count(): Promise;
    public function getIterator(): Producer;

    /**
     * @deprecated
     * @internal
     * @see DbArray::isset();
     *
     * @param mixed $offset
     *
     * @return bool
     */
    public function offsetExists($offset);
}


Examples of working with data



<?php
...
// Read an element
$existingChat = yield $this->chats[$user['id']];

// Write an element.
yield $this->chats[$user['id']] = $user;
// You can omit yield; then the code does not wait for the write and continues immediately.
$this->chats[$user['id']] = $user;


//unset
yield $this->chats->offsetUnset($id);

//foreach
$iterator = $this->chats->getIterator();
while (yield $iterator->advance()) {
    [$key, $value] = $iterator->getCurrent();
    // process the element
}


Data storage



The easiest way to store the data is in serialized form. I had to abandon JSON in order to support objects. The table has two main columns: key and value.
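
A short sketch of why JSON does not fit here: a JSON round trip loses the class of an object, while serialize() keeps it. The Peer class is a hypothetical stand-in for a cached value.

```php
<?php
// Hypothetical value object standing in for a cached chat or user.
final class Peer
{
    public int $id;
    public string $username;

    public function __construct(int $id, string $username)
    {
        $this->id = $id;
        $this->username = $username;
    }
}

$peer = new Peer(42, 'example');

$viaJson = json_decode(json_encode($peer));     // comes back as a generic object
$viaSerialize = unserialize(serialize($peer));  // comes back as the original class

echo get_class($viaJson), "\n";       // stdClass
echo get_class($viaSerialize), "\n";  // Peer
```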



An example of a sql query to create a table:



            CREATE TABLE IF NOT EXISTS `{$this->table}`
            (
                `key` VARCHAR(255) NOT NULL,
                `value` MEDIUMBLOB NULL,
                `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                PRIMARY KEY (`key`)
            )
            ENGINE = InnoDB
            CHARACTER SET 'utf8mb4' 
            COLLATE 'utf8mb4_general_ci'


Every time the application starts, we try to create a table for each property. Restarting a Telegram client more often than once every few hours is not recommended, so we will not be sending several table creation requests per second :)



Since the primary key has no autoincrement, inserting and updating data can be done with a single query, just like with a regular array:



INSERT INTO `{$this->table}` 
            SET `key` = :index, `value` = :value 
            ON DUPLICATE KEY UPDATE `value` = :value


For each variable, a table is created with a name in the format %account_id%_%class%_%variable_name%. But when the application is started for the first time, there is no account yet. In this case we have to generate a random temporary id with the tmp prefix. At each launch, the class of each variable checks whether the account id has appeared. If the id is present, the tables are renamed.
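
The naming scheme can be sketched as follows. All three helpers are hypothetical; in particular, using the short class name, sanitizing characters, and the exact form of the temporary id are assumptions, and the real code in the pull request may differ.

```php
<?php
// Hypothetical helpers; the naming code in the pull request may differ.

/** Build a table name in the form accountId_class_property. */
function tableName(string $accountId, string $class, string $property): string
{
    // Assumption: only the short class name is used, without the namespace.
    $short = substr(strrchr('\\' . $class, '\\'), 1);
    // Assumption: anything unsafe in an identifier is replaced with "_".
    return preg_replace('/[^A-Za-z0-9_]/', '_', "{$accountId}_{$short}_{$property}");
}

/** Random placeholder used until the real account id is known. */
function temporaryAccountId(): string
{
    return 'tmp' . bin2hex(random_bytes(4));
}

/** MySQL statement that moves data from the temporary table to the final one. */
function renameSql(string $from, string $to): string
{
    return "RENAME TABLE `{$from}` TO `{$to}`";
}

$tmpTable = tableName(temporaryAccountId(), 'danog\\MadelineProto\\MTProto', 'chats');
$finalTable = tableName('12345', 'danog\\MadelineProto\\MTProto', 'chats');

echo renameSql($tmpTable, $finalTable), "\n";
```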



Indexes



The structure of the database is as simple as possible, so that new properties can be added automatically in the future. There are no relations between tables. Only PRIMARY KEY indexes are used. But there are situations when you need to search by other fields.



For example, there is an array / table called chats. Its key is the chat id. But often you have to search by username. When the application stored data in arrays, the search by username was done the usual way, by iterating over the array with foreach. That worked at an acceptable speed in memory, but not in the database. Therefore, another table / array was created, together with the corresponding property in the class. Its key is the username and its value is the chat id. The only drawback of this approach is that you have to write additional code to keep the two tables in sync.
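
The synchronization code mentioned above might look roughly like this. It is a synchronous sketch on plain arrays; ChatStore and its method names are hypothetical and only illustrate the idea of updating the lookup table on every write to the primary one.

```php
<?php
// Hypothetical sketch; plain arrays stand in for the two database tables.
final class ChatStore
{
    /** @var array<int, array> chat id => chat data (primary table) */
    private array $chats = [];

    /** @var array<string, int> username => chat id (lookup table) */
    private array $usernames = [];

    public function save(array $chat): void
    {
        $id = $chat['id'];
        // Drop the stale lookup entry if the username changed or disappeared.
        $oldUsername = $this->chats[$id]['username'] ?? null;
        if ($oldUsername !== null) {
            unset($this->usernames[$oldUsername]);
        }
        $this->chats[$id] = $chat;
        if (isset($chat['username'])) {
            $this->usernames[$chat['username']] = $id;
        }
    }

    public function delete(int $id): void
    {
        $username = $this->chats[$id]['username'] ?? null;
        if ($username !== null) {
            unset($this->usernames[$username]);
        }
        unset($this->chats[$id]);
    }

    /** Lookup by key instead of a foreach over all chats. */
    public function findByUsername(string $username): ?array
    {
        $id = $this->usernames[$username] ?? null;
        return $id === null ? null : ($this->chats[$id] ?? null);
    }
}

$store = new ChatStore();
$store->save(['id' => 1, 'username' => 'alice']);
$store->save(['id' => 1, 'username' => 'alice_new']); // rename: old entry is removed
var_dump($store->findByUsername('alice'));            // NULL
var_dump($store->findByUsername('alice_new')['id']);  // int(1)
```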



Caching



A local MySQL server is fast, but a little caching never hurts, especially when the same value is used several times in a row. For example, first we check whether a chat is present in the database, and then we read some data from it.



I reinvented the wheel and wrote a simple trait.



<?php

namespace danog\MadelineProto\Db;

use Amp\Loop;
use danog\MadelineProto\Logger;

trait ArrayCacheTrait
{
    /**
     * Values stored in this format:
     * [
     *      [
     *          'value' => mixed,
     *          'ttl' => int
     *      ],
     *      ...
     * ].
     * @var array
     */
    protected array $cache = [];
    protected string $ttl = '+5 minutes';
    private string $ttlCheckInterval = '+1 minute';

    protected function getCache(string $key, $default = null)
    {
        $cacheItem = $this->cache[$key] ?? null;
        $result = $default;

        if (\is_array($cacheItem)) {
            $result = $cacheItem['value'];
            $this->cache[$key]['ttl'] = \strtotime($this->ttl);
        }

        return $result;
    }

    /**
     * Save item in cache.
     *
     * @param string $key
     * @param $value
     */
    protected function setCache(string $key, $value): void
    {
        $this->cache[$key] = [
            'value' => $value,
            'ttl' => \strtotime($this->ttl),
        ];
    }

    /**
     * Remove key from cache.
     *
     * @param string $key
     */
    protected function unsetCache(string $key): void
    {
        unset($this->cache[$key]);
    }

    protected function startCacheCleanupLoop(): void
    {
        Loop::repeat(\strtotime($this->ttlCheckInterval, 0) * 1000, fn () => $this->cleanupCache());
    }

    /**
     * Remove expired keys from cache.
     */
    protected function cleanupCache(): void
    {
        $now = \time();
        $oldKeys = [];
        foreach ($this->cache as $cacheKey => $cacheValue) {
            if ($cacheValue['ttl'] < $now) {
                $oldKeys[] = $cacheKey;
            }
        }
        foreach ($oldKeys as $oldKey) {
            $this->unsetCache($oldKey);
        }

        Logger::log(
            \sprintf(
                "cache for table:%s; keys left: %s; keys removed: %s",
                $this->table,
                \count($this->cache),
                \count($oldKeys)
            ),
            Logger::VERBOSE
        );
    }
}


I would like to pay special attention to startCacheCleanupLoop. Thanks to the magic of amphp, invalidating the cache is as simple as possible. The callback runs at the specified interval, loops through all values and looks at the ttl field, which stores the timestamp at which the element expires; every read through getCache pushes that timestamp forward. If the element has not been accessed for more than 5 minutes (configurable in the settings), it is deleted. With amphp it is very easy to implement an analog of ttl from Redis or Memcached. All of this happens in the background and does not hold up the main code.



With the help of cache and asynchrony, not only reads are accelerated, but also writes.



Here is the source code for the method that writes data to the database.



/**
     * Set value for an offset.
     *
     * @link https://php.net/manual/en/arrayiterator.offsetset.php
     *
     * @param string $index <p>
     * The index to set for.
     * </p>
     * @param $value
     *
     * @throws \Throwable
     */

    public function offsetSet($index, $value): Promise
    {
        if ($this->getCache($index) === $value) {
            return call(fn () =>null);
        }

        $this->setCache($index, $value);

        $request = $this->request(
            "
            INSERT INTO `{$this->table}` 
            SET `key` = :index, `value` = :value 
            ON DUPLICATE KEY UPDATE `value` = :value
        ",
            [
                'index' => $index,
                'value' => \serialize($value),
            ]
        );

        //Ensure that cache is synced with latest insert in case of concurrent requests.
        $request->onResolve(fn () => $this->setCache($index, $value));

        return $request;
    }


$this->request creates a Promise that writes the data asynchronously, while operations on the cache happen synchronously. That is, you do not have to wait for the write to the database, and you can still be sure that read operations will immediately start returning the new data.



The onResolve method from amphp turned out to be very useful. After the insert completes, the data is written to the cache again. If some write operation is late and the cache and the database start to differ, the cache will be updated with the value that was written to the database last. That is, our cache becomes consistent with the database again.



Source



→ Link to pull request



And just like that, another user added support for PostgreSQL. It took me only 5 minutes to write the instructions for him.



The amount of code was reduced by moving the duplicated methods into a shared abstract class, SqlArray.



One more thing



I noticed that while downloading media files from Telegram, the standard PHP garbage collector does not cope with the work and pieces of the file remain in memory. Typically, the leaks were the same size as the file. A possible cause: the garbage collector is triggered automatically when about 10,000 possible roots (references that may be part of a cycle) accumulate. In our case there were few references (dozens), but each could point to megabytes of data in memory. I was too lazy to study thousands of lines of code with the mtproto implementation. Why not try a quick hack with \gc_collect_cycles(); first?



Surprisingly, it solved the problem. This means it is enough to schedule a periodic cleanup. Fortunately, amphp provides simple tools for running code in the background at specified intervals.



Clearing memory every second seemed too simplistic and not very efficient. I settled on an algorithm that checks how much memory usage has grown since the last cleanup. A cleanup is triggered only if the growth exceeds the threshold.



<?php

namespace danog\MadelineProto\MTProtoTools;

use Amp\Loop;
use danog\MadelineProto\Logger;

class GarbageCollector
{
    /**
     * Ensure only one instance of GarbageCollector
     * 		when multiple instances of MadelineProto running.
     * @var bool
     */
    public static bool $lock = false;

    /**
     * How often will check memory.
     * @var int
     */
    public static int $checkIntervalMs = 1000;

    /**
     * Next cleanup will be triggered when memory consumption will increase by this amount.
     * @var int
     */
    public static int $memoryDiffMb = 1;

    /**
     * Memory consumption after last cleanup.
     * @var int
     */
    private static int $memoryConsumption = 0;

    public static function start(): void
    {
        if (static::$lock) {
            return;
        }
        static::$lock = true;

        Loop::repeat(static::$checkIntervalMs, static function () {
            $currentMemory = static::getMemoryConsumption();
            if ($currentMemory > static::$memoryConsumption + static::$memoryDiffMb) {
                \gc_collect_cycles();
                static::$memoryConsumption = static::getMemoryConsumption();
                $cleanedMemory = $currentMemory - static::$memoryConsumption;
                Logger::log("gc_collect_cycles done. Cleaned memory: $cleanedMemory Mb", Logger::VERBOSE);
            }
        });
    }

    private static function getMemoryConsumption(): int
    {
        $memory = \round(\memory_get_usage()/1024/1024, 1);
        Logger::log("Memory consumption: $memory Mb", Logger::ULTRA_VERBOSE);
        return (int) $memory;
    }
}


