SocketWorker.php 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803
  1. <?php
  2. namespace Man\Core;
  3. use Man\Core\Events\BaseEvent;
  4. require_once WORKERMAN_ROOT_DIR . 'man/Core/Events/Select.php';
  5. require_once WORKERMAN_ROOT_DIR . 'man/Core/AbstractWorker.php';
  6. require_once WORKERMAN_ROOT_DIR . 'man/Core/Lib/Config.php';
  7. /**
  8. * SocketWorker 监听某个端口,对外提供网络服务的worker
  9. *
  10. * @author walkor <worker-man@qq.com>
  11. *
  12. * <b>使用示例:</b>
  13. * <pre>
  14. * <code>
  15. * $worker = new SocketWorker();
  16. * $worker->start();
  17. * <code>
  18. * </pre>
  19. */
  20. abstract class SocketWorker extends AbstractWorker
  21. {
  22. /**
  23. * udp最大包长 linux:65507 mac:9216
  24. * @var integer
  25. */
  26. const MAX_UDP_PACKEG_SIZE = 65507;
  27. /**
  28. * 停止服务后等待EXIT_WAIT_TIME秒后还没退出则强制退出
  29. * @var integer
  30. */
  31. const EXIT_WAIT_TIME = 3;
  32. /**
  33. * worker的传输层协议
  34. * @var string
  35. */
  36. protected $protocol = "tcp";
  37. /**
  38. * worker监听端口的Socket
  39. * @var resource
  40. */
  41. protected $mainSocket = null;
  42. /**
  43. * worker接受的所有链接
  44. * @var array
  45. */
  46. protected $connections = array();
  47. /**
  48. * 客户端连接的读buffer
  49. * @var array
  50. */
  51. protected $recvBuffers = array();
  52. /**
  53. * 客户端连接的写buffer
  54. * @var array
  55. */
  56. protected $sendBuffers = array();
  57. /**
  58. * 当前处理的fd
  59. * @var integer
  60. */
  61. protected $currentDealFd = 0;
  62. /**
  63. * UDP当前处理的客户端地址
  64. * @var string
  65. */
  66. protected $currentClientAddress = '';
  67. /**
  68. * 是否是长链接,(短连接每次请求后服务器主动断开,长连接一般是客户端主动断开)
  69. * @var bool
  70. */
  71. protected $isPersistentConnection = false;
  72. /**
  73. * 事件轮询库的名称
  74. * @var string
  75. */
  76. protected $eventLoopName ="\\Man\\Core\\Events\\Select";
  77. /**
  78. * 时间轮询库实例
  79. * @var object
  80. */
  81. protected $event = null;
  82. /**
  83. * 该worker进程处理多少请求后退出,0表示不自动退出
  84. * @var integer
  85. */
  86. protected $maxRequests = 0;
  87. /**
  88. * 预读长度
  89. * @var integer
  90. */
  91. protected $prereadLength = 4;
  92. /**
  93. * 该进程使用的php文件
  94. * @var array
  95. */
  96. protected $includeFiles = array();
  97. /**
  98. * 统计信息
  99. * @var array
  100. */
  101. protected $statusInfo = array(
  102. 'start_time' => 0, // 该进程开始时间戳
  103. 'total_request' => 0, // 该进程处理的总请求数
  104. 'packet_err' => 0, // 该进程收到错误数据包的总数
  105. 'throw_exception' => 0, // 该进程逻辑处理时收到异常的总数
  106. 'thunder_herd' => 0, // 该进程受惊群效应影响的总数
  107. 'client_close' => 0, // 客户端提前关闭链接总数
  108. 'send_fail' => 0, // 发送数据给客户端失败总数
  109. );
  110. /**
  111. * 用户worker继承此worker类必须实现该方法,根据具体协议和当前收到的数据决定是否继续收包
  112. * @param string $recv_str 收到的数据包
  113. * @return int/false 返回0表示接收完毕/>0表示还有多少字节没有接收到/false出错
  114. */
  115. abstract public function dealInput($recv_str);
  116. /**
  117. * 用户worker继承此worker类必须实现该方法,根据包中的数据处理逻辑
  118. * 逻辑处理
  119. * @param string $recv_str 收到的数据包
  120. * @return void
  121. */
  122. abstract public function dealProcess($recv_str);
  123. /**
  124. * 构造函数
  125. * @param int $port
  126. * @param string $ip
  127. * @param string $protocol
  128. * @return void
  129. */
  130. public function __construct($worker_name = null)
  131. {
  132. // worker name
  133. $this->workerName = $worker_name ? $worker_name : get_class($this);
  134. // 是否开启长连接
  135. $this->isPersistentConnection = (bool)Lib\Config::get( $this->workerName . '.persistent_connection');
  136. // 最大请求数,如果没有配置则使用PHP_INT_MAX
  137. $this->maxRequests = (int)Lib\Config::get( $this->workerName . '.max_requests');
  138. $this->maxRequests = $this->maxRequests <= 0 ? PHP_INT_MAX : $this->maxRequests;
  139. $preread_length = (int)Lib\Config::get( $this->workerName . '.preread_length');
  140. if($preread_length > 0)
  141. {
  142. $this->prereadLength = $preread_length;
  143. }
  144. elseif(!$this->isPersistentConnection)
  145. {
  146. $this->prereadLength = 65535;
  147. }
  148. // worker启动时间
  149. $this->statusInfo['start_time'] = time();
  150. //事件轮询库
  151. if(extension_loaded('libevent'))
  152. {
  153. $this->setEventLoopName('Libevent');
  154. }
  155. // 检查退出状态
  156. $this->addShutdownHook();
  157. // 初始化事件轮询库
  158. // $this->event = new Libevent();
  159. // $this->event = new Select();
  160. $this->event = new $this->eventLoopName();
  161. }
  162. /**
  163. * 让该worker实例开始服务
  164. *
  165. * @return void
  166. */
  167. public function start()
  168. {
  169. // 安装信号处理函数
  170. $this->installSignal();
  171. // 触发该worker进程onStart事件,该进程整个生命周期只触发一次
  172. if($this->onStart())
  173. {
  174. return;
  175. }
  176. if($this->protocol == 'udp')
  177. {
  178. // 添加读udp事件
  179. $this->event->add($this->mainSocket, Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
  180. }
  181. else
  182. {
  183. // 添加accept事件
  184. $ret = $this->event->add($this->mainSocket, Events\BaseEvent::EV_READ, array($this, 'accept'));
  185. }
  186. // 主体循环,整个子进程会阻塞在这个函数上
  187. $ret = $this->event->loop();
  188. $this->notice('worker loop exit');
  189. exit(0);
  190. }
  191. /**
  192. * 停止服务
  193. * @param bool $exit 是否退出
  194. * @return void
  195. */
  196. public function stop($exit = true)
  197. {
  198. // 触发该worker进程onStop事件
  199. if($this->onStop())
  200. {
  201. return;
  202. }
  203. // 标记这个worker开始停止服务
  204. if($this->workerStatus != self::STATUS_SHUTDOWN)
  205. {
  206. // 停止接收连接
  207. $this->event->del($this->mainSocket, Events\BaseEvent::EV_READ);
  208. fclose($this->mainSocket);
  209. $this->workerStatus = self::STATUS_SHUTDOWN;
  210. }
  211. // 没有链接要处理了
  212. if($this->allTaskHasDone())
  213. {
  214. if($exit)
  215. {
  216. exit(0);
  217. }
  218. }
  219. }
  220. /**
  221. * 设置worker监听的socket
  222. * @param resource $socket
  223. * @return void
  224. */
  225. public function setListendSocket($socket)
  226. {
  227. // 初始化
  228. $this->mainSocket = $socket;
  229. // 设置监听socket非阻塞
  230. stream_set_blocking($this->mainSocket, 0);
  231. // 获取协议
  232. $mata_data = stream_get_meta_data($socket);
  233. $this->protocol = substr($mata_data['stream_type'], 0, 3);
  234. }
  235. /**
  236. * 设置worker的事件轮询库的名称
  237. * @param string
  238. * @return void
  239. */
  240. public function setEventLoopName($event_loop_name)
  241. {
  242. $this->eventLoopName = "\\Man\\Core\\Events\\".$event_loop_name;
  243. require_once WORKERMAN_ROOT_DIR . 'man/Core/Events/'.ucfirst(str_replace('WORKERMAN', '', $event_loop_name)).'.php';
  244. }
  245. /**
  246. * 接受一个链接
  247. * @param resource $socket
  248. * @param $null_one $flag
  249. * @param $null_two $base
  250. * @return void
  251. */
  252. public function accept($socket, $null_one = null, $null_two = null)
  253. {
  254. // 获得一个连接
  255. $new_connection = @stream_socket_accept($socket, 0);
  256. // 可能是惊群效应
  257. if(false === $new_connection)
  258. {
  259. $this->statusInfo['thunder_herd']++;
  260. return false;
  261. }
  262. // 接受请求数加1
  263. $this->statusInfo['total_request'] ++;
  264. // 连接的fd序号
  265. $fd = (int) $new_connection;
  266. $this->connections[$fd] = $new_connection;
  267. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  268. // 非阻塞
  269. stream_set_blocking($this->connections[$fd], 0);
  270. $this->event->add($this->connections[$fd], Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
  271. return $new_connection;
  272. }
  273. /**
  274. * 接收Udp数据
  275. * 如果数据超过一个udp包长,需要业务自己解析包体,判断数据是否全部到达
  276. * @param resource $socket
  277. * @param $null_one $flag
  278. * @param $null_two $base
  279. * @return void
  280. */
  281. public function recvUdp($socket, $null_one = null, $null_two = null)
  282. {
  283. $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
  284. // 可能是惊群效应
  285. if(false === $data || empty($address))
  286. {
  287. $this->statusInfo['thunder_herd']++;
  288. return false;
  289. }
  290. // 接受请求数加1
  291. $this->statusInfo['total_request'] ++;
  292. $this->currentClientAddress = $address;
  293. if(0 === $this->dealInput($data))
  294. {
  295. $this->dealProcess($data);
  296. }
  297. }
  298. /**
  299. * 处理受到的数据
  300. * @param event_buffer $event_buffer
  301. * @param int $fd
  302. * @return void
  303. */
  304. public function dealInputBase($connection, $flag, $fd = null)
  305. {
  306. $this->currentDealFd = $fd;
  307. $buffer = stream_socket_recvfrom($connection, $this->recvBuffers[$fd]['remain_len']);
  308. // 出错了
  309. if('' == $buffer && '' == ($buffer = fread($connection, $this->recvBuffers[$fd]['remain_len'])))
  310. {
  311. if(!feof($connection))
  312. {
  313. return;
  314. }
  315. // 客户端提前断开链接
  316. $this->statusInfo['client_close']++;
  317. // 如果该链接对应的buffer有数据,说明发生错误
  318. if(!empty($this->recvBuffers[$fd]['buf']))
  319. {
  320. $this->notice("CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  321. }
  322. // 关闭链接
  323. $this->closeClient($fd);
  324. if($this->workerStatus == self::STATUS_SHUTDOWN)
  325. {
  326. $this->stop();
  327. }
  328. return;
  329. }
  330. $this->recvBuffers[$fd]['buf'] .= $buffer;
  331. $remain_len = $this->dealInput($this->recvBuffers[$fd]['buf']);
  332. // 包接收完毕
  333. if(0 === $remain_len)
  334. {
  335. // 执行处理
  336. try{
  337. // 业务处理
  338. $this->dealProcess($this->recvBuffers[$fd]['buf']);
  339. }
  340. catch(\Exception $e)
  341. {
  342. $this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  343. $this->statusInfo['throw_exception'] ++;
  344. $this->sendToClient($e->getMessage());
  345. }
  346. // 是否是长连接
  347. if($this->isPersistentConnection)
  348. {
  349. // 清空缓冲buffer
  350. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  351. }
  352. else
  353. {
  354. // 关闭链接
  355. if(empty($this->sendBuffers[$fd]))
  356. {
  357. $this->closeClient($fd);
  358. }
  359. }
  360. }
  361. // 出错
  362. else if(false === $remain_len)
  363. {
  364. // 出错
  365. $this->statusInfo['packet_err']++;
  366. $this->sendToClient('packet_err:'.$this->recvBuffers[$fd]['buf']);
  367. $this->notice("PACKET_ERROR\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  368. $this->closeClient($fd);
  369. }
  370. else
  371. {
  372. $this->recvBuffers[$fd]['remain_len'] = $remain_len;
  373. }
  374. // 检查是否是关闭状态或者是否到达请求上限
  375. if($this->workerStatus == self::STATUS_SHUTDOWN || $this->statusInfo['total_request'] >= $this->maxRequests)
  376. {
  377. // 停止服务
  378. $this->stop();
  379. // EXIT_WAIT_TIME秒后退出进程
  380. pcntl_alarm(self::EXIT_WAIT_TIME);
  381. }
  382. }
  383. /**
  384. * 根据fd关闭链接
  385. * @param int $fd
  386. * @return void
  387. */
  388. protected function closeClient($fd)
  389. {
  390. // udp忽略
  391. if($this->protocol != 'udp')
  392. {
  393. $this->event->del($this->connections[$fd], Events\BaseEvent::EV_READ);
  394. $this->event->del($this->connections[$fd], Events\BaseEvent::EV_WRITE);
  395. fclose($this->connections[$fd]);
  396. unset($this->connections[$fd], $this->recvBuffers[$fd], $this->sendBuffers[$fd]);
  397. }
  398. }
  399. /**
  400. * 安装信号处理函数
  401. * @return void
  402. */
  403. protected function installSignal()
  404. {
  405. // 闹钟信号
  406. $this->event->add(SIGALRM, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGALRM);
  407. // 终止进程信号
  408. $this->event->add(SIGINT, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGINT);
  409. // 平滑重启信号
  410. $this->event->add(SIGHUP, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGHUP);
  411. // 报告进程状态
  412. $this->event->add(SIGUSR1, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGUSR1);
  413. // 报告该进程使用的文件
  414. $this->event->add(SIGUSR2, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGUSR2);
  415. // 设置忽略信号
  416. pcntl_signal(SIGTTIN, SIG_IGN);
  417. pcntl_signal(SIGTTOU, SIG_IGN);
  418. pcntl_signal(SIGQUIT, SIG_IGN);
  419. pcntl_signal(SIGPIPE, SIG_IGN);
  420. pcntl_signal(SIGCHLD, SIG_IGN);
  421. }
  422. /**
  423. * 设置server信号处理函数
  424. * @param null $null
  425. * @param int $signal
  426. */
  427. public function signalHandler($signal, $null = null, $null = null)
  428. {
  429. switch($signal)
  430. {
  431. // 时钟处理函数
  432. case SIGALRM:
  433. // 停止服务后EXIT_WAIT_TIME秒还没退出则强制退出
  434. if($this->workerStatus == self::STATUS_SHUTDOWN)
  435. {
  436. exit(0);
  437. }
  438. break;
  439. // 停止该进程
  440. case SIGINT:
  441. // 平滑重启
  442. case SIGHUP:
  443. $this->stop();
  444. // EXIT_WAIT_TIME秒后退出进程
  445. pcntl_alarm(self::EXIT_WAIT_TIME);
  446. break;
  447. // 报告进程状态
  448. case SIGUSR1:
  449. $this->writeStatusToQueue();
  450. break;
  451. // 报告进程使用的php文件
  452. case SIGUSR2:
  453. $this->writeFilesListToQueue();
  454. break;
  455. }
  456. }
  457. /**
  458. * 发送数据到客户端
  459. * @return bool
  460. */
  461. public function sendToClient($str_to_send)
  462. {
  463. // tcp
  464. if($this->protocol != 'udp')
  465. {
  466. $send_len = @stream_socket_sendto($this->connections[$this->currentDealFd], $str_to_send);
  467. if($send_len === strlen($str_to_send))
  468. {
  469. return true;
  470. }
  471. if($send_len > 0)
  472. {
  473. $this->sendBuffers[$this->currentDealFd] = substr($str_to_send, $send_len);
  474. }
  475. else
  476. {
  477. $this->sendBuffers[$this->currentDealFd] = $str_to_send;
  478. }
  479. $this->event->add($this->connections[$this->currentDealFd], Events\BaseEvent::EV_WRITE, array($this, 'tcpWriteToClient'), array($this->currentDealFd));
  480. /*
  481. // tcp 如果一次没写完(一般是缓冲区满的情况),则阻塞写
  482. if(!$this->blockWrite($this->connections[$this->currentDealFd], $str_to_send, 500))
  483. {
  484. $this->notice('sendToClient fail ,Data length = ' . strlen($str_to_send));
  485. $this->statusInfo['send_fail']++;
  486. return false;
  487. }*/
  488. return null;
  489. }
  490. // udp 直接发送,要求数据包不能超过65515
  491. return strlen($str_to_send) == stream_socket_sendto($this->mainSocket, $str_to_send, 0, $this->currentClientAddress);
  492. }
  493. /**
  494. * 向客户端socket写数据
  495. * @param resource $fd
  496. * @param string $bin_data
  497. */
  498. public function tcpWriteToClient($fd)
  499. {
  500. if(empty($this->connections[$fd]))
  501. {
  502. $this->notice("tcpWriteToClient \$this->connections[$fd] is null");
  503. return false;
  504. }
  505. $send_len = @stream_socket_sendto($this->connections[$fd], $this->sendBuffers[$fd]);
  506. if($send_len === strlen($this->sendBuffers[$fd]))
  507. {
  508. if(!$this->isPersistentConnection)
  509. {
  510. return $this->closeClient($fd);
  511. }
  512. $this->event->del($this->connections[$fd], BaseEvent::EV_WRITE);
  513. return;
  514. }
  515. if($send_len > 0)
  516. {
  517. $this->sendBuffers[$fd] = substr($this->sendBuffers[$fd], $send_len);
  518. }
  519. }
  520. /**
  521. * 向fd写数据,如果socket缓冲区满了,则改用阻塞模式写数据
  522. * @param resource $fd
  523. * @param string $str_to_write
  524. * @param int $time_out 单位毫秒
  525. * @return bool
  526. */
  527. protected function blockWrite($fd, $str_to_write, $timeout_ms = 500)
  528. {
  529. $send_len = @fwrite($fd, $str_to_write);
  530. if($send_len == strlen($str_to_write))
  531. {
  532. return true;
  533. }
  534. // 客户端关闭
  535. if(feof($fd))
  536. {
  537. $this->notice("blockWrite client close");
  538. return false;
  539. }
  540. // 设置阻塞
  541. stream_set_blocking($fd, 1);
  542. // 设置超时
  543. $timeout_sec = floor($timeout_ms/1000);
  544. $timeout_ms = $timeout_ms%1000;
  545. stream_set_timeout($fd, $timeout_sec, $timeout_ms*1000);
  546. $send_len += @fwrite($fd, substr($str_to_write, $send_len));
  547. // 改回非阻塞
  548. stream_set_blocking($fd, 0);
  549. return $send_len == strlen($str_to_write);
  550. }
  551. /**
  552. * 获取客户端地址
  553. * @param int $fd 已经链接的socket id
  554. * @return string ip:port
  555. */
  556. public function getRemoteAddress($fd = null)
  557. {
  558. if(empty($fd))
  559. {
  560. if(!isset($this->connections[$this->currentDealFd]))
  561. {
  562. return '';
  563. }
  564. $fd = $this->currentDealFd;
  565. }
  566. if($this->protocol == 'udp')
  567. {
  568. $sock_name = $this->currentClientAddress;
  569. }
  570. else
  571. {
  572. $sock_name = stream_socket_get_name($this->connections[$fd], true);
  573. }
  574. return $sock_name;
  575. }
  576. /**
  577. * 获取客户端ip
  578. * @param integer $fd 已经链接的socket id
  579. * @return string
  580. */
  581. public function getRemoteIp($fd = null)
  582. {
  583. $ip = '';
  584. $address= $this->getRemoteAddress($fd);
  585. if($address)
  586. {
  587. list($ip, $port) = explode(':', $address, 2);
  588. }
  589. return $ip;
  590. }
  591. /**
  592. * 获取客户端ip
  593. * @param integer $fd 已经链接的socket id
  594. * @return integer
  595. */
  596. public function getRemotePort($fd = null)
  597. {
  598. $port = 0;
  599. $address= $this->getRemoteAddress($fd);
  600. if($address)
  601. {
  602. list($ip, $port) = explode(':', $address, 2);
  603. }
  604. return $port;
  605. }
  606. /**
  607. * 获取本地ip
  608. * @return string
  609. */
  610. public function getLocalIp()
  611. {
  612. $ip = '';
  613. $sock_name = '';
  614. if($this->protocol == 'udp' || !isset($this->connections[$this->currentDealFd]))
  615. {
  616. $sock_name = stream_socket_get_name($this->mainSocket, false);
  617. }
  618. else
  619. {
  620. $sock_name = stream_socket_get_name($this->connections[$this->currentDealFd], false);
  621. }
  622. if($sock_name)
  623. {
  624. $tmp = explode(':', $sock_name);
  625. $ip = $tmp[0];
  626. }
  627. if(empty($ip) || '127.0.0.1' == $ip)
  628. {
  629. $ip = gethostbyname(trim(`hostname`));
  630. }
  631. return $ip;
  632. }
  633. /**
  634. * 将当前worker进程状态写入消息队列
  635. * @return void
  636. */
  637. protected function writeStatusToQueue()
  638. {
  639. if(!Master::getQueueId())
  640. {
  641. return;
  642. }
  643. $error_code = 0;
  644. msg_send(Master::getQueueId(), self::MSG_TYPE_STATUS, array_merge($this->statusInfo, array('memory'=>memory_get_usage(true), 'pid'=>posix_getpid(), 'worker_name' => $this->workerName)), true, false, $error_code);
  645. }
  646. /**
  647. * 开发环境将当前进程使用的文件写入消息队列,用于FileMonitor监控文件更新
  648. * @return void
  649. */
  650. protected function writeFilesListToQueue()
  651. {
  652. if(!Master::getQueueId())
  653. {
  654. return;
  655. }
  656. $error_code = 0;
  657. $flip_file_list = array_flip(get_included_files());
  658. $file_list = array_diff_key($flip_file_list, $this->includeFiles);
  659. $this->includeFiles = $flip_file_list;
  660. if($file_list)
  661. {
  662. foreach(array_chunk($file_list, 10, true) as $list)
  663. {
  664. msg_send(Master::getQueueId(), self::MSG_TYPE_FILE_MONITOR, array_keys($list), true, false, $error_code);
  665. }
  666. }
  667. }
  668. /**
  669. * 是否所有任务都已经完成
  670. * @return bool
  671. */
  672. protected function allTaskHasDone()
  673. {
  674. // 如果是长链接并且没有要处理的数据则是任务都处理完了
  675. return $this->noConnections() || ($this->isPersistentConnection && $this->allBufferIsEmpty());
  676. }
  677. /**
  678. * 检查是否所有的链接的缓冲区都是空
  679. * @return bool
  680. */
  681. protected function allBufferIsEmpty()
  682. {
  683. foreach($this->recvBuffers as $fd => $buf)
  684. {
  685. if(!empty($buf['buf']))
  686. {
  687. return false;
  688. }
  689. }
  690. return true;
  691. }
  692. /**
  693. * 该进程收到的任务是否都已经完成,重启进程时需要判断
  694. * @return bool
  695. */
  696. protected function noConnections()
  697. {
  698. return empty($this->connections);
  699. }
  700. /**
  701. * 该worker进程开始服务的时候会触发一次,可以在这里做一些全局的事情
  702. * @return bool
  703. */
  704. protected function onStart()
  705. {
  706. return false;
  707. }
  708. /**
  709. * 该worker进程停止服务的时候会触发一次,可以在这里做一些全局的事情
  710. * @return bool
  711. */
  712. protected function onStop()
  713. {
  714. return false;
  715. }
  716. }