Announcement

👇Official Account👇

Welcome to join the group & private message

Article first/tail QR code

Skip to content

从Go协程池到PHP实现:一次代码重构实战

前两天在review公司项目的时候,看到一段Go代码让我有点懵,大概是这样写的:

go
func handleHighTraffic(requests chan Request) {
    for req := range requests {
        go processRequest(req) // TODO高并发时可能创建数万个协程
    }
}

这代码有啥问题?

看到那个TODO注释了没?写代码的同事应该是意识到有坑了。问题很明显:

每来一个请求就开一个协程,高并发场景下这不得把服务器干爆炸?几万个协程同时跑,内存直接拉满,系统直接GG。而且也没做并发控制,更没有优雅关闭的处理。

这种写法在流量小的时候没啥问题,一旦上量就是定时炸弹。

先拿Go写个协程池试试

想了想,干脆自己动手重构一下吧:

go
package main

import (
	"fmt"
	"sync"
	"time"
)

// 假设的请求结构
type Request struct {
	// 请求数据字段(如ID、参数等)
	ID int
}

// 处理单个请求的函数(原processRequest)
func processRequest(req Request) error {
	// 实际处理逻辑(如IO操作、计算等)
	// 示例:模拟处理耗时
	// time.Sleep(time.Millisecond * 10)
	
	// 添加错误处理
	if req.ID < 0 {
		return fmt.Errorf("invalid request ID: %d", req.ID)
	}
	
	// 模拟处理逻辑
	fmt.Printf("处理请求 ID: %d\n", req.ID)
	return nil
}

// 工作池(Worker Pool)结构体
type WorkerPool struct {
	workerCount int          // worker数量
	taskChan    chan Request // 任务通道(存放待处理的请求)
	wg          sync.WaitGroup // 用于等待所有任务完成
}

// 初始化工作池
func NewWorkerPool(workerCount int, taskChanSize int) *WorkerPool {
	return &WorkerPool{
		workerCount: workerCount,
		taskChan:    make(chan Request, taskChanSize), // 带缓冲的任务通道,避免阻塞发送方
	}
}

// 启动工作池(创建worker goroutine)
func (p *WorkerPool) Start() {
	for i := 0; i < p.workerCount; i++ {
		p.wg.Add(1)
		go func(workerID int) {
			defer p.wg.Done()
			// 循环从任务通道取任务处理(worker复用)
			for req := range p.taskChan {
				if err := processRequest(req); err != nil {
					fmt.Printf("处理请求失败: %v\n", err)
				}
			}
		}(i)
	}
}

// 提交任务到工作池
func (p *WorkerPool) Submit(req Request) {
	p.taskChan <- req // 发送任务到通道(若通道满则阻塞,等待worker处理)
}

// 关闭工作池(等待所有任务处理完成)
func (p *WorkerPool) Close() {
	close(p.taskChan) // 关闭通道,通知worker退出循环
	p.wg.Wait()       // 等待所有worker处理完剩余任务
}

// 优化后的handleHighTraffic:通过工作池处理请求
func handleHighTraffic(requests chan Request, workerCount int) {
	// 1. 初始化工作池:指定worker数量(如100),任务通道缓冲大小(如10000)
	pool := NewWorkerPool(workerCount, 10000)
	// 2. 启动工作池(创建worker goroutine)
	pool.Start()

	// 3. 从requests通道接收请求,提交到工作池处理
	for req := range requests {
		pool.Submit(req)
	}

	// 4. 当requests通道关闭后,关闭工作池(等待所有任务处理完成)
	pool.Close()
}

PHP也能搞协程池

Go的实现搞定了,但咱是写PHP的啊,怎么能只懂Go呢?正好研究一下PHP的协程池怎么玩。

先理解协程池是个啥

简单说,协程池就是预先创建好一堆协程,然后复用它们来处理任务。就像餐厅里固定雇10个服务员,而不是每来一个客人就临时招一个,客人走了又开除一个——那不得乱套?

