04.ActiveMQ与Spring JMS整合

2020-09-16 15:34发布

    SpringJMS使用参考:http://docs.spring.io/spring/docs/current/spring-framework-reference/html/jms.html 

1.SpringJMS核心接口介绍

1.JmsTemplate
    JmsTemplate: 是Spring自身提供,只需向Spring容器内注册这个类即可,就可以使用JmsTemplate类对象方便的操作JMS,下面介绍他常用的方法。
    注意:JmsTemplate类是线程安全的,可以在整个应用范围使用。但并不是说整个引用只能使用一个JmsTemplate实例,可以根据需要注入多个JmsTemplate实例。
  1. // send - 发送一个消息,使用消息创建接口MessageCreator
  2. publicvoid send(MessageCreator messageCreator)
  3. publicvoid send(finalDestination destination,finalMessageCreator messageCreator)
  4. publicvoid send(finalString destinationName,finalMessageCreator messageCreator)
  5. // sendAndReceive - 发送并接收消息
  6. publicMessage sendAndReceive(MessageCreator messageCreator)
  7. publicMessage sendAndReceive(finalDestination destination,finalMessageCreator messageCreator)
  8. publicMessage sendAndReceive(finalString destinationName,finalMessageCreator messageCreator)
  9. // convertAndSend - 使用MessageConverter接口转换消息,先将对象转换成消息再发送消息。与之对应的是receiveAndConvert方法
  10. publicvoid convertAndSend(Object message)throwsJmsException
  11. publicvoid convertAndSend(Destination destination,finalObject message)
  12. publicvoid convertAndSend(String destinationName,finalObject message)
  13. publicvoid convertAndSend(Object message,MessagePostProcessor postProcessor)
  14. publicvoid convertAndSend(Destination destination,finalObject message,finalMessagePostProcessor postProcessor)
  15. publicvoid convertAndSend(String destinationName,finalObject message,finalMessagePostProcessor postProcessor)
  16. // receive - 接收消息
  17. publicMessage receive()
  18. publicMessage receive(Destination destination)
  19. publicMessage receive(String destinationName)
  20. // receiveSelected - 接收消息,并过滤消息
  21. publicMessage receiveSelected(String messageSelector)
  22. publicMessage receiveSelected(finalDestination destination,finalString messageSelector)
  23. publicMessage receiveSelected(finalString destinationName,finalString messageSelector)
  24. // receiveAndConvert - 接收消息,并使用MessageConverter接口转换消息,把一个消息转换成一个对象
  25. publicObject receiveAndConvert()
  26. publicObject receiveAndConvert(Destination destination)
  27. publicObject receiveAndConvert(String destinationName)
  28. // receiveSelectedAndConvert - 接收消息,并使用消息过滤和消息转换
  29. publicObject receiveSelectedAndConvert(String messageSelector)
  30. publicObject receiveSelectedAndConvert(Destination destination,String messageSelector)
  31. publicObject receiveSelectedAndConvert(String destinationName,String messageSelector)
  32. // browse - 浏览消息
  33. public<T> T browse(BrowserCallback<T> action)
  34. public<T> T browse(Queue queue,BrowserCallback<T> action)
  35. public<T> T browse(String queueName,BrowserCallback<T> action)
  36. // browseSelected - 浏览消息,并使用过滤
  37. public<T> T browseSelected(String messageSelector,BrowserCallback<T> action)
  38. public<T> T browseSelected(finalQueue queue,finalString messageSelector,finalBrowserCallback<T> action)
  39. public<T> T browseSelected(finalString queueName,finalString messageSelector,finalBrowserCallback<T> action)
  40. // execute - 执行SessionCallback、ProducerCallback、BrowserCallback回调接口,并得到回调接口返回值
  41. public<T> T execute(SessionCallback<T> action)
  42. public<T> T execute(SessionCallback<T> action,boolean startConnection)
  43. public<T> T execute(ProducerCallback<T> action)
  44. public<T> T execute(finalDestination destination,finalProducerCallback<T> action)
  45. public<T> T execute(finalDestination destination,finalProducerCallback<T> action)
