Skip to main content

Channel

Channel là gì?

Channel là ống dẫn để các coroutine giao tiếp với nhau. Một coroutine đẩy dữ liệu vào channel, coroutine khác lấy dữ liệu ra.

Hãy hình dung như băng chuyền ở nhà bếp: đầu bếp A đặt món lên băng chuyền, đầu bếp B lấy món đi phục vụ. Hai người làm việc độc lập nhưng phối hợp qua băng chuyền.

Coroutine Producer                    Coroutine Consumer
│ │
│ push("job1") │
│──────────────> [ Channel ] ──────────>│ pop() = "job1"
│ push("job2") │
│──────────────> [ "job2" ] ───────────>│ pop() = "job2"
│ │

Tạo Channel

use Swoole\Coroutine\Channel;

// Channel có buffer (chứa tối đa 5 phần tử)
$channel = new Channel(5);

// Channel không có buffer (capacity = 1, gần như unbuffered)
$channel = new Channel(1);

Capacity là số phần tử channel có thể chứa trước khi producer bị block:

CapacityHành vi
1Producer bị block ngay nếu consumer chưa lấy
> 1Producer có thể push tiếp tục đến khi đầy buffer
0Không hợp lệ trong Swoole

Push và Pop

$channel = new Channel(3);

// Push - đẩy dữ liệu vào
$channel->push("hello");
$channel->push(["type" => "order", "id" => 42]);
$channel->push(null); // có thể push null

// Pop - lấy dữ liệu ra
$data = $channel->pop(); // block cho đến khi có dữ liệu
$data = $channel->pop(2.0); // block tối đa 2 giây, trả false nếu timeout
Blocking là gì?

Khi channel đầy, push() sẽ yield coroutine hiện tại (không block thread!), tức là Swoole sẽ chạy coroutine khác cho đến khi channel có chỗ trống.

Tương tự, khi channel rỗng, pop() sẽ yield cho đến khi có dữ liệu.

Đây là điểm mạnh của coroutine: blocking trông có vẻ đơn giản nhưng thực ra là non-blocking.

Ví dụ: Producer - Consumer

Pattern phổ biến nhất: một coroutine tạo công việc, coroutine khác xử lý.

$channel = new Channel(10);

// Producer: tạo công việc
Coroutine::create(function () use ($channel) {
$orderIds = [101, 102, 103, 104, 105];

foreach ($orderIds as $id) {
$channel->push($id);
echo "Đẩy vào: order #$id\n";
}

$channel->close(); // báo hiệu không còn công việc nữa
});

// Consumer: xử lý công việc
Coroutine::create(function () use ($channel) {
while (true) {
$orderId = $channel->pop(1.0);

if ($orderId === false) {
// false = timeout hoặc channel đã đóng
if ($channel->isClosing()) break;
continue;
}

echo "Đang xử lý order #$orderId\n";
// ... gửi email, cập nhật DB, v.v.
}

echo "Consumer hoàn thành\n";
});

Ví dụ: Fan-out (nhiều consumer)

Chia công việc cho nhiều worker xử lý song song:

$channel = new Channel(50);
$workerCount = 4;

// 1 producer
Coroutine::create(function () use ($channel) {
for ($i = 1; $i <= 20; $i++) {
$channel->push(['task_id' => $i, 'data' => "payload_$i"]);
}
$channel->close();
});

// 4 consumer chạy song song
for ($w = 1; $w <= $workerCount; $w++) {
Coroutine::create(function () use ($channel, $w) {
while (!$channel->isClosing()) {
$task = $channel->pop(0.5);
if ($task === false) break;

// Mỗi worker xử lý task của mình
processTask($task);
echo "Worker $w xử lý task #{$task['task_id']}\n";
}
});
}
Producer: ──> [channel: task1, task2, ... task20]
↓ ↓ ↓ ↓
Worker 1 Worker 2 Worker 3 Worker 4

Ví dụ: Collect kết quả từ nhiều coroutine

Dùng channel như một nơi tổng hợp kết quả từ nhiều coroutine chạy song song:

class DashboardController
{
#[Get('/dashboard')]
public function index(): array
{
$ch = new Channel(3); // chứa đủ 3 kết quả

// Chạy 3 queries song song
Coroutine::create(fn() => $ch->push(['key' => 'revenue', 'value' => DB::table('orders')->sum('total')]));
Coroutine::create(fn() => $ch->push(['key' => 'users', 'value' => DB::table('users')->count()]));
Coroutine::create(fn() => $ch->push(['key' => 'views', 'value' => Cache::get('page_views', 0)]));

// Thu thập 3 kết quả
$data = [];
for ($i = 0; $i < 3; $i++) {
$result = $ch->pop(5.0); // timeout 5s
if ($result !== false) {
$data[$result['key']] = $result['value'];
}
}

return $data;
// Tổng thời gian = max(t_revenue, t_users, t_views)
}
}

Đóng Channel

$channel->close(); // đóng channel

// Kiểm tra trạng thái
$channel->isClosing(); // true nếu channel đang đóng
$channel->isEmpty(); // true nếu không còn dữ liệu
$channel->isFull(); // true nếu buffer đầy
$channel->length(); // số phần tử hiện tại trong buffer
$channel->capacity(); // dung lượng tối đa của buffer

Khi channel đóng:

  • pop() vẫn có thể lấy các phần tử còn lại trong buffer
  • Khi buffer rỗng, pop() trả về false
  • push() trả về false

Select - Chờ nhiều channel

Giống như select trong Go: chờ channel nào có dữ liệu trước thì xử lý:

// Swoole không có cú pháp select như Go, nhưng bạn có thể dùng
// nhiều coroutine + một result channel để đạt hiệu quả tương tự

$result = new Channel(1);

Coroutine::create(fn() => $result->push(['from' => 'db', 'data' => DB::...->get()]));
Coroutine::create(fn() => $result->push(['from' => 'cache', 'data' => Cache::get('key')]));

$first = $result->pop(3.0); // lấy kết quả từ cái nào xong trước

Ứng dụng thực tế trong framework

Rate limiting tự chế

// Tạo semaphore giới hạn số request đồng thời đến API bên ngoài
$semaphore = new Channel(5); // tối đa 5 request cùng lúc

// Pre-fill semaphore
for ($i = 0; $i < 5; $i++) {
$semaphore->push(1);
}

function callExternalApi(string $url, Channel $semaphore): mixed
{
$semaphore->pop(); // lấy "slot" - block nếu đã có 5 request đang chạy
try {
return Http::get($url);
} finally {
$semaphore->push(1); // trả "slot" lại
}
}

Task queue đơn giản

class TaskWorker
{
private Channel $queue;

public function boot(): void
{
$this->queue = new Channel(100);

// Khởi động worker trong background
Coroutine::create(function () {
while (true) {
$task = $this->queue->pop(); // chờ mãi

try {
$this->handle($task);
} catch (\Throwable $e) {
Log::error('Task failed: ' . $e->getMessage());
}
}
});
}

public function dispatch(array $task): void
{
$this->queue->push($task);
}
}

Tóm tắt

Tình huốngDùng Channel
Truyền dữ liệu giữa coroutine
Giới hạn số task chạy đồng thời (semaphore)
Thu thập kết quả từ nhiều coroutine
Giao tiếp giữa các request khác nhau❌ (dùng Redis pub/sub)
Lưu trữ lâu dài❌ (dùng database)