AsyncUdpConnection.php 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Connection;
  16. use Exception;
  17. use Throwable;
  18. use Workerman\Protocols\ProtocolInterface;
  19. use Workerman\Worker;
  20. /**
  21. * AsyncUdpConnection.
  22. */
  23. class AsyncUdpConnection extends UdpConnection
  24. {
  25. /**
  26. * Emitted when socket connection is successfully established.
  27. *
  28. * @var ?callable
  29. */
  30. public $onConnect = null;
  31. /**
  32. * Emitted when socket connection closed.
  33. *
  34. * @var ?callable
  35. */
  36. public $onClose = null;
  37. /**
  38. * Connected or not.
  39. *
  40. * @var bool
  41. */
  42. protected bool $connected = false;
  43. /**
  44. * Context option.
  45. *
  46. * @var array
  47. */
  48. protected array $contextOption = [];
  49. /**
  50. * Construct.
  51. *
  52. * @param string $remoteAddress
  53. * @throws Exception
  54. */
  55. public function __construct($remoteAddress, $contextOption = [])
  56. {
  57. // Get the application layer communication protocol and listening address.
  58. list($scheme, $address) = explode(':', $remoteAddress, 2);
  59. // Check application layer protocol class.
  60. if ($scheme !== 'udp') {
  61. $scheme = ucfirst($scheme);
  62. $this->protocol = '\\Protocols\\' . $scheme;
  63. if (!class_exists($this->protocol)) {
  64. $this->protocol = "\\Workerman\\Protocols\\$scheme";
  65. if (!class_exists($this->protocol)) {
  66. throw new Exception("class \\Protocols\\$scheme not exist");
  67. }
  68. }
  69. }
  70. $this->remoteAddress = substr($address, 2);
  71. $this->contextOption = $contextOption;
  72. }
  73. /**
  74. * For udp package.
  75. *
  76. * @param resource $socket
  77. * @return void
  78. * @throws Throwable
  79. */
  80. public function baseRead($socket): void
  81. {
  82. $recvBuffer = stream_socket_recvfrom($socket, static::MAX_UDP_PACKAGE_SIZE, 0, $remoteAddress);
  83. if (false === $recvBuffer || empty($remoteAddress)) {
  84. return;
  85. }
  86. if ($this->onMessage) {
  87. if ($this->protocol) {
  88. /** @var ProtocolInterface $parser */
  89. $parser = $this->protocol;
  90. $recvBuffer = $parser::decode($recvBuffer, $this);
  91. }
  92. ++ConnectionInterface::$statistics['total_request'];
  93. try {
  94. ($this->onMessage)($this, $recvBuffer);
  95. } catch (Throwable $e) {
  96. $this->error($e);
  97. }
  98. }
  99. }
  100. /**
  101. * Close connection.
  102. *
  103. * @param mixed|null $data
  104. * @param bool $raw
  105. * @return void
  106. * @throws Throwable
  107. */
  108. public function close(mixed $data = null, bool $raw = false): void
  109. {
  110. if ($data !== null) {
  111. $this->send($data, $raw);
  112. }
  113. $this->eventLoop->offReadable($this->socket);
  114. fclose($this->socket);
  115. $this->connected = false;
  116. // Try to emit onClose callback.
  117. if ($this->onClose) {
  118. try {
  119. ($this->onClose)($this);
  120. } catch (Throwable $e) {
  121. $this->error($e);
  122. }
  123. }
  124. $this->onConnect = $this->onMessage = $this->onClose = $this->eventLoop = $this->errorHandler = null;
  125. }
  126. /**
  127. * Sends data on the connection.
  128. *
  129. * @param mixed $sendBuffer
  130. * @param bool $raw
  131. * @return void|boolean
  132. * @throws Throwable
  133. */
  134. public function send(mixed $sendBuffer, bool $raw = false)
  135. {
  136. if (false === $raw && $this->protocol) {
  137. /** @var ProtocolInterface $parser */
  138. $parser = $this->protocol;
  139. $sendBuffer = $parser::encode($sendBuffer, $this);
  140. if ($sendBuffer === '') {
  141. return;
  142. }
  143. }
  144. if ($this->connected === false) {
  145. $this->connect();
  146. }
  147. return strlen($sendBuffer) === stream_socket_sendto($this->socket, $sendBuffer);
  148. }
  149. /**
  150. * Connect.
  151. *
  152. * @return void
  153. * @throws Throwable
  154. */
  155. public function connect(): void
  156. {
  157. if ($this->connected === true) {
  158. return;
  159. }
  160. if (!$this->eventLoop) {
  161. $this->eventLoop = Worker::$globalEvent;
  162. }
  163. if ($this->contextOption) {
  164. $context = stream_context_create($this->contextOption);
  165. $this->socket = stream_socket_client("udp://$this->remoteAddress", $errno, $errmsg,
  166. 30, STREAM_CLIENT_CONNECT, $context);
  167. } else {
  168. $this->socket = stream_socket_client("udp://$this->remoteAddress", $errno, $errmsg);
  169. }
  170. if (!$this->socket) {
  171. Worker::safeEcho((string)(new Exception($errmsg)));
  172. $this->eventLoop = null;
  173. return;
  174. }
  175. stream_set_blocking($this->socket, false);
  176. if ($this->onMessage) {
  177. $this->eventLoop->onReadable($this->socket, [$this, 'baseRead']);
  178. }
  179. $this->connected = true;
  180. // Try to emit onConnect callback.
  181. if ($this->onConnect) {
  182. try {
  183. ($this->onConnect)($this);
  184. } catch (Throwable $e) {
  185. $this->error($e);
  186. }
  187. }
  188. }
  189. }