Skip to content

Commit

Permalink
replace messageListener with spring integration jms
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaopei0418 committed May 24, 2019
1 parent 9213662 commit 205e3ce
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 14 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
apply plugin: 'io.spring.dependency-management'

group = 'com.github'
version = '1.0.RELEASE'
version = '2.0.RELEASE'
sourceCompatibility = '1.8'

repositories {
Expand All @@ -20,6 +20,7 @@ dependencies {
}
implementation 'org.springframework.boot:spring-boot-starter-undertow'
implementation 'org.springframework.integration:spring-integration-jms'
implementation 'org.springframework.integration:spring-integration-file'
implementation 'com.ibm.mq:com.ibm.mq.allclient:9.1.2.0'
implementation 'org.projectlombok:lombok:1.18.6'
implementation 'com.alibaba:fastjson:1.2.56'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.github.distributionmessage.config;

import org.springframework.messaging.Message;

import java.text.SimpleDateFormat;
import java.util.Calendar;

/**
* Created by zhaopei on 17/12/20.
*/
public class IbmmqFileNameGenerator implements org.springframework.integration.file.FileNameGenerator {

private String prefix = "";

private String suffix = "";

private Boolean headerId = false;

private static final SimpleDateFormat TIME_FORMAT = new SimpleDateFormat("yyyyMMddHHmmssSSS");

public IbmmqFileNameGenerator(String prefix, String suffix, Boolean headerId) {
this.prefix = prefix;
this.suffix = suffix;
this.headerId = headerId;
}

public IbmmqFileNameGenerator() {

}

@Override
public String generateFileName(Message<?> message) {
StringBuffer buffer = new StringBuffer(this.prefix + "_");
buffer.append(TIME_FORMAT.format(Calendar.getInstance().getTime()));
if (headerId) {
buffer.append("_" + message.getHeaders().getId());
}
buffer.append(this.suffix);
return buffer.toString();
}

public String getPrefix() {
return prefix;
}

public void setPrefix(String prefix) {
this.prefix = prefix;
}

public String getSuffix() {
return suffix;
}

public void setSuffix(String suffix) {
this.suffix = suffix;
}

public Boolean getHeaderId() {
return headerId;
}

public void setHeaderId(Boolean headerId) {
this.headerId = headerId;
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
package com.github.distributionmessage.config;

import com.github.distributionmessage.constant.ChannelConstant;
import com.github.distributionmessage.handler.DistributionSendingMessageHandler;
import com.github.distributionmessage.listener.DistributionMessageListener;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
import com.ibm.mq.jms.MQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.MessageCreator;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.file.FileNameGenerator;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.jms.ChannelPublishingJmsMessageListener;
import org.springframework.integration.jms.JmsMessageDrivenEndpoint;
import org.springframework.integration.jms.JmsSendingMessageHandler;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Calendar;

@Configuration
public class IntegrationConfiguration {
Expand All @@ -25,15 +37,71 @@ public class IntegrationConfiguration {
@Autowired
private DistributionProp distributionProp;

@Bean(name = ChannelConstant.IBMMQ_RECEIVE_CHANNEL)
public MessageChannel ibmmqReceiveChannel() {
return new PublishSubscribeChannel();
}

private static MessageHandler buildFileWriteMessageHandler(String dir, FileNameGenerator fileNameGenerator, boolean split) {
FileWritingMessageHandler handler = null;
if (split) {
handler = new FileWritingMessageHandler(new SpelExpressionParser().parseExpression(
"@filePara.getTodayDir('" + dir + "')"));
} else {
handler = new FileWritingMessageHandler(new File(dir));
}
handler.setDeleteSourceFiles(true);
handler.setExpectReply(false);
handler.setFileNameGenerator(fileNameGenerator);
handler.setAutoCreateDirectory(true);
return handler;
}

@Bean
public Object filePara() {
return new Object() {

public String getTodayDir(String dir) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
return StringUtils.isEmpty(dir) ? "" : dir + File.separator +
simpleDateFormat.format(Calendar.getInstance().getTime());
}
};
}

// @Bean
// @ServiceActivator(inputChannel = ChannelConstant.IBMMQ_RECEIVE_CHANNEL)
public MessageHandler receiveMessageHandler() {
return buildFileWriteMessageHandler("D:\\softs\\distribution-message\\mqmessage",
new IbmmqFileNameGenerator("IBMMQ",
".xml", true), false);
}

@Bean
@ServiceActivator(inputChannel = ChannelConstant.IBMMQ_RECEIVE_CHANNEL)
public JmsSendingMessageHandler jmsSendingMessageHandler(JmsTemplate jmsTemplate) {
DistributionSendingMessageHandler distributionSendingMessageHandler = new DistributionSendingMessageHandler(jmsTemplate);
distributionSendingMessageHandler.setDistributionProp(this.distributionProp);
return distributionSendingMessageHandler;
}

@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer(ConnectionFactory connectionFactory) {
DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setConnectionFactory(connectionFactory);
defaultMessageListenerContainer.setConcurrency(this.distributionProp.getMinConcurrency() + "-"
+ this.distributionProp.getMaxConcurrency());
defaultMessageListenerContainer.setMessageListener(this.distributionMessageListener);
// defaultMessageListenerContainer.setMessageListener(this.distributionMessageListener);
defaultMessageListenerContainer.setDestinationName(this.distributionProp.getQueueName());
return defaultMessageListenerContainer;
}

@Bean
public JmsMessageDrivenEndpoint jmsMessageDrivenEndpoint(DefaultMessageListenerContainer defaultMessageListenerContainer) {
JmsMessageDrivenEndpoint jmsMessageDrivenEndpoint = new JmsMessageDrivenEndpoint(defaultMessageListenerContainer,
new ChannelPublishingJmsMessageListener());
jmsMessageDrivenEndpoint.setOutputChannelName(ChannelConstant.IBMMQ_RECEIVE_CHANNEL);
return jmsMessageDrivenEndpoint;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.github.distributionmessage.constant;

public interface ChannelConstant {

String IBMMQ_RECEIVE_CHANNEL = "ibmmqReceiveChannel";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.github.distributionmessage.handler;

import com.github.distributionmessage.config.DistributionProp;
import com.github.distributionmessage.constant.CommonConstant;
import com.github.distributionmessage.utils.CommonUtils;
import com.github.distributionmessage.utils.DistributionUtils;
import com.ibm.mq.jms.MQQueue;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.springframework.integration.jms.DefaultJmsHeaderMapper;
import org.springframework.integration.jms.JmsHeaderMapper;
import org.springframework.integration.jms.JmsSendingMessageHandler;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessagePostProcessor;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

@Data
@EqualsAndHashCode(callSuper=false)
public class DistributionSendingMessageHandler extends JmsSendingMessageHandler {

private final JmsTemplate jmsTemplate;

private JmsHeaderMapper headerMapper = new DefaultJmsHeaderMapper();

private DistributionProp distributionProp;

public DistributionSendingMessageHandler(JmsTemplate jmsTemplate) {
super(jmsTemplate);
this.jmsTemplate = jmsTemplate;
}

@Override
protected void handleMessageInternal(Message<?> message) {
MessagePostProcessor messagePostProcessor = new HeaderMappingMessagePostProcessor(message, this.headerMapper);
Assert.notNull(this.distributionProp, "distributionProp must not be null");
Assert.notNull(message, "Message must not be null");
Object playload = message.getPayload();
Assert.notNull(playload, "Message playload must not be null");
if (playload instanceof byte[]) {
try {
byte[] bytes = (byte[]) playload;
MQQueue queue = new MQQueue();
queue.setCCSID(this.distributionProp.getCcsid());
String sm = new String(bytes, CommonConstant.CHARSET);
String dxpid = DistributionUtils.getDxpIdByMessage(sm);
String msgtype = DistributionUtils.getMessageType(sm);
String queueName = DistributionUtils.getDestinationQueueName(this.distributionProp, dxpid, msgtype);
logger.info("dxpId=[" + dxpid + "] messageType=["
+ msgtype + "] distributionQueue=[" + queueName + "]");
queue.setBaseQueueName(queueName);
this.jmsTemplate.convertAndSend(queue, playload, messagePostProcessor);
} catch (Exception e) {
CommonUtils.logError(logger, e);
}
} else {
logger.error("message not is bytes message! message=[" + message + "]");
}
}

private static final class HeaderMappingMessagePostProcessor implements MessagePostProcessor {

private final Message<?> integrationMessage;

private final JmsHeaderMapper headerMapper;

HeaderMappingMessagePostProcessor(Message<?> integrationMessage, JmsHeaderMapper headerMapper) {
this.integrationMessage = integrationMessage;
this.headerMapper = headerMapper;
}

@Override
public javax.jms.Message postProcessMessage(javax.jms.Message jmsMessage) {
this.headerMapper.fromHeaders(this.integrationMessage.getHeaders(), jmsMessage);
return jmsMessage;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.github.distributionmessage.constant.CommonConstant;
import com.github.distributionmessage.utils.CommonUtils;
import com.github.distributionmessage.utils.DistributionUtils;
import com.ibm.mq.jms.MQQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -42,11 +43,14 @@ public void onMessage(Message message) {
logger.error("message is not dxp message! message=[" + sm + "]");
return;
}
jmsTemplate.send(queueName, session -> {
BytesMessage bm = session.createBytesMessage();
bm.writeBytes(bytes);
return bm;
});
MQQueue queue = new MQQueue(queueName);
queue.setCCSID(this.distributionProp.getCcsid());
jmsTemplate.convertAndSend(queue, bytes);
// jmsTemplate.send(queue, session -> {
// BytesMessage bm = session.createBytesMessage();
// bm.writeBytes(bytes);
// return bm;
// });
} else {
logger.error("message not is bytes message! message=[" + message + "]");
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
server:
port: 8080


distribution:
hostName: 172.16.33.119
port: 1800
Expand All @@ -20,3 +21,6 @@ distribution:
randomDistribution:
# 所有没有区配到,就转发到默认队列
defaultQueue: GGFW_TO_ENT

logging:
config: classpath:logback-spring.xml

0 comments on commit 205e3ce

Please sign in to comment.