Suppose you need to process a large (no, not just large, but LARGE) number of records in PostgreSQL to compute some aggregates. In the previous article, we looked at several ways to organize this; in this article we will see how to do it without blocking anyone much, including the incoming stream of data.
For example, this could be recalculating stock balances and maintaining consolidated sales by product while shipments keep arriving, or aggregating balances and turnovers across ledger accounts while transactions change en masse, or something else entirely. Any management system has a whole pile of such tasks, and VLSI is no exception.
But all these situations have one thing in common: the number of changes is much greater than the number of target aggregates. For example, thousands of products, each with tens of thousands of shipments per day.
In what follows, we will rely on this "Amazon with goods" model.
Target: consolidated daily sales
We want sales aggregates of the form product / day / quantity.
In this case we will build the aggregates right in the database, so that various reports can read them quickly and without extra steps.
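To make the target concrete, here is a minimal sketch of the two tables involved. The table names flow and agg and the columns it and dt are taken from the queries later in this article; the qty column, the sample rows, and the use of Python's built-in sqlite3 module in place of PostgreSQL are assumptions made only so that the example runs without a server.

```python
import sqlite3

# SQLite stands in for PostgreSQL here; `qty` is an assumed column name.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE flow (it INTEGER, dt TEXT, qty INTEGER);   -- raw shipment facts
    CREATE TABLE agg  (it INTEGER, dt TEXT, qty INTEGER,
                       PRIMARY KEY (it, dt));               -- one row per product/day
""")

# Made-up sample data: (product, day, quantity shipped).
facts = [
    (1, "2018-07-29", 5),
    (1, "2018-07-29", 3),
    (2, "2018-07-29", 7),
    (1, "2018-07-30", 1),
]
conn.executemany("INSERT INTO flow (it, dt, qty) VALUES (?, ?, ?)", facts)

# Many facts collapse into few aggregates: one row per (product, day).
conn.execute("""
    INSERT INTO agg (it, dt, qty)
    SELECT it, dt, SUM(qty) FROM flow GROUP BY it, dt
""")
conn.commit()

daily_sales = conn.execute("SELECT it, dt, qty FROM agg ORDER BY it, dt").fetchall()
print(daily_sales)
```

A single GROUP BY like this is fine for a toy table; the rest of the article is about doing the same thing while new facts keep arriving.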
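The straightforward attempt, taking all the facts for one key from the flow table in a single statement, fails on a large stream: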
DELETE FROM flow WHERE (it, dt) = (1, '2018-07-29') RETURNING *;
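One way to keep a long-running statement from holding everyone up is to cap its execution time (the value is in milliseconds):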
SET statement_timeout = 1000;
DECLARE curs CURSOR FOR SELECT ctid, * FROM flow WHERE (it, dt) = (1, '2018-07-29') FOR UPDATE;
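-- repeat until the cursor is exhausted: fetch the next batch (%d is the batch size), then delete exactly those rows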
FETCH %d FROM curs;
DELETE FROM flow WHERE ctid = ANY(...);
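In the flow table, ctid is the physical identifier of a row version, so deleting by it removes exactly the rows that were just fetched.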
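The batch loop can be tried out without a PostgreSQL server. The sketch below is an approximation, not the original implementation: it uses Python's sqlite3 module, SQLite's rowid plays the role of ctid, there is no FOR UPDATE (SQLite has no row locks), and a bounded SELECT replaces the server-side cursor. The qty column, the drain_in_batches helper, and the sample data are hypothetical.

```python
import sqlite3

def drain_in_batches(conn, it, dt, batch_size):
    """Move all facts for one (it, dt) key from flow into agg, batch by batch.

    Returns the number of batches processed.
    """
    batches = 0
    while True:
        conn.execute("BEGIN")
        # Read a bounded batch together with each row's physical identifier
        # (rowid here, ctid in the PostgreSQL queries of the article).
        rows = conn.execute(
            "SELECT rowid, qty FROM flow WHERE it = ? AND dt = ? LIMIT ?",
            (it, dt, batch_size),
        ).fetchall()
        if not rows:
            conn.execute("COMMIT")
            return batches
        # Delete exactly the rows that were read, nothing that arrived later.
        conn.executemany("DELETE FROM flow WHERE rowid = ?", [(r[0],) for r in rows])
        batch_qty = sum(r[1] for r in rows)
        updated = conn.execute(
            "UPDATE agg SET qty = qty + ? WHERE it = ? AND dt = ?",
            (batch_qty, it, dt),
        ).rowcount
        if updated == 0:  # first batch for this key: create the aggregate row
            conn.execute(
                "INSERT INTO agg (it, dt, qty) VALUES (?, ?, ?)", (it, dt, batch_qty)
            )
        conn.execute("COMMIT")
        batches += 1

# isolation_level=None: transactions are controlled explicitly with BEGIN/COMMIT.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE flow (it INTEGER, dt TEXT, qty INTEGER)")
conn.execute("CREATE TABLE agg (it INTEGER, dt TEXT, qty INTEGER, PRIMARY KEY (it, dt))")
conn.executemany(
    "INSERT INTO flow VALUES (?, ?, ?)",
    [(1, "2018-07-29", q) for q in (1, 2, 3, 4, 5)] + [(2, "2018-07-29", 9)],
)

batches = drain_in_batches(conn, 1, "2018-07-29", batch_size=2)
agg_rows = conn.execute("SELECT it, dt, qty FROM agg").fetchall()
left_in_flow = conn.execute("SELECT COUNT(*) FROM flow").fetchone()[0]
print(batches, agg_rows, left_in_flow)
```

Each batch is committed on its own, so no single transaction touches more than batch_size rows; facts for other keys stay in flow untouched.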
SAVEPOINT
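That leaves one question: what %d should we pass to FETCH?
PostgreSQL supports SAVEPOINT/ROLLBACK TO, so a transaction can be rolled back partially. This suggests growing the batch step by step: after each processed batch we set a savepoint and double the batch size; when a timeout hits, we roll back to the last savepoint and COMMIT everything that was processed before it: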
BEGIN;
DECLARE curs CURSOR FOR SELECT ctid, * FROM flow WHERE (it, dt) = (1, '2018-07-29') FOR UPDATE;
FETCH 1 FROM curs;
DELETE FROM flow WHERE ctid = ANY(...);
-- processing
INSERT INTO agg ...
SAVEPOINT _1;
FETCH 2 FROM curs;
DELETE FROM flow WHERE ctid = ANY(...);
-- processing
INSERT INTO agg ...
SAVEPOINT _2;
FETCH 4 FROM curs;
DELETE FROM flow WHERE ctid = ANY(...);
-- processing...
INSERT INTO agg ...
-- oops! timeout exception!
ROLLBACK TO _2;
CLOSE curs;
COMMIT;
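The same control flow can be sketched in runnable form. This is an illustration under stated assumptions rather than the production code: SQLite (via Python's sqlite3) stands in for PostgreSQL, rowid replaces ctid, and the timeout is simulated by a hypothetical row_budget parameter that raises BudgetExceeded once a batch pushes the total past the budget. The qty column, the sp_N savepoint names, and the pre-seeded agg row are also assumptions.

```python
import sqlite3

class BudgetExceeded(Exception):
    """Stand-in for the error PostgreSQL raises when statement_timeout fires."""

def process_growing_batches(conn, it, dt, row_budget):
    """Process facts for one key in batches of 1, 2, 4, ... rows in one transaction.

    A savepoint is set after every completed batch. When the simulated
    timeout hits, only the unfinished batch is rolled back and everything
    before the last savepoint is committed. Returns the rows committed.
    """
    conn.execute("BEGIN")
    size, done, last_savepoint = 1, 0, None
    try:
        while True:
            rows = conn.execute(
                "SELECT rowid, qty FROM flow WHERE it = ? AND dt = ? LIMIT ?",
                (it, dt, size),
            ).fetchall()
            if not rows:
                break
            conn.executemany("DELETE FROM flow WHERE rowid = ?", [(r[0],) for r in rows])
            conn.execute(
                "UPDATE agg SET qty = qty + ? WHERE it = ? AND dt = ?",
                (sum(r[1] for r in rows), it, dt),
            )
            # Simulated timeout: the batch that crosses the budget fails
            # after its writes but before its savepoint is set.
            if done + len(rows) > row_budget:
                raise BudgetExceeded
            done += len(rows)
            last_savepoint = f"sp_{size}"
            conn.execute(f"SAVEPOINT {last_savepoint}")
            size *= 2  # the next batch is twice as large
    except BudgetExceeded:
        if last_savepoint is None:
            conn.execute("ROLLBACK")  # not even the first batch fit
            return 0
        conn.execute(f"ROLLBACK TO {last_savepoint}")  # undo the unfinished batch only
    conn.execute("COMMIT")
    return done

# isolation_level=None: transactions are controlled explicitly with BEGIN/COMMIT.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE flow (it INTEGER, dt TEXT, qty INTEGER)")
conn.execute("CREATE TABLE agg (it INTEGER, dt TEXT, qty INTEGER, PRIMARY KEY (it, dt))")
conn.execute("INSERT INTO agg VALUES (1, '2018-07-29', 0)")  # pre-seeded aggregate row
conn.executemany("INSERT INTO flow VALUES (1, '2018-07-29', 1)", [()] * 10)

committed = process_growing_batches(conn, 1, "2018-07-29", row_budget=5)
agg_qty = conn.execute("SELECT qty FROM agg").fetchone()[0]
left_in_flow = conn.execute("SELECT COUNT(*) FROM flow").fetchone()[0]
print(committed, agg_qty, left_in_flow)
```

With ten facts and a budget of five rows, the batches of 1 and 2 rows complete, the batch of 4 crosses the budget and is rolled back, and the three processed rows are committed, which mirrors the FETCH 1 / FETCH 2 / FETCH 4 / ROLLBACK TO _2 sequence in the SQL.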