Connecting to multiple IBM MQ queue managers with Spring

I will give an example of how to configure multiple endpoints to connect to IBM MQ.



Goal:



  • read from multiple queues, named the same, but located on different hosts / queue managers
  • write a response to a randomly selected node


0. Let's assume that you have already deployed MQ or are using someone else's.



1. Add the dependency to the project:



maven
<!-- IBM MQ JMS starter for Spring Boot (com.ibm.mq:mq-jms-spring-boot-starter), pinned to version 2.3.3 -->
<dependency>
    <groupId>com.ibm.mq</groupId>
    <artifactId>mq-jms-spring-boot-starter</artifactId>
    <version>2.3.3</version>
</dependency>




gradle
// Uses 'implementation': the legacy 'compile' configuration is deprecated and no longer exists in current Gradle versions.
implementation group: 'com.ibm.mq', name: 'mq-jms-spring-boot-starter', version: '2.3.3'




2. Create the configuration and enter the connection parameters of your endpoints (which should already exist). The servers are defined as a list, so you can add as many connections as you like.



# Custom "mq" section; bound to the MqConfig class via @ConfigurationProperties(prefix = "mq").
mq:
  # One list item per queue manager to connect to; add more items for more connections.
  servers:
    - queueManager: QM1
      channel: DEV.ADMIN.SVRCONN
      # Connection name in host(port) form.
      connName: ibmmq.ru(1414)
      user: admin
      # NOTE(review): plaintext credentials — presumably a dev setup; externalize for real deployments.
      password: passw0rd
    - queueManager: QM2
      channel: DEV.ADMIN.SVRCONN
      connName: ibmmq.ru(1415)
      user: admin
      password: passw0rd
  # Queue the listeners read from (set as the listener endpoint destination).
  queue1: QUEUE1
  # Queue the responses are written to (set as the JmsTemplate default destination).
  queue2: QUEUE2


3. Create classes for reading these properties:



import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Typed holder for the {@code mq.*} application properties.
 *
 * <p>Spring Boot binds {@code mq.servers}, {@code mq.queue1} and {@code mq.queue2}
 * onto the fields below; Lombok's {@code @Data} generates the accessors used for binding.
 */
@Configuration
@ConfigurationProperties(prefix = "mq")
@EqualsAndHashCode(callSuper = false)
@Data
public class MqConfig {
    /**
     * Connection settings, one element per {@code mq.servers} list item.
     * Defaults to an empty list so callers never see {@code null} when the property is absent.
     */
    // Fully qualified because this snippet's import list does not include java.util.
    private java.util.List<ConnectionConfiguration> servers = new java.util.ArrayList<>();

    /** Value of {@code mq.queue1}. */
    private String queue1;

    /** Value of {@code mq.queue2}. */
    private String queue2;

}


import lombok.Data;
import lombok.EqualsAndHashCode;

@Data
@EqualsAndHashCode(callSuper = false)
public class ConnectionConfiguration {
    String queueManager;
    String channel;
    String connName;
    String user;
    String password;
}


4. Create a listener:



