RpcClient.php 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. <?php
  2. /**
  3. *
  4. * RpcClient Rpc客户端
  5. * @author walkor <worker-man@qq.com>
  6. */
  7. class RpcClient
  8. {
  9. const ASYNC_SEND_PREFIX = 'asend_';
  10. const ASYNC_RECV_PREFIX = 'arecv_';
  11. protected static $addressArray = array();
  12. protected static $asyncInstances = array();
  13. protected static $instances = array();
  14. protected $connection = null;
  15. protected $serviceName = '';
  16. public static function config($address_array = array())
  17. {
  18. if(!empty($address_array))
  19. {
  20. self::$addressArray = $address_array;
  21. }
  22. return self::$addressArray;
  23. }
  24. public static function instance($service_name)
  25. {
  26. if(!isset(self::$instances[$service_name]))
  27. {
  28. self::$instances[$service_name] = new self($service_name);
  29. }
  30. return self::$instances[$service_name];
  31. }
  32. protected function __construct($service_name)
  33. {
  34. $this->serviceName = $service_name;
  35. }
  36. public function __call($method, $arguments)
  37. {
  38. // 判断是否是异步发送
  39. if(0 === strpos($method, self::ASYNC_SEND_PREFIX))
  40. {
  41. $real_method = substr($method, strlen(self::ASYNC_SEND_PREFIX));
  42. $instance_key = $real_method . serialize($arguments);
  43. if(isset(self::$asyncInstances[$instance_key]))
  44. {
  45. throw new Exception($this->serviceName . "->$method(".implode(',', $arguments).") have already been called");
  46. }
  47. self::$asyncInstances[$instance_key] = new self($this->serviceName);
  48. return self::$asyncInstances[$instance_key]->sendData($real_method, $arguments);
  49. }
  50. if(0 === strpos($method, self::ASYNC_RECV_PREFIX))
  51. {
  52. $real_method = substr($method, strlen(self::ASYNC_RECV_PREFIX));
  53. $instance_key = $real_method . serialize($arguments);
  54. if(!isset(self::$asyncInstances[$instance_key]))
  55. {
  56. throw new Exception($this->serviceName . "->arecv_$real_method(".implode(',', $arguments).") have not been called");
  57. }
  58. return self::$asyncInstances[$instance_key]->recvData($real_method, $arguments);
  59. }
  60. // 同步发送接收
  61. $this->sendData($method, $arguments);
  62. return $this->recvData();
  63. }
  64. public function sendData($method, $arguments)
  65. {
  66. $this->openConnection();
  67. $bin_data = RpcProtocol::encode(array(
  68. 'class' => $this->serviceName,
  69. 'method' => $method,
  70. 'param_array' => $arguments,
  71. ));
  72. return fwrite($this->connection, $bin_data);
  73. }
  74. public function recvData()
  75. {
  76. $ret = fgets($this->connection);
  77. $this->closeConnection();
  78. if(!$ret)
  79. {
  80. throw new Exception("recvData empty");
  81. }
  82. return RpcProtocol::decode($ret);
  83. }
  84. protected function openConnection()
  85. {
  86. $address = self::$addressArray[array_rand(self::$addressArray)];
  87. $this->connection = stream_socket_client($address, $err_no, $err_msg);
  88. }
  89. protected function closeConnection()
  90. {
  91. fclose($this->connection);
  92. }
  93. }
  94. /**
  95. * RPC 协议解析 相关
  96. * 协议格式为 [json字符串\n]
  97. * @author walkor <worker-man@qq.com>
  98. * */
  99. class RpcProtocol
  100. {
  101. /**
  102. * 从socket缓冲区中预读长度
  103. * @var integer
  104. */
  105. const PRREAD_LENGTH = 87380;
  106. /**
  107. * 判断数据包是否接收完整
  108. * @param string $bin_data
  109. * @param mixed $data
  110. * @return integer 0代表接收完毕,大于0代表还要接收数据
  111. */
  112. public static function dealInput($bin_data)
  113. {
  114. $bin_data_length = strlen($bin_data);
  115. // 判断最后一个字符是否为\n,\n代表一个数据包的结束
  116. if($bin_data[$bin_data_length-1] !="\n")
  117. {
  118. // 再读
  119. return self::PRREAD_LENGTH;
  120. }
  121. return 0;
  122. }
  123. /**
  124. * 将数据打包成Rpc协议数据
  125. * @param mixed $data
  126. * @return string
  127. */
  128. public static function encode($data)
  129. {
  130. return json_encode($data)."\n";
  131. }
  132. /**
  133. * 解析Rpc协议数据
  134. * @param string $bin_data
  135. * @return mixed
  136. */
  137. public static function decode($bin_data)
  138. {
  139. return json_decode(trim($bin_data), true);
  140. }
  141. }
  142. if(1)
  143. {
  144. $address_array = array(
  145. 'tcp://127.0.0.1:2015',
  146. 'tcp://127.0.0.1:2015'
  147. );
  148. RpcClient::config($address_array);
  149. $uid = 567;
  150. $blog_id = 789;
  151. $user_client = RpcClient::instance('User');
  152. // ==同步调用==
  153. $ret_sync = $user_client->getInfoByUid($uid);
  154. // ==异步调用==
  155. // 发送数据
  156. $user_client->asend_getInfoByUid($uid);
  157. $user_client->asend_getEmail($uid);
  158. $ret_async1 = $user_client->arecv_getEmail($uid);
  159. $ret_async2 = $user_client->arecv_getInfoByUid($uid);
  160. var_dump($ret_sync, $ret_async1, $ret_async2);
  161. }