Bladeren bron

add RpcWorker RpcClient

walkor 12 jaren geleden
bovenliggende
commit
be3d1b510c

+ 54 - 0
applications/Common/Protocols/RpcProtocol.php

@@ -0,0 +1,54 @@
+<?php 
+
+/**
+ * RPC 协议解析 相关
+ * 协议格式为 [json字符串\n]
+ * @author walkor <worker-man@qq.com>
+ * */
+class RpcProtocol
+{
+   /**
+    * 从socket缓冲区中预读长度
+    * @var integer
+    */
+    const PRREAD_LENGTH = 87380;
+    
+    /**
+     * 判断数据包是否接收完整
+     * @param string $bin_data
+     * @param mixed $data
+     * @return integer 0代表接收完毕,大于0代表还要接收数据
+     */
+    public static function dealInput($bin_data)
+    {
+        $bin_data_length = strlen($bin_data);
+        // 判断最后一个字符是否为\n,\n代表一个数据包的结束
+        if($bin_data[$bin_data_length-1] !="\n")
+        {
+            // 再读
+            return self::PRREAD_LENGTH;
+        }
+        return 0;
+    }
+    
+    /**
+     * 将数据打包成Rpc协议数据
+     * @param mixed $data
+     * @return string
+     */
+    public static function encode($data)
+    {
+        return json_encode($data)."\n";
+    }
+    
+   /**
+    * 解析Rpc协议数据
+    * @param string $bin_data
+    * @return mixed
+    */
+    public static function decode($bin_data)
+    {
+        return json_decode(trim($bin_data), true);
+    }
+    
+}

+ 156 - 0
applications/Rpc/Clients/RpcClient.php

@@ -0,0 +1,156 @@
+<?php
+/**
+ * 
+ *  RpcClient Rpc客户端
+ * @author walkor <worker-man@qq.com>
+ */
+class RpcClient
+{
+    const ASYNC_SEND_PREFIX = 'asend_';
+    
+    const ASYNC_RECV_PREFIX = 'arecv_';
+    
+    protected static $addressArray = array();
+    
+    protected static $asyncInstances = array();
+    
+    protected static $instances = array();
+    
+    protected  $connection = null;
+    
+    protected $serviceName = '';
+    
+    protected static function config($address_array = array())
+    {
+        if(!empty($address_array))
+        {
+            self::$addressArray = $address_array;
+        }
+        return self::$addressArray;
+    }
+    
+    public static function instance($service_name)
+    {
+        if(!isset(self::$instances[$service_name]))
+        {
+            self::$instances[$service_name] = new self($service_name);
+        }
+        return self::$instances[$service_name];
+    }
+    
+    protected function __construct($service_name)
+    {
+        $this->serviceName = $service_name;
+    }
+    
+    public function __call($method, $arguments)
+    {
+        // 判断是否是异步发送
+        if(0 === str_pos($method, self::ASYNC_SEND_PREFIX))
+        {
+            $real_method = substr($method, strlen(self::ASYNC_SEND_PREFIX));
+            $instance_key = $real_method . serialize($arguments);
+            if(isset(self::$asyncInstances[$instance_key]))
+            {
+                throw new Exception($this->serviceName . "->$method(".implode(',', $arguments).") have already been called");
+            }
+            self::$asyncInstances[$instance_key] = new self($this->serviceName);
+            return self::$asyncInstances[$instance_key]->sendData($real_method, $arguments);
+        }
+        if(0 === str_pos($method, self::ASYNC_RECV_PREFIX))
+        {
+            $real_method = substr($method, strlen(self::ASYNC_RECV_PREFIX));
+            $instance_key = $real_method . serialize($arguments);
+            if(!isset(self::$asyncInstances[$instance_key]))
+            {
+                throw new Exception($this->serviceName . "->arecv_$real_method(".implode(',', $arguments).") have not been called");
+            }
+            return self::$asyncInstances[$instance_key]->recvData($real_method, $arguments);
+        }
+    }
+    
+    public function sendData($method, $arguments)
+    {
+        $this->openConnection();
+        $bin_data = RpcProtocol::encode(array(
+                'class'              => $this->serviceName,
+                'method'         => $method,
+                'param_array'  => $arguments,
+                ));
+        return fwrite($this->connection, $bin_data);
+    }
+    
+    public function recvData($method, $arguments)
+    {
+        $ret = fgets($this->connection);
+        $this->closeConnection();
+        if(!$ret)
+        {
+            throw new Exception("recvData empty");
+        }
+    }
+    
+    protected function openConnection()
+    {
+        $address = self::$addressArray[array_rand(self::$addressArray)];
+        $this->connection = stream_socket_client($address, $err_no, $err_msg);
+    }
+    
+    protected function closeConnection()
+    {
+        fclose($this->connection);
+    }
+}
+
+/**
+ * RPC 协议解析 相关
+ * 协议格式为 [json字符串\n]
+ * @author walkor <worker-man@qq.com>
+ * */
+class RpcProtocol
+{
+    /**
+     * 从socket缓冲区中预读长度
+     * @var integer
+     */
+    const PRREAD_LENGTH = 87380;
+
+    /**
+     * 判断数据包是否接收完整
+     * @param string $bin_data
+     * @param mixed $data
+     * @return integer 0代表接收完毕,大于0代表还要接收数据
+     */
+    public static function dealInput($bin_data)
+    {
+        $bin_data_length = strlen($bin_data);
+        // 判断最后一个字符是否为\n,\n代表一个数据包的结束
+        if($bin_data[$bin_data_length-1] !="\n")
+        {
+            // 再读
+            return self::PRREAD_LENGTH;
+        }
+        return 0;
+    }
+
+    /**
+     * 将数据打包成Rpc协议数据
+     * @param mixed $data
+     * @return string
+     */
+    public static function encode($data)
+    {
+        return json_encode($data)."\n";
+    }
+
+    /**
+     * 解析Rpc协议数据
+     * @param string $bin_data
+     * @return mixed
+     */
+    public static function decode($bin_data)
+    {
+        return json_decode(trim($bin_data), true);
+    }
+
+}

