Skip to content
赞助商赞助商赞助商
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待
虚位以待

队列

介绍

NOTE

Laravel 现在提供了 Horizon,一个美观的仪表板和配置系统,用于您的 Redis 驱动的队列。查看完整的 Horizon 文档 以获取更多信息。

Laravel 队列为各种不同的队列后端提供了统一的 API,例如 Beanstalk、Amazon SQS、Redis,甚至是关系数据库。队列允许您将耗时的任务(如发送电子邮件)的处理推迟到稍后进行。推迟这些耗时的任务可以显著加快应用程序的 Web 请求速度。

队列配置文件存储在 config/queue.php 中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库、BeanstalkdAmazon SQSRedis 和一个同步驱动程序,该驱动程序将立即执行作业(用于本地使用)。还包括一个 null 队列驱动程序,该驱动程序会丢弃排队的作业。

连接与队列

在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别很重要。在您的 config/queue.php 配置文件中,有一个 connections 配置选项。此选项定义了与后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。然而,任何给定的队列连接可能有多个“队列”,可以被视为不同的作业堆栈或堆。

请注意,queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义应将其调度到哪个队列,则作业将被放置在连接配置的 queue 属性中定义的队列中:

php
// 这个作业被发送到默认队列...
Job::dispatch();

// 这个作业被发送到 "emails" 队列...
Job::dispatch()->onQueue('emails');

某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,将作业推送到多个队列对于希望优先处理或分段处理作业的应用程序特别有用,因为 Laravel 队列工作者允许您通过优先级指定应处理哪些队列。例如,如果您将作业推送到 high 队列,您可以运行一个工作者,给予它们更高的处理优先级:

php
php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

要使用 database 队列驱动程序,您需要一个数据库表来保存作业。要生成创建此表的迁移,请运行 queue:table Artisan 命令。创建迁移后,您可以使用 migrate 命令迁移数据库:

php
php artisan queue:table

php artisan migrate

Redis

要使用 redis 队列驱动程序,您应在 config/database.php 配置文件中配置 Redis 数据库连接。

Redis 集群

如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含 key hash tag。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

阻塞

使用 Redis 队列时,您可以使用 block_for 配置选项来指定驱动程序在进入工作者循环并重新轮询 Redis 数据库之前应等待作业可用的时间。

根据您的队列负载调整此值比持续轮询 Redis 数据库以获取新作业更有效。例如,您可以将值设置为 5,表示驱动程序应在等待作业可用时阻塞五秒钟:

php
'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => 'default',
    'retry_after' => 90,
    'block_for' => 5,
],

NOTE

block_for 设置为 0 将导致队列工作者无限期阻塞,直到作业可用。这也将阻止信号(如 SIGTERM)在处理下一个作业之前被处理。

其他驱动程序先决条件

以下是列出的队列驱动程序所需的依赖项:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~4.0
  • Redis: predis/predis ~1.0 或 phpredis PHP 扩展

创建作业

生成作业类

默认情况下,应用程序的所有可排队作业都存储在 app/Jobs 目录中。如果 app/Jobs 目录不存在,当您运行 make:job Artisan 命令时,它将被创建。您可以使用 Artisan CLI 生成一个新的排队作业:

php
php artisan make:job ProcessPodcast

生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue 接口,指示 Laravel 该作业应被推送到队列以异步运行。

类结构

作业类非常简单,通常只包含一个 handle 方法,该方法在作业被队列处理时调用。首先,让我们看一个示例作业类。在此示例中,我们假装管理一个播客发布服务,并需要在发布之前处理上传的播客文件:

php
<?php

namespace App\Jobs;

use App\AudioProcessor;
use App\Podcast;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的作业实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行作业。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客...
    }
}

在此示例中,请注意我们能够将 Eloquent 模型 直接传递到排队作业的构造函数中。由于作业使用的 SerializesModels trait,Eloquent 模型及其加载的关系将在作业处理时被优雅地序列化和反序列化。如果您的排队作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当作业实际处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这对您的应用程序来说是完全透明的,并防止了序列化完整 Eloquent 模型实例时可能出现的问题。

handle 方法在作业被队列处理时调用。请注意,我们能够在作业的 handle 方法上进行类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。

如果您希望完全控制容器如何将依赖项注入到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接受一个回调,该回调接收作业和容器。在回调中,您可以随意调用 handle 方法。通常,您应该从 服务提供者 中调用此方法:

