亲宝软件园·资讯

展开

Java RabbitMQ

kaico2018 人气:0

消息堆积

消息堆积的产生场景:

保证消息不丢失

1、生产者使用消息确认机制保证消息百分之百能够将消息投递到MQ成功。

2、MQ服务器端应该将消息持久化到硬盘

3、消费者使用手动ack机制确认消息消费成功

如果MQ服务器容量满了怎么办?

使用死信队列将消息存到数据库中去,后期补偿消费。

死信队列

RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

产生背景:

代码案例:

maven依赖

<dependencies>
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

yml配置

server:
#  服务启动端口配置
  port: 8081
  servlet:
#    应用访问路径
    context-path: /
spring:
  #增加application.druid.yml 的配置文件
#  profiles:
#    active: rabbitmq
  rabbitmq:
    ####连接地址
    host: www.kaicostudy.com
    ####端口号
    port: 5672
    ####账号
    username: kaico
    ####密码
    password: kaico
    ### 地址
    virtual-host: /kaicoStudy
###模拟演示死信队列
kaico:
  dlx:
    exchange: kaico_order_dlx_exchange
    queue: kaico_order_dlx_queue
    routingKey: kaico.order.dlx
  ###备胎交换机
  order:
    exchange: kaico_order_exchange
    queue: kaico_order_queue
    routingKey: kaico.order

队列配置类

@Configuration
public class DeadLetterMQConfig {
    /**
     * 订单交换机
     */
    @Value("${kaico.order.exchange}")
    private String orderExchange;
    /**
     * 订单队列
     */
    @Value("${kaico.order.queue}")
    private String orderQueue;
    /**
     * 订单路由key
     */
    @Value("${kaico.order.routingKey}")
    private String orderRoutingKey;
    /**
     * 死信交换机
     */
    @Value("${kaico.dlx.exchange}")
    private String dlxExchange;
    /**
     * 死信队列
     */
    @Value("${kaico.dlx.queue}")
    private String dlxQueue;
    /**
     * 死信路由
     */
    @Value("${kaico.dlx.routingKey}")
    private String dlxRoutingKey;
    /**
     * 声明死信交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }
    /**
     * 声明死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }
    /**
     * 声明订单业务交换机
     *
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }
    /**
     * 绑定死信队列到死信交换机
     *
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }
    /**
     * 声明订单队列,并且绑定死信队列
     *
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        // 订单队列绑定我们的死信交换机
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-dead-letter-exchange", dlxExchange);
        arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
        return new Queue(orderQueue, true, false, false, arguments);
    }
    /**
     * 绑定订单队列到订单交换机
     *
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);
    }
}

死信队列消费者

@Component
public class OrderDlxConsumer {
    /**
     * 死信队列监听队列回调的方法
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_dlx_queue")
    public void orderDlxConsumer(String msg) {
        System.out.println("死信队列消费订单消息" + msg);
    }
}

普通队列消费者

@Component
public class OrderConsumer {
    /**
     * 监听队列回调的方法
     *
     * @param msg
     */
    @RabbitListener(queues = "kaico_order_queue")
    public void orderConsumer(String msg) {
        System.out.println("正常订单消费者消息msg:" + msg);
    }
}

后台队列管理页面如下:

部署方式:死信队列不能够和正常队列存在同一个服务器中,应该分服务器存放。

延迟队列

订单30分钟未支付,系统自动超时关闭的实现方案。

基于任务调度实现,效率是非常低。

基于redis过期key实现,key失效时会回调客户端一个方法。

用户下单的时候,生成一个令牌(有效期)30分钟,存放到我们redis;缺点:非常冗余,会在表中存放一个冗余字段。

基于mq的延迟队列(最佳方案)rabbitmq情况下。

原理:在我们下单的时候,往mq投递一个消息设置有效期为30分钟,但该消息失效的时候(没有被消费的情况下),执行我们客户端一个方法告诉我们该消息已经失效,这时候查询这笔订单是否已经支付。

实现逻辑:

主要使用死信队列来实现。

想要的代码:就是正常的消费者不消费消息,或者没有正常的消费者,在设置的时间后进入死信队列中,然后死信消费者实现相应的业务逻辑。

RabbitMQ消息幂等问题

RabbitMQ消息自动重试机制

当消费者业务逻辑代码中,抛出异常自动实现重试 (默认是无数次重试)

应该对RabbitMQ重试次数实现限制,比如最多重试5次,每次间隔3s;重试多次还是失败的情况下,存放到死信队列或者存放到数据库表中记录后期人工补偿。因为重试失败次数之后,队列会自动删除这个消息。

消息重试原理: 在重试的过程中,使用aop拦截我们的消费监听方法,也不会打印这个错误日志。如果重试多次还是失败,达到最大失败次数的时候才会打印错误日志。

如果消费多次还是失败的情况下:

1、自动删除该消息;(消息可能丢失)

解决办法:

如果充实多次还是失败的情况下,最终存放到死信队列;

采用表日志记,消费失败错误日志的日志记录,后期人工自动对该消息实现补偿。

合理的选择重试机制

消费者获取消息后,调用第三方接口(HTTP请求),但是调用第三方接口失败呢?是否需要重试 ?

答:有时是因为网络异常调用失败,应该需要重试几次。

消费者获取消息后,应该代码问题抛出数据异常,是否需要重试?

答:不需要重试,代码异常需要重新修改代码发布项目。

消费者开启手动ack模式

第一步、springboot项目配置需要开启ack模式

acknowledge-mode: manual

第二步、消费者Java代码

int result = orderMapper.addOrder(orderEntity);
if (result >= 0) {
    // 开启消息确认机制
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

rabbitMQ如何解决消息幂等问题

什么是消息幂等性?MQ消费者如何保证幂等性?

产生的原因:就是因为消费者可能会开启自动重试,重试过程中可能会导致消费者业务逻辑代码重复执行。此刻消息已经消费了,因为业务报错导致消息重新消费,这时会出现

解决方案:采用消息全局id根据业务来定,根据业务id(全局唯一id)消费者可以判断这条消息已经消费了。

消费者代码逻辑:

RabbitMQ解决分布式事务问题

分布式事务:在分布式系统中,因为跨服务调用接口,存在多个不同的事务,每个事务都互不影响。就存在分布式事务的问题。

解决分布式事务核心思想:数据最终一致性。

分布式领域中名词:

强一致性 :要么同步速度非常快或者采用锁的机制 不允许出现脏读;

强一致性解决方案:要么数据库A非常迅速的将数据同步给数据B,或者数据库A没有同步完成之前数据库B不能够读取数据。

弱一致性: 允许读取的数据为原来的脏数据,允许读取的结果不一致性。

最终一致性: 在我们的分布式系统中,因为数据之间同步通过网络实现通讯,短暂的数据延迟是允许的,但是最终数据必须要一致性。

基于RabbitMQ解决分布式事务的思路

基于RabbitMQ解决分布式事务的思路:(采用最终一致性的方案)

解决思路图:核心是利用mq发送消息给其他系统将数据修改回来。

加载全部内容

相关教程
猜你喜欢
用户评论