RabbitMQ笔记

消息队列(MQ,Message Queue)本质是一个队列,队列中存放的内容是message,是一种跨进程的通信机制,用于上下游传递消息。

RabbitMQ

MQ的概念

MQ (message queue 消息队列) 本质上是个队列具有队列FIFO(先进先出)的特性 只不过队列中存储的是”Message“ 还是一种跨进程的通讯机制,用于上下游传递消息,MQ是一种常见的上下游逻辑+物理解耦的消息通信服务

有了MQ上游与下游通信只需要依赖MQ就可以了,不再需要其他服务。

MQ可以做什么

  1. 流量消峰

    通过类似排队的方式,将任务以消息的形式存储到队列中慢慢处理,从而保护服务器不会因为秒杀等业务导致服务器宕机,而且可以限制队列中的任务数,对任务数量进行限制。

  2. 应用解耦

    系统与系统之间不再直接调用,而是通过MQ将消息下发到其他系统从而达到解耦,某个系统的异常不会干扰到其他系统的正常运行。

  3. 异步处理

    当处理一个花费时间比较长的操作时,通过MQ将任务交给其他线程异步处理加快响应速度提高用户体验。

MQ的分类

  1. ActiveMQ

    Apache出品 维护越来越少 高吞吐量场景使用较少

  2. Kafka

    为大数据而生的消息队列 百万级TPS (数据量非常大的业务可以选用,日志采集首选kafka)

  3. RocketMQ

    阿里巴巴出品 java语言开发 消息零丢失 支持十亿消息堆积 缺点就是支持的语言少,社区活跃度一般,系统迁移修改代码较多(为金融互联网而生,经历过双十一的考验)

  4. RabbitMQ

    当前中小公司最主流的消息消息中间件 支持语言多 社区活跃度高 中小公司首选

四大核心概念

  1. 生产者 发送消息到交换机
  2. 交换机 转发消息到队列
  3. 队列 存储消息
  4. 消费者 消费消息

六个模式

  1. 简单模式 : 一个生产者 一个队列 一个消费者

  2. 工作模式 : 一个生产者 一个队列 多个消费者

  3. 发布订阅模式 : 一个生产者 一个交换机 多个队列 多个消费者

    • Fanout 广播 发送到所有Binding的队列当中 routingKey没有任何作用
  4. 路由模式

    • Direct 定向 发送到指定的RoutingKey的队列当中
  5. 主题模式

    • Topic 通配符 发送到符合RoutingPattern的队列当中

      '*' 一个单词

      '#' 零到多个单词 这里以作为点分隔单词

    操作

    // 声明交换机                         Fanout     Direct    Topic 
    channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, false, false, false, null);
    channel.queueBind(queueName, exchangeName, "routingKey");    //绑定
    
  6. RPC

死信队列

队列中无法被消费者正确处理的消息

两个正常的交换机和队列,给正常队列绑定死信交换机和RoutingKey,当这个队列中的消息成为死信的时候,将会转发的死信交换机

死信的产生:

1. 队列已满
1. 消息被拒收并未重新入队
1. 消息超时

延迟队列

在RabbitMQ中没有提供延迟队列,而延迟队列的应用场景反而很常见,简单的延迟队列可以通过正常的队列配合死信队列,给正常的消息加上过期时间,监听死信队列即可实现简单的延迟队列

问题:但是这中方式实现的延迟队列在时间上是固定的没办法实现灵活的延迟时间控制,如果两个消息的过期时间第一个消息比第二个消息的过期时间要长,MQ只会检测第一条消息,第一条消息没有过期就不会再看第二条消息的过期时间,导致第二条消息没办法在指定的时间到达队列

解决方案:使用以上方案不能实现在消息粒度上的 TTL,通过安装插件可以让RabbitMQ支持延迟队列

  1. 下载插件 官网插件
  2. 移动到对应目录 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
  3. 启用插件 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  4. 重启RabbitMQ服务 systemctl restart rabbitmq-server 貌似不重启也可以,建议重启

通过安装插件,在创建交换机时会有一个新的 x-delayed-message 选项

