Using Azure Service Bus from Java

Hello colleagues! It so happened that our application is written in the java stack, but is hosted in Azure. And we are trying to make the most of the cloud provider's management services.



One of them is Azure Service Bus, and today I want to talk about the features of using it in a regular Spring Boot application.



If you want to read about the rake features - scroll to the end of the article



What is Azure Service Bus



A few words about Azure Service Bus is a cloud message broker (cloud replacement for RabbitMQ, ActiveMQ). Supports queues (the message is delivered to one recipient) and topics (the publish / subscribe mechanism) - in more detail here



Support is declared:



  1. Ordered messages - the documentation says that this is a FIFO, BUT it is implemented using the concept of message sessions - a group of messages, not the entire queue. If you need to guarantee the order of messages, then you combine messages into a group, and now messages in the group will be delivered as a FIFO. So Azure Service Bus Queue is not a FIFO - it delivers your messages as randomly as it suits
  2. Dead-letter queue - everything is simple here, they could not successfully deliver the message after N attempts or a period of time - moved to DLQ
  3. Scheduled delivery - you can set a delay before delivery
  4. Message deferral - hides messages in the queue, the message will not be delivered automatically, but it can be retrieved by ID. We need to store this ID somewhere


How to integrate with Azure Service Bus



Azure Service Bus supports AMQP 1.0, which means it is not compatible with RabbitMQ clients. bunny uses AMQP 0.9.1



The only "standard" client that can work with the Service Bus is Apache Qpid .



There are 3 ways to pair your Spring Boot application with Service Bus:



  1. JMS + QPID — , — QPID — .

    timeout producer — — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — , . Service Bus

    ( 1.2.6) — , azure service bus java sdk.



    Spring Integration — , «Scheduled delivery» «Message deferral» .



    sdk, MessageAndSessionPump

  3. azure service bus java sdk — ,


Spring Cloud — Azure Service Bus



I will dwell on this method in more detail and tell you about the features of using the

example application is in the official repository, so there is no point in duplicating the code - the repository with an example is here .



Because it's Spring Integration Messaging it all comes down to Channel, MessageHandler, MessagingGateway, ServiceActivator.



And then there's the ServiceBusQueueTemplate .



Sending messages



We must have a Channel in which we write the message we want to send, on the other end there is a MessageHandler that sends it to the Service Bus.



The MessagHandler is com.microsoft.azure.spring.integration.core.DefaultMessageHandler - this is the connector to the external service.



How to bind it to a channel? - add the annotation - @ServiceActivator (inputChannel = OUTPUT_CHANNEL) and now our MessagHandler is listening to the OUTPUT_CHANNEL channel .



Next, we need to somehow write our message to the channel - here again the magic of the spring - we announce MessagingGateway and bind it to the channel by name.



A snippet from the example :



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}


That's all: Gateway -> Channel -> MessagHandler -> ServiceBusQueueTemplate -> ServiceBusMessageConverter .



In the code, it remains to inject our gateway and call the send method .



I mentioned ServiceBusMessageConverter in the call chain for a reason - if you want to add custom headers (for example CORRELATION_ID) to the message, this is the place where they need to be moved from org.springframework.messaging.MessageHeaders to the azure message.

The special method setCustomHeaders .



In this case, your gateway will look something like this:



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}


Receiving messages



Okay, we know how to send messages, how to get them now?



Here everything is the same - MessageProducer -> Channel -> Handler



The MessageProducer is com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter - this is our connector to an external service. Inside, the same ServiceBusQueueTemplate with ServiceBusMessageConverter where you can read custom headers and put them in the spring integration message.



The channel is already installed in it by hand:



@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}


But the Handler itself is attached to the channel via @ServiceActivator .



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......


You can immediately get the line:



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......


You may have noticed the strange Checkpointer checkpointer parameter, it is used to manually acknowledge message processing

If you set CheckpointMode.MANUAL when creating the ServiceBusQueueInboundChannelAdapter , you must send acknowledge for the message yourself. If you use CheckpointMode.RECORD then confirmation will be sent automatically - details in the ServiceBusQueueTemplate code .







