Spring in Action 5 Self-learning notes 7 - 异步消息发送

Spring in Action 5 Self-learning notes 7 - 异步消息发送

依稀记得上一次使用消息的时候,还是在翻译Django 2 by Example的时候使用RabbitMQ异步发送电子邮件。 所谓异步消息,就是发送了之后并不要求立刻获得响应。SIA5 第八章的例子是订单发送到另外一个Taco Kitchen程序,用于制作实际的Taco。 这里使用到Spring 提供

依稀记得上一次使用消息的时候,还是在翻译Django 2 by Example的时候使用RabbitMQ异步发送电子邮件。 所谓异步消息,就是发送了之后并不要求立刻获得响应。SIA5 第八章的例子是订单发送到另外一个Taco Kitchen程序,用于制作实际的Taco。 这里使用到Spring 提供的三种方式:
  1. JMS Java Message Service
  2. RabbitMQ和AMQP
  3. Apache Kafka
看来也需要另外编写一个项目,运行在其他的端口,两个项目之间来发送消息了。

JMS

JMS最早于2001年引入,规范了异步消息的接口,Spring通过JmsTemplate对JMS提供支持,Spring还提供了事件驱动的POJO——可以根据队列或者订阅服务的消息进行响应的对象。

配置JMS

JMS需要一个消息代理,可以在Apache ActiveMQ或者更新的Apache ActiveMQ Artemis之间选择,我们选择Apache ActiveMQ Artemis,添加依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
Apache ActiveMQ Artemis是Apache ActiveMQ的升级+重新实现版本,但选用哪个对实际编写程序没有什么影响,在配置方面略有不同。 Spring默认将Artemis消息代理运行在61616端口,但是在生产环境下还需要几个配置:
属性 解释
spring.artemis.host 消息代理主机地址
spring.artemis.port 消息代理的端口
spring.artemis.user 登录消息代理的用户名,可选
spring.artemis.password 登录消息代理的密码,可选
如果是ActiveMQ的话略有不同,可以查阅原书181页,要注意这个是tcp连接,不是http连接。 在配置文件中配置地址和端口:
spring.artemis.host=localhost
spring.artemis.port=61616
最后还需要在系统里安装Apache ActiveMQ Artemis,安装步骤如下:
  1. Apache官网下载bin.zip文件。解压到任意一个目录,这个目录是base目录。
  2. 需要配置好JDK_HOME,CLASSPATH等路径
  3. 创建一个新的目录,不要和Artemis的安装目录一样,这个目录叫做instance目录
  4. 到base目录的/bin目录下边,进入windows power shell,输入以下命令:
    ./artemis create D:\Software\artemis --home D:\Software\apache-artemis-2.7.0 --nio  --no-mqtt-acceptor --password 123 --user mq --verbose --no-hornetq-acceptor --no-amqp-acceptor --autocreate
    
    这其中的红色部分是instance目录,金色部分是base目录。
  5. 会询问是否运行匿名访问,选Yes即可,如果选No,需要用户名和密码。
  6. 之后会自动计算5次系统的超时时间,然后自动设置。
  7. 然后需要到实例目录下去运行Artemis,有如下几种方式:
    1. 实例目录下的bin目录中,可以看到有artemis.cmd文件,在命令行下输入artemis run,看到启动信息成功即可
    2. 同样在bin目录下边,运行artemis-service.exe install,将其安装为系统服务,再运行artemis-service.exe start即可在后台启动,无需每次手动启动。
  8. 启动成功之后,Artemis的控制台是http://localhost:8161/console,REST API是http://localhost:8161/console/jolokia
  9. 启动后的默认端口是61616。在Spring中要设置好。

JmsTemplate 发送消息

方法有如下几种:
  1. 发送原始消息
    1. void send(MessageCreator messageCreator) throws JmsException;
    2. void send(Destination destination, MessageCreator messageCreator) throws JmsException;
    3. void send(String destinationName, MessageCreator messageCreator) throws JmsException;
  2. 发送对象,对象在后台转为消息
    1. void convertAndSend(Object message) throws JmsException;
    2. void convertAndSend(Destination destination, Object message) throws JmsException;
    3. void convertAndSend(String destinationName, Object message) throws JmsException;
  3. 用预处理自定义消息并发送
    1. void convertAndSend(Object message, MessagePostProcessor postProcessor) throws JmsException;
    2. void convertAndSend(Destination destination, Object message, MessagePostProcessor postProcessor) throws JmsException;
    3. void convertAndSend(String destinationName, Object message, MessagePostProcessor postProcessor) throws JmsException;
