Skip to content

消息队列

欢迎来到消息队列知识库!

📨 消息队列简介

消息队列(Message Queue)是一种应用间通信方式,通过发送和接收消息来实现应用解耦、异步处理和削峰填谷。

🎯 为什么需要消息队列

1. 异步处理

不使用消息队列:

用户注册 -> 写入数据库 (50ms)
        -> 发送邮件 (100ms)
        -> 发送短信 (100ms)
总耗时: 250ms

使用消息队列:

用户注册 -> 写入数据库 (50ms)
        -> 发送消息到队列 (5ms)
总耗时: 55ms

异步处理:
消息队列 -> 发送邮件 (100ms)
        -> 发送短信 (100ms)

2. 系统解耦

不使用消息队列:

订单服务 -> 直接调用库存服务
        -> 直接调用支付服务
        -> 直接调用物流服务

使用消息队列:

订单服务 -> 发送消息到队列
消息队列 -> 库存服务(订阅)
        -> 支付服务(订阅)
        -> 物流服务(订阅)

3. 削峰填谷

秒杀场景:
瞬时请求 10000 QPS -> 消息队列 -> 匀速处理 1000 QPS

4. 日志处理

应用日志 -> 消息队列 -> 日志处理系统

📊 常用消息队列对比

特性RabbitMQKafkaRocketMQ
开发语言ErlangScala/JavaJava
吞吐量万级十万级十万级
时效性微秒级毫秒级毫秒级
可用性高(主从)非常高(分布式)非常高(分布式)
功能特性丰富简单丰富
社区活跃度非常高
适用场景企业级应用大数据、日志电商、金融

🐰 RabbitMQ

核心概念

生产者 -> Exchange (交换机) -> Queue (队列) -> 消费者
                 |
              Binding (绑定)

交换机类型

1. Direct Exchange (直连)

Producer -> Exchange (direct)
              |
              ├─ routing_key: "error" -> Queue1 (error logs)
              └─ routing_key: "info"  -> Queue2 (info logs)

2. Fanout Exchange (扇出)

Producer -> Exchange (fanout)
              |
              ├─> Queue1 (广播)
              ├─> Queue2 (广播)
              └─> Queue3 (广播)

3. Topic Exchange (主题)

Producer -> Exchange (topic)
              |
              ├─ pattern: "*.error" -> Queue1
              ├─ pattern: "user.*"  -> Queue2
              └─ pattern: "order.#" -> Queue3

4. Headers Exchange

根据消息头属性路由。

快速开始 (Node.js)

javascript
const amqp = require('amqplib');

// 生产者
async function sendMessage() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    
    const queue = 'hello';
    await channel.assertQueue(queue, { durable: true });
    
    channel.sendToQueue(queue, Buffer.from('Hello RabbitMQ!'), {
        persistent: true
    });
    
    console.log('Message sent');
    
    await channel.close();
    await connection.close();
}

// 消费者
async function receiveMessage() {
    const connection = await amqp.connect('amqp://localhost');
    const channel = await connection.createChannel();
    
    const queue = 'hello';
    await channel.assertQueue(queue, { durable: true });
    
    channel.consume(queue, (msg) => {
        console.log('Received:', msg.content.toString());
        channel.ack(msg);  // 手动确认
    });
}

消息确认机制

1. 生产者确认

javascript
channel.confirmSelect();
channel.sendToQueue(queue, message, {}, (err, ok) => {
    if (err) {
        console.log('消息发送失败');
    } else {
        console.log('消息发送成功');
    }
});

2. 消费者确认

javascript
// 自动确认(不推荐)
channel.consume(queue, (msg) => {
    console.log(msg.content.toString());
}, { noAck: true });

// 手动确认(推荐)
channel.consume(queue, (msg) => {
    try {
        // 处理消息
        channel.ack(msg);  // 确认
    } catch (err) {
        channel.nack(msg, false, true);  // 拒绝并重新入队
    }
});

死信队列 (DLX)

javascript
// 创建死信交换机
await channel.assertExchange('dlx_exchange', 'direct');
await channel.assertQueue('dlx_queue');
await channel.bindQueue('dlx_queue', 'dlx_exchange', '');

// 创建普通队列,设置死信交换机
await channel.assertQueue('normal_queue', {
    deadLetterExchange: 'dlx_exchange',
    messageTtl: 10000  // 10秒过期
});

🔥 Kafka

核心概念

生产者 -> Topic -> Partition 0 (Leader + Replicas)
                -> Partition 1 (Leader + Replicas)
                -> Partition 2 (Leader + Replicas)