php
use App\Jobs\ProcessPodcast;

$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

NOTE

二进制数据(如原始图像内容)应在传递给排队作业之前通过 base64_encode 函数传递。否则,作业在放置到队列时可能无法正确序列化为 JSON。

处理关系

由于加载的关系也会被序列化,序列化的作业字符串可能会变得相当大。要防止关系被序列化,您可以在设置属性值时调用模型的 withoutRelations 方法。此方法将返回一个没有加载关系的模型实例:

php
/**
 * 创建一个新的作业实例。
 *
 * @param  \App\Podcast  $podcast
 * @return void
 */
public function __construct(Podcast $podcast)
{
    $this->podcast = $podcast->withoutRelations();
}

作业中间件

作业中间件允许您在排队作业的执行过程中包裹自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下 handle 方法,它利用 Laravel 的 Redis 速率限制功能,每五秒钟只允许一个作业处理一次:

php
/**
 * 执行作业。
 *
 * @return void
 */
public function handle()
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('获得锁...');

        // 处理作业...
    }, function () {
        // 无法获得锁...

        return $this->release(5);
    });
}

虽然此代码是有效的,但 handle 方法的结构变得嘈杂,因为它被 Redis 速率限制逻辑所干扰。此外,必须为我们希望速率限制的任何其他作业复制此速率限制逻辑。

我们可以定义一个处理速率限制的作业中间件,而不是在 handle 方法中进行速率限制。Laravel 没有作业中间件的默认位置,因此您可以将作业中间件放置在应用程序中的任何位置。在此示例中,我们将中间件放置在 app/Jobs/Middleware 目录中:

php
<?php

namespace App\Jobs\Middleware;

use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * 处理排队作业。
     *
     * @param  mixed  $job
     * @param  callable  $next
     * @return mixed
     */
    public function handle($job, $next)
    {
        Redis::throttle('key')
                ->block(0)->allow(1)->every(5)
                ->then(function () use ($job, $next) {
                    // 获得锁...

                    $next($job);
                }, function () use ($job) {
                    // 无法获得锁...

                    $job->release(5);
                });
    }
}

如您所见,像 路由中间件 一样,作业中间件接收正在处理的作业和一个应调用的回调以继续处理作业。

创建作业中间件后,可以通过从作业的 middleware 方法返回它们来将它们附加到作业。此方法不存在于 make:job Artisan 命令生成的作业中,因此您需要将其添加到自己的作业类定义中:

php
use App\Jobs\Middleware\RateLimited;

/**
 * 获取作业应通过的中间件。
 *
 * @return array
 */
public function middleware()
{
    return [new RateLimited];
}

调度作业

编写作业类后,您可以使用作业本身的 dispatch 方法调度它。传递给 dispatch 方法的参数将传递给作业的构造函数:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast);
    }
}

延迟调度

如果您希望延迟排队作业的执行,可以在调度作业时使用 delay 方法。例如,让我们指定一个作业在调度后 10 分钟内不可用:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)
                ->delay(now()->addMinutes(10));
    }
}

NOTE

Amazon SQS 队列服务的最大延迟时间为 15 分钟。

同步调度

如果您希望立即(同步)调度作业,可以使用 dispatchNow 方法。使用此方法时,作业不会被排队,而是在当前进程中立即运行:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatchNow($podcast);
    }
}

作业链

作业链允许您指定在主作业成功执行后应按顺序运行的排队作业列表。如果序列中的一个作业失败,则不会运行其余作业。要执行排队作业链,您可以在任何可调度作业上使用 withChain 方法:

php
ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch();

NOTE

使用 $this->delete() 方法删除作业不会阻止链式作业被处理。链条只会在链中的作业失败时停止执行。

链连接和队列

如果您希望为链式作业指定默认连接和队列,可以使用 allOnConnectionallOnQueue 方法。这些方法指定队列连接和队列名称,除非排队作业明确分配了不同的连接/队列:

php
ProcessPodcast::withChain([
    new OptimizePodcast,
    new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');

自定义队列和连接

调度到特定队列

通过将作业推送到不同的队列,您可以“分类”排队作业,甚至可以优先考虑为各种队列分配多少工作者。请记住,这不会将作业推送到您的队列配置文件中定义的不同队列“连接”,而只是推送到单个连接中的特定队列。要指定队列,请在调度作业时使用 onQueue 方法:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');
    }
}

