TcpConnection.php 28 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Connection;
  16. use JetBrains\PhpStorm\Pure;
  17. use JsonSerializable;
  18. use stdClass;
  19. use Throwable;
  20. use Workerman\Events\EventInterface;
  21. use Workerman\Protocols\Http\Request;
  22. use Workerman\Protocols\ProtocolInterface;
  23. use Workerman\Worker;
  24. /**
  25. * TcpConnection.
  26. * @property string websocketType
  27. */
  28. class TcpConnection extends ConnectionInterface implements JsonSerializable
  29. {
  30. /**
  31. * Read buffer size.
  32. *
  33. * @var int
  34. */
  35. const READ_BUFFER_SIZE = 87380;
  36. /**
  37. * Status initial.
  38. *
  39. * @var int
  40. */
  41. const STATUS_INITIAL = 0;
  42. /**
  43. * Status connecting.
  44. *
  45. * @var int
  46. */
  47. const STATUS_CONNECTING = 1;
  48. /**
  49. * Status connection established.
  50. *
  51. * @var int
  52. */
  53. const STATUS_ESTABLISHED = 2;
  54. /**
  55. * Status closing.
  56. *
  57. * @var int
  58. */
  59. const STATUS_CLOSING = 4;
  60. /**
  61. * Status closed.
  62. *
  63. * @var int
  64. */
  65. const STATUS_CLOSED = 8;
  66. /**
  67. * Emitted when socket connection is successfully established.
  68. *
  69. * @var ?callable
  70. */
  71. public $onConnect = null;
  72. /**
  73. * Emitted when websocket handshake completed (Only work when protocol is ws).
  74. *
  75. * @var ?callable
  76. */
  77. public $onWebSocketConnect = null;
  78. /**
  79. * Emitted when data is received.
  80. *
  81. * @var ?callable
  82. */
  83. public $onMessage = null;
  84. /**
  85. * Emitted when the other end of the socket sends a FIN packet.
  86. *
  87. * @var ?callable
  88. */
  89. public $onClose = null;
  90. /**
  91. * Emitted when an error occurs with connection.
  92. *
  93. * @var ?callable
  94. */
  95. public $onError = null;
  96. /**
  97. * Emitted when the send buffer becomes full.
  98. *
  99. * @var ?callable
  100. */
  101. public $onBufferFull = null;
  102. /**
  103. * Emitted when send buffer becomes empty.
  104. *
  105. * @var ?callable
  106. */
  107. public $onBufferDrain = null;
  108. /**
  109. * Transport (tcp/udp/unix/ssl).
  110. *
  111. * @var string
  112. */
  113. public string $transport = 'tcp';
  114. /**
  115. * Which worker belong to.
  116. *
  117. * @var ?Worker
  118. */
  119. public ?Worker $worker = null;
  120. /**
  121. * Bytes read.
  122. *
  123. * @var int
  124. */
  125. public int $bytesRead = 0;
  126. /**
  127. * Bytes written.
  128. *
  129. * @var int
  130. */
  131. public int $bytesWritten = 0;
  132. /**
  133. * Connection->id.
  134. *
  135. * @var int
  136. */
  137. public int $id = 0;
  138. /**
  139. * A copy of $worker->id which used to clean up the connection in worker->connections
  140. *
  141. * @var int
  142. */
  143. protected int $realId = 0;
  144. /**
  145. * Sets the maximum send buffer size for the current connection.
  146. * OnBufferFull callback will be emitted When send buffer is full.
  147. *
  148. * @var int
  149. */
  150. public int $maxSendBufferSize = 1048576;
  151. /**
  152. * Context.
  153. *
  154. * @var ?stdClass
  155. */
  156. public ?stdClass $context = null;
  157. /**
  158. * @var array
  159. */
  160. public array $headers = [];
  161. /**
  162. * @var ?Request
  163. */
  164. public ?Request $request = null;
  165. /**
  166. * Default send buffer size.
  167. *
  168. * @var int
  169. */
  170. public static int $defaultMaxSendBufferSize = 1048576;
  171. /**
  172. * Sets the maximum acceptable packet size for the current connection.
  173. *
  174. * @var int
  175. */
  176. public int $maxPackageSize = 1048576;
  177. /**
  178. * Default maximum acceptable packet size.
  179. *
  180. * @var int
  181. */
  182. public static int $defaultMaxPackageSize = 10485760;
  183. /**
  184. * Id recorder.
  185. *
  186. * @var int
  187. */
  188. protected static int $idRecorder = 1;
  189. /**
  190. * Cache.
  191. *
  192. * @var bool.
  193. */
  194. protected static bool $enableCache = true;
  195. /**
  196. * Socket
  197. *
  198. * @var resource
  199. */
  200. protected $socket = null;
  201. /**
  202. * Send buffer.
  203. *
  204. * @var string
  205. */
  206. protected string $sendBuffer = '';
  207. /**
  208. * Receive buffer.
  209. *
  210. * @var string
  211. */
  212. protected string $recvBuffer = '';
  213. /**
  214. * Current package length.
  215. *
  216. * @var int
  217. */
  218. protected int $currentPackageLength = 0;
  219. /**
  220. * Connection status.
  221. *
  222. * @var int
  223. */
  224. protected int $status = self::STATUS_ESTABLISHED;
  225. /**
  226. * Remote address.
  227. *
  228. * @var string
  229. */
  230. protected string $remoteAddress = '';
  231. /**
  232. * Is paused.
  233. *
  234. * @var bool
  235. */
  236. protected bool $isPaused = false;
  237. /**
  238. * SSL handshake completed or not.
  239. *
  240. * @var bool
  241. */
  242. protected bool|int $sslHandshakeCompleted = false;
  243. /**
  244. * All connection instances.
  245. *
  246. * @var array
  247. */
  248. public static array $connections = [];
  249. /**
  250. * Status to string.
  251. *
  252. * @var array
  253. */
  254. const STATUS_TO_STRING = [
  255. self::STATUS_INITIAL => 'INITIAL',
  256. self::STATUS_CONNECTING => 'CONNECTING',
  257. self::STATUS_ESTABLISHED => 'ESTABLISHED',
  258. self::STATUS_CLOSING => 'CLOSING',
  259. self::STATUS_CLOSED => 'CLOSED',
  260. ];
  261. /**
  262. * Construct.
  263. *
  264. * @param EventInterface $eventLoop
  265. * @param resource $socket
  266. * @param string $remoteAddress
  267. */
  268. public function __construct(EventInterface $eventLoop, $socket, string $remoteAddress = '')
  269. {
  270. ++self::$statistics['connection_count'];
  271. $this->id = $this->realId = self::$idRecorder++;
  272. if (self::$idRecorder === PHP_INT_MAX) {
  273. self::$idRecorder = 0;
  274. }
  275. $this->socket = $socket;
  276. stream_set_blocking($this->socket, false);
  277. // Compatible with hhvm
  278. if (function_exists('stream_set_read_buffer')) {
  279. stream_set_read_buffer($this->socket, 0);
  280. }
  281. $this->eventLoop = $eventLoop;
  282. $this->eventLoop->onReadable($this->socket, [$this, 'baseRead']);
  283. $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
  284. $this->maxPackageSize = self::$defaultMaxPackageSize;
  285. $this->remoteAddress = $remoteAddress;
  286. static::$connections[$this->id] = $this;
  287. $this->context = new stdClass;
  288. }
  289. /**
  290. * Get status.
  291. *
  292. * @param bool $rawOutput
  293. *
  294. * @return int|string
  295. */
  296. public function getStatus(bool $rawOutput = true): int|string
  297. {
  298. if ($rawOutput) {
  299. return $this->status;
  300. }
  301. return self::STATUS_TO_STRING[$this->status];
  302. }
  303. /**
  304. * Sends data on the connection.
  305. *
  306. * @param mixed $sendBuffer
  307. * @param bool $raw
  308. * @return bool|void
  309. * @throws Throwable
  310. */
  311. public function send(mixed $sendBuffer, bool $raw = false)
  312. {
  313. if ($this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
  314. return false;
  315. }
  316. // Try to call protocol::encode($sendBuffer) before sending.
  317. if (false === $raw && $this->protocol !== null) {
  318. /** @var ProtocolInterface $parser */
  319. $parser = $this->protocol;
  320. $sendBuffer = $parser::encode($sendBuffer, $this);
  321. if ($sendBuffer === '') {
  322. return;
  323. }
  324. }
  325. if ($this->status !== self::STATUS_ESTABLISHED ||
  326. ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true)
  327. ) {
  328. if ($this->sendBuffer && $this->bufferIsFull()) {
  329. ++self::$statistics['send_fail'];
  330. return false;
  331. }
  332. $this->sendBuffer .= $sendBuffer;
  333. $this->checkBufferWillFull();
  334. return;
  335. }
  336. // Attempt to send data directly.
  337. if ($this->sendBuffer === '') {
  338. if ($this->transport === 'ssl') {
  339. $this->eventLoop->onWritable($this->socket, [$this, 'baseWrite']);
  340. $this->sendBuffer = $sendBuffer;
  341. $this->checkBufferWillFull();
  342. return;
  343. }
  344. $len = 0;
  345. try {
  346. $len = @fwrite($this->socket, $sendBuffer);
  347. } catch (Throwable $e) {
  348. Worker::log($e);
  349. }
  350. // send successful.
  351. if ($len === strlen($sendBuffer)) {
  352. $this->bytesWritten += $len;
  353. return true;
  354. }
  355. // Send only part of the data.
  356. if ($len > 0) {
  357. $this->sendBuffer = substr($sendBuffer, $len);
  358. $this->bytesWritten += $len;
  359. } else {
  360. // Connection closed?
  361. if (!is_resource($this->socket) || feof($this->socket)) {
  362. ++self::$statistics['send_fail'];
  363. if ($this->onError) {
  364. try {
  365. ($this->onError)($this, static::SEND_FAIL, 'client closed');
  366. } catch (Throwable $e) {
  367. $this->error($e);
  368. }
  369. }
  370. $this->destroy();
  371. return false;
  372. }
  373. $this->sendBuffer = $sendBuffer;
  374. }
  375. $this->eventLoop->onWritable($this->socket, [$this, 'baseWrite']);
  376. // Check if send buffer will be full.
  377. $this->checkBufferWillFull();
  378. return;
  379. }
  380. if ($this->bufferIsFull()) {
  381. ++self::$statistics['send_fail'];
  382. return false;
  383. }
  384. $this->sendBuffer .= $sendBuffer;
  385. // Check if send buffer is full.
  386. $this->checkBufferWillFull();
  387. }
  388. /**
  389. * Get remote IP.
  390. *
  391. * @return string
  392. */
  393. public function getRemoteIp(): string
  394. {
  395. $pos = strrpos($this->remoteAddress, ':');
  396. if ($pos) {
  397. return substr($this->remoteAddress, 0, $pos);
  398. }
  399. return '';
  400. }
  401. /**
  402. * Get remote port.
  403. *
  404. * @return int
  405. */
  406. public function getRemotePort(): int
  407. {
  408. if ($this->remoteAddress) {
  409. return (int)substr(strrchr($this->remoteAddress, ':'), 1);
  410. }
  411. return 0;
  412. }
  413. /**
  414. * Get remote address.
  415. *
  416. * @return string
  417. */
  418. public function getRemoteAddress(): string
  419. {
  420. return $this->remoteAddress;
  421. }
  422. /**
  423. * Get local IP.
  424. *
  425. * @return string
  426. */
  427. public function getLocalIp(): string
  428. {
  429. $address = $this->getLocalAddress();
  430. $pos = strrpos($address, ':');
  431. if (!$pos) {
  432. return '';
  433. }
  434. return substr($address, 0, $pos);
  435. }
  436. /**
  437. * Get local port.
  438. *
  439. * @return int
  440. */
  441. public function getLocalPort(): int
  442. {
  443. $address = $this->getLocalAddress();
  444. $pos = strrpos($address, ':');
  445. if (!$pos) {
  446. return 0;
  447. }
  448. return (int)substr(strrchr($address, ':'), 1);
  449. }
  450. /**
  451. * Get local address.
  452. *
  453. * @return string
  454. */
  455. public function getLocalAddress(): string
  456. {
  457. if (!is_resource($this->socket)) {
  458. return '';
  459. }
  460. return (string)@stream_socket_get_name($this->socket, false);
  461. }
  462. /**
  463. * Get send buffer queue size.
  464. *
  465. * @return integer
  466. */
  467. public function getSendBufferQueueSize(): int
  468. {
  469. return strlen($this->sendBuffer);
  470. }
  471. /**
  472. * Get receive buffer queue size.
  473. *
  474. * @return integer
  475. */
  476. public function getRecvBufferQueueSize(): int
  477. {
  478. return strlen($this->recvBuffer);
  479. }
  480. /**
  481. * Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
  482. *
  483. * @return void
  484. */
  485. public function pauseRecv(): void
  486. {
  487. $this->eventLoop->offReadable($this->socket);
  488. $this->isPaused = true;
  489. }
  490. /**
  491. * Resumes reading after a call to pauseRecv.
  492. *
  493. * @return void
  494. * @throws Throwable
  495. */
  496. public function resumeRecv(): void
  497. {
  498. if ($this->isPaused === true) {
  499. $this->eventLoop->onReadable($this->socket, [$this, 'baseRead']);
  500. $this->isPaused = false;
  501. $this->baseRead($this->socket, false);
  502. }
  503. }
  504. /**
  505. * Base read handler.
  506. *
  507. * @param resource $socket
  508. * @param bool $checkEof
  509. * @return void
  510. * @throws Throwable
  511. */
  512. public function baseRead($socket, bool $checkEof = true): void
  513. {
  514. static $requests = [];
  515. // SSL handshake.
  516. if ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true) {
  517. if ($this->doSslHandshake($socket)) {
  518. $this->sslHandshakeCompleted = true;
  519. if ($this->sendBuffer) {
  520. $this->eventLoop->onWritable($socket, [$this, 'baseWrite']);
  521. }
  522. } else {
  523. return;
  524. }
  525. }
  526. $buffer = '';
  527. try {
  528. $buffer = @fread($socket, self::READ_BUFFER_SIZE);
  529. } catch (Throwable) {
  530. }
  531. // Check connection closed.
  532. if ($buffer === '' || $buffer === false) {
  533. if ($checkEof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
  534. $this->destroy();
  535. return;
  536. }
  537. } else {
  538. $this->bytesRead += strlen($buffer);
  539. if ($this->recvBuffer === '') {
  540. if (static::$enableCache && !isset($buffer[512]) && isset($requests[$buffer])) {
  541. ++self::$statistics['total_request'];
  542. $request = $requests[$buffer];
  543. if ($request instanceof Request) {
  544. $request = clone $request;
  545. $requests[$buffer] = $request;
  546. $request->connection = $this;
  547. $this->request = $request;
  548. $request->properties = [];
  549. }
  550. try {
  551. ($this->onMessage)($this, $request);
  552. } catch (Throwable $e) {
  553. $this->error($e);
  554. }
  555. return;
  556. }
  557. $this->recvBuffer = $buffer;
  558. } else {
  559. $this->recvBuffer .= $buffer;
  560. }
  561. }
  562. // If the application layer protocol has been set up.
  563. if ($this->protocol !== null) {
  564. while ($this->recvBuffer !== '' && !$this->isPaused) {
  565. // The current packet length is known.
  566. if ($this->currentPackageLength) {
  567. // Data is not enough for a package.
  568. if ($this->currentPackageLength > strlen($this->recvBuffer)) {
  569. break;
  570. }
  571. } else {
  572. // Get current package length.
  573. try {
  574. /** @var ProtocolInterface $parser */
  575. $parser = $this->protocol;
  576. $this->currentPackageLength = $parser::input($this->recvBuffer, $this);
  577. } catch (Throwable) {
  578. }
  579. // The packet length is unknown.
  580. if ($this->currentPackageLength === 0) {
  581. break;
  582. } elseif ($this->currentPackageLength > 0 && $this->currentPackageLength <= $this->maxPackageSize) {
  583. // Data is not enough for a package.
  584. if ($this->currentPackageLength > strlen($this->recvBuffer)) {
  585. break;
  586. }
  587. } // Wrong package.
  588. else {
  589. Worker::safeEcho('Error package. package_length=' . var_export($this->currentPackageLength, true));
  590. $this->destroy();
  591. return;
  592. }
  593. }
  594. // The data is enough for a packet.
  595. ++self::$statistics['total_request'];
  596. // The current packet length is equal to the length of the buffer.
  597. if ($one = strlen($this->recvBuffer) === $this->currentPackageLength) {
  598. $oneRequestBuffer = $this->recvBuffer;
  599. $this->recvBuffer = '';
  600. } else {
  601. // Get a full package from the buffer.
  602. $oneRequestBuffer = substr($this->recvBuffer, 0, $this->currentPackageLength);
  603. // Remove the current package from receive buffer.
  604. $this->recvBuffer = substr($this->recvBuffer, $this->currentPackageLength);
  605. }
  606. // Reset the current packet length to 0.
  607. $this->currentPackageLength = 0;
  608. try {
  609. // Decode request buffer before Emitting onMessage callback.
  610. /** @var ProtocolInterface $parser */
  611. $parser = $this->protocol;
  612. $request = $parser::decode($oneRequestBuffer, $this);
  613. if (static::$enableCache && (!is_object($request) || $request instanceof Request) && $one && !isset($oneRequestBuffer[512])) {
  614. $requests[$oneRequestBuffer] = $request;
  615. if (count($requests) > 512) {
  616. unset($requests[key($requests)]);
  617. }
  618. }
  619. ($this->onMessage)($this, $request);
  620. } catch (Throwable $e) {
  621. $this->error($e);
  622. }
  623. }
  624. return;
  625. }
  626. if ($this->recvBuffer === '' || $this->isPaused) {
  627. return;
  628. }
  629. // Applications protocol is not set.
  630. ++self::$statistics['total_request'];
  631. try {
  632. ($this->onMessage)($this, $this->recvBuffer);
  633. } catch (Throwable $e) {
  634. $this->error($e);
  635. }
  636. // Clean receive buffer.
  637. $this->recvBuffer = '';
  638. }
  639. /**
  640. * Base write handler.
  641. *
  642. * @return void
  643. * @throws Throwable
  644. */
  645. public function baseWrite(): void
  646. {
  647. $len = 0;
  648. try {
  649. if ($this->transport === 'ssl') {
  650. $len = @fwrite($this->socket, $this->sendBuffer, 8192);
  651. } else {
  652. $len = @fwrite($this->socket, $this->sendBuffer);
  653. }
  654. } catch (Throwable) {
  655. }
  656. if ($len === strlen($this->sendBuffer)) {
  657. $this->bytesWritten += $len;
  658. $this->eventLoop->offWritable($this->socket);
  659. $this->sendBuffer = '';
  660. // Try to emit onBufferDrain callback when send buffer becomes empty.
  661. if ($this->onBufferDrain) {
  662. try {
  663. ($this->onBufferDrain)($this);
  664. } catch (Throwable $e) {
  665. $this->error($e);
  666. }
  667. }
  668. if ($this->status === self::STATUS_CLOSING) {
  669. if ($this->context->streamSending) {
  670. return;
  671. }
  672. $this->destroy();
  673. }
  674. return;
  675. }
  676. if ($len > 0) {
  677. $this->bytesWritten += $len;
  678. $this->sendBuffer = substr($this->sendBuffer, $len);
  679. } else {
  680. ++self::$statistics['send_fail'];
  681. $this->destroy();
  682. }
  683. }
  684. /**
  685. * SSL handshake.
  686. *
  687. * @param resource $socket
  688. * @return bool|int
  689. * @throws Throwable
  690. */
  691. public function doSslHandshake($socket): bool|int
  692. {
  693. if (feof($socket)) {
  694. $this->destroy();
  695. return false;
  696. }
  697. $async = $this instanceof AsyncTcpConnection;
  698. /**
  699. * We disabled ssl3 because https://blog.qualys.com/ssllabs/2014/10/15/ssl-3-is-dead-killed-by-the-poodle-attack.
  700. * You can enable ssl3 by the codes below.
  701. */
  702. /*if($async){
  703. $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT | STREAM_CRYPTO_METHOD_SSLv3_CLIENT;
  704. }else{
  705. $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER | STREAM_CRYPTO_METHOD_SSLv3_SERVER;
  706. }*/
  707. if ($async) {
  708. $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
  709. } else {
  710. $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER;
  711. }
  712. // Hidden error.
  713. set_error_handler(function ($errno, $err_str) {
  714. if (!Worker::$daemonize) {
  715. Worker::safeEcho("SSL handshake error: $err_str \n");
  716. }
  717. });
  718. $ret = stream_socket_enable_crypto($socket, true, $type);
  719. restore_error_handler();
  720. // Negotiation has failed.
  721. if (false === $ret) {
  722. $this->destroy();
  723. return false;
  724. } elseif (0 === $ret) {
  725. // There isn't enough data and should try again.
  726. return 0;
  727. }
  728. return true;
  729. }
  730. /**
  731. * This method pulls all the data out of a readable stream, and writes it to the supplied destination.
  732. *
  733. * @param self $dest
  734. * @return void
  735. */
  736. public function pipe(self $dest): void
  737. {
  738. $source = $this;
  739. $this->onMessage = function ($source, $data) use ($dest) {
  740. $dest->send($data);
  741. };
  742. $this->onClose = function () use ($dest) {
  743. $dest->close();
  744. };
  745. $dest->onBufferFull = function () use ($source) {
  746. $source->pauseRecv();
  747. };
  748. $dest->onBufferDrain = function () use ($source) {
  749. $source->resumeRecv();
  750. };
  751. }
  752. /**
  753. * Remove $length of data from receive buffer.
  754. *
  755. * @param int $length
  756. * @return void
  757. */
  758. public function consumeRecvBuffer(int $length): void
  759. {
  760. $this->recvBuffer = substr($this->recvBuffer, $length);
  761. }
  762. /**
  763. * Close connection.
  764. *
  765. * @param mixed|null $data
  766. * @param bool $raw
  767. * @return void
  768. * @throws Throwable
  769. */
  770. public function close(mixed $data = null, bool $raw = false): void
  771. {
  772. if ($this->status === self::STATUS_CONNECTING) {
  773. $this->destroy();
  774. return;
  775. }
  776. if ($this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
  777. return;
  778. }
  779. if ($data !== null) {
  780. $this->send($data, $raw);
  781. }
  782. $this->status = self::STATUS_CLOSING;
  783. if ($this->sendBuffer === '') {
  784. $this->destroy();
  785. } else {
  786. $this->pauseRecv();
  787. }
  788. }
  789. /**
  790. * Is ipv4.
  791. *
  792. * return bool.
  793. */
  794. #[Pure]
  795. public function isIpV4(): bool
  796. {
  797. if ($this->transport === 'unix') {
  798. return false;
  799. }
  800. return !str_contains($this->getRemoteIp(), ':');
  801. }
  802. /**
  803. * Is ipv6.
  804. *
  805. * return bool.
  806. */
  807. #[Pure]
  808. public function isIpV6(): bool
  809. {
  810. if ($this->transport === 'unix') {
  811. return false;
  812. }
  813. return str_contains($this->getRemoteIp(), ':');
  814. }
  815. /**
  816. * Get the real socket.
  817. *
  818. * @return resource
  819. */
  820. public function getSocket()
  821. {
  822. return $this->socket;
  823. }
  824. /**
  825. * Check whether send buffer will be full.
  826. *
  827. * @return void
  828. * @throws Throwable
  829. */
  830. protected function checkBufferWillFull(): void
  831. {
  832. if ($this->maxSendBufferSize <= strlen($this->sendBuffer)) {
  833. if ($this->onBufferFull) {
  834. try {
  835. ($this->onBufferFull)($this);
  836. } catch (Throwable $e) {
  837. $this->error($e);
  838. }
  839. }
  840. }
  841. }
  842. /**
  843. * Whether send buffer is full.
  844. *
  845. * @return bool
  846. * @throws Throwable
  847. */
  848. protected function bufferIsFull(): bool
  849. {
  850. // Buffer has been marked as full but still has data to send then the packet is discarded.
  851. if ($this->maxSendBufferSize <= strlen($this->sendBuffer)) {
  852. if ($this->onError) {
  853. try {
  854. ($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
  855. } catch (Throwable $e) {
  856. $this->error($e);
  857. }
  858. }
  859. return true;
  860. }
  861. return false;
  862. }
  863. /**
  864. * Whether send buffer is Empty.
  865. *
  866. * @return bool
  867. */
  868. public function bufferIsEmpty(): bool
  869. {
  870. return empty($this->sendBuffer);
  871. }
  872. /**
  873. * Destroy connection.
  874. *
  875. * @return void
  876. * @throws Throwable
  877. */
  878. public function destroy(): void
  879. {
  880. // Avoid repeated calls.
  881. if ($this->status === self::STATUS_CLOSED) {
  882. return;
  883. }
  884. // Remove event listener.
  885. $this->eventLoop->offReadable($this->socket);
  886. $this->eventLoop->offWritable($this->socket);
  887. // Close socket.
  888. try {
  889. @fclose($this->socket);
  890. } catch (Throwable) {
  891. }
  892. $this->status = self::STATUS_CLOSED;
  893. // Try to emit onClose callback.
  894. if ($this->onClose) {
  895. try {
  896. ($this->onClose)($this);
  897. } catch (Throwable $e) {
  898. $this->error($e);
  899. }
  900. }
  901. // Try to emit protocol::onClose
  902. if ($this->protocol && method_exists($this->protocol, 'onClose')) {
  903. try {
  904. ([$this->protocol, 'onClose'])($this);
  905. } catch (Throwable $e) {
  906. $this->error($e);
  907. }
  908. }
  909. $this->sendBuffer = $this->recvBuffer = '';
  910. $this->currentPackageLength = 0;
  911. $this->isPaused = $this->sslHandshakeCompleted = false;
  912. if ($this->status === self::STATUS_CLOSED) {
  913. // Cleaning up the callback to avoid memory leaks.
  914. $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = $this->eventLoop = $this->errorHandler = null;
  915. // Remove from worker->connections.
  916. if ($this->worker) {
  917. unset($this->worker->connections[$this->realId]);
  918. }
  919. unset(static::$connections[$this->realId]);
  920. }
  921. }
  922. /**
  923. * Enable or disable Cache.
  924. *
  925. * @param mixed $value
  926. */
  927. public static function enableCache(bool $value = true)
  928. {
  929. static::$enableCache = $value;
  930. }
  931. /**
  932. * Get the json_encode information.
  933. *
  934. * @return array
  935. */
  936. public function jsonSerialize(): array
  937. {
  938. return [
  939. 'id' => $this->id,
  940. 'status' => $this->getStatus(),
  941. 'transport' => $this->transport,
  942. 'getRemoteIp' => $this->getRemoteIp(),
  943. 'remotePort' => $this->getRemotePort(),
  944. 'getRemoteAddress' => $this->getRemoteAddress(),
  945. 'getLocalIp' => $this->getLocalIp(),
  946. 'getLocalPort' => $this->getLocalPort(),
  947. 'getLocalAddress' => $this->getLocalAddress(),
  948. 'isIpV4' => $this->isIpV4(),
  949. 'isIpV6' => $this->isIpV6(),
  950. ];
  951. }
  952. /**
  953. * Destruct.
  954. *
  955. * @return void
  956. */
  957. public function __destruct()
  958. {
  959. static $mod;
  960. self::$statistics['connection_count']--;
  961. if (Worker::getGracefulStop()) {
  962. if (!isset($mod)) {
  963. $mod = ceil((self::$statistics['connection_count'] + 1) / 3);
  964. }
  965. if (0 === self::$statistics['connection_count'] % $mod) {
  966. $pid = function_exists('posix_getpid') ? posix_getpid() : 0;
  967. Worker::log('worker[' . $pid . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
  968. }
  969. if (0 === self::$statistics['connection_count']) {
  970. Worker::stopAll();
  971. }
  972. }
  973. }
  974. }