BusinessWorker.php 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. <?php
  2. /**
  3. *
  4. * 处理具体逻辑
  5. *
  6. * @author walkor <workerman.net>
  7. *
  8. */
  // Project root: the parent of this file's directory. The guard avoids an
  // "already defined" diagnostic when ROOT_DIR was set earlier in the process;
  // in that case the existing value is kept, which is what define() would
  // have left in place anyway.
  if (!defined('ROOT_DIR')) {
      define('ROOT_DIR', realpath(__DIR__ . '/../'));
  }

  // Project files this worker depends on.
  require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
  require_once ROOT_DIR . '/Event.php';
  require_once ROOT_DIR . '/Lib/APLog.php';
  /**
   * Worker that decodes packets arriving in the gateway protocol, dispatches
   * them to the Event callbacks and keeps a connection open to every gateway
   * address published in the Store.
   */
  class BusinessWorker extends Man\Core\SocketWorker
  {
      /**
       * Connections to the gateway processes, keyed by address.
       * ['ip:port' => conn, 'ip:port' => conn, ...]
       * @var array
       */
      protected static $gatewayConnections = array();

      /**
       * Initialisation performed when the process starts.
       * @see Man\Core.SocketWorker::onStart()
       */
      protected function onStart()
      {
          // Schedule the recurring check of the gateway connections
          // (interval argument 1 -- presumably seconds; confirm against Task::add).
          \Man\Core\Lib\Task::init($this->event);
          \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
      }

      /**
       * Checks whether a complete request has been received.
       * @see Man\Core.SocketWorker::dealInput()
       * @param string $recv_str data received so far
       * @return mixed whatever GatewayProtocol::input() reports for the buffer
       */
      public function dealInput($recv_str)
      {
          return GatewayProtocol::input($recv_str);
      }

      /**
       * Handles one request: decodes the packet, publishes its header fields
       * through Context, invokes the matching Event callback and resets Context.
       * @see Man\Core.SocketWorker::dealProcess()
       * @param string $recv_str raw packet
       * @return mixed the callback's return value, or null when the packet's
       *               command is not one of the three handled commands
       */
      public function dealProcess($recv_str)
      {
          $pack   = new GatewayProtocol($recv_str);
          $header = $pack->header;

          Context::$client_ip   = $header['client_ip'];
          Context::$client_port = $header['client_port'];
          Context::$local_ip    = $header['local_ip'];
          Context::$local_port  = $header['local_port'];
          Context::$socket_id   = $header['socket_id'];
          Context::$uid         = $header['uid'];

          // Defined up front so that an unrecognised command returns null
          // instead of reading an undefined variable at the return statement.
          $ret = null;

          switch ($header['cmd']) {
              case GatewayProtocol::CMD_ON_CONNECTION:
                  $ret = call_user_func_array(array('Event', 'onConnect'), array($pack->body));
                  break;
              case GatewayProtocol::CMD_ON_MESSAGE:
                  $ret = call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
                  break;
              case GatewayProtocol::CMD_ON_CLOSE:
                  $ret = call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
                  break;
              default:
                  // Unknown command: nothing to dispatch, $ret stays null.
                  break;
          }

          Context::clear();
          return $ret;
      }

      /**
       * Timer callback: makes sure a connection exists to every gateway address
       * listed in the Store under the key GLOBAL_GATEWAY_ADDRESS.
       *
       * Addresses that already have a connection are skipped. A failed connect
       * is reported through notice() and retried on the next invocation, since
       * no entry is recorded for that address.
       */
      public function checkGatewayConnections()
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          $addresses_list = Store::get($key);
          if (empty($addresses_list)) {
              return;
          }

          foreach ($addresses_list as $addr) {
              if (isset(self::$gatewayConnections[$addr])) {
                  continue;
              }

              // Connect with a 1-second timeout.
              $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
              if (!$conn) {
                  $this->notice($errstr);
                  continue;
              }

              self::$gatewayConnections[$addr] = $conn;
              stream_set_blocking($conn, 0);

              // The integer value of the stream resource is used as the
              // connection id in the worker's bookkeeping arrays.
              $fd = (int) $conn;
              $this->connections[$fd] = $conn;
              $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
              $this->event->add($conn, \Man\Core\Events\BaseEvent::EV_READ, array($this, 'dealInputBase'), $fd);
          }
      }

      /**
       * Closes a connection. If the descriptor belongs to a gateway connection,
       * its entry is dropped from $gatewayConnections first; the actual close is
       * delegated to the parent implementation.
       * @see Man\Core.SocketWorker::closeClient()
       * @param int $fd connection id (integer value of the stream resource)
       */
      protected function closeClient($fd)
      {
          foreach (self::$gatewayConnections as $addr => $con) {
              $the_fd = (int) $con;
              if ($the_fd == $fd) {
                  unset(self::$gatewayConnections[$addr]);
              }
          }
          parent::closeClient($fd);
      }
  }
  114. /**
  115. * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
  116. * @author walkor
  117. *
  118. */
  /**
   * Per-request context: the current user's uid, the internal-communication
   * fields local_ip, local_port and socket_id, and the client's client_ip and
   * client_port.
   */
  class Context
  {
      public static $series_id;
      public static $local_ip;
      public static $local_port;
      public static $socket_id;
      public static $client_ip;
      public static $client_port;
      public static $uid;

      /**
       * Resets every context field to null.
       */
      public static function clear()
      {
          self::$uid         = null;
          self::$client_port = null;
          self::$client_ip   = null;
          self::$socket_id   = null;
          self::$local_port  = null;
          self::$local_ip    = null;
          self::$series_id   = null;
      }
  }