DelayedQueue queue

A couple of years ago, in one of our projects, we faced the need to postpone the execution of an action for a certain period of time. For example, find out the status of the payment in three hours or resend the notification after 45 minutes. However, at that time we did not find suitable libraries that could "postpone" and did not require additional time for configuration and operation. We analyzed the possible options and wrote our own little delayed queue library in Java using Redis as a repository. In this article I will talk about the capabilities of the library, its alternatives and those "rakes" that we came across in the process.



Functionality



So what does the delayed queue do? An event added to the pending queue is delivered to the handler at the specified time interval. If the processing fails, the event will be delivered again later. Moreover, the maximum number of attempts is limited. Redis does not guarantee safety, and you need to be prepared for the loss of events . However, in the cluster version, Redis shows a fairly high reliability, and we have never encountered this in a year and a half of operation.



API



Add an event to the queue



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1)).subscribe();


Note that the method returns Mono, so to run you need to do one of the following:



  • subscribe(...)
  • block()


More detailed explanations are provided in the documentation for Project Reactor. The context is added to the event like this:



eventService.enqueueWithDelayNonBlocking(new DummyEvent("1"), Duration.ofHours(1), Map.of("key", "value")).subscribe();


Register event handler



eventService.addHandler(DummyEvent.class, e -> Mono.just(true), 1);


, :



eventService.addHandler(
        DummyEvent.class,
        e -> Mono
            .subscriberContext()
            .doOnNext(ctx -> {
                Map<String, String> eventContext = ctx.get("eventContext");
                log.info("context key {}", eventContext.get("key"));
            })
            .thenReturn(true),
        1
);




eventService.removeHandler(DummyEvent.class);




"-":



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService().client(redisClient).build();


:



import static com.github.fred84.queue.DelayedEventService.delayedEventService;

var eventService = delayedEventService()
        .client(redisClient)
        .mapper(objectMapper)
        .handlerScheduler(Schedulers.fromExecutorService(executor))
        .schedulingInterval(Duration.ofSeconds(1))
        .schedulingBatchSize(SCHEDULING_BATCH_SIZE)
        .enableScheduling(false)
        .pollingTimeout(POLLING_TIMEOUT)
        .eventContextHandler(new DefaultEventContextHandler())
        .dataSetPrefix("")
        .retryAttempts(10)
        .metrics(new NoopMetrics())
        .refreshSubscriptionsInterval(Duration.ofMinutes(5))
        .build();


( Redis) eventService.close() , @javax.annotation.PreDestroy.





- , . :



  • , Redis;
  • , ( "delayed.queue.ready.for.handling.count" )




, delayed queue. 2018

Amazon Web Services.

, . : " , Amazon-, ".





:





- , JMS . SQS , 15 .





" " . , Redis :





, Netflix dyno-queues

. , , .



, " " sorted set list, ( ):



var events = redis.zrangebyscore("delayed_events", Range.create(-1, System.currentTimeMillis()), 100);
events.forEach(key -> {
  var payload = extractPayload(key);
  var listName = extractType(key);
  redis.lpush(listName, payload);
  redis.zrem("delayed_events", key);
});


Spring Integration, :



redis.brpop(listName)


.





"list" (, ), list . Redis , 2 .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




list-a . , . "sorted_set" .



events.forEach(key -> {
  ...
  redis.multi();
  redis.zadd("delayed_events", nextAttempt(key))
  redis.zrem("delayed_events", key);
  redis.lpush(listName, payload);
  redis.exec();
});




, , " " "delayed queue" . "sorted set"

metadata;payload, payload , metadata - . . , metadata payload Redis hset "sorted set" .



var envelope = metadata + SEPARATOR + payload;
redis.zadd(envelope, scheduledAt);




var envelope = metadata + SEPARATOR + payload;
var key = eventType + SEPARATOR + eventId;

redis.multi();
redis.zadd(key, scheduledAt);
redis.hset("metadata", key, envelope)
redis.exec();




, . , list . TTL :



redis.set(lockKey, "value", ex(lockTimeout.toMillis() * 1000).nx());




Spring, . " " :





Lettuce , . Project Reactor , " ".

, Subscriber



redis
  .reactive()
  .brpop(timeout, queue)
  .map(e -> deserialize(e))
  .subscribe(new InnerSubscriber<>(handler, ... params ..))




class InnerSubscriber<T extends Event> extends BaseSubscriber<EventEnvelope<T>> {

    @Override
    protected void hookOnNext(@NotNull EventEnvelope<T> envelope) {
        Mono<Boolean> promise = handler.apply(envelope.getPayload());
        promise.subscribe(r -> request(1));
    }
}


, ( Netflix dyno queue, poll- ).



?



  • Kotlin DSL. Kotlin suspend fun API Project Reactor


Links






All Articles