TcpConnection.php 27 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Connection;
  15. use Workerman\Events\EventInterface;
  16. use Workerman\Protocols\Http\Request;
  17. use Workerman\Worker;
  18. /**
  19. * TcpConnection.
  20. */
  21. class TcpConnection extends ConnectionInterface implements \JsonSerializable
  22. {
  23. /**
  24. * Read buffer size.
  25. *
  26. * @var int
  27. */
  28. const READ_BUFFER_SIZE = 87380;
  29. /**
  30. * Status initial.
  31. *
  32. * @var int
  33. */
  34. const STATUS_INITIAL = 0;
  35. /**
  36. * Status connecting.
  37. *
  38. * @var int
  39. */
  40. const STATUS_CONNECTING = 1;
  41. /**
  42. * Status connection established.
  43. *
  44. * @var int
  45. */
  46. const STATUS_ESTABLISHED = 2;
  47. /**
  48. * Status closing.
  49. *
  50. * @var int
  51. */
  52. const STATUS_CLOSING = 4;
  53. /**
  54. * Status closed.
  55. *
  56. * @var int
  57. */
  58. const STATUS_CLOSED = 8;
  59. /**
  60. * Emitted when data is received.
  61. *
  62. * @var callable
  63. */
  64. public $onMessage = null;
  65. /**
  66. * Emitted when the other end of the socket sends a FIN packet.
  67. *
  68. * @var callable
  69. */
  70. public $onClose = null;
  71. /**
  72. * Emitted when an error occurs with connection.
  73. *
  74. * @var callable
  75. */
  76. public $onError = null;
  77. /**
  78. * Emitted when the send buffer becomes full.
  79. *
  80. * @var callable
  81. */
  82. public $onBufferFull = null;
  83. /**
  84. * Emitted when the send buffer becomes empty.
  85. *
  86. * @var callable
  87. */
  88. public $onBufferDrain = null;
  89. /**
  90. * Application layer protocol.
  91. * The format is like this Workerman\\Protocols\\Http.
  92. *
  93. * @var \Workerman\Protocols\ProtocolInterface
  94. */
  95. public $protocol = null;
  96. /**
  97. * Transport (tcp/udp/unix/ssl).
  98. *
  99. * @var string
  100. */
  101. public $transport = 'tcp';
  102. /**
  103. * Which worker belong to.
  104. *
  105. * @var Worker
  106. */
  107. public $worker = null;
  108. /**
  109. * Bytes read.
  110. *
  111. * @var int
  112. */
  113. public $bytesRead = 0;
  114. /**
  115. * Bytes written.
  116. *
  117. * @var int
  118. */
  119. public $bytesWritten = 0;
  120. /**
  121. * Connection->id.
  122. *
  123. * @var int
  124. */
  125. public $id = 0;
  126. /**
  127. * A copy of $worker->id which used to clean up the connection in worker->connections
  128. *
  129. * @var int
  130. */
  131. protected $_id = 0;
  132. /**
  133. * Sets the maximum send buffer size for the current connection.
  134. * OnBufferFull callback will be emited When the send buffer is full.
  135. *
  136. * @var int
  137. */
  138. public $maxSendBufferSize = 1048576;
  139. /**
  140. * Default send buffer size.
  141. *
  142. * @var int
  143. */
  144. public static $defaultMaxSendBufferSize = 1048576;
  145. /**
  146. * Sets the maximum acceptable packet size for the current connection.
  147. *
  148. * @var int
  149. */
  150. public $maxPackageSize = 1048576;
  151. /**
  152. * Default maximum acceptable packet size.
  153. *
  154. * @var int
  155. */
  156. public static $defaultMaxPackageSize = 10485760;
  157. /**
  158. * Id recorder.
  159. *
  160. * @var int
  161. */
  162. protected static $_idRecorder = 1;
  163. /**
  164. * Cache.
  165. *
  166. * @var bool.
  167. */
  168. protected static $_enableCache = true;
  169. /**
  170. * Socket
  171. *
  172. * @var resource
  173. */
  174. protected $_socket = null;
  175. /**
  176. * Send buffer.
  177. *
  178. * @var string
  179. */
  180. protected $_sendBuffer = '';
  181. /**
  182. * Receive buffer.
  183. *
  184. * @var string
  185. */
  186. protected $_recvBuffer = '';
  187. /**
  188. * Current package length.
  189. *
  190. * @var int
  191. */
  192. protected $_currentPackageLength = 0;
  193. /**
  194. * Connection status.
  195. *
  196. * @var int
  197. */
  198. protected $_status = self::STATUS_ESTABLISHED;
  199. /**
  200. * Remote address.
  201. *
  202. * @var string
  203. */
  204. protected $_remoteAddress = '';
  205. /**
  206. * Is paused.
  207. *
  208. * @var bool
  209. */
  210. protected $_isPaused = false;
  211. /**
  212. * SSL handshake completed or not.
  213. *
  214. * @var bool
  215. */
  216. protected $_sslHandshakeCompleted = false;
  217. /**
  218. * All connection instances.
  219. *
  220. * @var array
  221. */
  222. public static $connections = [];
  223. /**
  224. * Status to string.
  225. *
  226. * @var array
  227. */
  228. public static $_statusToString = [
  229. self::STATUS_INITIAL => 'INITIAL',
  230. self::STATUS_CONNECTING => 'CONNECTING',
  231. self::STATUS_ESTABLISHED => 'ESTABLISHED',
  232. self::STATUS_CLOSING => 'CLOSING',
  233. self::STATUS_CLOSED => 'CLOSED',
  234. ];
  235. /**
  236. * Construct.
  237. *
  238. * @param resource $socket
  239. * @param string $remote_address
  240. */
  241. public function __construct($socket, $remote_address = '')
  242. {
  243. ++self::$statistics['connection_count'];
  244. $this->id = $this->_id = self::$_idRecorder++;
  245. if (self::$_idRecorder === \PHP_INT_MAX) {
  246. self::$_idRecorder = 0;
  247. }
  248. $this->_socket = $socket;
  249. \stream_set_blocking($this->_socket, 0);
  250. // Compatible with hhvm
  251. if (\function_exists('stream_set_read_buffer')) {
  252. \stream_set_read_buffer($this->_socket, 0);
  253. }
  254. Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']);
  255. $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
  256. $this->maxPackageSize = self::$defaultMaxPackageSize;
  257. $this->_remoteAddress = $remote_address;
  258. static::$connections[$this->id] = $this;
  259. }
  260. /**
  261. * Get status.
  262. *
  263. * @param bool $raw_output
  264. *
  265. * @return int|string
  266. */
  267. public function getStatus($raw_output = true)
  268. {
  269. if ($raw_output) {
  270. return $this->_status;
  271. }
  272. return self::$_statusToString[$this->_status];
  273. }
  274. /**
  275. * Sends data on the connection.
  276. *
  277. * @param mixed $send_buffer
  278. * @param bool $raw
  279. * @return bool|null
  280. */
  281. public function send($send_buffer, $raw = false)
  282. {
  283. if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
  284. return false;
  285. }
  286. // Try to call protocol::encode($send_buffer) before sending.
  287. if (false === $raw && $this->protocol !== null) {
  288. $send_buffer = $this->protocol::encode($send_buffer, $this);
  289. if ($send_buffer === '') {
  290. return;
  291. }
  292. }
  293. if ($this->_status !== self::STATUS_ESTABLISHED ||
  294. ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true)
  295. ) {
  296. if ($this->_sendBuffer && $this->bufferIsFull()) {
  297. ++self::$statistics['send_fail'];
  298. return false;
  299. }
  300. $this->_sendBuffer .= $send_buffer;
  301. $this->checkBufferWillFull();
  302. return;
  303. }
  304. // Attempt to send data directly.
  305. if ($this->_sendBuffer === '') {
  306. if ($this->transport === 'ssl') {
  307. Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']);
  308. $this->_sendBuffer = $send_buffer;
  309. $this->checkBufferWillFull();
  310. return;
  311. }
  312. $len = 0;
  313. try {
  314. $len = @\fwrite($this->_socket, $send_buffer);
  315. } catch (\Throwable $e) {
  316. Worker::log($e);
  317. }
  318. // send successful.
  319. if ($len === \strlen($send_buffer)) {
  320. $this->bytesWritten += $len;
  321. return true;
  322. }
  323. // Send only part of the data.
  324. if ($len > 0) {
  325. $this->_sendBuffer = \substr($send_buffer, $len);
  326. $this->bytesWritten += $len;
  327. } else {
  328. // Connection closed?
  329. if (!\is_resource($this->_socket) || \feof($this->_socket)) {
  330. ++self::$statistics['send_fail'];
  331. if ($this->onError) {
  332. try {
  333. ($this->onError)($this, static::SEND_FAIL, 'client closed');
  334. } catch (\Throwable $e) {
  335. Worker::stopAll(250, $e);
  336. }
  337. }
  338. $this->destroy();
  339. return false;
  340. }
  341. $this->_sendBuffer = $send_buffer;
  342. }
  343. Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']);
  344. // Check if the send buffer will be full.
  345. $this->checkBufferWillFull();
  346. return;
  347. }
  348. if ($this->bufferIsFull()) {
  349. ++self::$statistics['send_fail'];
  350. return false;
  351. }
  352. $this->_sendBuffer .= $send_buffer;
  353. // Check if the send buffer is full.
  354. $this->checkBufferWillFull();
  355. }
  356. /**
  357. * Get remote IP.
  358. *
  359. * @return string
  360. */
  361. public function getRemoteIp()
  362. {
  363. $pos = \strrpos($this->_remoteAddress, ':');
  364. if ($pos) {
  365. return (string)\substr($this->_remoteAddress, 0, $pos);
  366. }
  367. return '';
  368. }
  369. /**
  370. * Get remote port.
  371. *
  372. * @return int
  373. */
  374. public function getRemotePort()
  375. {
  376. if ($this->_remoteAddress) {
  377. return (int)\substr(\strrchr($this->_remoteAddress, ':'), 1);
  378. }
  379. return 0;
  380. }
  381. /**
  382. * Get remote address.
  383. *
  384. * @return string
  385. */
  386. public function getRemoteAddress()
  387. {
  388. return $this->_remoteAddress;
  389. }
  390. /**
  391. * Get local IP.
  392. *
  393. * @return string
  394. */
  395. public function getLocalIp()
  396. {
  397. $address = $this->getLocalAddress();
  398. $pos = \strrpos($address, ':');
  399. if (!$pos) {
  400. return '';
  401. }
  402. return \substr($address, 0, $pos);
  403. }
  404. /**
  405. * Get local port.
  406. *
  407. * @return int
  408. */
  409. public function getLocalPort()
  410. {
  411. $address = $this->getLocalAddress();
  412. $pos = \strrpos($address, ':');
  413. if (!$pos) {
  414. return 0;
  415. }
  416. return (int)\substr(\strrchr($address, ':'), 1);
  417. }
  418. /**
  419. * Get local address.
  420. *
  421. * @return string
  422. */
  423. public function getLocalAddress()
  424. {
  425. if (!\is_resource($this->_socket)) {
  426. return '';
  427. }
  428. return (string)@\stream_socket_get_name($this->_socket, false);
  429. }
  430. /**
  431. * Get send buffer queue size.
  432. *
  433. * @return integer
  434. */
  435. public function getSendBufferQueueSize()
  436. {
  437. return \strlen($this->_sendBuffer);
  438. }
  439. /**
  440. * Get recv buffer queue size.
  441. *
  442. * @return integer
  443. */
  444. public function getRecvBufferQueueSize()
  445. {
  446. return \strlen($this->_recvBuffer);
  447. }
  448. /**
  449. * Is ipv4.
  450. *
  451. * return bool.
  452. */
  453. public function isIpV4()
  454. {
  455. if ($this->transport === 'unix') {
  456. return false;
  457. }
  458. return \strpos($this->getRemoteIp(), ':') === false;
  459. }
  460. /**
  461. * Is ipv6.
  462. *
  463. * return bool.
  464. */
  465. public function isIpV6()
  466. {
  467. if ($this->transport === 'unix') {
  468. return false;
  469. }
  470. return \strpos($this->getRemoteIp(), ':') !== false;
  471. }
  472. /**
  473. * Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
  474. *
  475. * @return void
  476. */
  477. public function pauseRecv()
  478. {
  479. Worker::$globalEvent->offReadable($this->_socket);
  480. $this->_isPaused = true;
  481. }
  482. /**
  483. * Resumes reading after a call to pauseRecv.
  484. *
  485. * @return void
  486. */
  487. public function resumeRecv()
  488. {
  489. if ($this->_isPaused === true) {
  490. Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']);
  491. $this->_isPaused = false;
  492. $this->baseRead($this->_socket, false);
  493. }
  494. }
  495. /**
  496. * Base read handler.
  497. *
  498. * @param resource $socket
  499. * @param bool $check_eof
  500. * @return void
  501. */
  502. public function baseRead($socket, $check_eof = true)
  503. {
  504. static $requests = [];
  505. // SSL handshake.
  506. if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
  507. if ($this->doSslHandshake($socket)) {
  508. $this->_sslHandshakeCompleted = true;
  509. if ($this->_sendBuffer) {
  510. Worker::$globalEvent->onWritable($socket, [$this, 'baseWrite']);
  511. }
  512. } else {
  513. return;
  514. }
  515. }
  516. $buffer = '';
  517. try {
  518. $buffer = @\fread($socket, self::READ_BUFFER_SIZE);
  519. } catch (\Throwable $e) {
  520. }
  521. // Check connection closed.
  522. if ($buffer === '' || $buffer === false) {
  523. if ($check_eof && (\feof($socket) || !\is_resource($socket) || $buffer === false)) {
  524. $this->destroy();
  525. return;
  526. }
  527. } else {
  528. $this->bytesRead += \strlen($buffer);
  529. if ($this->_recvBuffer === '') {
  530. if (static::$_enableCache && !isset($requests[512]) && isset($requests[$buffer])) {
  531. ++self::$statistics['total_request'];
  532. $request = $requests[$buffer];
  533. if ($request instanceof Request) {
  534. $request = clone $request;
  535. $requests[$buffer] = $request;
  536. $request->connection = $this;
  537. $this->__request = $request;
  538. $request->properties = [];
  539. }
  540. try {
  541. ($this->onMessage)($this, $request);
  542. } catch (\Throwable $e) {
  543. Worker::stopAll(250, $e);
  544. }
  545. return;
  546. }
  547. $this->_recvBuffer = $buffer;
  548. } else {
  549. $this->_recvBuffer .= $buffer;
  550. }
  551. }
  552. // If the application layer protocol has been set up.
  553. if ($this->protocol !== null) {
  554. while ($this->_recvBuffer !== '' && !$this->_isPaused) {
  555. // The current packet length is known.
  556. if ($this->_currentPackageLength) {
  557. // Data is not enough for a package.
  558. if ($this->_currentPackageLength > \strlen($this->_recvBuffer)) {
  559. break;
  560. }
  561. } else {
  562. // Get current package length.
  563. try {
  564. $this->_currentPackageLength = $this->protocol::input($this->_recvBuffer, $this);
  565. } catch (\Throwable $e) {
  566. }
  567. // The packet length is unknown.
  568. if ($this->_currentPackageLength === 0) {
  569. break;
  570. } elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= $this->maxPackageSize) {
  571. // Data is not enough for a package.
  572. if ($this->_currentPackageLength > \strlen($this->_recvBuffer)) {
  573. break;
  574. }
  575. } // Wrong package.
  576. else {
  577. Worker::safeEcho('Error package. package_length=' . \var_export($this->_currentPackageLength, true));
  578. $this->destroy();
  579. return;
  580. }
  581. }
  582. // The data is enough for a packet.
  583. ++self::$statistics['total_request'];
  584. // The current packet length is equal to the length of the buffer.
  585. if ($one = \strlen($this->_recvBuffer) === $this->_currentPackageLength) {
  586. $one_request_buffer = $this->_recvBuffer;
  587. $this->_recvBuffer = '';
  588. } else {
  589. // Get a full package from the buffer.
  590. $one_request_buffer = \substr($this->_recvBuffer, 0, $this->_currentPackageLength);
  591. // Remove the current package from the receive buffer.
  592. $this->_recvBuffer = \substr($this->_recvBuffer, $this->_currentPackageLength);
  593. }
  594. // Reset the current packet length to 0.
  595. $this->_currentPackageLength = 0;
  596. try {
  597. // Decode request buffer before Emitting onMessage callback.
  598. $request = $this->protocol::decode($one_request_buffer, $this);
  599. if (static::$_enableCache && (!\is_object($request) || $request instanceof Request) && $one && !isset($one_request_buffer[512])) {
  600. $requests[$one_request_buffer] = $request;
  601. if (\count($requests) > 512) {
  602. unset($requests[\key($requests)]);
  603. }
  604. }
  605. ($this->onMessage)($this, $request);
  606. } catch (\Throwable $e) {
  607. Worker::stopAll(250, $e);
  608. }
  609. }
  610. return;
  611. }
  612. if ($this->_recvBuffer === '' || $this->_isPaused) {
  613. return;
  614. }
  615. // Applications protocol is not set.
  616. ++self::$statistics['total_request'];
  617. try {
  618. ($this->onMessage)($this, $this->_recvBuffer);
  619. } catch (\Throwable $e) {
  620. Worker::stopAll(250, $e);
  621. }
  622. // Clean receive buffer.
  623. $this->_recvBuffer = '';
  624. }
  625. /**
  626. * Base write handler.
  627. *
  628. * @return void|bool
  629. */
  630. public function baseWrite()
  631. {
  632. \set_error_handler(function () {
  633. });
  634. if ($this->transport === 'ssl') {
  635. $len = @\fwrite($this->_socket, $this->_sendBuffer, 8192);
  636. } else {
  637. $len = @\fwrite($this->_socket, $this->_sendBuffer);
  638. }
  639. \restore_error_handler();
  640. if ($len === \strlen($this->_sendBuffer)) {
  641. $this->bytesWritten += $len;
  642. Worker::$globalEvent->offWritable($this->_socket);
  643. $this->_sendBuffer = '';
  644. // Try to emit onBufferDrain callback when the send buffer becomes empty.
  645. if ($this->onBufferDrain) {
  646. try {
  647. ($this->onBufferDrain)($this);
  648. } catch (\Throwable $e) {
  649. Worker::stopAll(250, $e);
  650. }
  651. }
  652. if ($this->_status === self::STATUS_CLOSING) {
  653. $this->destroy();
  654. }
  655. return true;
  656. }
  657. if ($len > 0) {
  658. $this->bytesWritten += $len;
  659. $this->_sendBuffer = \substr($this->_sendBuffer, $len);
  660. } else {
  661. ++self::$statistics['send_fail'];
  662. $this->destroy();
  663. }
  664. }
  665. /**
  666. * SSL handshake.
  667. *
  668. * @param resource $socket
  669. * @return bool
  670. */
  671. public function doSslHandshake($socket)
  672. {
  673. if (\feof($socket)) {
  674. $this->destroy();
  675. return false;
  676. }
  677. $async = $this instanceof AsyncTcpConnection;
  678. /**
  679. * We disabled ssl3 because https://blog.qualys.com/ssllabs/2014/10/15/ssl-3-is-dead-killed-by-the-poodle-attack.
  680. * You can enable ssl3 by the codes below.
  681. */
  682. /*if($async){
  683. $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT | STREAM_CRYPTO_METHOD_SSLv3_CLIENT;
  684. }else{
  685. $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER | STREAM_CRYPTO_METHOD_SSLv3_SERVER;
  686. }*/
  687. if ($async) {
  688. $type = \STREAM_CRYPTO_METHOD_SSLv2_CLIENT | \STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
  689. } else {
  690. $type = \STREAM_CRYPTO_METHOD_SSLv2_SERVER | \STREAM_CRYPTO_METHOD_SSLv23_SERVER;
  691. }
  692. // Hidden error.
  693. \set_error_handler(function ($errno, $errstr, $file) {
  694. if (!Worker::$daemonize) {
  695. Worker::safeEcho("SSL handshake error: $errstr \n");
  696. }
  697. });
  698. $ret = \stream_socket_enable_crypto($socket, true, $type);
  699. \restore_error_handler();
  700. // Negotiation has failed.
  701. if (false === $ret) {
  702. $this->destroy();
  703. return false;
  704. } elseif (0 === $ret) {
  705. // There isn't enough data and should try again.
  706. return 0;
  707. }
  708. if (isset($this->onSslHandshake)) {
  709. try {
  710. ($this->onSslHandshake)($this);
  711. } catch (\Throwable $e) {
  712. Worker::stopAll(250, $e);
  713. }
  714. }
  715. return true;
  716. }
  717. /**
  718. * This method pulls all the data out of a readable stream, and writes it to the supplied destination.
  719. *
  720. * @param self $dest
  721. * @return void
  722. */
  723. public function pipe(self $dest)
  724. {
  725. $source = $this;
  726. $this->onMessage = function ($source, $data) use ($dest) {
  727. $dest->send($data);
  728. };
  729. $this->onClose = function ($source) use ($dest) {
  730. $dest->close();
  731. };
  732. $dest->onBufferFull = function ($dest) use ($source) {
  733. $source->pauseRecv();
  734. };
  735. $dest->onBufferDrain = function ($dest) use ($source) {
  736. $source->resumeRecv();
  737. };
  738. }
  739. /**
  740. * Remove $length of data from receive buffer.
  741. *
  742. * @param int $length
  743. * @return void
  744. */
  745. public function consumeRecvBuffer($length)
  746. {
  747. $this->_recvBuffer = \substr($this->_recvBuffer, $length);
  748. }
  749. /**
  750. * Close connection.
  751. *
  752. * @param mixed $data
  753. * @param bool $raw
  754. * @return void
  755. */
  756. public function close($data = null, $raw = false)
  757. {
  758. if ($this->_status === self::STATUS_CONNECTING) {
  759. $this->destroy();
  760. return;
  761. }
  762. if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
  763. return;
  764. }
  765. if ($data !== null) {
  766. $this->send($data, $raw);
  767. }
  768. $this->_status = self::STATUS_CLOSING;
  769. if ($this->_sendBuffer === '') {
  770. $this->destroy();
  771. } else {
  772. $this->pauseRecv();
  773. }
  774. }
  775. /**
  776. * Get the real socket.
  777. *
  778. * @return resource
  779. */
  780. public function getSocket()
  781. {
  782. return $this->_socket;
  783. }
  784. /**
  785. * Check whether the send buffer will be full.
  786. *
  787. * @return void
  788. */
  789. protected function checkBufferWillFull()
  790. {
  791. if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
  792. if ($this->onBufferFull) {
  793. try {
  794. ($this->onBufferFull)($this);
  795. } catch (\Throwable $e) {
  796. Worker::stopAll(250, $e);
  797. }
  798. }
  799. }
  800. }
  801. /**
  802. * Whether send buffer is full.
  803. *
  804. * @return bool
  805. */
  806. protected function bufferIsFull()
  807. {
  808. // Buffer has been marked as full but still has data to send then the packet is discarded.
  809. if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
  810. if ($this->onError) {
  811. try {
  812. ($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
  813. } catch (\Throwable $e) {
  814. Worker::stopAll(250, $e);
  815. }
  816. }
  817. return true;
  818. }
  819. return false;
  820. }
  821. /**
  822. * Whether send buffer is Empty.
  823. *
  824. * @return bool
  825. */
  826. public function bufferIsEmpty()
  827. {
  828. return empty($this->_sendBuffer);
  829. }
  830. /**
  831. * Destroy connection.
  832. *
  833. * @return void
  834. */
  835. public function destroy()
  836. {
  837. // Avoid repeated calls.
  838. if ($this->_status === self::STATUS_CLOSED) {
  839. return;
  840. }
  841. // Remove event listener.
  842. Worker::$globalEvent->offReadable($this->_socket);
  843. Worker::$globalEvent->offWritable($this->_socket);
  844. // Close socket.
  845. try {
  846. @\fclose($this->_socket);
  847. } catch (\Throwable $e) {
  848. }
  849. $this->_status = self::STATUS_CLOSED;
  850. // Try to emit onClose callback.
  851. if ($this->onClose) {
  852. try {
  853. ($this->onClose)($this);
  854. } catch (\Throwable $e) {
  855. Worker::stopAll(250, $e);
  856. }
  857. }
  858. // Try to emit protocol::onClose
  859. if ($this->protocol && \method_exists($this->protocol, 'onClose')) {
  860. try {
  861. ([$this->protocol, 'onClose'])($this);
  862. } catch (\Throwable $e) {
  863. Worker::stopAll(250, $e);
  864. }
  865. }
  866. $this->_sendBuffer = $this->_recvBuffer = '';
  867. $this->_currentPackageLength = 0;
  868. $this->_isPaused = $this->_sslHandshakeCompleted = false;
  869. if ($this->_status === self::STATUS_CLOSED) {
  870. // Cleaning up the callback to avoid memory leaks.
  871. $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
  872. // Remove from worker->connections.
  873. if ($this->worker) {
  874. unset($this->worker->connections[$this->_id]);
  875. }
  876. unset(static::$connections[$this->_id]);
  877. }
  878. }
  879. /**
  880. * Enable or disable Cache.
  881. *
  882. * @param mixed $value
  883. */
  884. public static function enableCache($value)
  885. {
  886. static::$_enableCache = (bool)$value;
  887. }
  888. /**
  889. * Get the json_encode informattion.
  890. *
  891. * @return string
  892. */
  893. public function jsonSerialize()
  894. {
  895. return [
  896. 'id' => $this->id,
  897. 'status' => $this->getStatus(),
  898. 'transport' => $this->transport,
  899. 'getRemoteIp' => $this->getRemoteIp(),
  900. 'remotePort' => $this->getRemotePort(),
  901. 'getRemoteAddress' => $this->getRemoteAddress(),
  902. 'getLocalIp' => $this->getLocalIp(),
  903. 'getLocalPort' => $this->getLocalPort(),
  904. 'getLocalAddress' => $this->getLocalAddress(),
  905. 'isIpV4' => $this->isIpV4(),
  906. 'isIpV6' => $this->isIpV6(),
  907. ];
  908. }
  909. /**
  910. * Destruct.
  911. *
  912. * @return void
  913. */
  914. public function __destruct()
  915. {
  916. static $mod;
  917. self::$statistics['connection_count']--;
  918. if (Worker::getGracefulStop()) {
  919. if (!isset($mod)) {
  920. $mod = \ceil((self::$statistics['connection_count'] + 1) / 3);
  921. }
  922. if (0 === self::$statistics['connection_count'] % $mod) {
  923. Worker::log('worker[' . \posix_getpid() . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
  924. }
  925. if (0 === self::$statistics['connection_count']) {
  926. Worker::stopAll();
  927. }
  928. }
  929. }
  930. }