RabbitMq从入门到精通
什么是消息中间件?
消息中间件基于队列模型实现异步/同步传输数据
作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
传统的http请求存在那些缺点
Http请求基于请求与响应的模型,在高并发的情况下,客户端发送大量的请求达到服务器端有可能会导致我们服务器端处理请求堆积。
Tomcat服务器处理每个请求都有自己独立的线程,如果超过最大线程数会将该请求缓存到队列中,如果请求堆积过多的情况下,有可能会导致tomcat服务器崩溃的问题。
所以一般都会在nginx入口实现限流,整合服务保护框架。
http请求处理业务逻辑如果比较耗时的情况下,容易造成客户端一直等待,阻塞等待过程中会导致客户端超时发生重试策略,有可能会引发幂等性问题。
注意事项:接口是为http协议的情况下,最好不要处理比较耗时的业务逻辑,耗时的业务逻辑应该单独交给多线程或者是mq处理。
Mq应用场景有哪些?
- 异步发送短信
- 异步发送新人优惠券
- 处理一些比较耗时的操作
为什么需要使用mq
可以实现支撑高并发、异步解耦、流量削峰、降低耦合度。
Mq与多线程之间区别
- MQ可以实现异步/解耦/流量削峰问题;
- 多线程也可以实现异步,但是消耗到cpu资源,没有实现解耦。
Mq消息中间件名词
- Producer 生产者:投递消息到MQ服务器端;
- Consumer 消费者:从MQ服务器端获取消息处理业务逻辑;
- Broker MQ服务器端
- Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题
- Queue 存放消息模型 队列 先进先出 后进后出原则 数组/链表
- Message 生产者投递消息报文:json
主流mq区别对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | java | erlang | java | scala |
单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
时效性 | ms级 | us级 | ms级 | ms级以内 |
可用性 | 高(主从架构) | 高(主从架构) | 非常高(分布式架构) | 非常高(分布式架构) |
功能特性 | 成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好 | 基于erlang开发,所以并发能力很强,性能极其好,延时很低管理界面较丰富 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广 |
基于多线程队列简单实现mq
import com.alibaba.fastjson.JSONObject;
import java.util.concurrent.LinkedBlockingDeque;
public class STThreadMQ {
private static LinkedBlockingDeque<JSONObject> msgs = new LinkedBlockingDeque<>();
public static void main(String[] args) {
//生产线程
Thread producerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true){
Thread.sleep(1000);
JSONObject data = new JSONObject();
data.put("userId","1234");
//存入消息
msgs.offer(data);
System.out.println(Thread.currentThread().getName()+ "," +data);
}
}catch (Exception e){
}
}
},"生产者");
producerThread.start();
//消费线程
Thread ConsumerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true){
JSONObject data = msgs.poll();
if (data != null){
System.out.println(Thread.currentThread().getName()+ "," +data);
}
}
}catch (Exception e){
}
}
},"消费者");
ConsumerThread.start();
}
}
- 生产者投递消息给MQ服务器端,MQ服务器端需要缓存该消息
如果mq服务器端宕机之后,消息如何保证不丢失
- 持久化机制
如果mq接收到生产者投递消息,如果消费者不在的情况下,该消息是否会丢失?
- 不会丢失,消息确认机制 必须要消费者消费该消息成功之后,再通知给mq服务器端删除该消息。
Mq服务器端将该消息推送消费者:
- 消费者已经和mq服务器保持长连接。
消费者主动拉取消息:
- 消费者第一次刚启动的时候
Mq如何实现抗高并发思想
- Mq消费者根据自身能力情况 ,拉取mq服务器端消息消费
- 默认的情况下是取出一条消息
- 缺点:存在延迟的问题
需要考虑mq消费者提高速率的问题
- 如何消费者提高速率:消费者实现集群、消费者批量获取消息即可
RabbitMQ
RabbitMQ基本介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的。
RabitMQ官方网站:
- 1.点对点(简单)的队列
- 2.工作(公平性)队列模式
- 3.发布订阅模式
- 4.路由模式Routing
- 5.通配符模式Topics
- 6.RPC
https://www.rabbitmq.com/getstarted.html
RabbitMQ环境的基本安装
- 1.下载并安装erlang,下载地址:http://www.erlang.org/download
- 2.配置erlang环境变量信息
- 新增环境变量ERLANG_HOME=erlang的安装地址
- 将%ERLANG_HOME%\bin加入到path中
- 3.下载并安装RabbitMQ,下载地址:http://www.rabbitmq.com/download.html
- **注意: RabbitMQ 它依赖于Erlang,需要先安装Erlang。 https://www.rabbitmq.com/install-windows.html **
启动Rabbitmq常见问题
如果rabbitmq 启动成功无法访问 管理平台页面
进入到F:\path\rabbitmq\rabbitmq\rabbitmq_server-3.6.9\sbin> 执行
rabbitmq-plugins enable rabbitmq_management
rabbitmqctl start_app
Rabbitmq管理平台中心
RabbitMQ 管理平台地址 http://127.0.0.1:15672
默认账号:guest/guest 用户可以自己创建新的账号
Virtual Hosts:
像mysql有数据库的概念并且可以指定用户对库和表等操作的权限。那RabbitMQ呢?
RabbitMQ也有类似的权限管理。在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通
默认的端口15672:rabbitmq管理平台端口号
默认的端口5672: rabbitmq消息中间内部通讯的端口
默认的端口号25672 rabbitmq集群的端口号
RabbitMQ常见名词
/Virtual Hosts—分类
/队列 存放我们消息
Exchange 分派我们消息在那个队列存放起来 类似于nginx
15672—rabbitmq控制台管理平台 http协议
25672—rabbitmq 集群通信端口号
Amqp 5672—rabbitmq内部通信的一个端口号
快速入门RabbitMQ简单队列
首先需要再RabbitMQ平台创建Virtual Hosts 和队列。
/meiteVirtualHosts
—-订单队列
—-支付队列
- 在RabbitMQ平台创建一个队列:
- 在编写生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String QUEUE_NAME = "mayikt--queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接
Connection connection = RabbitMQConnection.getConnection();
//2.创建通道
Channel channel = connection.createChannel();
String msg = "每特教育666";
channel.basicPublish(null,QUEUE_NAME,null,msg.getBytes());
//关闭通道和连接
channel.close();
connection.close();
}
}
- 在编写消费者代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String QUEUE_NAME = "mayikt--queue";
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接
Connection connection = RabbitMQConnection.getConnection();
//2.设置通道
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body)throws IOException{
String msg = new String(body,"UTF-8");
System.out.println("消费者获取消息:" + msg);
}
};
//3.监听队列
//autoAck true:自动签收
//autoAck false:
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
- 连接代码
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQConnection {
//获取连接
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
//1.设置连接VirtualHost
connectionFactory.setVirtualHost("/meiteVirtualHosts");
//2.设置账号和密码
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
//3.mq连接信息地址
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
return connectionFactory.newConnection();
}
}
RabbitMQ如何保证消息不丢失
Mq如何保证消息不丢失:
- 生产者角色
- 确保生产者投递消息到MQ服务器端成功。
- Ack 消息确认机制
- 同步或者异步的形式
- 方式1:Confirms
- 方式2:事务消息
- 消费者模式
- 在rabbitmq情况下:
- 必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。
- 在kafka中的情况下:
- 不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。
RabitMQ工作队列
默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。channel.basicQos(1);
import com.rabbitmq.client.*;
import com.st.mq.RabbitMQConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
private static final String QUEUE_NAME = "mayikt-queue";
public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
// 1.创建连接
Connection connection = RabbitMQConnection.getConnection();
// 2.设置通道
Channel channel = connection.createChannel();
//指定我们消费者每次批量获取消息
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(100);
}catch (Exception e){
}
String msg = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msg);
//消费者完成,删除消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
import com.rabbitmq.client.*;
import com.st.mq.RabbitMQConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
private static final String QUEUE_NAME = "mayikt-queue";
public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
// 1.创建连接
Connection connection = RabbitMQConnection.getConnection();
// 2.设置通道
Channel channel = connection.createChannel();
//指定我们消费者每次批量获取消息
channel.basicQos(2);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("消费者获取消息:" + msg);
//消费者完成 删除消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
}
}
RabbitMQ交换机类型
Direct exchange(直连交换机)
Fanout exchange(扇型交换机)
Topic exchange(主题交换机)
Headers exchange(头交换机)
/Virtual Hosts—区分不同的团队
—-队列 存放消息
—-交换机 路由消息存放在那个队列中 类似于nginx
—路由key 分发规则
RabbitMQ Fanout 发布订阅
生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。
原理:
- 需要创建两个队列 ,每个队列对应一个消费者;
- 队列需要绑定我们交换机
- 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
- 消费者从队列中获取这个消息。
生产者代码
import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerFanout {
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建Connection
Connection connection = RabbitMQConnection.getConnection();
// 创建Channel
Channel channel = connection.createChannel();
// 通道关联交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
String msg = "每特教育6666";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
channel.close();
connection.close();
}
}
消费者代码
邮件消费者
import com.mayikt.rabbitmq.RabbitMQConnection;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class MailConsumer {
/**
* 定义邮件队列
*/
private static final String QUEUE_NAME = "fanout_email_queue";
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("邮件消费者...");
// 创建我们的连接
Connection connection = RabbitMQConnection.getConnection();
// 创建我们通道
final Channel channel = connection.createChannel();
// 关联队列消费者关联队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("邮件消费者获取消息:" + msg);
}
};
// 开始监听消息 自动签收
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
短信消费者
public class SmsConsumer {
/**
* 定义短信队列
*/
private static final String QUEUE_NAME = "fanout_email_sms";
/**
* 定义交换机的名称
*/
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("短信消费者...");
// 创建我们的连接
Connection connection = RabbitMQConnection.getConnection();
// 创建我们通道
final Channel channel = connection.createChannel();
// 关联队列消费者关联队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "UTF-8");
System.out.println("短信消费者获取消息:" + msg);
}
};
// 开始监听消息 自动签收
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
Direct路由模式
当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息
Topic主题模式
当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。
#号表示支持匹配多个词;
*号表示只能匹配一个词
SpringBoot整合RabbitMQ
Maven依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
</parent>
<dependencies>
<!-- springboot-web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!--fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
配置类:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
* @ClassName RabbitMQConfig
* @Author song
* @Version V1.0
**/
@Component
public class RabbitMQConfig {
/**
* 定义交换机
*/
private String EXCHANGE_SPRINGBOOT_NAME = "/mayikt_ex";
/**
* 短信队列
*/
private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
/**
* 邮件队列
*/
private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
/**
* 配置smsQueue
*
* @return
*/
@Bean
public Queue smsQueue() {
return new Queue(FANOUT_SMS_QUEUE);
}
/**
* 配置emailQueue
*
* @return
*/
@Bean
public Queue emailQueue() {
return new Queue(FANOUT_EMAIL_QUEUE);
}
/**
* 配置fanoutExchange
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
}
// 绑定交换机 sms
@Bean
public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(smsQueue).to(fanoutExchange);
}
// 绑定交换机 email
@Bean
public Binding bindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(emailQueue).to(fanoutExchange);
}
}
配置文件
application.yml
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /meiteVirtualHosts
生产者
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @ClassName FanoutProducer
* @Author song
* @Version V1.0
**/
@RestController
public class FanoutProducer {
@Autowired
private AmqpTemplate amqpTemplate;
/**
* 发送消息
*
* @return
*/
@RequestMapping("/sendMsg")
public String sendMsg(String msg) {
/**
* 1.交换机名称
* 2.路由key名称
* 3.发送内容
*/
amqpTemplate.convertAndSend("/mayikt_ex", "", msg);
return "success";
}
}
消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @ClassName FanoutEmailConsumer
* @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com
* @Version V1.0
**/
@Slf4j
@Component
@RabbitListener(queues = "fanout_email_queue")
public class FanoutEmailConsumer {
@RabbitHandler
public void process(String msg) {
log.info(">>邮件消费者消息msg:{}<<", msg);
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @ClassName fanout_sms_queue
* @Author 蚂蚁课堂余胜军 QQ644064779 www.mayikt.com
* @Version V1.0
**/
@Slf4j
@Component
@RabbitListener(queues = "fanout_sms_queue")
public class FanoutSmsConsumer {
@RabbitHandler
public void process(String msg) {
log.info(">>短信消费者消息msg:{}<<", msg);
}
}
生产者如何获取消费结果
1. 根据业务来定
消费者消费成功结果:
1.能够在数据库中插入一条数据
2.Rocketmq 自带全局消息id,能够根据该全局消息获取消费结果
原理: 生产者投递消息到mq服务器,mq服务器端在这时候返回一个全局的消息id,
当我们消费者消费该消息成功之后,消费者会给我们mq服务器端发送通知标记该消息消费成功。
生产者获取到该消息全局id,每隔2s时间调用mq服务器端接口查询该消息是否有被消费成功。
- 异步返回一个全局id,前端使用ajax定时主动查询;
- 在rocketmq中,自带根据消息id查询是否消费成功
RabbitMQ死信队列
死信队列产生的背景
RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。
产生死信队列的原因
- 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
- 队列达到最大的长度 (队列容器已经满了)
- 消费者消费多次消息失败,就会转移存放到死信队列中
死信队列的架构原理
死信队列和普通队列区别不是很大
普通与死信队列都有自己独立的交换机和路由key、队列和消费者。
区别:
- 生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到普通队列中缓存起来,普通队列对应有自己独立普通消费者。
- 如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机对应有自己独立的 死信(备胎)队列 对应独立死信(备胎)消费者。
死信队列应用场景
- 30分钟订单超时设计
A. Redis过期key
B. 死信延迟队列实现:
采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后就会将该消息转移到死信备胎消费者实现消费。
备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下则会开始回滚库存操作。
RabbitMQ消息幂等问题
RabbitMQ消息自动重试机制
- 当我们消费者处理执行我们业务代码的时候,如果抛出异常的情况下
在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。需要人为指定重试次数限制问题
- 在什么情况下消费者需要实现重试策略?
A. 消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?
该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。
B. 消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?
该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。
可以将日志存放起来,后期通过定时任务或者人工补偿形式。
如果是重试多次还是失败消息,需要重新发布消费者版本实现消费
可以使用死信队列
Mq在重试的过程中,有可能会引发消费者重复消费的问题。
Mq消费者需要解决 幂等性问题
幂等性 保证数据唯一
方式1:
生产者在投递消息的时候,生成一个全局唯一id,放在我们消息中。
Msg id=123456
Msg id=123456
Msg id=123456
消费者获取到我们该消息,可以根据该全局唯一id实现去重复。
全局唯一id 根据业务来定的 订单号码作为全局的id
实际上还是需要再db层面解决数据防重复。
业务逻辑是在做insert操作 使用唯一主键约束
业务逻辑是在做update操作 使用乐观锁
- 当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试)
- 应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿
如何合理选择消息重试
总结:如果消费者处理消息时,因为代码原因抛出异常是需要从新发布版本才能解决的,那么就不需要重试,重试也解决不了该问题的。存放到死信队列或者是数据库表记录、后期人工实现补偿。
Rabbitmq如何开启重试策略
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /meite_rabbitmq
listener:
simple:
retry:
####开启消费者(程序出现异常的情况下会)进行重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000
消费者重试过程中,如何避免幂等性问题
重试的过程中,为了避免业务逻辑重复执行,建议提前全局id提前查询,如果存在
的情况下,就无需再继续做该流程。
重试的次数最好有一定间隔次数,在数据库底层层面保证数据唯一性,比如加上唯一id。
SpringBoot开启消息确认机制
配置文件新增
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: /meiteVirtualHosts
listener:
simple:
retry:
####开启消费者(程序出现异常的情况下会)进行重试
enabled: true
####最大重试次数
max-attempts: 5
####重试间隔次数
initial-interval: 3000
acknowledge-mode: manual
datasource:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
消费者ack代码
@Slf4j
@Component
@RabbitListener(queues = "fanout_order_queue")
public class FanoutOrderConsumer {
@Autowired
private OrderManager orderManager;
@Autowired
private OrderMapper orderMapper;
@RabbitHandler
public void process(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
// try {
log.info(">>orderEntity:{}<<", orderEntity.toString());
String orderId = orderEntity.getOrderId();
if (StringUtils.isEmpty(orderId)) {
log.error(">>orderId is null<<");
return;
}
OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
if (dbOrderEntity != null) {
log.info(">>该订单已经被消费过,无需重复消费!<<");
// 无需继续重试
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return;
}
int result = orderManager.addOrder(orderEntity);
log.info(">>插入数据库中数据成功<<");
if (result >= 0) {
// 开启消息确认机制
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
// int i = 1 / 0;
// } catch (Exception e) {
// // 将失败的消息记录下来,后期采用人工补偿的形式
// }
}
}