原文排版更整洁哦=>PHP 使用 Redis 实现消息队列
原文排版更整洁哦=>PHP 使用 Redis 实现消息队列
原文排版更整洁哦=>PHP 使用 Redis 实现消息队列
在服务器性能不佳的情况下,高并发访问使用消息队列存储请求,并按一定速率处理是比较好的解决方案
关于 php 多线程和 redis 使用前面文章有介绍,传送门:
设计方案如下:
1.设置标识 flag ( nil 未执行,1 执行中),表示消费线程是否正在运行( flag 的访问需要加锁);并给 flag 设置一个失效时间,避免消费线程意外中断,flag 一直为 1
2.接口接收到请求时,将请求参数存到 redis 的 list 中。请求存到 redis 中后判断 flag,决定是否启动消费线程
3.消费线程从 redis 中取出参数处理,若 list 不为空,重置 flag 失效时间,并置为 1 ;若 list 为空,删除 flag (这里可以加一个循环判断,阻塞一段时间)
本文以一个批量发送邮件的例子说明
有一个发送邮件的接口,在并发访问时,邮件服务器扛不住请求,导致邮件发送失败
接下来上代码
/**
* 发送邮件接口
*
*/
public function addMail(){
$param = I();
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
//尝试获取锁
$lockKey = 'lock';
$lockExpire = 60;
$status = true;
while ($status) {
$lockValue = time() + $lockExpire;
$lock = $redis->setnx($lockKey, $lockValue);
if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
$redis->expire($lockKey, $lockExpire);
//已获取到锁
$flag = $redis->get('flag');
//请求存入
$redis->lPush("mail", $param);
//判断是否要启动消费线程
if (empty($flag)) {
//设置 flag
$redis->set('flag', 1);
//设置 flag 失效时间防止线程死掉
$redis->expire($key, 600);
//请求消费线程
$this->sendHttpRequest("http://127.0.0.1/Home/Index/redis", "POST");
}
if ($redis->ttl($lockKey))
$redis->del($lockKey);
$status = FALSE;
} else {
sleep(2);
}
}
}
/**
* 消费线程
*
*/
public function redis(){
set_time_limit(0);
ignore_user_abort(true);//设置与客户机断开是否会终止执行
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
$time = 1;//记录阻塞时间
//开始消费
while (1) {
//取出请求
$param = $redis->rPop("mail");
if (empty($param)) {
//阻塞一段时间
if ($time <= 10) {
$time++;
sleep(10);
} else {
//结束消费,尝试获取锁
$lockKey = 'lock';
$lockExpire = 60;
$status = true;
while ($status) {
$lockValue = time() + $lockExpire;
$lock = $redis->setnx($lockKey, $lockValue);
if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
$redis->expire($lockKey, $lockExpire);
//删除 flag
$redis->del('flag');
if ($redis->ttl($lockKey))
$redis->del($lockKey);
$status = FALSE;
} else {
sleep(2);
}
}
break;
}
} else {
//发送邮件
$result = $this->sendEmail();
$time = 1;//重置阻塞时间
//设置 flag 状态,尝试获取锁
$lockKey = 'lock';
$lockExpire = 60;
$status = true;
while ($status) {
$lockValue = time() + $lockExpire;
$lock = $redis->setnx($lockKey, $lockValue);
if (!empty($lock) || ($redis->get($lockKey) < time() && $redis->getSet($lockKey, $lockValue) < time())) {
$redis->expire($lockKey, $lockExpire);
//设置运行状态
$redis->set($key, 1);
//设置失效时间防止线程死掉
$redis->expire($key, 600);
if ($redis->ttl($lockKey))
$redis->del($lockKey);
$status = FALSE;
} else {
sleep(2);
}
}
}
}
}
/**
* 发送邮件
*
*/
function sendEmail(){
set_time_limit(540);//设置超时时间
if (Send()) {
return 1;
} else {
return 0;
}
}
1
win7pro 2020-03-26 10:22:42 +08:00
swoole
|
3
z5864703 2020-03-26 10:54:42 +08:00
有几个改进的地方
1.redis 操作可以使用 lua 脚本来做几个命令组合运行,同时还可以保证原子性,比如 ttl+del 那块操作 2.消费者可以用 cli 单独进程来跑,循环读 list 就好了,而不用通过一次 post 请求来触发。 3.防止进程死掉可以用信号来实现 |