Treating Java Reactor with Kotlin Coroutines

At my current job we write reactive code with Reactor. The technology is cool, but as always there are many BUTs. Some things are annoying, the code is harder to write and read, and ThreadLocal is a real disaster. I decided to see which problems go away if you switch to Kotlin Coroutines and which new ones appear instead.





Patient card

I wrote a small project for the article, reproducing the problems I encountered at work. The main code is here. I deliberately did not break the algorithm down into separate methods, so that the problems are easier to see.





The algorithm in a nutshell:





We transfer money from one account to another and record a transaction for each transfer.





The transfer is idempotent, so if the transaction is already in the database, we reply to the client that everything is fine. Inserting a transaction may throw a DataIntegrityViolationException, which also means that the transaction already exists.





To keep the balance from going negative, there is a check in code plus an optimistic lock, which prevents concurrent updates of the accounts. To make it work, you need a retry and additional error handling.
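
As a rough illustration of how the idempotency key, the optimistic lock, and the retry fit together, here is a minimal JDK-only sketch. It is not the project's code: `InMemoryLedger`, `State`, `tryTransfer`, and `NotEnoughMoney` are hypothetical names, an `AtomicReference` compare-and-set stands in for the versioned database update, and a set of applied keys stands in for the unique transaction key.

```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for the ledger; not the article's project code.
final class InMemoryLedger {
    // Immutable snapshot of all balances plus the keys of already applied transfers.
    static final class State {
        final Map<Long, BigDecimal> balances;
        final Set<String> appliedKeys;

        State(Map<Long, BigDecimal> balances, Set<String> appliedKeys) {
            this.balances = Map.copyOf(balances);
            this.appliedKeys = Set.copyOf(appliedKeys);
        }
    }

    static final class OptimisticLockException extends RuntimeException {}
    static final class NotEnoughMoney extends RuntimeException {}

    private final AtomicReference<State> state;

    InMemoryLedger(Map<Long, BigDecimal> initialBalances) {
        state = new AtomicReference<>(new State(initialBalances, Set.of()));
    }

    BigDecimal balance(long accountId) {
        return state.get().balances.get(accountId);
    }

    // One attempt. Returns false when the key was already applied (idempotent replay).
    boolean tryTransfer(String key, long from, long to, BigDecimal amount) {
        State seen = state.get();
        if (seen.appliedKeys.contains(key)) {
            return false;
        }
        if (!seen.balances.containsKey(from) || !seen.balances.containsKey(to)) {
            throw new IllegalArgumentException("account not found");
        }
        BigDecimal fromAfter = seen.balances.get(from).subtract(amount);
        if (fromAfter.signum() < 0) {
            throw new NotEnoughMoney();
        }
        Map<Long, BigDecimal> balances = new HashMap<>(seen.balances);
        balances.put(from, fromAfter);
        balances.merge(to, amount, BigDecimal::add);
        Set<String> keys = new HashSet<>(seen.appliedKeys);
        keys.add(key);
        // Succeeds only if nobody replaced the snapshot since we read it.
        if (!state.compareAndSet(seen, new State(balances, keys))) {
            throw new OptimisticLockException();
        }
        return true;
    }

    // Retries only on OptimisticLockException, at most maxAttempts times in total.
    boolean transfer(String key, long from, long to, BigDecimal amount) {
        final int maxAttempts = 3;
        for (int attempt = 1; ; attempt++) {
            try {
                return tryTransfer(key, from, to, amount);
            } catch (OptimisticLockException e) {
                if (attempt == maxAttempts) {
                    throw e;
                }
            }
        }
    }
}
```

Calling `transfer` twice with the same key changes the balances only once. Because the whole snapshot is swapped in a single compare-and-set, the sketch needs no rollback; in the real project the database transaction plays that role.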





For those who don't like the algorithm itself

The algorithm for the project was chosen to reproduce the problems, not to be efficient and architecturally correct. Instead of one transaction, you should insert two half-entries, an optimistic lock is not needed at all (check that the balance stays non-negative in SQL instead), and select + insert should be replaced with an upsert.





Patient complaints

  1. Stacktrace .





  2. , . 





  3. - flatMap.





  4. .





  5. Mono.empty().





  6. , - , traceId. ( , ThreadLocal , SpringSecurity)





  7. .





  8. api .





PR Java Kotlin. 









com.fasterxml.jackson.module:jackson-module-kotlin data org.jetbrains.kotlin.plugin.spring open .





suspend fun transfer(@RequestBody request: TransferRequest)



public Mono<Void> transfer(@RequestBody TransferRequest request)







suspend fun save(account: Account): Account



Mono<Account> save(Account account);



, , suspend , , Reactor .





runBlocking { … }



, suspend .





Retry kotlin-retry. , , ( PR).





, , . -.





Java Reactor:





public Mono<Void> transfer(String transactionKey, long fromAccountId,
                           long toAccountId, BigDecimal amount) {
  return transactionRepository.findByUniqueKey(transactionKey)
    .map(Optional::of)
    .defaultIfEmpty(Optional.empty())
    .flatMap(withMDC(foundTransaction -> {
      if (foundTransaction.isPresent()) {
        log.warn("retry of transaction " + transactionKey);
        return Mono.empty();
      }
      return accountRepository.findById(fromAccountId)
        .switchIfEmpty(Mono.error(new AccountNotFound()))
        .flatMap(fromAccount -> accountRepository.findById(toAccountId)
          .switchIfEmpty(Mono.error(new AccountNotFound()))
          .flatMap(toAccount -> {
            var transactionToInsert = Transaction.builder()
              .amount(amount)
              .fromAccountId(fromAccountId)
              .toAccountId(toAccountId)
              .uniqueKey(transactionKey)
              .build();
            var amountAfter = fromAccount.getAmount().subtract(amount);
            if (amountAfter.compareTo(BigDecimal.ZERO) < 0) {
              return Mono.error(new NotEnoghtMoney());
            }
            return transactionalOperator.transactional(
              transactionRepository.save(transactionToInsert)
                .onErrorResume(error -> {
                  //transaction was inserted on parallel transaction,
                  //we may return success response
                  if (error instanceof DataIntegrityViolationException
                    && error.getMessage().contains("TRANSACTION_UNIQUE_KEY")) {
                    return Mono.empty();
                  } else {
                    return Mono.error(error);
                  }
                })
                .then(accountRepository.transferAmount(
                  fromAccount.getId(), fromAccount.getVersion(), 
                  amount.negate()
                ))
                .then(accountRepository.transferAmount(
                  toAccount.getId(), toAccount.getVersion(), amount
                ))
            );
          }));
    }))
    .retryWhen(Retry.backoff(3, Duration.ofMillis(1))
      .filter(OptimisticLockException.class::isInstance)
      .onRetryExhaustedThrow((__, retrySignal) -> retrySignal.failure())
    )
    .onErrorMap(
      OptimisticLockException.class,
      e -> new ResponseStatusException(
        BANDWIDTH_LIMIT_EXCEEDED,
        "limit of OptimisticLockException exceeded", e
      )
    )
    .onErrorResume(withMDC(e -> {
      log.error("error on transfer", e);
      return Mono.error(e);
    }));
}
      
      



Kotlin Coroutines:





suspend fun transfer(transactionKey: String, fromAccountId: Long,
                     toAccountId: Long, amount: BigDecimal) {
  try {
    try {
      retry(limitAttempts(3) + filter { it is OptimisticLockException }) {
        val foundTransaction = transactionRepository
          .findByUniqueKey(transactionKey)
        if (foundTransaction != null) {
          logger.warn("retry of transaction $transactionKey")
          return@retry
        }

        val fromAccount = accountRepository.findById(fromAccountId)
          ?: throw AccountNotFound()
        val toAccount = accountRepository.findById(toAccountId)
          ?: throw AccountNotFound()

        if (fromAccount.amount - amount < BigDecimal.ZERO) {
          throw NotEnoghtMoney()
        }
        val transactionToInsert = Transaction(
          amount = amount,
          fromAccountId = fromAccountId,
          toAccountId = toAccountId,
          uniqueKey = transactionKey
        )
        transactionalOperator.executeAndAwait {
          try {
            transactionRepository.save(transactionToInsert)
          } catch (e: DataIntegrityViolationException) {
            if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
              throw e
            }
          }

          accountRepository.transferAmount(
            fromAccount.id!!, fromAccount.version, amount.negate()
          )
          accountRepository.transferAmount(
            toAccount.id!!, toAccount.version, amount
          )
        }
      }
    } catch (e: OptimisticLockException) {
      throw ResponseStatusException(
        BANDWIDTH_LIMIT_EXCEEDED, 
        "limit of OptimisticLockException exceeded", e
      )
    }
  } catch (e: Exception) {
    logger.error(e) { "error on transfer" }
    throw e
  }
}
      
      



Stacktraces

, . 





Java Reactor:





o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
	at c.g.c.v.r.services.Ledger.lambda$transfer$5(Ledger.java:75)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.lambda$transferAmount$0(AccountRepositoryImpl.java:27)
	at r.c.p.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125)
  ...
      
      



Kotlin Coroutines:





error on transfer o.s.w.s.ResponseStatusException: 509 BANDWIDTH_LIMIT_EXCEEDED "limit of OptimisticLockException exceeded"; nested exception is c.g.c.v.r.OptimisticLockException
	at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:70)
	at c.g.c.v.r.services.Ledger$transfer$1.invokeSuspend(Ledger.kt)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
	...
	at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
	at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
	at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
	(Coroutine boundary)
	at o.s.t.r.TransactionalOperatorExtensionsKt.executeAndAwait(TransactionalOperatorExtensions.kt:31)
	at c.g.c.v.r.services.Ledger$transfer$3.invokeSuspend(Ledger.kt:56)
	at com.github.michaelbull.retry.RetryKt$retry$3.invokeSuspend(Retry.kt:38)
	at c.g.c.v.r.services.Ledger.transfer$suspendImpl(Ledger.kt:35)
	at c.g.c.v.r.controllers.LedgerController$transfer$2$1.invokeSuspend(LedgerController.kt:20)
	at c.g.c.v.r.controllers.LedgerController$transfer$2.invokeSuspend(LedgerController.kt:19)
	at kotlin.reflect.full.KCallables.callSuspend(KCallables.kt:55)
	at o.s.c.CoroutinesUtils$invokeSuspendingFunction$mono$1.invokeSuspend(CoroutinesUtils.kt:64)
	(Coroutine creation stacktrace)
	at k.c.i.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:122)
	at k.c.i.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
	...
Caused by: c.g.c.v.r.OptimisticLockException: null
	at c.g.c.v.r.repos.AccountRepositoryImpl.transferAmount(AccountRepositoryImpl.kt:24)
	...
	at c.g.c.v.r.services.Ledger$transfer$3$1.invokeSuspend(Ledger.kt:65)
	at c.g.c.v.r.services.Ledger$transfer$3$1.invoke(Ledger.kt)
	at o.s.t.r.TransactionalOperatorExtensionsKt$executeAndAwait$2$1.invokeSuspend(TransactionalOperatorExtensions.kt:30)
	...

      
      



, ( , ).





Java . , . . . Kotlin .





, - . ? . , - traceId (thread name ) .





Kotlin , , . ( : ).





flatMap. - try catch, .





Java Reactor:





return accountRepository.findById(fromAccountId)
  .switchIfEmpty(Mono.error(new AccountNotFound()))
  .flatMap(fromAccount -> accountRepository.findById(toAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound()))
    .flatMap(toAccount -> {
      ...
    })
      
      



Kotlin Coroutines:





val fromAccount = accountRepository.findById(fromAccountId)
  ?: throw AccountNotFound()
val toAccount = accountRepository.findById(toAccountId)
  ?: throw AccountNotFound()
...
      
      



try catch, .





Java Reactor:





return transactionRepository.findByUniqueKey(transactionKey)
  ...
  .onErrorMap(
    OptimisticLockException.class,
    e -> new ResponseStatusException(
      BANDWIDTH_LIMIT_EXCEEDED, 
      "limit of OptimisticLockException exceeded", e
    )
  )
      
      



Kotlin Coroutines:





try {
  val foundTransaction = transactionRepository
    .findByUniqueKey(transactionKey)
  ...
} catch (e: OptimisticLockException) {
  throw ResponseStatusException(
    BANDWIDTH_LIMIT_EXCEEDED, 
    "limit of OptimisticLockException exceeded", e
  )
}
      
      



throw, . Reactor :





.flatMap(foo -> {
  if (foo.isEmpty()) { 
    return Mono.error(new IllegalStateException());
  } else {
    return Mono.just(foo);
  }
})
      
      



, , . - .





Mono.empty()

. null . ¨C5C. 





Ide , mono . . , - .





Kotlin not null , , . nullable - .





:





Java Reactor:





return transactionRepository.findByUniqueKey(transactionKey)
  .map(Optional::of)
  .defaultIfEmpty(Optional.empty())
  .flatMap(foundTransaction -> {
    if (foundTransaction.isPresent()) {
      log.warn("retry of transaction " + transactionKey);
      return Mono.empty();
    }
...
      
      



Kotlin Coroutines:





val foundTransaction = transactionRepository
  .findByUniqueKey(transactionKey)
if (foundTransaction != null) {
  logger.warn("retry of transaction $transactionKey")
  return@retry
}
...
      
      



, - Reactor, .





, traceId . ThreadLocal , MDC ( ). ?





. Reactor Coroutines immutable, MDC ( ).





Java , traceId :





@Component
public class TraceIdFilter implements WebFilter {
  @Override
  public Mono<Void> filter(
    ServerWebExchange exchange, WebFilterChain chain
  ) {
    var traceId = Optional.ofNullable(
      exchange.getRequest().getHeaders().get("X-B3-TRACEID")
    )
      .orElse(Collections.emptyList())
      .stream().findAny().orElse(UUID.randomUUID().toString());
    return chain.filter(exchange)
      .contextWrite(context ->
        LoggerHelper.addEntryToMDCContext(context, "traceId", traceId)
      );
  }
}
      
      



, - , traceId MDC:





public static <T, R> Function<T, Mono<R>> withMDC(
  Function<T, Mono<R>> block
) {
  return value -> Mono.deferContextual(context -> {
    Optional<Map<String, String>> mdcContext = context
      .getOrEmpty(MDC_ID_KEY);
    if (mdcContext.isPresent()) {
      try {
        MDC.setContextMap(mdcContext.get());
        return block.apply(value);
      } finally {
        MDC.clear();
      }
    } else {
      return block.apply(value);
    }
  });
}
      
      



, Mono. .. , Mono. :





.onErrorResume(withMDC(e -> {
  log.error("error on transfer", e);
  return Mono.error(e);
}))
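
The reason a wrapper like withMDC is needed at all is that MDC is bound to the current thread, while a reactive pipeline may run each step on a different thread. The following JDK-only sketch illustrates the underlying effect. It uses a hypothetical `TRACE_ID` ThreadLocal in place of the real MDC, and `withTraceId` and `runOnWorker` are illustrative helpers, not part of Reactor or of the project.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical demo; TRACE_ID stands in for the thread-bound logging MDC.
final class TraceContextDemo {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Captures the caller's trace id immediately and restores it around the task
    // on whichever thread eventually runs it.
    static <T> Supplier<T> withTraceId(Supplier<T> task) {
        String captured = TRACE_ID.get();
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured);
            try {
                return task.get();
            } finally {
                // Put the worker thread back into the state we found it in.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }

    // Runs the task on a fresh worker thread and waits for its result.
    static <T> T runOnWorker(Supplier<T> task) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<T> call = task::get;
            return worker.submit(call).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        TRACE_ID.set("abc-123");
        Supplier<String> readTraceId = TRACE_ID::get;
        System.out.println("plain:   " + runOnWorker(readTraceId));
        System.out.println("wrapped: " + runOnWorker(withTraceId(readTraceId)));
    }
}
```

The plain task sees no trace id on the worker thread, while the wrapped one sees the value captured on the calling thread. withMDC does the same kind of capture and restore, except that it reads the values from the Reactor context rather than from the calling thread.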
      
      



Kotlin . , traceId MDC:





@Component
class TraceIdFilter : WebFilter {
  override fun filter(
    exchange: ServerWebExchange, chain: WebFilterChain
  ): Mono<Void> {
    val traceId = exchange.request.headers["X-B3-TRACEID"]?.first() 
    MDC.put("traceId", traceId ?: UUID.randomUUID().toString())
    return chain.filter(exchange)
  }
}
      
      



withContext(MDCContext()) { … }







, MDC traceId. .





Java Reactor , : , , breakpoints ...





: stepOver, , ( ). 





, suspend . issue. , , Java Reactor evaluate , .





, , .





Java Reactor:





return Mono.zip(
  transactionRepository.findByUniqueKey(transactionKey)
    .map(Optional::of)
    .defaultIfEmpty(Optional.empty()),
  accountRepository.findById(fromAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound())),
  accountRepository.findById(toAccountId)
    .switchIfEmpty(Mono.error(new AccountNotFound()))
).flatMap(withMDC(fetched -> {
  var foundTransaction = fetched.getT1();
  var fromAccount = fetched.getT2();
  var toAccount = fetched.getT3();
  if (foundTransaction.isPresent()) {
    log.warn("retry of transaction " + transactionKey);
    return Mono.empty();
  }
  ...
}));
      
      



Kotlin Coroutines:





coroutineScope {
  val foundTransactionAsync = async {
    logger.info("async fetch of transaction $transactionKey")
    transactionRepository.findByUniqueKey(transactionKey)
  }
  val fromAccountAsync = async { 
    accountRepository.findById(fromAccountId) 
  }
  val toAccountAsync = async { 
    accountRepository.findById(toAccountId) 
  }

  if (foundTransactionAsync.await() != null) {
    logger.warn("retry of transaction $transactionKey")
    return@retry
  }

  val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
  val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
  ...
}
      
      



Kotlin “ ”, “ ” Reactor.





, -. Reactor , . - foundTransactionAsync.await(). , transactionRepository.findByUniqueKey() , , accountRepository.findById() ( ).





. , Reactor :





coroutineScope {
  val foundTransactionAsync = async {
    logger.info("async fetch of transaction $transactionKey")
    transactionRepository.findByUniqueKey(transactionKey)
  }
  val fromAccountAsync = async {
    accountRepository.findById(fromAccountId)
  }
  val toAccountAsync = async {
    accountRepository.findById(toAccountId)
  }

  if (foundTransactionAsync.await() != null) {
    logger.warn("retry of transaction $transactionKey")
    return@retry
  }

  val transactionToInsert = Transaction(
    amount = amount,
    fromAccountId = fromAccountId,
    toAccountId = toAccountId,
    uniqueKey = transactionKey
  )
  transactionalOperator.executeAndAwait {
    try {
      transactionRepository.save(transactionToInsert)
    } catch (e: DataIntegrityViolationException) {
      if (e.message?.contains("TRANSACTION_UNIQUE_KEY") != true) {
        throw e
      }
    }
    val fromAccount = fromAccountAsync.await() ?: throw AccountNotFound()
    val toAccount = toAccountAsync.await() ?: throw AccountNotFound()
    if (fromAccount.amount - amount < BigDecimal.ZERO) {
      throw NotEnoghtMoney()
    }

    accountRepository.transferAmount(
      fromAccount.id!!, fromAccount.version, amount.negate()
    )
    accountRepository.transferAmount(
      toAccount.id!!, toAccount.version, amount
    )
  }
}
      
      



. .. , . , , ( ).





, , .





context and scope

, :





  1. scope. , , .





  2. context. .





Spring , :





@PutMapping("/transfer")
suspend fun transfer(@RequestBody request: TransferRequest) {
  coroutineScope {
    withContext(MDCContext()) {
      ledger.transfer(request.transactionKey, request.fromAccountId, 
                      request.toAccountId, request.amount)
    }
  }
}
      
      



, regexp , . - .





AOP suspend

, , . aspect suspend .





I eventually managed to write that aspect, but explaining how it works would take a separate article.





I hope a more reasonable way to write aspects will appear (I will try to contribute to this).





Treatment evaluation

All the problems disappeared. A couple of new ones appeared, but they are tolerable.





I must say that coroutines are developing rapidly, and I expect working with them to only get better.





It is clear that the JetBrains team has been attentive to developers' problems. As far as I know, about a year ago there were still problems with debugging and stack traces, for example.





Most importantly, with coroutines you don't need to keep all of Reactor's quirks and its mighty API in your head. You just write the code.