+ 27 - 0
applications/Rpc/Services/Blog.php

@@ -0,0 +1,27 @@
+<?php
+/**
+ *  测试
+ * @author walkor <worker-man@qq.com>
+ */
+class Blog
+{
+   public static function getByBlogId($blog_id)
+   {
+       return array(
+               'blog_id'    => $blog_id,
+               'title'=> 'workerman is a high performance RPC server framework for network applications implemented in PHP using libevent',
+               'content'   => 'this is content ...',
+               );
+   }
+   
+   public static function getTitleListByUid($uid)
+   {
+       return array(
+               'blog title 1',
+               'blog title 2',
+               'blog title 3',
+               'blog title 4',
+               'blog title 5',
+               );
+   }
+}

+ 22 - 0
applications/Rpc/Services/User.php

@@ -0,0 +1,22 @@
+<?php
+/**
+ *  测试
+ * @author walkor <worker-man@qq.com>
+ */
+class User
+{
+   public static function getInfo($uid)
+   {
+       return array(
+               'uid'    => $uid,
+               'name'=> 'test',
+               'age'   => 18,
+               'sex'    => '?',
+               );
+   }
+   
+   public static function getEmail($uid)
+   {
+       return 'worker-man@qq.com';
+   }
+}

+ 5 - 0
man/Core/Master.php

@@ -5,6 +5,11 @@ require_once WORKERMAN_ROOT_DIR . 'man/Core/Lib/Config.php';
 require_once WORKERMAN_ROOT_DIR . 'man/Core/Lib/Task.php';
 require_once WORKERMAN_ROOT_DIR . 'man/Core/Lib/Log.php';
 
+if(!defined('WORKERMAN_ROOT_DIR'))
+{
+    define('WORKERMAN_ROOT_DIR', realpath(__DIR__."/../../")."/");
+}
+
 /**
  * 
  * 主进程

+ 1 - 1
workers/EchoWorker.php

@@ -5,7 +5,7 @@
  * @author walkor <worker-man@qq.com>
  */
 require_once WORKERMAN_ROOT_DIR . 'man/Core/SocketWorker.php';
-class EchoWorker extends Man\Core\SocketWorker
+class RpcWorker extends Man\Core\SocketWorker
 {
     /**
      * 确定数据是否接收完整,这里每次收到包都认为数据完整

+ 72 - 0
workers/RpcWorker.php

@@ -0,0 +1,72 @@
+<?php
+require_once WORKERMAN_ROOT_DIR . 'man/Core/SocketWorker.php';
+require_once WORKERMAN_ROOT_DIR . 'applications/Common/Protocols/RpcProtocol.php';
+
+/**
+ * 
+ *  Rpcworker,Rpc服务的入口文件
+ *  根据客户端传递参数调用 applications/Rpc/Services/目录下的文件的类的方法
+ *  
+ * @author walkor <worker-man@qq.com>
+ */
+class RpcWorker extends Man\Core\SocketWorker
+{
+    /**
+     * 确定数据是否接收完整
+     * @see Man\Core.SocketWorker::dealInput()
+     */
+    public function dealInput($recv_str)
+    {
+        return RpcProtocol::dealInput($recv_str); 
+    }
+
+    /**
+     * 数据接收完整后处理业务逻辑
+     * @see Man\Core.SocketWorker::dealProcess()
+     */
+    public function dealProcess($recv_str)
+    {
+        /**
+         * data的数据格式为
+         * ['class'=>xx, 'method'=>xx, 'param_array'=>array(xx)]
+         * @var array
+         */
+        $data = RpcProtocol::decode($recv_str);
+        // 判断数据是否正确
+        if(empty($data['class']) || empty($data['method']) || !isset($data['param_array']))
+        {
+            // 发送数据给客户端,请求包错误
+            return $this->sendToClient(RpcProtocol::encode(array('code'=>400, 'msg'=>'bad request', 'data'=>null)));
+        }
+        // 获得要调用的类、方法、及参数
+        $class = $data['class'];
+        $method = $data['method'];
+        $param_array = $data['param_array'];
+        
+        // 判断类对应文件是否载入
+        if(class_exists($class))
+        {
+            $include_file = WORKERMAN_ROOT_DIR . "applications/Rpc/Services/$class.php";
+            if(!is_file($include_file))
+            {
+                // 发送数据给客户端 类不存在
+                return $this->sendToClient(RpcProtocol::encode(array('code'=>404, 'msg'=>'class not found', 'data'=>null)));
+            }
+            require_once $include_file;
+        }
+        
+        // 调用类的方法
+        try 
+        {
+            $ret = call_user_function_array(array($class, $method), $param_array);
+            // 发送数据给客户端,调用成功,data下标对应的元素即为调用结果
+            return $this->sendToClient(RpcProtocol::encode(array('code'=>0, 'msg'=>'ok', 'data'=>$ret)));
+        }
+        // 有异常
+        catch(Exception $e)
+        {
+            // 发送数据给客户端,发生异常,调用失败
+            return $this->sendToClient(RpcProtocol::encode(array('code'=>500, 'msg'=>$e->getMessage(), 'data'=>$e)));
+        }
+    }
+}