延迟队列代码实现

  1. 交换机和队列的创建

    @Configuration
    public class RabbitMQDelayedConfig {
        public static final String EXCHANGE_NAME = "delayed_exchange";
        public static final String QUEUE_NAME = "delayed_queue";
    
        @Bean("delayedExchange")
        public Exchange DelayedExchange() {
            return new ExchangeBuilder(EXCHANGE_NAME, ExchangeTypes.FANOUT).delayed().durable(true).build();
        }
        @Bean("delayedQueue")
        public Queue delayedQueue() {
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
        @Bean
        public Binding delayedBinding(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("").noargs();
        }
    }
    
  2. 发送延迟消息

        @Test
        void testDelayedQueue() {
            rabbitTemplate.convertAndSend(RabbitMQDelayedConfig.EXCHANGE_NAME, "", "delayed Message test",
                    msg -> {
                        msg.getMessageProperties().setDelay(10 * 1000); //延迟时间 ms
                        return msg;
                    }
            );
        }
    
  3. 消费者正常消费

功能模块

​ Broker RabbitMQ
​ Virtual Host 虚拟分组 包含交换机和队列属于Broker
​ Exchange 交换机
​ Queue 队列
​ Connection 链接
​ Channel 信道
​ Producer 生产者
​ Consumer 消费者
​ Binding 交换机和队列的绑定

安装RabbitMQ

Linux:

  1. 所需依赖

    Github下载erlang https://github.com/rabbitmq/erlang-rpm

    Github下载rabbitMQ https://github.com/rabbitmq/rabbitmq-server

    yum install socat -y
    rpm -ivh erlang...
    rpm -ivh rabbitmq...
    
  2. 加入启动项

    chkconfig rabbitmq-server on
    systemctl status rabbitmq-server
    
  3. 设置主机名对应本机IP

    vim /etc/hosts
    
  4. 启用web端管理页面

    rabbitmq-plugins enable rabbitmq_management
    #启用完成后,重启rabbitmq服务   RabbitMQ 的默认访问端口是 15672
    
  5. 设置用户角色 角色固定有四种级别:

    administrator:可以登录控制台、查看所有信息、并对rabbitmq进行管理
    monToring:监控者;登录控制台,查看所有信息
    policymaker:策略制定者;登录控制台指定策略
    managment:普通管理员;登录控制
    rabbitmqctl set_user_tags 用户名 角色

    安装完成后默认有一个guest用户,但是由于权限问题不允许外部用户访问所以需要添加一个新的用户用来远程管理

    rabbitmq-plugins enable rabbitmq_management
    # 创建账号和密码
    rabbitmqctl add_user 用户名 密码
    
    # 为用户添加资源权限,添加配置、写、读权限
    # set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
    rabbitmqctl set_permissions -p "/" y ".*" ".*" ".*"
    
    # 修改密码
    rabbitmqctl change_ password 用户名 新密码
    # 删除用户
    rabbitmqctl delete_user 用户名
    # 查看用户清单
    rabbitmqctl list_user
    

使用原生RabbitMQ

基本方式: 生产者 -- 队列 -- 消费者

要通过信道发送消息而不是直接通过连接对象

工具类代码

public class RabbitMQUtil {
    /**
     * 创建连接工厂  设置rabbitMq服务器的信息
     * 获取链接   获取信道
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.6.100");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}

生产者代码

public class Producer {
    public static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        // 声明一个消息队列      队列名     是否持久化  是否共享队列  是否自动删除  其他参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        String message = "Hello world!";
        //发送一个消息         指定交换机  路由的KEY值   其他参数    消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    }
}

消费者代码

public class Consumer {
    public static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
            System.out.println("消费成功的回调执行了。");
        };
        CancelCallback cancelCallback = (value) -> {
            System.out.println("消费中断的回调执行了。");
        };
        // 消费哪个队列    是否自动应答   消费成功回调  消费者取消消费的回调
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

消息应答

消息应答就是:消费者在接收到消息并处理完成消息后,告诉MQ我已经出离完成,随后MQ将消息删除,如果消息没有被成功消费在一段时间后将会被重新放入队列中,如果多次无法消费成功,这条消息将会被丢到死信队列。在开发中要尽量避免死信的出现和正确处理死信队列中的消息。

在上面的消费者的代码中设置了消息自动应答,拿到消息后会自动应答,如果将自动应答关闭则需要在回调方法中手动应答,通过传入的consumerTag进行应答消息

应答代码

//确认应答      标签         是否批量应答   
basicAck(long consumerTag, boolean multiple);
//否定应答     标签
basicNack(long deliveryTag, boolean multiple, boolean requeue);
//否定应答       标签     是否重新入队  (false 直接丢弃消息)
basicReject(long consumerTag, boolean requeue);

//是否恢复消息入队
basicRecover(boolean requeue);

Multiple 的解释:

手动应答的好处是可以批量应答并且减少网络拥堵

  • true 代表批量应答 channel 上未应答的消息

    比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答

  • false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

批量应答要防止错误应答的发生,批量应答目的是为了防止过于频繁的请求RabbitMQ服务器,对RabbitMQ服务器造成压力。

消息分发

  • 轮询分发

    不论处理消息的能力如何,分配的消息的数量都是相同的

  • 不公平分发

    可以根据处理消息的能力进行分发不同数量的消息

    channel.basicQos(int prefetchCount) //开启不公平分发  prefetchCount是预读取数量,当读取了n条时不再接收消息 自然谁执行的快谁就拿到的消息多   如果预取值是0那就是无限的预取值将不断收到消息不管处理消息的能力如何
    

消息发布确认

消息确认模式:

防止消息发布时出现发布失败但是并不知道的情况发生

  • 单个确认

    同步等待消息确认,安全性最高但是性能最低 速度较慢

  • 批量确认

    性能优于单个确认 但是连接的次数最少,对Rabbit服务器的压力最小

  • 异步批量确认

    性能最好,但是需要写异步处理的监听器,稍微有些麻烦但是是最优解

代码示例:

channel.confirmSelect(); //开启消息发布确认模式

//单个确认或者批量确认时使用 
channel.waitForConfirms();  //将信道中发送的消息进行确认

//使用异步确认模式时使用
//成功时回调
ConfirmCallback ackCallback = (deliveryTag,multiple) ->{
    System.out.println("确认的消息:"+deliveryTag);
};
//失败时回调
ConfirmCallback nackCallback= (deliveryTag,multiple) ->{
    System.out.println("未确认的消息:"+deliveryTag);
};
// 在信道上进行监听消息并进回调
channel.addConfirmListener(ackCallback,nackCallback); 

SpringBoot整合

  1. 依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. yml

    spring:
      rabbitmq:
        host: 192.168.6.100
        port: 5672
        username: admin
        password: admin 
        publisher-confirm-type: correlated  #开启发布确认模式
        publisher-returns: true   # 开启returnCallback
    
  3. 配置RabbitMQ的交换机,队列和绑定关系

    @Configuration
    public class RabbitMQConfig {
        public static final String EXCHANGE_NAME = "boot_topic_exchange";
        public static final String QUEUE_NAME = "boot_topic_queue";
        //    声明交换机
        @Bean("bootExchange")
        public Exchange bootExchange() {
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
        }
        //    声明队列
        @Bean("bootQueue")
        public Queue bootQueue() {
            return QueueBuilder.durable(QUEUE_NAME).build();
        }
        //    声明绑定关系   队列 - 交换机 - routingKey
        @Bean
        public Binding bindQueueExchange
            (@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
            return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
        }
    }
    
  4. 生产者

    // 交换机转发失败return回调
    rabbitTemplate.setMandatory(true); //设置交换机将失败的消息发送到returnCallback
    rabbitTemplate.setReturnsCallback(returnedMessage -> {
        byte[] body = returnedMessage.getMessage().getBody();
        String message = new String(body);
        System.out.println("失败返回的消息:" + message);
    });
    
    //消息发送确认回调
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        /**
        * @param correlationData   相关配置信息
        * @param ack               是否成功收到消息
        * @param cause             失败的原因
        */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                System.out.println("消息接收成功");
            } else {
                System.out.println("消息未能成功接收 原因:" + cause);
            }}});
    // 发送消息
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.log","hello");
    
  5. 消费者

    @Component
    public class RabbitMQListener {
        @RabbitListener(queues = "boot_direct_queue1", ackMode = "MANUAL")  // 设置独立的确认模式 不设置则遵循全局配置
        public void listenBootMessage(Message message, Channel channel) throws IOException {
            try {
                System.out.println(new String(message.getBody()));
                // 处理成功签收消息
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认签收
            } catch (Exception e) {
                //  出现异常 重回队列  拒签 并且不重回队列,如果绑定了死信队列这条消息将会到达死信队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
    

其他概念

消息幂等性

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。

消息重复消费/生产

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

使用唯一的消息ID 防止重复消费和重复生产 可以配合Redis的setnx实现

队列优先级

应用场景: 购物系统在订单量大的情况下对创造价值较高的店铺订单功能提高处理优先级
配置队列 x-max-priority 参数 最大255官网推荐 1-10 不然会占用过高的系统资源
消息发送时也要配置优先级
配置完优先级,队列中的数据将会被重新排列,优先级高的将会被优先消费

惰性队列

如果消费者下线维护或宕机,大量消息堆积在内存中,内存占用只增不减,这时候就需要惰性队列进行存储,将消息存储到硬盘中以免占用过多的内存