Features of use



So, the list of "rakes" and "chips" on which we have already gone.



ReceiveMode.PEEKLOCK



Azure Service Bus supports PEEKLOCK mode - the consumer takes a message, it locks into the service bus, is inaccessible to anyone for a certain time (lock duration), but is not deleted from it. If within the allotted time the consumer has not sent a confirmation of processing - success / abandon or has not extended the lock - the message is considered again available and a new delivery attempt will be made.



Interestingly, abandon simply resets the lock and the message becomes available instantly for re-delivery.



ServiceBusQueueTemplate default creates QueueClient mode ReceiveMode.PEEKLOCK .



If an unhandled exception flies in our handler- no acknowledge will be sent to the server and the message will remain locked and will be re-delivered by timeout.

In this case, the delivery counter will increase, which is logical.



I don't know if this is a bug or a feature - but it is very convenient to make a delay between retry for situations when it is necessary.



If the message cannot be processed even with retry, it is necessary to catch exceptions and mark the message as processed and add additional logic to the application, otherwise it will be delivered again and again until it reaches the re-delivery number limit (configured when creating a queue in the service bus )



Concurrency and Prefetch message count



As you may have guessed, the concurrency setting is responsible for the number of parallel message handlers, and the prefetch message count is how many messages we will get into the buffer from the server.



By default, the ServiceBusQueueTemplate is auto-configured (AzureServiceBusQueueAutoConfiguration) with a value of 1 for both parameters, i.e. by default each queue will have one processing thread, although the concept of a service bus with acknowledge for each individual message implies many concurrent handlers. This is all the more important if you have a long request processing.



Unfortunately, these settings cannot be set through the application config (application.yml / application.properties) and can only be set in code. But even through code, you won't be able to set different settings for different queues.



Therefore, if you need to make different settings, you will have to create several ServiceBusQueueTemplate beans for each ServiceBusQueueInboundChannelAdapter



CompletableFuture inside azure service bus java sdk



The azure service bus java sdk itself is implemented around CompletableFuture and CachedThreadPool executor - MessagingFactory.INTERNAL_THREAD_POOL so be careful with all sorts of thread local beans



Ordered messages



We use the service bus as a job queue - some jobs depend on each other and therefore must be executed in the order they were created.



As I mentioned above, T-shirts use the concept of message sessions - when messages are grouped into a session by key (transmitted in the header), the session exists as long as there is at least one message with the session key - in detail in the documentation

Service bus guarantees the delivery of messages within such a group in the order of adding to server (i.e. in the order in which the service bus server wrote them to the repository).



It is also worth mentioning if you have created a sessions enabled queue - this means that all messages must have a header with a session key.



Immediately we were very pleased with the possibility of the service bus to line up messages in a FIFO queue - albeit for a group of messages.



But after a while, we started to notice problems:



  • some messages began to arrive an infinite number of times
  • queue processing slowed down
  • in the service bus statistics, half of the requests are marked as failed, and failed requests appear even on an empty queue when idle


Looking into the sdk code, we found out the peculiarity of working with sessions:



  1. the consumer captures the session and begins to read all available messages in it
  2. simultaneously processed the number of sessions equal to the concurrency parameter
  3. unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
  4. — success abandon. — delay re-delivery

    .. abandon — , delivery counter .

    delivery count


As a result, they abandoned this service bus feature and wrote a bicycle, and the service bus acts as a trigger.



As soon as the sessions enabled queue was canceled, the errors in the statistics disappeared; the request to the service bus.



In JMS + Qpid bundle - this functionality is not available.



Potential problems with queue sizes larger than 1G



We haven't met yet, but I heard that it starts to work unstable if the queue size is more than 1G.



If you come across this or vice versa everything works - write in the comments.



Problems with tracing requests



The standard azure application insights agent cannot track message sending as dependency and incoming messages as requests.



I had to add some code.



Outcome



If you need a job queue with a long message processing time and you do not need a queue, you can use.



If message processing is fast - use Azure Event Hub - regular Kafka, standard client works fine.



All Articles