Prechádzať zdrojové kódy

RpcWorker add time_out control

walkor 12 rokov pred
rodič
commit
ff835962c5

+ 121 - 4
applications/Rpc/Clients/RpcClient.php

@@ -2,24 +2,90 @@
 /**
  * 
  *  RpcClient Rpc客户端
+ *  
+ *  
+ *  示例
+ *  // 服务端列表
+    $address_array = array(
+            'tcp://127.0.0.1:2015',
+            'tcp://127.0.0.1:2015'
+            );
+    // 配置服务端列表
+    RpcClient::config($address_array);
+    
+    $uid = 567;
+    $user_client = RpcClient::instance('User');
+    // ==同步调用==
+    $ret_sync = $user_client->getInfoByUid($uid);
+   
+    // ==异步调用==
+    // 异步发送数据
+    $user_client->asend_getInfoByUid($uid);
+    $user_client->asend_getEmail($uid);
+
+     这里是其它的业务代码
+     ..............................................
+     
+    // 异步接收数据
+    $ret_async1 = $user_client->arecv_getEmail($uid);
+    $ret_async2 = $user_client->arecv_getInfoByUid($uid);
+ *  
  * @author walkor <worker-man@qq.com>
  */
 class RpcClient
 {
+    /**
+     * 发送数据和接收数据的超时时间  单位S
+     * @var integer
+     */
+    const TIME_OUT = 1;
+    
+    /**
+     * 异步调用发送数据前缀
+     * @var string
+     */
     const ASYNC_SEND_PREFIX = 'asend_';
     
+    /**
+     * 异步调用接收数据
+     * @var string
+     */
     const ASYNC_RECV_PREFIX = 'arecv_';
     
+    /**
+     * 服务端地址
+     * @var array
+     */
     protected static $addressArray = array();
     
+    /**
+     * 异步调用实例
+     * @var string
+     */
     protected static $asyncInstances = array();
     
+    /**
+     * 同步调用实例
+     * @var string
+     */
     protected static $instances = array();
     
+    /**
+     * 到服务端的socket连接
+     * @var resource
+     */
     protected  $connection = null;
     
+    /**
+     * 实例的服务名
+     * @var string
+     */
     protected $serviceName = '';
     
+    /**
+     * 设置/获取服务端地址
+     * @param array $address_array
+     */
     public static function config($address_array = array())
     {
         if(!empty($address_array))
@@ -29,6 +95,11 @@ class RpcClient
         return self::$addressArray;
     }
     
+    /**
+     * 获取一个实例
+     * @param string $service_name
+     * @return instance of RpcClient
+     */
     public static function instance($service_name)
     {
         if(!isset(self::$instances[$service_name]))
@@ -38,11 +109,22 @@ class RpcClient
         return self::$instances[$service_name];
     }
     
+    /**
+     * 构造函数
+     * @param string $service_name
+     */
     protected function __construct($service_name)
     {
         $this->serviceName = $service_name;
     }
     
+    /**
+     * 调用
+     * @param string $method
+     * @param array $arguments
+     * @throws Exception
+     * @return 
+     */
     public function __call($method, $arguments)
     {
         // 判断是否是异步发送
@@ -57,6 +139,7 @@ class RpcClient
             self::$asyncInstances[$instance_key] = new self($this->serviceName);
             return self::$asyncInstances[$instance_key]->sendData($real_method, $arguments);
         }
+        // 如果是异步接受数据
         if(0 === strpos($method, self::ASYNC_RECV_PREFIX))
         {
             $real_method = substr($method, strlen(self::ASYNC_RECV_PREFIX));
@@ -72,6 +155,11 @@ class RpcClient
         return $this->recvData();
     }
     
+    /**
+     * 发送数据给服务端
+     * @param string $method
+     * @param array $arguments
+     */
     public function sendData($method, $arguments)
     {
         $this->openConnection();
@@ -80,9 +168,13 @@ class RpcClient
                 'method'         => $method,
                 'param_array'  => $arguments,
                 ));
-        return fwrite($this->connection, $bin_data);
+        return fwrite($this->connection, $bin_data) == strlen($bin_data);
     }
     
+    /**
+     * 从服务端接收数据
+     * @throws Exception
+     */
     public function recvData()
     {
         $ret = fgets($this->connection);
@@ -94,15 +186,29 @@ class RpcClient
         return RpcProtocol::decode($ret);
     }
     
+    /**
+     * 打开到服务端的连接
+     * @return void
+     */
     protected function openConnection()
     {
         $address = self::$addressArray[array_rand(self::$addressArray)];
         $this->connection = stream_socket_client($address, $err_no, $err_msg);
+        if(!$this->connection)
+        {
+            throw new Exception("can not connect to $address , $err_no:$err_msg");
+        }
+        stream_set_timeout($this->connection, self::TIME_OUT);
     }
     
+    /**
+     * 关闭到服务端的连接
+     * @return void
+     */
     protected function closeConnection()
     {
         fclose($this->connection);
+        $this->connection = null;
     }
 }
 
@@ -158,25 +264,36 @@ class RpcProtocol
     }
 }
 
-if(1)
+// ==以下调用示例==
+if(false)
 {
+    // 服务端列表
     $address_array = array(
             'tcp://127.0.0.1:2015',
             'tcp://127.0.0.1:2015'
             );
+    // 配置服务端列表
     RpcClient::config($address_array);
     
     $uid = 567;
-    $blog_id = 789;
     $user_client = RpcClient::instance('User');
     // ==同步调用==
     $ret_sync = $user_client->getInfoByUid($uid);
    
     // ==异步调用==
-    // 发送数据
+    // 异步发送数据
     $user_client->asend_getInfoByUid($uid);
     $user_client->asend_getEmail($uid);
+    
+    /**
+     * 这里是其它的业务代码
+     * ..............................................
+     **/
+    
+    // 异步接收数据
     $ret_async1 = $user_client->arecv_getEmail($uid);
     $ret_async2 = $user_client->arecv_getInfoByUid($uid);
+    
+    // 打印结果
     var_dump($ret_sync, $ret_async1, $ret_async2);
 }

+ 1 - 1
applications/Rpc/Services/User.php

@@ -11,7 +11,7 @@ class User
                'uid'    => $uid,
                'name'=> 'test',
                'age'   => 18,
-               'sex'    => '?',
+               'sex'    => 'hmm..',
                );
    }
    

+ 1 - 1
workers/RpcWorker.php

@@ -4,7 +4,7 @@ require_once WORKERMAN_ROOT_DIR . 'applications/Common/Protocols/RpcProtocol.php
 
 /**
  * 
- *  Rpcworker,Rpc服务的入口文件
+ *  RpcWorker,Rpc服务的入口文件
  *  根据客户端传递参数调用 applications/Rpc/Services/目录下的文件的类的方法
  *  
  * @author walkor <worker-man@qq.com>