functions.php 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. <?php
  2. /**
  3. * 批量请求
  4. * @param array $request_buffer_array ['ip:port'=>req_buf, 'ip:port'=>req_buf, ...]
  5. * @return multitype:unknown string
  6. */
  7. function multiRequest($request_buffer_array)
  8. {
  9. $client_array = $sock_to_ip = $ip_list = array();
  10. foreach($request_buffer_array as $address => $buffer)
  11. {
  12. list($ip, $port) = explode(':', $address);
  13. $ip_list[$ip] = $ip;
  14. $client = stream_socket_client("tcp://$address", $errno, $errmsg, 1);
  15. if(!$client)
  16. {
  17. continue;
  18. }
  19. $client_array[$address] = $client;
  20. stream_set_timeout($client_array[$address], 0, 100000);
  21. fwrite($client_array[$address], $buffer);
  22. stream_set_blocking($client_array[$address], 0);
  23. $sock_to_address[(int)$client] = $address;
  24. }
  25. $read = $client_array;
  26. $write = $except = $read_buffer = array();
  27. $time_start = microtime(true);
  28. $timeout = 0.99;
  29. // 轮询处理数据
  30. while(count($read) > 0)
  31. {
  32. if(stream_select($read, $write, $except, 0, 200000))
  33. {
  34. foreach($read as $socket)
  35. {
  36. $address = $sock_to_address[(int)$socket];
  37. $buf = fread($socket, 8192);
  38. if(!$buf)
  39. {
  40. if(feof($socket))
  41. {
  42. unset($client_array[$address]);
  43. }
  44. continue;
  45. }
  46. if(!isset($read_buffer[$address]))
  47. {
  48. $read_buffer[$address] = $buf;
  49. }
  50. else
  51. {
  52. $read_buffer[$address] .= $buf;
  53. }
  54. // 数据接收完毕
  55. if(0 === JMProtocol::checkInput($read_buffer[$address]))
  56. {
  57. unset($client_array[$address]);
  58. }
  59. }
  60. }
  61. // 超时了
  62. if(microtime(true) - $time_start > $timeout)
  63. {
  64. break;
  65. }
  66. $read = $client_array;
  67. }
  68. foreach($read_buffer as $address => $buf)
  69. {
  70. list($ip, $port) = explode(':', $address);
  71. $this->lastSuccessIpArray[$ip] = $ip;
  72. }
  73. Cache::$lastFailedIpArray = array_diff($ip_list, Cache::$lastSuccessIpArray);
  74. ksort($read_buffer);
  75. return $read_buffer;
  76. }