TcpConnection.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. <?php
  2. namespace Workerman\Connection;
  3. use Workerman\Events\Libevent;
  4. use Workerman\Events\Select;
  5. use Workerman\Events\EventInterface;
  6. use Workerman\Worker;
  7. use \Exception;
  /**
   * A single TCP connection driven by the worker's global event loop.
   *
   * Incoming bytes are collected in a receive buffer and handed to onMessage,
   * either as complete packages framed by the configured protocol class or,
   * when no protocol is set, as the raw bytes gathered by each read. Outgoing
   * data is written to the socket immediately when possible and queued in a
   * send buffer (flushed by baseWrite) otherwise.
   *
   * @author walkor<walkor@workerman.net>
   */
  class TcpConnection extends ConnectionInterface
  {
      /**
       * Maximum number of bytes requested from the socket per fread() call.
       * @var int
       */
      const READ_BUFFER_SIZE = 8192;

      /**
       * Connection status: connecting.
       * @var int
       */
      const STATUS_CONNECTING = 1;

      /**
       * Connection status: established.
       * @var int
       */
      const STATUS_ESTABLISH = 2;

      /**
       * Connection status: close requested, waiting for the send buffer to drain.
       * @var int
       */
      const STATUS_CLOSING = 4;

      /**
       * Connection status: closed, the socket has been released.
       * @var int
       */
      const STATUS_CLOSED = 8;

      /**
       * Invoked as callback($connection, $data) for every received message.
       * @var callback
       */
      public $onMessage = null;

      /**
       * Invoked as callback($connection) while the connection is being destroyed.
       * @var callback
       */
      public $onClose = null;

      /**
       * Invoked as callback($connection, $code, $message) when sending fails.
       * @var callback
       */
      public $onError = null;

      /**
       * Protocol class name. Its static input(), encode() and decode() methods
       * are used to frame, encode and decode packages. Empty means raw mode.
       * @var string
       */
      public $protocol = '';

      /**
       * Maximum size of the pending send buffer, in bytes.
       * @var int
       */
      public static $maxSendBufferSize = 1048576;

      /**
       * Maximum accepted package size, in bytes.
       * @var int
       */
      public static $maxPackageSize = 10485760;

      /**
       * The underlying socket stream.
       * @var resource
       */
      protected $_socket = null;

      /**
       * Data waiting to be written to the socket.
       * @var string
       */
      protected $_sendBuffer = '';

      /**
       * Data read from the socket that has not been dispatched yet.
       * @var string
       */
      protected $_recvBuffer = '';

      /**
       * Length of the package currently being assembled (0 = not known yet).
       * @var int
       */
      protected $_currentPackageLength = 0;

      /**
       * Current connection status, one of the STATUS_* constants.
       * @var int
       */
      protected $_status = self::STATUS_ESTABLISH;

      /**
       * Cached remote IP.
       * @var string
       */
      protected $_remoteIp = '';

      /**
       * Cached remote port.
       * @var int
       */
      protected $_remotePort = 0;

      /**
       * Cached remote address as reported by stream_socket_get_name().
       * @var string
       */
      protected $_remoteAddress = '';

      /**
       * True while destroy() is running; makes teardown re-entrant safe.
       * @var bool
       */
      protected $_isDestroying = false;

      /**
       * Wrap a socket: switch it to non-blocking mode and subscribe baseRead()
       * to its read events on the global event loop.
       *
       * @param resource $socket
       */
      public function __construct($socket)
      {
          $this->_socket = $socket;
          stream_set_blocking($this->_socket, 0);
          Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
      }

      /**
       * Send data to the peer.
       *
       * Unless $raw is true, the data is first passed through the protocol's
       * encode(). If nothing is queued the data is written straight away; any
       * remainder is queued and flushed by baseWrite() once the socket becomes
       * writable.
       *
       * @param string $send_buffer
       * @param bool   $raw  true to bypass protocol encoding
       * @return bool|null true when everything was written immediately,
       *                   null when (part of) the data was queued,
       *                   false when the connection is closed, the peer is gone
       *                   or the send buffer is full
       */
      public function send($send_buffer, $raw = false)
      {
          if ($this->_status === self::STATUS_CLOSED) {
              return false;
          }

          if (false === $raw && $this->protocol) {
              $parser = $this->protocol;
              $send_buffer = $parser::encode($send_buffer, $this);
          }

          // Earlier data is still pending: append behind it to preserve ordering.
          if ($this->_sendBuffer !== '') {
              if (self::$maxSendBufferSize <= strlen($this->_sendBuffer) + strlen($send_buffer)) {
                  self::$statistics['send_fail']++;
                  if ($this->onError) {
                      call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full');
                  }
                  return false;
              }
              $this->_sendBuffer .= $send_buffer;
              return null;
          }

          $len = @fwrite($this->_socket, $send_buffer);
          if ($len === strlen($send_buffer)) {
              return true;
          }

          if ($len > 0) {
              // Partial write: keep only the unsent tail.
              $this->_sendBuffer = substr($send_buffer, $len);
          } else {
              if (feof($this->_socket)) {
                  self::$statistics['send_fail']++;
                  if ($this->onError) {
                      call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
                  }
                  $this->destroy();
                  return false;
              }
              // Nothing could be written right now: queue the whole payload.
              $this->_sendBuffer = (string) $send_buffer;
          }

          Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
          return null;
      }

      /**
       * Get the remote IP (resolved on first use, then cached).
       *
       * @return string
       */
      public function getRemoteIp()
      {
          if (!$this->_remoteIp) {
              $this->resolveRemoteAddress();
          }
          return $this->_remoteIp;
      }

      /**
       * Get the remote port (resolved on first use, then cached).
       *
       * @return int
       */
      public function getRemotePort()
      {
          if (!$this->_remotePort) {
              $this->resolveRemoteAddress();
          }
          return $this->_remotePort;
      }

      /**
       * Query the peer name of the socket and cache address, IP and port.
       *
       * The address is split at the LAST colon so that IPv6 peers, whose
       * address part itself contains colons, yield a correct IP and port.
       * Brackets around an IPv6 address are stripped from the cached IP.
       *
       * @return void
       */
      protected function resolveRemoteAddress()
      {
          $this->_remoteAddress = stream_socket_get_name($this->_socket, true);
          if (!$this->_remoteAddress) {
              return;
          }

          $separator = strrpos($this->_remoteAddress, ':');
          if ($separator === false) {
              // No port part at all; keep the whole name as the address.
              $this->_remoteIp = $this->_remoteAddress;
              return;
          }

          $this->_remoteIp   = trim(substr($this->_remoteAddress, 0, $separator), '[]');
          $this->_remotePort = (int) substr($this->_remoteAddress, $separator + 1);
      }

      /**
       * Read-event handler: drain the socket, dispatch what was received and
       * destroy the connection once the peer has closed it.
       *
       * @param resource $socket
       * @return void
       */
      public function baseRead($socket)
      {
          // Compare against ''/false explicitly: a chunk consisting of the single
          // byte "0" is valid data even though PHP considers it falsy.
          while (true) {
              $chunk = fread($socket, self::READ_BUFFER_SIZE);
              if ($chunk === false || $chunk === '') {
                  break;
              }
              $this->_recvBuffer .= $chunk;
          }

          if ($this->_recvBuffer !== '' && $this->onMessage) {
              if ($this->protocol) {
                  $this->dispatchPackages();
              } else {
                  $this->dispatchRaw();
              }
          }

          // A callback may already have destroyed the connection; feof() must not
          // be called on a closed stream. Otherwise always check for EOF, even
          // when no onMessage handler is set, so a closed peer is noticed.
          if ($this->_status !== self::STATUS_CLOSED && feof($socket)) {
              $this->destroy();
          }
      }

      /**
       * Cut complete packages out of the receive buffer, as framed by the
       * protocol's input(), and pass each decoded package to onMessage.
       *
       * Stops when more data is needed, when the connection gets closed, or
       * when the protocol reports an invalid package length.
       *
       * @return void
       */
      protected function dispatchPackages()
      {
          $parser = $this->protocol;

          while ($this->_recvBuffer !== '') {
              if (!$this->_currentPackageLength) {
                  $length = $parser::input($this->_recvBuffer, $this);

                  // input() receives the connection and may have closed it.
                  if ($this->_status === self::STATUS_CLOSED) {
                      return;
                  }
                  // 0 means the length cannot be determined yet.
                  if ($length === 0) {
                      return;
                  }
                  if (!($length > 0 && $length <= self::$maxPackageSize)) {
                      // Invalid package: drop the unparsable data, report to the
                      // peer and close. Nothing must be dispatched afterwards.
                      $this->_recvBuffer = '';
                      $this->_currentPackageLength = 0;
                      $this->close('error package. package_length=' . var_export($length, true));
                      return;
                  }
                  $this->_currentPackageLength = $length;
              }

              // The package has not been received completely yet.
              if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
                  return;
              }

              self::$statistics['total_request']++;
              $one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
              // substr() yields false before PHP 8 when nothing is left; the cast
              // keeps the buffer a string.
              $this->_recvBuffer = (string) substr($this->_recvBuffer, $this->_currentPackageLength);
              $this->_currentPackageLength = 0;

              try {
                  call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
              } catch (Exception $e) {
                  self::$statistics['throw_exception']++;
                  echo $e;
              }

              // The handler destroyed the connection: stop dispatching.
              if ($this->_status === self::STATUS_CLOSED) {
                  return;
              }
          }
      }

      /**
       * Raw mode (no protocol): pass the whole receive buffer to onMessage and
       * clear it afterwards.
       *
       * @return void
       */
      protected function dispatchRaw()
      {
          self::$statistics['total_request']++;
          try {
              call_user_func($this->onMessage, $this, $this->_recvBuffer);
          } catch (Exception $e) {
              self::$statistics['throw_exception']++;
              echo $e;
          }
          $this->_recvBuffer = '';
      }

      /**
       * Write-event handler: flush as much of the send buffer as the socket
       * accepts.
       *
       * When the buffer is fully flushed the write subscription is removed and,
       * if a close was requested meanwhile, the connection is destroyed.
       *
       * @return bool|null true when the buffer was flushed completely, null otherwise
       */
      public function baseWrite()
      {
          $len = @fwrite($this->_socket, $this->_sendBuffer);

          if ($len === strlen($this->_sendBuffer)) {
              Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
              $this->_sendBuffer = '';
              if ($this->_status === self::STATUS_CLOSING) {
                  $this->destroy();
              }
              return true;
          }

          if ($len > 0) {
              // Partial write: keep the unsent tail for the next write event.
              $this->_sendBuffer = substr($this->_sendBuffer, $len);
              return null;
          }

          if (feof($this->_socket)) {
              self::$statistics['send_fail']++;
              $this->destroy();
          }
          return null;
      }

      /**
       * Drop the first $length bytes of the receive buffer.
       *
       * @param int $length
       * @return void
       */
      public function consumeRecvBuffer($length)
      {
          // Cast: before PHP 8 substr() returns false when the whole buffer is
          // consumed, which would turn the string buffer into a boolean.
          $this->_recvBuffer = (string) substr($this->_recvBuffer, $length);
      }

      /**
       * Close the connection, optionally sending a final message first.
       *
       * If data is still queued the connection only switches to
       * STATUS_CLOSING and is destroyed by baseWrite() once the send buffer has
       * been flushed. Calling close() on a connection that is already closing,
       * closed or being destroyed is a no-op.
       *
       * @param mixed $data optional last message, sent through send()
       * @return void
       */
      public function close($data = null)
      {
          if ($this->_isDestroying
              || $this->_status === self::STATUS_CLOSING
              || $this->_status === self::STATUS_CLOSED
          ) {
              return;
          }

          if ($data !== null) {
              $this->send($data);
              // send() destroys the connection itself when the peer is gone.
              if ($this->_status === self::STATUS_CLOSED) {
                  return;
              }
          }

          $this->_status = self::STATUS_CLOSING;
          if ($this->_sendBuffer === '') {
              $this->destroy();
          }
      }

      /**
       * Get the underlying socket stream.
       *
       * @return resource
       */
      public function getSocket()
      {
          return $this->_socket;
      }

      /**
       * Tear the connection down: run onClose, unsubscribe the socket from the
       * event loop, close it and mark the connection as closed.
       *
       * Safe to call more than once and from within onClose (for example via
       * send() or close()): only the first call performs the teardown, so
       * onClose runs once and the socket is closed once.
       *
       * @return void
       */
      protected function destroy()
      {
          if ($this->_isDestroying || $this->_status === self::STATUS_CLOSED) {
              return;
          }
          $this->_isDestroying = true;

          // onClose runs while the socket is still open.
          if ($this->onClose) {
              try {
                  call_user_func($this->onClose, $this);
              } catch (Exception $e) {
                  self::$statistics['throw_exception']++;
                  echo $e;
              }
          }

          Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
          Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
          @fclose($this->_socket);
          $this->_status = self::STATUS_CLOSED;
      }
  }