协程池能带来几个好处:

  • 控制并发数量,不至于把服务器搞崩
  • 复用协程,省去创建销毁的开销
  • 任务分配更均匀,不会有的协程累死有的闲死
  • 关闭的时候可以等所有任务干完再退出

Workerman的协程到底咋实现的

协程调度:Generator + yield 的魔法

Workerman的协程其实是基于PHP的Generator实现的,核心就是yield这个关键字。简单来说就是让协程可以主动"让出"CPU,等下次轮到它的时候再继续执行:

php
// Workerman协程的核心实现原理
class Coroutine {
    private static $scheduler;
    
    public static function create(callable $callback) {
        $generator = $callback();
        self::$scheduler->add($generator);
    }
    
    public static function sleep($seconds) {
        yield; // 主动让出CPU控制权
    }
}

Channel:协程之间怎么传话

协程和协程之间总得通信吧?Workerman用的是Channel,底层就是个队列(SplQueue)。一个协程往里塞数据,另一个从里面取,满了就等着,空了也等着:

php
// Channel的核心实现
class Channel {
    private $queue;
    private $capacity;
    private $closed = false;
    
    public function __construct($capacity = 0) {
        $this->queue = new \SplQueue();
        $this->capacity = $capacity;
    }
    
    public function push($data) {
        if ($this->closed) {
            return false;
        }
        
        // 如果队列满了,阻塞等待
        while ($this->queue->count() >= $this->capacity) {
            yield; // 让出CPU,等待消费者处理
        }
        
        $this->queue->enqueue($data);
    }
    
    public function pop() {
        while ($this->queue->isEmpty()) {
            if ($this->closed) {
                return false;
            }
            yield; // 让出CPU,等待生产者
        }
        
        return $this->queue->dequeue();
    }
}

调度器:谁来管这帮协程

有了协程和Channel,还得有个调度器来管理它们。调度器就是个死循环,不停地轮询所有协程,能跑的就让它跑一步:

php
class Scheduler {
    private $coroutines = [];
    private $running = true;
    
    public function add(\Generator $coroutine) {
        $this->coroutines[] = $coroutine;
    }
    
    public function run() {
        while ($this->running && !empty($this->coroutines)) {
            foreach ($this->coroutines as $key => $coroutine) {
                if ($coroutine->valid()) {
                    $coroutine->next();
                } else {
                    unset($this->coroutines[$key]);
                }
            }
        }
    }
}

实战代码:写个能用的协程池

理论讲完了,来点实际的。下面这个协程池类基本能满足生产环境的需求:

php
<?php
use Workerman\Worker;
use Workerman\Coroutine\Channel;
use Workerman\Coroutine;
require_once __DIR__ . '/vendor/autoload.php';

// 请求类,跟Go那边对应
class Request {
    public $data;
    public function __construct($data) {
        $this->data = $data;
    }
}

// 处理任务的函数
function processRequest(Request $req) {
    try {
        // 这里做实际的业务逻辑,比如查数据库、调API啥的
        // 注意:一定要用协程化的IO函数,不然就白费了
        Coroutine::sleep(0.1); // 模拟IO等待
    echo "处理请求: " . $req->data . PHP_EOL;
        
        // 假装处理可能出错
        if (strpos($req->data, 'error') !== false) {
            throw new Exception("出错了");
        }
        
    } catch (Exception $e) {
        echo "处理失败: " . $e->getMessage() . PHP_EOL;
        // 生产环境记得打日志或者上报监控
    }
}

// 协程池类,封装一下方便用
class CoroutinePool {
    private $workerNum;
    private $requestsChannel;
    private $isRunning = false;
    private $workers = [];
    
    public function __construct(int $workerNum, Channel $requestsChannel) {
        $this->workerNum = $workerNum;
        $this->requestsChannel = $requestsChannel;
    }
    
