StatisticWorker.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. <?php
  2. /**
  3. *
  4. * @author walkor <walkor@workerman.net>
  5. */
  6. class StatisticWorker extends Man\Core\SocketWorker
  7. {
  8. /**
  9. * 最大日志buffer,大于这个值就写磁盘
  10. * @var integer
  11. */
  12. const MAX_LOG_BUFFER_SZIE = 1024000;
  13. /**
  14. * 多长时间写一次数据到磁盘
  15. * @var integer
  16. */
  17. const WRITE_PERIOD_LENGTH = 60;
  18. /**
  19. * 多长时间清理一次老的磁盘数据
  20. * @var integer
  21. */
  22. const CLEAR_PERIOD_LENGTH = 86400;
  23. /**
  24. * 数据多长时间过期
  25. * @var integer
  26. */
  27. const EXPIRED_TIME = 1296000;
  28. /**
  29. * 统计数据
  30. * ip=>modid=>interface=>['code'=>[xx=>count,xx=>count],'suc_cost_time'=>xx,'fail_cost_time'=>xx, 'suc_count'=>xx, 'fail_count'=>xx]
  31. * @var array
  32. */
  33. protected $statisticData = array();
  34. /**
  35. * 日志的buffer
  36. * @var string
  37. */
  38. protected $logBuffer = '';
  39. /**
  40. * 放统计数据的目录(相对于workerman/logs/)
  41. * @var string
  42. */
  43. protected $statisticDir = 'statistic/statistic/';
  44. /**
  45. * 存放统计日志的目录(相对于workerman/logs/)
  46. * @var string
  47. */
  48. protected $logDir = 'statistic/log/';
  49. /**
  50. * 提供统计查询的socket
  51. * @var resource
  52. */
  53. protected $providerSocket = null;
  54. /**
  55. * udp 默认全部接收完毕
  56. * @see Man\Core.SocketWorker::dealInput()
  57. */
  58. public function dealInput($recv_buffer)
  59. {
  60. return 0;
  61. }
  62. /**
  63. * 业务处理
  64. * @see Man\Core.SocketWorker::dealProcess()
  65. */
  66. public function dealProcess($recv_buffer)
  67. {
  68. // 解码
  69. $unpack_data = StatisticProtocol::decode($recv_buffer);
  70. $module = $unpack_data['module'];
  71. $interface = $unpack_data['interface'];
  72. $cost_time = $unpack_data['cost_time'];
  73. $success = $unpack_data['success'];
  74. $time = $unpack_data['time'];
  75. $code = $unpack_data['code'];
  76. $msg = str_replace("\n", "<br>", $unpack_data['msg']);
  77. $ip = $this->getRemoteIp();
  78. // 模块接口统计
  79. $this->collectStatistics($module, $interface, $cost_time, $success, $ip, $code, $msg);
  80. // 全局统计
  81. $this->collectStatistics('WorkerMan', 'Statistics', $cost_time, $success, $ip, $code, $msg);
  82. // 失败记录日志
  83. if(!$success)
  84. {
  85. $this->logBuffer .= date('Y-m-d H:i:s',$time)."\t$ip\t$module::$interface\tcode:$code\tmsg:$msg\n";
  86. if(strlen($this->logBuffer) >= self::MAX_LOG_BUFFER_SZIE)
  87. {
  88. $this->writeLogToDisk();
  89. }
  90. }
  91. }
  92. /**
  93. * 收集统计数据
  94. * @param string $module
  95. * @param string $interface
  96. * @param float $cost_time
  97. * @param int $success
  98. * @param string $ip
  99. * @param int $code
  100. * @param string $msg
  101. * @return void
  102. */
  103. protected function collectStatistics($module, $interface , $cost_time, $success, $ip, $code, $msg)
  104. {
  105. // 统计相关信息
  106. if(!isset($this->statisticData[$ip]))
  107. {
  108. $this->statisticData[$ip] = array();
  109. }
  110. if(!isset($this->statisticData[$ip][$module]))
  111. {
  112. $this->statisticData[$ip][$module] = array();
  113. }
  114. if(!isset($this->statisticData[$ip][$module][$interface]))
  115. {
  116. $this->statisticData[$ip][$module][$interface] = array('code'=>array(), 'suc_cost_time'=>0, 'fail_cost_time'=>0, 'suc_count'=>0, 'fail_count'=>0);
  117. }
  118. if(!isset($this->statisticData[$ip][$module][$interface]['code'][$code]))
  119. {
  120. $this->statisticData[$ip][$module][$interface]['code'][$code] = 0;
  121. }
  122. $this->statisticData[$ip][$module][$interface]['code'][$code]++;
  123. if($success)
  124. {
  125. $this->statisticData[$ip][$module][$interface]['suc_cost_time'] += $cost_time;
  126. $this->statisticData[$ip][$module][$interface]['suc_count'] ++;
  127. }
  128. else
  129. {
  130. $this->statisticData[$ip][$module][$interface]['fail_cost_time'] += $cost_time;
  131. $this->statisticData[$ip][$module][$interface]['fail_count'] ++;
  132. }
  133. }
  134. /**
  135. * 将统计数据写入磁盘
  136. * @return void
  137. */
  138. public function writeStatisticsToDisk()
  139. {
  140. $time = time();
  141. // 循环将每个ip的统计数据写入磁盘
  142. foreach($this->statisticData as $ip => $mod_if_data)
  143. {
  144. foreach($mod_if_data as $module=>$items)
  145. {
  146. // 文件夹不存在则创建一个
  147. $file_dir = WORKERMAN_LOG_DIR . $this->statisticDir.$module;
  148. if(!is_dir($file_dir))
  149. {
  150. umask(0);
  151. mkdir($file_dir, 0777, true);
  152. }
  153. // 依次写入磁盘
  154. foreach($items as $interface=>$data)
  155. {
  156. file_put_contents($file_dir. "/{$interface}.".date('Y-m-d'), "$ip\t$time\t{$data['suc_count']}\t{$data['suc_cost_time']}\t{$data['fail_count']}\t{$data['fail_cost_time']}\t".json_encode($data['code'])."\n", FILE_APPEND | LOCK_EX);
  157. }
  158. }
  159. }
  160. // 清空统计
  161. $this->statisticData = array();
  162. }
  163. /**
  164. * 将日志数据写入磁盘
  165. * @return void
  166. */
  167. public function writeLogToDisk()
  168. {
  169. // 没有统计数据则返回
  170. if(empty($this->logBuffer))
  171. {
  172. return;
  173. }
  174. // 写入磁盘
  175. file_put_contents(WORKERMAN_LOG_DIR . $this->logDir . date('Y-m-d'), $this->logBuffer, FILE_APPEND | LOCK_EX);
  176. $this->logBuffer = '';
  177. }
  178. /**
  179. * 初始化
  180. * 统计目录检查
  181. * 初始化任务
  182. * @see Man\Core.SocketWorker::onStart()
  183. */
  184. protected function onStart()
  185. {
  186. // 初始化目录
  187. umask(0);
  188. $statistic_dir = WORKERMAN_LOG_DIR . $this->statisticDir;
  189. if(!is_dir($statistic_dir))
  190. {
  191. mkdir($statistic_dir, 0777, true);
  192. }
  193. $log_dir = WORKERMAN_LOG_DIR . $this->logDir;
  194. if(!is_dir($log_dir))
  195. {
  196. mkdir($log_dir, 0777, true);
  197. }
  198. // 初始化任务
  199. \Man\Core\Lib\Task::init($this->event);
  200. // 定时保存统计数据
  201. \Man\Core\Lib\Task::add(self::WRITE_PERIOD_LENGTH, array($this, 'writeStatisticsToDisk'));
  202. \Man\Core\Lib\Task::add(self::WRITE_PERIOD_LENGTH, array($this, 'writeLogToDisk'));
  203. // 定时清理不用的统计数据
  204. \Man\Core\Lib\Task::add(self::CLEAR_PERIOD_LENGTH, array($this, 'clearDisk'), array(WORKERMAN_LOG_DIR . $this->statisticDir, self::EXPIRED_TIME));
  205. \Man\Core\Lib\Task::add(self::CLEAR_PERIOD_LENGTH, array($this, 'clearDisk'), array(WORKERMAN_LOG_DIR . $this->logDir, self::EXPIRED_TIME));
  206. }
  207. /**
  208. * 进程停止时需要将数据写入磁盘
  209. * @see Man\Core.SocketWorker::onStop()
  210. */
  211. protected function onStop()
  212. {
  213. $this->writeLogToDisk();
  214. $this->writeStatisticsToDisk();
  215. }
  216. /**
  217. * 清除磁盘数据
  218. * @param string $file
  219. * @param int $exp_time
  220. */
  221. public function clearDisk($file = null, $exp_time = 86400)
  222. {
  223. $time_now = time();
  224. if(is_file($file))
  225. {
  226. $mtime = filemtime($file);
  227. if(!$mtime)
  228. {
  229. $this->notice("filemtime $file fail");
  230. return;
  231. }
  232. if($time_now - $mtime > $exp_time)
  233. {
  234. unlink($file);
  235. }
  236. return;
  237. }
  238. foreach (glob($file."/*") as $file_name)
  239. {
  240. $this->clearDisk($file_name, $exp_time);
  241. }
  242. }
  243. }
  244. /**
  245. *
  246. * struct statisticPortocol
  247. * {
  248. * unsigned char module_name_len;
  249. * unsigned char interface_name_len;
  250. * float cost_time;
  251. * unsigned char success;
  252. * int code;
  253. * unsigned short msg_len;
  254. * unsigned int time;
  255. * char[module_name_len] module_name;
  256. * char[interface_name_len] interface_name;
  257. * char[msg_len] msg;
  258. * }
  259. *
  260. * @author workerman.net
  261. */
  262. class StatisticProtocol
  263. {
  264. /**
  265. * 包头长度
  266. * @var integer
  267. */
  268. const PACKAGE_FIXED_LENGTH = 17;
  269. /**
  270. * udp 包最大长度
  271. * @var integer
  272. */
  273. const MAX_UDP_PACKGE_SIZE = 65507;
  274. /**
  275. * char类型能保存的最大数值
  276. * @var integer
  277. */
  278. const MAX_CHAR_VALUE = 255;
  279. /**
  280. * usigned short 能保存的最大数值
  281. * @var integer
  282. */
  283. const MAX_UNSIGNED_SHORT_VALUE = 65535;
  284. /**
  285. * 编码
  286. * @param string $module
  287. * @param string $interface
  288. * @param float $cost_time
  289. * @param int $success
  290. * @param int $code
  291. * @param string $msg
  292. * @return string
  293. */
  294. public static function encode($module, $interface , $cost_time, $success, $code = 0,$msg = '')
  295. {
  296. // 防止模块名过长
  297. if(strlen($module) > self::MAX_CHAR_VALUE)
  298. {
  299. $module = substr($module, 0, self::MAX_CHAR_VALUE);
  300. }
  301. // 防止接口名过长
  302. if(strlen($interface) > self::MAX_CHAR_VALUE)
  303. {
  304. $interface = substr($interface, 0, self::MAX_CHAR_VALUE);
  305. }
  306. // 防止msg过长
  307. $module_name_length = strlen($module);
  308. $interface_name_length = strlen($interface);
  309. $avalible_size = self::MAX_UDP_PACKGE_SIZE - self::PACKAGE_FIXED_LENGTH - $module_name_length - $interface_name_length;
  310. if(strlen($msg) > $avalible_size)
  311. {
  312. $msg = substr($msg, 0, $avalible_size);
  313. }
  314. // 打包
  315. return pack('CCfCNnN', $module_name_length, $interface_name_length, $cost_time, $success ? 1 : 0, $code, strlen($msg), time()).$module.$interface.$msg;
  316. }
  317. /**
  318. * 解包
  319. * @param string $bin_data
  320. * @return array
  321. */
  322. public static function decode($bin_data)
  323. {
  324. // 解包
  325. $data = unpack("Cmodule_name_len/Cinterface_name_len/fcost_time/Csuccess/Ncode/nmsg_len/Ntime", $bin_data);
  326. $module = substr($bin_data, self::PACKAGE_FIXED_LENGTH, $data['module_name_len']);
  327. $interface = substr($bin_data, self::PACKAGE_FIXED_LENGTH + $data['module_name_len'], $data['interface_name_len']);
  328. $msg = substr($bin_data, self::PACKAGE_FIXED_LENGTH + $data['module_name_len'] + $data['interface_name_len']);
  329. return array(
  330. 'module' => $module,
  331. 'interface' => $interface,
  332. 'cost_time' => $data['cost_time'],
  333. 'success' => $data['success'],
  334. 'time' => $data['time'],
  335. 'code' => $data['code'],
  336. 'msg' => $msg,
  337. );
  338. }
  339. }