Jeśli używasz kolejek Laravel, znasz podstawy: wysyłasz job, odpala się w tle, gotowe. Ale prawdziwe aplikacje mają bardziej złożone wymagania. Musisz przetworzyć sto plików i wiedzieć, kiedy wszystkie się zakończą. Potrzebujesz jobów zależnych od innych. Chcesz obsługiwać błędy bez utraty kontekstu. Potrzebujesz, żeby niektóre joby nigdy nie działały równolegle, inne - żeby były rate-limitowane względem zewnętrznego API. Ten artykuł omawia wzorce, które uzupełniają lukę między działającą kolejką a produkcyjnym systemem kolejkowania.
📋 Spis treści
- 📦 Job Batching - Grupuj, śledź, reaguj
- 🔗 Job Chaining - Sekwencyjne pipeline'y
- 🛡️ Job Middleware - Reguły per job
- 🔁 ShouldBeUnique i ShouldBeEncrypted
- 💥 Strategie błędów - Retry, Backoff, Dead Letter Queue
- 🧪 Testowanie z Bus::fake() i Queue::fake()
- 🚀 Kolejki priorytetowe w Redis z Supervisor
- ✅ Podsumowanie
📦 Job Batching - Grupuj, śledź, reaguj
Batching pozwala wysłać grupę jobów i reagować, gdy wszystkie się zakończą, gdy któryś się nie powiedzie, albo w obu przypadkach.
Podstawowy batch:
// app/Actions/ImportProductsAction.php
declare(strict_types=1);
namespace App\Actions;
use App\Jobs\ImportProductChunkJob;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;
class ImportProductsAction
{
public function handle(array $productChunks): string
{
$batch = Bus::batch(
collect($productChunks)->map(
fn (array $chunk) => new ImportProductChunkJob($chunk)
)->toArray()
)
->then(function (Batch $batch) {
// Wszystkie joby zakończone pomyślnie
logger()->info("Import zakończony: {$batch->totalJobs} produktów.");
})
->catch(function (Batch $batch, Throwable $e) {
// Pierwsze niepowodzenie - batch domyślnie kontynuuje
logger()->error("Chunk importu nie powiódł się: {$e->getMessage()}");
})
->finally(function (Batch $batch) {
// Zawsze się odpala - sukces lub błąd
ImportLog::updateStatus($batch->id, $batch->failedJobs > 0 ? 'partial' : 'complete');
})
->name('Product Import')
->allowFailures() // Nie anuluj całego batcha przy jednym błędzie
->dispatch();
return $batch->id;
}
}
Sam job musi używać traitu Batchable, żeby integrować się z cyklem życia batcha:
// app/Jobs/ImportProductChunkJob.php
declare(strict_types=1);
namespace App\Jobs;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
class ImportProductChunkJob implements ShouldQueue
{
use Batchable, Dispatchable, InteractsWithQueue, Queueable;
public function __construct(public readonly array $chunk) {}
public function handle(): void
{
// Sprawdź, czy batch nie został anulowany przed ciężką pracą
if ($this->batch()?->cancelled()) {
return;
}
foreach ($this->chunk as $product) {
Product::updateOrCreate(['sku' => $product['sku']], $product);
}
}
}
Sprawdź postęp batcha w kontrolerze:
// app/Http/Controllers/Api/ImportController.php
declare(strict_types=1);
namespace App\Http\Controllers\Api;
use Illuminate\Support\Facades\Bus;
class ImportController extends Controller
{
public function status(string $batchId): JsonResponse
{
$batch = Bus::findBatch($batchId);
return response()->json([
'total' => $batch->totalJobs,
'processed' => $batch->processedJobs(),
'failed' => $batch->failedJobs,
'progress' => $batch->progress(), // 0–100
'finished' => $batch->finished(),
]);
}
}
To daje endpoint postępu do pollingu z frontendu.
Setup - batche wymagają tabeli job_batches:
php artisan queue:batches-table
php artisan migrate
🔗 Job Chaining - Sekwencyjne pipeline'y
Chaining uruchamia joby sekwencyjnie - następny startuje tylko gdy poprzedni zakończy się pomyślnie. Jeśli którykolwiek się nie powiedzie, reszta łańcucha jest porzucana.
// app/Actions/ProcessOrderAction.php
declare(strict_types=1);
namespace App\Actions;
use App\Jobs\ChargePaymentJob;
use App\Jobs\GenerateInvoiceJob;
use App\Jobs\SendConfirmationEmailJob;
use App\Jobs\UpdateInventoryJob;
use Illuminate\Support\Facades\Bus;
class ProcessOrderAction
{
public function handle(int $orderId): void
{
Bus::chain([
new ChargePaymentJob($orderId),
new UpdateInventoryJob($orderId),
new GenerateInvoiceJob($orderId),
new SendConfirmationEmailJob($orderId),
])
->catch(function (Throwable $e) use ($orderId) {
logger()->error("Pipeline zamówienia {$orderId} nie powiódł się: {$e->getMessage()}");
Order::find($orderId)?->markAsFailed();
})
->dispatch();
}
}
Warunkowa kontynuacja łańcucha - PendingChain::dispatchIf():
Bus::chain([
new ChargePaymentJob($orderId),
new UpdateInventoryJob($orderId),
])
->dispatchIf($order->requiresShipping, new CreateShipmentJob($orderId));
Mieszanie batchów i łańcuchów - batch wewnątrz łańcucha:
Bus::chain([
new ValidateOrderJob($orderId),
Bus::batch([
new ProcessPaymentJob($orderId),
new ReserveInventoryJob($orderId),
])->allowFailures(),
new SendConfirmationJob($orderId),
])->dispatch();
Łańcuch czeka na zakończenie całego batcha przed przejściem do SendConfirmationJob.
🛡️ Job Middleware - Reguły per job
Job middleware pozwala przypiąć wielokrotnie używalne zachowania bezpośrednio do joba - rate limiting, deduplikację, throttling - bez powielania logiki w handle().
WithoutOverlapping - zapobiegaj równoległemu wykonaniu:
// app/Jobs/GenerateReportJob.php
declare(strict_types=1);
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
class GenerateReportJob implements ShouldQueue
{
public function __construct(public readonly int $userId) {}
public function middleware(): array
{
return [
// Tylko jeden GenerateReportJob per user może działać naraz
new WithoutOverlapping($this->userId),
];
}
public function handle(): void
{
// Generuj raport dla $this->userId
}
}
WithoutOverlapping używa blokady Redis zakluczowanej na klasie joba + ID, które podajesz. Nakładające się joby są zwracane do kolejki i ponawiane.
Skonfiguruj wygasanie i zachowanie przy zwrocie:
public function middleware(): array
{
return [
(new WithoutOverlapping($this->userId))
->expireAfter(300) // Blokada wygasa po 5 minutach (zapobieganie deadlockowi)
->releaseAfter(30) // Zwróć do kolejki po 30 sekundach
->dontRelease(), // Lub całkowicie odrzuć nakładające się joby
];
}
ThrottlesExceptions - cofaj się przy powtarzających się błędach:
// app/Jobs/SyncExternalApiJob.php
use Illuminate\Queue\Middleware\ThrottlesExceptions;
public function middleware(): array
{
return [
// Po 3 wyjątkach odczekaj 10 minut
new ThrottlesExceptions(maxAttempts: 3, decayMinutes: 10),
];
}
To różni się od $tries. ThrottlesExceptions wstrzymuje joba gdy zaczyna się nie udawać, dając zewnętrznemu serwisowi czas na odbudowanie, i automatycznie wznawia.
RateLimited - respektuj limity zewnętrznego API:
// app/Providers/AppServiceProvider.php
use Illuminate\Support\Facades\RateLimiter;
use Illuminate\Cache\RateLimiting\Limit;
RateLimiter::for('external-api', function () {
return Limit::perMinute(60); // Maks 60 wywołań API na minutę łącznie przez wszystkich workerów
});
// app/Jobs/SendToExternalApiJob.php
use Illuminate\Queue\Middleware\RateLimited;
public function middleware(): array
{
return [new RateLimited('external-api')];
}
Wszyscy workerzy dzielą ten sam rate limiter przez Redis - jeśli masz 5 workerów, łącznie respektują limit 60/minutę, nie 300/minutę.
🔁 ShouldBeUnique i ShouldBeEncrypted
ShouldBeUnique - zapobiega wysłaniu duplikatu joba gdy jeden już jest w kolejce lub działa:
// app/Jobs/RecalculateUserStatsJob.php
declare(strict_types=1);
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
class RecalculateUserStatsJob implements ShouldQueue, ShouldBeUnique
{
public int $uniqueFor = 3600; // Okno unikalności: 1 godzina
public function __construct(public readonly int $userId) {}
public function uniqueId(): string
{
return (string) $this->userId; // Jeden job per user naraz
}
public function handle(): void
{
// Przelicz statystyki dla tego użytkownika
}
}
Jeśli wysyłasz RecalculateUserStatsJob(42) gdy jeden już jest w kolejce dla usera 42, drugi dispatch jest po cichu odrzucany.
ShouldBeUniqueUntilProcessing - zwalnia blokadę unikalności zaraz gdy job startuje (nie kończy), pozwalając na ponowne wysłanie podczas gdy aktualny działa:
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;
class RecalculateUserStatsJob implements ShouldQueue, ShouldBeUniqueUntilProcessing
ShouldBeEncrypted - szyfruje payload joba w kolejce:
// app/Jobs/SendPasswordResetJob.php
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
class SendPasswordResetJob implements ShouldQueue, ShouldBeEncrypted
{
public function __construct(
public readonly string $email,
public readonly string $token, // Niewidoczny w plain text w Redis
) {}
}
Używaj tego dla każdego joba niosącego wrażliwe dane - tokeny, PII, referencje płatności - które inaczej byłyby czytelne w Redis lub tabeli kolejki.
💥 Strategie błędów - Retry, Backoff, Dead Letter Queue
Podstawowa konfiguracja retry:
// app/Jobs/ProcessWebhookJob.php
declare(strict_types=1);
namespace App\Jobs;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessWebhookJob implements ShouldQueue
{
public int $tries = 5; // Łączna liczba prób przed oznaczeniem jako failed
public int $maxExceptions = 3; // Oznacz jako failed po 3 nieobsłużonych wyjątkach
// Tablicowy backoff - czekaj 60s, potem 180s, potem 600s między próbami
public array $backoff = [60, 180, 600];
public function handle(): void
{
// Przetwórz payload webhooka
}
public function failed(\Throwable $e): void
{
// Wywoływane gdy job wyczerpał wszystkie próby
logger()->error("Webhook trwale nie powiódł się: {$e->getMessage()}", [
'job' => $this::class,
]);
WebhookLog::markFailed($this->webhookId, $e->getMessage());
}
}
retryUntil() - okno ponowień oparte na czasie:
public function retryUntil(): \DateTime
{
// Ponawiaj przez 24 godziny, potem poddaj się
return now()->addHours(24);
}
To lepsze niż stała liczba $tries dla jobów zależnych od zewnętrznych serwisów - ponawiasz przez sensowne okno czasowe, a nie liczbę prób.
Custom Dead Letter Queue - zapisz wszystkie nieudane joby do dedykowanej tabeli:
// database/migrations/2026_04_01_create_failed_jobs_detail_table.php
Schema::create('failed_jobs_detail', function (Blueprint $table) {
$table->id();
$table->string('job_class');
$table->json('payload');
$table->text('exception');
$table->timestamp('failed_at');
});
// app/Jobs/Concerns/LogsFailure.php
declare(strict_types=1);
namespace App\Jobs\Concerns;
use App\Models\FailedJobDetail;
trait LogsFailure
{
public function failed(\Throwable $e): void
{
FailedJobDetail::create([
'job_class' => static::class,
'payload' => json_encode($this),
'exception' => $e->getMessage() . "\n" . $e->getTraceAsString(),
'failed_at' => now(),
]);
}
}
Dodaj trait do każdego joba wymagającego niestandardowego śledzenia błędów, a następnie zbuduj prosty dashboard na failed_jobs_detail.
🧪 Testowanie z Bus::fake() i Queue::fake()
Queue::fake() - asercje na wysłanych jobach:
// tests/Feature/OrderControllerTest.php
declare(strict_types=1);
use App\Jobs\ProcessOrderJob;
use App\Jobs\SendConfirmationEmailJob;
use Illuminate\Support\Facades\Queue;
it('wysyła job przetwarzania przy tworzeniu zamówienia', function () {
Queue::fake();
$this->postJson('/api/v1/orders', [...])
->assertCreated();
Queue::assertPushed(ProcessOrderJob::class);
Queue::assertNotPushed(SendConfirmationEmailJob::class);
});
it('wysyła na właściwą kolejkę', function () {
Queue::fake();
ProcessOrderJob::dispatch($order);
Queue::assertPushedOn('orders', ProcessOrderJob::class);
});
Bus::fake() - asercje na łańcuchach i batchach:
// tests/Feature/ProcessOrderActionTest.php
declare(strict_types=1);
use App\Actions\ProcessOrderAction;
use App\Jobs\ChargePaymentJob;
use App\Jobs\GenerateInvoiceJob;
use Illuminate\Support\Facades\Bus;
it('wysyła właściwy łańcuch jobów dla zamówienia', function () {
Bus::fake();
app(ProcessOrderAction::class)->handle($order->id);
Bus::assertChained([
ChargePaymentJob::class,
UpdateInventoryJob::class,
GenerateInvoiceJob::class,
SendConfirmationEmailJob::class,
]);
});
it('wysyła batch przy masowym imporcie', function () {
Bus::fake();
app(ImportProductsAction::class)->handle($chunks);
Bus::assertBatched(function ($batch) use ($chunks) {
return $batch->jobs->count() === count($chunks)
&& $batch->name === 'Product Import';
});
});
Testuj hook failed() bezpośrednio:
it('loguje błąd gdy job webhooka wyczerpie retry', function () {
$job = new ProcessWebhookJob($webhookId);
$exception = new \RuntimeException('Timeout zewnętrznego serwisu');
$job->failed($exception);
expect(WebhookLog::where('webhook_id', $webhookId)->first()->status)
->toBe('failed');
});
🚀 Kolejki priorytetowe w Redis z Supervisor
Workery Laravel mogą słuchać na wielu kolejkach z określonym priorytetem. Joby na kolejkach wyższego priorytetu są przetwarzane pierwsze.
Wysyłaj do konkretnej kolejki:
// Wysoki priorytet - przetwarzanie płatności
ProcessPaymentJob::dispatch($order)->onQueue('high');
// Normalny priorytet - powiadomienia
SendEmailJob::dispatch($user)->onQueue('default');
// Niski priorytet - analityka, raporty
UpdateAnalyticsJob::dispatch($event)->onQueue('low');
Konfiguracja Supervisor - procesy workerów z kolejnością priorytetów:
; /etc/supervisor/conf.d/laravel-worker.conf
[program:laravel-worker-high]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/home/artisan queue:work redis --queue=high,default,low --sleep=3 --tries=3
autostart=true
autorestart=true
numprocs=4
redirect_stderr=true
stdout_logfile=/var/log/laravel-worker-high.log
[program:laravel-worker-low]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/home/artisan queue:work redis --queue=low --sleep=3 --tries=3
autostart=true
autorestart=true
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/laravel-worker-low.log
Kolejność --queue=high,default,low oznacza, że worker sprawdza najpierw kolejkę high, potem default, potem low. Cztery workery obsługują joby wysokiego priorytetu; jeden obsługuje pracę w tle niskiego priorytetu.
Kluczowa zasada: osobne workery dla różnych priorytetów zapobiegają zagłodzeniu jobów wysokiego priorytetu. Nawet jeśli low ma zaległy backlog 10k jobów analitycznych, workery płatności nie są tym dotknięte.
✅ Podsumowanie
- Używaj batching gdy musisz przetworzyć grupę jobów i reagować na ich zbiorowe zakończenie - importy, operacje bulk, równoległe pipeline'y
- Używaj chaining gdy kroki muszą się wykonywać sekwencyjnie i każdy zależy od poprzedniego - przetwarzanie zamówień, wieloetapowe przepływy
- Dodawaj job middleware (
WithoutOverlapping,ThrottlesExceptions,RateLimited) żeby wyrażać reguły per job bez zaśmiecaniahandle() - Używaj
ShouldBeUniqueżeby zapobiec duplikatom jobów dla tej samej encji przy częstym dispatchowaniu - Używaj
ShouldBeEncrypteddla każdego joba niosącego PII, tokeny lub wrażliwe referencje - Buduj Dead Letter Queue na hooku
failed()żeby mieć kontekst przy trwałych błędach - Pisz testy z
Bus::fake()do asercji łańcuchów i batchów - to najcięższe bugi do złapania bez nich - Używaj kolejek priorytetowych z osobnymi procesami Supervisora żeby krytyczne joby nigdy nie czekały za pracą w tle
Obserwuj mnie na LinkedIn po więcej tipów z Laravel! Chcesz deep-dive w Laravel Pulse do monitorowania zdrowia kolejek na produkcji? Daj znać w komentarzach!