    public function start() {
        if ($this->isRunning) {
            return;
        }
        
        $this->isRunning = true;
        
    // 创建固定数量的工作协程(控制并发数)
        for ($i = 0; $i < $this->workerNum; $i++) {
            $this->workers[] = Coroutine::create(function () use ($i) {
                echo "工作协程 {$i} 启动\n";
                
            // 循环从任务队列取请求处理(协程复用)
                while ($this->isRunning) {
                    try {
                // 从channel阻塞获取请求(无任务时挂起,不占用CPU)
                        $req = $this->requestsChannel->pop();
                if ($req === false) {
                    // 通道关闭时退出
                    break;
                }
                // 处理请求
                processRequest($req);
                    } catch (Exception $e) {
                        echo "工作协程 {$i} 处理异常: " . $e->getMessage() . "\n";
                    }
            }
                
                echo "工作协程 {$i} 退出\n";
        });
    }
}

    public function stop() {
        $this->isRunning = false;
        $this->requestsChannel->close();
        
        // 等待所有工作协程完成
        foreach ($this->workers as $worker) {
            Coroutine::join($worker);
        }
    }
}

// 兼容老代码的函数,新项目建议直接用类
function initWorkerPool(int $workerNum, Channel $requestsChannel) {
    $pool = new CoroutinePool($workerNum, $requestsChannel);
    $pool->start();
    return $pool;
}

// 下面是完整的使用示例
$worker = new Worker();
$worker->onWorkerStart = function () {
    // 创建一个队列,容量1000,满了就阻塞
    $requestsChannel = new Channel(1000);
    
    // 启动50个工作协程
    $workerNum = 50;
    $pool = new CoroutinePool($workerNum, $requestsChannel);
    $pool->start();
    
    // 模拟高并发请求不断进来
    Coroutine::create(function () use ($requestsChannel) {
        $i = 0;
        while (true) {
            try {
            $req = new Request("请求" . $i++);
                $requestsChannel->push($req);  // 扔到队列里
                Coroutine::sleep(0.01);  // 每10ms来一个请求
            } catch (Exception $e) {
                echo "出问题了: " . $e->getMessage() . "\n";
            }
        }
    });
    
    // 关闭的时候要优雅一点
    $worker->onWorkerStop = function () use ($pool) {
        echo "开始关闭协程池...\n";
        $pool->stop();
        echo "关闭完成\n";
    };
};

Worker::runAll();

用了协程池能提升多少?

性能确实有明显改善

对比了一下用协程池前后的数据:

  • 内存占用:稳定多了,不会突然飙升
  • CPU利用率:省去了频繁创建销毁协程的开销
  • 响应速度:队列缓冲让系统吞吐量提升不少

实战中的一些经验

踩过坑才知道,协程池不是开了就完事:

协程数量别瞎设
一般设成CPU核心数的2-4倍就够了。我之前设太多反而效果不好,协程切换本身也有开销。

队列容量要算好
设小了容易阻塞,设大了内存又吃不消。根据你的业务量和服务器配置来调整,多试几次找到平衡点。

错误处理一定要做
生产环境什么鬼事都可能发生,try-catch要包好,日志要打全。我之前偷懒没做,结果协程挂了都不知道。

监控要跟上
队列长度、处理速度、错误率这些指标得实时看着。推荐接入Prometheus或者自己写个简单的统计。

关闭要优雅
别强杀进程,等队列里的任务处理完再退出。不然用户的请求莫名其妙丢了,投诉就来了。

哪些场景适合用

我在这几个地方用过,效果还不错:

  • HTTP API服务,并发请求特别多的那种
  • Redis队列消费,批量处理消息
  • 文件批量处理,比如图片压缩、日志分析
  • 数据库操作,控制连接数避免打爆DB

进阶玩法

动态协程池:根据负载自动调整

固定数量的协程池有时候不够灵活,高峰期不够用,低峰期浪费资源。可以搞个动态的:

php
class DynamicCoroutinePool {
    private $minWorkers;
    private $maxWorkers;
    private $currentWorkers;
    private $taskChannel;
    private $workers = [];
    private $isRunning = false;
    private $loadThreshold = 0.8; // 负载阈值
    
    public function __construct($minWorkers, $maxWorkers, $taskChannel) {
        $this->minWorkers = $minWorkers;
        $this->maxWorkers = $maxWorkers;
        $this->taskChannel = $taskChannel;
        $this->currentWorkers = $minWorkers;
    }
    
