Zaawansowane Wzorce Kolejek w Laravel: Batching, Chainy i Blokady Atomowe

Jeśli używasz kolejek Laravel, znasz podstawy: wysyłasz job, odpala się w tle, gotowe. Ale prawdziwe aplikacje mają bardziej złożone wymagania. Musisz przetworzyć sto plików i wiedzieć, kiedy wszystkie się zakończą. Potrzebujesz jobów zależnych od innych. Chcesz obsługiwać błędy bez utraty kontekstu. Potrzebujesz, żeby niektóre joby nigdy nie działały równolegle, inne - żeby były rate-limitowane względem zewnętrznego API. Ten artykuł omawia wzorce, które uzupełniają lukę między działającą kolejką a produkcyjnym systemem kolejkowania.

📋 Spis treści

📦 Job Batching - Grupuj, śledź, reaguj

Batching pozwala wysłać grupę jobów i reagować, gdy wszystkie się zakończą, gdy któryś się nie powiedzie, albo w obu przypadkach.

Podstawowy batch:

// app/Actions/ImportProductsAction.php

declare(strict_types=1);

namespace App\Actions;

use App\Jobs\ImportProductChunkJob;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

class ImportProductsAction
{
    public function handle(array $productChunks): string
    {
        $batch = Bus::batch(
            collect($productChunks)->map(
                fn (array $chunk) => new ImportProductChunkJob($chunk)
            )->toArray()
        )
        ->then(function (Batch $batch) {
            // Wszystkie joby zakończone pomyślnie
            logger()->info("Import zakończony: {$batch->totalJobs} produktów.");
        })
        ->catch(function (Batch $batch, Throwable $e) {
            // Pierwsze niepowodzenie - batch domyślnie kontynuuje
            logger()->error("Chunk importu nie powiódł się: {$e->getMessage()}");
        })
        ->finally(function (Batch $batch) {
            // Zawsze się odpala - sukces lub błąd
            ImportLog::updateStatus($batch->id, $batch->failedJobs > 0 ? 'partial' : 'complete');
        })
        ->name('Product Import')
        ->allowFailures() // Nie anuluj całego batcha przy jednym błędzie
        ->dispatch();

        return $batch->id;
    }
}

Sam job musi używać traitu Batchable, żeby integrować się z cyklem życia batcha:

// app/Jobs/ImportProductChunkJob.php

declare(strict_types=1);

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;

class ImportProductChunkJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable;

    public function __construct(public readonly array $chunk) {}

    public function handle(): void
    {
        // Sprawdź, czy batch nie został anulowany przed ciężką pracą
        if ($this->batch()?->cancelled()) {
            return;
        }

        foreach ($this->chunk as $product) {
            Product::updateOrCreate(['sku' => $product['sku']], $product);
        }
    }
}

Sprawdź postęp batcha w kontrolerze:

// app/Http/Controllers/Api/ImportController.php

declare(strict_types=1);

namespace App\Http\Controllers\Api;

use Illuminate\Support\Facades\Bus;

class ImportController extends Controller
{
    public function status(string $batchId): JsonResponse
    {
        $batch = Bus::findBatch($batchId);

        return response()->json([
            'total'      => $batch->totalJobs,
            'processed'  => $batch->processedJobs(),
            'failed'     => $batch->failedJobs,
            'progress'   => $batch->progress(), // 0100
            'finished'   => $batch->finished(),
        ]);
    }
}

To daje endpoint postępu do pollingu z frontendu.

Setup - batche wymagają tabeli job_batches:

php artisan queue:batches-table
php artisan migrate

🔗 Job Chaining - Sekwencyjne pipeline'y

Chaining uruchamia joby sekwencyjnie - następny startuje tylko gdy poprzedni zakończy się pomyślnie. Jeśli którykolwiek się nie powiedzie, reszta łańcucha jest porzucana.

// app/Actions/ProcessOrderAction.php

declare(strict_types=1);

namespace App\Actions;

use App\Jobs\ChargePaymentJob;
use App\Jobs\GenerateInvoiceJob;
use App\Jobs\SendConfirmationEmailJob;
use App\Jobs\UpdateInventoryJob;
use Illuminate\Support\Facades\Bus;

