functions.php 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. <?php
  /**
   * Send a batch of requests to several TCP endpoints and collect the replies.
   *
   * A TCP connection is opened to every "ip:port" key, the associated request
   * buffer is written to it, and the replies are then polled with
   * stream_select(). A connection is considered finished when its accumulated
   * reply ends with "\n" or when the peer closes the connection. Polling stops
   * once all connections are finished or after an overall deadline of 0.99 s.
   *
   * Side effects:
   *  - \Statistics\Lib\Cache::$lastSuccessIpArray is set to the IPs that
   *    returned at least some data (keyed and valued by IP).
   *  - \Statistics\Lib\Cache::$lastFailedIpArray is set to the requested IPs
   *    that returned nothing (connect failure, no reply before the deadline).
   *
   * @param array $request_buffer_array ['ip:port'=>req_buf, 'ip:port'=>req_buf, ...]
   * @return array ['ip:port'=>reply_buf, ...] sorted by address; addresses that
   *               produced no data are absent. A reply may be partial if the
   *               deadline was hit before the terminating "\n" arrived.
   */
  function multiRequest($request_buffer_array)
  {
      \Statistics\Lib\Cache::$lastSuccessIpArray = array();
      $client_array = $sock_to_address = $ip_list = array();
      foreach($request_buffer_array as $address => $buffer)
      {
          // Only the host part is needed, for the success/failure bookkeeping.
          $address_parts = explode(':', $address, 2);
          $ip = $address_parts[0];
          $ip_list[$ip] = $ip;
          $client = stream_socket_client("tcp://$address", $errno, $errmsg, 1);
          if(!$client)
          {
              // Unreachable endpoint: it stays in $ip_list and ends up reported as failed.
              continue;
          }
          $client_array[$address] = $client;
          // The request is written in blocking mode (0.1 s stream timeout);
          // the socket is switched to non-blocking only for the read phase.
          stream_set_timeout($client, 0, 100000);
          fwrite($client, $buffer);
          stream_set_blocking($client, 0);
          // stream_select() hands back bare resources, so remember which
          // address each resource id belongs to.
          $sock_to_address[(int)$client] = $address;
      }

      $read = $client_array;
      $write = $except = $read_buffer = array();
      $time_start = microtime(true);
      $timeout = 0.99;
      // Poll until every connection is finished or the deadline passes.
      while(count($read) > 0)
      {
          if(stream_select($read, $write, $except, 0, 200000))
          {
              foreach($read as $socket)
              {
                  $address = $sock_to_address[(int)$socket];
                  $buf = fread($socket, 8192);
                  // Compare strictly: a chunk consisting of the single
                  // character "0" is real data, not "nothing read".
                  if($buf === false || $buf === '')
                  {
                      if(feof($socket))
                      {
                          // Peer closed the connection; stop polling it.
                          unset($client_array[$address]);
                          fclose($socket);
                      }
                      continue;
                  }
                  if(!isset($read_buffer[$address]))
                  {
                      $read_buffer[$address] = $buf;
                  }
                  else
                  {
                      $read_buffer[$address] .= $buf;
                  }
                  // A trailing newline marks the end of the reply.
                  if(substr($read_buffer[$address], -1) === "\n")
                  {
                      unset($client_array[$address]);
                      fclose($socket);
                  }
              }
          }
          // Overall deadline reached.
          if(microtime(true) - $time_start > $timeout)
          {
              break;
          }
          // stream_select() overwrote $read; re-arm it with the still-pending sockets.
          $read = $client_array;
      }

      // Release connections that were still pending when the deadline hit.
      foreach($client_array as $pending_client)
      {
          fclose($pending_client);
      }

      // Any address that delivered data (even partial) counts as a success.
      foreach($read_buffer as $address => $buf)
      {
          $address_parts = explode(':', $address, 2);
          $ip = $address_parts[0];
          \Statistics\Lib\Cache::$lastSuccessIpArray[$ip] = $ip;
      }
      \Statistics\Lib\Cache::$lastFailedIpArray = array_diff($ip_list, \Statistics\Lib\Cache::$lastSuccessIpArray);
      ksort($read_buffer);
      return $read_buffer;
  }