Overclocking REACTOR

Who will be interested?

The reactor today is stylish, fashionable, youthful. Why do so many of us practice reactive programming? Few can answer this question unequivocally. Good - if you understand your gain, bad - if the reactor is imposed by the organization as a given. Most of the "FOR" arguments are the use of a microservice architecture, which in turn obliges microservices to communicate often and a lot with each other. For communication, in most cases, HTTP interaction is chosen. HTTP needs a lightweight web server, what comes to mind first? Tomcat. Here there are problems with the limit on the maximum number of sessions, after exceeding which the web server begins to reject requests (although this limit is not so easy to achieve). Here the reactor comes to the rescue, which is not limited by such limits, and, for example,Netty as a web server that works with reactivity out of the box. Since there is a reactive web server, you need a reactive web client (Spring WebClient or Reactive Feign), and since the client is reactive, then all this horror seeps into business logic, Mono and Flux become your best friends (although at first there is only hate :) )





Among business tasks, very often there are serious procedures that process large amounts of data, and we have to use a reactor for them as well. Here surprises begin, if you don't know how to cook the reactor, you can get a lot of problems. Exceeding the limit of file descriptors on the server, OutOfMemory due to the uncontrolled speed of non-blocking code, and much more, which we will talk about today. My colleagues and I have experienced a lot of difficulties due to problems with understanding how to keep the reactor under control, but everything that does not kill us makes us smarter!





Blocking and non-blocking code

You won't understand anything further if you don't understand the difference between blocking and non-blocking code. Therefore, let's stop and carefully understand what the difference is. You already know, the blocking code for the reactor is the enemy, the non-blocking code is bro. The only problem is that at the moment, not all interactions have non-blocking counterparts.





The leader here is HTTP interaction, there are a lot of options, choose any. I prefer Playtika's Reactive Feign, in combination with Spring Boot + WebFlux + Eureka we get a very good build for microservice architecture.





-: , , reactive, - :) Hibernate + PostgreSQL - , JavaMail - , IBMMQ - . , , MongoDB - . , , , (Thread.sleep() / Socket.read() ), - . ? , . 2 :





  • . BlockHound ( )





  • , , : Schedulers.boundedElastic()



    . publishOn



    & subscribeOn







, , !





1





    @Test
    fun testLevel1() {
        val result = Mono.just("")
            .map { "123" }
            .block()

        assertEquals("123", result)
    }
      
      



, reactor . ? Mono.just



:) map



"123" block



subscribe



.





block



, , , . block



RestController



, .





2





    fun nonBlockingMethod1sec(data: String) 
    = data.toMono().delayElement(Duration.ofMillis(1000))

    @Test
    fun testLevel2() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { nonBlockingMethod1sec(it) }
            .block()

        assertEquals("Hello world", result)
    }
      
      



, nonBlockingMethod1sec



, - . - , , .





3





    fun collectTasks() = (0..99)

    @Test
    fun testLevel3() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { businessContext ->
                collectTasks()
                    .toFlux()
                    .map {
                        businessContext + it
                    }
                    .collectList()
            }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



- Flux



! collectTasks



, , Flux



- . map. collectList



.





, . " ", .





