Teleporting tons of data to PostgreSQL

Today I will share some useful architectural solutions that arose while developing our tool for mass performance analysis of PostgreSQL servers. They now help us "fit" full monitoring and analysis of more than a thousand hosts onto the same hardware that at first was barely enough for a hundred.





Intro



Let me remind you of some introductory notes:



  • we are building a service that receives information from the logs of PostgreSQL servers
  • while collecting the logs, we want to process them (parse, analyze, request additional information) online
  • everything collected and "analyzed" must be saved somewhere


Let's talk about the last point: how all of this can be delivered to PostgreSQL storage. In our case, the stored data is several times larger than the original: load statistics broken down by application and plan template, resource consumption and derived problems computed down to a single plan node, lock monitoring, and much more.

You can learn more about how the service works from the video report and from the article "Mass optimization of PostgreSQL queries".


push vs pull



There are two main models for obtaining logs or some other constantly arriving metrics:



  • push - the service runs many equal receivers, and on each monitored server a local agent periodically dumps the accumulated information to the service
  • pull - on the service, each process / thread / coroutine / ... handles data from only one source of "its own", and it initiates the data transfer itself


Each of these models has positive and negative sides.



push



The interaction is initiated by the monitored node.



... is beneficial if:



  • you have a lot of sources (hundreds of thousands)
  • the load from each of them is roughly the same and does not exceed ~1 rps
  • no complicated processing is needed




Example: the receiver of a fiscal data operator (OFD) that accepts receipts from each client's cash register.



... causes problems:



  • locks / deadlocks when different threads try to write dictionaries / analytics / aggregates for the same monitored object
  • poor cache utilization in each BL (business logic) process / database connection - for example, the same connection has to write first to one table or index segment and then immediately to another
  • a special agent has to be installed on every source, which increases the load on it
  • high network overhead - headers have to "wrap" every packet sent, rather than the connection to the source as a whole


pull



The initiator is a specific collector host / process / thread, which "binds" a node to itself and extracts data from the "target" on its own.



... is beneficial if:



  • you have few sources (hundreds to thousands)
  • there is almost always load from them, sometimes reaching 1K rps
  • complex processing with segmentation by source is required




Example: a loader / analyzer of trades for each trading platform.



... causes problems:



  • the resources for processing one source are limited to one process (CPU core), since the source cannot be "spread" across two receivers
  • a coordinator is needed to dynamically redistribute the load from sources across the existing processes / threads / resources


Since our load model for PostgreSQL monitoring clearly gravitated towards pull, and the resources of one process on one modern CPU core are quite enough for a single source, we settled on it.



Pull-pull logs



Our communication with the servers involved a great many network operations and work with loosely formatted text strings, so JavaScript, in its server incarnation as Node.js, was a perfect fit for the collector core.



The simplest way to get data from the server log turned out to be "mirroring" the entire log file to the console with the simple Linux command tail -F <current.log>. Only our console is not a simple one but a virtual one, inside a secure connection to the server over SSH.



So the collector, sitting on the other end of the SSH connection, receives a full copy of all log traffic as input. And, if necessary, it asks the server for extended system information about the current state of affairs.
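
As a rough illustration of this "tail -F over SSH" idea, here is a minimal sketch. It assumes the system ssh binary is started through Node's child_process module; the article does not say which SSH client the collector really uses, and the names createLineSplitter, followRemoteLog, host and logPath are hypothetical.

```javascript
const { spawn } = require('child_process');

// Turns arbitrary text chunks into complete lines; a chunk may end mid-line.
function createLineSplitter(onLine) {
  let tail = '';
  return (chunk) => {
    const parts = (tail + chunk).split('\n');
    tail = parts.pop(); // the incomplete last line waits for the next chunk
    for (const line of parts) onLine(line);
  };
}

// Hypothetical helper: `host` and `logPath` are placeholders, not values from the article.
function followRemoteLog(host, logPath, onLine) {
  const ssh = spawn('ssh', [host, 'tail', '-F', logPath]);
  ssh.stdout.setEncoding('utf8'); // decode multi-byte characters split across chunks
  ssh.stdout.on('data', createLineSplitter(onLine));
  return ssh; // the caller can ssh.kill() to stop following
}
```

The splitter is kept separate from the transport so that the same line handling works no matter how the bytes arrive.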



Why not syslog



There are two main reasons:



  1. syslog works on a push model, so the processing load of the stream it generates cannot be quickly managed at the receiving end. That is, if a couple of hosts suddenly start "pouring" thousands of slow query plans, it is extremely difficult to spread their processing across different nodes.

    Processing here means not so much the "dumb" reception / parsing of the log as parsing the plans and calculating the real resource consumption of each plan node.
  2. PostgreSQL, , «» (relation/page/tuple/...).

    «DBA: ».


-



In principle, other DBMSs could have been used to store the data parsed from the logs, but an incoming volume of 150-200GB / day does not leave much room for maneuver. Therefore, we chose PostgreSQL as the storage as well.



- PostgreSQL for storing logs? Seriously?

- Firstly, it holds far from only logs, and not so much logs as various analytical representations. Secondly, "you just don't know how to cook them!" :)






Server settings



This part is subjective and depends heavily on your hardware, but we have worked out the following principles for configuring a PostgreSQL host for heavy writes.



File system settings

The most significant factor affecting write performance is whether the data partition is mounted correctly. We have chosen the following rules:



  • the PGDATA directory is mounted (in the case of ext4) with the parameters noatime,nodiratime,barrier=0,errors=remount-ro,data=writeback,nobh
  • the PGDATA/pg_stat_tmp directory is moved to tmpfs
  • the PGDATA/pg_wal directory is moved to another medium, if that is reasonable


see PostgreSQL File System Tuning



Choosing the Optimal I / O Scheduler

By default, many distributions select cfq as the I/O scheduler, which is tailored for "desktop" use, while RedHat and CentOS use noop. For us, deadline turned out to be more useful.



see PostgreSQL vs. I / O schedulers (cfq, noop and deadline)



Reducing the size of the "dirty" cache

The vm.dirty_background_bytes parameter sets the size of the cache in bytes at which the system starts the background process of flushing it to disk. There is a similar but mutually exclusive parameter, vm.dirty_background_ratio, which sets the same threshold as a percentage of total memory. By default it is the ratio that is set, not the "... bytes" one.



On most distributions it is 10%, on CentOS it is 5%. This means that with 16GB of total server memory, the system may try to write more than 850MB to disk at once, resulting in a peak IOps load.



We decrease it experimentally until the write peaks begin to smooth out. From experience, to avoid spikes the size should be less than the maximum media throughput (in IOps) multiplied by the memory page size. For example, for 7K IOps that is 7000 x 4096, or about 28MB.
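
The rule of thumb above is simple arithmetic; the sketch below only restates it as a function. The name dirtyBackgroundBytes and the 4096-byte default page size are illustrative assumptions, so check the actual page size of your system.

```javascript
// Rule of thumb from the text: stay below (max IOps) x (memory page size).
function dirtyBackgroundBytes(maxIops, pageSizeBytes = 4096) {
  return maxIops * pageSizeBytes;
}

const bytes = dirtyBackgroundBytes(7000);
console.log(`vm.dirty_background_bytes upper bound: ${bytes}`); // 28672000, about 28MB
```

The result is only an upper bound to start from; the value that actually smooths the peaks is still found experimentally.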



see Configuring Linux Kernel Options for PostgreSQL Optimization



Settings in postgresql.conf

Which parameters are worth looking at and tweaking to speed up writes? Everything here is purely individual, so I will only share some thoughts on the topic:



  • shared_buffers - should be made smaller, since with targeted writes the processes have hardly any overlapping "common" data
  • synchronous_commit = off - you can always disable waiting for the commit to be written if you trust the battery of your RAID controller
  • fsync - if the data is not critical at all, you can try turning it off; "in the limit" you can even get an in-memory DB


Database table structure



I have already published some articles about optimization of physical data storage:





But I have not yet written about the different kinds of keys in the data. Let me tell you about them.



Foreign keys are evil for write-heavy systems. In essence, they are "crutches" that keep a careless programmer from writing to the database something that supposedly should not be there.



Many developers are used to the idea that logically related business entities must be linked through FKs at the level of table definitions. But this is not the case!



Of course, this depends very much on the goals you set when writing data to the database. If you are not a bank (and even if you are a bank, but not its processing system!), then the need for FKs in a write-heavy database is highly questionable.



"Technically", each FK performs a separate SELECT from the referenced table when a record is inserted. Now look at the table you are actively writing to, with its 2-3 FKs, and judge whether the semblance of integrity is worth a 3-4x drop in performance for your specific task ... Or is a logical link by value enough? We removed all the FKs here.



UUID keys are good. Since the probability of a collision between UUIDs generated at different unrelated points is extremely small, this load (generating surrogate IDs) can safely be moved from the database to the "consumer". Using UUIDs is good practice in connected but unsynchronized distributed systems.

You can read about other kinds of unique identifiers in PostgreSQL in the article "PostgreSQL Antipatterns: Unique Identifiers".


Natural keys are good too, even if they consist of multiple fields. It is not composite keys you should be afraid of, but an extra surrogate PK field and its index on a heavily loaded table, which you can easily do without.



At the same time, nothing stops you from combining approaches. For example, we assign a surrogate UUID to a "pack" of sequential log records related to one original transaction (since there is simply no natural key), but use the pair (pack::uuid, recno::int2) as the PK, where recno is the "natural" sequence number of the record within the pack.
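
To make the (pack, recno) idea concrete, here is a small sketch of generating such keys on the "consumer" side with Node's built-in crypto.randomUUID(). The function name buildPack and the text field are hypothetical; only pack and recno come from the description above.

```javascript
const { randomUUID } = require('crypto');

// Builds rows keyed by (pack, recno) for one batch of log records.
function buildPack(lines) {
  // recno is stored as int2 in the text, so one pack cannot exceed 32768 records here.
  if (lines.length > 32768) throw new RangeError('too many records for an int2 recno');
  const pack = randomUUID(); // surrogate key generated outside the database
  return lines.map((text, recno) => ({ pack, recno, text }));
}

const rows = buildPack(['BEGIN', 'SELECT 1', 'COMMIT']);
console.log(rows.map((r) => `${r.pack}/${r.recno}`));
```

Because the UUID is produced by the collector, no round trip to a database sequence is needed before the rows are written.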



"Endless" COPY streams



PostgreSQL, like the OS, "doesn't like" data being written to it in huge batches (such as an INSERT of 1000 rows). It is much more tolerant of balanced write streams (via COPY). But you have to know how to cook them very carefully.



  1. Since we removed all FKs at the previous stage, we can now write the information about the pack itself and the set of records related to it in arbitrary order, asynchronously. In this case, it is most efficient to keep a constantly active COPY channel for each target table.
  2. , , «», ( — COPY-) . , — 100, .
  3. , , . . .



    , , «» , . , .
  4. , node-pg, PostgreSQL Node.js, API — stream.write(data) COPY- true, , false, .





    , , « », COPY .
  5. COPY- LRU «». .
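
The list above mentions stream.write(data) returning true or false for a COPY stream. As a hedged illustration, the sketch below shows only the standard Node.js Writable contract: when write() returns false, the writer waits for the 'drain' event before sending more. The function name writeRows is hypothetical, and the stream can be any Writable; how the COPY stream itself is obtained from the driver is not shown.

```javascript
const { once } = require('events');

// Writes rows to any Writable, pausing whenever write() reports a full buffer.
async function writeRows(stream, rows) {
  for (const row of rows) {
    if (!stream.write(row)) {
      // write() returned false: the internal buffer is full, wait for it to flush
      await once(stream, 'drain');
    }
  }
}
```

Respecting this signal keeps unsent data from piling up in the memory of the writing process while the receiver is busy.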




Here we should note the main advantage we got from this scheme of reading and writing logs: in our database, "facts" become available for analysis almost online, within a few seconds.



Refinement with a file



Everything seems to be fine. Where are the pitfalls in the previous scheme? Let's start simple ...



Over-sync



One of the big troubles of loaded systems is the over-synchronization of operations that do not require it. Sometimes it happens "because nobody noticed", sometimes "because it was easier that way", but sooner or later you have to get rid of it.



It is easy to run into. We already have almost 1000 servers set up for monitoring, each handled by a separate logical thread, and each thread flushes its accumulated information to the database at a certain interval, like this:



setInterval(writeDB, interval)


The problem is precisely that all the threads start at about the same time, so their send times almost always coincide to the moment.



Fortunately, this is easy enough to fix by adding a "random" offset both to the start time and to the interval:



setInterval(writeDB, interval * (1 + 0.1 * (Math.random() - 0.5)))






This statistically "spreads" the write load, making it almost uniform.
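
A slightly fuller sketch of the same idea, with the random start delay made explicit. The names jitteredTimings and startJittered are hypothetical, and the 5% spread simply mirrors the one-liner above.

```javascript
// Pure part: a random first delay and a period within +/-5% of the base interval.
function jitteredTimings(interval, random = Math.random) {
  return {
    firstDelay: interval * random(),
    period: interval * (1 + 0.1 * (random() - 0.5)),
  };
}

// Starts fn after firstDelay, then repeats it every period; returns a stop function.
function startJittered(fn, interval, random = Math.random) {
  const { firstDelay, period } = jitteredTimings(interval, random);
  let repeating = null;
  const starter = setTimeout(() => {
    fn();
    repeating = setInterval(fn, period);
  }, firstDelay);
  return () => {
    clearTimeout(starter);
    if (repeating !== null) clearInterval(repeating);
  };
}
```

The random source is passed in as a parameter so that the timings can be checked deterministically.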



Scaling by CPU cores



One processor core is clearly not enough for our entire load, but the cluster module helps here: it makes it easy to manage the creation of child processes and to communicate with them via IPC.



Now we have 16 child processes for 16 processor cores, and that's good, we can use the entire CPU! But each process writes to 16 target tables, and when peak load arrives we open additional COPY channels as well. That means 256+ constantly active write threads ... ouch! Such chaos does disk performance no good, and the database started to "burn".



This was especially sad when writing common dictionaries - for example, the same query text arriving from different nodes - with unnecessary locks and waiting ...





Let's "reverse" the situation: let the child processes still collect and process information from their sources, but not write to the database! Instead, let them send a message via IPC to the master, and let it write everything where it needs to go.





Whoever immediately saw the problem in the previous scheme - well done. It lies precisely in the fact that the master is also a process with limited resources. So at some point we discovered that it, too, was starting to "burn": it simply could not keep up with pushing all the streams to the database, since it was also limited by a single CPU core. As a result, we left most of the lightly loaded "dictionary" streams to be written through the master, while the most heavily loaded ones that need no additional processing were returned to the workers.
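
The resulting split can be sketched as a small routing function inside a worker. Everything here is an assumption made for illustration: the table names, the WRITE_VIA_MASTER set and the dispatch signature are hypothetical, and the real criteria for "dictionary" streams are not given in the text.

```javascript
// Hypothetical table names: which tables count as "dictionaries" is an assumption.
const WRITE_VIA_MASTER = new Set(['query_text', 'plan_template']);

// Decides where a row goes; the two transports are injected by the caller.
function dispatch(table, row, { sendToMaster, writeDirect }) {
  if (WRITE_VIA_MASTER.has(table)) {
    sendToMaster({ table, row }); // lightly loaded dictionaries: a single writer
  } else {
    writeDirect(table, row);      // heavy streaming tables: the worker's own channel
  }
}

// In a cluster worker the wiring could look like this (not executed here):
// dispatch(table, row, {
//   sendToMaster: (msg) => process.send(msg),
//   writeDirect: (t, r) => copyChannels[t].write(r), // copyChannels is hypothetical
// });
```

Injecting the transports keeps the routing rule independent of the IPC and database details.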





Multicollector



But even one node is not enough to handle all the load, so it is time to think about linear scaling. The solution was a multicollector that balances itself according to load, with a coordinator at its head.





Each master reports the current load of all its workers to the coordinator, and in response receives recommendations on which node's monitoring should be moved to another worker or even to another collector. There will be a separate article about such balancing algorithms.



Pooling and Queue Limiting



The next natural question is what to do with the write streams when a sudden peak load arrives.



After all, we cannot keep opening more and more new connections to the database endlessly - it is inefficient and will not help. A trivial solution: limit them so that there are no more than 16 simultaneously active threads for each target table. But what do we do with the data that we still "did not have time" to write? ..



If this "surge" of load is truly a peak, that is, short-lived, we can temporarily keep the data in a queue in the memory of the collector itself. As soon as a channel to the database is freed, we take a record from the queue and send it to the stream.



Yes, this requires the collector to have some buffer for the queues, but it is rather small and is freed quickly.
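
A minimal sketch of such a limiter for one target table, assuming each write is started by a callback-style send function. The class name BoundedWriter and its interface are hypothetical; the real collector's channel management is not shown in the text.

```javascript
// At most maxChannels writes are in flight; everything else waits in memory.
class BoundedWriter {
  constructor(maxChannels, send) {
    this.free = maxChannels; // channels not currently writing
    this.queue = [];         // records waiting for a free channel
    this.send = send;        // (record, done) => void, starts an asynchronous write
  }

  write(record) {
    if (this.free === 0) {
      this.queue.push(record); // peak load: keep the record until a channel frees up
      return;
    }
    this.free--;
    this.send(record, () => this.release());
  }

  release() {
    this.free++;
    if (this.queue.length > 0) this.write(this.queue.shift());
  }
}
```

With maxChannels set to 16 this matches the limit from the text, and the queue drains by itself as writes complete.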





Queue priorities



The attentive reader, having looked at the previous picture, is puzzled again: "what happens when memory runs out completely? .." There are few options left - someone will have to be sacrificed.



But not all the records we want to deliver to the database are "equally useful". It is in our interest to write as many of them as possible, by count. A primitive "exponential prioritization" by the length of the line being written helps with this:



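// queue index = floor(log2(line length)); Math.max avoids log2(0) for empty lines
let priority = Math.trunc(Math.log2(Math.max(line.length, 1)));
(queue[priority] = queue[priority] || []).push(line); // create the queue on first use


Accordingly, when writing to a channel, we always start scooping from the "lower" queues: each individual line there is shorter, so we can send more of them by count: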



let qkeys = Object.keys(queue);
qkeys.sort((x, y) => Number(x) - Number(y)); // keys are strings, so compare them as numbers
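
Putting the two fragments together, a self-contained sketch of such a queue might look like this. The factory name createPriorityQueue is hypothetical, and dropLargest is an assumption about who gets "sacrificed": the text only says that someone has to be.

```javascript
// Lines are bucketed by floor(log2(length)); shorter lines live in lower buckets.
function createPriorityQueue() {
  const queue = {}; // priority -> array of pending lines
  return {
    push(line) {
      const priority = Math.trunc(Math.log2(Math.max(line.length, 1)));
      (queue[priority] = queue[priority] || []).push(line);
    },
    // Take the next line to send, starting from the lowest non-empty bucket.
    shift() {
      const keys = Object.keys(queue).sort((x, y) => Number(x) - Number(y));
      if (keys.length === 0) return undefined;
      const bucket = queue[keys[0]];
      const line = bucket.shift();
      if (bucket.length === 0) delete queue[keys[0]];
      return line;
    },
    // Assumption: when memory runs out, drop one line from the highest bucket.
    dropLargest() {
      const keys = Object.keys(queue).sort((x, y) => Number(y) - Number(x));
      if (keys.length === 0) return undefined;
      const bucket = queue[keys[0]];
      const line = bucket.pop();
      if (bucket.length === 0) delete queue[keys[0]];
      return line;
    },
  };
}
```

Empty buckets are deleted right away, so the key list always reflects only the priorities that still hold data.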


Defeating locks



Now let's go back two steps, to the point where we decided to keep at most 16 threads per table. If the target table is a "streaming" one, that is, its records do not correlate with each other, everything is fine. At most, we will get "physical" locks at the disk level.



But if it is a table of aggregates or even a "dictionary", then writing rows with the same PK from different threads gets us a lock wait, or even a deadlock. It's sad ...



But after all, we decide what to write ourselves! The key point is not to try to write the same PK from different places.



That is, while walking the queue we immediately check whether some thread is already writing a row with this PK to the same table (remember that they all live in the common address space of one process). If not, we take the PK for ourselves and record it in an in-memory dictionary as "ours"; if it already belongs to someone else, we put the row back in the queue.



At the end of the transaction, we simply "clean out" our own entries from the dictionary.
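
A minimal sketch of such an in-memory ownership dictionary, assuming every write channel has some identifier. The names owners, tryClaim and releaseAll, as well as the "table:pk" key format, are hypothetical.

```javascript
const owners = new Map(); // "table:pk" -> id of the channel currently writing that PK

// Returns true if channelId may write this PK now, false if it belongs to another channel.
function tryClaim(table, pk, channelId) {
  const key = `${table}:${pk}`;
  const owner = owners.get(key);
  if (owner !== undefined && owner !== channelId) return false; // someone else's: requeue the row
  owners.set(key, channelId);
  return true;
}

// Called at the end of a channel's transaction: forget everything it had claimed.
function releaseAll(channelId) {
  for (const [key, owner] of owners) {
    if (owner === channelId) owners.delete(key);
  }
}
```

This works without any locking only because all channels live in one single-threaded Node.js process, as the text points out.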



A little proof



First, with LRU, the "first" connections and the PostgreSQL processes serving them are busy almost all the time. This means that the OS switches them between CPU cores much less often, minimizing downtime.





Secondly, if you work with the same server-side processes almost all the time, the chances that any two processes are active simultaneously drop sharply. Accordingly, the peak CPU load as a whole decreases (the gray area in the second graph from the left) and LA (load average) falls, because fewer processes are waiting for their turn.





That's all for today.



And let me remind you that explain.tensor.ru offers various ways to visualize a query execution plan, which will help you clearly see the problem areas.


