RabbitMQ笔记

RabbatMQ

基础概念

中间件

中间件( Middleware )是处于操作系统和应用程序之间的软件,也有人认为它应该属于操作系统中的一部分。人们在使用中间件时,往往是一组中间件集成在一起,构成一个平台(包括开发平台和运行平台),但在这组中间件中必须要有一个通信中间件,即中间件=平台+通信,这个定义也限定了只有用于分布式系统中才能称为中间件,同时还可以把它与支撑软件和实用软件区分开来。

案例:

1,RMI (Remote Method lnvocations,远程调用)
2,Load Balancing(负载均衡,将访问负荷分散到各个服务器中)3,Transparent Fail-over(透明的故障切换)
4,Clustering(集群,用多个小的服务器代替大型机)
5,Back-end-Integration(后端集成,用现有的、新开发的系统如何去集成遗留的系统)
6,Transaction事务(全局/局部)全局事务(分布式事务)局部事务(在同一数据库联接内的事务7,Dynamic Redeployment(动态重新部署,在不停止原系统的情况下,部署新的系统)
8,System Management(系统管理)
9,Threading(多线程处理)
10,Message-oriented Middleware面向消息的中间件(异步的调用编程)

架构模式

  • 单体架构

在企业开发的中,大部分的初期架构都采用的是单体架构的模式进行架构,而这种架构的典型的特点:就是把所有的业务和模块,源代码,静态资源文件等都放在一个一工程中,如果其中的一个模块升级或者迭代发生一个很小变动都会重新编译和重新部署项目。这种的架构存在的问题就是:

    1. 耦合度太高
    2. 运维的成本过高3:不易维护
    3. 服务器的成本高
    4. 以及升级架构的复杂度也会增大
    • 分布式架构

    概述:一个请求由服务器端的多个服务(服务或者系统)协同处理完成

    和单体架构不同的是,单体架构是一个请求发起JVM调度线程(确切的是tomcat线程池)分配线程Thread来处理请求直到释放,而分布式是系统是:一个请求是由多个系统共同来协同完成,jvm和环境都可能是独立。如果生活中的比喻的话,单体架构就想建设一个小房子很快就能够搞定,如果你要建设一个鸟巢或者大型的建筑,你就必须是各个环节的协同和分布,这样目的也是项目发展都后期的时候要去部署和思考的问题。我们也不能看出来:分布式架构系统存在的特点和问题如下:

    仔在问题

    1. 学习成本高,技术栈过多
    2. 运维成本和服务器成本增高
    3. 人员的成本也会增高
    4. 项目的负载度也会上升
    5. 面临的错误和容错性也会成倍增加
    6. 占用的服务器端口和通讯的选择的成本高
    7. 安全性的考虑和因素逼迫可能选择RMI/MQ相关的服务器端通讯

    优势

    好处

    1. 服务系统的独立,占用的服务器资源减少和占用的硬件成本减少,确切的说是:可以合理的分配服务资源,不造成服务器资源的浪费
    2. 系统的独立维护和部署,耦合度降低,可插拔性。
    3. 系统的架构和技术栈的选择可以变的灵活(而不是单纯的选择java)
    4. 弹性的部署,不会造成平台因部署造成的瘫痪和停服的状态。
    • 服务网格

    基于消息中间件的分布式架构

    image-20210830114120983

    从上图中可以看出来,消息中间件的是

    1. 利用可靠的消息传递机制进行系统和系统直接的通讯
    2. 通过提供消息传递和消息的排队机制,它可以在分布式系统环境下扩展进程间的通讯。

    消息中间件应用场景

    1. 跨系统数据传递
    2. 高并发的流量削峰
    3. 数据的分发和异步处理
    4. 大数据分析与传递
    5. 分布式事务
    6. 应用解耦

    常见消息中间件:ActiveMQ、RabbitMQ、Kafka、RocketMQ ...

    消息中间件

    本质: 它是一种接受数据,接受请求、存储数据、发送数据等功能的技术服务。

    FIFO队列,跨进程的通信机制,用于上下游传递消息(Message)

    MQ消息队列:负责数据的传接受,存储和传递,所以性能要过于普通服务和技术。

    通信协议

    网络协议三要素

    1. 语法。语法是用户数据与控制信息的结构与格式,以及数据出现的顺序。
    2. 语义。语义是解释控制信息每个部分的意义。它规定了需要发出何种控制信息,以及完成的动作与做出什么样的响应。
    3. 时序。时序是对事件发生顺序的详细说明。

    对于消息中间件,不采用http协议

    常见消息中间件:OpenWire、AMQP、MQTT、Kafka、OpenMessage协议

    面试题:为什么消息中间件不直接使用http协议呢?

    1. 因为http请求报文头和响应报文头是比较复杂的,包含了cookie,数据的加密解密,状态码,响应码等附加的功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,它其实就是负责数据传递,存储,分发就行,一定要追求的是高性能。尽量简洁,快速。
    2. 大部分情况下http大部分都是短链接,在实际的交互过程中,一个请求到响应很有可能会中断,中断以后就不会就行持久化,就会造成请求的丢失。这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期的获取消息的过程,出现问题和故障要对数据或消息就行持久化等,目的是为了保证消息和数据的高可靠和稳健的运行。

    协议总结:是在TCP/IP协议基础之上构建的一种约定成俗的规范和机制、它的主要目的可以让客户端(应用程序java,go)讲行沟通和通讯。并且这种协议下规范必须具有持久性,高可用,高可靠的性能。

    AMQP协议

    AMQP:(全称:Advanced Message Queuing Protocol)是高级消息队列协议。由摩根大通集团传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang语言中的实现有RabbitMQ等。
    特性:

    1. 分布式事务支持。
    2. 消息的持久化支持。
    3. 性能和高可靠的消息处理优势。

    AMQP协议支持: RabbitMQ、ActiveMQ

    OpenMessage协议

    是近几年由阿里、雅虎和滴滴出行、Stremalio等公司共同参与创立的分布式消息中间件、流处理等领域的应用开发标准。
    特点:

    1. 结构简单
    2. 解析速度快
    3. 支持事务和持久化设计

    OpenMessage协议支持:RocketMQ

    Kafka协议

    Kafka协议是基于TCP/IP的二进制协议。消息内部是通过长度来分割,由一些基本数据类型组成。

    特点是:

    1. 结构简单
    2. 解析速度快
    3. 无事务支持
    4. 有持久化设计

    Kafka协议支持:Kafka

    消息分发策略

    MQ消息队列有如下几个角色:生产者、存储消息、消费者

    那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull))两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些机制会适用到很多的业务场景也有很多对应推机制策略。

    image-20210830141014283

    分发策略:

    • 发布订阅
    • 轮询分发
    • 公平分发
    • 重发
    • 消息拉取

    消息队列高可用&高可靠

    所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。
    当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU,内存,磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署。来达到高可用的目的。

    • Master-slave主从共享数据部署方式集群模式

    image-20210830141952038

    • Master-slave主从同步部署方式集群模式

    image-20210830142400015

    • 多主集群同步部署方式集群模式

    image-20210830142446466

    • 多主集群转发部署方式集群模式

    image-20210830142506686

    • Master-slave与Breaker-cluster组合的集群模式

    image-20210830142618093

    高可靠机制:

    所谓高可用是指:是指系统可以无故障低持续运行,比如一个系统突然崩溃,报错,异常等等并不影响线上业务的正常运行,出错的几率极低,就称之为:高可靠。

    在高并发的业务场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。

    如何保证中间件消息的可靠性呢?可以从两个方面考虑:

    1. 消息的传输:通过协议来保证系统间数据解析的正确性。
    2. 消息的存储可靠:通过持久化来保证消息的可靠性。

    RabbitMQ 概念

    Server:

    ​ 又称Broker ,接受客户端的连接,实现AMQP实体服务。安装rabbitmq-server

    Connection

    ​ 连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手
    Channel:

    ​ 网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
    Message

    ​ 消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
    Virtual Host

    ​ 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange
    Exchange

    ​ 交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)Bindings : Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
    Routing key

    ​ 是一命路由规则,虚拟机可以用它来确定如何路由一个特定消息。
    Queue

    ​ 队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

    image-20210830170216039

    安装与配置

    RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,,高可扩性,易用性等特征。

    Linux安装

    安装Erlang

    安装RabbitMQ

    系统开启自动启动

    Docker 安装

    默认安装

    docker pull rabbitmq:management # 获取rabbit 镜像
    docker run -di --name myrabbit -p 15672:15672 rabbitmq:management #创建并运行容器

    在运行时同时设置用户密码

    docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin \
    -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:3.8-management

    查看日志

    docker logs -f myrabbit

    管理界面

    默认情况下,rabbitmq 没有安装web端的客户端插件,需要安装才可以生效

    rabbitmq-plugins enable rabbitmq_managemer

    安装完毕后重启服务

    systemctl restart rabbitmq-server

    默认端口: 15672 web管理界面默认账号密码都是guest,默认只能在本机访问

    授权管理

    新增用户

    rabbitmqctl add_user admin admin

    设置用户分配权限

    rabbitmqctl set_user_tags admin administrator

    用户级别 权限由高到低

    1. administrator 超级管理者 可以登录控制台 查看所有信息 对rabbitmq进行管理
    2. monitoring 监控者 可以登录控制台 查看所有信息
    3. policymaker 策略制定者 可以登录控制台 指定策略
    4. management 普通用户 可以登录控制台
    5. none 无权限

    添加用户资源权限

    rabbitmqctl.bat set_permissions -p / admin ".*" ".*" ".*"

    其他指令

    rabbitmqctl add_user 账号 密码 # 添加用户
    rabbitmqctl set_user 账号 权限 # 分配权限
    rabbitmqctl change_password 账号 新密码 # 修改密码
    rabbitmqctl delete_user 账号 # 删除用户
    rabbitmqctl list_user # 查看用户清单
    rabbitmqctl.bat set_permissions -p / 用户名 ".*" ".*" ".*" # 为用户设置资源权限
    rabbitmqctl.bat set_permissions -p / root ".*" ".*" ".*"

    Rabbit简单使用

    RabbitMQ工作模式

    • 工作模式 Work

    特点:分发机制

    轮询:当有多个消费者接入时,消息的分配模式是一个消费者分配一条,直至消息消费完成

    公平分发:当一组消息发送给多个消费者时,消息处理快的消费者分配更多消息。

    • 发布订阅模式 Fanout

    特点:Fanout——发布与订阅模式,广播机制,没有路由key

    队列绑定交换机

    • 路由模式 Direct

    特点:有routing-key的匹配模式

    设置routing-key

    • 主题Topic模式 Topics

    特点:模糊的routing-key的匹配模式

    #:表示0个或多个,可以没有

    *:表示1个或多个,至少只有一个

    .:表示routing key的级别

    e.g: *.order.#表示routing key 中上一级中必须存在,下一级可以不存在,如com.order符合匹配条件

    • 参数模式RPC Headers

    特点:参数匹配模式

    通过 Key-value 匹配queues 。也是忽略routing key的一种路由方式

    SpringBoot集成

    需要导入相关依赖

    <!--Rabbit相关依赖-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>

    配置信息

    server:
      port: 8080 
    
    # rabbit默认配置本地服务访问,当rabbit服务在其他网络上时,需要配置信息
    spring:
      rabbitmq:
        host: # IP地址
        username: # 账号
        password: # 密码
        port: # 端口信息
        virtual-host: / # 资源盘

    Fanout模式

    provider

    配置类声明交换机、队列,并进行绑定

    @Configuration
    public class RibbonConfig {
        //1.声明注册fanout模式的交换机
    
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanout_order_exchange", true, false);
        }
    
        //2.声明队列
        @Bean
        public Queue smsQueue() {
            return new Queue("sms.fanout.queue", true);
        }
    
        @Bean
        public Queue phoneQueue() {
            return new Queue("phone.fanout.queue", true);
        }
    
        @Bean
        public Queue emailQueue() {
            return new Queue("email.fanout.queue", true);
        }
    
        //3.完成绑定关系(队列和交换机完成绑定关系)
    
        @Bean
        public Binding smsBing() {
            return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
        }
    
        @Bean
        public Binding phoneBing() {
            return BindingBuilder.bind(phoneQueue()).to(fanoutExchange());
        }
    
        @Bean
        public Binding emailBing() {
            return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
        }
    
    }

    提供服务

    @Service
    public class OrderService {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void makeOrder(String userId, String productId, int num) {
            //1.保存订单
            String orderId = UUID.randomUUID().toString();
            System.out.println("fanout 生成订单:" + orderId);
            //2.通过MQ完成消息分发
            //参数 (交换机,路由key/queue队列名称,消息内容)
            String exchangeName = "fanout_order_exchange";
            String routingKey = "";
            rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
        }
    }

    进行调用

    @Test
    void contextLoads() {
        orderService.makeOrder("1","1",4);
    }

    consumer

    定义队列并进行监听

    @RabbitListener(queues = "email.fanout.queue")
    @Service
    public class FanoutEmailConsumer {
    
        @RabbitHandler
        public void reviseMessage(String message) {
            System.out.println("email fanout  接收数据--->" + message);
        }
    }
    @RabbitListener(queues = "phone.fanout.queue")
    @Service
    public class FanoutPhoneConsumer {
    
        @RabbitHandler
        public void reviseMessage(String message) {
            System.out.println("phone fanout  接收数据--->" + message);
        }
    }
    @RabbitListener(queues = "sms.fanout.queue")
    @Service
    public class FanoutSMSConsumer {
    
        @RabbitHandler
        public void reviseMessage(String message) {
            System.out.println("sms fanout  接收数据--->" + message);
        }
    }

    启动主程序并进行监听

    @SpringBootApplication
    public class FanoutConsumerApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(FanoutConsumerApplication.class, args);
        }
    
    }

    Direct模式

    provider

    配置类

    @Configuration
    public class RibbonConfig {
        //1.声明注册fanout模式的交换机
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange("direct_order_exchange", true, false);
        }
    
        //2.声明队列
        @Bean
        public Queue smsQueue() {
            return new Queue("sms.fanout.queue", true);
        }
    
        @Bean
        public Queue phoneQueue() {
            return new Queue("phone.fanout.queue", true);
        }
    
        @Bean
        public Queue emailQueue() {
            return new Queue("email.fanout.queue", true);
        }
    
        //3.完成绑定关系(队列和交换机完成绑定关系)
        //direct 需要携带routing key
        @Bean
        public Binding directSmsBing() {
            return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
        }
    
        @Bean
        public Binding directPhoneBing() {
            return BindingBuilder.bind(phoneQueue()).to(directExchange()).with("phone");
        }
    
        @Bean
        public Binding directEmailBing() {
            return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
        }
    }

    服务

    public void makeOrderByDirect(String userId, String productId, int num) {
    
        //1. 根据 productId 查看商品库存
    
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("direc 生成订单:" + orderId);
        //3.通过MQ完成消息分发
        //参数 (交换机,路由key/queue队列名称,消息内容)
        String exchangeName = "direct_order_exchange";
        String routingKey = "sms";//指明向哪个队列发送
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
        rabbitTemplate.convertAndSend(exchangeName, "phone", orderId);
    }

    consumer

    监听队列

    @RabbitListener(queues = "sms.fanout.queue")
    @Service
    public class DirectSMSConsumer {
    
        @RabbitHandler
        public void reviseMessage(String message) {
            System.out.println("sms direct    接收数据--->" + message);
        }
    }

    ...

    启动程序,持续监听

    Topic模式

    provider

    public void makeOrderByTopic(String userId, String productId, int num) {
        //1. 根据 productId 查看商品库存
    
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("direc 生成订单:" + orderId);
        //3.通过MQ完成消息分发
        //参数 (交换机,路由key/queue队列名称,消息内容)
        String exchangeName = "topic_order_change";
        String routingKey = "com.sms"; //声明模糊routing key
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
    }

    调用方法

    consumer

    使用注解方式声明交换机,队列以及相互绑定

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "email.fanout.queue",durable = "true",autoDelete = "false"),
            exchange = @Exchange(value = "topic_order_change",type = ExchangeTypes.TOPIC),
            key = "com.*"
    ))
    @Service
    public class TopicEmailConsumer {
    
        @RabbitHandler
        public void reviseMessage(String message) {
            System.out.println("email topic  接收数据--->" + message);
        }
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "phone.fanout.queue",durable = "true",autoDelete = "false"),
            exchange = @Exchange(value = "topic_order_change",type = ExchangeTypes.TOPIC),
            key = "*.phone.#"
    ))
    @Service
    public class TopicPhoneConsumer {
    
        @RabbitHandler
        public void reviseMessage(String message) {
            System.out.println("phone topic  接收数据--->" + message);
        }
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "sms.fanout.queue",durable = "true",autoDelete = "false"),
            exchange = @Exchange(value = "topic_order_change",type = ExchangeTypes.TOPIC),
            key = "*.sms.#"
    ))
    @Service
    public class TopicSMSConsumer {
    
        @RabbitHandler
        public void reviseMessage(String message) {
            System.out.println("sms topic  接收数据--->" + message);
        }
    }

    启动服务监听

    RabbitMQ配置

    TTL(过期时间)

    概述

    过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

    • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
    • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

    如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列,消费者将无法再收到该消息。

    设置队列TTL

    @Bean
    public Queue emailQueue() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);//添加过期时间 参数:x-message-ttl
        return new Queue("email.fanout.queue", true,false,false,args);
    }

    设置消息TTL

    public void makeOrder(String userId, String productId, int num) {
    
        //1. 根据 productId 查看商品库存
    
        //2.保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("fanout 生成订单:" + orderId);
        //3.通过MQ完成消息分发
        //参数 (交换机,路由key/queue队列名称,消息内容)
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        
        MessagePostProcessor messagePostProcessor = message -> {
            message.getMessageProperties().setExpiration("5000");//设置消息延迟
            message.getMessageProperties().setContentEncoding("utf-8");//设置消息编码格式
            return message;
        };
        rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId,messagePostProcessor);
    }

    死信队列

    概述

    DLX(Dead-Letter-Exchange),可以称之为死信交换机、死信邮箱,当消息在一个队列中变成死信(dead message)之后,能被重新发送到另一个交换机(DLX)中,绑定DLX的队列就称为死信队列,消息变成死信的原因:

    • 消息被拒绝
    • 消息过期
    • 队列达到最大长度

    DLX是正常的交换机,能在任何队列上被指定,实际上就是设置某一个队列的属性。

    当这个队列中存在死信时,RabbitMQ会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列(死信队列)。

    想要使用死信队列,只需要在定义队列的时候设置队列参数:x-dead-letter-exchange指定交换机即可

    定义死信交换机和死信队列

    //********************
    //死信交换机
    @Bean
    public DirectExchange deadDirect() {
        return new DirectExchange("dead_direct_exchange", true, false);
    }
    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }
    @Bean
    public Binding deadQueueBing() {
        return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
    }
    //********************

    绑定死信队列

    @Bean
    public Queue emailQueue() {
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);//添加过期时间 参数:x-message-ttl
          args.put("x-dead-letter-exchange","dead_direct_exchange"); //绑定死信交换机
        args.put("x-dead-letter-routing-key","dead");//绑定死信队列 fanout模式不需要配置
        return new Queue("email.fanout.queue", true, false, false, args);
    }

    内存磁盘管理

    内存控制

    image-20210831221936410

    RabbitMQ内存默认占比为0.4(服务器总内存占比的0.4)

    当RabbitMQ的内存超过40%,就会产生警告,并且阻塞所有生产者连接,可以通过命令(仅本次启动)或配置文件方式进行修改(永久更改)

    命令修改:

    # 此方法rabbit重启后失效
    # rabbitmqctl set_vm_memory_high_watermark <fraction>
    rabbitmqctl set_vm_memory_high_watermark 0.6
    # rabbitmqctl set_vm_memory_high_watermark absolute <value>
    rabbitmqctl set_vm_memory_high_watermark absolute 50MB

    修改配置文件:/etc/rabbitmq/rabbitmq.conf

    # 使用relative相对值进行设置fraction,建议取值在0.4-0.7之间,不建议超过0.7
    vm_memory_high_watermark.ralative = 0.6
    # 使用absolute的绝对值方式修改,单位KB,MB,GB
    vm_memory_high_watermark_absolute = 2GB

    内存换页

    在某个Broker节点及内存阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间,持久化和非持久化的消息都会写入磁盘中,其中持久化的消息本身就在磁盘中有一个副本,所以在转移的过程中持久化的消息会先从内存中清除掉

    默认情况下,内存到达的阈值50%时就会换页处理

    也就是说,在默认情况下该内存的阈值是0.4的情况下,当内存超过0.4*0.5=0.2时,会进行换页动作。

    ​ 比如有1000MB内存,当内存的使用率达到了400MB,已经达到了极限,但是因为配置的换页内存0.5,这个时候会在达到极限400mb之前,会把内存中的200MB进行转移到磁盘中。从而达到稳健的运行。
    可以通过设置vm_memory_high_watermark_paging_ratio来进行调整

    vm_memory_high_watermark.relative = 0.4
    vm_memory_high_watermark.paging_ratio = 0.7 # 设置值小于1,阈值才有意义

    磁盘预警

    当磁盘的剩余空间低于确定阈值时,同样会阻塞生产者,可以比米娜因非持久化的消息持续换页而耗尽磁盘空间导致服务器崩溃

    默认情况下︰磁盘预警为50MB的时候会进行预警。表示当前磁盘空间第50MB的时候会阻塞生产者并且停止内存消息换页到磁盘的过程。

    RabbitMQ定期检查可用磁盘空间量。检查磁盘空间的频率与上次检查时的空间大小有关(以确保在空间耗尽时磁盘报警及时消失)。通常情况下,磁盘空间每10秒检查一次,但随着达到极限,频率会增加。当接近极限时,RabbitMQ将每秒检查10次。这可能会对系统负载有一些影响。

    这个阈值可以减小,但是不能完全的消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘空间的检查空隙内,第一次检查是:60MB,第二检查可能是:1MB,就会出现警告。

    配置磁盘可用空间限制

    disk_free_limit.relative = 1.0 # 设置相对于机器中的RAM的可用空间限制
    disk_free_limit.absolute = 1GB # 内存单位

    在群集中运行RabbitMQ时,磁盘警报是集群范围内的; 如果一个节点超出限制,那么所有节点都将阻止传入的消息

    消息确认机制

    在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,但当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?

    如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

    RabbitMQ为我们提供了两种方式:

    1. 通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
    2. 通过将channel设置成confirm模式来实现;

    为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

    采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

    当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。

    RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

    RabbitMQ管理平台界面上可以看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数。

    关闭自动消息确认,进行手动ack

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(ConfirmConfig.queueName, false, consumer);
    
    while(true){
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String msg = new String(delivery.getBody());
      // do something with msg. 
      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }

    broker发现当前消息无法被路由到指定的queues中(如果设置了mandatory属性,则broker会发送basic.return)
    非持久属性的消息到达了其所应该到达的所有queue中(和镜像queue中)
    持久消息到达了其所应该到达的所有queue中(和镜像中),并被持久化到了磁盘(fsync)
    持久消息从其所在的所有queue中被consume了(如果必要则会被ack)

    Confirm模式

    手动ack确认

    持久化机制

    为了保证RabbitMQ在退出或者crash等异常情况下数据没有丢失,需要将queue、exchange和message都持久化。——保证消息可靠性

    如果queue进行持久化操作,当服务重启之后,队列也会存在:服务会把持久化的queue写入磁盘,当服务重启的时候,会重新声明之前持久化队列

    队列持久化

    queue的持久化是通过durable=true来实现的。

    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("queue.persistent.name", true, false, false, null);

    是第二个参数设置为true,即durable=true.

    源码

        /**
         * @param queue queue的名称
         * @param durable 是否持久化
         * @param exclusive 排他队列,如果一个队列被声明为排他队列,该队列仅对首次申明它的连接可见,并在连接断开时自动删除。
                                                 1. 排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一连接创建的排他队列;
                                                 2.“首次”,即如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;
                                                 3.即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端
                                                     发送读取消息的应用场景
         * @param autoDelete 自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
         * @param arguments 携带参数
         * @return a declaration-confirm method to indicate the queue was successfully declared
         * @throws java.io.IOException if an error is encountered
         */
        Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) throws IOException;
    

    queueDeclarePassive(String queue)可以用来检测一个queue是否已经存在。如果该队列存在,则会返回true;如果不存在,就会返回异常,但是不会创建新的队列

    消息持久化

    如果要在重启后保持消息的持久化,就必须设置消息是持久化的标识MessageProperties.PERSISTENT_TEXT_PLAIN

    channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());

    basicPublish的方法:

    /**
    *    exchange        exchange的名称
    *    routingKey    routingKey的名称
    * body                发送的消息体
    */
    
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
            throws IOException;
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;
    
    //BasicProperties的定义
    
    
    public BasicProperties(
                String contentType,//消息类型如:text/plain
                String contentEncoding,//编码
                Map<String,Object> headers,
                Integer deliveryMode,//1:不持久化 2:持久化
                Integer priority,//优先级
                String correlationId,
                String replyTo,//反馈队列
                String expiration,//expiration到期时间
                String messageId,
                Date timestamp,
                String type,
                String userId,
                String appId,
                String clusterId)

    换言之,消息持久化标识就是MessageProperties对象参数 deliveryMode=2

    当设置了队列和消息的持久化之后,当broker服务重启的之后,消息依旧存在。单只设置队列持久化,重启之后消息会丢失;单只设置消息的持久化,重启之后队列消失,既而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

    交换机持久化

    如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。这里博主建议,同样设置exchange的持久化。exchange的持久化设置也特别简单,方法如下:

    Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                       Map<String, Object> arguments) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
    Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;
    void exchangeDeclareNoWait(String exchange,
                               String type,
                               boolean durable,
                               boolean autoDelete,
                               boolean internal,
                               Map<String, Object> arguments) throws IOException;
    Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

    一般只需要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);即在声明的时候讲durable字段设置为true即可。

    分析

    将queue,exchange, message等都设置了持久化之后并不能100%保证数据不丢失,

    ​ 当autoAck =true,那么当consumer接收到相关消息之后,还没来得及处理就crash掉了,那么这样也算数据丢失。这种情况需要将autoAck设置为true(方法定义如下),然后在正确处理完消息之后进行手动ack

    ​ 消息在正确存入RabbitMQ之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。这种情况可以选择RabbitMQ的mirrored-queue即镜像队列,相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉。此外还可以在生产者引入事务机制、Confirm机制来确保消息已经正确的发送至broker端。随着可靠性增加,性能随之降低

    写入原理

    ​ 在写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。

    ​ 每个25ms,Buffer里的数据(不管Buffer是否已满)及未刷新到磁盘的文件内容必定会刷到磁盘。

    ​ 每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

    Rabbit集群

    1. 启动多个rabbitmq服务,并指定不同的端口
    2. 设置主节点

      sudo rabbitmqctl -n rabbit-1 stop_app     # 停止服务
      sudo rabbitmqctl -n rabbit-1 reset             # 清除节点上的历史数据
      sudo rabbitmqctl -n rabbit-1 start_app     # 启动服务
    3. 设置从节点

      sudo rabbitmqctl -n rabbit-2 stop_app     # 停止服务
      sudo rabbitmqctl -n rabbit-2 reset             # 清除节点上的历史数据
      sudo rabbitmqctl -n rabbit-2 join_clusten rabbit-1@'Server-node' # 加入主节点集群中,(Server-node服务器的主机名)
      sudo rabbitmqctl -n rabbit-2 start_app     # 启动服务
      
      sudo rabbitmqctl cluster_status -n rabbit-1 # 查看集群状态

    如果采用多机部署方式,需要读取其中一个节点的cookie,并赋值到其他节点中(节点之间通过cookie确定相互是否可通信),cookie存放在/var/lib/rabbitmq/.erlang.cookie

    分布式事务

    概述

    分布式事务指服务的操作位于不同的节点上,需要保证事务的AICD特性

    解决方案

    1. 两阶段提交(2PC)

    需要数据库支持,java组件:atomikos等

    两阶段提交,通过引入协调者协调参与者的行为,并最终决定这些参与者是否要真正执行事务

    所有事务参与者在等待其他参与者响应时都处于同步阻塞状态,当大量的请求并发时,会造成拥堵,事务同时处理过多时,造成瘫痪

    1. 补偿事务(TCC)

    采用补偿机制:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。分为三个阶段

    • try阶段主要对业务系统做检测和资源预留
    • confirm阶段主要是业务系统做确认提交,try阶段执行成功并开始执行confirm阶段时,默认Confirm阶段不会出错。(只要try成功,confirm一定成功)
    • cancel阶段主要在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放

    ---

    优点:跟2PC比起来,实现及流程相对简单,但数据的一致性较差

    缺点:在第2、3步都可能失败,TCC属于应用层的一种补偿方式,在一些业务流程可能不好定义及处理

    1. 本地消息表(异步确保)

    本地消息表与业务数据表处于同一数据库中,利用本地事务来保证在对两个表的操作满足事务性,并使用消息队列保证最终一致性

    • 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息,本地事务能保证这个消息一定会被写入本地消息表中。
    • 之后将本地消息表中的消息转发到Kafka等消息队列中,如果转发成功则将消息从本地消息表中删除,否则继续重新转发。
    • 在分布式事务操作的另一方从消息队列中读取一个消息,并执行消息中的操作。

    ---

    优点:避免分布式事务,实现最终一致性

    缺点:消息队列耦合到业务系统中,如果没有封装好的解决方案,需要处理很多细节

    1. MQ事务消息

    有一些第三方的MQ是支持事务消息的,比如RocketMQ,他们支持事务消息的方式也是类似于采用的二阶段提交,但是市面上一些主流的MQ都是不支持事务消息的,比如Kafka不支持。
    以阿里的 RabbitMQ中间件为例,其思路大致为:

    • 第一阶段Prepared消息,会拿到消息的地址。第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。
    • 也就是说在业务方法内要想消息队列提交两次请求,一次发送消息和一次确认消息。如果确认消息发送失败了RabbitMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认,所以生产方需要实现一个check接口,RabbitMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

    ---

    优点:实现最终一致性,不需要依赖本地数据库事务

    缺点:实现难度大,主流MQ不支持,RocketMQ事务消息部分代码未开源

    事务

    hidewnd

    hidewnd

    Just So So ...

    留下你的评论

    快留下你的小秘密吧