虽然9个方法,但实际上很好记。发送原始消息就是send方法,发送对象消息一律是convertAndSend方法。发送原始消息需要创建器,发送对象消息需要对象。地址可以是字符串也可以是地址对象,在最后加上预处理器即可。
来看一个发送原始消息的例子:
package cc.conyli.restlearn.service;

import cc.conyli.restlearn.entity.Student;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

@Service
public class JmsMessageService implements JmsService {

    private JmsTemplate jmsTemplate;

    @Autowired
    public JmsMessageService(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    @Override
    public void sendStudentMessage(Student student) {
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createObjectMessage(student);
            }
        });
    }
}
这里实际上有一个自行编写的服务接口JmsService,其中的方法是public void sendStudentMessage(Student student),接口的代码省略。 send方法发送的消息实际上是从student对象构建而来,所以需要传入一个匿名对象,并且重写其中的方法,使用javax.jms.SessioncreateObjectMessage(student)方法来创建消息。 这里传入student对象后,可以发现其要求对象必须可序列化,所以还要给Student类加上实现Serializable接口。 由于是传入匿名对象,很自然的想到可以用lambda表达式改写:
@Override
public void sendStudentMessage(Student student) {
    jmsTemplate.send(session -> session.createObjectMessage(student));
}
如果此时运行这个方法,会发送到哪里呢,需要在配置文件中指定一个默认发送的地址:
spring.jms.template.default-destination=localhost:61616
当然也可以自己指定地址,可以使用字符串形式的地址,也可以组装一个Destination对象的Bean,然后注入给我们的服务类。 修改后的类如下:
package cc.conyli.restlearn.service;

import cc.conyli.restlearn.entity.Student;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

@Service
public class JmsMessageService implements JmsService {

    private JmsTemplate jmsTemplate;
    private Destination destination;

    @Autowired
    public JmsMessageService(JmsTemplate jmsTemplate, Destination destination) {
        this.jmsTemplate = jmsTemplate;
        this.destination = destination;
    }

    @Override
    public void sendStudentMessage(Student student) {
        jmsTemplate.send(destination, session -> session.createObjectMessage(student));
    }

}

注意,Destination Bean要定义在其他的配置类中,否则编译会自己引用自己,无法通过:
@Bean
public Destination studentQueue() {
    return new ActiveMQQueue("localhost:61616");
}

发送原始消息需要自己指定处理器,对要发送的对象进行处理以后才能发送,为此还要传入匿名代码,比较麻烦。如果可以直接发送对象,让消息转换在后台完成,就会简单一些。来看看基于对象的消息发送:
@Override
public void sendStudentObejct(Student student) {
    jmsTemplate.convertAndSend(destination, student);
}
相比原来的方法简单不少,当然这里也需要在接口里再定义一个方法。使用这个方法,在后台student对象会被MessageConverter的实现类转换成一个消息对象再发送。 MessageConverter是一个只有两个抽象方法的接口,一个方法用来将对象转换成消息,一个用来将消息转换成对象。
public interface MessageConverter {
    Message toMessage(Object object, Session session) throws JMSException, MessageConversionException;