调度到特定连接

如果您正在处理多个队列连接,可以指定将作业推送到哪个连接。要指定连接,请在调度作业时使用 onConnection 方法:

php
<?php

namespace App\Http\Controllers;

use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储新播客。
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');
    }
}

您可以链接 onConnectiononQueue 方法来为作业指定连接和队列:

php
ProcessPodcast::dispatch($podcast)
              ->onConnection('sqs')
              ->onQueue('processing');

或者,您可以在作业类上将 connection 指定为属性:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 应处理作业的队列连接。
     *
     * @var string
     */
    public $connection = 'sqs';
}

指定最大作业尝试次数/超时值

最大尝试次数

指定作业可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries 开关:

php
php artisan queue:work --tries=3

但是,您可以通过在作业类本身上定义最大尝试次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的值:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业可以尝试的次数。
     *
     * @var int
     */
    public $tries = 5;
}

基于时间的尝试

作为定义作业可以尝试多少次的替代方法,您可以定义作业应超时的时间。这允许在给定时间范围内尝试任何次数的作业。要定义作业应超时的时间,请在作业类中添加 retryUntil 方法:

php
/**
 * 确定作业应超时的时间。
 *
 * @return \DateTime
 */
public function retryUntil()
{
    return now()->addSeconds(5);
}

NOTE

您还可以在排队的事件监听器上定义 retryUntil 方法。

超时

NOTE

timeout 功能针对 PHP 7.1+ 和 pcntl PHP 扩展进行了优化。

同样,可以使用 Artisan 命令行上的 --timeout 开关指定作业可以运行的最大秒数:

php
php artisan queue:work --timeout=30

但是,您也可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:

php
<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 作业在超时之前可以运行的秒数。
     *
     * @var int
     */
    public $timeout = 120;
}

速率限制

NOTE

此功能要求您的应用程序可以与 Redis 服务器 交互。

如果您的应用程序与 Redis 交互,您可以按时间或并发限制排队作业的速率。当您的排队作业与速率限制的 API 交互时,此功能可以提供帮助。

例如,使用 throttle 方法,您可以将给定类型的作业限制为每 60 秒仅运行 10 次。如果无法获得锁,您通常应将作业释放回队列,以便稍后重试:

php
Redis::throttle('key')->allow(10)->every(60)->then(function () {
    // 作业逻辑...
}, function () {
    // 无法获得锁...

    return $this->release(10);
});

NOTE

在上面的示例中,key 可以是唯一标识您希望速率限制的作业类型的任何字符串。例如,您可能希望根据作业的类名和它操作的 Eloquent 模型的 ID 构建键。

NOTE

将节流的作业释放回队列仍会增加作业的总 attempts 次数。

或者,您可以指定可以同时处理给定作业的最大工作者数量。当排队作业正在修改应仅由一个作业一次修改的资源时,这可能会有所帮助。例如,使用 funnel 方法,您可以限制给定类型的作业仅由一个工作者同时处理:

php
Redis::funnel('key')->limit(1)->then(function () {
    // 作业逻辑...
}, function () {
    // 无法获得锁...

    return $this->release(10);
});

NOTE

使用速率限制时,您的作业需要成功运行的尝试次数可能很难确定。因此,将速率限制与 基于时间的尝试 结合使用是有用的。

错误处理

如果在处理作业时抛出异常,作业将自动释放回队列,以便再次尝试。作业将继续释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work Artisan 命令上使用的 --tries 开关定义。或者,可以在作业类本身上定义最大尝试次数。有关运行队列工作者的更多信息 可以在下面找到

队列闭包

除了将作业类调度到队列之外,您还可以调度闭包。这对于需要在当前请求周期之外执行的快速简单任务非常有用:

php
$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

将闭包调度到队列时,闭包的代码内容会被加密签名,以便在传输过程中无法修改。

运行队列工作者

Laravel 包含一个队列工作者,它将在作业推送到队列时处理新作业。您可以使用 queue:work Artisan 命令运行工作者。请注意,一旦 queue:work 命令启动,它将继续运行,直到手动停止或关闭终端:

php
php artisan queue:work

NOTE

要使 queue:work 进程永久在后台运行,您应该使用进程监视器(如 Supervisor)来确保队列工作者不会停止运行。

