think-queue for ThinkPHP6
安装
composer require topthink/think-queue
配置
配置文件位于
config/queue.php
公共配置
[
'default'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
]
创建任务类
推荐使用
app\job
作为任务类的命名空间 也可以放在任意可以自动加载到的地方
任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire
方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 think\queue\Job $job
(当前的任务对象) 和 $data
(发布任务时自定义的数据)
还有个可选的任务失败执行的方法 failed
传入的参数为$data
(发布任务时自定义的数据)
下面写两个例子
namespace app\job;
use think\queue\Job;
class Job1{
public function fire(Job $job, $data){
//....这里执行具体的任务
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
}
//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
$job->delete();
// 也可以重新发布这个任务
$job->release($delay); //$delay为延迟时间
}
public function failed($data){
// ...任务达到最大重试次数后,失败了
}
}
namespace app\lib\job;
use think\queue\Job;
class Job2{
public function task1(Job $job, $data){
}
public function task2(Job $job, $data){
}
public function failed($data){
}
}
发布任务
think\facade\Queue::push($job, $data = '', $queue = null)
和
think\facade\Queue::later($delay, $job, $data = '', $queue = null)
两个方法,前者是立即执行,后者是在
$delay
秒后执行
$job
是任务名
命名空间是app\job
的,比如上面的例子一,写Job1
类名即可
其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1
、app\lib\job\Job2@task2
$data
是你要传到任务里的参数
$queue
队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填
监听任务并执行
&> php think queue:listen
&> php think queue:work
两种,具体的可选参数可以输入命令加 --help
查看
可配合supervisor使用,保证进程常驻
命令⾏参数说明
Work 模式
php think queue:work --daemon //是否循环执⾏,如果不加该参数,则该命令处理完下⼀个消息就退出 --queue helloJobQueue //要处理的队列的名称 --delay 0 \ //如果本次任务执⾏抛出异常且任务未被删除时,设置其下次执⾏前延迟多少秒,默认为0 --force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明 --memory 128 \ //该进程允许使⽤的内存上限,以 M 为单位 --sleep 3 \ //如果队列中⽆任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或⾮daemon模式) --tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
Listen 模式
php think queue:listen --queue helloJobQueue \ //监听的队列的名称 --delay 0 \ //如果本次任务执⾏抛出异常且任务未被删除时,设置其下次执⾏前延迟多少秒,默认为0 --memory 128 \ //该进程允许使⽤的内存上限,以 M 为单位 --sleep 3 \ //如果队列中⽆任务,则多长时间后重新检查,daemon模式下有效 --tries 0 \ //如果任务已经超过重发次数上限,则进⼊失败处理逻辑,默认为0 --timeout 60 //创建的work⼦进程的允许执⾏的最长时间,以秒为单位
work 模式和 listen 模式的区别
两者都可以⽤于处理消息队列中的任务
区别在于:
2.3.1 执⾏原理不同
work 命令是单进程的处理模式。
按照是否设置了
参数,work命令⼜可分为单次执⾏和循环执⾏两种模式。
--daemon
单次执⾏:不添加
参数,该模式下,work进程在处理完下⼀个消息后直接结束当前进程。当不存在新消
--daemon
息时,会sleep⼀段时间然后退出。
循环执⾏:添加了
参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结
--daemon
束进程。当不存在新消息时,会在每次循环中sleep⼀段时间。
listen 命令是⽗进程 + ⼦进程的处理模式。
listen命令所在的⽗进程会创建⼀个单次执⾏模式的work⼦进程,并通过该work⼦进程来处理队列中的下⼀个消息,当这
个work⼦进程退出之后,listen命令所在的⽗进程会监听到该⼦进程的退出信号,并重新创建⼀个新的单次执⾏的work⼦
进程
2.3.2 退出时机不同
work 命令的退出时机在上⾯的执⾏原理部分已叙述,此处不再重复
listen 命令中,listen所在的⽗进程正常情况会⼀直运⾏,除⾮遇到下⾯两种情况:
创建的某个work⼦进程的执⾏时间超过了 listen命令⾏中的
参数配置,此时work⼦进程会被强制结
--timeout
束,listen所在的⽗进程也会抛出⼀个
异常并退出。开发者可以选择捕获该异常,让⽗
ProcessTimeoutException
进程继续执⾏,也可以选择通过 supervisor 等监控软件重启⼀个新的listen命令。
listen 命令所在的⽗进程因某种原因存在内存泄露,则当⽗进程本⾝占⽤的内存超过了命令⾏中的
参
--memory
数配置时,⽗⼦进程均会退出。正常情况下,listen进程本⾝占⽤的内存是稳定不变的。
2.3.3 性能不同
work 命令是在脚本内部做循环,框架脚本在命令执⾏的初期就已加载完毕;
⽽listen模式则是处理完⼀个任务之后新开⼀个work进程,此时会重新加载框架脚本。
因此:work 模式的性能会⽐listen模式⾼。
注意:当代码有更新时,work 模式下需要⼿动去执⾏
命令重启队列来使改动⽣效;⽽listen 模式
php think queue:restart
会⾃动⽣效,⽆需其他操作。
2.3.4 超时控制能⼒
work 模式本质上既不能控制进程⾃⾝的运⾏时间,也⽆法限制执⾏中的任务的执⾏时间。
举例来说,假如你在某次上线之后,在上⽂中的
消费者的
⽅法中添加了⼀段死循环:
\application\index\job\Hello.php fire publicfunctionfire(){ while(true){ //死循环 $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n"); sleep(1); } }
那么这个循环将永远不能停⽌,直到任务所在的进程超过内存限制或者由管理员⼿动结束。这个过程不会有任何的告警。
更严重的是,如果你配置了expire ,那么这个死循环的任务可能会污染到同样处理
队列的其他work进程,
helloJobQueue
最后好⼏个work进程将被卡死在这段死循环中。详情后⽂会说明。
work 模式下的超时控制能⼒,实际上应该理解为多个work 进程配合下的过期任务重发能⼒。
⽽ listen命令可以限制其创建的work⼦进程的超时时间。
listen 命令可通过
参数限制work⼦进程允许运⾏的最长时间,超过该时间限制仍未结束的⼦进程会被强制结
--timeout
束;
这⾥有必要补充⼀下 expire 和 timeout 之间的区别:
expire 在配置⽂件中设置,timeout 在 listen命令的命令⾏参数中设置,⽽且,expire 和 timeout 是两个不同层次
上的概念:
expire 是指任务的过期时间。这个时间是全局的,影响到所有的work进程。(不管是独⽴的work命令还是 listen
模式下创建的的work⼦进程) 。expire 针对的对象是任务。
timeout 是指work⼦进程的超时时间。这个时间只对当前执⾏的listen 命令有效。timeout 针对的对象是work⼦
进程。
2.3.5 使⽤场景不同
根据上⾯的介绍,可以看到,
work 命令的适⽤场景是:
任务数量较多
性能要求较⾼
任务的执⾏时间较短
消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑
listen命令的适⽤场景是:
任务数量较少
任务的执⾏时间较长(如⽣成⼤型的excel报表等),
任务的执⾏时间需要有严格限制
--------------------------------------------------------
### 数据库队列的demo
在 config/queue.php 中,将队列驱动配置为数据库:
```
return [
'default' => 'database', // 使用数据库驱动
'connections' => [
'sync' => [
'type' => 'sync', // 同步执行
],
'database' => [
'type' => 'database', // 数据库驱动
'queue' => 'default', // 队列名称
'table' => 'jobs', // 存储任务的表名
'connection' => null, // 使用默认数据库连接
],
],
'failed' => [
'type' => 'none', // 失败任务处理方式
'table' => 'failed_jobs', // 存储失败任务的表名
],
];
```
### 创建队列任务表
运行以下命令生成队列任务表:
```
php think queue:table
php think queue:failed-table
php think migrate:run
```
这会创建两张表:
jobs:用于存储队列任务。
failed_jobs:用于存储失败的任务。
### 创建队列任务
```
<?php
namespace app\job;
use think\queue\Job;
class ProcessData
{
/**
* 任务处理逻辑
*/
public function fire(Job $job, $data)
{
// 模拟处理数据
$id = $data['id'];
$name = $data['name'];
echo "Processing data: ID = {$id}, Name = {$name}\n";
// 处理完成后删除任务
$job->delete();
}
/**
* 任务失败处理逻辑
*/
public function failed($data)
{
// 记录失败日志或通知管理员
echo "Failed to process data: ID = {$data['id']}, Name = {$data['name']}\n";
}
}
```
### 推送任务到队列
```
<?php
namespace app\controller;
use think\facade\Queue;
class Index
{
public function index()
{
// 模拟任务数据
$data = [
'id' => 1,
'name' => 'ThinkPHP Queue Demo',
];
// 推送任务到队列
Queue::push(\app\job\ProcessData::class, $data);
return 'Task pushed to database queue!';
}
}
```
### 启动队列监听
```
php think queue:work --queue default
```
可以查看 jobs 表,任务会被存储在这里。当任务被处理后,它会被自动删除。如果任务失败,会记录在 failed_jobs 表中。