    Object fromMessage(Message message)
}
Spring针对这个消息转换类已经提供了一系列的实现,所有的实现都位于org.springframework.jms.support.converter包里:
Message converter 如何转换
MappingJackson2MessageConverter 使用Jackson 2 JSON库将对象转换成JSON字符串
MarshallingMessageConverter 使用JAXB将消息转换成XML
MessagingMessageConverter 额,没怎么看懂,好像是使用一种映射来进行来回转换
SimpleMessageConverter 将字符串转换成TestMessage对象,二进制序列转换成BytesMessage,Map类型转换成MapMessage,对象则进行序列化。
SimpleMessageConverter是默认的转换器,需要对象实现Serializable接口。如果要使用其他的转换器,需要将其声明为一个Bean:
@Bean
public MappingJackson2MessageConverter messageConverter() {
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setTypeIdPropertyName("_typeId");
    return messageConverter;
}
这里在返回Bean之前,调用了setTypeIdPropertyName("_typeId")方法,这非常重要。如果不调用该方法,这个方法仅接受一个类的全路径,这样导致接收消息的一方也必须使用全路径。 为了灵活性更高,可以调用setTypeIdMappings()方法,设置一个自定义的名称与实际的类进行对应:
@Bean
public MappingJackson2MessageConverter messageConverter() {
    MappingJackson2MessageConverter messageConverter = new MappingJackson2MessageConverter();
    messageConverter.setTypeIdPropertyName("_typeId");

    Map<String, Class<?>> typeIdMappings = new HashMap<>();
    typeIdMappings.put("student", Student.class);
    messageConverter.setTypeIdMappings(typeIdMappings);

    return messageConverter;
}
这个MessageConverter实际上是为了接收信息而配置的,需要写在其他的配置类里。很可能接收端里有另外一个Student的实现类,也能与消息的内容相匹配,但是可能还有其他不同的代码,用这种方式就很容易指定具体的类。
现在又面临着新的需求,在每次发送一个Student消息的时候,我希望加上一个学生的额外信息A,这个A仅仅只用在让接收端进行一些处理。为了这个需求,去更改Student类和数据库没有意义,这个时候就可以对Student对象进行处理。 实际上就是给消息加上一个头部数据,用于传递这个额外信息,如果使用直接发送消息,则可以使用setStringProperty给消息添加一个键值对:
@Override
public void sendStudentMessage(Student student) {
    jmsTemplate.send(destination, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            Message message = session.createObjectMessage(student);
            message.setStringProperty("ExtraInfo", "123");
            return message;
        }
    });
}
也可以写成lambda形式:
@Override
public void sendStudentMessage(Student student) {
    jmsTemplate.send(destination, session -> {
        Message message = session.createObjectMessage(student);
        message.setStringProperty("ExtraInfo", "123");
        return message;});
}
此时的消息除了Student对象信息之外,还带着一个ExtraInfo=123的信息。 对于convertAndSend,则需要传入第三个参数,也就是处理器,使用起来和send方法传匿名转换对象一样:
@Override
public void sendStudentObejct(Student student) {
    jmsTemplate.convertAndSend(destination, student, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws JMSException {
            message.setStringProperty("ExtraInfo", "123");
            return message;
        }
    });
}
也可以写成lambda形式:
@Override
public void sendStudentObejct(Student student) {
    jmsTemplate.convertAndSend(destination, student, message -> {message.setStringProperty("ExtraInfo","123");
        return message;} );
}
这个处理器的postProcessMessage(Message message)方法接收消息,然后返回消息,在方法体内可以对象消息进行任意处理。 由于匿名处理器每次也要硬编码,如果有多个处理器方法的话,还可以采用this::process名称的方式传入不同的处理器,万变不离其宗。

JmsTemplate 接收消息

在接收信息的时候,有两种模式:
  1. pull model -- 代码等候一个消息,直到消息抵达,实际上是同步方式
  2. push model -- 消息可用的时候会到达代码中,实际上是异步方式
JmsTemplate提供了很多接收消息的方法,但全部都是pull model,也就是同步的,如果接受不到消息,线程将会阻塞。我们也可以来使用push model,一个消息监听器会在消息可用的时候通知我们。 同步模式和异步模式有着不同的用途,但一般情况下,都应该使用异步模式。

同步模式 - 使用 JmsTemplate 接收消息

一系列接收消息的方法有:
  1. 返回消息对象
    1. Message receive() throws JmsException;
    2. Message receive(Destination destination) throws JmsException;
    3. Message receive(String destinationName) throws JmsException;
  2. 返回实际类型的对象
    1. Object receiveAndConvert() throws JmsException;
    2. Object receiveAndConvert(Destination destination) throws JmsException;
    3. Object receiveAndConvert(String destinationName) throws JmsException;