import javax.jms.MessageListener;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MqListener implements MessageListener {

    @SneakyThrows
    @Override
    public void onMessage(@Payload javax.jms.Message message) {
        log.info("  <" + message + ">");
        //TODO:      
    }


5. Configuration. We create a connection factory for each element of the server list from the YAML properties. From those connection factories we build a list of JmsTemplates for sending messages, and a listener container factory (with a registered listener endpoint) for each connection.




import com.fasterxml.jackson.databind.ObjectMapper;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.*;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.QosSettings;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.jms.support.converter.SimpleMessageConverter;

import javax.jms.*;
import java.util.*;

import static javax.jms.DeliveryMode.NON_PERSISTENT;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;

/**
 * JMS wiring for several IBM MQ endpoints.
 *
 * <p>For every server in {@link MqConfig#getServers()} this configuration creates a
 * connection factory, and on top of each connection factory a listener container
 * (reading from {@code mq.queue1}) and a {@link JmsTemplate} (writing to {@code mq.queue2}).
 */
@Configuration
@EnableJms
@Slf4j
public class MqConfiguration {

    @Autowired
    MqConfig mqConfig;

    @Autowired
    private JmsListenerEndpointRegistry registry;

    /**
     * Creates one listener container factory per connection factory and registers a
     * listener endpoint for {@code mq.queue1} on each of them.
     *
     * @param connectionFactories the connection factories built by {@link #connectionFactories()}
     * @param mqListener          the listener invoked for every received message
     * @return the created container factories, in the order of {@code connectionFactories}
     */
    @Bean
    public List<JmsListenerContainerFactory> myFactories(
            @Qualifier("myConnFactories")
            List<CachingConnectionFactory> connectionFactories,
            MqListener mqListener) {
        List<JmsListenerContainerFactory> factories = new ArrayList<>(connectionFactories.size());
        for (CachingConnectionFactory connectionFactory : connectionFactories) {
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);

            QosSettings qosSettings = new QosSettings();
            qosSettings.setDeliveryMode(NON_PERSISTENT);
            factory.setReplyQosSettings(qosSettings);

            // Each endpoint needs a unique id within the registry, hence the random suffix.
            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            endpoint.setId("myJmsEndpoint-" + UUID.randomUUID());
            endpoint.setDestination(mqConfig.getQueue1());
            endpoint.setMessageListener(mqListener);
            registry.registerListenerContainer(endpoint, factory);

            factories.add(factory);
        }
        return factories;
    }

    /**
     * Exposes one {@link JmsTemplate} per connection factory for sending messages.
     *
     * @param connectionFactories the connection factories built by {@link #connectionFactories()}
     * @return the templates, in the order of {@code connectionFactories}
     */
    @Bean
    @Qualifier("myJmsTemplates")
    public List<JmsTemplate> jmsTemplates(
            @Qualifier("myConnFactories")
            List<CachingConnectionFactory> connectionFactories) {
        return getJmsTemplates(connectionFactories);
    }

    /**
     * Builds a non-persistent {@link JmsTemplate} with {@code mq.queue2} as default
     * destination for each given connection factory.
     *
     * @param connectionFactories connection factories to wrap; any {@link ConnectionFactory} subtype is accepted
     * @return a new list with one template per connection factory, in the same order
     */
    public List<JmsTemplate> getJmsTemplates(List<? extends ConnectionFactory> connectionFactories) {
        List<JmsTemplate> jmsTemplates = new ArrayList<>(connectionFactories.size());
        for (ConnectionFactory connectionFactory : connectionFactories) {
            JmsTemplate jmsTemplate = new JmsTemplate();
            jmsTemplate.setConnectionFactory(connectionFactory);
            jmsTemplate.setMessageConverter(new SimpleMessageConverter());
            jmsTemplate.setDefaultDestinationName(mqConfig.getQueue2());
            // The delivery mode only takes effect when explicit QoS is enabled.
            jmsTemplate.setDeliveryMode(NON_PERSISTENT);
            jmsTemplate.setExplicitQosEnabled(true);
            jmsTemplates.add(jmsTemplate);
        }
        return jmsTemplates;
    }

    /**
     * Creates a caching connection factory for every server listed under {@code mq.servers}.
     *
     * @return one {@link CachingConnectionFactory} per configured server, in configuration order
     * @throws JMSException          if a connection property is rejected by the MQ connection factory
     * @throws IllegalStateException if no server list is configured
     */
    @Bean
    @Qualifier("myConnFactories")
    public List<CachingConnectionFactory> connectionFactories() throws JMSException {
        List<ConnectionConfiguration> servers = mqConfig.getServers();
        if (servers == null) {
            throw new IllegalStateException("Property mq.servers is not configured");
        }

        List<CachingConnectionFactory> factories = new ArrayList<>(servers.size());
        for (ConnectionConfiguration server : servers) {
            MQConnectionFactory cf = new MQConnectionFactory();
            cf.setQueueManager(server.getQueueManager());
            cf.setChannel(server.getChannel());
            cf.setConnectionNameList(server.getConnName());
            cf.setStringProperty(WMQConstants.USERID, server.getUser());
            cf.setStringProperty(WMQConstants.PASSWORD, server.getPassword());
            // Client (network) connection mode, expressed with the typed constants.
            cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);

            CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
            cachingConnectionFactory.setTargetConnectionFactory(cf);
            factories.add(cachingConnectionFactory);
        }
        return factories;
    }

}


endpoint.setMessageListener (mqListener);
This line sets the listener (created in step 4) that defines what happens when a message is received.



6. Let's create a service layer, which would hold the business logic and then send the response.



import javax.jms.TextMessage;

/**
 * Service-layer contract for sending a message to MQ.
 */
public interface MqService {

    /**
     * Sends the given message to MQ.
     *
     * @param msg the text message to send
     */
    void sendToMq(TextMessage msg);

}


import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.jms.TextMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

/**
 * {@link MqService} implementation that sends each message through one of the
 * configured {@link JmsTemplate}s, chosen at random.
 */
@Service
@Slf4j
public class MqServiceImpl implements MqService {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    @Qualifier("myJmsTemplates")
    List<JmsTemplate> jmsTemplates;


    /**
     * Sends {@code msg} to the {@code mq.queue2} queue via a randomly selected template.
     *
     * @param msg the message to send
     * @throws IllegalStateException if no {@link JmsTemplate} is available
     */
    @Override
    public void sendToMq(TextMessage msg) {
        // Business logic would go here; afterwards the response is sent.
        if (jmsTemplates == null || jmsTemplates.isEmpty()) {
            throw new IllegalStateException("No JmsTemplate available to send the message");
        }
        // nextInt(bound) is uniform over [0, size); rounding Math.random() * (size - 1)
        // would pick the first and last template only half as often as the others.
        int index = ThreadLocalRandom.current().nextInt(jmsTemplates.size());
        jmsTemplates.get(index).convertAndSend(mqConfig.getQueue2(), msg);
    }
}


7. Add sending the response to the listener:



    @Autowired
    MqService mqService;

    /**
     * Logs the received message and passes it to {@link MqService#sendToMq}.
     * Messages that are not text messages are logged and skipped.
     *
     * @param message the JMS message passed in by the listener container
     */
    @Override
    public void onMessage(javax.jms.Message message) {
        log.info("  <{}>", message);
        // Guard the cast: only text messages can be handed to the service.
        if (!(message instanceof javax.jms.TextMessage)) {
            log.warn("Skipping message that is not a TextMessage");
            return;
        }
        mqService.sendToMq((javax.jms.TextMessage) message);
    }



Voilà, that's it — you can now test the setup.



All Articles