2.连接工厂(连接池)
    JmsTemplate需要引用一个ConnectionFactory,JmsTemlate每次发送消息时都会重新创建连接,创建connection,session,创建productor。这是一个非常耗性能的地方,特别是大数据量的情况下。所以出现了PooledConnectionFactory,PooledConnectionFactory是ActiveMQ中的类,它实现了ConnectionFactory。SpringJMS的实现有SingleConnectionFactory、CachingConnectionFactory。
  1. PooledConnectionFactory:会缓存connection,session和productor,不会缓存consumer。因此只适合于生产者发送消息。
  2. SingleConnectionFactory:对于建立JMS服务器链接的请求会一直返回同一个Connection,并且会忽略Connection的close方法调用(包括调用createConnection()得到的Connection)
  3. CachingConnectionFactory:继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。
    SpringJMS提供了CachingConnectionFactory,这才是首选的方案。然而CachingConnectionFactory有一个问题必须指出,默认情况下,CachingConnectionFactory只缓存一个session,在它的JavaDoc中,它声明对于低并发情况下这是足够的。与之相反,PooledConnectionFactory的默认值是500。这些设置,在很多情况下,需要亲自去测试并验证。我将其设置为100,对我来说还是很不错。
3.接口介绍
以下是SpringJMS的常用接口或类:
  1. MessageCreator -- 消息创建接口,发送消息时需要使用此接口创建消息
  2. SessionCallback -- 使用JMS Session时的回调接口
  3. ProducerCallback -- 使用JMS Session和MessageProducer时的回调接口
  4. BrowserCallback -- 使用JMS Session和QueueBrowser时的回调接口
  5. MessageListener -- 消息监听器接口,注解@JmsListener与其功能相似
  6. ListenerContainer -- 消息侦听器容器接口,实现有SimpleMessageListenerContainer、DefaultMessageListenerContainer
  7. MessageConverter -- 消息转换接口,用于JMS消息到Java对象之间的转换

2.消息监听器

    在Spring整合JMS的应用中我们在定义消息监听器的时候一共可以定义三种类型的消息监听器,分别是MessageListener、SessionAwareMessageListener和MessageListenerAdapter。下面就分别来介绍一下这几种类型的区别。
