Nhảy tới nội dung

Lock & Atomic

Vì sao cần Lock?

Trong môi trường coroutine, nhiều coroutine chạy đồng thời và có thể cùng truy cập một tài nguyên chia sẻ. Nếu không có cơ chế bảo vệ, dữ liệu có thể bị sai.

Ví dụ vấn đề race condition:

$counter = 0; // biến chia sẻ

// Coroutine A và B cùng chạy
Coroutine::create(function () use (&$counter) {
$temp = $counter; // đọc: 0
Coroutine::sleep(0.001); // yield - B chạy vào
$counter = $temp + 1; // ghi: 1 (nhưng B cũng đã ghi 1 rồi!)
});

Coroutine::create(function () use (&$counter) {
$temp = $counter; // đọc: 0 (A chưa ghi lại!)
$counter = $temp + 1; // ghi: 1
});

// Kết quả: $counter = 1 thay vì 2 ❌

Lock đảm bảo chỉ một coroutine được thao tác với tài nguyên tại một thời điểm.

Mutex (Coroutine Lock)

Coroutine\Lock là mutex dành riêng cho môi trường coroutine. Khi một coroutine đang giữ lock, coroutine khác muốn lấy lock sẽ yield (không block thread) cho đến khi lock được giải phóng.

use Swoole\Coroutine\Lock;

$lock = new Lock();

Coroutine::create(function () use ($lock) {
$lock->lock(); // lấy lock (yield nếu đang bị giữ)
try {
// Vùng an toàn - chỉ 1 coroutine vào đây tại một thời điểm
$data = readSharedData();
processData($data);
writeSharedData($data);
} finally {
$lock->unlock(); // LUÔN giải phóng trong finally
}
});
Luôn dùng finally

Nếu có exception xảy ra trước khi unlock(), lock sẽ không bao giờ được giải phóng và các coroutine khác sẽ chờ mãi mãi (deadlock). Luôn đặt unlock() trong finally.

Trylock - Không block nếu lock đang bận

$lock = new Lock();

Coroutine::create(function () use ($lock) {
if ($lock->trylock()) {
// lấy được lock ngay lập tức
try {
doWork();
} finally {
$lock->unlock();
}
} else {
// Không lấy được → xử lý khác (không chờ)
return cachedResult();
}
});

Timeout khi lấy lock

$lock = new Lock();

Coroutine::create(function () use ($lock) {
// Chờ tối đa 2 giây để lấy lock
if ($lock->lock(2.0)) {
try {
doWork();
} finally {
$lock->unlock();
}
} else {
// Hết timeout vẫn không lấy được lock
throw new \RuntimeException('Could not acquire lock after 2 seconds');
}
});

Atomic - Thao tác nguyên tử

Swoole\Atomic cung cấp counter thread-safe mà không cần lock. Dùng cho các thao tác tăng/giảm số đơn giản - nhanh hơn lock nhiều.

use Swoole\Atomic;

$counter = new Atomic(0); // giá trị ban đầu = 0

// An toàn khi nhiều coroutine cùng gọi đồng thời
Coroutine::create(fn() => $counter->add(1)); // +1
Coroutine::create(fn() => $counter->add(1)); // +1
Coroutine::create(fn() => $counter->sub(1)); // -1

echo $counter->get(); // 1 (luôn đúng)

Các phương thức Atomic

$atomic = new Atomic(100);

$atomic->get(); // lấy giá trị hiện tại: 100
$atomic->set(200); // đặt giá trị mới: 200
$atomic->add(50); // cộng và trả về giá trị MỚI: 250
$atomic->sub(30); // trừ và trả về giá trị MỚI: 220

// Compare And Swap: chỉ set nếu giá trị hiện tại = expected
$success = $atomic->cmpset(220, 999); // true: đổi từ 220 → 999
$success = $atomic->cmpset(220, 888); // false: giá trị không còn là 220

Atomic\Long - Cho số lớn

Atomic dùng 32-bit integer (max ~2 tỷ). Nếu cần số lớn hơn:

use Swoole\Atomic\Long;

$atomic = new Long(0);
$atomic->add(PHP_INT_MAX); // hỗ trợ 64-bit

Ứng dụng thực tế

Cache stampede prevention

Khi cache hết hạn, nhiều request cùng lúc cùng cố rebuild cache → gọi đến DB hàng trăm lần. Dùng lock để chỉ 1 request rebuild:

class CacheManager
{
private static array $locks = [];

public static function remember(string $key, int $ttl, callable $callback): mixed
{
$cached = Cache::get($key);
if ($cached !== null) {
return $cached;
}

// Double-check locking pattern
if (!isset(self::$locks[$key])) {
self::$locks[$key] = new \Swoole\Coroutine\Lock();
}

$lock = self::$locks[$key];
$lock->lock();
try {
// Kiểm tra lại sau khi lấy lock (có thể coroutine khác đã rebuild rồi)
$cached = Cache::get($key);
if ($cached !== null) {
return $cached;
}

$value = $callback();
Cache::set($key, $value, $ttl);
return $value;
} finally {
$lock->unlock();
}
}
}

// Dùng
$data = CacheManager::remember('expensive_query', 3600, fn() => DB::complex()->get());

Request counter

class ServerStats
{
// Atomic được chia sẻ an toàn giữa tất cả coroutine
private static Atomic $totalRequests;
private static Atomic $activeRequests;

public static function boot(): void
{
self::$totalRequests = new Atomic(0);
self::$activeRequests = new Atomic(0);
}

public static function requestStarted(): void
{
self::$totalRequests->add(1);
self::$activeRequests->add(1);
}

public static function requestFinished(): void
{
self::$activeRequests->sub(1);
}

public static function getStats(): array
{
return [
'total' => self::$totalRequests->get(),
'active' => self::$activeRequests->get(),
];
}
}

Distributed lock với Redis

Khi cần lock giữa nhiều server (không chỉ trong một process), dùng Redis:

class RedisLock
{
private string $key;
private string $token;
private int $ttl;

public function __construct(string $resource, int $ttlSeconds = 30)
{
$this->key = "lock:$resource";
$this->token = bin2hex(random_bytes(16)); // unique token
$this->ttl = $ttlSeconds;
}

public function acquire(): bool
{
// SET NX EX: chỉ set nếu key chưa tồn tại, đồng thời đặt TTL
return (bool) Redis::set($this->key, $this->token, ['NX', 'EX' => $this->ttl]);
}

public function release(): void
{
// Lua script: chỉ xóa nếu token khớp (tránh xóa lock của người khác)
Redis::eval(
"if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end",
keys: [$this->key],
args: [$this->token]
);
}
}

// Dùng
$lock = new RedisLock('payment:order:42');

if ($lock->acquire()) {
try {
processPayment($orderId);
} finally {
$lock->release();
}
} else {
throw new \Exception('Payment đang được xử lý');
}

Read-Write Lock

Khi nhiều coroutine chỉ đọc dữ liệu cùng lúc thì an toàn. Vấn đề chỉ xảy ra khi có ghi. ReadWrite Lock cho phép nhiều reader đồng thời, nhưng writer được độc quyền:

// Swoole không có built-in RWLock, nhưng có thể mô phỏng bằng Channel:
class ReadWriteLock
{
private Channel $writeLock;
private Atomic $readers;

public function __construct()
{
$this->writeLock = new Channel(1);
$this->writeLock->push(1); // unlock ban đầu
$this->readers = new Atomic(0);
}

public function lockRead(): void
{
// Block nếu có writer
if ($this->readers->add(1) === 1) {
$this->writeLock->pop(); // lần đầu đọc → lấy write lock
}
}

public function unlockRead(): void
{
if ($this->readers->sub(1) === 0) {
$this->writeLock->push(1); // reader cuối → trả write lock
}
}

public function lockWrite(): void
{
$this->writeLock->pop(); // chờ tất cả reader xong
}

public function unlockWrite(): void
{
$this->writeLock->push(1);
}
}

Tóm tắt

Công cụDùng khi
Coroutine\Lock (Mutex)Bảo vệ tài nguyên phức tạp, cần đảm bảo chỉ 1 coroutine truy cập
AtomicCounter đơn giản, thao tác tăng/giảm số
Atomic\LongCounter khi giá trị vượt quá 32-bit
Redis lockLock giữa nhiều server (distributed)
Quy tắc đơn giản
  • Chỉ cần đếm → dùng Atomic
  • Cần bảo vệ một đoạn code → dùng Coroutine\Lock với try/finally
  • Cần lock giữa nhiều máy chủ → dùng Redis distributed lock