AbstractWorker.php 6.5 KB


  1. <?php
  2. namespace Man\Core;
  3. require_once WORKERMAN_ROOT_DIR . 'man/Core/Events/Select.php';
  4. /**
  5. * 抽象Worker类
  6. * 必须实现start方法
  7. * @author walkor <worker-man@qq.com>
  8. */
  9. abstract class AbstractWorker
  10. {
  11. /**
  12. * worker状态 运行中
  13. * @var integer
  14. */
  15. const STATUS_RUNNING = 2;
  16. /**
  17. * worker状态 停止中
  18. * @var integer
  19. */
  20. const STATUS_SHUTDOWN = 4;
  21. /**
  22. * 消息队列状态消息类型
  23. * @var integer
  24. */
  25. const MSG_TYPE_STATUS = 1;
  26. /**
  27. * 消息队列文件监控消息类型
  28. * @var integer
  29. */
  30. const MSG_TYPE_FILE_MONITOR = 2;
  31. /**
  32. * worker监听端口的Socket
  33. * @var resource
  34. */
  35. protected $mainSocket = null;
  36. /**
  37. * 当前worker的服务状态
  38. * @var integer
  39. */
  40. protected $workerStatus = self::STATUS_RUNNING;
  41. /**
  42. * 让该worker实例开始服务
  43. *
  44. * @return void
  45. */
  46. abstract public function start();
  47. /**
  48. * 构造函数,主要是初始化信号处理函数
  49. * @return void
  50. */
  51. public function __construct()
  52. {
  53. $this->installSignal();
  54. $this->addShutdownHook();
  55. }
  56. /**
  57. * 设置监听的socket
  58. * @param resource $socket
  59. * @return void
  60. */
  61. public function setListendSocket($socket)
  62. {
  63. // 初始化
  64. $this->mainSocket = $socket;
  65. stream_set_blocking($this->mainSocket, 0);
  66. }
  67. /**
  68. * 安装信号处理函数
  69. * @return void
  70. */
  71. protected function installSignal()
  72. {
  73. // 如果是由worker脚本启动则不安装信号
  74. if(!defined('WORKERMAN_PID_FILE'))
  75. {
  76. return;
  77. }
  78. // 报告进程状态
  79. pcntl_signal(SIGINT, array($this, 'signalHandler'));
  80. pcntl_signal(SIGHUP, array($this, 'signalHandler'));
  81. // 设置忽略信号
  82. pcntl_signal(SIGALRM, SIG_IGN);
  83. pcntl_signal(SIGUSR1, SIG_IGN);
  84. pcntl_signal(SIGUSR2, SIG_IGN);
  85. pcntl_signal(SIGTTIN, SIG_IGN);
  86. pcntl_signal(SIGTTOU, SIG_IGN);
  87. pcntl_signal(SIGQUIT, SIG_IGN);
  88. pcntl_signal(SIGPIPE, SIG_IGN);
  89. pcntl_signal(SIGCHLD, SIG_IGN);
  90. }
  91. /**
  92. * 设置server信号处理函数
  93. * @param integer $signal
  94. * @return void
  95. */
  96. public function signalHandler($signal)
  97. {
  98. switch($signal)
  99. {
  100. // 停止该进程
  101. case SIGINT:
  102. // 平滑重启
  103. case SIGHUP:
  104. $this->workerStatus = self::STATUS_SHUTDOWN;
  105. break;
  106. }
  107. }
  108. /**
  109. * 判断该进程是否收到退出信号,收到信号后要马上退出,否则稍后会被住进成强行杀死
  110. * @return boolean
  111. */
  112. public function hasShutDown()
  113. {
  114. pcntl_signal_dispatch();
  115. return $this->workerStatus == self::STATUS_SHUTDOWN;
  116. }
  117. /**
  118. * 获取主进程统计信息
  119. * @return array
  120. */
  121. protected function getMasterStatus()
  122. {
  123. if(!Master::getShmId())
  124. {
  125. return array();
  126. }
  127. return shm_get_var(Master::getShmId(), Master::STATUS_VAR_ID);
  128. }
  129. /**
  130. * 获取worker与pid的映射关系
  131. * @return array ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid3,..], ...]
  132. */
  133. protected function getWorkerPidMap()
  134. {
  135. $status = $this->getMasterStatus();
  136. if(empty($status))
  137. {
  138. return array();
  139. }
  140. return $status['pid_map'];
  141. }
  142. /**
  143. * 获取pid与worker的映射关系
  144. * @return array ['pid1'=>'worker_name1','pid2'=>'worker_name2', ...]
  145. */
  146. protected function getPidWorkerMap()
  147. {
  148. $pid_worker_map = array();
  149. if($worker_pid_map = $this->getWorkerPidMap())
  150. {
  151. foreach($worker_pid_map as $worker_name=>$pid_array)
  152. {
  153. foreach($pid_array as $pid)
  154. {
  155. $pid_worker_map[$pid] = $worker_name;
  156. }
  157. }
  158. }
  159. return $pid_worker_map;
  160. }
  161. /**
  162. * 进程关闭时进行错误检查
  163. * @return void
  164. */
  165. protected function addShutdownHook()
  166. {
  167. register_shutdown_function(array($this, 'checkErrors'));
  168. }
  169. /**
  170. * 检查错误
  171. * @return void
  172. */
  173. public function checkErrors()
  174. {
  175. if(self::STATUS_SHUTDOWN != $this->workerStatus)
  176. {
  177. $error_msg = "WORKER EXIT UNEXPECTED ";
  178. if($errors = error_get_last())
  179. {
  180. $error_msg .= $this->getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}";
  181. }
  182. $this->notice($error_msg);
  183. }
  184. }
  185. /**
  186. * 获取错误类型对应的意义
  187. * @param integer $type
  188. * @return string
  189. */
  190. public function getErrorType($type)
  191. {
  192. switch($type)
  193. {
  194. case E_ERROR: // 1 //
  195. return 'E_ERROR';
  196. case E_WARNING: // 2 //
  197. return 'E_WARNING';
  198. case E_PARSE: // 4 //
  199. return 'E_PARSE';
  200. case E_NOTICE: // 8 //
  201. return 'E_NOTICE';
  202. case E_CORE_ERROR: // 16 //
  203. return 'E_CORE_ERROR';
  204. case E_CORE_WARNING: // 32 //
  205. return 'E_CORE_WARNING';
  206. case E_CORE_ERROR: // 64 //
  207. return 'E_COMPILE_ERROR';
  208. case E_CORE_WARNING: // 128 //
  209. return 'E_COMPILE_WARNING';
  210. case E_USER_ERROR: // 256 //
  211. return 'E_USER_ERROR';
  212. case E_USER_WARNING: // 512 //
  213. return 'E_USER_WARNING';
  214. case E_USER_NOTICE: // 1024 //
  215. return 'E_USER_NOTICE';
  216. case E_STRICT: // 2048 //
  217. return 'E_STRICT';
  218. case E_RECOVERABLE_ERROR: // 4096 //
  219. return 'E_RECOVERABLE_ERROR';
  220. case E_DEPRECATED: // 8192 //
  221. return 'E_DEPRECATED';
  222. case E_USER_DEPRECATED: // 16384 //
  223. return 'E_USER_DEPRECATED';
  224. }
  225. return "";
  226. }
  227. /**
  228. * 记录日志
  229. * @param sring $str
  230. * @return void
  231. */
  232. protected function notice($str, $display = true)
  233. {
  234. $str = 'Worker['.get_class($this).']:'.$str;
  235. Lib\Log::add($str);
  236. if($display && Lib\Config::get('workerman.debug') == 1)
  237. {
  238. echo $str."\n";
  239. }
  240. }
  241. }