本质上就是两种方法,返回值不同,都需要地址或者使用默认地址。
package cc.conyli.restlearn.service;

import cc.conyli.restlearn.entity.Student;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.stereotype.Service;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;

@Service
public class ReceiveMessageService implements JmsReceiveService {

    private JmsTemplate jmsTemplate;

    private MessageConverter messageConverter;

    private Destination destination;


    @Autowired
    public ReceiveMessageService(JmsTemplate jmsTemplate, MessageConverter messageConverter, Destination destination) {
        this.destination = destination;
        this.jmsTemplate = jmsTemplate;
        this.messageConverter = messageConverter;
    }

    @Override
    public Student receiveStudent() throws JMSException {
        Message message = jmsTemplate.receive(destination);
        return (Student) messageConverter.fromMessage(message);
    }
}
如果采用receiveAndConvert方法,就更简单一些:
@Override
public Student receiveStudent() {
    return (Student) jmsTemplate.receiveAndConvert(destination);
}
这里编写了一个控制器,用两个按钮来控制。可以发现,点击接受的时候线程阻塞住,浏览器没有反应。这个时候另外开一个页面点击发送消息,可以发现发送之后,另外一个在监听的也收到了消息。 不过如果先发送消息,不去接收的话,这个消息就会被抛掉,需要push方式去监听才可以。 后来试验了一下,将Destination设置为public Destination studentQueue() { return new ActiveMQQueue("192.168.0.234:61616");},使用linux服务器安装的Artimes,也是一样的,现在来看看异步监听。

异步 - 使用 监听器

对于异步的方法,需要使用@JmsListener修饰方法,可以将接收消息的方法变成被动监听,新创建一个监听消息并打印的类:
package cc.conyli.restlearn.controller;

import cc.conyli.restlearn.entity.Student;
import org.springframework.stereotype.Component;


@Component
public class JmsListener {

    @org.springframework.jms.annotation.JmsListener(destination = "192.168.100.100:61616")
    public void receiveStudent(Student student) {
        System.out.println(student);
    }
}
这个方法需要将要接受的对象当做参数传入,然后使用@JmsListener修饰,重新启动项目之后,只要给Artimes发送一条消息,就会自动接收,然后在控制台打印出来。 实际上,有了这个方法之后,就可以在后台进行任何工作,这样一个程序给另外一个程序发消息,另外一个程序接受到消息之后,就可以进行工作了。这就是异步的好处,可以让一个程序集中处理一些事情,将一些开销大,需要后边慢慢执行的任务,通过消息的方式交给机器上的其他程序或者其他的机器去处理。

Rabbit MQ

AMQP最著名的实现就是RabbitMQ了。相比JMS,兔子MQ提供了更高级的消息路由系统。JMS只是一个地址的指定队列,而兔子可以在一个地址提供不同的队列,消息在生产者放入兔子的时候,可以携带exchange key(可以理解为交换机)和routing key(相当于队列号)。这样不同的订阅者可以收到不同的消息,比JMS的灵活性大大提高。 生产者在将消息发送给Exchange的时候,一般会指定一个routing key(当然也可以不指定),来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。 在Exchange Type与binding key固定的情况下,我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。 在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key;消费者将消息发送给Exchange时,一般会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。在绑定多个Queue到同一个Exchange的时候,这些Binding允许使用相同的binding key。binding key 并不是在所有情况下都生效,它依赖于Exchange Type,比如fanout类型的Exchange就会无视binding key,而是将消息路由到所有绑定到该Exchange的Queue。 兔子MQ有几种Exchange Type:
  1. Default,消息代理自动进行交换,所有队列会被绑定到默认的交换机,然后把消息发到与消息的routing key相同的队列里
  2. Direct,把消息路由到消息的routing key和binding key相同的队列
  3. Topic,把一个消息同时路由到多个binding key 和消息的routing key相等的队列
  4. Fanout,把消息路由到所有的队列,不管两个key的内容
  5. Headers,与Topic模式很像,唯一不同的是基于消息Header的内容,而不是key
  6. Dead Letter,所有无处可去的消息的集合,无处可去指的是key都不匹配的消息