class ProcessOrderAction
{
    public function handle(int $orderId): void
    {
        Bus::chain([
            new ChargePaymentJob($orderId),
            new UpdateInventoryJob($orderId),
            new GenerateInvoiceJob($orderId),
            new SendConfirmationEmailJob($orderId),
        ])
        ->catch(function (Throwable $e) use ($orderId) {
            logger()->error("Pipeline zamówienia {$orderId} nie powiódł się: {$e->getMessage()}");
            Order::find($orderId)?->markAsFailed();
        })
        ->dispatch();
    }
}

Warunkowa kontynuacja łańcucha - PendingChain::dispatchIf():

Bus::chain([
    new ChargePaymentJob($orderId),
    new UpdateInventoryJob($orderId),
])
->dispatchIf($order->requiresShipping, new CreateShipmentJob($orderId));

Mieszanie batchów i łańcuchów - batch wewnątrz łańcucha:

Bus::chain([
    new ValidateOrderJob($orderId),
    Bus::batch([
        new ProcessPaymentJob($orderId),
        new ReserveInventoryJob($orderId),
    ])->allowFailures(),
    new SendConfirmationJob($orderId),
])->dispatch();

Łańcuch czeka na zakończenie całego batcha przed przejściem do SendConfirmationJob.

🛡️ Job Middleware - Reguły per job

Job middleware pozwala przypiąć wielokrotnie używalne zachowania bezpośrednio do joba - rate limiting, deduplikację, throttling - bez powielania logiki w handle().

WithoutOverlapping - zapobiegaj równoległemu wykonaniu:

// app/Jobs/GenerateReportJob.php

declare(strict_types=1);

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;

class GenerateReportJob implements ShouldQueue
{
    public function __construct(public readonly int $userId) {}

    public function middleware(): array
    {
        return [
            // Tylko jeden GenerateReportJob per user może działać naraz
            new WithoutOverlapping($this->userId),
        ];
    }

    public function handle(): void
    {
        // Generuj raport dla $this->userId
    }
}

WithoutOverlapping używa blokady Redis zakluczowanej na klasie joba + ID, które podajesz. Nakładające się joby są zwracane do kolejki i ponawiane.

Skonfiguruj wygasanie i zachowanie przy zwrocie:

public function middleware(): array
{
    return [
        (new WithoutOverlapping($this->userId))
            ->expireAfter(300)    // Blokada wygasa po 5 minutach (zapobieganie deadlockowi)
            ->releaseAfter(30)    // Zwróć do kolejki po 30 sekundach
            ->dontRelease(),      // Lub całkowicie odrzuć nakładające się joby
    ];
}

ThrottlesExceptions - cofaj się przy powtarzających się błędach:

// app/Jobs/SyncExternalApiJob.php

use Illuminate\Queue\Middleware\ThrottlesExceptions;

public function middleware(): array
{
    return [
        // Po 3 wyjątkach odczekaj 10 minut
        new ThrottlesExceptions(maxAttempts: 3, decayMinutes: 10),
    ];
}

To różni się od $tries. ThrottlesExceptions wstrzymuje joba gdy zaczyna się nie udawać, dając zewnętrznemu serwisowi czas na odbudowanie, i automatycznie wznawia.

RateLimited - respektuj limity zewnętrznego API:

// app/Providers/AppServiceProvider.php

use Illuminate\Support\Facades\RateLimiter;
use Illuminate\Cache\RateLimiting\Limit;

RateLimiter::for('external-api', function () {
    return Limit::perMinute(60); // Maks 60 wywołań API na minutę łącznie przez wszystkich workerów
});
// app/Jobs/SendToExternalApiJob.php

use Illuminate\Queue\Middleware\RateLimited;

public function middleware(): array
{
    return [new RateLimited('external-api')];
}

Wszyscy workerzy dzielą ten sam rate limiter przez Redis - jeśli masz 5 workerów, łącznie respektują limit 60/minutę, nie 300/minutę.

🔁 ShouldBeUnique i ShouldBeEncrypted

ShouldBeUnique - zapobiega wysłaniu duplikatu joba gdy jeden już jest w kolejce lub działa:

// app/Jobs/RecalculateUserStatsJob.php

declare(strict_types=1);

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;

class RecalculateUserStatsJob implements ShouldQueue, ShouldBeUnique
{
    public int $uniqueFor = 3600; // Okno unikalności: 1 godzina

    public function __construct(public readonly int $userId) {}

    public function uniqueId(): string
    {
        return (string) $this->userId; // Jeden job per user naraz
    }

    public function handle(): void
    {
        // Przelicz statystyki dla tego użytkownika
    }
}

Jeśli wysyłasz RecalculateUserStatsJob(42) gdy jeden już jest w kolejce dla usera 42, drugi dispatch jest po cichu odrzucany.

ShouldBeUniqueUntilProcessing - zwalnia blokadę unikalności zaraz gdy job startuje (nie kończy), pozwalając na ponowne wysłanie podczas gdy aktualny działa:

use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class RecalculateUserStatsJob implements ShouldQueue, ShouldBeUniqueUntilProcessing

ShouldBeEncrypted - szyfruje payload joba w kolejce:

// app/Jobs/SendPasswordResetJob.php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class SendPasswordResetJob implements ShouldQueue, ShouldBeEncrypted
{
    public function __construct(
        public readonly string $email,
        public readonly string $token, // Niewidoczny w plain text w Redis
    ) {}
}

Używaj tego dla każdego joba niosącego wrażliwe dane - tokeny, PII, referencje płatności - które inaczej byłyby czytelne w Redis lub tabeli kolejki.

💥 Strategie błędów - Retry, Backoff, Dead Letter Queue

Podstawowa konfiguracja retry:

// app/Jobs/ProcessWebhookJob.php

declare(strict_types=1);

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessWebhookJob implements ShouldQueue
{
    public int $tries = 5;           // Łączna liczba prób przed oznaczeniem jako failed
    public int $maxExceptions = 3;   // Oznacz jako failed po 3 nieobsłużonych wyjątkach

    // Tablicowy backoff - czekaj 60s, potem 180s, potem 600s między próbami
    public array $backoff = [60, 180, 600];

    public function handle(): void
    {
        // Przetwórz payload webhooka
    }

    public function failed(\Throwable $e): void
    {
        // Wywoływane gdy job wyczerpał wszystkie próby
        logger()->error("Webhook trwale nie powiódł się: {$e->getMessage()}", [
            'job' => $this::class,
        ]);

        WebhookLog::markFailed($this->webhookId, $e->getMessage());
    }
}

retryUntil() - okno ponowień oparte na czasie:

public function retryUntil(): \DateTime
{
    // Ponawiaj przez 24 godziny, potem poddaj się
    return now()->addHours(24);
}

To lepsze niż stała liczba $tries dla jobów zależnych od zewnętrznych serwisów - ponawiasz przez sensowne okno czasowe, a nie liczbę prób.

Custom Dead Letter Queue - zapisz wszystkie nieudane joby do dedykowanej tabeli:

// database/migrations/2026_04_01_create_failed_jobs_detail_table.php

Schema::create('failed_jobs_detail', function (Blueprint $table) {
    $table->id();
    $table->string('job_class');
    $table->json('payload');
    $table->text('exception');
    $table->timestamp('failed_at');
});
// app/Jobs/Concerns/LogsFailure.php

declare(strict_types=1);

namespace App\Jobs\Concerns;

use App\Models\FailedJobDetail;

trait LogsFailure
{
    public function failed(\Throwable $e): void
    {
        FailedJobDetail::create([
            'job_class' => static::class,
            'payload'   => json_encode($this),
            'exception' => $e->getMessage() . "\n" . $e->getTraceAsString(),
            'failed_at' => now(),
        ]);
    }
}

Dodaj trait do każdego joba wymagającego niestandardowego śledzenia błędów, a następnie zbuduj prosty dashboard na failed_jobs_detail.

🧪 Testowanie z Bus::fake() i Queue::fake()

Queue::fake() - asercje na wysłanych jobach:

// tests/Feature/OrderControllerTest.php

declare(strict_types=1);

use App\Jobs\ProcessOrderJob;
use App\Jobs\SendConfirmationEmailJob;
use Illuminate\Support\Facades\Queue;

it('wysyła job przetwarzania przy tworzeniu zamówienia', function () {
    Queue::fake();

    $this->postJson('/api/v1/orders', [...])
        ->assertCreated();

    Queue::assertPushed(ProcessOrderJob::class);
    Queue::assertNotPushed(SendConfirmationEmailJob::class);
});

it('wysyła na właściwą kolejkę', function () {
    Queue::fake();

    ProcessOrderJob::dispatch($order);

    Queue::assertPushedOn('orders', ProcessOrderJob::class);
});

Bus::fake() - asercje na łańcuchach i batchach:

// tests/Feature/ProcessOrderActionTest.php

declare(strict_types=1);

use App\Actions\ProcessOrderAction;
use App\Jobs\ChargePaymentJob;
use App\Jobs\GenerateInvoiceJob;
use Illuminate\Support\Facades\Bus;

it('wysyła właściwy łańcuch jobów dla zamówienia', function () {
    Bus::fake();

    app(ProcessOrderAction::class)->handle($order->id);

    Bus::assertChained([
        ChargePaymentJob::class,
        UpdateInventoryJob::class,
        GenerateInvoiceJob::class,
        SendConfirmationEmailJob::class,
    ]);
});

it('wysyła batch przy masowym imporcie', function () {
    Bus::fake();

    app(ImportProductsAction::class)->handle($chunks);

    Bus::assertBatched(function ($batch) use ($chunks) {
        return $batch->jobs->count() === count($chunks)
            && $batch->name === 'Product Import';
    });
});

Testuj hook failed() bezpośrednio:

it('loguje błąd gdy job webhooka wyczerpie retry', function () {
    $job       = new ProcessWebhookJob($webhookId);
    $exception = new \RuntimeException('Timeout zewnętrznego serwisu');

    $job->failed($exception);

    expect(WebhookLog::where('webhook_id', $webhookId)->first()->status)
        ->toBe('failed');
});

🚀 Kolejki priorytetowe w Redis z Supervisor

Workery Laravel mogą słuchać na wielu kolejkach z określonym priorytetem. Joby na kolejkach wyższego priorytetu są przetwarzane pierwsze.

Wysyłaj do konkretnej kolejki:

// Wysoki priorytet - przetwarzanie płatności
ProcessPaymentJob::dispatch($order)->onQueue('high');

// Normalny priorytet - powiadomienia
SendEmailJob::dispatch($user)->onQueue('default');

// Niski priorytet - analityka, raporty
UpdateAnalyticsJob::dispatch($event)->onQueue('low');

Konfiguracja Supervisor - procesy workerów z kolejnością priorytetów:

; /etc/supervisor/conf.d/laravel-worker.conf

[program:laravel-worker-high]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/home/artisan queue:work redis --queue=high,default,low --sleep=3 --tries=3
autostart=true
autorestart=true
numprocs=4
redirect_stderr=true
stdout_logfile=/var/log/laravel-worker-high.log

[program:laravel-worker-low]
process_name=%(program_name)s_%(process_num)02d
command=php /var/www/home/artisan queue:work redis --queue=low --sleep=3 --tries=3
autostart=true
autorestart=true
numprocs=1
redirect_stderr=true
stdout_logfile=/var/log/laravel-worker-low.log

Kolejność --queue=high,default,low oznacza, że worker sprawdza najpierw kolejkę high, potem default, potem low. Cztery workery obsługują joby wysokiego priorytetu; jeden obsługuje pracę w tle niskiego priorytetu.

Kluczowa zasada: osobne workery dla różnych priorytetów zapobiegają zagłodzeniu jobów wysokiego priorytetu. Nawet jeśli low ma zaległy backlog 10k jobów analitycznych, workery płatności nie są tym dotknięte.

✅ Podsumowanie

  • Używaj batching gdy musisz przetworzyć grupę jobów i reagować na ich zbiorowe zakończenie - importy, operacje bulk, równoległe pipeline'y
  • Używaj chaining gdy kroki muszą się wykonywać sekwencyjnie i każdy zależy od poprzedniego - przetwarzanie zamówień, wieloetapowe przepływy
  • Dodawaj job middleware (WithoutOverlapping, ThrottlesExceptions, RateLimited) żeby wyrażać reguły per job bez zaśmiecania handle()
  • Używaj ShouldBeUnique żeby zapobiec duplikatom jobów dla tej samej encji przy częstym dispatchowaniu
  • Używaj ShouldBeEncrypted dla każdego joba niosącego PII, tokeny lub wrażliwe referencje
  • Buduj Dead Letter Queue na hooku failed() żeby mieć kontekst przy trwałych błędach
  • Pisz testy z Bus::fake() do asercji łańcuchów i batchów - to najcięższe bugi do złapania bez nich
  • Używaj kolejek priorytetowych z osobnymi procesami Supervisora żeby krytyczne joby nigdy nie czekały za pracą w tle

Obserwuj mnie na LinkedIn po więcej tipów z Laravel! Chcesz deep-dive w Laravel Pulse do monitorowania zdrowia kolejek na produkcji? Daj znać w komentarzach!

Komentarze (0)
Zostaw komentarz

© 2026 Wszelkie prawa zastrzeżone.