- JMS Java Message Service
- RabbitMQ和AMQP
- Apache Kafka
JMS
JMS最早于2001年引入,规范了异步消息的接口,Spring通过JmsTemplate
对JMS提供支持,Spring还提供了事件驱动的POJO——可以根据队列或者订阅服务的消息进行响应的对象。
配置JMS
JMS需要一个消息代理,可以在Apache ActiveMQ或者更新的Apache ActiveMQ Artemis之间选择,我们选择Apache ActiveMQ Artemis,添加依赖:<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-artemis</artifactId> </dependency>Apache ActiveMQ Artemis是Apache ActiveMQ的升级+重新实现版本,但选用哪个对实际编写程序没有什么影响,在配置方面略有不同。 Spring默认将Artemis消息代理运行在61616端口,但是在生产环境下还需要几个配置:
属性 | 解释 |
---|---|
spring.artemis.host | 消息代理主机地址 |
spring.artemis.port | 消息代理的端口 |
spring.artemis.user | 登录消息代理的用户名,可选 |
spring.artemis.password | 登录消息代理的密码,可选 |
spring.artemis.host=localhost spring.artemis.port=61616最后还需要在系统里安装Apache ActiveMQ Artemis,安装步骤如下:
- 到Apache官网下载bin.zip文件。解压到任意一个目录,这个目录是base目录。
- 需要配置好JDK_HOME,CLASSPATH等路径
- 创建一个新的目录,不要和Artemis的安装目录一样,这个目录叫做instance目录
- 到base目录的/bin目录下边,进入windows power shell,输入以下命令:
./artemis create D:\Software\artemis --home D:\Software\apache-artemis-2.7.0 --nio --no-mqtt-acceptor --password 123 --user mq --verbose --no-hornetq-acceptor --no-amqp-acceptor --autocreate
这其中的红色部分是instance目录,金色部分是base目录。 - 会询问是否运行匿名访问,选Yes即可,如果选No,需要用户名和密码。
- 之后会自动计算5次系统的超时时间,然后自动设置。
- 然后需要到实例目录下去运行Artemis,有如下几种方式:
- 实例目录下的bin目录中,可以看到有artemis.cmd文件,在命令行下输入
artemis run
,看到启动信息成功即可 - 同样在bin目录下边,运行
artemis-service.exe install
,将其安装为系统服务,再运行artemis-service.exe start
即可在后台启动,无需每次手动启动。
- 实例目录下的bin目录中,可以看到有artemis.cmd文件,在命令行下输入
- 启动成功之后,Artemis的控制台是http://localhost:8161/console,REST API是http://localhost:8161/console/jolokia
- 启动后的默认端口是61616。在Spring中要设置好。
JmsTemplate 发送消息
方法有如下几种:- 发送原始消息
void send(MessageCreator messageCreator) throws JmsException;
void send(Destination destination, MessageCreator messageCreator) throws JmsException;
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
- 发送对象,对象在后台转为消息
void convertAndSend(Object message) throws JmsException;
void convertAndSend(Destination destination, Object message) throws JmsException;
void convertAndSend(String destinationName, Object message) throws JmsException;
- 用预处理自定义消息并发送
void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
来看一个发送原始消息的例子:
package cc.conyli.restlearn.service; import cc.conyli.restlearn.entity.Student; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; @Service public class JmsMessageService implements JmsService { private JmsTemplate jmsTemplate; @Autowired public JmsMessageService(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } @Override public void sendStudentMessage(Student student) { jmsTemplate.send(new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createObjectMessage(student); } }); } }这里实际上有一个自行编写的服务接口JmsService,其中的方法是public void sendStudentMessage(Student student),接口的代码省略。 send方法发送的消息实际上是从student对象构建而来,所以需要传入一个匿名对象,并且重写其中的方法,使用
javax.jms.Session
的createObjectMessage(student)
方法来创建消息。
这里传入student对象后,可以发现其要求对象必须可序列化,所以还要给Student类加上实现Serializable接口。
由于是传入匿名对象,很自然的想到可以用lambda表达式改写:
@Override public void sendStudentMessage(Student student) { jmsTemplate.send(session -> session.createObjectMessage(student)); }如果此时运行这个方法,会发送到哪里呢,需要在配置文件中指定一个默认发送的地址:
spring.jms.template.default-destination=localhost:61616当然也可以自己指定地址,可以使用字符串形式的地址,也可以组装一个Destination对象的Bean,然后注入给我们的服务类。 修改后的类如下:
package cc.conyli.restlearn.service; import cc.conyli.restlearn.entity.Student; import org.apache.activemq.artemis.jms.client.ActiveMQQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.jms.Destination; @Service public class JmsMessageService implements JmsService { private JmsTemplate jmsTemplate; private Destination destination; @Autowired public JmsMessageService(JmsTemplate jmsTemplate, Destination destination) { this.jmsTemplate = jmsTemplate; this.destination = destination; } @Override public void sendStudentMessage(Student student) { jmsTemplate.send(destination, session -> session.createObjectMessage(student)); } }注意,Destination Bean要定义在其他的配置类中,否则编译会自己引用自己,无法通过:
@Bean public Destination studentQueue() { return new ActiveMQQueue("localhost:61616"); }
发送原始消息需要自己指定处理器,对要发送的对象进行处理以后才能发送,为此还要传入匿名代码,比较麻烦。如果可以直接发送对象,让消息转换在后台完成,就会简单一些。来看看基于对象的消息发送:
@Override public void sendStudentObejct(Student student) { jmsTemplate.convertAndSend(destination, student); }相比原来的方法简单不少,当然这里也需要在接口里再定义一个方法。使用这个方法,在后台student对象会被MessageConverter的实现类转换成一个消息对象再发送。
MessageConverter
是一个只有两个抽象方法的接口,一个方法用来将对象转换成消息,一个用来将消息转换成对象。
public interface MessageConverter { Message toMessage(Object object, Session session) throws JMSException, MessageConversionException; Object fromMessage(Message message) }Spring针对这个消息转换类已经提供了一系列的实现,所有的实现都位于
org.springframework.jms.support.converter
包里:
Message converter | 如何转换 |
---|---|
MappingJackson2MessageConverter | 使用Jackson 2 JSON库将对象转换成JSON字符串 |
MarshallingMessageConverter | 使用JAXB将消息转换成XML |
MessagingMessageConverter | 额,没怎么看懂,好像是使用一种映射来进行来回转换 |
SimpleMessageConverter | 将字符串转换成TestMessage对象,二进制序列转换成BytesMessage,Map类型转换成MapMessage,对象则进行序列化。 |
SimpleMessageConverter
是默认的转换器,需要对象实现Serializable接口。如果要使用其他的转换器,需要将其声明为一个Bean:
@Bean public MappingJackson2MessageConverter messageConverter() { MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter(); messageConverter.setTypeIdPropertyName("_typeId"); return messageConverter; }这里在返回Bean之前,调用了
setTypeIdPropertyName("_typeId")
方法,这非常重要。如果不调用该方法,这个方法仅接受一个类的全路径,这样导致接收消息的一方也必须使用全路径。
为了灵活性更高,可以调用setTypeIdMappings()
方法,设置一个自定义的名称与实际的类进行对应:
@Bean public MappingJackson2MessageConverter messageConverter() { MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter(); messageConverter.setTypeIdPropertyName("_typeId"); Map<String, Class<?>> typeIdMappings = new HashMap<>(); typeIdMappings.put("student", Student.class); messageConverter.setTypeIdMappings(typeIdMappings); return messageConverter; }这个MessageConverter实际上是为了接收信息而配置的,需要写在其他的配置类里。很可能接收端里有另外一个Student的实现类,也能与消息的内容相匹配,但是可能还有其他不同的代码,用这种方式就很容易指定具体的类。
现在又面临着新的需求,在每次发送一个Student消息的时候,我希望加上一个学生的额外信息A,这个A仅仅只用在让接收端进行一些处理。为了这个需求,去更改Student类和数据库没有意义,这个时候就可以对Student对象进行处理。 实际上就是给消息加上一个头部数据,用于传递这个额外信息,如果使用直接发送消息,则可以使用
setStringProperty
给消息添加一个键值对:
@Override public void sendStudentMessage(Student student) { jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Message message = session.createObjectMessage(student); message.setStringProperty("ExtraInfo", "123"); return message; } }); }也可以写成lambda形式:
@Override public void sendStudentMessage(Student student) { jmsTemplate.send(destination, session -> { Message message = session.createObjectMessage(student); message.setStringProperty("ExtraInfo", "123"); return message;}); }此时的消息除了Student对象信息之外,还带着一个ExtraInfo=123的信息。 对于convertAndSend,则需要传入第三个参数,也就是处理器,使用起来和send方法传匿名转换对象一样:
@Override public void sendStudentObejct(Student student) { jmsTemplate.convertAndSend(destination, student, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws JMSException { message.setStringProperty("ExtraInfo", "123"); return message; } }); }也可以写成lambda形式:
@Override public void sendStudentObejct(Student student) { jmsTemplate.convertAndSend(destination, student, message -> {message.setStringProperty("ExtraInfo","123"); return message;} ); }这个处理器的
postProcessMessage(Message message)
方法接收消息,然后返回消息,在方法体内可以对象消息进行任意处理。
由于匿名处理器每次也要硬编码,如果有多个处理器方法的话,还可以采用this::process名称的方式传入不同的处理器,万变不离其宗。
JmsTemplate 接收消息
在接收信息的时候,有两种模式:- pull model -- 代码等候一个消息,直到消息抵达,实际上是同步方式
- push model -- 消息可用的时候会到达代码中,实际上是异步方式
同步模式 - 使用 JmsTemplate 接收消息
一系列接收消息的方法有:- 返回消息对象
Message receive() throws JmsException;
Message receive(Destination destination) throws JmsException;
Message receive(String destinationName) throws JmsException;
- 返回实际类型的对象
Object receiveAndConvert() throws JmsException;
Object receiveAndConvert(Destination destination) throws JmsException;
Object receiveAndConvert(String destinationName) throws JmsException;
package cc.conyli.restlearn.service; import cc.conyli.restlearn.entity.Student; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.converter.MessageConverter; import org.springframework.stereotype.Service; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; @Service public class ReceiveMessageService implements JmsReceiveService { private JmsTemplate jmsTemplate; private MessageConverter messageConverter; private Destination destination; @Autowired public ReceiveMessageService(JmsTemplate jmsTemplate, MessageConverter messageConverter, Destination destination) { this.destination = destination; this.jmsTemplate = jmsTemplate; this.messageConverter = messageConverter; } @Override public Student receiveStudent() throws JMSException { Message message = jmsTemplate.receive(destination); return (Student) messageConverter.fromMessage(message); } }如果采用
receiveAndConvert
方法,就更简单一些:
@Override public Student receiveStudent() { return (Student) jmsTemplate.receiveAndConvert(destination); }这里编写了一个控制器,用两个按钮来控制。可以发现,点击接受的时候线程阻塞住,浏览器没有反应。这个时候另外开一个页面点击发送消息,可以发现发送之后,另外一个在监听的也收到了消息。 不过如果先发送消息,不去接收的话,这个消息就会被抛掉,需要push方式去监听才可以。 后来试验了一下,将Destination设置为
public Destination studentQueue() { return new
ActiveMQQueue("192.168.0.234:61616");}
,使用linux服务器安装的Artimes,也是一样的,现在来看看异步监听。
异步 - 使用 监听器
对于异步的方法,需要使用@JmsListener
修饰方法,可以将接收消息的方法变成被动监听,新创建一个监听消息并打印的类:
package cc.conyli.restlearn.controller; import cc.conyli.restlearn.entity.Student; import org.springframework.stereotype.Component; @Component public class JmsListener { @org.springframework.jms.annotation.JmsListener(destination = "192.168.100.100:61616") public void receiveStudent(Student student) { System.out.println(student); } }这个方法需要将要接受的对象当做参数传入,然后使用
@JmsListener
修饰,重新启动项目之后,只要给Artimes发送一条消息,就会自动接收,然后在控制台打印出来。
实际上,有了这个方法之后,就可以在后台进行任何工作,这样一个程序给另外一个程序发消息,另外一个程序接受到消息之后,就可以进行工作了。这就是异步的好处,可以让一个程序集中处理一些事情,将一些开销大,需要后边慢慢执行的任务,通过消息的方式交给机器上的其他程序或者其他的机器去处理。
Rabbit MQ
AMQP最著名的实现就是RabbitMQ了。相比JMS,兔子MQ提供了更高级的消息路由系统。JMS只是一个地址的指定队列,而兔子可以在一个地址提供不同的队列,消息在生产者放入兔子的时候,可以携带exchange
key
(可以理解为交换机)和routing key
(相当于队列号)。这样不同的订阅者可以收到不同的消息,比JMS的灵活性大大提高。
生产者在将消息发送给Exchange的时候,一般会指定一个routing key(当然也可以不指定),来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding
key联合使用才能最终生效。
在Exchange Type与binding key固定的情况下,我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255
bytes。
在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing
key相匹配时,消息将会被路由到对应的Queue中。在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。binding key
并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。
兔子MQ有几种Exchange Type:
Default
,消息代理自动进行交换,所有队列会被绑定到默认的交换机,然后把消息发到与消息的routing key相同的队列里Direct
,把消息路由到消息的routing key和binding key相同的队列Topic
,把一个消息同时路由到多个binding key 和消息的routing key相等的队列Fanout
,把消息路由到所有的队列,不管两个key的内容Headers
,与Topic模式很像,唯一不同的是基于消息Header的内容,而不是keyDead Letter
,所有无处可去的消息的集合,无处可去指的是key都不匹配的消息
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>有了依赖之后,Spring就会自动生成RaabitTemplate的Bean。当然,也会有属性可以配置:
属性 | 解释 |
---|---|
spring.rabbit.addresses | 一个兔子MQ代理的地址列表 |
spring.rabbit.host | 主机名称,默认是localhost |
spring.rabbit.port | 端口,默认是5672 |
spring.rabbit.port.usrname | 可选的用户名 |
spring.rabbit.port.password | 可选的密码 |
- 发送原始消息
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message)throws AmqpException;
- 发送对象消息
void convertAndSend(Object message) throws AmqpException;
void convertAndSend(String routingKey, Object message) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
- 发送带有预处理过的消息
void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
先来发送原始消息。JMS需要使用一个匿名类或者lambda传入一个转换器,而RabbitTemplate自带了一个转换器,比较方便:
package cc.conyli.restlearn.service; import cc.conyli.restlearn.entity.Student; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class RabbitMessageService implements JmsService { private RabbitTemplate rabbitTemplate; @Autowired public RabbitMessageService(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } @Override public void sendStudentMessage(Student student) { MessageConverter messageConverter = rabbitTemplate.getMessageConverter(); MessageProperties properties = new MessageProperties(); Message message = messageConverter.toMessage(student, properties); rabbitTemplate.send("conyli.cc", message); } @Override public void sendStudentObejct(Student student) { } }这里就无须传入转换器,直接先获取MessageProperties对象,相当于JMS的设置自定义的信息,然后使用转换器将数据对象和属性一起转换成消息。最后发送消息,由于发送的地址已经写在配置文件中,所以前边的"conyli.cc"是routing key。由于没有出传入exchange类型,所以会使用默认的exchange。 说到默认的exchange,其实默认的exchange名称是一个"",也就是一个空字符串。默认的routing key也是一个空字符串。可以在配置文件中设置默认的这两个内容:
spring.rabbitmq.template.exchange=central spring.rabbitmq.template.routing-key=conyli.cc这么设置的话,如果不传入exchange,就会使用conyli.cc当做routing key,用central当做exchange的名称。 使用convertAndSend方法就更简单了:
@Override public void sendStudentObejct(Student student) { rabbitTemplate.convertAndSend(student); }
发送和接受的时候像JMS一样,依然需要配置转换器,兔子MQ的转换器类型比JMS多一些:
Message converter | 如何转换 |
---|---|
Jackson2JsonMessageConverter | 使用Jackson 2 JSON库将对象转换成JSON字符串 |
MarshallingMessageConverter | 使用Spring Marshaller转换 |
SerializerMessageConverter | 序列化转换,和JMS一样要求实现序列化接口 |
SimpleMessageConverter | 将字符串转换成TestMessage对象,二进制序列转换成BytesMessage,Map类型转换成MapMessage,对象则进行序列化。 |
ContentTypeDelegatingMessageConverter | 代理给另外一个基于contentType header信息的转换器 |
MessagingMessageConverter | 把消息代理给其他的MessageConverter,但是把header信息代理给AmqpHeaderConverter |
@Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter() }注意这里的MessageConverter来自于包
org.springframework.amqp.support.converter
,都是AMQP相关的包,不要弄错。
Spring会自动发现这个Bean,然后将其注入RabbitTemplate中作为转换器。在获取消息的时候会用到。
也可以传入自定义转换器参数给convertAndSend方法,这个时候由于不能直接获得转换器,就和JMS的比较像,需要传入匿名对象了:
@Override public void sendStudentObejct(Student student) { rabbitTemplate.convertAndSend("conyli.cc", student, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties properties = message.getMessageProperties(); properties.setHeader("ExtraInfo", "123"); return message; } }); }
接受消息也有两种模式,使用RabbitTemplate的pull模式和使用
@RabbitListener
的push模式。
先来看RabbitTemplate的receive()方法。
这个也有一堆方法,核心是队列的名称和过期时间,可以查看原书的199页。
注意其中的T receiveAndConvert(ParameterizedTypeReference<T> type)
这类方法,只能用于JSON转换器,其他对象只能用上边的几个方法。
SIA5原书这里只是用到了一个队列,并没有区分RabbitMQ的各个不同队列。关于这方面内容还需要深入研究。
编写一个接收类:
package cc.conyli.restlearn.service; import cc.conyli.restlearn.entity.Student; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.jms.JMSException; @Service public class RabbitReceiveService implements JmsReceiveService { private RabbitTemplate rabbitTemplate; private MessageConverter messageConverter; @Autowired public RabbitReceiveService(RabbitTemplate rabbitTemplate, MessageConverter messageConverter) { this.rabbitTemplate = rabbitTemplate; this.messageConverter = messageConverter; } @Override public Student receiveStudent() throws JMSException { Message message = rabbitTemplate.receive("conyli.cc", 3000); return message == null ? (Student) messageConverter.fromMessage(message) : null; } }这里传入了队列名称,还有过期时间,以毫秒计算。默认的过期时间也可以配置:
spring.rabbitmq.template.receive-timeout=3000如果使用receiveAndConvert,可以更加简化:
@Override public Student receiveStudent() throws JMSException { return (Student) rabbitTemplate.receiveAndConvert("conyli.cc"); }如果是JSON转换器,还可以使用类型安全的方法:
@Override public Student receiveStudent() throws JMSException { return rabbitTemplate.receiveAndConvert("conyli.cc", new ParameterizedTypeReference<Student>() { }); }使用这种方法,转换器必须是一个SmartMessageConverter的实现,Jackson2JsonMessageConverter就是一个实现。 在编写了获取之后,一开始总是报错找不到队列,后来到RabbitMQ的控制台里,自己新建了叫做sia5的控制台,然后在发送的时候使用sia5作为routing key,在接收的时候也使用sia5作为队列名称,就可以了。 我的机器的RabbitMQ里还有原来用Django时候留下来的celery queue,发现用这个queue不行,会报错。使用sia5新建的队列倒是没问题。 而且这个比JMS好的地方是,放到队列里的东西,可以不立刻取,比如先发送三次,就可以获取三次,获取第四次的时候就会阻塞然后三秒钟后获得null对象。
push方法的使用和JMS几乎一样,需要指定队列名称即可:
@RabbitListener(queues = {"sia5"}) public void receiveRabbitStudent(Student student) { System.out.println(student); }queues可以是一个字符串表示单个队列名,也可以是一个字符串数组。 第三个消息队列Kafka暂时先不看,相信有了前边两个的学习经验,最后一个到时候看文档和看书也能搞定。 总结一下,如果说REST服务和消费是比较直观的方式,那么通过消息代理两个程序之间互相通信是比较间接的方式,然而因为不像HTTP请求的时效性那么强,异步的消息队列处理尤其适合将重型任务分解。 相比JMS,应该还是使用RabbitMQ更加棒,今后尽量使用RabbitMQ来进行编写程序,比如编写两个运行在不同服务器上的程序,通过消息代理来沟通,确实很不错。