RpcClient.php 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. <?php
  2. /**
  3. *
  4. * RpcClient Rpc客户端
  5. *
  6. *
  7. * 示例
  8. * // 服务端列表
  9. $address_array = array(
  10. 'tcp://127.0.0.1:2015',
  11. 'tcp://127.0.0.1:2015'
  12. );
  13. // 配置服务端列表
  14. RpcClient::config($address_array);
  15. $uid = 567;
  16. $user_client = RpcClient::instance('User');
  17. // ==同步调用==
  18. $ret_sync = $user_client->getInfoByUid($uid);
  19. // ==异步调用==
  20. // 异步发送数据
  21. $user_client->asend_getInfoByUid($uid);
  22. $user_client->asend_getEmail($uid);
  23. 这里是其它的业务代码
  24. ..............................................
  25. // 异步接收数据
  26. $ret_async1 = $user_client->arecv_getEmail($uid);
  27. $ret_async2 = $user_client->arecv_getInfoByUid($uid);
  28. *
  29. * @author walkor <worker-man@qq.com>
  30. */
  31. class RpcClient
  32. {
  33. /**
  34. * 发送数据和接收数据的超时时间 单位S
  35. * @var integer
  36. */
  37. const TIME_OUT = 1;
  38. /**
  39. * 异步调用发送数据前缀
  40. * @var string
  41. */
  42. const ASYNC_SEND_PREFIX = 'asend_';
  43. /**
  44. * 异步调用接收数据
  45. * @var string
  46. */
  47. const ASYNC_RECV_PREFIX = 'arecv_';
  48. /**
  49. * 服务端地址
  50. * @var array
  51. */
  52. protected static $addressArray = array();
  53. /**
  54. * 异步调用实例
  55. * @var string
  56. */
  57. protected static $asyncInstances = array();
  58. /**
  59. * 同步调用实例
  60. * @var string
  61. */
  62. protected static $instances = array();
  63. /**
  64. * 到服务端的socket连接
  65. * @var resource
  66. */
  67. protected $connection = null;
  68. /**
  69. * 实例的服务名
  70. * @var string
  71. */
  72. protected $serviceName = '';
  73. /**
  74. * 设置/获取服务端地址
  75. * @param array $address_array
  76. */
  77. public static function config($address_array = array())
  78. {
  79. if(!empty($address_array))
  80. {
  81. self::$addressArray = $address_array;
  82. }
  83. return self::$addressArray;
  84. }
  85. /**
  86. * 获取一个实例
  87. * @param string $service_name
  88. * @return instance of RpcClient
  89. */
  90. public static function instance($service_name)
  91. {
  92. if(!isset(self::$instances[$service_name]))
  93. {
  94. self::$instances[$service_name] = new self($service_name);
  95. }
  96. return self::$instances[$service_name];
  97. }
  98. /**
  99. * 构造函数
  100. * @param string $service_name
  101. */
  102. protected function __construct($service_name)
  103. {
  104. $this->serviceName = $service_name;
  105. }
  106. /**
  107. * 调用
  108. * @param string $method
  109. * @param array $arguments
  110. * @throws Exception
  111. * @return
  112. */
  113. public function __call($method, $arguments)
  114. {
  115. // 判断是否是异步发送
  116. if(0 === strpos($method, self::ASYNC_SEND_PREFIX))
  117. {
  118. $real_method = substr($method, strlen(self::ASYNC_SEND_PREFIX));
  119. $instance_key = $real_method . serialize($arguments);
  120. if(isset(self::$asyncInstances[$instance_key]))
  121. {
  122. throw new Exception($this->serviceName . "->$method(".implode(',', $arguments).") have already been called");
  123. }
  124. self::$asyncInstances[$instance_key] = new self($this->serviceName);
  125. return self::$asyncInstances[$instance_key]->sendData($real_method, $arguments);
  126. }
  127. // 如果是异步接受数据
  128. if(0 === strpos($method, self::ASYNC_RECV_PREFIX))
  129. {
  130. $real_method = substr($method, strlen(self::ASYNC_RECV_PREFIX));
  131. $instance_key = $real_method . serialize($arguments);
  132. if(!isset(self::$asyncInstances[$instance_key]))
  133. {
  134. throw new Exception($this->serviceName . "->arecv_$real_method(".implode(',', $arguments).") have not been called");
  135. }
  136. return self::$asyncInstances[$instance_key]->recvData($real_method, $arguments);
  137. }
  138. // 同步发送接收
  139. $this->sendData($method, $arguments);
  140. return $this->recvData();
  141. }
  142. /**
  143. * 发送数据给服务端
  144. * @param string $method
  145. * @param array $arguments
  146. */
  147. public function sendData($method, $arguments)
  148. {
  149. $this->openConnection();
  150. $bin_data = RpcProtocol::encode(array(
  151. 'class' => $this->serviceName,
  152. 'method' => $method,
  153. 'param_array' => $arguments,
  154. ));
  155. return fwrite($this->connection, $bin_data) == strlen($bin_data);
  156. }
  157. /**
  158. * 从服务端接收数据
  159. * @throws Exception
  160. */
  161. public function recvData()
  162. {
  163. $ret = fgets($this->connection);
  164. $this->closeConnection();
  165. if(!$ret)
  166. {
  167. throw new Exception("recvData empty");
  168. }
  169. return RpcProtocol::decode($ret);
  170. }
  171. /**
  172. * 打开到服务端的连接
  173. * @return void
  174. */
  175. protected function openConnection()
  176. {
  177. $address = self::$addressArray[array_rand(self::$addressArray)];
  178. $this->connection = stream_socket_client($address, $err_no, $err_msg);
  179. if(!$this->connection)
  180. {
  181. throw new Exception("can not connect to $address , $err_no:$err_msg");
  182. }
  183. stream_set_timeout($this->connection, self::TIME_OUT);
  184. }
  185. /**
  186. * 关闭到服务端的连接
  187. * @return void
  188. */
  189. protected function closeConnection()
  190. {
  191. fclose($this->connection);
  192. $this->connection = null;
  193. }
  194. }
  195. /**
  196. * RPC 协议解析 相关
  197. * 协议格式为 [json字符串\n]
  198. * @author walkor <worker-man@qq.com>
  199. * */
  200. class RpcProtocol
  201. {
  202. /**
  203. * 从socket缓冲区中预读长度
  204. * @var integer
  205. */
  206. const PRREAD_LENGTH = 87380;
  207. /**
  208. * 判断数据包是否接收完整
  209. * @param string $bin_data
  210. * @param mixed $data
  211. * @return integer 0代表接收完毕,大于0代表还要接收数据
  212. */
  213. public static function dealInput($bin_data)
  214. {
  215. $bin_data_length = strlen($bin_data);
  216. // 判断最后一个字符是否为\n,\n代表一个数据包的结束
  217. if($bin_data[$bin_data_length-1] !="\n")
  218. {
  219. // 再读
  220. return self::PRREAD_LENGTH;
  221. }
  222. return 0;
  223. }
  224. /**
  225. * 将数据打包成Rpc协议数据
  226. * @param mixed $data
  227. * @return string
  228. */
  229. public static function encode($data)
  230. {
  231. return json_encode($data)."\n";
  232. }
  233. /**
  234. * 解析Rpc协议数据
  235. * @param string $bin_data
  236. * @return mixed
  237. */
  238. public static function decode($bin_data)
  239. {
  240. return json_decode(trim($bin_data), true);
  241. }
  242. }
  243. // ==以下调用示例==
  244. if(false)
  245. {
  246. // 服务端列表
  247. $address_array = array(
  248. 'tcp://127.0.0.1:2015',
  249. 'tcp://127.0.0.1:2015'
  250. );
  251. // 配置服务端列表
  252. RpcClient::config($address_array);
  253. $uid = 567;
  254. $user_client = RpcClient::instance('User');
  255. // ==同步调用==
  256. $ret_sync = $user_client->getInfoByUid($uid);
  257. // ==异步调用==
  258. // 异步发送数据
  259. $user_client->asend_getInfoByUid($uid);
  260. $user_client->asend_getEmail($uid);
  261. /**
  262. * 这里是其它的业务代码
  263. * ..............................................
  264. **/
  265. // 异步接收数据
  266. $ret_async1 = $user_client->arecv_getEmail($uid);
  267. $ret_async2 = $user_client->arecv_getInfoByUid($uid);
  268. // 打印结果
  269. var_dump($ret_sync, $ret_async1, $ret_async2);
  270. }