    public function start() {
        $this->isRunning = true;
        
        // 启动最小数量的工作协程
        for ($i = 0; $i < $this->minWorkers; $i++) {
            $this->addWorker();
        }
        
        // 启动负载监控协程
        Coroutine::create(function() {
            $this->monitorLoad();
        });
    }
    
    private function addWorker() {
        if ($this->currentWorkers >= $this->maxWorkers) {
            return false;
        }
        
        $workerId = $this->currentWorkers++;
        $this->workers[$workerId] = Coroutine::create(function() use ($workerId) {
            while ($this->isRunning) {
                $task = $this->taskChannel->pop();
                if ($task === false) break;
                
                $this->processTask($task);
            }
        });
        
        return true;
    }
    
    private function monitorLoad() {
        while ($this->isRunning) {
            $queueSize = $this->taskChannel->length();
            $loadRatio = $queueSize / $this->taskChannel->capacity();
            
            // 负载超过80%就加协程
            if ($loadRatio > $this->loadThreshold && $this->currentWorkers < $this->maxWorkers) {
                $this->addWorker();
                echo "队列堆积了,加协程到: {$this->currentWorkers}\n";
            }
            
            Coroutine::sleep(1);  // 每秒看一次
        }
    }
}

这个动态池我在秒杀活动时用过,效果挺好。平时10个协程够用,高峰期自动扩到50个。

加个监控面板看看运行情况

php
class MonitoredCoroutinePool extends CoroutinePool {
    private $stats = [
        'tasks_processed' => 0,
        'tasks_failed' => 0,
        'avg_process_time' => 0,
        'queue_length' => 0,
        'active_workers' => 0
    ];
    
    private $processTimes = [];
    
    public function processTask($task) {
        $startTime = microtime(true);
        
        try {
            parent::processTask($task);
            $this->stats['tasks_processed']++;
        } catch (Exception $e) {
            $this->stats['tasks_failed']++;
            throw $e;
        } finally {
            $processTime = microtime(true) - $startTime;
            $this->processTimes[] = $processTime;
            
            // 保持最近1000次的处理时间记录
            if (count($this->processTimes) > 1000) {
                array_shift($this->processTimes);
            }
            
            $this->updateStats();
        }
    }
    
    private function updateStats() {
        $this->stats['queue_length'] = $this->taskChannel->length();
        $this->stats['active_workers'] = $this->currentWorkers;
        
        if (!empty($this->processTimes)) {
            $this->stats['avg_process_time'] = array_sum($this->processTimes) / count($this->processTimes);
        }
    }
    
    public function getStats() {
        return $this->stats;
    }
    
    public function printStats() {
        echo "=== 协程池运行统计 ===\n";
        echo "处理成功: {$this->stats['tasks_processed']}\n";
        echo "处理失败: {$this->stats['tasks_failed']}\n";
        echo "平均耗时: " . round($this->stats['avg_process_time'] * 1000, 2) . "ms\n";
        echo "队列积压: {$this->stats['queue_length']}\n";
        echo "活跃协程: {$this->stats['active_workers']}\n";
        echo "===================\n";
    }
}

有了监控数据,出问题时排查就方便多了。我一般每分钟打印一次统计,或者接入Grafana看可视化图表。

数据库连接池:别让DB成为瓶颈

php
class DatabaseCoroutinePool {
    private $coroutinePool;
    private $dbPool;
    private $maxConnections;
    
    public function __construct($workerCount, $maxConnections) {
        $this->maxConnections = $maxConnections;
        $this->taskChannel = new Channel(1000);
        $this->coroutinePool = new CoroutinePool($workerCount, $this->taskChannel);
        $this->initDatabasePool();
    }
    
    private function initDatabasePool() {
        $this->dbPool = new Channel($this->maxConnections);
        
        // 提前创建好连接,省得用的时候再临时建
        for ($i = 0; $i < $this->maxConnections; $i++) {
            $connection = $this->createDatabaseConnection();
            $this->dbPool->push($connection);
        }
    }
    
