SNS + SQS - Bezbolesna integracja z Laravel
Integracja SNS + SQS to prosty sposób na luźne powiązanie usług i skalowalny event driven flow. Poniżej pokazuję kompletny proces - od konfiguracji AWS, przez ustawienia Laravela, po wzorce niezawodności i testy lokalne.
Dlaczego SNS + SQS
- SNS publikuje zdarzenia do wielu subskrybentów
- SQS buforuje wiadomości i zapewnia niezawodną dostawę do workerów
- Połączenie obu usług pozwala publikować jeden event i odbierać go w wielu systemach niezależnie
Architektura w skrócie
- Producent publikuje zdarzenie do tematu SNS
- Temat SNS ma subskrypcję SQS
- Laravel ma worker
queue:workpodłączony do kolejki SQS - Worker odbiera powiadomienia SNS dostarczone przez SQS i uruchamia lokalne handlery
Krok 1 - konfiguracja zasobów w AWS
- Utwórz temat SNS
- Nazwa przykładowa:
orders-events
- Nazwa przykładowa:
- Utwórz kolejkę SQS
- Nazwa przykładowa:
orders-events-queue - Warto od razu dodać DLQ, np.
orders-events-dlq, z policy redrive
- Nazwa przykładowa:
- Podepnij subskrypcję SQS do SNS
- Protokół:
sqs - Endpoint: ARN kolejki
orders-events-queue
- Protokół:
- Nadaj uprawnienia aby SNS mógł wysyłać do SQS
- W policy kolejki SQS dodaj zezwolenie na
sqs:SendMessagedla ARN tematu SNS
- W policy kolejki SQS dodaj zezwolenie na
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowSnsToSendToSqs",
"Effect": "Allow",
"Principal": {"Service": "sns.amazonaws.com"},
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:eu-central-1:123456789012:orders-events-queue",
"Condition": {
"ArnEquals": {
"aws:SourceArn": "arn:aws:sns:eu-central-1:123456789012:orders-events"
}
}
}
]Krok 2 - konfiguracja Laravela
.env
QUEUE_CONNECTION=sqs
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
AWS_DEFAULT_REGION=eu-central-1
AWS_ACCOUNT_ID=123456789012
SQS_PREFIX=https://sqs.${AWS_DEFAULT_REGION}.amazonaws.com/${AWS_ACCOUNT_ID}
SQS_QUEUE=orders-events-queue
SQS_SUFFIX=
config/queue.php - połączenie SQS
Laravel ma wbudowany driver sqs. Wystarczy dopisać połączenie:
'connections' => [
'sqs' => [
'driver' => 'sqs',
'key' => env('AWS_ACCESS_KEY_ID'),
'secret' => env('AWS_SECRET_ACCESS_KEY'),
'prefix' => env('SQS_PREFIX'),
'queue' => env('SQS_QUEUE'),
'suffix' => env('SQS_SUFFIX'),
'region' => env('AWS_DEFAULT_REGION'),
'after_commit' => false,
],
],
Uruchomienie workera
php artisan queue:work sqs --sleep=1 --tries=3 --timeout=60
Krok 3 - jak wygląda wiadomość z SNS w SQS
SNS opakowuje oryginalny payload w strukturę Notification. W SQS zobaczysz Message jako tekst JSON, który trzeba zdekodować. Najczęściej masz dwa poziomy JSON.
Przykładowy job który to obsłuży:
<?php
declare(strict_types=1);
namespace App\Queue\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
final class ProcessSnsNotification implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
/** @var array<string, mixed> */
private array $envelope;
/** @param array<string, mixed> $envelope */
public function __construct(array $envelope)
{
$this->envelope = $envelope;
}
public function handle(): void
{
// Poziom 1 - opakowanie SNS
$outer = $this->envelope;
// Główny payload zwykle w polu Message jako string JSON
$innerRaw = $outer['Message'] ?? '{}';
$inner = json_decode($innerRaw, true);
if (!is_array($inner)) {
// log i koniec - zły format
return;
}
// Przykład - event type oraz dane
$type = $inner['type'] ?? $inner['event_type'] ?? null;
$data = $inner['data'] ?? [];
if ($type === 'order.created') {
// wywołanie lokalnego serwisu domenowego
app(\App\Domain\Orders\OrderEvents::class)->orderCreated($data);
return;
}
if ($type === 'order.cancelled') {
app(\App\Domain\Orders\OrderEvents::class)->orderCancelled($data);
return;
}
// brak dopasowania - opcjonalny log
}
}
Aby Laravel wysłał takiego joba dla każdej wiadomości z SQS, możesz skorzystać z mechanizmu custom payload mappera albo prostego Job pipeline. Najprościej - pozwól workerowi odpalać Illuminate\Queue\Jobs\SqsJob i wewnątrz własnego listenera zdekoduj Body i ręcznie zdispatchuj ProcessSnsNotification. Praktyczniej - publikować do SQS już gotowe payloady w formacie typowego job Laravela. W wielu integracjach jednak odbiera się powiadomienia SNS przez SQS i samodzielnie dekoduje Message jak wyżej.
Wariant z Listenerem kolejki:
<?php
namespace App\Providers;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
final class QueueTapServiceProvider extends ServiceProvider
{
public function boot(): void
{
Queue::before(function (JobProcessing $event) {
// opcjonalny log
});
Queue::after(function (JobProcessed $event) {
// opcjonalny log
});
}
}
W praktyce wygodniej jest wystawić pojedynczy Job który przyjmuje całe Body z SQS i w handle wykonuje dekodowanie jak w przykładzie.
Krok 4 - publikacja zdarzeń do SNS
Po stronie producenta możesz użyć klienta AWS SDK dla PHP. W Laravela wstrzyknij Aws\Sns\SnsClient.
<?php
declare(strict_types=1);
namespace App\Infra\Events;
use Aws\Sns\SnsClient;
final class SnsPublisher
{
public function __construct(private SnsClient $sns) {}
/** @param array<string, mixed> $payload */
public function publish(string $topicArn, string $type, array $payload): void
{
$message = json_encode([
'type' => $type,
'data' => $payload,
]);
$this->sns->publish([
'TopicArn' => $topicArn,
'Message' => $message,
// Można dodać MessageAttributes jeśli chcesz filtrować subskrypcje
]);
}
}
Rejestracja klienta AWS:
<?php
use Aws\Sns\SnsClient;
app()->singleton(SnsClient::class, function () {
return new SnsClient([
'version' => '2010-03-31',
'region' => config('queue.connections.sqs.region'),
'credentials' => [
'key' => config('queue.connections.sqs.key'),
'secret' => config('queue.connections.sqs.secret'),
],
]);
});
Krok 5 - niezawodność
- DLQ - dodaj kolejkę martwych listów oraz policy redrive
- Idempotencja - zapisuj klucz deduplikacji na poziomie handera, np.
event_idw tabeli - Retry - konfiguruj
--triesworkera i sensowne timeouty - Backoff - jeżeli handlery wykonują zewnętrzne wywołania, dodaj backoff na wyjątki sieciowe
- Observability - logi strukturalne + metryki liczby wiadomości, czasu przetwarzania i błędów
Krok 6 - testy lokalne z LocalStack
docker-compose.yml:
version: "3.9"
services:
localstack:
image: localstack/localstack:3
environment:
- SERVICES=sns,sqs
- DEFAULT_REGION=eu-central-1
ports:
- "4566:4566"
volumes:
- "./.localstack:/var/lib/localstack"
Konfiguracja Laravela pod LocalStack:
AWS_ACCESS_KEY_ID=test
AWS_SECRET_ACCESS_KEY=test
AWS_DEFAULT_REGION=eu-central-1
SQS_PREFIX=http://localhost:4566/000000000000
SQS_QUEUE=orders-events-queue
Inicjalizacja zasobów:
awslocal sns create-topic --name orders-events
awslocal sqs create-queue --queue-name orders-events-queue
awslocal sns subscribe \
--topic-arn arn:aws:sns:eu-central-1:000000000000:orders-events \
--protocol sqs \
--notification-endpoint arn:aws:sqs:eu-central-1:000000000000:orders-events-queue
# Policy aby SNS mógł wysyłać do SQS
awslocal sqs set-queue-attributes \
--queue-url http://localhost:4566/000000000000/orders-events-queue \
--attributes "Policy=$(cat queue-policy.json)"
Test publikacji:
awslocal sns publish \
--topic-arn arn:aws:sns:eu-central-1:000000000000:orders-events \
--message '{"type":"order.created","data":{"order_id":"o-123"}}'
Uruchom worker i obserwuj logi.
Wzorce filtrowania po atrybutach
SNS pozwala filtrować subskrypcje po MessageAttributes. Przykład publikacji:
$this->sns->publish([
'TopicArn' => $topicArn,
'Message' => json_encode(['type' => 'order.created', 'data' => $data]),
'MessageAttributes' => [
'service' => [
'DataType' => 'String',
'StringValue' => 'orders',
],
'tenant' => [
'DataType' => 'String',
'StringValue' => $tenantId,
],
],
]);
W subskrypcji definiujesz policy filtra, dzięki czemu kolejka otrzymuje tylko wybrane zdarzenia.
Najczęstsze problemy i szybkie diagnozy
- Worker nic nie odbiera - sprawdź policy SQS oraz subskrypcję SNS do kolejki
- Błędy deserializacji - pamiętaj że
Messageto string JSON, potrzebne jest podwójnejson_decode - Duplikaty - wprowadź idempotencję na poziomie handera
- Brak uprawnień - zweryfikuj IAM i regiony w
.env
Podsumowanie
Połączenie SNS + SQS w Laravelu pozwala zbudować skalowalny i odporny na błędy przepływ zdarzeń. Najważniejsze elementy to poprawna konfiguracja uprawnień między SNS i SQS, właściwe dekodowanie powiadomień po stronie workera oraz spójny standard odpowiedzi JSON API po stronie endpointów. Z takim szkieletem łatwo rozwijać kolejne typy zdarzeń, dodawać filtry subskrypcji i monitorować przepływ wiadomości w środowiskach produkcyjnych i lokalnych.