RpcClient.php 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. <?php
  /**
   * RpcClient: a small RPC client that talks to a server over a stream socket.
   *
   * Remote methods are invoked through __call() on a per-service instance:
   *  - $client->method(...)        synchronous: send the request and return the decoded reply
   *  - $client->asend_method(...)  send the request on a dedicated connection and return at once
   *  - $client->arecv_method(...)  read the reply of the matching asend_ call (same arguments)
   *
   * @author walkor <worker-man@qq.com>
   */
  class RpcClient
  {
      /** Method-name prefix that marks an asynchronous send. */
      const ASYNC_SEND_PREFIX = 'asend_';

      /** Method-name prefix that marks an asynchronous receive. */
      const ASYNC_RECV_PREFIX = 'arecv_';

      /** @var array Server addresses; one is picked at random for each connection. */
      protected static $addressArray = array();

      /** @var array Clients with an outstanding asend_ call, keyed by service, method and arguments. */
      protected static $asyncInstances = array();

      /** @var array Shared client instances, keyed by service name. */
      protected static $instances = array();

      /** @var resource|null Stream to the server, or null while no connection is open. */
      protected $connection = null;

      /** @var string Name of the remote service, sent as the "class" field of every request. */
      protected $serviceName = '';

      /**
       * Set and/or read the server address list.
       *
       * @param array $address_array New address list; an empty value leaves the current list untouched.
       * @return array The address list in effect after the call.
       */
      public static function config($address_array = array())
      {
          if(!empty($address_array))
          {
              self::$addressArray = $address_array;
          }
          return self::$addressArray;
      }

      /**
       * Return the shared client for a service, creating it on first use.
       *
       * @param string $service_name
       * @return RpcClient
       */
      public static function instance($service_name)
      {
          if(!isset(self::$instances[$service_name]))
          {
              self::$instances[$service_name] = new self($service_name);
          }
          return self::$instances[$service_name];
      }

      /**
       * Instances are obtained through instance(); the constructor is not public.
       *
       * @param string $service_name
       */
      protected function __construct($service_name)
      {
          $this->serviceName = $service_name;
      }

      /**
       * Dispatch a remote call.
       *
       * @param string $method    Remote method name, optionally prefixed with asend_ / arecv_.
       * @param array  $arguments Arguments of the remote method.
       * @return mixed For asend_: the result of sendData(). Otherwise the decoded reply.
       * @throws Exception When asend_ is repeated before its arecv_, when arecv_ has no
       *                   matching asend_, or when connecting / receiving fails.
       */
      public function __call($method, $arguments)
      {
          // Asynchronous send: use a dedicated instance so the connection stays open until arecv_.
          if(0 === strpos($method, self::ASYNC_SEND_PREFIX))
          {
              $real_method = substr($method, strlen(self::ASYNC_SEND_PREFIX));
              $instance_key = $this->asyncKey($real_method, $arguments);
              if(isset(self::$asyncInstances[$instance_key]))
              {
                  throw new Exception($this->serviceName . "->$method(" . self::describeArguments($arguments) . ") have already been called");
              }
              $pending = new self($this->serviceName);
              $sent = $pending->sendData($real_method, $arguments);
              // Register only after a successful send, so a failed attempt can be retried.
              if(false !== $sent)
              {
                  self::$asyncInstances[$instance_key] = $pending;
              }
              return $sent;
          }

          // Asynchronous receive: collect the reply of the matching asend_ call.
          if(0 === strpos($method, self::ASYNC_RECV_PREFIX))
          {
              $real_method = substr($method, strlen(self::ASYNC_RECV_PREFIX));
              $instance_key = $this->asyncKey($real_method, $arguments);
              if(!isset(self::$asyncInstances[$instance_key]))
              {
                  throw new Exception($this->serviceName . "->arecv_$real_method(" . self::describeArguments($arguments) . ") have not been called");
              }
              $pending = self::$asyncInstances[$instance_key];
              // Drop the entry before reading: the connection is closed by recvData() either way,
              // and the same call must be allowed again afterwards.
              unset(self::$asyncInstances[$instance_key]);
              return $pending->recvData($real_method, $arguments);
          }

          // Plain method name: synchronous request/response on this instance.
          $this->sendData($method, $arguments);
          return $this->recvData($method, $arguments);
      }

      /**
       * Open a connection and write one encoded request to it.
       *
       * @param string $method    Remote method name.
       * @param array  $arguments Arguments of the remote method.
       * @return int|false Result of fwrite(): bytes written, or false on failure
       *                   (the connection is closed again in that case).
       * @throws Exception When no connection can be opened.
       */
      public function sendData($method, $arguments)
      {
          // Encode first so that nothing is left open if encoding fails.
          $bin_data = RpcProtocol::encode(array(
              'class'       => $this->serviceName,
              'method'      => $method,
              'param_array' => $arguments,
          ));
          $this->openConnection();
          $written = fwrite($this->connection, $bin_data);
          if(false === $written)
          {
              $this->closeConnection();
          }
          return $written;
      }

      /**
       * Read one reply line from the open connection, close it and decode the reply.
       *
       * @param string $method    Unused; kept for interface compatibility.
       * @param array  $arguments Unused; kept for interface compatibility.
       * @return mixed The reply as decoded by RpcProtocol::decode().
       * @throws Exception When no connection is open or nothing could be read.
       */
      public function recvData($method, $arguments)
      {
          if(!is_resource($this->connection))
          {
              throw new Exception("recvData called without an open connection");
          }
          $ret = fgets($this->connection);
          $this->closeConnection();
          if(!$ret)
          {
              throw new Exception("recvData empty");
          }
          return RpcProtocol::decode($ret);
      }

      /**
       * Connect to one randomly chosen configured address.
       *
       * @throws Exception When no address is configured or the connection attempt fails.
       */
      protected function openConnection()
      {
          if(empty(self::$addressArray))
          {
              throw new Exception('RpcClient address list is empty, call RpcClient::config() first');
          }
          // Release a connection left over from an earlier send that was never received.
          $this->closeConnection();
          $address = self::$addressArray[array_rand(self::$addressArray)];
          $connection = stream_socket_client($address, $err_no, $err_msg);
          if(false === $connection)
          {
              throw new Exception("can not connect to $address , $err_no:$err_msg");
          }
          $this->connection = $connection;
      }

      /**
       * Close the current connection, if any, and forget the handle.
       */
      protected function closeConnection()
      {
          if(is_resource($this->connection))
          {
              fclose($this->connection);
          }
          $this->connection = null;
      }

      /**
       * Build the $asyncInstances key of an async call: service, method and serialized arguments.
       *
       * @param string $real_method Method name without the asend_ / arecv_ prefix.
       * @param array  $arguments
       * @return string
       */
      private function asyncKey($real_method, $arguments)
      {
          return $this->serviceName . '::' . $real_method . serialize($arguments);
      }

      /**
       * Render call arguments for an error message without tripping over arrays or objects.
       *
       * @param array $arguments
       * @return string Comma-separated list; non-scalar values are shown as JSON.
       */
      private static function describeArguments($arguments)
      {
          $parts = array();
          foreach($arguments as $argument)
          {
              $parts[] = (is_scalar($argument) || null === $argument)
                  ? (string) $argument
                  : json_encode($argument);
          }
          return implode(',', $parts);
      }
  }
  /**
   * RPC protocol helpers.
   * Wire format: one JSON string followed by "\n" per packet.
   * @author walkor <worker-man@qq.com>
   */
  class RpcProtocol
  {
      /**
       * Value returned by dealInput() while a packet is still incomplete
       * (length to pre-read from the socket buffer).
       * @var integer
       */
      const PRREAD_LENGTH = 87380;

      /**
       * Check whether a received buffer holds a complete packet.
       *
       * A packet is complete when its last byte is "\n". An empty buffer is
       * treated as incomplete.
       *
       * @param string $bin_data Bytes received so far.
       * @return integer 0 when the packet is complete, PRREAD_LENGTH when more data is needed.
       */
      public static function dealInput($bin_data)
      {
          // substr() is safe on an empty buffer, unlike indexing offset -1 directly.
          if("\n" !== substr((string) $bin_data, -1))
          {
              return self::PRREAD_LENGTH;
          }
          return 0;
      }

      /**
       * Pack data into one protocol packet.
       *
       * @param mixed $data
       * @return string JSON encoding of $data followed by "\n".
       * @throws Exception When $data cannot be JSON-encoded; previously a bare "\n" was returned.
       */
      public static function encode($data)
      {
          $json = json_encode($data);
          if(false === $json)
          {
              throw new Exception('RpcProtocol::encode failed, json error code ' . json_last_error());
          }
          return $json . "\n";
      }

      /**
       * Unpack one protocol packet.
       *
       * @param string $bin_data Packet bytes; surrounding whitespace is trimmed before decoding.
       * @return mixed Decoded value, with JSON objects returned as associative arrays.
       */
      public static function decode($bin_data)
      {
          return json_decode(trim($bin_data), true);
      }
  }
  // Usage example. The constant false condition keeps it from ever running.
  if(false)
  {
      // Register the server addresses, then call a method on the "User" service.
      RpcClient::config(array(
          'tcp://127.0.0.1:2015',
          'tcp://127.0.0.1:2015'
      ));
      $user_client = RpcClient::instance('User');
      var_export($user_client->getInfoByUid(123));
  }