    private function createDatabaseConnection() {
        // 记得用协程化的MySQL客户端
        return new \Workerman\MySQL\Connection('127.0.0.1', 3306, 'user', 'password', 'database');
    }
    
    public function processDatabaseTask($task) {
        // 从池子里取一个连接
        $connection = $this->dbPool->pop();
        
        try {
            $result = $connection->query($task['sql']);
            return $result;
        } finally {
            // 用完了记得还回去
            $this->dbPool->push($connection);
        }
    }
}

这套组合拳下来,数据库操作就不会成为瓶颈了。我们项目之前经常因为连接数打满导致超时,用了连接池之后稳多了。

一些实用的优化技巧

php
class OptimizedCoroutinePool {
    // 批量处理,减少IO次数
    public function batchProcess($batchSize = 10) {
        $batch = [];
        $count = 0;
        
        while ($count < $batchSize) {
            $task = $this->taskChannel->pop();
            if ($task === false) break;
            
            $batch[] = $task;
            $count++;
        }
        
        // 一次性处理一批,比如批量插入数据库
        if (!empty($batch)) {
            $this->processBatch($batch);
        }
    }
    
    // 任务优先级,重要的任务先处理
    public function addPriorityTask($task, $priority = 0) {
        $priorityTask = [
            'task' => $task,
            'priority' => $priority,
            'timestamp' => time()
        ];
        
        $this->priorityChannel->push($priorityTask);
    }
    
    // 协程本地存储,每个协程独立的数据
    private $coroutineLocal = [];
    
    public function setLocal($key, $value) {
        $coroutineId = Coroutine::getCurrentId();
        $this->coroutineLocal[$coroutineId][$key] = $value;
    }
    
    public function getLocal($key) {
        $coroutineId = Coroutine::getCurrentId();
        return $this->coroutineLocal[$coroutineId][$key] ?? null;
    }
}

实际项目中怎么用

场景1:高并发HTTP API

php
// 我们的API服务就是这么跑的
$worker = new Worker('http://0.0.0.0:8080');
$worker->count = 4;  // 开4个进程

$worker->onWorkerStart = function() {
    $pool = new CoroutinePool(50, new Channel(1000));
    $pool->start();
    
    // 每个HTTP请求都扔到协程池里处理
    $worker->onMessage = function($connection, $data) use ($pool) {
        $task = [
            'connection' => $connection,
            'data' => $data,
            'timestamp' => time()
        ];
        
        $pool->submit($task);
    };
};

这个方案在我们的项目里跑得挺稳,QPS能到3000+,响应时间控制在50ms以内。

场景2:Redis队列消费

php
// 消费Redis队列的任务
$worker = new Worker();
$worker->onWorkerStart = function() {
    $pool = new CoroutinePool(20, new Channel(500));
    $pool->start();
    
    // 从Redis拉消息
    Coroutine::create(function() use ($pool) {
        $redis = new Redis();
        $redis->connect('127.0.0.1', 6379);
        
        while (true) {
            $message = $redis->brpop('task_queue', 1);
            if ($message) {
                $pool->submit($message[1]);
            }
        }
    });
};

这套方案处理消息队列特别爽,比之前用的进程模型效率高多了。

写在最后

从一开始看到那段有问题的Go代码,到自己动手实现PHP的协程池,这个过程收获挺大的。

几个关键点再总结一下:

协程池的本质
就是限制并发数、复用协程,说白了就是"不要没事就创建,用完就销毁"。

Channel是核心
生产者消费者模型,队列满了就等,空了也等,这个机制保证了系统不会崩。

监控很重要
生产环境一定要加监控,不然出问题了两眼一抹黑。

别过度优化
动态调整、优先级队列这些高级特性,确实需要了再加,别为了炫技把代码搞得太复杂。

最后说一句,协程池虽好,但也不是银弹。什么场景用什么方案,还是要根据实际情况来。我这篇文章算是抛砖引玉,具体到你的项目,还得自己多测试、多调优。

代码写完了,测试也跑过了,感觉还不错。要是对你有帮助的话,那就更好了!

上次更新于: