AsyncUdpConnection.php 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Connection;
  16. use Exception;
  17. use Throwable;
  18. use Workerman\Protocols\ProtocolInterface;
  19. use Workerman\Worker;
  20. use function class_exists;
  21. use function explode;
  22. use function fclose;
  23. use function stream_context_create;
  24. use function stream_set_blocking;
  25. use function stream_socket_client;
  26. use function stream_socket_recvfrom;
  27. use function stream_socket_sendto;
  28. use function strlen;
  29. use function substr;
  30. use function ucfirst;
  31. use const STREAM_CLIENT_CONNECT;
  /**
   * AsyncUdpConnection.
   *
   * UDP client connection whose socket is opened on demand: connect() creates
   * the socket, and send() calls connect() itself when the connection has not
   * been established yet.
   */
  class AsyncUdpConnection extends UdpConnection
  {
      /**
       * Emitted when socket connection is successfully established.
       *
       * @var ?callable
       */
      public $onConnect = null;

      /**
       * Emitted when socket connection closed.
       *
       * @var ?callable
       */
      public $onClose = null;

      /**
       * Connected or not.
       *
       * Set to true at the end of a successful connect() and back to false
       * by close().
       *
       * @var bool
       */
      protected bool $connected = false;

      /**
       * Context option.
       *
       * Passed to stream_context_create() by connect() when not empty.
       *
       * @var array
       */
      protected array $contextOption = [];

      /**
       * Construct.
       *
       * Only parses the address and resolves the protocol class; no socket is
       * opened here.
       *
       * @param string $remoteAddress Address in the form "scheme://host:port". A scheme other
       *                              than "udp" selects an application protocol class
       *                              (\Protocols\<Scheme>, then \Workerman\Protocols\<Scheme>).
       * @param array $contextOption Stream context options used when the socket is created.
       * @throws Exception When the address has no scheme separator or the protocol class does not exist.
       */
      public function __construct($remoteAddress, $contextOption = [])
      {
          // Get the application layer communication protocol and listening address.
          $parts = explode(':', $remoteAddress, 2);
          if (!isset($parts[1])) {
              // Without a ':' there is no "scheme://" prefix to strip; fail with the
              // documented exception instead of a warning followed by a TypeError.
              throw new Exception("Bad remoteAddress: $remoteAddress");
          }
          [$scheme, $address] = $parts;

          // Check application layer protocol class.
          if ($scheme !== 'udp') {
              $scheme = ucfirst($scheme);
              $this->protocol = '\\Protocols\\' . $scheme;
              if (!class_exists($this->protocol)) {
                  $this->protocol = "\\Workerman\\Protocols\\$scheme";
                  if (!class_exists($this->protocol)) {
                      throw new Exception("class \\Protocols\\$scheme not exist");
                  }
              }
          }

          // Drop the leading "//" that follows the scheme separator.
          $this->remoteAddress = substr($address, 2);
          $this->contextOption = $contextOption;
      }

      /**
       * For udp package.
       *
       * Reads one datagram from the socket, decodes it with the configured
       * protocol class (if any) and passes it to the onMessage callback.
       * Exceptions thrown by the callback are forwarded to $this->error().
       *
       * @param resource $socket
       * @return void
       * @throws Throwable
       */
      public function baseRead($socket): void
      {
          $recvBuffer = stream_socket_recvfrom($socket, static::MAX_UDP_PACKAGE_SIZE, 0, $remoteAddress);
          if (false === $recvBuffer || empty($remoteAddress)) {
              return;
          }

          // The datagram has already been consumed; without a listener it is dropped.
          if (!$this->onMessage) {
              return;
          }

          if ($this->protocol) {
              /** @var class-string<ProtocolInterface> $parser */
              $parser = $this->protocol;
              $recvBuffer = $parser::decode($recvBuffer, $this);
          }

          ++ConnectionInterface::$statistics['total_request'];
          try {
              ($this->onMessage)($this, $recvBuffer);
          } catch (Throwable $e) {
              $this->error($e);
          }
      }

      /**
       * Close connection.
       *
       * Optionally sends a final packet, releases the socket if one is open,
       * emits onClose and then clears all callbacks and the event loop
       * reference. Calling it on a connection that is not connected (never
       * connected, failed to connect, or already closed) skips the socket
       * teardown instead of failing.
       *
       * @param mixed|null $data Data to send before closing, or null to send nothing.
       * @param bool $raw Whether to bypass protocol encoding for $data.
       * @return void
       * @throws Throwable
       */
      public function close(mixed $data = null, bool $raw = false): void
      {
          if ($data !== null) {
              $this->send($data, $raw);
          }

          // Only touch the event loop and the socket while a socket is really open;
          // otherwise $this->eventLoop may be null and $this->socket is not a stream.
          if ($this->connected === true) {
              $this->eventLoop?->offReadable($this->socket);
              fclose($this->socket);
              $this->connected = false;
          }

          // Try to emit onClose callback.
          if ($this->onClose) {
              try {
                  ($this->onClose)($this);
              } catch (Throwable $e) {
                  $this->error($e);
              }
          }

          $this->onConnect = $this->onMessage = $this->onClose = $this->eventLoop = $this->errorHandler = null;
      }

      /**
       * Sends data on the connection.
       *
       * Encodes the buffer with the protocol class unless $raw is true, connects
       * on demand and writes the datagram to the socket.
       *
       * @param mixed $sendBuffer
       * @param bool $raw Whether to bypass protocol encoding.
       * @return bool|void True when the whole buffer was written, false when the write was
       *                   short/failed or no connection could be established, and nothing
       *                   when the protocol encoder produced an empty string.
       * @throws Throwable
       */
      public function send(mixed $sendBuffer, bool $raw = false)
      {
          if (false === $raw && $this->protocol) {
              /** @var class-string<ProtocolInterface> $parser */
              $parser = $this->protocol;
              $sendBuffer = $parser::encode($sendBuffer, $this);
              if ($sendBuffer === '') {
                  return;
              }
          }

          if ($this->connected === false) {
              $this->connect();
              // connect() reports its own failure; without an open socket there is
              // nothing to write to, so signal failure instead of passing an invalid
              // socket to stream_socket_sendto().
              if ($this->connected === false) {
                  return false;
              }
          }

          return strlen($sendBuffer) === stream_socket_sendto($this->socket, $sendBuffer);
      }

      /**
       * Connect.
       *
       * Creates the UDP client socket, switches it to non-blocking mode,
       * registers baseRead() as the readable handler when an onMessage callback
       * is set, and emits onConnect. Does nothing when already connected. On
       * failure the error is printed through Worker::safeEcho() and the
       * connection stays disconnected.
       *
       * @return void
       * @throws Throwable
       */
      public function connect(): void
      {
          if ($this->connected === true) {
              return;
          }

          if (!$this->eventLoop) {
              $this->eventLoop = Worker::$globalEvent;
          }

          if ($this->contextOption) {
              $context = stream_context_create($this->contextOption);
              $this->socket = stream_socket_client(
                  "udp://$this->remoteAddress",
                  $errno,
                  $errmsg,
                  30,
                  STREAM_CLIENT_CONNECT,
                  $context
              );
          } else {
              $this->socket = stream_socket_client("udp://$this->remoteAddress", $errno, $errmsg);
          }

          if (!$this->socket) {
              Worker::safeEcho((string)(new Exception($errmsg)));
              $this->eventLoop = null;
              return;
          }

          stream_set_blocking($this->socket, false);

          // The read handler is only registered when a message callback exists at connect time.
          if ($this->onMessage) {
              $this->eventLoop->onReadable($this->socket, $this->baseRead(...));
          }

          $this->connected = true;

          // Try to emit onConnect callback.
          if ($this->onConnect) {
              try {
                  ($this->onConnect)($this);
              } catch (Throwable $e) {
                  $this->error($e);
              }
          }
      }
  }