【译】关于RabbitMQ 在分布式微服务中的使用转载
RabbitMQ也称为开源消息代理,支持多种消息协议,常常部署在分布式系统上,同时也是一个轻量级可轻松部署应用程序,而且作为一个队列,它输入的消息可以优先被执行。
RabbitMQ 在许多操作系统和云环境上运行,并为大多数流行语言提供广泛的开发工具。
这是一种生产者-消费者风格的模式,生产者发送消息,消费者消费它。
RabbitMQ的主要特点如下:
-
异步消息
-
分布式部署
-
管理与监控
-
企业和云存储
安装
对于 RabbitMQ,您首先需要在系统中安装ErLang ,因为 RabbitMQ 程序是用 ErLang 编程语言编写的。在 ErLang 之后,您可以 按照那里的说明从其主页下载最新版本的RabbitMQ 。
RabbitMQ 在微服务中的使用
RabbitMQ 是在微服务架构中实现消息队列的最简单的免费可用选项之一。这些队列模式可以通过各种微服务之间的通信来帮助扩展您的应用程序。我们可以将这些队列用于各种目的,例如核心微服务之间的交互、微服务的解耦、实现故障转移机制以及通过消息代理发送电子邮件通知。
当两个或多个核心模块需要相互通信时,我们不应该直接进行 HTTP 调用,因为这样会使核心层紧耦合,当每个核心模块的实例较多时,将难以管理。此外,每当服务关闭时,HTTP 调用模式都会失败,因为在重新启动后,无法跟踪旧的 HTTP 请求调用。这导致需要 RabbitMQ。
在微服务中设置 RabbitMQ
在微服务架构的本次演示中,我们将通过各种核心微服务发送电子邮件通知的示例模式。在此模式中,我们将有一个生产者,即任何核心微服务,它将生成电子邮件内容并将其传递到队列。然后这个电子邮件内容被消费者获取,消费者总是在监听队列中的新消息。
请注意,我们正在为我们的微服务使用 Spring Boot,因此我们将为 Spring 提供配置。
1) Producer: 该层负责生成电子邮件内容并将此内容传递给 RabbitMQ 中的消息代理。
a) 在属性文件中,我们需要提及队列名称和交换类型以及安装 RabbitMQ 服务器的主机和端口。
queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
b) 我们需要创建一个配置类,它将使用队列名称和交换类型将队列绑定到微服务模块。
@Configuration
public class RabbitConfiguration {
@Value("${fanout.exchange}")
private String fanoutExchange;
@Value("${queue.name}")
private String queueName;
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
FanoutExchange exchange() {
return new FanoutExchange(fanoutExchange);
}
@Bean
Binding binding(Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
}
c) 最后,我们需要一个 util 类,用于使用 Spring 框架提供的 RabbitTemplate 将实际的电子邮件内容发送到队列。
@Component
public class QueueProducer {
protected Logger logger = LoggerFactory.getLogger(getClass());
@Value("${fanout.exchange}")
private String fanoutExchange;
private final RabbitTemplate rabbitTemplate;
@Autowired
public QueueProducer(RabbitTemplate rabbitTemplate) {
super();
this.rabbitTemplate = rabbitTemplate;
}
public void produce(NotificationRequestDTO notificationDTO) throws Exception {
logger.info("Storing notification...");
rabbitTemplate.setExchange(fanoutExchange);
rabbitTemplate.convertAndSend(new ObjectMapper().writeValueAsString(notificationDTO));
logger.info("Notification stored in queue sucessfully");
}
}
d) 然后您可以从模块中的任何位置调用生产方法。
{
queueProducer.produce(notificationDTO);
}
2)Consumer: 该层负责以先进先出的方式消费来自RabbitMQ消息代理的消息,然后进行email相关的操作。
a) 在属性文件中,我们需要提及队列名称和交换类型,以及安装 RabbitMQ 服务器的主机和端口。
queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
b) 我们需要创建一个配置类,它将使用队列名称和交换类型将队列绑定到微服务模块。此外,在消费者的 RabbitMQ 配置中,我们将需要创建一个 MessageListenerAdapter
bean,这将使其充当消费者并始终侦听队列管道中的传入消息。这 MessageListenerAdapter
将有一个带有 Consumer util 类的参数化构造函数 defaultListenerMethod
,我们可以在其中指定与电子邮件相关的操作。
@Configuration
public class RabbitConfiguration {
private static final String LISTENER_METHOD = "receiveMessage";
@Value("${queue.name}")
private String queueName;
@Value("${fanout.exchange}")
private String fanoutExchange;
@Bean
Queue queue() {
return new Queue(queueName, true);
}
@Bean
FanoutExchange exchange() {
return new FanoutExchange(fanoutExchange);
}
@Bean
Binding binding(Queue queue, FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(QueueConsumer consumer) {
return new MessageListenerAdapter(consumer, LISTENER_METHOD);
}
}
c) 然后,我们需要创建一个 QueueConsumer
类,该类将具有指定的消息侦听器方法,我们可以在其中执行实际的电子邮件发送操作。
@Component
public class QueueConsumer {
@Autowired
MailServiceImpl mailServiceImpl;
protected Logger logger = LoggerFactory.getLogger(getClass());
public void receiveMessage(String message) {
logger.info("Received (String) " + message);
processMessage(message);
}
public void receiveMessage(byte[] message) {
String strMessage = new String(message);
logger.info("Received (No String) " + strMessage);
processMessage(strMessage);
}
private void processMessage(String message) {
try {
MailDTO mailDTO = new ObjectMapper().readValue(message, MailDTO.class);
ValidationUtil.validateMailDTO(mailDTO);
mailServiceImpl.sendMail(mailDTO, null);
} catch (JsonParseException e) {
logger.warn("Bad JSON in message: " + message);
} catch (JsonMappingException e) {
logger.warn("cannot map JSON to NotificationRequest: " + message);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
结论
使用 RabbitMQ,您可以避免服务之间的直接 HTTP 调用,并消除核心微服务的紧密耦合。这将帮助您在更高级别扩展您的微服务并在微服务之间添加故障转移机制。
原文作者:Akash Bhingole /Senior Software Engineer, Globant