PHP 是 单进程同步模型 ,一个请求对应一个进程, I/O 是同步阻塞的。通过 nginx/apache/php-fpm 等服务的扩展,才使得 PHP 提供高并发的服务,原理就是维护一个进程池,每个请求服务时单独起一个新的进程,每个进程独立存在。

PHP 不支持多线程模式和回调处理,因此 PHP 内部脚本都是同步阻塞式的,如果你发起一个 5s 的请求,那么程序就会 I/O 阻塞  5s ,直到请求返回结果,才会继续执行代码。因此做爬虫之类的高并发请求需求很吃力。

PHP 并发请求 - 示例1

demo 

/**

* curl 并发请求示例
* @param array $urls 请求url字符串数组
*/
public function multiRequest($urls)
{
// 初始化一个新的 curl 批处理句柄
$mh = curl_multi_init();
// 单个请求的 curl 句柄数组
$urlHandlers = [];
// 最终结果整理数组
$urlData = [];

// 循环初始化多个请求句柄
foreach($urls as $value){
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $value);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); // 设置请求结果不直接输出,而是将结果交给一个变量
curl_setopt($ch, CURLOPT_TIMEOUT, 10); // 设置超时时间

$urlHandlers[] = $ch;

// 向 curl 批处理会话中添加单独的 curl 句柄
}

$active = null;

do {
// 返回的 $active 是活跃链接的数量, $mrc 是返回值,正常为0, 异常为 -1
$mrc = curl_multi_exec($mh, $active);
}while ($mrc = CURLM_CALL_MULTI_PERFORM);

// 如果还有活动的请求,同时操作状态为 OK,CURLM_OK 为常量值 0
while($active && $mrc == CURLM_OK){
// 持续查询状态并不利于处理任务,每 50ms 检查一次,此时释放 CPU ,降低机器负载
usleep(50000);
// 如果批处理句柄OK,重复检查操作状态直至OK。select返回值异常时为-1,正常为1(因为只有一个批处理句柄)
if(curl_multi_select($mh) != -1){
do{
$mrc = curl_multi_exec($mh, $active);
}while($mrc == CURLM_CALL_MULTI_PERFORM);
}
}

// 获取返回结果
foreach($urlHandlers as $index = $ch){
$urlDatap[$index] = curl_multi_getcontent($ch);
// 移除单个 curl 句柄
curl_multi_remove_handle($mh, $ch);
}

// 结束批处理句柄
curl_multi_close($mh);

// 返回结果
return $urlData;
}

在该并发请求中,先创建一个批处理句柄,然后将 url 的 CURL 句柄添加到批处理句柄中,并不断查询批处理句柄的执行状态,当执行完成后,获取返回的结果。

PHP 并发请求 - 示例2

class CurlMultiUtil {

/**
* 根据url,postData获取curl请求对象,这个比较简单,可以看官方文档
*/
private static function getCurlObject($url,$postData=array(),$header=array()){
$options = array();
$url = trim($url);
$options[CURLOPT_URL] = $url;
$options[CURLOPT_TIMEOUT] = 3;
$options[CURLOPT_RETURNTRANSFER] = true;
foreach($header as $key=>$value){
$options[$key] =$value;
}
if(!empty($postData) && is_array($postData)){
$options[CURLOPT_POST] = true;
$options[CURLOPT_POSTFIELDS] = http_build_query($postData);
}
if(stripos($url,'https') === 0){
$options[CURLOPT_SSL_VERIFYPEER] = false;
}
$ch = curl_init();
curl_setopt_array($ch,$options);
return $ch;
}

/**
* [request description]
* @param [type] $chList
* @return [type]
*/
private static function request($chList){
$downloader = curl_multi_init();
// 将三个待请求对象放入下载器中
foreach ($chList as $ch){
curl_multi_add_handle($downloader,$ch);
}
$res = array();
// 轮询
do {
while (($execrun = curl_multi_exec($downloader, $running)) == CURLM_CALL_MULTI_PERFORM);
if ($execrun != CURLM_OK) {
break;
}
// 一旦有一个请求完成,找出来,处理,因为curl底层是select,所以最大受限于1024
while ($done = curl_multi_info_read($downloader)){
// 从请求中获取信息、内容、错误
// $info = curl_getinfo($done['handle']);
$output = curl_multi_getcontent($done['handle']);
// $error = curl_error($done['handle']);
$res[] = $output;
// 把请求已经完成了得 curl handle 删除
curl_multi_remove_handle($downloader, $done['handle']);
}
// 当没有数据的时候进行堵塞,把 CPU 使用权交出来,避免上面 do 死循环空跑数据导致 CPU 100%
if ($running) {
$rel = curl_multi_select($downloader, 1);
if($rel == -1){
usleep(1000);
}
}
if($running == false){
break;
}
}while(true);
curl_multi_close($downloader);
return $res;
}

/**
* [get description]
* @param [type] $urlArr
* @return [type]
*/
public static function get($urlArr){
$data = array();
if (!empty($urlArr)) {
$chList = array();
foreach ($urlArr as $key => $url) {
$chList[] = self::getCurlObject($url);
}
$data = self::request($chList);
}
return $data;
}
}

