BusinessWorker.php 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. <?php
  2. /**
  3. *
  4. * 处理具体逻辑
  5. *
  6. * @author walkor <workerman.net>
  7. *
  8. */
  9. define('ROOT_DIR', realpath(__DIR__.'/../'));
  10. require_once ROOT_DIR . '/Lib/Gateway.php';
  11. require_once ROOT_DIR . '/Event.php';
  12. class BusinessWorker extends Man\Core\SocketWorker
  13. {
  14. /**
  15. * 与gateway的连接
  16. * ['ip:port' => conn, 'ip:port' => conn, ...]
  17. * @var array
  18. */
  19. protected $gatewayConnections = array();
  20. /**
  21. * 进程启动时初始化
  22. * @see Man\Core.SocketWorker::onStart()
  23. */
  24. protected function onStart()
  25. {
  26. // 定时检查与gateway进程的连接
  27. \Man\Core\Lib\Task::init($this->event);
  28. \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
  29. $this->checkGatewayConnections();
  30. GateWay::setBusinessWorker($this);
  31. }
  32. /**
  33. * 获取与gateway的连接
  34. */
  35. public function getGatewayConnections()
  36. {
  37. return $this->gatewayConnections;
  38. }
  39. /**
  40. * 检查gateway转发来的用户请求是否完整
  41. * @see Man\Core.SocketWorker::dealInput()
  42. */
  43. public function dealInput($recv_str)
  44. {
  45. return GatewayProtocol::input($recv_str);
  46. }
  47. /**
  48. * 处理请求
  49. * @see Man\Core.SocketWorker::dealProcess()
  50. */
  51. public function dealProcess($recv_str)
  52. {
  53. $pack = new GatewayProtocol($recv_str);
  54. Context::$client_ip = $pack->header['client_ip'];
  55. Context::$client_port = $pack->header['client_port'];
  56. Context::$local_ip = $pack->header['local_ip'];
  57. Context::$local_port = $pack->header['local_port'];
  58. Context::$socket_id = $pack->header['socket_id'];
  59. Context::$uid = $pack->header['uid'];
  60. switch($pack->header['cmd'])
  61. {
  62. case GatewayProtocol::CMD_ON_CONNECTION:
  63. $ret = call_user_func_array(array('Event', 'onConnect'), array($pack->body));
  64. break;
  65. case GatewayProtocol::CMD_ON_MESSAGE:
  66. $ret = call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
  67. break;
  68. case GatewayProtocol::CMD_ON_CLOSE:
  69. $ret = call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
  70. break;
  71. }
  72. Context::clear();
  73. return $ret;
  74. }
  75. /**
  76. * 定时检查gateway通信端口,如果有新的gateway则去建立长连接
  77. */
  78. public function checkGatewayConnections()
  79. {
  80. $key = 'GLOBAL_GATEWAY_ADDRESS';
  81. $addresses_list = Store::get($key);
  82. if(empty($addresses_list))
  83. {
  84. return;
  85. }
  86. // 循环遍历,查找未连接的gateway ip 端口
  87. foreach($addresses_list as $addr)
  88. {
  89. if(!isset($this->gatewayConnections[$addr]))
  90. {
  91. // 执行连接
  92. $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
  93. if(!$conn)
  94. {
  95. $this->notice($errstr);
  96. continue;
  97. }
  98. $this->gatewayConnections[$addr] = $conn;
  99. stream_set_blocking($this->gatewayConnections[$addr], 0);
  100. // 初始化一些值
  101. $fd = (int) $this->gatewayConnections[$addr];
  102. $this->connections[$fd] = $this->gatewayConnections[$addr];
  103. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  104. // 添加数据可读事件
  105. $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
  106. }
  107. }
  108. }
  109. /**
  110. * 发送数据给客户端
  111. * @see Man\Core.SocketWorker::sendToClient()
  112. */
  113. public function sendToClient($buffer, $con = null)
  114. {
  115. if($con)
  116. {
  117. $this->currentDealFd = (int) $con;
  118. }
  119. return parent::sendToClient($buffer);
  120. }
  121. /**
  122. * 关闭连接
  123. * @see Man\Core.SocketWorker::closeClient()
  124. */
  125. protected function closeClient($fd)
  126. {
  127. // 清理$this->gatewayConnections对应项
  128. foreach($this->gatewayConnections as $addr => $con)
  129. {
  130. $the_fd = (int) $con;
  131. if($the_fd == $fd)
  132. {
  133. unset($this->gatewayConnections[$addr]);
  134. }
  135. }
  136. parent::closeClient($fd);
  137. }
  138. }