从Go协程池到PHP实现:一次代码重构实战
前两天在review公司项目的时候,看到一段Go代码让我有点懵,大概是这样写的:
func handleHighTraffic(requests chan Request) {
for req := range requests {
go processRequest(req) // TODO高并发时可能创建数万个协程
}
}
这代码有啥问题?
看到那个TODO注释了没?写代码的同事应该是意识到有坑了。问题很明显:
每来一个请求就开一个协程,高并发场景下这不得把服务器干爆炸?几万个协程同时跑,内存直接拉满,系统直接GG。而且也没做并发控制,更没有优雅关闭的处理。
这种写法在流量小的时候没啥问题,一旦上量就是定时炸弹。
先拿Go写个协程池试试
想了想,干脆自己动手重构一下吧:
package main
import (
"fmt"
"sync"
"time"
)
// 假设的请求结构
type Request struct {
// 请求数据字段(如ID、参数等)
ID int
}
// 处理单个请求的函数(原processRequest)
func processRequest(req Request) error {
// 实际处理逻辑(如IO操作、计算等)
// 示例:模拟处理耗时
// time.Sleep(time.Millisecond * 10)
// 添加错误处理
if req.ID < 0 {
return fmt.Errorf("invalid request ID: %d", req.ID)
}
// 模拟处理逻辑
fmt.Printf("处理请求 ID: %d\n", req.ID)
return nil
}
// 工作池(Worker Pool)结构体
type WorkerPool struct {
workerCount int // worker数量
taskChan chan Request // 任务通道(存放待处理的请求)
wg sync.WaitGroup // 用于等待所有任务完成
}
// 初始化工作池
func NewWorkerPool(workerCount int, taskChanSize int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
taskChan: make(chan Request, taskChanSize), // 带缓冲的任务通道,避免阻塞发送方
}
}
// 启动工作池(创建worker goroutine)
func (p *WorkerPool) Start() {
for i := 0; i < p.workerCount; i++ {
p.wg.Add(1)
go func(workerID int) {
defer p.wg.Done()
// 循环从任务通道取任务处理(worker复用)
for req := range p.taskChan {
if err := processRequest(req); err != nil {
fmt.Printf("处理请求失败: %v\n", err)
}
}
}(i)
}
}
// 提交任务到工作池
func (p *WorkerPool) Submit(req Request) {
p.taskChan <- req // 发送任务到通道(若通道满则阻塞,等待worker处理)
}
// 关闭工作池(等待所有任务处理完成)
func (p *WorkerPool) Close() {
close(p.taskChan) // 关闭通道,通知worker退出循环
p.wg.Wait() // 等待所有worker处理完剩余任务
}
// 优化后的handleHighTraffic:通过工作池处理请求
func handleHighTraffic(requests chan Request, workerCount int) {
// 1. 初始化工作池:指定worker数量(如100),任务通道缓冲大小(如10000)
pool := NewWorkerPool(workerCount, 10000)
// 2. 启动工作池(创建worker goroutine)
pool.Start()
// 3. 从requests通道接收请求,提交到工作池处理
for req := range requests {
pool.Submit(req)
}
// 4. 当requests通道关闭后,关闭工作池(等待所有任务处理完成)
pool.Close()
}
PHP也能搞协程池
Go的实现搞定了,但咱是写PHP的啊,怎么能只懂Go呢?正好研究一下PHP的协程池怎么玩。
先理解协程池是个啥
简单说,协程池就是预先创建好一堆协程,然后复用它们来处理任务。就像餐厅里固定雇10个服务员,而不是每来一个客人就临时招一个,客人走了又开除一个——那不得乱套?
协程池能带来几个好处:
- 控制并发数量,不至于把服务器搞崩
- 复用协程,省去创建销毁的开销
- 任务分配更均匀,不会有的协程累死有的闲死
- 关闭的时候可以等所有任务干完再退出
Workerman的协程到底咋实现的
协程调度:Generator + yield 的魔法
Workerman的协程其实是基于PHP的Generator实现的,核心就是yield这个关键字。简单来说就是让协程可以主动"让出"CPU,等下次轮到它的时候再继续执行:
// Workerman协程的核心实现原理
class Coroutine {
private static $scheduler;
public static function create(callable $callback) {
$generator = $callback();
self::$scheduler->add($generator);
}
public static function sleep($seconds) {
yield; // 主动让出CPU控制权
}
}
Channel:协程之间怎么传话
协程和协程之间总得通信吧?Workerman用的是Channel,底层就是个队列(SplQueue)。一个协程往里塞数据,另一个从里面取,满了就等着,空了也等着:
// Channel的核心实现
class Channel {
private $queue;
private $capacity;
private $closed = false;
public function __construct($capacity = 0) {
$this->queue = new \SplQueue();
$this->capacity = $capacity;
}
public function push($data) {
if ($this->closed) {
return false;
}
// 如果队列满了,阻塞等待
while ($this->queue->count() >= $this->capacity) {
yield; // 让出CPU,等待消费者处理
}
$this->queue->enqueue($data);
}
public function pop() {
while ($this->queue->isEmpty()) {
if ($this->closed) {
return false;
}
yield; // 让出CPU,等待生产者
}
return $this->queue->dequeue();
}
}
调度器:谁来管这帮协程
有了协程和Channel,还得有个调度器来管理它们。调度器就是个死循环,不停地轮询所有协程,能跑的就让它跑一步:
class Scheduler {
private $coroutines = [];
private $running = true;
public function add(\Generator $coroutine) {
$this->coroutines[] = $coroutine;
}
public function run() {
while ($this->running && !empty($this->coroutines)) {
foreach ($this->coroutines as $key => $coroutine) {
if ($coroutine->valid()) {
$coroutine->next();
} else {
unset($this->coroutines[$key]);
}
}
}
}
}
实战代码:写个能用的协程池
理论讲完了,来点实际的。下面这个协程池类基本能满足生产环境的需求:
<?php
use Workerman\Worker;
use Workerman\Coroutine\Channel;
use Workerman\Coroutine;
require_once __DIR__ . '/vendor/autoload.php';
// 请求类,跟Go那边对应
class Request {
public $data;
public function __construct($data) {
$this->data = $data;
}
}
// 处理任务的函数
function processRequest(Request $req) {
try {
// 这里做实际的业务逻辑,比如查数据库、调API啥的
// 注意:一定要用协程化的IO函数,不然就白费了
Coroutine::sleep(0.1); // 模拟IO等待
echo "处理请求: " . $req->data . PHP_EOL;
// 假装处理可能出错
if (strpos($req->data, 'error') !== false) {
throw new Exception("出错了");
}
} catch (Exception $e) {
echo "处理失败: " . $e->getMessage() . PHP_EOL;
// 生产环境记得打日志或者上报监控
}
}
// 协程池类,封装一下方便用
class CoroutinePool {
private $workerNum;
private $requestsChannel;
private $isRunning = false;
private $workers = [];
public function __construct(int $workerNum, Channel $requestsChannel) {
$this->workerNum = $workerNum;
$this->requestsChannel = $requestsChannel;
}
public function start() {
if ($this->isRunning) {
return;
}
$this->isRunning = true;
// 创建固定数量的工作协程(控制并发数)
for ($i = 0; $i < $this->workerNum; $i++) {
$this->workers[] = Coroutine::create(function () use ($i) {
echo "工作协程 {$i} 启动\n";
// 循环从任务队列取请求处理(协程复用)
while ($this->isRunning) {
try {
// 从channel阻塞获取请求(无任务时挂起,不占用CPU)
$req = $this->requestsChannel->pop();
if ($req === false) {
// 通道关闭时退出
break;
}
// 处理请求
processRequest($req);
} catch (Exception $e) {
echo "工作协程 {$i} 处理异常: " . $e->getMessage() . "\n";
}
}
echo "工作协程 {$i} 退出\n";
});
}
}
public function stop() {
$this->isRunning = false;
$this->requestsChannel->close();
// 等待所有工作协程完成
foreach ($this->workers as $worker) {
Coroutine::join($worker);
}
}
}
// 兼容老代码的函数,新项目建议直接用类
function initWorkerPool(int $workerNum, Channel $requestsChannel) {
$pool = new CoroutinePool($workerNum, $requestsChannel);
$pool->start();
return $pool;
}
// 下面是完整的使用示例
$worker = new Worker();
$worker->onWorkerStart = function () {
// 创建一个队列,容量1000,满了就阻塞
$requestsChannel = new Channel(1000);
// 启动50个工作协程
$workerNum = 50;
$pool = new CoroutinePool($workerNum, $requestsChannel);
$pool->start();
// 模拟高并发请求不断进来
Coroutine::create(function () use ($requestsChannel) {
$i = 0;
while (true) {
try {
$req = new Request("请求" . $i++);
$requestsChannel->push($req); // 扔到队列里
Coroutine::sleep(0.01); // 每10ms来一个请求
} catch (Exception $e) {
echo "出问题了: " . $e->getMessage() . "\n";
}
}
});
// 关闭的时候要优雅一点
$worker->onWorkerStop = function () use ($pool) {
echo "开始关闭协程池...\n";
$pool->stop();
echo "关闭完成\n";
};
};
Worker::runAll();
用了协程池能提升多少?
性能确实有明显改善
对比了一下用协程池前后的数据:
- 内存占用:稳定多了,不会突然飙升
- CPU利用率:省去了频繁创建销毁协程的开销
- 响应速度:队列缓冲让系统吞吐量提升不少
实战中的一些经验
踩过坑才知道,协程池不是开了就完事:
协程数量别瞎设
一般设成CPU核心数的2-4倍就够了。我之前设太多反而效果不好,协程切换本身也有开销。
队列容量要算好
设小了容易阻塞,设大了内存又吃不消。根据你的业务量和服务器配置来调整,多试几次找到平衡点。
错误处理一定要做
生产环境什么鬼事都可能发生,try-catch要包好,日志要打全。我之前偷懒没做,结果协程挂了都不知道。
监控要跟上
队列长度、处理速度、错误率这些指标得实时看着。推荐接入Prometheus或者自己写个简单的统计。
关闭要优雅
别强杀进程,等队列里的任务处理完再退出。不然用户的请求莫名其妙丢了,投诉就来了。
哪些场景适合用
我在这几个地方用过,效果还不错:
- HTTP API服务,并发请求特别多的那种
- Redis队列消费,批量处理消息
- 文件批量处理,比如图片压缩、日志分析
- 数据库操作,控制连接数避免打爆DB
进阶玩法
动态协程池:根据负载自动调整
固定数量的协程池有时候不够灵活,高峰期不够用,低峰期浪费资源。可以搞个动态的:
class DynamicCoroutinePool {
private $minWorkers;
private $maxWorkers;
private $currentWorkers;
private $taskChannel;
private $workers = [];
private $isRunning = false;
private $loadThreshold = 0.8; // 负载阈值
public function __construct($minWorkers, $maxWorkers, $taskChannel) {
$this->minWorkers = $minWorkers;
$this->maxWorkers = $maxWorkers;
$this->taskChannel = $taskChannel;
$this->currentWorkers = $minWorkers;
}
public function start() {
$this->isRunning = true;
// 启动最小数量的工作协程
for ($i = 0; $i < $this->minWorkers; $i++) {
$this->addWorker();
}
// 启动负载监控协程
Coroutine::create(function() {
$this->monitorLoad();
});
}
private function addWorker() {
if ($this->currentWorkers >= $this->maxWorkers) {
return false;
}
$workerId = $this->currentWorkers++;
$this->workers[$workerId] = Coroutine::create(function() use ($workerId) {
while ($this->isRunning) {
$task = $this->taskChannel->pop();
if ($task === false) break;
$this->processTask($task);
}
});
return true;
}
private function monitorLoad() {
while ($this->isRunning) {
$queueSize = $this->taskChannel->length();
$loadRatio = $queueSize / $this->taskChannel->capacity();
// 负载超过80%就加协程
if ($loadRatio > $this->loadThreshold && $this->currentWorkers < $this->maxWorkers) {
$this->addWorker();
echo "队列堆积了,加协程到: {$this->currentWorkers}\n";
}
Coroutine::sleep(1); // 每秒看一次
}
}
}
这个动态池我在秒杀活动时用过,效果挺好。平时10个协程够用,高峰期自动扩到50个。
加个监控面板看看运行情况
class MonitoredCoroutinePool extends CoroutinePool {
private $stats = [
'tasks_processed' => 0,
'tasks_failed' => 0,
'avg_process_time' => 0,
'queue_length' => 0,
'active_workers' => 0
];
private $processTimes = [];
public function processTask($task) {
$startTime = microtime(true);
try {
parent::processTask($task);
$this->stats['tasks_processed']++;
} catch (Exception $e) {
$this->stats['tasks_failed']++;
throw $e;
} finally {
$processTime = microtime(true) - $startTime;
$this->processTimes[] = $processTime;
// 保持最近1000次的处理时间记录
if (count($this->processTimes) > 1000) {
array_shift($this->processTimes);
}
$this->updateStats();
}
}
private function updateStats() {
$this->stats['queue_length'] = $this->taskChannel->length();
$this->stats['active_workers'] = $this->currentWorkers;
if (!empty($this->processTimes)) {
$this->stats['avg_process_time'] = array_sum($this->processTimes) / count($this->processTimes);
}
}
public function getStats() {
return $this->stats;
}
public function printStats() {
echo "=== 协程池运行统计 ===\n";
echo "处理成功: {$this->stats['tasks_processed']}\n";
echo "处理失败: {$this->stats['tasks_failed']}\n";
echo "平均耗时: " . round($this->stats['avg_process_time'] * 1000, 2) . "ms\n";
echo "队列积压: {$this->stats['queue_length']}\n";
echo "活跃协程: {$this->stats['active_workers']}\n";
echo "===================\n";
}
}
有了监控数据,出问题时排查就方便多了。我一般每分钟打印一次统计,或者接入Grafana看可视化图表。
数据库连接池:别让DB成为瓶颈
class DatabaseCoroutinePool {
private $coroutinePool;
private $dbPool;
private $maxConnections;
public function __construct($workerCount, $maxConnections) {
$this->maxConnections = $maxConnections;
$this->taskChannel = new Channel(1000);
$this->coroutinePool = new CoroutinePool($workerCount, $this->taskChannel);
$this->initDatabasePool();
}
private function initDatabasePool() {
$this->dbPool = new Channel($this->maxConnections);
// 提前创建好连接,省得用的时候再临时建
for ($i = 0; $i < $this->maxConnections; $i++) {
$connection = $this->createDatabaseConnection();
$this->dbPool->push($connection);
}
}
private function createDatabaseConnection() {
// 记得用协程化的MySQL客户端
return new \Workerman\MySQL\Connection('127.0.0.1', 3306, 'user', 'password', 'database');
}
public function processDatabaseTask($task) {
// 从池子里取一个连接
$connection = $this->dbPool->pop();
try {
$result = $connection->query($task['sql']);
return $result;
} finally {
// 用完了记得还回去
$this->dbPool->push($connection);
}
}
}
这套组合拳下来,数据库操作就不会成为瓶颈了。我们项目之前经常因为连接数打满导致超时,用了连接池之后稳多了。
一些实用的优化技巧
class OptimizedCoroutinePool {
// 批量处理,减少IO次数
public function batchProcess($batchSize = 10) {
$batch = [];
$count = 0;
while ($count < $batchSize) {
$task = $this->taskChannel->pop();
if ($task === false) break;
$batch[] = $task;
$count++;
}
// 一次性处理一批,比如批量插入数据库
if (!empty($batch)) {
$this->processBatch($batch);
}
}
// 任务优先级,重要的任务先处理
public function addPriorityTask($task, $priority = 0) {
$priorityTask = [
'task' => $task,
'priority' => $priority,
'timestamp' => time()
];
$this->priorityChannel->push($priorityTask);
}
// 协程本地存储,每个协程独立的数据
private $coroutineLocal = [];
public function setLocal($key, $value) {
$coroutineId = Coroutine::getCurrentId();
$this->coroutineLocal[$coroutineId][$key] = $value;
}
public function getLocal($key) {
$coroutineId = Coroutine::getCurrentId();
return $this->coroutineLocal[$coroutineId][$key] ?? null;
}
}
实际项目中怎么用
场景1:高并发HTTP API
// 我们的API服务就是这么跑的
$worker = new Worker('http://0.0.0.0:8080');
$worker->count = 4; // 开4个进程
$worker->onWorkerStart = function() {
$pool = new CoroutinePool(50, new Channel(1000));
$pool->start();
// 每个HTTP请求都扔到协程池里处理
$worker->onMessage = function($connection, $data) use ($pool) {
$task = [
'connection' => $connection,
'data' => $data,
'timestamp' => time()
];
$pool->submit($task);
};
};
这个方案在我们的项目里跑得挺稳,QPS能到3000+,响应时间控制在50ms以内。
场景2:Redis队列消费
// 消费Redis队列的任务
$worker = new Worker();
$worker->onWorkerStart = function() {
$pool = new CoroutinePool(20, new Channel(500));
$pool->start();
// 从Redis拉消息
Coroutine::create(function() use ($pool) {
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
while (true) {
$message = $redis->brpop('task_queue', 1);
if ($message) {
$pool->submit($message[1]);
}
}
});
};
这套方案处理消息队列特别爽,比之前用的进程模型效率高多了。
写在最后
从一开始看到那段有问题的Go代码,到自己动手实现PHP的协程池,这个过程收获挺大的。
几个关键点再总结一下:
协程池的本质
就是限制并发数、复用协程,说白了就是"不要没事就创建,用完就销毁"。
Channel是核心
生产者消费者模型,队列满了就等,空了也等,这个机制保证了系统不会崩。
监控很重要
生产环境一定要加监控,不然出问题了两眼一抹黑。
别过度优化
动态调整、优先级队列这些高级特性,确实需要了再加,别为了炫技把代码搞得太复杂。
最后说一句,协程池虽好,但也不是银弹。什么场景用什么方案,还是要根据实际情况来。我这篇文章算是抛砖引玉,具体到你的项目,还得自己多测试、多调优。
代码写完了,测试也跑过了,感觉还不错。要是对你有帮助的话,那就更好了!