SocketWorker.php 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
  1. <?php
  2. namespace WORKERMAN\Core;
  3. require_once WORKERMAN_ROOT_DIR . 'Core/Events/Select.php';
  4. require_once WORKERMAN_ROOT_DIR . 'Core/AbstractWorker.php';
  5. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Config.php';
  6. /**
  7. * SocketWorker 监听某个端口,对外提供网络服务的worker
  8. *
  9. * @author walkor <worker-man@qq.com>
  10. *
  11. * <b>使用示例:</b>
  12. * <pre>
  13. * <code>
  14. * $worker = new SocketWorker();
  15. * $worker->start();
  16. * <code>
  17. * </pre>
  18. */
  19. abstract class SocketWorker extends AbstractWorker
  20. {
  21. /**
  22. * udp最大包长 linux:65507 mac:9216
  23. * @var integer
  24. */
  25. const MAX_UDP_PACKEG_SIZE = 65507;
  26. /**
  27. * 停止服务后等待EXIT_WAIT_TIME秒后还没退出则强制退出
  28. * @var integer
  29. */
  30. const EXIT_WAIT_TIME = 3;
  31. /**
  32. * 进程意外退出状态码
  33. * @var integer
  34. */
  35. const EXIT_UNEXPECT_CODE = 119;
  36. /**
  37. * worker的传输层协议
  38. * @var string
  39. */
  40. protected $protocol = "tcp";
  41. /**
  42. * worker监听端口的Socket
  43. * @var resource
  44. */
  45. protected $mainSocket = null;
  46. /**
  47. * worker接受的所有链接
  48. * @var array
  49. */
  50. protected $connections = array();
  51. /**
  52. * worker的所有读buffer
  53. * @var array
  54. */
  55. protected $recvBuffers = array();
  56. /**
  57. * 当前处理的fd
  58. * @var integer
  59. */
  60. protected $currentDealFd = 0;
  61. /**
  62. * UDP当前处理的客户端地址
  63. * @var string
  64. */
  65. protected $currentClientAddress = '';
  66. /**
  67. * worker的服务状态
  68. * @var integer
  69. */
  70. protected $workerStatus = self::STATUS_RUNNING;
  71. /**
  72. * 是否是长链接,(短连接每次请求后服务器主动断开,长连接一般是客户端主动断开)
  73. * @var bool
  74. */
  75. protected $isPersistentConnection = false;
  76. /**
  77. * 事件轮询库的名称
  78. * @var string
  79. */
  80. protected $eventLoopName ="\\WORKERMAN\\Core\\Events\\Select";
  81. /**
  82. * 时间轮询库实例
  83. * @var object
  84. */
  85. protected $event = null;
  86. /**
  87. * worker名称
  88. * @var string
  89. */
  90. protected $workerName = __CLASS__;
  91. /**
  92. * 该worker进程处理多少请求后退出,0表示不自动退出
  93. * @var integer
  94. */
  95. protected $maxRequests = 0;
  96. /**
  97. * 预读长度
  98. * @var integer
  99. */
  100. protected $prereadLength = 4;
  101. /**
  102. * 该进程使用的php文件
  103. * @var array
  104. */
  105. protected $includeFiles = array();
  106. /**
  107. * 统计信息
  108. * @var array
  109. */
  110. protected $statusInfo = array(
  111. 'start_time' => 0, // 该进程开始时间戳
  112. 'total_request' => 0, // 该进程处理的总请求数
  113. 'recv_timeout' => 0, // 该进程接收数据超时总数
  114. 'proc_timeout' => 0, // 该进程逻辑处理超时总数
  115. 'packet_err' => 0, // 该进程收到错误数据包的总数
  116. 'throw_exception' => 0, // 该进程逻辑处理时收到异常的总数
  117. 'thunder_herd' => 0, // 该进程受惊群效应影响的总数
  118. 'client_close' => 0, // 客户端提前关闭链接总数
  119. 'send_fail' => 0, // 发送数据给客户端失败总数
  120. );
  121. /**
  122. * 用户worker继承此worker类必须实现该方法,根据具体协议和当前收到的数据决定是否继续收包
  123. * @param string $recv_str 收到的数据包
  124. * @return int/false 返回0表示接收完毕/>0表示还有多少字节没有接收到/false出错
  125. */
  126. abstract public function dealInput($recv_str);
  127. /**
  128. * 用户worker继承此worker类必须实现该方法,根据包中的数据处理逻辑
  129. * 逻辑处理
  130. * @param string $recv_str 收到的数据包
  131. * @return void
  132. */
  133. abstract public function dealProcess($recv_str);
  134. /**
  135. * 构造函数
  136. * @param int $port
  137. * @param string $ip
  138. * @param string $protocol
  139. * @return void
  140. */
  141. public function __construct($worker_name = '')
  142. {
  143. // worker name
  144. if(!empty($worker_name))
  145. {
  146. $this->workerName = $worker_name;
  147. }
  148. else
  149. {
  150. $this->workerName = get_class($this);
  151. }
  152. // 是否开启长连接
  153. $this->isPersistentConnection = (bool)Lib\Config::get('workers.' . $worker_name . '.socket.persistent');
  154. // 最大请求数,如果没有配置则使用PHP_INT_MAX
  155. $this->maxRequests = (int)Lib\Config::get('workers.' . $worker_name . '.max_requests');
  156. $this->maxRequests = $this->maxRequests <= 0 ? PHP_INT_MAX : $this->maxRequests;
  157. $preread_length = (int)Lib\Config::get('workers.' . $worker_name . '.preread_length');
  158. if($preread_length > 0)
  159. {
  160. $this->prereadLength = $preread_length;
  161. }
  162. elseif(!$this->isPersistentConnection)
  163. {
  164. $this->prereadLength = 65535;
  165. }
  166. // worker启动时间
  167. $this->statusInfo['start_time'] = time();
  168. //事件轮询库
  169. if(extension_loaded('libevent'))
  170. {
  171. $this->setEventLoopName('Libevent');
  172. }
  173. // 检查退出状态
  174. $this->addShutdownHook();
  175. // 初始化事件轮询库
  176. // $this->event = new Libevent();
  177. // $this->event = new Select();
  178. $this->event = new $this->eventLoopName();
  179. }
  180. /**
  181. * 让该worker实例开始服务
  182. *
  183. * @return void
  184. */
  185. public function start()
  186. {
  187. // 安装信号处理函数
  188. $this->installSignal();
  189. // 触发该worker进程onStart事件,该进程整个生命周期只触发一次
  190. if($this->onStart())
  191. {
  192. return;
  193. }
  194. if($this->protocol == 'udp')
  195. {
  196. // 添加读udp事件
  197. $this->event->add($this->mainSocket, Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
  198. }
  199. else
  200. {
  201. // 添加accept事件
  202. $ret = $this->event->add($this->mainSocket, Events\BaseEvent::EV_READ, array($this, 'accept'));
  203. }
  204. // 主体循环,整个子进程会阻塞在这个函数上
  205. $ret = $this->event->loop();
  206. $this->notice("evet->loop returned " . var_export($ret, true));
  207. exit(self::EXIT_UNEXPECT_CODE);
  208. }
  209. /**
  210. * 停止服务
  211. * @param bool $exit 是否退出
  212. * @return void
  213. */
  214. public function stop($exit = true)
  215. {
  216. // 触发该worker进程onStop事件
  217. if($this->onStop())
  218. {
  219. return;
  220. }
  221. // 标记这个worker开始停止服务
  222. if($this->workerStatus != self::STATUS_SHUTDOWN)
  223. {
  224. // 停止接收连接
  225. $this->event->del($this->mainSocket, Events\BaseEvent::EV_READ);
  226. fclose($this->mainSocket);
  227. $this->workerStatus = self::STATUS_SHUTDOWN;
  228. }
  229. // 没有链接要处理了
  230. if($this->allTaskHasDone())
  231. {
  232. if($exit)
  233. {
  234. exit(0);
  235. }
  236. }
  237. }
  238. /**
  239. * 设置worker监听的socket
  240. * @param resource $socket
  241. * @return void
  242. */
  243. public function setListendSocket($socket)
  244. {
  245. // 初始化
  246. $this->mainSocket = $socket;
  247. // 设置监听socket非阻塞
  248. stream_set_blocking($this->mainSocket, 0);
  249. // 获取协议
  250. $mata_data = stream_get_meta_data($socket);
  251. $this->protocol = substr($mata_data['stream_type'], 0, 3);
  252. }
  253. /**
  254. * 设置worker的事件轮询库的名称
  255. * @param string
  256. * @return void
  257. */
  258. public function setEventLoopName($event_loop_name)
  259. {
  260. $this->eventLoopName = "\\WORKERMAN\\Core\\Events\\".$event_loop_name;
  261. require_once WORKERMAN_ROOT_DIR . 'Core/Events/'.ucfirst(str_replace('WORKERMAN', '', $event_loop_name)).'.php';
  262. }
  263. /**
  264. * 接受一个链接
  265. * @param resource $socket
  266. * @param $null_one $flag
  267. * @param $null_two $base
  268. * @return void
  269. */
  270. public function accept($socket, $null_one = null, $null_two = null)
  271. {
  272. // 获得一个连接
  273. $new_connection = @stream_socket_accept($socket, 0);
  274. // 可能是惊群效应
  275. if(false === $new_connection)
  276. {
  277. $this->statusInfo['thunder_herd']++;
  278. return false;
  279. }
  280. // 接受请求数加1
  281. $this->statusInfo['total_request'] ++;
  282. // 连接的fd序号
  283. $fd = (int) $new_connection;
  284. $this->connections[$fd] = $new_connection;
  285. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  286. // 非阻塞
  287. stream_set_blocking($this->connections[$fd], 0);
  288. $this->event->add($this->connections[$fd], Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
  289. return $new_connection;
  290. }
  291. /**
  292. * 接收Udp数据
  293. * 如果数据超过一个udp包长,需要业务自己解析包体,判断数据是否全部到达
  294. * @param resource $socket
  295. * @param $null_one $flag
  296. * @param $null_two $base
  297. * @return void
  298. */
  299. public function recvUdp($socket, $null_one = null, $null_two = null)
  300. {
  301. $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
  302. // 可能是惊群效应
  303. if(false === $data || empty($address))
  304. {
  305. $this->statusInfo['thunder_herd']++;
  306. return false;
  307. }
  308. // 接受请求数加1
  309. $this->statusInfo['total_request'] ++;
  310. $this->currentClientAddress = $address;
  311. if(0 === $this->dealInput($data))
  312. {
  313. $this->dealProcess($data);
  314. }
  315. }
  316. /**
  317. * 处理受到的数据
  318. * @param event_buffer $event_buffer
  319. * @param int $fd
  320. * @return void
  321. */
  322. public function dealInputBase($connection, $flag, $fd = null)
  323. {
  324. $this->currentDealFd = $fd;
  325. $buffer = stream_socket_recvfrom($connection, $this->recvBuffers[$fd]['remain_len']);
  326. // 出错了
  327. if('' == $buffer)
  328. {
  329. if(feof($connection))
  330. {
  331. // 客户端提前断开链接
  332. $this->statusInfo['client_close']++;
  333. // 如果该链接对应的buffer有数据,说明放生错误
  334. if(!empty($this->recvBuffers[$fd]['buf']))
  335. {
  336. $this->notice("CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  337. }
  338. }
  339. else
  340. {
  341. // 超时了
  342. $this->statusInfo['recv_timeout']++;
  343. // 如果该链接对应的buffer有数据,说明放生错误
  344. if(!empty($this->recvBuffers[$fd]['buf']))
  345. {
  346. $this->notice("RECV_TIMEOUT\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  347. }
  348. }
  349. // 关闭链接
  350. $this->closeClient($fd);
  351. if($this->workerStatus == self::STATUS_SHUTDOWN)
  352. {
  353. $this->stop();
  354. }
  355. return;
  356. }
  357. $this->recvBuffers[$fd]['buf'] .= $buffer;
  358. $remain_len = $this->dealInput($this->recvBuffers[$fd]['buf']);
  359. // 包接收完毕
  360. if(0 === $remain_len)
  361. {
  362. // 执行处理
  363. try{
  364. // 业务处理
  365. $this->dealProcess($this->recvBuffers[$fd]['buf']);
  366. }
  367. catch(\Exception $e)
  368. {
  369. // 关闭闹钟
  370. //pcntl_alarm(0);
  371. $this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  372. $this->statusInfo['throw_exception'] ++;
  373. $this->sendToClient($e->getMessage());
  374. }
  375. // 是否是长连接
  376. if($this->isPersistentConnection)
  377. {
  378. // 清空缓冲buffer
  379. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  380. }
  381. else
  382. {
  383. // 关闭链接
  384. $this->closeClient($fd);
  385. }
  386. }
  387. // 出错
  388. else if(false === $remain_len)
  389. {
  390. // 出错
  391. $this->statusInfo['packet_err']++;
  392. $this->sendToClient('packet_err:'.$this->recvBuffers[$fd]['buf']);
  393. $this->notice("PACKET_ERROR\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  394. $this->closeClient($fd);
  395. }
  396. else
  397. {
  398. $this->recvBuffers[$fd]['remain_len'] = $remain_len;
  399. }
  400. // 检查是否是关闭状态或者是否到达请求上限
  401. if($this->workerStatus == self::STATUS_SHUTDOWN || $this->statusInfo['total_request'] >= $this->maxRequests)
  402. {
  403. // 关闭链接
  404. if($this->isPersistentConnection)
  405. {
  406. $this->closeClient($fd);
  407. }
  408. // 停止服务
  409. $this->stop();
  410. // EXIT_WAIT_TIME秒后退出进程
  411. pcntl_alarm(self::EXIT_WAIT_TIME);
  412. }
  413. }
  414. /**
  415. * 根据fd关闭链接
  416. * @param int $fd
  417. * @return void
  418. */
  419. protected function closeClient($fd)
  420. {
  421. // udp忽略
  422. if($this->protocol != 'udp')
  423. {
  424. $this->event->del($this->connections[$fd], Events\BaseEvent::EV_READ);
  425. fclose($this->connections[$fd]);
  426. unset($this->connections[$fd], $this->recvBuffers[$fd]);
  427. }
  428. }
  429. /**
  430. * 安装信号处理函数
  431. * @return void
  432. */
  433. protected function installSignal()
  434. {
  435. // 如果是由worker脚本启动则不安装信号
  436. if(!defined('WORKERMAN_PID_FILE'))
  437. {
  438. return;
  439. }
  440. // 闹钟信号
  441. $this->event->add(SIGALRM, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGALRM);
  442. // 终止进程信号
  443. $this->event->add(SIGINT, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGINT);
  444. // 平滑重启信号
  445. $this->event->add(SIGHUP, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGHUP);
  446. // 报告进程状态
  447. $this->event->add(SIGUSR1, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGUSR1);
  448. // 报告该进程使用的文件
  449. $this->event->add(SIGUSR2, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGUSR2);
  450. // 设置忽略信号
  451. pcntl_signal(SIGTTIN, SIG_IGN);
  452. pcntl_signal(SIGTTOU, SIG_IGN);
  453. pcntl_signal(SIGQUIT, SIG_IGN);
  454. pcntl_signal(SIGPIPE, SIG_IGN);
  455. pcntl_signal(SIGCHLD, SIG_IGN);
  456. }
  457. /**
  458. * 设置server信号处理函数
  459. * @param null $null
  460. * @param int $signal
  461. */
  462. public function signalHandler($signal, $null = null, $null = null)
  463. {
  464. switch($signal)
  465. {
  466. // 时钟处理函数
  467. case SIGALRM:
  468. // 停止服务后EXIT_WAIT_TIME秒还没退出则强制退出
  469. if($this->workerStatus == self::STATUS_SHUTDOWN)
  470. {
  471. exit(0);
  472. }
  473. break;
  474. // 停止该进程
  475. case SIGINT:
  476. // 平滑重启
  477. case SIGHUP:
  478. $this->stop();
  479. // EXIT_WAIT_TIME秒后退出进程
  480. pcntl_alarm(self::EXIT_WAIT_TIME);
  481. break;
  482. // 报告进程状态
  483. case SIGUSR1:
  484. $this->writeStatusToQueue();
  485. break;
  486. // 报告进程使用的php文件
  487. case SIGUSR2:
  488. $this->writeFilesListToQueue();
  489. break;
  490. }
  491. }
  492. /**
  493. * 发送数据到客户端
  494. * @return bool
  495. */
  496. public function sendToClient($str_to_send)
  497. {
  498. // tcp
  499. if($this->protocol != 'udp')
  500. {
  501. // tcp 如果一次没写完(一般是缓冲区满的情况),则阻塞写
  502. if(!$this->blockWrite($this->connections[$this->currentDealFd], $str_to_send, 500))
  503. {
  504. $this->notice('sendToClient fail ,Data length = ' . strlen($str_to_send));
  505. $this->statusInfo['send_fail']++;
  506. return false;
  507. }
  508. return true;
  509. }
  510. // udp 直接发送,要求数据包不能超过65515
  511. $len = stream_socket_sendto($this->mainSocket, $str_to_send, 0, $this->currentClientAddress);
  512. return $len == strlen($str_to_send);
  513. }
  514. /**
  515. * 向fd写数据,如果socket缓冲区满了,则改用阻塞模式写数据
  516. * @param resource $fd
  517. * @param string $str_to_write
  518. * @param int $time_out 单位毫秒
  519. * @return bool
  520. */
  521. protected function blockWrite($fd, $str_to_write, $timeout_ms = 500)
  522. {
  523. $send_len = @fwrite($fd, $str_to_write);
  524. if($send_len == strlen($str_to_write))
  525. {
  526. return true;
  527. }
  528. // 客户端关闭
  529. if(feof($fd))
  530. {
  531. $this->notice("blockWrite client close");
  532. return false;
  533. }
  534. // 设置阻塞
  535. stream_set_blocking($fd, 1);
  536. // 设置超时
  537. $timeout_sec = floor($timeout_ms/1000);
  538. $timeout_ms = $timeout_ms%1000;
  539. stream_set_timeout($fd, $timeout_sec, $timeout_ms*1000);
  540. $send_len += @fwrite($fd, substr($str_to_write, $send_len));
  541. // 改回非阻塞
  542. stream_set_blocking($fd, 0);
  543. return $send_len == strlen($str_to_write);
  544. }
  545. /**
  546. * 获取客户端ip
  547. * @param int $fd 已经链接的socket id
  548. * @return string
  549. */
  550. public function getRemoteIp($fd = null)
  551. {
  552. if(empty($fd))
  553. {
  554. if(!isset($this->connections[$this->currentDealFd]))
  555. {
  556. return '0.0.0.0';
  557. }
  558. $fd = $this->currentDealFd;
  559. }
  560. $ip = '';
  561. if($this->protocol == 'udp')
  562. {
  563. $sock_name = $this->currentClientAddress;
  564. }
  565. else
  566. {
  567. $sock_name = stream_socket_get_name($this->connections[$fd], true);
  568. }
  569. if($sock_name)
  570. {
  571. $tmp = explode(':', $sock_name);
  572. $ip = $tmp[0];
  573. }
  574. return $ip;
  575. }
  576. /**
  577. * 获取本地ip
  578. * @return string
  579. */
  580. public function getLocalIp()
  581. {
  582. $ip = '';
  583. $sock_name = '';
  584. if($this->protocol == 'udp' || !isset($this->connections[$this->currentDealFd]))
  585. {
  586. $sock_name = stream_socket_get_name($this->mainSocket, false);
  587. }
  588. else
  589. {
  590. $sock_name = stream_socket_get_name($this->connections[$this->currentDealFd], false);
  591. }
  592. if($sock_name)
  593. {
  594. $tmp = explode(':', $sock_name);
  595. $ip = $tmp[0];
  596. }
  597. if(empty($ip) || '127.0.0.1' == $ip)
  598. {
  599. $ip = gethostbyname(trim(`hostname`));
  600. }
  601. return $ip;
  602. }
  603. /**
  604. * 将当前worker进程状态写入消息队列
  605. * @return void
  606. */
  607. protected function writeStatusToQueue()
  608. {
  609. if(!Master::getQueueId())
  610. {
  611. return;
  612. }
  613. $error_code = 0;
  614. msg_send(Master::getQueueId(), self::MSG_TYPE_STATUS, array_merge($this->statusInfo, array('memory'=>memory_get_usage(true), 'pid'=>posix_getpid(), 'worker_name' => $this->workerName)), true, false, $error_code);
  615. }
  616. /**
  617. * 开发环境将当前进程使用的文件写入消息队列,用于FileMonitor监控文件更新
  618. * @return void
  619. */
  620. protected function writeFilesListToQueue()
  621. {
  622. if(!Master::getQueueId())
  623. {
  624. return;
  625. }
  626. $error_code = 0;
  627. $flip_file_list = array_flip(get_included_files());
  628. $file_list = array_diff_key($flip_file_list, $this->includeFiles);
  629. $this->includeFiles = $flip_file_list;
  630. if($file_list)
  631. {
  632. msg_send(Master::getQueueId(), self::MSG_TYPE_FILE_MONITOR, array_keys($file_list), true, false, $error_code);
  633. }
  634. }
  635. /**
  636. * 是否所有任务都已经完成
  637. * @return bool
  638. */
  639. protected function allTaskHasDone()
  640. {
  641. // 如果是长链接并且没有要处理的数据则是任务都处理完了
  642. return $this->noConnections() || ($this->isPersistentConnection && $this->allBufferIsEmpty());
  643. }
  644. /**
  645. * 检查是否所有的链接的缓冲区都是空
  646. * @return bool
  647. */
  648. protected function allBufferIsEmpty()
  649. {
  650. foreach($this->recvBuffers as $fd => $buf)
  651. {
  652. if(!empty($buf['buf']))
  653. {
  654. return false;
  655. }
  656. }
  657. return true;
  658. }
  659. /**
  660. * 该进程收到的任务是否都已经完成,重启进程时需要判断
  661. * @return bool
  662. */
  663. protected function noConnections()
  664. {
  665. return empty($this->connections);
  666. }
  667. /**
  668. * 该worker进程开始服务的时候会触发一次,可以在这里做一些全局的事情
  669. * @return bool
  670. */
  671. protected function onStart()
  672. {
  673. return false;
  674. }
  675. /**
  676. * 该worker进程停止服务的时候会触发一次,可以在这里做一些全局的事情
  677. * @return bool
  678. */
  679. protected function onStop()
  680. {
  681. return false;
  682. }
  683. }