V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
KevinRed
V2EX  ›  程序员

PHP 使用 Redis 实现消息队列

  •  
  •   KevinRed · 2020-03-26 09:24:07 +08:00 · 2799 次点击
    这是一个创建于 1689 天前的主题,其中的信息可能已经有所发展或是发生改变。

    原文排版更整洁哦=>PHP 使用 Redis 实现消息队列

    原文排版更整洁哦=>PHP 使用 Redis 实现消息队列

    原文排版更整洁哦=>PHP 使用 Redis 实现消息队列

    在服务器性能不佳的情况下,高并发访问使用消息队列存储请求,并按一定速率处理是比较好的解决方案

    关于 php 多线程和 redis 使用前面文章有介绍,传送门:

    PHP 实现多线程

    Redis 安装使用 & PHP Redis 插件安装

    PHP 使用 Redis 实现分布式锁

    设计方案如下:

    1.设置标识 flag ( nil 未执行,1 执行中),表示消费线程是否正在运行( flag 的访问需要加锁);并给 flag 设置一个失效时间,避免消费线程意外中断,flag 一直为 1

    2.接口接收到请求时,将请求参数存到 redis 的 list 中。请求存到 redis 中后判断 flag,决定是否启动消费线程

    3.消费线程从 redis 中取出参数处理,若 list 不为空,重置 flag 失效时间,并置为 1 ;若 list 为空,删除 flag (这里可以加一个循环判断,阻塞一段时间)

    本文以一个批量发送邮件的例子说明

    有一个发送邮件的接口,在并发访问时,邮件服务器扛不住请求,导致邮件发送失败

    接下来上代码

    /**
     * 发送邮件接口
     *
     */
        public function addMail(){
    
            $param = I();
    
            $redis = new \Redis();
            $redis->connect('127.0.0.1', 6379);
           
    
            //尝试获取锁
            $lockKey = 'lock';
            $lockExpire = 60;
    
            $status = true;
            while ($status) {
            
                $lockValue = time() + $lockExpire;
                $lock = $redis->setnx($lockKey, $lockValue);
    
                if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
                    $redis->expire($lockKey, $lockExpire);
    
                    //已获取到锁
                    
                    $flag = $redis->get('flag');
                    
                    //请求存入
                    $redis->lPush("mail", $param);
    
                    
                    //判断是否要启动消费线程
                    if (empty($flag)) {
    
                        //设置 flag
                        $redis->set('flag', 1);
                        
                        //设置 flag 失效时间防止线程死掉
                        $redis->expire($key, 600);
                        
                        //请求消费线程
                        $this->sendHttpRequest("http://127.0.0.1/Home/Index/redis", "POST");
                    }
    
                    if ($redis->ttl($lockKey))
                        $redis->del($lockKey);
                    $status = FALSE;
                } else {
                    sleep(2);
                }
            }
        }
    
    
    
        /**
         * 消费线程
         *
         */
        public function redis(){
    
            set_time_limit(0);
            ignore_user_abort(true);//设置与客户机断开是否会终止执行
    
           
            $redis = new \Redis();
            $redis->connect('127.0.0.1', 6379);
            
            $time = 1;//记录阻塞时间
            
            //开始消费
            while (1) {
            
                //取出请求
                $param = $redis->rPop("mail");
                
                if (empty($param)) {
                
                    //阻塞一段时间
                    if ($time <= 10) {
                        $time++;
                        sleep(10);
                    } else {
    
                        //结束消费,尝试获取锁
                        $lockKey = 'lock';
                        $lockExpire = 60;
    
                        $status = true;
                        while ($status) {
                            $lockValue = time() + $lockExpire;
                            $lock = $redis->setnx($lockKey, $lockValue);
    
                            if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
                                $redis->expire($lockKey, $lockExpire);
     
                                
                                //删除 flag
                                $redis->del('flag');
    
                                if ($redis->ttl($lockKey))
                                    $redis->del($lockKey);
                                $status = FALSE;
                            } else {
                                sleep(2);
                            }
                        }
    
    
                        break;
                    }
                } else {
    
    
                    //发送邮件
                    $result = $this->sendEmail();
                    
                    $time = 1;//重置阻塞时间
                    
                    //设置 flag 状态,尝试获取锁
                    $lockKey = 'lock';
                    $lockExpire = 60;
    
                    $status = true;
                    while ($status) {
                        $lockValue = time() + $lockExpire;
                        $lock = $redis->setnx($lockKey, $lockValue);
    
                        if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
                            $redis->expire($lockKey, $lockExpire);
    
                            //设置运行状态
                            $redis->set($key, 1);
                            
                            //设置失效时间防止线程死掉
                            $redis->expire($key, 600);
    
                            if ($redis->ttl($lockKey))
                                $redis->del($lockKey);
                            $status = FALSE;
                        } else {
                            sleep(2);
                        }
                    }
                }
            }
    
        }
    
    
         /**
         * 发送邮件
         *
         */    
        function sendEmail(){
            set_time_limit(540);//设置超时时间
    
            if (Send()) {
                return 1;
            } else {
                return 0;
            }
        }
    
    
    
    4 条回复    2020-03-26 13:27:26 +08:00
    win7pro
        1
    win7pro  
       2020-03-26 10:22:42 +08:00
    swoole
    KevinRed
        2
    KevinRed  
    OP
       2020-03-26 10:33:53 +08:00
    @win7pro 这确实是好东西,移植过去太费劲就手动撸了一个
    z5864703
        3
    z5864703  
       2020-03-26 10:54:42 +08:00
    有几个改进的地方
    1.redis 操作可以使用 lua 脚本来做几个命令组合运行,同时还可以保证原子性,比如 ttl+del 那块操作
    2.消费者可以用 cli 单独进程来跑,循环读 list 就好了,而不用通过一次 post 请求来触发。
    3.防止进程死掉可以用信号来实现
    KevinRed
        4
    KevinRed  
    OP
       2020-03-26 13:27:26 +08:00 via Android
    @z5864703 谢谢大佬指教
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2842 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 02:10 · PVG 10:10 · LAX 18:10 · JFK 21:10
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.