请记住,队列工作者是长时间运行的进程,并在内存中存储已启动的应用程序状态。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保 重新启动队列工作者。此外,请记住,应用程序创建或修改的任何静态状态在作业之间不会自动重置。

或者,您可以运行 queue:listen 命令。使用 queue:listen 命令时,您不必在想要重新加载更新的代码或重置应用程序状态时手动重新启动工作者;但是,此命令不如 queue:work 高效:

php
php artisan queue:listen

指定连接和队列

您还可以指定工作者应使用哪个队列连接。传递给 work 命令的连接名称应对应于 config/queue.php 配置文件中定义的连接之一:

php
php artisan queue:work redis

您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis 队列连接上的 emails 队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:

php
php artisan queue:work redis --queue=emails

处理单个作业

--once 选项可用于指示工作者仅处理队列中的单个作业:

php
php artisan queue:work --once

处理所有排队作业然后退出

--stop-when-empty 选项可用于指示工作者处理所有作业然后优雅地退出。当在 Docker 容器中使用 Laravel 队列时,如果希望在队列为空后关闭容器,此选项非常有用:

php
php artisan queue:work --stop-when-empty

资源考虑

守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,完成后应使用 imagedestroy 释放内存。

队列优先级

有时您可能希望优先处理队列。例如,在 config/queue.php 中,您可以将 redis 连接的默认 queue 设置为 low。但是,有时您可能希望将作业推送到 high 优先级队列,如下所示:

php
dispatch((new Job)->onQueue('high'));

要启动一个工作者,确保在继续处理 low 队列上的任何作业之前处理所有 high 队列作业,请将逗号分隔的队列名称列表传递给 work 命令:

php
php artisan queue:work --queue=high,low

队列工作者和部署

由于队列工作者是长时间运行的进程,因此在不重新启动的情况下不会拾取代码更改。因此,使用队列工作者部署应用程序的最简单方法是在部署过程中重新启动工作者。您可以通过发出 queue:restart 命令优雅地重新启动所有工作者:

php
php artisan queue:restart

此命令将指示所有队列工作者在完成当前作业后优雅地“死亡”,以便不会丢失现有作业。由于在执行 queue:restart 命令时队列工作者将死亡,因此您应该运行一个进程管理器(如 Supervisor)以自动重新启动队列工作者。

NOTE

队列使用 缓存 存储重启信号,因此在使用此功能之前,您应验证是否为应用程序正确配置了缓存驱动程序。

作业过期和超时

作业过期

config/queue.php 配置文件中,每个队列连接定义了一个 retry_after 选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after 的值设置为 90,则如果作业已处理 90 秒而未被删除,它将被释放回队列。通常,您应将 retry_after 值设置为作业合理完成处理所需的最大秒数。

NOTE

唯一不包含 retry_after 值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 重试作业。

工作者超时

queue:work Artisan 命令公开了一个 --timeout 选项。--timeout 选项指定 Laravel 队列主进程在终止正在处理作业的子队列工作者之前将等待的时间。有时,子队列进程可能会因各种原因而“冻结”。--timeout 选项会删除超过指定时间限制的冻结进程:

php
php artisan queue:work --timeout=60

retry_after 配置选项和 --timeout CLI 选项不同,但它们协同工作以确保作业不会丢失,并且作业仅成功处理一次。

NOTE

--timeout 值应始终比 retry_after 配置值短几秒钟。这将确保在重试作业之前始终终止处理给定作业的工作者。如果 --timeout 选项长于 retry_after 配置值,您的作业可能会被处理两次。

工作者休眠时间

当队列上有作业可用时,工作者将继续处理作业而没有延迟。然而,sleep 选项决定了如果没有新作业可用,工作者将“休眠”的时间(以秒为单位)。在休眠期间,工作者不会处理任何新作业 - 作业将在工作者再次唤醒后处理。

php
php artisan queue:work --sleep=3

Supervisor 配置

安装 Supervisor

Supervisor 是 Linux 操作系统的进程监视器,如果 queue:work 进程失败,它将自动重新启动。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:

bash
sudo apt-get install supervisor

NOTE

如果自己配置 Supervisor 听起来很复杂,请考虑使用 Laravel Forge,它将自动为您的 Laravel 项目安装和配置 Supervisor。

配置 Supervisor

