Nhảy tới nội dung

Messenger (Message Queue)

Messenger component cho phép xử lý tasks bất đồng bộ thông qua message queues, hỗ trợ RabbitMQ (AMQP) và Database.

Cấu hình

// config/messenger.php
return [
'default' => env('MESSENGER_CONNECTION', 'database'),

'connections' => [
'rabbitmq' => [
'driver' => 'amqp',
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => env('RABBITMQ_VHOST', '/'),
'queue' => env('RABBITMQ_QUEUE', 'default'),
'exchange' => env('RABBITMQ_EXCHANGE', ''),
'exchange_type' => 'direct',
],

'database' => [
'driver' => 'database',
'connection' => null, // null = default database connection
'table' => 'messenger_messages',
'queue' => 'default',
],
],

'serializer' => \Vietiso\Core\Messenger\Serialization\JsonSerializer::class,
];

Tạo Message

Message là một class chứa dữ liệu cần xử lý:

<?php

namespace App\Messages;

class SendEmailMessage
{
public function __construct(
public readonly string $to,
public readonly string $subject,
public readonly string $body,
) {}
}
<?php

namespace App\Messages;

class ProcessOrderMessage
{
public function __construct(
public readonly int $orderId,
public readonly array $items,
) {}
}

Dispatch Message

use Vietiso\Core\Messenger\Facade\Bus;
use App\Messages\SendEmailMessage;

// Dispatch message
Bus::dispatch(new SendEmailMessage(
to: 'user@example.com',
subject: 'Welcome!',
body: 'Thank you for registering.'
));

Dispatch với Options

// Delay - xử lý sau một khoảng thời gian
Bus::dispatch(new SendEmailMessage(...))
->delay(60); // 60 seconds

Bus::dispatch(new SendEmailMessage(...))
->delay(new DateInterval('PT5M')); // 5 minutes

// Chọn connection
Bus::dispatch(new SendEmailMessage(...))
->onConnection('rabbitmq');

// Chọn queue
Bus::dispatch(new SendEmailMessage(...))
->onQueue('emails');

// Kết hợp
Bus::dispatch(new SendEmailMessage(...))
->onConnection('rabbitmq')
->onQueue('high-priority')
->delay(30);

Tạo Handler

Handler xử lý message khi được consume:

<?php

namespace App\Handlers;

use App\Messages\SendEmailMessage;
use Vietiso\Core\Mail\Facade\Mail;

class SendEmailHandler
{
public function __invoke(SendEmailMessage $message): void
{
Mail::to($message->to)
->send(new GenericMail($message->subject, $message->body));
}
}
<?php

namespace App\Handlers;

use App\Messages\ProcessOrderMessage;

class ProcessOrderHandler
{
public function __construct(
protected OrderService $orderService,
protected InventoryService $inventory,
) {}

public function __invoke(ProcessOrderMessage $message): void
{
$order = Order::findOrFail($message->orderId);

// Process order
$this->orderService->process($order);

// Update inventory
foreach ($message->items as $item) {
$this->inventory->decrement($item['product_id'], $item['quantity']);
}
}
}

Đăng ký Handlers

// config/messenger.php
return [
// ...

'handlers' => [
SendEmailMessage::class => SendEmailHandler::class,
ProcessOrderMessage::class => ProcessOrderHandler::class,
],
];

Consume Messages

CLI Command

# Consume từ default connection
php vietiso messenger:consume

# Consume từ connection cụ thể
php vietiso messenger:consume rabbitmq

# Consume từ queue cụ thể
php vietiso messenger:consume --queue=emails

# Limit số messages
php vietiso messenger:consume --limit=100

# Timeout
php vietiso messenger:consume --timeout=60

Trong Code

use Vietiso\Core\Messenger\Worker;

$worker = new Worker($transport, $handlers);
$worker->run([
'limit' => 100,
'timeout' => 60,
]);

Transports

RabbitMQ (AMQP)

'rabbitmq' => [
'driver' => 'amqp',
'host' => env('RABBITMQ_HOST', '127.0.0.1'),
'port' => env('RABBITMQ_PORT', 5672),
'user' => env('RABBITMQ_USER', 'guest'),
'password' => env('RABBITMQ_PASSWORD', 'guest'),
'vhost' => '/',
'queue' => 'default',
'exchange' => '',
'exchange_type' => 'direct',
],

Database

'database' => [
'driver' => 'database',
'connection' => null,
'table' => 'messenger_messages',
'queue' => 'default',
],

Database transport tự động tạo bảng nếu chưa tồn tại.

Ví dụ thực tế

Email Queue

// Message
class SendWelcomeEmailMessage
{
public function __construct(
public readonly int $userId,
) {}
}

// Handler
class SendWelcomeEmailHandler
{
public function __invoke(SendWelcomeEmailMessage $message): void
{
$user = User::find($message->userId);
Mail::to($user->email)->send(new WelcomeMail($user));
}
}

// Dispatch khi user đăng ký
class UserController
{
public function register(RegisterDTO $dto)
{
$user = User::create($dto->toArray());

// Queue welcome email
Bus::dispatch(new SendWelcomeEmailMessage($user->id));

return $user;
}
}

Order Processing

// Khi order được tạo
class OrderController
{
public function store(CreateOrderDTO $dto)
{
$order = Order::create($dto->toArray());

// Queue xử lý order
Bus::dispatch(new ProcessOrderMessage(
orderId: $order->id,
items: $dto->items,
));

// Queue gửi email xác nhận
Bus::dispatch(new SendOrderConfirmationMessage($order->id))
->delay(5); // Delay 5s để đảm bảo order đã được lưu

return $order;
}
}

Scheduled Tasks

// Gửi reminder sau 24h
Bus::dispatch(new SendReminderMessage($user->id))
->delay(new DateInterval('P1D'));

// Retry failed job sau 5 phút
Bus::dispatch(new RetryJobMessage($jobId))
->delay(300);

Best Practices

  1. Idempotent Handlers: Handler nên có thể chạy nhiều lần mà không gây side effects
  2. Small Messages: Chỉ đưa data cần thiết vào message (IDs thay vì full objects)
  3. Error Handling: Xử lý exceptions trong handlers
  4. Monitoring: Monitor queue size và processing time
  5. Separate Queues: Dùng queues khác nhau cho các loại tasks khác nhau