Lock & Atomic
Vì sao cần Lock?
Trong môi trường coroutine, nhiều coroutine chạy đồng thời và có thể cùng truy cập một tài nguyên chia sẻ. Nếu không có cơ chế bảo vệ, dữ liệu có thể bị sai.
Ví dụ vấn đề race condition:
$counter = 0; // biến chia sẻ
// Coroutine A và B cùng chạy
Coroutine::create(function () use (&$counter) {
$temp = $counter; // đọc: 0
Coroutine::sleep(0.001); // yield - B chạy vào
$counter = $temp + 1; // ghi: 1 (nhưng B cũng đã ghi 1 rồi!)
});
Coroutine::create(function () use (&$counter) {
$temp = $counter; // đọc: 0 (A chưa ghi lại!)
$counter = $temp + 1; // ghi: 1
});
// Kết quả: $counter = 1 thay vì 2 ❌
Lock đảm bảo chỉ một coroutine được thao tác với tài nguyên tại một thời điểm.
Mutex (Coroutine Lock)
Coroutine\Lock là mutex dành riêng cho môi trường coroutine. Khi một coroutine đang giữ lock, coroutine khác muốn lấy lock sẽ yield (không block thread) cho đến khi lock được giải phóng.
use Swoole\Coroutine\Lock;
$lock = new Lock();
Coroutine::create(function () use ($lock) {
$lock->lock(); // lấy lock (yield nếu đang bị giữ)
try {
// Vùng an toàn - chỉ 1 coroutine vào đây tại một thời điểm
$data = readSharedData();
processData($data);
writeSharedData($data);
} finally {
$lock->unlock(); // LUÔN giải phóng trong finally
}
});
finallyNếu có exception xảy ra trước khi unlock(), lock sẽ không bao giờ được giải phóng và các coroutine khác sẽ chờ mãi mãi (deadlock). Luôn đặt unlock() trong finally.
Trylock - Không block nếu lock đang bận
$lock = new Lock();
Coroutine::create(function () use ($lock) {
if ($lock->trylock()) {
// lấy được lock ngay lập tức
try {
doWork();
} finally {
$lock->unlock();
}
} else {
// Không lấy được → xử lý khác (không chờ)
return cachedResult();
}
});
Timeout khi lấy lock
$lock = new Lock();
Coroutine::create(function () use ($lock) {
// Chờ tối đa 2 giây để lấy lock
if ($lock->lock(2.0)) {
try {
doWork();
} finally {
$lock->unlock();
}
} else {
// Hết timeout vẫn không lấy được lock
throw new \RuntimeException('Could not acquire lock after 2 seconds');
}
});
Atomic - Thao tác nguyên tử
Swoole\Atomic cung cấp counter thread-safe mà không cần lock. Dùng cho các thao tác tăng/giảm số đơn giản - nhanh hơn lock nhiều.
use Swoole\Atomic;
$counter = new Atomic(0); // giá trị ban đầu = 0
// An toàn khi nhiều coroutine cùng gọi đồng thời
Coroutine::create(fn() => $counter->add(1)); // +1
Coroutine::create(fn() => $counter->add(1)); // +1
Coroutine::create(fn() => $counter->sub(1)); // -1
echo $counter->get(); // 1 (luôn đúng)
Các phương thức Atomic
$atomic = new Atomic(100);
$atomic->get(); // lấy giá trị hiện tại: 100
$atomic->set(200); // đặt giá trị mới: 200
$atomic->add(50); // cộng và trả về giá trị MỚI: 250
$atomic->sub(30); // trừ và trả về giá trị MỚI: 220
// Compare And Swap: chỉ set nếu giá trị hiện tại = expected
$success = $atomic->cmpset(220, 999); // true: đổi từ 220 → 999
$success = $atomic->cmpset(220, 888); // false: giá trị không còn là 220
Atomic\Long - Cho số lớn
Atomic dùng 32-bit integer (max ~2 tỷ). Nếu cần số lớn hơn:
use Swoole\Atomic\Long;
$atomic = new Long(0);
$atomic->add(PHP_INT_MAX); // hỗ trợ 64-bit
Ứng dụng thực tế
Cache stampede prevention
Khi cache hết hạn, nhiều request cùng lúc cùng cố rebuild cache → gọi đến DB hàng trăm lần. Dùng lock để chỉ 1 request rebuild:
class CacheManager
{
private static array $locks = [];
public static function remember(string $key, int $ttl, callable $callback): mixed
{
$cached = Cache::get($key);
if ($cached !== null) {
return $cached;
}
// Double-check locking pattern
if (!isset(self::$locks[$key])) {
self::$locks[$key] = new \Swoole\Coroutine\Lock();
}
$lock = self::$locks[$key];
$lock->lock();
try {
// Kiểm tra lại sau khi lấy lock (có thể coroutine khác đã rebuild rồi)
$cached = Cache::get($key);
if ($cached !== null) {
return $cached;
}
$value = $callback();
Cache::set($key, $value, $ttl);
return $value;
} finally {
$lock->unlock();
}
}
}
// Dùng
$data = CacheManager::remember('expensive_query', 3600, fn() => DB::complex()->get());
Request counter
class ServerStats
{
// Atomic được chia sẻ an toàn giữa tất cả coroutine
private static Atomic $totalRequests;
private static Atomic $activeRequests;
public static function boot(): void
{
self::$totalRequests = new Atomic(0);
self::$activeRequests = new Atomic(0);
}
public static function requestStarted(): void
{
self::$totalRequests->add(1);
self::$activeRequests->add(1);
}
public static function requestFinished(): void
{
self::$activeRequests->sub(1);
}
public static function getStats(): array
{
return [
'total' => self::$totalRequests->get(),
'active' => self::$activeRequests->get(),
];
}
}
Distributed lock với Redis
Khi cần lock giữa nhiều server (không chỉ trong một process), dùng Redis:
class RedisLock
{
private string $key;
private string $token;
private int $ttl;
public function __construct(string $resource, int $ttlSeconds = 30)
{
$this->key = "lock:$resource";
$this->token = bin2hex(random_bytes(16)); // unique token
$this->ttl = $ttlSeconds;
}
public function acquire(): bool
{
// SET NX EX: chỉ set nếu key chưa tồn tại, đồng thời đặt TTL
return (bool) Redis::set($this->key, $this->token, ['NX', 'EX' => $this->ttl]);
}
public function release(): void
{
// Lua script: chỉ xóa nếu token khớp (tránh xóa lock của người khác)
Redis::eval(
"if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end",
keys: [$this->key],
args: [$this->token]
);
}
}
// Dùng
$lock = new RedisLock('payment:order:42');
if ($lock->acquire()) {
try {
processPayment($orderId);
} finally {
$lock->release();
}
} else {
throw new \Exception('Payment đang được xử lý');
}
Read-Write Lock
Khi nhiều coroutine chỉ đọc dữ liệu cùng lúc thì an toàn. Vấn đề chỉ xảy ra khi có ghi. ReadWrite Lock cho phép nhiều reader đồng thời, nhưng writer được độc quyền:
// Swoole không có built-in RWLock, nhưng có thể mô phỏng bằng Channel:
class ReadWriteLock
{
private Channel $writeLock;
private Atomic $readers;
public function __construct()
{
$this->writeLock = new Channel(1);
$this->writeLock->push(1); // unlock ban đầu
$this->readers = new Atomic(0);
}
public function lockRead(): void
{
// Block nếu có writer
if ($this->readers->add(1) === 1) {
$this->writeLock->pop(); // lần đầu đọc → lấy write lock
}
}
public function unlockRead(): void
{
if ($this->readers->sub(1) === 0) {
$this->writeLock->push(1); // reader cuối → trả write lock
}
}
public function lockWrite(): void
{
$this->writeLock->pop(); // chờ tất cả reader xong
}
public function unlockWrite(): void
{
$this->writeLock->push(1);
}
}
Tóm tắt
| Công cụ | Dùng khi |
|---|---|
Coroutine\Lock (Mutex) | Bảo vệ tài nguyên phức tạp, cần đảm bảo chỉ 1 coroutine truy cập |
Atomic | Counter đơn giản, thao tác tăng/giảm số |
Atomic\Long | Counter khi giá trị vượt quá 32-bit |
| Redis lock | Lock giữa nhiều server (distributed) |
- Chỉ cần đếm → dùng
Atomic - Cần bảo vệ một đoạn code → dùng
Coroutine\Lockvớitry/finally - Cần lock giữa nhiều máy chủ → dùng Redis distributed lock