4





    fun collectTasks() = (0..100)
    
    @Test
    fun testLevel4() {
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { businessContext ->
                collectTasks().toFlux()
                    .flatMap {
                        Mono.deferContextual { reactiveContext ->
                            val hash = businessContext + it + reactiveContext["requestId"]
                            hash.toMono()
                        }
                    }.collectList()
            }
            .contextWrite { it.put("requestId", UUID.randomUUID().toString()) }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



. (15)



, (10)



. .





5





    fun collectTasks() = (0..1000)
    
    fun doSomethingNonBlocking(data: String)
        = data.toMono().delayElement(Duration.ofMillis(1000))
    
    fun doSomethingBlocking(data: String): String {
        Thread.sleep(1000); return data
    }

    val pool = Schedulers.newBoundedElastic(10, Int.MAX_VALUE, "test-pool")
    private val logger = getLogger()

    @Test
    fun testLevel5() {
        val counter = AtomicInteger(0)
        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { _ ->
                collectTasks().toFlux()
                    .parallel()
                    .runOn(pool)
                    .flatMap {
                        Mono.deferContextual { _ ->
                            doSomethingNonBlocking(it.toString())
                                .doOnRequest { logger.info("Added task in pool ${counter.incrementAndGet()}") }
                                .doOnNext { logger.info("Non blocking code finished ${counter.get()}") }
                                .map { doSomethingBlocking(it) }
                                .doOnNext { logger.info("Removed task from pool ${counter.decrementAndGet()}") }
                        }
                    }.sequential()
                    .collectList()
            }
            .block()!!

        assertEquals(collectTasks().toList().size, result.size)
    }
      
      



! , . : doSomethingNonBlocking



(3)



& doSomethingBlocking



(6)



- , . (10)



, (15)



. parallel



(19)



sequential



(29)



. (20)



. , , doOnRequest



( ), doOnNext



( ). - , .





"", , . - , , . , , . , Flux .





. . , ? 100 , 1 , 1 , 10 ? ( senior reactor developer :))





12 . :) , 100 10 , 10 . , . " " .





(26) .map { doSomethingBlocking(it) }



. , , ?





2 ! 1 " " 1 . 100 . 10 ? ? .





collectTasks()



... 1000? 15000? ?





2 ! 1 " " 1 . . . ?





?

? ? ? 30000 , , , , ( web-client feign, ?) , , SSH . , , " ".





. Thread Pool & Reactor

- , - X , X , - . ? :) .





thread pool - . - , .





reactor! ?





, , . ? epoll , . . , , . , " ?", , . . , - , 500 -, . ! , , Schedulers.boundedElastic()



.





"", ?





!

, , , , , , 4-8 production 32 .





parallel



parallelism







parallelism



, rails ( , , ). Prefetch .





parallelism , .





flatMap



( Flux) , maxConcurrency







maxConcurrency



, Integer.MAX_VALUE



( . ?





, , ( http ), ! .





.





:





  • parallel (parallelism)





  • flatMap (maxConcurrency)









, .





- * Integer.MAX_VALUE *







, 5 5 . !





        val result = nonBlockingMethod1sec("Hello world")
            .flatMap { _ ->
                collectTasks().toFlux()
                    .parallel(1)
                    .runOn(pool, 1)
                    .flatMap({
                        Mono.deferContextual { _ ->
                            doSomethingNonBlocking(it.toString())
                        }
                    }, false, 1, 1)
                    .sequential()
                    .collectList()
            }
            .block()!!
      
      



, ?





Thread Pool

? . - , , ! ? , :)





, Schedulers.parallel() ? =) ( parallel, ) , , , .





. , , , . , , production . .





, round-robin, .





Well loaded reactor (tasks are evenly distributed).  54 blocking tasks (1 sec each), round-robin distribution on 6 rails
( ). 54 ( 1),
 round-robin 6

production , , , .





Poorly loaded pool (tasks are not evenly distributed) 54 blocking tasks (each for 1 second except for 2 seconds), round-robin distribution on 6 rails
( ) 54 ( 1 2),
 round-robin 6

collectList()



, , 1 . , , .









  • concatMap



    flatMap



    ( , )





  • , ( )





  • , ( )





  • prefetch



    ( !)





prefetch



flatMap



& runOn



, , , . - 256. 1, "work stealing", , , , .





Well-loaded pool (tasks are evenly distributed) 54 blocking tasks (each for 1 second except for 2 seconds), round-robin distribution on 6 Prefetch rails!
( ) 54 ( 1 2),
 round-robin 6 Prefetch !

That's all for me. It will be interesting to read your remarks and comments, I do not pretend to be 100% true, but all the results are supported by practical examples, on Spring Boot + Project Reactor 3.4. Thanks to all!








All Articles