这么多名词有点晕,实际上就可以认为兔子先指定一个路由模式,然后路由模式使用binding key规则绑定不同的队列,到实际消息进来的时候,看消息的routing key来决定消息到底被路由到哪个队列,不同的队列有不同的监听者。因此比起JMS,兔子MQ实际上是有了消息分类的概念。通过一个代理就可以输出多条消息,而JMS如果想输出多条消息,只能新开多个不同的JMS运行时实例。 由于兔子MQ是高级消息队列协议AMQP的实现,因此需要添加依赖:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
有了依赖之后,Spring就会自动生成RaabitTemplate的Bean。当然,也会有属性可以配置:
属性 解释
spring.rabbit.addresses 一个兔子MQ代理的地址列表
spring.rabbit.host 主机名称,默认是localhost
spring.rabbit.port 端口,默认是5672
spring.rabbit.port.usrname 可选的用户名
spring.rabbit.port.password 可选的密码
RabbitTemplate的方法与Jms有点像,也有一个发送原始消息和发送对象消息的方法,但是不再是Destination对象,而是exchang和routing key,都是字符串类型的参数:
  1. 发送原始消息
    1. void send(Message message) throws AmqpException;
    2. void send(String routingKey, Message message) throws AmqpException;
    3. void send(String exchange, String routingKey, Message message)throws AmqpException;
  2. 发送对象消息
    1. void convertAndSend(Object message) throws AmqpException;
    2. void convertAndSend(String routingKey, Object message) throws AmqpException;
    3. void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
  3. 发送带有预处理过的消息
    1. void convertAndSend(Object message, MessagePostProcessor mPP) throws AmqpException;
    2. void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
    3. void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
这些方法很相似。来看一下具体使用。
先来发送原始消息。JMS需要使用一个匿名类或者lambda传入一个转换器,而RabbitTemplate自带了一个转换器,比较方便:
package cc.conyli.restlearn.service;

import cc.conyli.restlearn.entity.Student;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitMessageService implements JmsService {

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public RabbitMessageService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }



    @Override
    public void sendStudentMessage(Student student) {
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        MessageProperties properties = new MessageProperties();
        Message message = messageConverter.toMessage(student, properties);
        rabbitTemplate.send("conyli.cc", message);
    }

    @Override
    public void sendStudentObejct(Student student) {

    }
}
这里就无须传入转换器,直接先获取MessageProperties对象,相当于JMS的设置自定义的信息,然后使用转换器将数据对象和属性一起转换成消息。最后发送消息,由于发送的地址已经写在配置文件中,所以前边的"conyli.cc"是routing key。由于没有出传入exchange类型,所以会使用默认的exchange。 说到默认的exchange,其实默认的exchange名称是一个"",也就是一个空字符串。默认的routing key也是一个空字符串。可以在配置文件中设置默认的这两个内容:
spring.rabbitmq.template.exchange=central
spring.rabbitmq.template.routing-key=conyli.cc
这么设置的话,如果不传入exchange,就会使用conyli.cc当做routing key,用central当做exchange的名称。 使用convertAndSend方法就更简单了:
@Override
public void sendStudentObejct(Student student) {
    rabbitTemplate.convertAndSend(student);
}

发送和接受的时候像JMS一样,依然需要配置转换器,兔子MQ的转换器类型比JMS多一些:
Message converter 如何转换
Jackson2JsonMessageConverter 使用Jackson 2 JSON库将对象转换成JSON字符串
MarshallingMessageConverter 使用Spring Marshaller转换
SerializerMessageConverter 序列化转换,和JMS一样要求实现序列化接口
SimpleMessageConverter 将字符串转换成TestMessage对象,二进制序列转换成BytesMessage,Map类型转换成MapMessage,对象则进行序列化。
ContentTypeDelegatingMessageConverter 代理给另外一个基于contentType header信息的转换器
MessagingMessageConverter 把消息代理给其他的MessageConverter,但是把header信息代理给AmqpHeaderConverter
需要自己定义一个转换器的Bean,例如JSON格式:
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter()
}
注意这里的MessageConverter来自于包org.springframework.amqp.support.converter,都是AMQP相关的包,不要弄错。 Spring会自动发现这个Bean,然后将其注入RabbitTemplate中作为转换器。在获取消息的时候会用到。 也可以传入自定义转换器参数给convertAndSend方法,这个时候由于不能直接获得转换器,就和JMS的比较像,需要传入匿名对象了:
@Override
public void sendStudentObejct(Student student) {
    rabbitTemplate.convertAndSend("conyli.cc", student, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            MessageProperties properties = message.getMessageProperties();
            properties.setHeader("ExtraInfo", "123");
            return message;
        }
    });
}

