消息队列在高并发场景下的选型实践与性能调优经验

IT人会静 交互 阅读 801
赞 15 收藏
二维码
手机扫码查看
反馈

项目初期的技术选型

去年下半年接了个后台运营系统重构,核心需求是「实时同步用户行为日志到 BI 看板」。一开始我真没想用消息队列——毕竟只是个内部系统,日志量预估也就每秒 30~50 条,直接写 MySQL 再配个定时任务拉取不就完事了?

消息队列在高并发场景下的选型实践与性能调优经验

结果上线第三天,运营同事反馈看板数据延迟 12 分钟,刷新页面还得手动点「强制同步」。查了一圈发现:前端埋点发来的 POST 请求在高峰期被 Nginx 限流 + PHP-FPM 队列塞满,有些请求甚至超时丢了。我们临时加了重试和兜底轮询,但越补越乱,最后干脆推翻重来。

选 RabbitMQ 是因为团队里没人玩过 Kafka,而我之前在另一个项目里用过它跑订单通知,熟门熟路。部署用 Docker Compose,三行命令搞定,连管理界面都自带(rabbitmq-management 插件),这点很香。

最大的坑:性能问题

接入后第一版代码简单粗暴:前端调 fetch('/api/log', { method: 'POST', body: JSON.stringify(data) }) → 后端 PHP 接收 → 拼一条 AMQP 消息 → $channel->basic_publish() → 完事。

结果压测一跑,TPS 直接掉到 8,CPU 占用飙到 95%。排查半天发现不是 RabbitMQ 的问题,是 PHP 进程每次 publish 都新建连接、声明 exchange、声明 queue……光握手就占了 60ms+。更骚的是,我们用了 Laravel 的 php-amqplib 封装,它默认把连接、通道全做成单例,但没考虑长连接复用场景,每次请求都 new 一个 Connection 实例,根本没走连接池。

折腾了半天,最后砍掉所有封装,手写连接管理:

<?php
// vendor/rabbitmq-conn.php —— 全局只初始化一次
class RabbitMQConnection {
    private static $connection = null;
    private static $channel = null;

    public static function getChannel() {
        if (self::$channel === null) {
            $connection = new AMQPConnection([
                'host' => 'rabbitmq',
                'port' => 5672,
                'login' => 'guest',
                'password' => 'guest',
                'vhost' => '/'
            ]);
            self::$connection = $connection;
            self::$channel = $connection->channel();
            // 声明 exchange 和 queue 只做一次
            self::$channel->exchange_declare('logs_exchange', 'topic', false, true, false);
            self::$channel->queue_declare('bi_logs_queue', false, true, false, false);
            self::$channel->queue_bind('bi_logs_queue', 'logs_exchange', 'log.*');
        }
        return self::$channel;
    }
}

然后在接口里复用 channel:

<?php
// api/log.php
require_once 'vendor/rabbitmq-conn.php';

$data = json_decode(file_get_contents('php://input'), true);
$msg = new AMQPMessage(
    json_encode($data),
    [
        'content_type' => 'application/json',
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]
);

$channel = RabbitMQConnection::getChannel();
$channel->basic_publish($msg, 'logs_exchange', 'log.bi');
echo json_encode(['ok' => true]);

这一改,TPS 回到 180+,延迟稳定在 200ms 内。这里注意我踩过好几次坑:第一次忘了设 delivery_mode,RabbitMQ 重启后消息全丢;第二次 exchange 类型写成 direct,结果 topic routing key 匹配失败,日志全进黑洞;第三次没开 durable,Docker 重建容器后 queue 消失,花了半小时才想起来要手动声明。

消费端的“假死”问题

消费端用 PHP CLI 脚本常驻运行,逻辑是:while(true) { $channel->basic_consume(...) }。本地跑得好好的,上生产第二天就卡住不动了——没有报错,进程还在,就是不处理新消息。

查日志发现 consumer 没有 ack,但也没 reject。最后发现是 PHP 脚本里某次解析 JSON 失败触发了 json_last_error(),没 catch,脚本直接 exit,而 RabbitMQ 还以为它在处理,一直 hold 着 unack 消息。解决办法很简单:加 try/catch + 强制 nack + 日志打点:

<?php
$callback = function ($msg) {
    try {
        $data = json_decode($msg->body, true);
        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new Exception('Invalid JSON');
        }
        // 写入 BI 数据库逻辑...
        $msg->ack(); // 成功才 ack
    } catch (Exception $e) {
        error_log('[RabbitMQ] Consume failed: ' . $e->getMessage() . ' | Body: ' . $msg->body);
        $msg->nack(['requeue' => false]); // 不重入队,丢进 dead-letter
    }
};

顺手加了个 dead-letter exchange,把失败消息转存到 dlq.bi_logs,人工查完再手动 re-publish。目前每月平均 3~5 条失败,基本都是前端传了非法字段(比如 timestamp 传了字符串而不是数字),不影响主流程。

最终的解决方案

现在整套链路是这样的:

  • 前端发日志 → PHP 接口(复用 AMQP channel)→ RabbitMQ(持久化 + topic exchange)
  • RabbitMQ → PHP CLI consumer(自动重连 + nack + DLQ)→ MySQL + Redis 缓存看板聚合数据
  • BI 看板直接读缓存,5 秒自动刷新

效果上,延迟控制在 300ms 内(P95),服务可用性 99.99%,没再出现过数据丢失或积压。唯一遗留问题是 consumer 进程偶尔被 OOM kill(PHP 内存没及时释放),我们加了 systemd 的 restart=always + 内存限制,暂时够用。理论上应该换 Go 或 Node.js 重写 consumer,但当前人力排期不允许,先这样扛着。

回顾与反思

回看整个过程,最值的其实是两点:一是别迷信封装库,特别是 PHP 这种无连接池语言,该手写就手写;二是 rabbitmqctl 命令真的得常备,rabbitmqctl list_queueslist_consumerslist_exchanges 查一次比翻文档快十倍。

另外提醒一句:别在 consumer 里做耗时操作(比如远程 HTTP 请求)。我们最初在消费端调了 fetch('https://jztheme.com/api/v1/validate') 做用户身份二次校验,结果 API 响应慢导致整个 consumer 阻塞,消息越积越多。后来改成异步回调 + 本地缓存校验结果,才稳下来。

这个方案肯定不是最优解——Kafka + Flink 流式处理会更优雅,但对一个小团队来说,能快速上线、稳定跑半年、运维成本低,就是好方案。

以上是我踩坑后的总结,希望对你有帮助。有更优的实现方式欢迎评论区交流,尤其是 PHP 消费端内存管理这块,我也在找更好的解法。

本文章不代表JZTHEME立场,仅为作者个人观点 / 研究心得 / 经验分享,旨在交流探讨,供读者参考。
发表评论

暂无评论