Supervisor 配置文件通常存储在 /etc/supervisor/conf.d 目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf 文件,启动并监视一个 queue:work 进程:

ini
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

在此示例中,numprocs 指令将指示 Supervisor 运行 8 个 queue:work 进程并监视所有进程,如果它们失败,将自动重新启动。您应更改 command 指令中的 queue:work sqs 部分以反映您所需的队列连接。

NOTE

您应确保 stopwaitsecs 的值大于最长运行作业消耗的秒数。否则,Supervisor 可能会在作业完成处理之前终止作业。

启动 Supervisor

创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:

bash
sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

有关 Supervisor 的更多信息,请查阅 Supervisor 文档

处理失败的作业

有时您的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定作业应尝试的最大次数。在作业超过此尝试次数后,它将被插入到 failed_jobs 数据库表中。要为 failed_jobs 表创建迁移,您可以使用 queue:failed-table 命令:

bash
php artisan queue:failed-table

php artisan migrate

然后,在运行 队列工作者 时,您可以使用 queue:work 命令上的 --tries 开关指定作业应尝试的最大次数。如果您没有为 --tries 选项指定值,作业将仅尝试一次:

bash
php artisan queue:work redis --tries=3

此外,您可以使用 --delay 选项指定 Laravel 在重试失败作业之前应等待的秒数。默认情况下,作业会立即重试:

bash
php artisan queue:work redis --tries=3 --delay=3

如果您希望在每个作业的基础上配置失败作业重试延迟,可以通过在排队作业类上定义 retryAfter 属性来实现:

php
/**
 * 重试作业前等待的秒数。
 *
 * @var int
 */
public $retryAfter = 3;

清理失败的作业

您可以直接在作业类上定义一个 failed 方法,允许您在发生故障时执行作业特定的清理。这是向用户发送警报或还原作业执行的任何操作的理想位置。导致作业失败的 Exception 将传递给 failed 方法:

php
<?php

namespace App\Jobs;

use App\AudioProcessor;
use App\Podcast;
use Exception;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 创建一个新的作业实例。
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * 执行作业。
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // 处理上传的播客...
    }

    /**
     * 作业处理失败。
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // 发送用户失败通知等...
    }
}

NOTE

如果作业是使用 dispatchNow 方法调度的,则不会调用 failed 方法。

失败作业事件

如果您希望注册一个在作业失败时调用的事件,可以使用 Queue::failing 方法。此事件是通过电子邮件或 Slack 通知团队的绝佳机会。例如,我们可以从 Laravel 附带的 AppServiceProvider 中附加一个回调到此事件:

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     *
     * @return void
     */
    public function register()
    {
        //
    }

    /**
     * 启动任何应用程序服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

重试失败的作业

要查看已插入到 failed_jobs 数据库表中的所有失败作业,可以使用 queue:failed Artisan 命令:

bash
php artisan queue:failed

queue:failed 命令将列出作业 ID、连接、队列和失败时间。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 5 的失败作业,请发出以下命令:

bash
php artisan queue:retry 5

要重试所有失败的作业,请执行 queue:retry 命令并传递 all 作为 ID:

bash
php artisan queue:retry all

如果您希望删除失败的作业,可以使用 queue:forget 命令:

bash
php artisan queue:forget 5

要删除所有失败的作业,可以使用 queue:flush 命令:

bash
php artisan queue:flush

忽略缺失的模型

将 Eloquent 模型注入作业时,它会在放置到队列之前自动序列化,并在作业处理时恢复。但是,如果模型在作业等待工作者处理时被删除,您的作业可能会因 ModelNotFoundException 而失败。

为了方便起见,您可以选择通过将作业的 deleteWhenMissingModels 属性设置为 true 来自动删除缺失模型的作业:

php
/**
 * 如果模型不再存在,则删除作业。
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

作业事件

使用 Queue facade 上的 beforeafter 方法,您可以指定在排队作业处理之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计信息的绝佳机会。通常,您应该从 服务提供者 中调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider

php
<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 注册任何应用程序服务。
     *
     * @return void
     */
    public function register()
    {
        //
    }

    /**
     * 启动任何应用程序服务。
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

使用 Queue facade 上的 looping 方法,您可以指定在工作者尝试从队列获取作业之前执行的回调。例如,您可以注册一个闭包以回滚任何由先前失败的作业留下的事务:

php
Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});