AbstractWorker.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. <?php
  2. namespace Man\Core;
  3. require_once WORKERMAN_ROOT_DIR . 'Core/Events/Select.php';
  4. /**
  5. * 抽象Worker类
  6. * 必须实现start方法
  7. * @author walkor <workerman.net>
  8. */
  9. abstract class AbstractWorker
  10. {
  11. /**
  12. * worker状态 运行中
  13. * @var integer
  14. */
  15. const STATUS_RUNNING = 2;
  16. /**
  17. * worker状态 停止中
  18. * @var integer
  19. */
  20. const STATUS_SHUTDOWN = 4;
  21. /**
  22. * 消息队列状态消息类型
  23. * @var integer
  24. */
  25. const MSG_TYPE_STATUS = 1;
  26. /**
  27. * 消息队列文件监控消息类型
  28. * @var integer
  29. */
  30. const MSG_TYPE_FILE_MONITOR = 2;
  31. /**
  32. * worker名称
  33. * @var string
  34. */
  35. protected $workerName = __CLASS__;
  36. /**
  37. * worker监听端口的Socket
  38. * @var resource
  39. */
  40. protected $mainSocket = null;
  41. /**
  42. * 当前worker的服务状态
  43. * @var integer
  44. */
  45. protected $workerStatus = self::STATUS_RUNNING;
  46. /**
  47. * 让该worker实例开始服务
  48. *
  49. * @return void
  50. */
  51. abstract public function start();
  52. /**
  53. * 构造函数,主要是初始化信号处理函数
  54. * @return void
  55. */
  56. public function __construct($worker_name = null)
  57. {
  58. $this->workerName = $worker_name ? $worker_name : get_class($this);
  59. $this->installSignal();
  60. $this->addShutdownHook();
  61. }
  62. /**
  63. * 设置监听的socket
  64. * @param resource $socket
  65. * @return void
  66. */
  67. public function setListendSocket($socket)
  68. {
  69. // 初始化
  70. $this->mainSocket = $socket;
  71. stream_set_blocking($this->mainSocket, 0);
  72. }
  73. /**
  74. * 安装信号处理函数
  75. * @return void
  76. */
  77. protected function installSignal()
  78. {
  79. // 报告进程状态
  80. pcntl_signal(SIGINT, array($this, 'signalHandler'));
  81. pcntl_signal(SIGHUP, array($this, 'signalHandler'));
  82. pcntl_signal(SIGTTOU, array($this, 'signalHandler'));
  83. // 设置忽略信号
  84. pcntl_signal(SIGALRM, SIG_IGN);
  85. pcntl_signal(SIGUSR1, SIG_IGN);
  86. pcntl_signal(SIGUSR2, SIG_IGN);
  87. pcntl_signal(SIGTTIN, SIG_IGN);
  88. pcntl_signal(SIGQUIT, SIG_IGN);
  89. pcntl_signal(SIGPIPE, SIG_IGN);
  90. pcntl_signal(SIGCHLD, SIG_IGN);
  91. }
  92. /**
  93. * 设置server信号处理函数
  94. * @param integer $signal
  95. * @return void
  96. */
  97. public function signalHandler($signal)
  98. {
  99. switch($signal)
  100. {
  101. // 停止该进程
  102. case SIGINT:
  103. // 平滑重启
  104. case SIGHUP:
  105. $this->workerStatus = self::STATUS_SHUTDOWN;
  106. break;
  107. // 终端关闭
  108. case SIGTTOU:
  109. $this->resetFd();
  110. break;
  111. }
  112. }
  113. /**
  114. * 判断该进程是否收到退出信号,收到信号后要马上退出,否则稍后会被住进成强行杀死
  115. * @return boolean
  116. */
  117. public function hasShutDown()
  118. {
  119. pcntl_signal_dispatch();
  120. return $this->workerStatus == self::STATUS_SHUTDOWN;
  121. }
  122. /**
  123. * 获取主进程统计信息
  124. * @return array
  125. */
  126. protected function getMasterStatus()
  127. {
  128. if(!Master::getShmId())
  129. {
  130. return array();
  131. }
  132. return shm_get_var(Master::getShmId(), Master::STATUS_VAR_ID);
  133. }
  134. /**
  135. * 获取worker与pid的映射关系
  136. * @return array ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid3,..], ...]
  137. */
  138. protected function getWorkerPidMap()
  139. {
  140. $status = $this->getMasterStatus();
  141. if(empty($status))
  142. {
  143. return array();
  144. }
  145. return $status['pid_map'];
  146. }
  147. /**
  148. * 获取pid与worker的映射关系
  149. * @return array ['pid1'=>'worker_name1','pid2'=>'worker_name2', ...]
  150. */
  151. protected function getPidWorkerMap()
  152. {
  153. $pid_worker_map = array();
  154. if($worker_pid_map = $this->getWorkerPidMap())
  155. {
  156. foreach($worker_pid_map as $worker_name=>$pid_array)
  157. {
  158. foreach($pid_array as $pid)
  159. {
  160. $pid_worker_map[$pid] = $worker_name;
  161. }
  162. }
  163. }
  164. return $pid_worker_map;
  165. }
  166. /**
  167. * 进程关闭时进行错误检查
  168. * @return void
  169. */
  170. protected function addShutdownHook()
  171. {
  172. register_shutdown_function(array($this, 'checkErrors'));
  173. }
  174. /**
  175. * 检查错误
  176. * @return void
  177. */
  178. public function checkErrors()
  179. {
  180. if(self::STATUS_SHUTDOWN != $this->workerStatus)
  181. {
  182. $error_msg = "WORKER EXIT UNEXPECTED ";
  183. if($errors = error_get_last())
  184. {
  185. $error_msg .= $this->getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}";
  186. }
  187. $this->notice($error_msg);
  188. }
  189. }
  190. /**
  191. * 获取错误类型对应的意义
  192. * @param integer $type
  193. * @return string
  194. */
  195. public function getErrorType($type)
  196. {
  197. switch($type)
  198. {
  199. case E_ERROR: // 1 //
  200. return 'E_ERROR';
  201. case E_WARNING: // 2 //
  202. return 'E_WARNING';
  203. case E_PARSE: // 4 //
  204. return 'E_PARSE';
  205. case E_NOTICE: // 8 //
  206. return 'E_NOTICE';
  207. case E_CORE_ERROR: // 16 //
  208. return 'E_CORE_ERROR';
  209. case E_CORE_WARNING: // 32 //
  210. return 'E_CORE_WARNING';
  211. case E_CORE_ERROR: // 64 //
  212. return 'E_COMPILE_ERROR';
  213. case E_CORE_WARNING: // 128 //
  214. return 'E_COMPILE_WARNING';
  215. case E_USER_ERROR: // 256 //
  216. return 'E_USER_ERROR';
  217. case E_USER_WARNING: // 512 //
  218. return 'E_USER_WARNING';
  219. case E_USER_NOTICE: // 1024 //
  220. return 'E_USER_NOTICE';
  221. case E_STRICT: // 2048 //
  222. return 'E_STRICT';
  223. case E_RECOVERABLE_ERROR: // 4096 //
  224. return 'E_RECOVERABLE_ERROR';
  225. case E_DEPRECATED: // 8192 //
  226. return 'E_DEPRECATED';
  227. case E_USER_DEPRECATED: // 16384 //
  228. return 'E_USER_DEPRECATED';
  229. }
  230. return "";
  231. }
  232. /**
  233. * 记录日志
  234. * @param sring $str
  235. * @return void
  236. */
  237. protected function notice($str, $display = true)
  238. {
  239. $str = 'Worker['.get_class($this).']:'.$str;
  240. Lib\Log::add($str);
  241. if($display && Lib\Config::get('workerman.debug') == 1)
  242. {
  243. echo $str."\n";
  244. }
  245. }
  246. /**
  247. * 关闭标准输入输出
  248. * @return void
  249. */
  250. protected function resetFd()
  251. {
  252. global $STDOUT, $STDERR;
  253. @fclose(STDOUT);
  254. @fclose(STDERR);
  255. // 将标准输出重定向到/dev/null
  256. $STDOUT = fopen('/dev/null',"rw+");
  257. $STDERR = fopen('/dev/null',"rw+");
  258. }
  259. }