curl_multi 相关函数

/**

* 函数作用:返回一个新 curl 批处理句柄
* @return resource 成功返回 curl 批处理句柄,失败返回false
*/
resource curl_multi_init (void)

/**
* 函数作用:向 curl 批处理绘画中添加单独的 curl 句柄
* @param $mh 由 curl_multi_init() 返回的批处理句柄
* @param $ch 由 curl_init() 返回的 curl 单独句柄
* @return resource 成功返回 curl 批处理句柄,失败返回false
*/
int curl_multi_add_handle (resource $mh, resource $ch)

/**
* 函数作用:运行当前 curl 句柄的子链接
* @param $mh 由 curl_multi_init() 返回的批处理句柄
* @param $still_running 一个用来判断才做是否仍在执行的标识的引用
* @return 一个定义与 curl 预定义常量中的 curl 代码
*/
int curl_multi_exec (resource $mh, int &$still_running)

/**
* 函数作用:等待所有 curl 批处理中的活动链接
* @param $mh 由 curl_multi_init() 返回的批处理句柄
* @param $timeout 以秒为单位,等待响应的时间
* @return 成功时返回描述符集合中描述符的数量。失败时,select 失败时返回 -1, 否则返回超时(从底层的 select 系统调用)
*/
int curl_multi_select (resource $mh [, float $timeout = 1.0])

/**
* 函数作用:移除 curl 批处理句柄资源中的某个句柄资源
* 说明:从给定的批处理句柄 $mh 中移除 $ch 句柄。当 $ch 句柄被移除以后,仍然可以合法的用 curl_exec() 执行这个句柄,如果要移除的句柄正在被使用,则这个句柄涉及的所有传输任务会被终止。
* @param $mh 由 curl_multi_init() 返回的批处理句柄
* @param $ch 由 curl_init() 返回的单个 curl 句柄
* @return 成功时返回0,失败时返回 CURLM_XXX 中的一个
*/
int curl_multi_remove_handle (resource $mh, resource $ch)

/**
* 函数作用:关闭一组cURL句柄
* @param $mh 由curl_multi_init返回的批处理句柄
* @return void
*/
void curl_multi_close (resource $mh)

/**
* 函数作用:如果设置了 CURLOPT_RETURNTRANSFER,则返回获取的输出的文本流
* @param $ch 由 curl_init 返回的 curl 句柄
* @return string 如果设置了 CURLOPT_RETURNTRANSFER,则返回获取的输出的文本流
*/
string curl_multi_getcontent (resource $ch)

注意项

并发数限制

curl_multi 会消耗很多的系统资源,在并发请求时,并发数有一定阈值,一般为 512 ,是由于 curl 内部限制,超过最大并发会导致失败。

超时时间

为了防止慢请求影响整个服务,可以设置 CURLOPT_TIMEOUT 来控制超时时间,防止部分假死的请求无线阻塞进程处理,最后打死机器服务。

返回
顶部