SNS + SQS - Bezbolesna integracja z Laravel

SNS + SQS - Bezbolesna integracja z Laravel

Integracja SNS + SQS to prosty sposób na luźne powiązanie usług i skalowalny event driven flow. Poniżej pokazuję kompletny proces - od konfiguracji AWS, przez ustawienia Laravela, po wzorce niezawodności i testy lokalne.

Dlaczego SNS + SQS

  • SNS publikuje zdarzenia do wielu subskrybentów
  • SQS buforuje wiadomości i zapewnia niezawodną dostawę do workerów
  • Połączenie obu usług pozwala publikować jeden event i odbierać go w wielu systemach niezależnie

Architektura w skrócie

  • Producent publikuje zdarzenie do tematu SNS
  • Temat SNS ma subskrypcję SQS
  • Laravel ma worker queue:work podłączony do kolejki SQS
  • Worker odbiera powiadomienia SNS dostarczone przez SQS i uruchamia lokalne handlery

Krok 1 - konfiguracja zasobów w AWS

  1. Utwórz temat SNS
    • Nazwa przykładowa: orders-events
  2. Utwórz kolejkę SQS
    • Nazwa przykładowa: orders-events-queue
    • Warto od razu dodać DLQ, np. orders-events-dlq, z policy redrive
  3. Podepnij subskrypcję SQS do SNS
    • Protokół: sqs
    • Endpoint: ARN kolejki orders-events-queue
  4. Nadaj uprawnienia aby SNS mógł wysyłać do SQS
    • W policy kolejki SQS dodaj zezwolenie na sqs:SendMessage dla ARN tematu SNS
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowSnsToSendToSqs",
      "Effect": "Allow",
      "Principal": {"Service": "sns.amazonaws.com"},
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:eu-central-1:123456789012:orders-events-queue",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "arn:aws:sns:eu-central-1:123456789012:orders-events"
        }
      }
    }
  ]

Krok 2 - konfiguracja Laravela

.env

QUEUE_CONNECTION=sqs

AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
AWS_DEFAULT_REGION=eu-central-1
AWS_ACCOUNT_ID=123456789012

SQS_PREFIX=https://sqs.${AWS_DEFAULT_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}
SQS_QUEUE=orders-events-queue
SQS_SUFFIX=

config/queue.php - połączenie SQS

Laravel ma wbudowany driver sqs. Wystarczy dopisać połączenie:

'connections' => [
    'sqs' => [
        'driver' => 'sqs',
        'key' => env('AWS_ACCESS_KEY_ID'),
        'secret' => env('AWS_SECRET_ACCESS_KEY'),
        'prefix' => env('SQS_PREFIX'),
        'queue' => env('SQS_QUEUE'),
        'suffix' => env('SQS_SUFFIX'),
        'region' => env('AWS_DEFAULT_REGION'),
        'after_commit' => false,
    ],
],

Uruchomienie workera

php artisan queue:work sqs --sleep=1 --tries=3 --timeout=60

Krok 3 - jak wygląda wiadomość z SNS w SQS

SNS opakowuje oryginalny payload w strukturę Notification. W SQS zobaczysz Message jako tekst JSON, który trzeba zdekodować. Najczęściej masz dwa poziomy JSON.

Przykładowy job który to obsłuży:

<?php

declare(strict_types=1);

namespace App\Queue\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

final class ProcessSnsNotification implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    /** @var array<string, mixed> */
    private array $envelope;

    /** @param array<string, mixed> $envelope */
    public function __construct(array $envelope)
    {
        $this->envelope = $envelope;
    }

    public function handle(): void
    {
        // Poziom 1 - opakowanie SNS
        $outer = $this->envelope;

        // Główny payload zwykle w polu Message jako string JSON
        $innerRaw = $outer['Message'] ?? '{}';
        $inner = json_decode($innerRaw, true);

        if (!is_array($inner)) {
            // log i koniec - zły format
            return;
        }

        // Przykład - event type oraz dane
        $type = $inner['type'] ?? $inner['event_type'] ?? null;
        $data = $inner['data'] ?? [];

        if ($type === 'order.created') {
            // wywołanie lokalnego serwisu domenowego
            app(\App\Domain\Orders\OrderEvents::class)->orderCreated($data);
            return;
        }

        if ($type === 'order.cancelled') {
            app(\App\Domain\Orders\OrderEvents::class)->orderCancelled($data);
            return;
        }

        // brak dopasowania - opcjonalny log
    }
}

Aby Laravel wysłał takiego joba dla każdej wiadomości z SQS, możesz skorzystać z mechanizmu custom payload mappera albo prostego Job pipeline. Najprościej - pozwól workerowi odpalać Illuminate\Queue\Jobs\SqsJob i wewnątrz własnego listenera zdekoduj Body i ręcznie zdispatchuj ProcessSnsNotification. Praktyczniej - publikować do SQS już gotowe payloady w formacie typowego job Laravela. W wielu integracjach jednak odbiera się powiadomienia SNS przez SQS i samodzielnie dekoduje Message jak wyżej.

Wariant z Listenerem kolejki:

<?php

namespace App\Providers;

use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;

final class QueueTapServiceProvider extends ServiceProvider
{
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // opcjonalny log
        });

        Queue::after(function (JobProcessed $event) {
            // opcjonalny log
        });
    }
}

W praktyce wygodniej jest wystawić pojedynczy Job który przyjmuje całe Body z SQS i w handle wykonuje dekodowanie jak w przykładzie.


Krok 4 - publikacja zdarzeń do SNS

Po stronie producenta możesz użyć klienta AWS SDK dla PHP. W Laravela wstrzyknij Aws\Sns\SnsClient.

<?php

declare(strict_types=1);

namespace App\Infra\Events;

use Aws\Sns\SnsClient;

final class SnsPublisher
{
    public function __construct(private SnsClient $sns) {}

    /** @param array<string, mixed> $payload */
    public function publish(string $topicArn, string $type, array $payload): void
    {
        $message = json_encode([
            'type' => $type,
            'data' => $payload,
        ]);

        $this->sns->publish([
            'TopicArn' => $topicArn,
            'Message'  => $message,
            // Można dodać MessageAttributes jeśli chcesz filtrować subskrypcje
        ]);
    }
}

Rejestracja klienta AWS:

<?php

use Aws\Sns\SnsClient;

app()->singleton(SnsClient::class, function () {
    return new SnsClient([
        'version' => '2010-03-31',
        'region'  => config('queue.connections.sqs.region'),
        'credentials' => [
            'key'    => config('queue.connections.sqs.key'),
            'secret' => config('queue.connections.sqs.secret'),
        ],
    ]);
});

Krok 5 - niezawodność

  • DLQ - dodaj kolejkę martwych listów oraz policy redrive
  • Idempotencja - zapisuj klucz deduplikacji na poziomie handera, np. event_id w tabeli
  • Retry - konfiguruj --tries workera i sensowne timeouty
  • Backoff - jeżeli handlery wykonują zewnętrzne wywołania, dodaj backoff na wyjątki sieciowe
  • Observability - logi strukturalne + metryki liczby wiadomości, czasu przetwarzania i błędów

Krok 6 - testy lokalne z LocalStack

docker-compose.yml:

version: "3.9"
services:
  localstack:
    image: localstack/localstack:3
    environment:
      - SERVICES=sns,sqs
      - DEFAULT_REGION=eu-central-1
    ports:
      - "4566:4566"
    volumes:
      - "./.localstack:/var/lib/localstack"

Konfiguracja Laravela pod LocalStack:

AWS_ACCESS_KEY_ID=test
AWS_SECRET_ACCESS_KEY=test
AWS_DEFAULT_REGION=eu-central-1

SQS_PREFIX=http://localhost:4566/000000000000
SQS_QUEUE=orders-events-queue

Inicjalizacja zasobów:

awslocal sns create-topic --name orders-events
awslocal sqs create-queue --queue-name orders-events-queue
awslocal sns subscribe \
  --topic-arn arn:aws:sns:eu-central-1:000000000000:orders-events \
  --protocol sqs \
  --notification-endpoint arn:aws:sqs:eu-central-1:000000000000:orders-events-queue

# Policy aby SNS mógł wysyłać do SQS
awslocal sqs set-queue-attributes \
  --queue-url http://localhost:4566/000000000000/orders-events-queue \
  --attributes "Policy=$(cat queue-policy.json)"

Test publikacji:

awslocal sns publish \
  --topic-arn arn:aws:sns:eu-central-1:000000000000:orders-events \
  --message '{"type":"order.created","data":{"order_id":"o-123"}}'

Uruchom worker i obserwuj logi.

Wzorce filtrowania po atrybutach

SNS pozwala filtrować subskrypcje po MessageAttributes. Przykład publikacji:

$this->sns->publish([
    'TopicArn' => $topicArn,
    'Message'  => json_encode(['type' => 'order.created', 'data' => $data]),
    'MessageAttributes' => [
        'service' => [
            'DataType' => 'String',
            'StringValue' => 'orders',
        ],
        'tenant' => [
            'DataType' => 'String',
            'StringValue' => $tenantId,
        ],
    ],
]);

W subskrypcji definiujesz policy filtra, dzięki czemu kolejka otrzymuje tylko wybrane zdarzenia.


Najczęstsze problemy i szybkie diagnozy

  • Worker nic nie odbiera - sprawdź policy SQS oraz subskrypcję SNS do kolejki
  • Błędy deserializacji - pamiętaj że Message to string JSON, potrzebne jest podwójne json_decode
  • Duplikaty - wprowadź idempotencję na poziomie handera
  • Brak uprawnień - zweryfikuj IAM i regiony w .env

Podsumowanie

Połączenie SNS + SQS w Laravelu pozwala zbudować skalowalny i odporny na błędy przepływ zdarzeń. Najważniejsze elementy to poprawna konfiguracja uprawnień między SNS i SQS, właściwe dekodowanie powiadomień po stronie workera oraz spójny standard odpowiedzi JSON API po stronie endpointów. Z takim szkieletem łatwo rozwijać kolejne typy zdarzeń, dodawać filtry subskrypcji i monitorować przepływ wiadomości w środowiskach produkcyjnych i lokalnych.