AbstractWorker.php 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. <?php
  2. namespace Man\Core;
  3. require_once WORKERMAN_ROOT_DIR . 'man/Core/Events/Select.php';
  4. /**
  5. * 抽象Worker类
  6. * 必须实现start方法
  7. * @author walkor <worker-man@qq.com>
  8. */
  9. abstract class AbstractWorker
  10. {
  11. /**
  12. * worker状态 运行中
  13. * @var integer
  14. */
  15. const STATUS_RUNNING = 2;
  16. /**
  17. * worker状态 停止中
  18. * @var integer
  19. */
  20. const STATUS_SHUTDOWN = 4;
  21. /**
  22. * 消息队列状态消息类型
  23. * @var integer
  24. */
  25. const MSG_TYPE_STATUS = 1;
  26. /**
  27. * 消息队列文件监控消息类型
  28. * @var integer
  29. */
  30. const MSG_TYPE_FILE_MONITOR = 2;
  31. /**
  32. * 进程编号
  33. * @var integer
  34. */
  35. public static $number = 0;
  36. /**
  37. * worker名称
  38. * @var string
  39. */
  40. protected $workerName = __CLASS__;
  41. /**
  42. * worker监听端口的Socket
  43. * @var resource
  44. */
  45. protected $mainSocket = null;
  46. /**
  47. * 当前worker的服务状态
  48. * @var integer
  49. */
  50. protected $workerStatus = self::STATUS_RUNNING;
  51. /**
  52. * 让该worker实例开始服务
  53. *
  54. * @return void
  55. */
  56. abstract public function start();
  57. /**
  58. * 构造函数,主要是初始化信号处理函数
  59. * @return void
  60. */
  61. public function __construct()
  62. {
  63. $this->workerName = get_class($this);
  64. $this->installSignal();
  65. $this->addShutdownHook();
  66. }
  67. /**
  68. * 设置监听的socket
  69. * @param resource $socket
  70. * @return void
  71. */
  72. public function setListendSocket($socket)
  73. {
  74. // 初始化
  75. $this->mainSocket = $socket;
  76. stream_set_blocking($this->mainSocket, 0);
  77. }
  78. /**
  79. * 安装信号处理函数
  80. * @return void
  81. */
  82. protected function installSignal()
  83. {
  84. // 如果是由worker脚本启动则不安装信号
  85. if(!defined('WORKERMAN_PID_FILE'))
  86. {
  87. return;
  88. }
  89. // 报告进程状态
  90. pcntl_signal(SIGINT, array($this, 'signalHandler'));
  91. pcntl_signal(SIGHUP, array($this, 'signalHandler'));
  92. // 设置忽略信号
  93. pcntl_signal(SIGALRM, SIG_IGN);
  94. pcntl_signal(SIGUSR1, SIG_IGN);
  95. pcntl_signal(SIGUSR2, SIG_IGN);
  96. pcntl_signal(SIGTTIN, SIG_IGN);
  97. pcntl_signal(SIGTTOU, SIG_IGN);
  98. pcntl_signal(SIGQUIT, SIG_IGN);
  99. pcntl_signal(SIGPIPE, SIG_IGN);
  100. pcntl_signal(SIGCHLD, SIG_IGN);
  101. }
  102. /**
  103. * 设置server信号处理函数
  104. * @param integer $signal
  105. * @return void
  106. */
  107. public function signalHandler($signal)
  108. {
  109. switch($signal)
  110. {
  111. // 停止该进程
  112. case SIGINT:
  113. // 平滑重启
  114. case SIGHUP:
  115. $this->workerStatus = self::STATUS_SHUTDOWN;
  116. break;
  117. }
  118. }
  119. /**
  120. * 判断该进程是否收到退出信号,收到信号后要马上退出,否则稍后会被住进成强行杀死
  121. * @return boolean
  122. */
  123. public function hasShutDown()
  124. {
  125. pcntl_signal_dispatch();
  126. return $this->workerStatus == self::STATUS_SHUTDOWN;
  127. }
  128. /**
  129. * 获取主进程统计信息
  130. * @return array
  131. */
  132. protected function getMasterStatus()
  133. {
  134. if(!Master::getShmId())
  135. {
  136. return array();
  137. }
  138. return shm_get_var(Master::getShmId(), Master::STATUS_VAR_ID);
  139. }
  140. /**
  141. * 获取worker与pid的映射关系
  142. * @return array ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid3,..], ...]
  143. */
  144. protected function getWorkerPidMap()
  145. {
  146. $status = $this->getMasterStatus();
  147. if(empty($status))
  148. {
  149. return array();
  150. }
  151. return $status['pid_map'];
  152. }
  153. /**
  154. * 获取pid与worker的映射关系
  155. * @return array ['pid1'=>'worker_name1','pid2'=>'worker_name2', ...]
  156. */
  157. protected function getPidWorkerMap()
  158. {
  159. $pid_worker_map = array();
  160. if($worker_pid_map = $this->getWorkerPidMap())
  161. {
  162. foreach($worker_pid_map as $worker_name=>$pid_array)
  163. {
  164. foreach($pid_array as $pid)
  165. {
  166. $pid_worker_map[$pid] = $worker_name;
  167. }
  168. }
  169. }
  170. return $pid_worker_map;
  171. }
  172. /**
  173. * 进程关闭时进行错误检查
  174. * @return void
  175. */
  176. protected function addShutdownHook()
  177. {
  178. register_shutdown_function(array($this, 'checkErrors'));
  179. }
  180. /**
  181. * 检查错误
  182. * @return void
  183. */
  184. public function checkErrors()
  185. {
  186. if(self::STATUS_SHUTDOWN != $this->workerStatus)
  187. {
  188. $error_msg = "WORKER EXIT UNEXPECTED ";
  189. if($errors = error_get_last())
  190. {
  191. $error_msg .= $this->getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}";
  192. }
  193. $this->notice($error_msg);
  194. }
  195. }
  196. /**
  197. * 增加进程序列号
  198. * @return int
  199. */
  200. public function increaseNumber()
  201. {
  202. return ++self::$number;
  203. }
  204. /**
  205. * 获取错误类型对应的意义
  206. * @param integer $type
  207. * @return string
  208. */
  209. public function getErrorType($type)
  210. {
  211. switch($type)
  212. {
  213. case E_ERROR: // 1 //
  214. return 'E_ERROR';
  215. case E_WARNING: // 2 //
  216. return 'E_WARNING';
  217. case E_PARSE: // 4 //
  218. return 'E_PARSE';
  219. case E_NOTICE: // 8 //
  220. return 'E_NOTICE';
  221. case E_CORE_ERROR: // 16 //
  222. return 'E_CORE_ERROR';
  223. case E_CORE_WARNING: // 32 //
  224. return 'E_CORE_WARNING';
  225. case E_CORE_ERROR: // 64 //
  226. return 'E_COMPILE_ERROR';
  227. case E_CORE_WARNING: // 128 //
  228. return 'E_COMPILE_WARNING';
  229. case E_USER_ERROR: // 256 //
  230. return 'E_USER_ERROR';
  231. case E_USER_WARNING: // 512 //
  232. return 'E_USER_WARNING';
  233. case E_USER_NOTICE: // 1024 //
  234. return 'E_USER_NOTICE';
  235. case E_STRICT: // 2048 //
  236. return 'E_STRICT';
  237. case E_RECOVERABLE_ERROR: // 4096 //
  238. return 'E_RECOVERABLE_ERROR';
  239. case E_DEPRECATED: // 8192 //
  240. return 'E_DEPRECATED';
  241. case E_USER_DEPRECATED: // 16384 //
  242. return 'E_USER_DEPRECATED';
  243. }
  244. return "";
  245. }
  246. /**
  247. * 记录日志
  248. * @param sring $str
  249. * @return void
  250. */
  251. protected function notice($str, $display = true)
  252. {
  253. $str = 'Worker['.get_class($this).']:'.$str;
  254. Lib\Log::add($str);
  255. if($display && Lib\Config::get('workerman.debug') == 1)
  256. {
  257. echo $str."\n";
  258. }
  259. }
  260. }