| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176 |
- <?php
- /**
- *
- * 处理具体逻辑
- *
- * @author walkor <workerman.net>
- *
- */
- define('ROOT_DIR', realpath(__DIR__.'/../'));
- require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
- require_once ROOT_DIR . '/Event.php';
- require_once ROOT_DIR . '/Lib/APLog.php';
/**
 * Worker process that receives packets forwarded by gateway processes,
 * decodes them with GatewayProtocol and dispatches them to the Event
 * callbacks (onConnect / onMessage / onClose).
 *
 * It also maintains outbound TCP connections to every gateway address
 * published in the Store under the key 'GLOBAL_GATEWAY_ADDRESS'.
 */
class BusinessWorker extends Man\Core\SocketWorker
{
    /**
     * The BusinessWorker instance of the current process (set in onStart()).
     * @var BusinessWorker
     */
    protected static $instance = null;

    /**
     * Connections to the gateway processes, keyed by address:
     * ['ip:port' => conn, 'ip:port' => conn, ...]
     * @var array
     */
    protected static $gatewayConnections = array();

    /**
     * Initialisation performed when the process starts.
     *
     * Initialises the Task helper with this worker's event loop, schedules
     * checkGatewayConnections() through Task::add() and records this object
     * as the process-wide instance.
     *
     * @see Man\Core.SocketWorker::onStart()
     */
    protected function onStart()
    {
        // Periodically check the connections to the gateway processes.
        // NOTE(review): the first argument (1) is presumably the interval in seconds - confirm against Task::add().
        \Man\Core\Lib\Task::init($this->event);
        \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
        self::$instance = $this;
    }

    /**
     * Get the BusinessWorker instance.
     *
     * @return BusinessWorker|null null until onStart() has run
     */
    public static function instance()
    {
        return self::$instance;
    }

    /**
     * Get the connections to the gateway processes.
     *
     * @return array ['ip:port' => conn, ...]
     */
    public static function getGatewayConnections()
    {
        return self::$gatewayConnections;
    }

    /**
     * Check whether the received request is complete.
     *
     * Delegates entirely to GatewayProtocol::input().
     *
     * @see Man\Core.SocketWorker::dealInput()
     * @param string $recv_str data received so far
     * @return mixed whatever GatewayProtocol::input() returns
     */
    public function dealInput($recv_str)
    {
        return GatewayProtocol::input($recv_str);
    }

    /**
     * Handle one complete request.
     *
     * Decodes the packet, copies the header fields into Context, dispatches
     * to the Event callback selected by the 'cmd' header field and finally
     * clears Context again.
     *
     * @see Man\Core.SocketWorker::dealProcess()
     * @param string $recv_str one complete packet
     * @return mixed the callback's return value, or null for an unknown cmd
     */
    public function dealProcess($recv_str)
    {
        $pack = new GatewayProtocol($recv_str);
        Context::$client_ip   = $pack->header['client_ip'];
        Context::$client_port = $pack->header['client_port'];
        Context::$local_ip    = $pack->header['local_ip'];
        Context::$local_port  = $pack->header['local_port'];
        Context::$socket_id   = $pack->header['socket_id'];
        Context::$uid         = $pack->header['uid'];

        // Default result so an unrecognised cmd does not leave $ret undefined.
        $ret = null;
        switch($pack->header['cmd'])
        {
            case GatewayProtocol::CMD_ON_CONNECTION:
                $ret = call_user_func_array(array('Event', 'onConnect'), array($pack->body));
                break;
            case GatewayProtocol::CMD_ON_MESSAGE:
                $ret = call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
                break;
            case GatewayProtocol::CMD_ON_CLOSE:
                $ret = call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
                break;
            default:
                // Unknown command: nothing to dispatch.
                break;
        }
        Context::clear();
        return $ret;
    }

    /**
     * Periodic check of the gateway communication ports.
     *
     * Reads the gateway address list from the Store and, for every address
     * that has no entry in self::$gatewayConnections yet, opens a TCP
     * connection (1 second connect timeout), switches it to non-blocking
     * mode and registers it with this worker's connection table, receive
     * buffers and event loop so incoming data is handled by dealInputBase().
     *
     * @return void
     */
    public function checkGatewayConnections()
    {
        $key = 'GLOBAL_GATEWAY_ADDRESS';
        $addresses_list = Store::get($key);
        if(empty($addresses_list))
        {
            return;
        }

        foreach($addresses_list as $addr)
        {
            if(isset(self::$gatewayConnections[$addr]))
            {
                // Already connected to this gateway.
                continue;
            }

            $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
            if(!$conn)
            {
                // Report the failure and retry on the next scheduled check.
                $this->notice($errstr);
                continue;
            }
            self::$gatewayConnections[$addr] = $conn;
            stream_set_blocking($conn, 0);

            // The integer value of the stream resource is used as the fd key.
            $fd = (int) $conn;
            $this->connections[$fd] = $conn;
            $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
            $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
        }
    }

    /**
     * Close a connection.
     *
     * Removes the matching gateway connection (if any) from
     * self::$gatewayConnections before delegating to the parent, so that
     * checkGatewayConnections() can reconnect to that address later.
     *
     * @see Man\Core.SocketWorker::closeClient()
     * @param int $fd fd of the connection being closed
     */
    protected function closeClient($fd)
    {
        // The table is keyed by 'ip:port', not by fd, so the entry has to be
        // located by comparing resource ids and removed by its address key.
        foreach(self::$gatewayConnections as $addr => $con)
        {
            if((int) $con == $fd)
            {
                unset(self::$gatewayConnections[$addr]);
            }
        }
        parent::closeClient($fd);
    }

    /**
     * Send data to a client.
     *
     * Sets currentDealFd to the given fd and then calls the parent
     * implementation with the buffer only.
     *
     * @see Man\Core.SocketWorker::sendToClient()
     * @param string $buffer data to send
     * @param int|resource $fd target connection; cast to int
     * @return mixed whatever the parent implementation returns
     */
    public function sendToClient($buffer, $fd)
    {
        $this->currentDealFd = (int)$fd;
        return parent::sendToClient($buffer);
    }
}
/**
 * Per-request context: the current user's uid, the internal communication
 * fields local_ip / local_port / socket_id, and the client's
 * client_ip / client_port.
 *
 * BusinessWorker::dealProcess() fills these from the packet header before
 * dispatching and calls clear() afterwards.
 *
 * @author walkor
 */
class Context
{
    /** Not assigned anywhere in this file except by clear(). */
    public static $series_id;

    /** Packet header field 'local_ip'. */
    public static $local_ip;

    /** Packet header field 'local_port'. */
    public static $local_port;

    /** Packet header field 'socket_id'. */
    public static $socket_id;

    /** Packet header field 'client_ip'. */
    public static $client_ip;

    /** Packet header field 'client_port'. */
    public static $client_port;

    /** Packet header field 'uid'. */
    public static $uid;

    /**
     * Reset every context field to null.
     */
    public static function clear()
    {
        self::$uid         = null;
        self::$client_port = null;
        self::$client_ip   = null;
        self::$socket_id   = null;
        self::$local_port  = null;
        self::$local_ip    = null;
        self::$series_id   = null;
    }
}
|