Anatomy of backpressure in reactive streams

After reading numerous articles on reactive streams, a reader may come to the following conclusions:



  • backpressure is cool
  • backpressure is only available in libraries that implement the reactive streams specification
  • this specification is so complex that you shouldn't even try to implement it yourself


In this article I will try to show that:



  • backpressure is very simple
  • to implement asynchronous backpressure, it is enough to build an asynchronous version of a semaphore
  • given an asynchronous semaphore, the org.reactivestreams.Publisher interface can be implemented in a few dozen lines of code


Backpressure is feedback that adjusts the producer's speed to match the consumer's speed. Without such feedback, a faster producer can overflow the consumer's buffer or, if the buffer is unbounded, exhaust all available memory.



In multithreaded programming, this problem was solved by Dijkstra, who proposed a new synchronization mechanism: the semaphore. A semaphore can be thought of as a counter of permissions. The producer is expected to request permission from the semaphore before performing a resource-intensive action. If the semaphore is empty, the producer thread is blocked.
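
As a point of reference, the blocking variant can be sketched with the standard java.util.concurrent.Semaphore. This is only an illustration and not code from the project discussed in this article: the class BlockingBackpressure, the method run and the in-flight counters are hypothetical names introduced here. The queue is deliberately unbounded, so the semaphore alone limits the number of items in flight.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration: a blocking semaphore as the feedback channel. */
class BlockingBackpressure {

    /** Moves `total` items from a producer thread to a consumer thread and
     *  returns the largest number of items that were ever in flight. */
    static int run(int permits, int total) {
        Semaphore sema = new Semaphore(permits);
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); // unbounded on purpose
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    sema.acquire(); // blocks the producer thread when no permission is left
                    maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    queue.take();
                    inFlight.decrementAndGet();
                    sema.release(); // feedback: lets the producer emit one more item
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxInFlight.get();
    }
}
```

Calling BlockingBackpressure.run(3, 1000) should never report more than 3 items in flight, whatever the relative speed of the two threads.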



Asynchronous programs cannot block threads, so they cannot ask an empty semaphore for permission (although they can perform every other semaphore operation). They must suspend their execution in another way: they simply leave the worker thread they were running on, but first arrange to be resumed as soon as the semaphore is refilled.
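
The idea of leaving the thread and arranging to be resumed can be sketched in a few lines. The class AsyncPermits below is a hypothetical, single-client illustration written for this paragraph; it is not the AsyncSemaPort class shown later. Instead of blocking, acquire() stores the continuation, and release() submits it to an executor.

```java
import java.util.concurrent.Executor;

/** Hypothetical single-client asynchronous permission counter. */
class AsyncPermits {
    private final Executor executor;
    private long permits;
    private Runnable parked; // continuation waiting for a permission, or null

    AsyncPermits(Executor executor, long initialPermits) {
        this.executor = executor;
        this.permits = initialPermits;
    }

    /** Submits the task if a permission is available; otherwise parks it.
     *  The calling thread is never blocked. */
    synchronized void acquire(Runnable task) {
        if (permits > 0) {
            permits--;
            executor.execute(task);
        } else if (parked == null) {
            parked = task; // the caller simply returns and frees its thread
        } else {
            throw new IllegalStateException("this sketch supports one waiting client only");
        }
    }

    /** Returns one permission; a parked task, if any, receives it and is resumed. */
    synchronized void release() {
        if (parked != null) {
            Runnable task = parked;
            parked = null;
            executor.execute(task);
        } else {
            permits++;
        }
    }
}
```

With a real thread pool as the executor, the parked task resumes on whichever worker thread is free, which is exactly what distinguishes this scheme from a blocked thread.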



The most elegant way to suspend and resume an asynchronous program is to structure it as a dataflow actor with ports:







A dataflow model - actors with ports, the directed connections between their ports, and initial tokens. Taken from: A Structured Description Of Dataflow Actors And Its Application



Ports are either input or output ports. Input ports receive tokens (messages and signals) from the output ports of other actors. An input port is considered active if it contains tokens, and an output port is considered active if it has room for tokens. When all ports of an actor are active, the actor is submitted for execution. Thus, when it resumes, the actor's program can safely read tokens from its input ports and write tokens to its output ports. This simple mechanism contains all the wisdom of asynchronous programming. Making ports separate subobjects of the actor greatly simplifies the coding of asynchronous programs and makes it possible to build a variety of actors by combining ports of different types.



The classic Hewitt actor contains two ports. One is visible and has a buffer for incoming messages. The other is a hidden binary port that is blocked when the actor is submitted for execution, which prevents the actor from being started again before the current run ends. The asynchronous semaphore we are looking for is a cross between these two ports. Like the message buffer, it can store many tokens; like the hidden port, its tokens are black, that is, indistinguishable, as in Petri nets, so a token counter is enough to store them.



At the first level of the hierarchy is the class AbstractActor with three nested classes: the base class Port and its subclasses AsyncSemaPort and InPort. It also contains the mechanism that submits the actor for execution when none of its ports is blocked. In short, it looks like this:



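public abstract class AbstractActor {
    /** number of ports that are currently blocked */
    private int blocked = 0;

    // controlPort, executor and whenError() are omitted in this short version

    /** unblocks the control port so that the actor can run again */
    protected synchronized void restart() {
        controlPort.unBlock();
    }

    private synchronized void incBlockCount() {
        blocked++;
    }

    private synchronized void decBlockCount() {
        blocked--;
        if (blocked == 0) {
            // all ports are active: block the control port and submit the actor
            controlPort.block();
            executor.execute(this::run);
        }
    }

    /** one step of the actor's work, defined by subclasses */
    protected abstract void turn() throws Throwable;

    /** executes one turn and then makes the actor ready for the next one */
    private void run() {
        try {
            turn();
            restart();
        } catch (Throwable throwable) {
            whenError(throwable);
        }
    }
}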


It contains a minimal set of port classes:



Port - base class of all ports:



    protected class Port {
        private boolean isBlocked = true;

        public Port() {
            incBlockCount();
        }

        protected synchronized void block() {
            if (isBlocked) {
                return;
            }
            isBlocked = true;
            incBlockCount();
        }

        protected synchronized void unBlock() {
            if (!isBlocked) {
                return;
            }
            isBlocked = false;
            decBlockCount();
        }
    }


Asynchronous semaphore:



    public class AsyncSemaPort extends Port {
        private long permissions = 0;

        public synchronized void release(long n) {
            permissions += n;
            if (permissions > 0) {
                unBlock();
            }
        }

        public synchronized void aquire(long delta) {
            permissions -= delta;
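            if (permissions <= 0) {
                // the calling thread is never blocked here;
                // instead the port is blocked, so the actor is not submitted
                // for its next turn until permissions are released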
                block();
            }
        }
    }


InPort - minimum buffer for one incoming message:



    public class InPort<T> extends Port implements OutMessagePort<T> {
        private T item;

        @Override
        public synchronized void onNext(T item) {
            this.item = item;
            unBlock();
        }

        public synchronized T poll() {
            T res = item;
            item = null;
            // the buffer is empty again, so the port must hold the actor back
            block();
            return res;
        }
    }


The full version of the class AbstractActor can be viewed here.



At the next level of the hierarchy, we have three abstract actors with specific ports, but with undefined processing routines:



  • the class AbstractProducer is an actor with one port of the asynchronous semaphore type (plus the internal control port that every actor has by default).
  • the class AbstractTransformer is a regular Hewitt actor with a reference to the input port of the next actor in the chain, to which it sends the transformed tokens.
  • the class AbstractConsumer is also a regular actor, but it does not send the transformed tokens anywhere. Instead, it holds a reference to the producer's semaphore and releases that semaphore after consuming an input token. In this way, the number of tokens in flight does not exceed a fixed limit and no buffer overflows.


At the last level, in the test directory, the concrete actors used in the tests are defined:



  • the class ProducerActor generates a finite stream of integers.
  • the class TransformerActor takes the next number from the stream and sends it down the chain.
  • the class ConsumerActor accepts and prints the resulting numbers.


Now we can build a chain of asynchronous handlers working in parallel: a producer, any number of transformers, and a consumer.







Thus, we have implemented backpressure, and in a more general form than in the reactive streams specification: the feedback can span an arbitrary number of processing stages, not only adjacent ones as in the specification.
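
To make the long feedback loop concrete, here is a simplified, self-contained sketch that does not use the actor classes above. Everything in it is hypothetical and introduced only for illustration: the class PipelineSketch, the choice of two transformers that each add 1, and the counters. The producer runs a turn only when it is idle and holds a permission, and only the consumer at the far end of the chain returns permissions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: producer, two transformers, consumer, one feedback edge. */
class PipelineSketch {
    final List<Integer> received = Collections.synchronizedList(new ArrayList<>());
    final AtomicInteger maxInFlight = new AtomicInteger();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final CountDownLatch done;
    private final int total;
    // producer state, guarded by `this`
    private long permits;
    private int next = 0;
    private boolean running = false;

    PipelineSketch(int window, int total) {
        this.permits = window;
        this.total = total;
        this.done = new CountDownLatch(total);
    }

    /** Submits the producer for one turn if it is idle, has a permission and has data left. */
    private synchronized void schedule() {
        if (!running && permits > 0 && next < total) {
            running = true;
            pool.execute(this::produce);
        }
    }

    private void produce() {
        int item;
        synchronized (this) {
            permits--; // lower the semaphore before emitting
            item = next++;
        }
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        send(item, 2);
        synchronized (this) {
            running = false;
            schedule();
        }
    }

    /** Asynchronously passes the token through the remaining transformers, then to the consumer. */
    private void send(int item, int transformersLeft) {
        if (transformersLeft > 0) {
            pool.execute(() -> send(item + 1, transformersLeft - 1)); // a transformer adds 1
        } else {
            pool.execute(() -> consume(item));
        }
    }

    private void consume(int item) {
        received.add(item);
        inFlight.decrementAndGet();
        synchronized (this) {
            permits++; // feedback across all stages: reopen the producer's semaphore
            schedule();
        }
        done.countDown();
    }

    static PipelineSketch runToCompletion(int window, int total) {
        PipelineSketch p = new PipelineSketch(window, total);
        p.schedule();
        try {
            p.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        p.pool.shutdown();
        return p;
    }
}
```

Because the tasks run on a shared pool, the order of arrival is not guaranteed in this sketch; what the permission counter guarantees is only that the number of tokens between producer and consumer never exceeds the initial window.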



To implement the specification, we need to define an output port that is sensitive to the number of permissions passed to it through the request() method (this will be the Publisher) and extend the existing InPort with a call to this method (this will be the Subscriber). That is, we assume that the interfaces Publisher and Subscriber describe the behavior of ports, not actors. However, since the list of interfaces also includes Processor, which cannot possibly be a port interface, the authors of the specification evidently regard their interfaces as actor interfaces. Well, we can build actors that implement all these interfaces by delegating the interface methods to the corresponding ports.



For simplicity, our Publisher will have no buffer of its own and will write directly to the Subscriber's buffer. For this to work, some Subscriber must subscribe and call request(). So there are two conditions and, accordingly, we need two ports: InPort<Subscriber> and AsyncSemaPort. Neither is suitable as a base class for the Publisher implementation, since each contains methods we do not need, so we make these ports internal variables:



public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
    protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
    protected AbstractActor.AsyncSemaPort sema;

    public ReactiveOutPort(AbstractActor actor) {
        subscriber = actor.new InPort<>();
        sema = actor.new AsyncSemaPort();
    }
}


This time we did not define the class ReactiveOutPort as nested, so it needs a constructor parameter: a reference to the enclosing actor, which is required to instantiate the ports defined as nested classes.



The method subscribe(Subscriber subscriber) boils down to saving the subscriber and calling subscriber.onSubscribe():



    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException();
        }
        if (this.subscriber.isFull()) {
            subscriber.onError(new IllegalStateException());
            return;
        }
        this.subscriber.onNext(subscriber);
        subscriber.onSubscribe(this);
    }


This usually results in a call to Subscription.request(), which boils down to raising the semaphore by calling AsyncSemaPort.release():



    public synchronized void request(long n) {
        if (subscriber.isEmpty()) {
            return; // required by the specification
        }
        if (n <= 0) {
            subscriber.current().onError(new IllegalArgumentException());
            return;
        }
        sema.release(n);
    }


Now all that remains is not to forget to lower the semaphore by calling AsyncSemaPort.aquire() at the moment the resource is used:



    public synchronized void onNext(T item) {
        Subscriber<? super T> subscriber = this.subscriber.current();
        if (subscriber == null) {
            throw new IllegalStateException();
        }
        sema.aquire(1); // one permission per delivered item
        subscriber.onNext(item);
    }
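
From the subscriber's side, the same protocol can be tried out without any third-party dependency, because the JDK (since Java 9) ships the interfaces of the specification as java.util.concurrent.Flow. The sketch below is an assumption-laden illustration, not part of the project: it uses the JDK's SubmissionPublisher in place of ReactiveOutPort, and the class OneAtATimeSubscriber and its demo method are hypothetical names. The subscriber grants exactly one permission at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

/** Hypothetical subscriber that requests items strictly one by one. */
class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    final CountDownLatch completed = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // raise the publisher-side semaphore by one
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        subscription.request(1); // ready for the next item
    }

    @Override
    public void onError(Throwable throwable) {
        completed.countDown();
    }

    @Override
    public void onComplete() {
        completed.countDown();
    }

    /** Publishes 0..count-1 through a SubmissionPublisher and returns what arrived. */
    static List<Integer> demo(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i); // buffered until the subscriber requests it
            }
        } // close() signals onComplete after the submitted items
        try {
            subscriber.completed.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }
}
```

Requesting one item at a time is the most conservative policy; a subscriber with a larger buffer could request several items at once and call request() again only when the buffer has room.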


The AsyncSemaphore project was designed specifically for this article. It is intentionally as compact as possible so as not to tire the reader. As a result, it has significant limitations:



  • a Publisher can have at most one Subscriber
  • the size of the Subscriber's buffer is 1


In addition, AsyncSemaPort is not a complete analogue of a synchronous semaphore: only one client (the enclosing actor) can perform the aquire() operation on an AsyncSemaPort. But this is not a drawback, since AsyncSemaPort fulfills its role well. In principle, it can be done differently: take java.util.concurrent.Semaphore and supplement it with an asynchronous subscription interface (see AsyncSemaphore.java in the DF4J project). Such a semaphore can connect actors and threads of execution in any order.
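
A rough idea of such a combined semaphore is sketched below. This is not the DF4J class: CallbackSemaphore and acquireAsync are hypothetical names, and the sketch makes no fairness guarantee between blocked threads and waiting callbacks. Threads keep using the inherited blocking acquire(), while asynchronous clients register a callback that runs once a permission is obtained.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch: java.util.concurrent.Semaphore plus a callback interface. */
class CallbackSemaphore extends Semaphore {
    private final Queue<Runnable> waiters = new ArrayDeque<>();

    CallbackSemaphore(int permits) {
        super(permits);
    }

    /** Asynchronous acquire: never blocks; runs the callback once a permission is obtained. */
    public void acquireAsync(Runnable callback) {
        synchronized (waiters) {
            if (!waiters.isEmpty() || !tryAcquire()) {
                waiters.add(callback); // no permission now: remember who is waiting
                return;
            }
        }
        callback.run();
    }

    /** Hands the permission to the oldest waiting callback, or returns it to the pool. */
    @Override
    public void release() {
        Runnable next;
        synchronized (waiters) {
            next = waiters.poll();
            if (next == null) {
                super.release();
                return;
            }
        }
        next.run();
    }

    /** Releases permissions one by one so that waiting callbacks are served as well. */
    @Override
    public void release(int permits) {
        if (permits < 0) {
            throw new IllegalArgumentException();
        }
        for (int i = 0; i < permits; i++) {
            release();
        }
    }
}
```

In this sketch the callback runs on the thread that calls release(); an actor-based version would instead submit the waiting actor to its executor.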



In general, each kind of synchronous (blocking) interaction has an asynchronous (non-blocking) counterpart. For example, the same DF4J project contains an implementation of BlockingQueue supplemented with an asynchronous interface. This makes it possible to transform a multithreaded program into an asynchronous one step by step, replacing threads with actors one part at a time.


