消息队列在高并发场景下的选型实践与性能调优经验
项目初期的技术选型
去年下半年接了个后台运营系统重构,核心需求是「实时同步用户行为日志到 BI 看板」。一开始我真没想用消息队列——毕竟只是个内部系统,日志量预估也就每秒 30~50 条,直接写 MySQL 再配个定时任务拉取不就完事了?
结果上线第三天,运营同事反馈看板数据延迟 12 分钟,刷新页面还得手动点「强制同步」。查了一圈发现:前端埋点发来的 POST 请求在高峰期被 Nginx 限流 + PHP-FPM 队列塞满,有些请求甚至超时丢了。我们临时加了重试和兜底轮询,但越补越乱,最后干脆推翻重来。
选 RabbitMQ 是因为团队里没人玩过 Kafka,而我之前在另一个项目里用过它跑订单通知,熟门熟路。部署用 Docker Compose,三行命令搞定,连管理界面都自带(rabbitmq-management 插件),这点很香。
最大的坑:性能问题
接入后第一版代码简单粗暴:前端调 fetch('/api/log', { method: 'POST', body: JSON.stringify(data) }) → 后端 PHP 接收 → 拼一条 AMQP 消息 → $channel->basic_publish() → 完事。
结果压测一跑,TPS 直接掉到 8,CPU 占用飙到 95%。排查半天发现不是 RabbitMQ 的问题,是 PHP 进程每次 publish 都新建连接、声明 exchange、声明 queue……光握手就占了 60ms+。更骚的是,我们用了 Laravel 的 php-amqplib 封装,它默认把连接、通道全做成单例,但没考虑长连接复用场景,每次请求都 new 一个 Connection 实例,根本没走连接池。
折腾了半天,最后砍掉所有封装,手写连接管理:
<?php
// vendor/rabbitmq-conn.php —— 全局只初始化一次
class RabbitMQConnection {
private static $connection = null;
private static $channel = null;
public static function getChannel() {
if (self::$channel === null) {
$connection = new AMQPConnection([
'host' => 'rabbitmq',
'port' => 5672,
'login' => 'guest',
'password' => 'guest',
'vhost' => '/'
]);
self::$connection = $connection;
self::$channel = $connection->channel();
// 声明 exchange 和 queue 只做一次
self::$channel->exchange_declare('logs_exchange', 'topic', false, true, false);
self::$channel->queue_declare('bi_logs_queue', false, true, false, false);
self::$channel->queue_bind('bi_logs_queue', 'logs_exchange', 'log.*');
}
return self::$channel;
}
}
然后在接口里复用 channel:
<?php
// api/log.php
require_once 'vendor/rabbitmq-conn.php';
$data = json_decode(file_get_contents('php://input'), true);
$msg = new AMQPMessage(
json_encode($data),
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]
);
$channel = RabbitMQConnection::getChannel();
$channel->basic_publish($msg, 'logs_exchange', 'log.bi');
echo json_encode(['ok' => true]);
这一改,TPS 回到 180+,延迟稳定在 200ms 内。这里注意我踩过好几次坑:第一次忘了设 delivery_mode,RabbitMQ 重启后消息全丢;第二次 exchange 类型写成 direct,结果 topic routing key 匹配失败,日志全进黑洞;第三次没开 durable,Docker 重建容器后 queue 消失,花了半小时才想起来要手动声明。
消费端的“假死”问题
消费端用 PHP CLI 脚本常驻运行,逻辑是:while(true) { $channel->basic_consume(...) }。本地跑得好好的,上生产第二天就卡住不动了——没有报错,进程还在,就是不处理新消息。
查日志发现 consumer 没有 ack,但也没 reject。最后发现是 PHP 脚本里某次解析 JSON 失败触发了 json_last_error(),没 catch,脚本直接 exit,而 RabbitMQ 还以为它在处理,一直 hold 着 unack 消息。解决办法很简单:加 try/catch + 强制 nack + 日志打点:
<?php
$callback = function ($msg) {
try {
$data = json_decode($msg->body, true);
if (json_last_error() !== JSON_ERROR_NONE) {
throw new Exception('Invalid JSON');
}
// 写入 BI 数据库逻辑...
$msg->ack(); // 成功才 ack
} catch (Exception $e) {
error_log('[RabbitMQ] Consume failed: ' . $e->getMessage() . ' | Body: ' . $msg->body);
$msg->nack(['requeue' => false]); // 不重入队,丢进 dead-letter
}
};
顺手加了个 dead-letter exchange,把失败消息转存到 dlq.bi_logs,人工查完再手动 re-publish。目前每月平均 3~5 条失败,基本都是前端传了非法字段(比如 timestamp 传了字符串而不是数字),不影响主流程。
最终的解决方案
现在整套链路是这样的:
- 前端发日志 → PHP 接口(复用 AMQP channel)→ RabbitMQ(持久化 + topic exchange)
- RabbitMQ → PHP CLI consumer(自动重连 + nack + DLQ)→ MySQL + Redis 缓存看板聚合数据
- BI 看板直接读缓存,5 秒自动刷新
效果上,延迟控制在 300ms 内(P95),服务可用性 99.99%,没再出现过数据丢失或积压。唯一遗留问题是 consumer 进程偶尔被 OOM kill(PHP 内存没及时释放),我们加了 systemd 的 restart=always + 内存限制,暂时够用。理论上应该换 Go 或 Node.js 重写 consumer,但当前人力排期不允许,先这样扛着。
回顾与反思
回看整个过程,最值的其实是两点:一是别迷信封装库,特别是 PHP 这种无连接池语言,该手写就手写;二是 rabbitmqctl 命令真的得常备,rabbitmqctl list_queues、list_consumers、list_exchanges 查一次比翻文档快十倍。
另外提醒一句:别在 consumer 里做耗时操作(比如远程 HTTP 请求)。我们最初在消费端调了 fetch('https://jztheme.com/api/v1/validate') 做用户身份二次校验,结果 API 响应慢导致整个 consumer 阻塞,消息越积越多。后来改成异步回调 + 本地缓存校验结果,才稳下来。
这个方案肯定不是最优解——Kafka + Flink 流式处理会更优雅,但对一个小团队来说,能快速上线、稳定跑半年、运维成本低,就是好方案。
以上是我踩坑后的总结,希望对你有帮助。有更优的实现方式欢迎评论区交流,尤其是 PHP 消费端内存管理这块,我也在找更好的解法。

暂无评论