Goal:
- read from multiple queues, named the same, but located on different hosts / queue managers
- write a response to a randomly defined node
0. Let's assume that you have already deployed MQ or are using someone else's.
1. We load the dependencies into the project:
maven
<dependency>
<groupId>com.ibm.mq</groupId>
<artifactId>mq-jms-spring-boot-starter</artifactId>
<version>2.3.3</version>
</dependency>
gradle
compile group: 'com.ibm.mq', name: 'mq-jms-spring-boot-starter', version: '2.3.3'
2. Create a config, enter the connection parameters of your points (you have already created them?). We use an array, so there can be as many connections as you like.
mq:
servers:
- queueManager: QM1
channel: DEV.ADMIN.SVRCONN
connName: ibmmq.ru(1414)
user: admin
password: passw0rd
- queueManager: QM2
channel: DEV.ADMIN.SVRCONN
connName: ibmmq.ru(1415)
user: admin
password: passw0rd
queue1: QUEUE1
queue2: QUEUE2
3. Create classes for reading these properties:
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "mq")
@EqualsAndHashCode(callSuper = false)
@Data
public class MqConfig {
private List<ConnectionConfiguration> servers;
private String queue1;
private String queue2;
}
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = false)
public class ConnectionConfiguration {
String queueManager;
String channel;
String connName;
String user;
String password;
}
4. Create a listener:
import javax.jms.MessageListener;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MqListener implements MessageListener {
@SneakyThrows
@Override
public void onMessage(@Payload javax.jms.Message message) {
log.info(" <" + message + ">");
//TODO:
}
5. Configuring! We determine the connectionFactors for each element of the array from yml-properties. We create a sheet of templates for sending messages, to the input of which we feed the created connections. We create listener factories, at the input of which we also use the created connectionFactories.
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.*;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.QosSettings;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.jms.support.converter.SimpleMessageConverter;
import javax.jms.*;
import java.util.*;
import static javax.jms.DeliveryMode.NON_PERSISTENT;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
@Configuration
@EnableJms
@Slf4j
public class MqConfiguration {
@Autowired
MqConfig mqConfig;
@Autowired
private JmsListenerEndpointRegistry registry;
// , connectionFactories
@Bean
public List<JmsListenerContainerFactory> myFactories(
@Qualifier("myConnFactories")
List<CachingConnectionFactory> connectionFactories,
MqListener mqListener) {
List<JmsListenerContainerFactory> factories = new ArrayList<>();
connectionFactories.forEach(connectionFactory -> {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionAcknowledgeMode(CLIENT_ACKNOWLEDGE);
QosSettings qosSettings = new QosSettings();
qosSettings.setDeliveryMode(NON_PERSISTENT);
factory.setReplyQosSettings(qosSettings);
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint-"+ UUID.randomUUID());
endpoint.setDestination(mqConfig.getQueue1());
endpoint.setMessageListener(mqListener);
registry.registerListenerContainer(endpoint, factory);
factories.add(factory);
});
return factories;
}
// ,
@Bean
@Qualifier("myJmsTemplates")
public List<JmsTemplate> jmsTemplates(
@Qualifier("myConnFactories")
List<CachingConnectionFactory> connectionFactories) {
return getJmsTemplates(new ArrayList<ConnectionFactory>(connectionFactories));
}
public List<JmsTemplate> getJmsTemplates(List<ConnectionFactory> connectionFactories) {
List<JmsTemplate> jmsTemplates = new ArrayList<>();
for (ConnectionFactory connectionFactory : connectionFactories) {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(connectionFactory);
jmsTemplate.setMessageConverter(new SimpleMessageConverter());
jmsTemplate.setDefaultDestinationName(mqConfig.getQueue2());
jmsTemplate.setDeliveryMode(NON_PERSISTENT);
jmsTemplate.setDeliveryPersistent(false);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplates.add(jmsTemplate);
}
return jmsTemplates;
}
// yml-
@Bean
@Qualifier("myConnFactories")
public List<CachingConnectionFactory> connectionFactories() throws JMSException {
List<CachingConnectionFactory> factories = new ArrayList<>();
for (ConnectionConfiguration server : mqConfig.getServers()) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
MQConnectionFactory cf = new MQConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(cf);
cf.setQueueManager(server.getQueueManager());
cf.setChannel(server.getChannel());
cf.setConnectionNameList(server.getConnName());
cf.setStringProperty(WMQConstants.USERID, server.getUser());
cf.setStringProperty(WMQConstants.PASSWORD, server.getPassword());
cf.setStringProperty("XMSC_WMQ_CONNECTION_MODE", "1");
factories.add(cachingConnectionFactory);
}
return factories;
}
}
endpoint.setMessageListener (mqListener);Here we indicate the listener (which was created in step 4) to determine the actions when receiving a message.
6. Let's create a service layer, where let's say there will be some logic and after sending a response.
import javax.jms.TextMessage;
public interface MqService {
void sendToMq(TextMessage msg);
}
import javax.jms.TextMessage;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class MqServiceImpl implements MqService {
@Autowired
private MqConfig mqConfig;
@Autowired
@Qualifier("myJmsTemplates")
List<JmsTemplate> jmsTemplates;
@Override
public void sendToMq(TextMessage msg ) {
//-
// / .
int maxIndex = jmsTemplates.size()-1; // - ""
int randomNumber = (int) Math.round(Math.random() * maxIndex);
jmsTemplates.get(randomNumber).convertAndSend(mqConfig.getQueue2(), msg);
}
}
7. Add sending the response to the listener:
@Autowired
MqService mqService;
@SneakyThrows
@Override
public void onMessage(@Payload javax.jms.Message message) {
log.info(" <" + message + ">");
mqService.sentToMq((TextMessage) message);
}
Voila, you're done, you can check.