消费者组 -> Consumer 0
        -> Consumer 1
        -> Consumer 2

关键术语

  • Topic - 消息主题
  • Partition - 分区,实现并行
  • Producer - 生产者
  • Consumer - 消费者
  • Consumer Group - 消费者组
  • Offset - 偏移量,消息位置

快速开始 (Node.js)

javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092']
});

// 生产者
async function produce() {
    const producer = kafka.producer();
    await producer.connect();
    
    await producer.send({
        topic: 'test-topic',
        messages: [
            { key: 'key1', value: 'Hello Kafka!' }
        ]
    });
    
    await producer.disconnect();
}

// 消费者
async function consume() {
    const consumer = kafka.consumer({ groupId: 'test-group' });
    await consumer.connect();
    await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
    
    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                key: message.key.toString(),
                value: message.value.toString(),
                offset: message.offset
            });
        }
    });
}

分区策略

1. 轮询(默认)

javascript
await producer.send({
    topic: 'test-topic',
    messages: [
        { value: 'message1' },  // -> partition 0
        { value: 'message2' },  // -> partition 1
        { value: 'message3' }   // -> partition 2
    ]
});

2. 按 Key 分区

javascript
await producer.send({
    topic: 'test-topic',
    messages: [
        { key: 'user1', value: 'message1' },  // 同一 key 进同一分区
        { key: 'user1', value: 'message2' },
        { key: 'user2', value: 'message3' }
    ]
});

消费者组

Topic (3 partitions)

  ├─ Partition 0 -> Consumer 0 (Group A)
  ├─ Partition 1 -> Consumer 1 (Group A)
  └─ Partition 2 -> Consumer 2 (Group A)
  
不同组可以独立消费:
  ├─ Partition 0,1,2 -> Consumer (Group B)

🚀 RocketMQ

核心概念

Producer -> NameServer (路由信息)
         -> Broker (Cluster)
            └─ Topic -> Queue 0
                     -> Queue 1
                     -> Queue 2
Consumer Group

消息类型

1. 普通消息

java
// 生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);

producer.shutdown();

2. 顺序消息

java
// 保证同一订单的消息顺序
producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Long orderId = (Long) arg;
        long index = orderId % mqs.size();
        return mqs.get((int) index);
    }
}, orderId);

3. 延迟消息

java
Message msg = new Message("TopicTest", "TagA", "Delayed message".getBytes());
// 设置延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);  // 10秒后消费
producer.send(msg);

4. 事务消息

java
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        return LocalTransactionState.COMMIT_MESSAGE;
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 事务状态回查
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});

producer.start();
producer.sendMessageInTransaction(msg, null);

🎯 应用场景

1. 异步解耦

用户注册
  ├─> 写入数据库
  └─> 发送消息 -> 消息队列
                   ├─> 发送欢迎邮件
                   ├─> 发送短信验证码
                   └─> 添加积分

2. 削峰填谷

秒杀场景:
用户请求 -> 消息队列 (缓冲) -> 订单处理系统 (匀速消费)

3. 日志收集

应用服务器 -> 消息队列 -> 日志处理 -> Elasticsearch

4. 数据同步

订单系统 -> 消息队列 -> 库存系统
                   -> 物流系统
                   -> 数据仓库

💡 最佳实践

1. 消息幂等性

javascript
// 使用唯一 ID 防止重复消费
const processedIds = new Set();

consumer.run({
    eachMessage: async ({ message }) => {
        const msgId = message.key.toString();
        
        if (processedIds.has(msgId)) {
            console.log('消息已处理,跳过');
            return;
        }
        
        // 处理消息
        processOrder(message);
        
        processedIds.add(msgId);
    }
});

2. 消息顺序性

  • RabbitMQ: 单队列单消费者
  • Kafka: 同一分区保证顺序
  • RocketMQ: 使用顺序消息

3. 消息可靠性

生产者:

  • 开启确认机制
  • 消息持久化
  • 重试机制

消费者:

  • 手动确认
  • 异常处理
  • 重试队列

4. 性能优化

  • 批量发送 - 减少网络开销
  • 批量消费 - 提高吞吐量
  • 异步处理 - 提高并发
  • 合理分区 - Kafka 分区数 = 消费者数

📖 学习资源

官方文档

推荐书籍

  • 《RabbitMQ 实战》
  • 《Kafka 权威指南》
  • 《RocketMQ 技术内幕》

准备好了吗?开始你的消息队列学习之旅!