接受消息也有两种模式,使用RabbitTemplate的pull模式和使用@RabbitListener的push模式。 先来看RabbitTemplate的receive()方法。 这个也有一堆方法,核心是队列的名称和过期时间,可以查看原书的199页。 注意其中的T receiveAndConvert(ParameterizedTypeReference<T> type)这类方法,只能用于JSON转换器,其他对象只能用上边的几个方法。 SIA5原书这里只是用到了一个队列,并没有区分RabbitMQ的各个不同队列。关于这方面内容还需要深入研究。 编写一个接收类:
package cc.conyli.restlearn.service;

import cc.conyli.restlearn.entity.Student;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;

@Service
public class RabbitReceiveService implements JmsReceiveService {

    private RabbitTemplate rabbitTemplate;

    private MessageConverter messageConverter;

    @Autowired
    public RabbitReceiveService(RabbitTemplate rabbitTemplate, MessageConverter messageConverter) {
        this.rabbitTemplate = rabbitTemplate;
        this.messageConverter = messageConverter;
    }

    @Override
    public Student receiveStudent() throws JMSException {
        Message message = rabbitTemplate.receive("conyli.cc", 3000);
        return message == null ? (Student) messageConverter.fromMessage(message) : null;
    }
}
这里传入了队列名称,还有过期时间,以毫秒计算。默认的过期时间也可以配置:
spring.rabbitmq.template.receive-timeout=3000
如果使用receiveAndConvert,可以更加简化:
@Override
public Student receiveStudent() throws JMSException {
    return (Student) rabbitTemplate.receiveAndConvert("conyli.cc");
}
如果是JSON转换器,还可以使用类型安全的方法:
@Override
public Student receiveStudent() throws JMSException {
    return rabbitTemplate.receiveAndConvert("conyli.cc", new ParameterizedTypeReference<Student>() {
    });
}
使用这种方法,转换器必须是一个SmartMessageConverter的实现,Jackson2JsonMessageConverter就是一个实现。 在编写了获取之后,一开始总是报错找不到队列,后来到RabbitMQ的控制台里,自己新建了叫做sia5的控制台,然后在发送的时候使用sia5作为routing key,在接收的时候也使用sia5作为队列名称,就可以了。 我的机器的RabbitMQ里还有原来用Django时候留下来的celery queue,发现用这个queue不行,会报错。使用sia5新建的队列倒是没问题。 而且这个比JMS好的地方是,放到队列里的东西,可以不立刻取,比如先发送三次,就可以获取三次,获取第四次的时候就会阻塞然后三秒钟后获得null对象。
push方法的使用和JMS几乎一样,需要指定队列名称即可:
@RabbitListener(queues = {"sia5"})
public void receiveRabbitStudent(Student student) {
    System.out.println(student);
}
queues可以是一个字符串表示单个队列名,也可以是一个字符串数组。 第三个消息队列Kafka暂时先不看,相信有了前边两个的学习经验,最后一个到时候看文档和看书也能搞定。 总结一下,如果说REST服务和消费是比较直观的方式,那么通过消息代理两个程序之间互相通信是比较间接的方式,然而因为不像HTTP请求的时效性那么强,异步的消息队列处理尤其适合将重型任务分解。 相比JMS,应该还是使用RabbitMQ更加棒,今后尽量使用RabbitMQ来进行编写程序,比如编写两个运行在不同服务器上的程序,通过消息代理来沟通,确实很不错。
LICENSED UNDER CC BY-NC-SA 4.0
Comment