消息队列
欢迎来到消息队列知识库!
📨 消息队列简介
消息队列(Message Queue)是一种应用间通信方式,通过发送和接收消息来实现应用解耦、异步处理和削峰填谷。
🎯 为什么需要消息队列
1. 异步处理
不使用消息队列:
用户注册 -> 写入数据库 (50ms)
-> 发送邮件 (100ms)
-> 发送短信 (100ms)
总耗时: 250ms使用消息队列:
用户注册 -> 写入数据库 (50ms)
-> 发送消息到队列 (5ms)
总耗时: 55ms
异步处理:
消息队列 -> 发送邮件 (100ms)
-> 发送短信 (100ms)2. 系统解耦
不使用消息队列:
订单服务 -> 直接调用库存服务
-> 直接调用支付服务
-> 直接调用物流服务使用消息队列:
订单服务 -> 发送消息到队列
消息队列 -> 库存服务(订阅)
-> 支付服务(订阅)
-> 物流服务(订阅)3. 削峰填谷
秒杀场景:
瞬时请求 10000 QPS -> 消息队列 -> 匀速处理 1000 QPS4. 日志处理
应用日志 -> 消息队列 -> 日志处理系统📊 常用消息队列对比
| 特性 | RabbitMQ | Kafka | RocketMQ |
|---|---|---|---|
| 开发语言 | Erlang | Scala/Java | Java |
| 吞吐量 | 万级 | 十万级 | 十万级 |
| 时效性 | 微秒级 | 毫秒级 | 毫秒级 |
| 可用性 | 高(主从) | 非常高(分布式) | 非常高(分布式) |
| 功能特性 | 丰富 | 简单 | 丰富 |
| 社区活跃度 | 高 | 非常高 | 高 |
| 适用场景 | 企业级应用 | 大数据、日志 | 电商、金融 |
🐰 RabbitMQ
核心概念
生产者 -> Exchange (交换机) -> Queue (队列) -> 消费者
|
Binding (绑定)交换机类型
1. Direct Exchange (直连)
Producer -> Exchange (direct)
|
├─ routing_key: "error" -> Queue1 (error logs)
└─ routing_key: "info" -> Queue2 (info logs)2. Fanout Exchange (扇出)
Producer -> Exchange (fanout)
|
├─> Queue1 (广播)
├─> Queue2 (广播)
└─> Queue3 (广播)3. Topic Exchange (主题)
Producer -> Exchange (topic)
|
├─ pattern: "*.error" -> Queue1
├─ pattern: "user.*" -> Queue2
└─ pattern: "order.#" -> Queue34. Headers Exchange
根据消息头属性路由。
快速开始 (Node.js)
javascript
const amqp = require('amqplib');
// 生产者
async function sendMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'hello';
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from('Hello RabbitMQ!'), {
persistent: true
});
console.log('Message sent');
await channel.close();
await connection.close();
}
// 消费者
async function receiveMessage() {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const queue = 'hello';
await channel.assertQueue(queue, { durable: true });
channel.consume(queue, (msg) => {
console.log('Received:', msg.content.toString());
channel.ack(msg); // 手动确认
});
}消息确认机制
1. 生产者确认
javascript
channel.confirmSelect();
channel.sendToQueue(queue, message, {}, (err, ok) => {
if (err) {
console.log('消息发送失败');
} else {
console.log('消息发送成功');
}
});2. 消费者确认
javascript
// 自动确认(不推荐)
channel.consume(queue, (msg) => {
console.log(msg.content.toString());
}, { noAck: true });
// 手动确认(推荐)
channel.consume(queue, (msg) => {
try {
// 处理消息
channel.ack(msg); // 确认
} catch (err) {
channel.nack(msg, false, true); // 拒绝并重新入队
}
});死信队列 (DLX)
javascript
// 创建死信交换机
await channel.assertExchange('dlx_exchange', 'direct');
await channel.assertQueue('dlx_queue');
await channel.bindQueue('dlx_queue', 'dlx_exchange', '');
// 创建普通队列,设置死信交换机
await channel.assertQueue('normal_queue', {
deadLetterExchange: 'dlx_exchange',
messageTtl: 10000 // 10秒过期
});🔥 Kafka
核心概念
生产者 -> Topic -> Partition 0 (Leader + Replicas)
-> Partition 1 (Leader + Replicas)
-> Partition 2 (Leader + Replicas)
消费者组 -> Consumer 0
-> Consumer 1
-> Consumer 2关键术语
- Topic - 消息主题
- Partition - 分区,实现并行
- Producer - 生产者
- Consumer - 消费者
- Consumer Group - 消费者组
- Offset - 偏移量,消息位置
快速开始 (Node.js)
javascript
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
// 生产者
async function produce() {
const producer = kafka.producer();
await producer.connect();
await producer.send({
topic: 'test-topic',
messages: [
{ key: 'key1', value: 'Hello Kafka!' }
]
});
await producer.disconnect();
}
// 消费者
async function consume() {
const consumer = kafka.consumer({ groupId: 'test-group' });
await consumer.connect();
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
key: message.key.toString(),
value: message.value.toString(),
offset: message.offset
});
}
});
}分区策略
1. 轮询(默认)
javascript
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'message1' }, // -> partition 0
{ value: 'message2' }, // -> partition 1
{ value: 'message3' } // -> partition 2
]
});2. 按 Key 分区
javascript
await producer.send({
topic: 'test-topic',
messages: [
{ key: 'user1', value: 'message1' }, // 同一 key 进同一分区
{ key: 'user1', value: 'message2' },
{ key: 'user2', value: 'message3' }
]
});消费者组
Topic (3 partitions)
│
├─ Partition 0 -> Consumer 0 (Group A)
├─ Partition 1 -> Consumer 1 (Group A)
└─ Partition 2 -> Consumer 2 (Group A)
不同组可以独立消费:
├─ Partition 0,1,2 -> Consumer (Group B)🚀 RocketMQ
核心概念
Producer -> NameServer (路由信息)
-> Broker (Cluster)
└─ Topic -> Queue 0
-> Queue 1
-> Queue 2
Consumer Group消息类型
1. 普通消息
java
// 生产者
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);
producer.shutdown();2. 顺序消息
java
// 保证同一订单的消息顺序
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
long index = orderId % mqs.size();
return mqs.get((int) index);
}
}, orderId);3. 延迟消息
java
Message msg = new Message("TopicTest", "TagA", "Delayed message".getBytes());
// 设置延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 10秒后消费
producer.send(msg);4. 事务消息
java
TransactionMQProducer producer = new TransactionMQProducer("transaction_group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 事务状态回查
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
producer.sendMessageInTransaction(msg, null);🎯 应用场景
1. 异步解耦
用户注册
├─> 写入数据库
└─> 发送消息 -> 消息队列
├─> 发送欢迎邮件
├─> 发送短信验证码
└─> 添加积分2. 削峰填谷
秒杀场景:
用户请求 -> 消息队列 (缓冲) -> 订单处理系统 (匀速消费)3. 日志收集
应用服务器 -> 消息队列 -> 日志处理 -> Elasticsearch4. 数据同步
订单系统 -> 消息队列 -> 库存系统
-> 物流系统
-> 数据仓库💡 最佳实践
1. 消息幂等性
javascript
// 使用唯一 ID 防止重复消费
const processedIds = new Set();
consumer.run({
eachMessage: async ({ message }) => {
const msgId = message.key.toString();
if (processedIds.has(msgId)) {
console.log('消息已处理,跳过');
return;
}
// 处理消息
processOrder(message);
processedIds.add(msgId);
}
});2. 消息顺序性
- RabbitMQ: 单队列单消费者
- Kafka: 同一分区保证顺序
- RocketMQ: 使用顺序消息
3. 消息可靠性
生产者:
- 开启确认机制
- 消息持久化
- 重试机制
消费者:
- 手动确认
- 异常处理
- 重试队列
4. 性能优化
- 批量发送 - 减少网络开销
- 批量消费 - 提高吞吐量
- 异步处理 - 提高并发
- 合理分区 - Kafka 分区数 = 消费者数
📖 学习资源
官方文档
推荐书籍
- 《RabbitMQ 实战》
- 《Kafka 权威指南》
- 《RocketMQ 技术内幕》
准备好了吗?开始你的消息队列学习之旅!