BusinessWorker.php 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. <?php
  2. /**
  3. *
  4. * 处理具体逻辑
  5. *
  6. * @author walkor <workerman.net>
  7. *
  8. */
  9. define('ROOT_DIR', realpath(__DIR__.'/../'));
  10. require_once ROOT_DIR . '/Lib/Gateway.php';
  11. require_once ROOT_DIR . '/Event.php';
  12. class BusinessWorker extends Man\Core\SocketWorker
  13. {
  14. /**
  15. * 与gateway的连接
  16. * ['ip:port' => conn, 'ip:port' => conn, ...]
  17. * @var array
  18. */
  19. protected $gatewayConnections = array();
  20. /**
  21. * 进程启动时初始化
  22. * @see Man\Core.SocketWorker::onStart()
  23. */
  24. protected function onStart()
  25. {
  26. // 定时检查与gateway进程的连接
  27. \Man\Core\Lib\Task::init($this->event);
  28. \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
  29. GateWay::setBusinessWorker($this);
  30. }
  31. /**
  32. * 获取与gateway的连接
  33. */
  34. public static function getGatewayConnections()
  35. {
  36. return $this->gatewayConnections;
  37. }
  38. /**
  39. * 检查gateway转发来的用户请求是否完整
  40. * @see Man\Core.SocketWorker::dealInput()
  41. */
  42. public function dealInput($recv_str)
  43. {
  44. return GatewayProtocol::input($recv_str);
  45. }
  46. /**
  47. * 处理请求
  48. * @see Man\Core.SocketWorker::dealProcess()
  49. */
  50. public function dealProcess($recv_str)
  51. {
  52. $pack = new GatewayProtocol($recv_str);
  53. Context::$client_ip = $pack->header['client_ip'];
  54. Context::$client_port = $pack->header['client_port'];
  55. Context::$local_ip = $pack->header['local_ip'];
  56. Context::$local_port = $pack->header['local_port'];
  57. Context::$socket_id = $pack->header['socket_id'];
  58. Context::$uid = $pack->header['uid'];
  59. switch($pack->header['cmd'])
  60. {
  61. case GatewayProtocol::CMD_ON_CONNECTION:
  62. $ret = call_user_func_array(array('Event', 'onConnect'), array($pack->body));
  63. break;
  64. case GatewayProtocol::CMD_ON_MESSAGE:
  65. $ret = call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
  66. break;
  67. case GatewayProtocol::CMD_ON_CLOSE:
  68. $ret = call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
  69. break;
  70. }
  71. Context::clear();
  72. return $ret;
  73. }
  74. /**
  75. * 定时检查gateway通信端口,如果有新的gateway则去建立长连接
  76. */
  77. public function checkGatewayConnections()
  78. {
  79. $key = 'GLOBAL_GATEWAY_ADDRESS';
  80. $addresses_list = Store::get($key);
  81. if(empty($addresses_list))
  82. {
  83. return;
  84. }
  85. // 循环遍历,查找未连接的gateway ip 端口
  86. foreach($addresses_list as $addr)
  87. {
  88. if(!isset($this->gatewayConnections[$addr]))
  89. {
  90. // 执行连接
  91. $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
  92. if(!$conn)
  93. {
  94. $this->notice($errstr);
  95. continue;
  96. }
  97. $this->gatewayConnections[$addr] = $conn;
  98. stream_set_blocking($this->gatewayConnections[$addr], 0);
  99. // 初始化一些值
  100. $fd = (int) $this->gatewayConnections[$addr];
  101. $this->connections[$fd] = $this->gatewayConnections[$addr];
  102. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  103. // 添加数据可读事件
  104. $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
  105. }
  106. }
  107. }
  108. /**
  109. * 发送数据给客户端
  110. * @see Man\Core.SocketWorker::sendToClient()
  111. */
  112. public function sendToClient($buffer, $con = null)
  113. {
  114. if($con)
  115. {
  116. $this->currentDealFd = (int) $con;
  117. }
  118. return parent::sendToClient($buffer);
  119. }
  120. /**
  121. * 关闭连接
  122. * @see Man\Core.SocketWorker::closeClient()
  123. */
  124. protected function closeClient($fd)
  125. {
  126. // 清理$this->gatewayConnections对应项
  127. foreach($this->gatewayConnections as $addr => $con)
  128. {
  129. $the_fd = (int) $con;
  130. if($the_fd == $fd)
  131. {
  132. unset($this->gatewayConnections[$addr]);
  133. }
  134. }
  135. parent::closeClient($fd);
  136. }
  137. }