1.MessageListener
    MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口。其中定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数。我们前面在讲配置消费者的时候用的消息监听器就是MessageListener,代码如下:
  1. publicclassQueueMessageListenerimplementsMessageListener
  2. {
  3. @Override
  4. publicvoid onMessage(Message message)
  5. {
  6. try
  7. {
  8. if(message instanceofTextMessage)
  9. {
  10. TextMessage tm =(TextMessage) message;
  11. System.out.println("监听消息:"+ tm.getText());
  12. }
  13. else
  14. {
  15. System.out.println("消息类型:"+ message.getClass());
  16. }
  17. }
  18. catch(JMSException e)
  19. {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
    在上面代码中我们只是简单的打印了一些消息相关的信息。
2.SessionAwareMessageListener
    SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。MessageListener的设计只是纯粹用来接收消息的,假如我们在使用MessageListener处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的onMessage方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象。先来看一段代码:
  1. publicclassQueueSessionAwareMessageListenerimplementsSessionAwareMessageListener<TextMessage>
  2. {
  3. /** 回复消息的目的地 */
  4. privateDestination destination;
  5. @Override
  6. publicvoid onMessage(TextMessage message,Session session)throwsJMSException
  7. {
  8. System.out.println("监听消息内容:"+ message.getText());
  9. MessageProducer messageProducer = session.createProducer(destination);
  10. TextMessage replyMessage = session.createTextMessage("已收到消息:"+ message.getJMSMessageID());
  11. messageProducer.send(replyMessage);
  12. }
  13. publicDestination getDestination()
  14. {
  15. return destination;
  16. }
  17. publicvoid setDestination(Destination destination)
  18. {
  19. this.destination = destination;
  20. }
  21. }
    在上面代码中我们定义了一个SessionAwareMessageListener,在这个Listener中我们在接收到了一个消息之后,利用对应的Session创建了一个到destination的生产者和对应的消息,然后利用创建好的生产者发送对应的消息。
3.MessageListenerAdapter
    MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。MessageListenerAdapter会把接收到的消息做如下转换:
  1. TextMessage转换为String对象
  2. BytesMessage转换为byte数组
  3. MapMessage转换为Map对象
  4. ObjectMessage转换为对应的Serializable对象
    既然前面说了MessageListenerAdapter会把接收到的消息做一个类型转换,然后利用反射把它交给真正的目标处理器——一个普通的Java类进行处理,如果真正的目标处理器是一个MessageListener或者是一个SessionAwareMessageListener,那么Spring将直接使用接收到的Message对象作为参数调用它们的onMessage方法,而不会再利用反射去进行调用。那么我们在定义一个MessageListenerAdapter的时候就需要为它指定这样一个目标类。这个目标类我们可以通过MessageListenerAdapter的构造方法参数指定,也可以通过属性delegate指定,如:
  1. <!--消息监听适配器-->
  2. <bean id="messageListenerAdapter"class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
  3. <!--通过构造函数指定消息处理器-->
  4. <constructor-arg>
  5. <bean id="readerMessage"class="com.test.ReaderMessage"/>
  6. </constructor-arg>
  7. <!--指定消息处理器类处理消息的方法-->
  8. <property name="defaultListenerMethod" value="receiveMessage"/>
  9. <!--通过delegate属性指定消息处理器-->
  10. <property name="delegate">
  11. <bean id="readerMessage"class="com.test.ReaderMessage"/>
  12. </property>
  13. </bean>
    前面说了如果我们指定的这个目标处理器是一个MessageListener或者是一个SessionAwareMessageListener的时候Spring将直接利用接收到的Message对象作为方法参数调用它们的onMessage方法。但是如果指定的目标处理器是一个普通的Java类时Spring将利用Message进行了类型转换之后的对象作为参数通过反射去调用真正的目标处理器的处理方法,那么Spring是如何知道该调用哪个方法呢?这是通过MessageListenerAdapter的defaultListenerMethod属性来决定的,当我们没有指定该属性时,Spring会默认调用目标处理器的handleMessage方法。若使用以下MessageListenerByAdapter类处理消息,可以配置MessageListenerAdapterdelegate属性来设置处理消息的方法。
  1. publicclassMessageListenerByAdapter
  2. {
  3. publicvoid handleMessage(String message)
  4. {
  5. System.out.println("handleMessage方法处理消息,消息内容是:"+ message);
  6. }
  7. publicvoid receiveMessage(String message)
  8. {
  9. System.out.println("receiveMessage方法处理消息,消息内容是:"+ message);
  10. }
  11. }
    MessageListenerAdapter除了会自动的把一个普通Java类当做MessageListener来处理接收到的消息之外,其另外一个主要的功能是可以自动的发送返回消息当我们设置的消息处理方法的返回值不为空的时候,SpringJMS会自动将它封装为一个JMS Message,然后自动进行回复。那么这个时候这个回复消息将发送到哪里呢?这主要有两种方式可以指定。
  1. 第一,可以通过发送的Message的setJMSReplyTo方法指定该消息对应的回复消息的目的地。这需要生产者发送消息之前调用setJMSReplyTo方法。
  2. 第二,通过MessageListenerAdapter的defaultResponseDestination属性来指定。
注意:当两种方式都指定了消息的回复目的地的时候使用发送消息的setJMSReplyTo方法指定的目的地将具有较高的优先级。
4.配置消息监听器
    配置以上三种消息监听器,需要使用消息侦听容器(MessageListenerContainer),在配置一个MessageListenerContainer的时候有三个属性必须指定,一个是表示从哪里监听的ConnectionFactory;一个是表示监听什么的Destination;一个是接收到消息以后进行消息处理的MessageListener。示例如下:
  1. <bean id="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  2. <!--设置连接工厂-->
  3. <property name="connectionFactory" ref="connectionFactory"/>
  4. <!--设置监听地址-->
  5. <property name="destination" ref="queueDestination"/>
  6. <!--设置消息监听处理器,可以是以上三种-->
  7. <property name="messageListener" ref="queueMessageListener"/>
  8. </bean>
Spring提供了三种AbstractMessageListenerContainer的子类,每种各有其特点。
  1. SimpleMessageListenerContainer:这个消息侦听容器是三种中最简单的。它在启动时创建一个会话session和消费者Consumer,并且会使用标准的JMS MessageConsumer.setMessageListener()方法注册监听器让JMS提供者调用监听器的回调函数。它不会动态的适应运行时需要和参与外部的事务管理。兼容性方面,它非常接近于独立的JMS规范,但一般不兼容Java EE的JMS限制。
  2. DefaultMessageListenerContainer:这个消息侦听器使用的最多。跟SimpleMessageListenerContainer相比,DefaultMessageListenerContainer会动态的适应运行时需要,并且能够参与外部的事务管理。它很好的平衡了对JMS提供者要求低、先进功能如事务参与和兼容Java EE环境。
  3. ServerSessionMessageListenerContainer:这个监听器容器利用JMS ServerSessionPool SPI动态管理JMS Session。使用者各种消息监听器可以获得运行时动态调优功能,但是这也要求JMS提供者支持ServerSessionPool SPI。如果不需要运行时性能调整,请使用 DefaultMessageListenerContainer 或 SimpleMessageListenerContainer。

3.ActiveMQ与SpringJMS整合示例

    本节提供了一个SpringJMS与ActiveMQ整合的示例,本示例可以作为参考。
1.配置spring-context-jms.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beansxmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xmlns:context="http://www.springframework.org/schema/context"
  5. xmlns:jms="http://www.springframework.org/schema/jms"
  6. xsi:schemaLocation="
  7. http://www.springframework.org/schema/jms
  8. http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
  9. http://www.springframework.org/schema/beans
  10. http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  11. http://www.springframework.org/schema/context
  12. http://www.springframework.org/schema/context/spring-context-4.0.xsd"
  13. default-lazy-init="false">
  14. <!-- 配置JMS连接工厂 -->
  15. <beanid="connectionFactory"class="org.apache.activemq.ActiveMQConnectionFactory">
  16. <propertyname="brokerURL"value="failover:(tcp://localhost:61616)"/>
  17. </bean>
  18. <!-- ActiveMQ连接池配置,ActiveMQ实现 -->
  19. <!--
  20. <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
  21. <property name="connectionFactory" ref="connectionFactory" />
  22. </bean>
  23. -->
  24. <!-- ActiveMQ连接池配置,SpingJMS实现 -->
  25. <beanid="cachingConnectionFactory"class="org.springframework.jms.connection.CachingConnectionFactory">
  26. <propertyname="targetConnectionFactory"ref="connectionFactory"/>
  27. <!-- Session缓存数量,这里属性也可以直接在这里配置 -->
  28. <propertyname="sessionCacheSize"value="100"/>
  29. </bean>
  30. <!-- 消息队列01 -->
  31. <beanid="queueDestination01"class="org.apache.activemq.command.ActiveMQQueue">
  32. <!-- 设置消息队列的名字 -->
  33. <constructor-arg>
  34. <value>spring-jms-queue01</value>
  35. </constructor-arg>
  36. </bean>
  37. <!-- 消息队列02 -->
  38. <beanid="queueDestination02"class="org.apache.activemq.command.ActiveMQQueue">
  39. <!-- 设置消息队列的名字 -->
  40. <constructor-arg>
  41. <value>spring-jms-queue02</value>
  42. </constructor-arg>
  43. </bean>
  44. <!-- 配置JMS模板,Spring提供的JMS工具类,它发送、接收消息。 -->
  45. <beanid="jmsTemplate"class="org.springframework.jms.core.JmsTemplate">
  46. <propertyname="connectionFactory"ref="cachingConnectionFactory"/>
  47. <propertyname="defaultDestination"ref="queueDestination01"/>
  48. <propertyname="receiveTimeout"
    标签: