2020-08-13 13:39:52 晓掌柜 版权声明:本文为站长原创文章,转载请写明出处
承接上文Springboot+Redis+RabbitMQ实战教程(一)Redis,本次主要对RabbitMQ的集成及相关知识点做一些总结。
消息总线(Message Queue)是一种跨进程、一步的通信机制。主要用于上下游的消息传递。
① 消息队列的作用
消息队列主要的作用就是应用解耦、异步操作、流量削峰、日志记录等。
② 典型的适用场景
应用解耦 -- 库存和订单的分离
异步操作 -- 用户注册后发送邮件
并行操作 -- 用户注册发送邮件的同时也发送一条短信
流量削峰 -- 在类似秒杀活动中把请求放置到队列中控制数量(达到数量即停止)
根据服务器性能慢慢消费(每次消费XX条)
日志收集 -- 比如和logback的集成
2.3.1、消息堆积(生产者 > 消费者、失败的消息无法ACK、消费者瓶颈或挂掉)
① 排查消费者效率及性能
② 增加消费者或消费者线程
2.3.2、消息丢失
① 生产者丢失(断网、宕机):生产者在消息投递时开启确认机制
② 队列中丢失(队列服务器宕机):持久化处理
③ 消费者中丢失:手动ACK
2.3.3、重复消费(没有ACK,手动ACK时失败)
① 消息中加入标识作为检测依据
② 消费后变更消息标识
③ 如遇到再次消费是检测消息标识,检测到未消费时继续进行
3.1.1、安装Erlang
https://www.erlang.org/downloads
3.1.2、安装RabbitMQ
http://www.rabbitmq.com/download.html
3.1.3、注意事项
详细的安装操作可以在网络上搜索,这里提示一下(rabbitMQ解压完成后一定要安装插件)
cmd进入sbin目录下执行 rabbitmq-plugins enable rabbitmq_management
3.1.3、默认登录密码及端口
guest、guest (可修改);登录的端口号是15672
3.1.4、Host和User分配
添加Host并和user进行关联分配
注意事项:
① 安装erlang后需配置系统环境变量,变量名为 ERLANG_HOME,路径为安装根目录
② 修改Path用户环境变量配置,在原有上加上 %ERLANG_HOME%\bin
3.2.1、maven依赖
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.2.1、相关yml配置
# RabbitMQ基础配置 rabbitmq:
host: 127.0.0.1
port: 5672
username: xa
password: guangmuhua
listener:
simple:
retry:
# 开启消费者(程序出现异常的情况下会)进行重试
enabled: true
# 最大重试次数
max-attempts: 5
# 重试间隔次数
initial-interval: 3000
# 开启手动ack
acknowledge-mode: manual
# 指定最小的消费者数量
concurrency: 1
# 指定最大的消费者数量
max-concurrency: 1
# 指定一个请求能处理多少的消息,如果有事务的话,必须大于等于transaction的数量
prefetch: 1
声明队列和交换机并绑定。
/**
* @author XA
* date 2021/1/25 15:36
* description: 队列和交换机绑定
*/
@Configuration
public class RabbitConfig {
@Value("${rabbitmqConf.article.exchange}")
private String articleExchange;
@Value("${rabbitmqConf.article.query}")
private String articleQuery;
@Value("${rabbitmqConf.article.routing}")
private String articleRouting;
@Value("${rabbitmqConf.mail.exchange}")
private String mailExchange;
@Value("${rabbitmqConf.mail.query}")
private String mailQuery;
@Value("${rabbitmqConf.mail.routing}")
private String mailRouting;
/* 文章队列和交换机绑定 */
@Bean
public Queue articleQuery() {
/* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 */
/* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable */
/* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 */
/* 一般设置一下队列的持久化就好,其余两个就是默认false */
return new Queue(articleQuery,true,false,false);
}
@Bean
DirectExchange articleExchange() {
return new DirectExchange(articleExchange,true,false);
}
/* 绑定将队列和交换机绑定, 并设置用于匹配键:articleRouting */
@Bean
Binding bindingOrder() {
return BindingBuilder.bind(articleQuery()).to(articleExchange()).with(articleRouting);
}
/* 邮件发送交换机和队列绑定 */
@Bean
public Queue mailQuery() {
return new Queue(mailQuery, true, false, false);
}
@Bean
DirectExchange mailExchacge() {
return new DirectExchange(mailExchange, true, false);
}
@Bean
Binding bindingMail() {
return BindingBuilder.bind(mailQuery()).to(mailExchacge()).with(mailRouting);
}
}
private void sendMailMq(MailMqDTO mailMqDTO){
String str = JSONObject.toJSONString(mailMqDTO);
Message message = MessageBuilder.withBody(str.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID()+"")
.build();
rabbitTemplate.convertAndSend(articleExchange,articleRouting, message);
}
/**
* 功能描述: 消息监听 (注意:这里的RabbitListener注解放置在类上会报错)
* * 没有队列时创建 -- queuesToDeclare
* * 持久化 -- durable
* * 是否自动删除 -- autoDelete
*
* Param: [message]
* Return: void
*/
@RabbitHandler
@RabbitListener(queues="mail")
@Transactional(rollbackFor = Exception.class)
public void process(Message message, Channel channel) throws IOException {
try {
MailMqDTO mailMqDTO = JSON.parseObject(new String(message.getBody()), MailMqDTO.class);
assert mailMqDTO != null;
mailUtils.sendMail(mailMqDTO);
/* 消费成功之后进行手动签收 */
/* todo 注意:MQ在ack时是线程不安全的,channel的publicer可能已经失效 */
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}catch (Exception e){
/* 消费失败,将消息重新放置到队列中 */
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,true);
e.printStackTrace();
}
}
这里仅做初步的项目集成,更多高级用法会在后续陆续放出,敬请期待。
也欢迎大家沟通交流,共同进步!