AsyncUdpConnection.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Connection;
  16. use Exception;
  17. use RuntimeException;
  18. use Throwable;
  19. use Workerman\Protocols\ProtocolInterface;
  20. use Workerman\Worker;
  21. use function class_exists;
  22. use function explode;
  23. use function fclose;
  24. use function stream_context_create;
  25. use function stream_set_blocking;
  26. use function stream_socket_client;
  27. use function stream_socket_recvfrom;
  28. use function stream_socket_sendto;
  29. use function strlen;
  30. use function substr;
  31. use function ucfirst;
  32. use const STREAM_CLIENT_CONNECT;
  33. /**
  34. * AsyncUdpConnection.
  35. */
  36. class AsyncUdpConnection extends UdpConnection
  37. {
  38. /**
  39. * Emitted when socket connection is successfully established.
  40. *
  41. * @var ?callable
  42. */
  43. public $onConnect = null;
  44. /**
  45. * Emitted when socket connection closed.
  46. *
  47. * @var ?callable
  48. */
  49. public $onClose = null;
  50. /**
  51. * Connected or not.
  52. *
  53. * @var bool
  54. */
  55. protected bool $connected = false;
  56. /**
  57. * Context option.
  58. *
  59. * @var array
  60. */
  61. protected array $contextOption = [];
  62. /**
  63. * Construct.
  64. *
  65. * @param string $remoteAddress
  66. * @throws Throwable
  67. */
  68. public function __construct($remoteAddress, $contextOption = [])
  69. {
  70. // Get the application layer communication protocol and listening address.
  71. [$scheme, $address] = explode(':', $remoteAddress, 2);
  72. // Check application layer protocol class.
  73. if ($scheme !== 'udp') {
  74. $scheme = ucfirst($scheme);
  75. $this->protocol = '\\Protocols\\' . $scheme;
  76. if (!class_exists($this->protocol)) {
  77. $this->protocol = "\\Workerman\\Protocols\\$scheme";
  78. if (!class_exists($this->protocol)) {
  79. throw new RuntimeException("class \\Protocols\\$scheme not exist");
  80. }
  81. }
  82. }
  83. $this->remoteAddress = substr($address, 2);
  84. $this->contextOption = $contextOption;
  85. }
  86. /**
  87. * For udp package.
  88. *
  89. * @param resource $socket
  90. * @return void
  91. */
  92. public function baseRead($socket): void
  93. {
  94. $recvBuffer = stream_socket_recvfrom($socket, static::MAX_UDP_PACKAGE_SIZE, 0, $remoteAddress);
  95. if (false === $recvBuffer || empty($remoteAddress)) {
  96. return;
  97. }
  98. if ($this->onMessage) {
  99. if ($this->protocol) {
  100. $recvBuffer = $this->protocol::decode($recvBuffer, $this);
  101. }
  102. ++ConnectionInterface::$statistics['total_request'];
  103. try {
  104. ($this->onMessage)($this, $recvBuffer);
  105. } catch (Throwable $e) {
  106. $this->error($e);
  107. }
  108. }
  109. }
  110. /**
  111. * Close connection.
  112. *
  113. * @param mixed $data
  114. * @param bool $raw
  115. * @return void
  116. */
  117. public function close(mixed $data = null, bool $raw = false): void
  118. {
  119. if ($data !== null) {
  120. $this->send($data, $raw);
  121. }
  122. $this->eventLoop->offReadable($this->socket);
  123. fclose($this->socket);
  124. $this->connected = false;
  125. // Try to emit onClose callback.
  126. if ($this->onClose) {
  127. try {
  128. ($this->onClose)($this);
  129. } catch (Throwable $e) {
  130. $this->error($e);
  131. }
  132. }
  133. $this->onConnect = $this->onMessage = $this->onClose = $this->eventLoop = $this->errorHandler = null;
  134. }
  135. /**
  136. * Sends data on the connection.
  137. *
  138. * @param mixed $sendBuffer
  139. * @param bool $raw
  140. * @return bool|null
  141. */
  142. public function send(mixed $sendBuffer, bool $raw = false): bool|null
  143. {
  144. if (false === $raw && $this->protocol) {
  145. $sendBuffer = $this->protocol::encode($sendBuffer, $this);
  146. if ($sendBuffer === '') {
  147. return null;
  148. }
  149. }
  150. if ($this->connected === false) {
  151. $this->connect();
  152. }
  153. return strlen($sendBuffer) === stream_socket_sendto($this->socket, $sendBuffer);
  154. }
  155. /**
  156. * Connect.
  157. *
  158. * @return void
  159. */
  160. public function connect(): void
  161. {
  162. if ($this->connected === true) {
  163. return;
  164. }
  165. if (!$this->eventLoop) {
  166. $this->eventLoop = Worker::$globalEvent;
  167. }
  168. if ($this->contextOption) {
  169. $context = stream_context_create($this->contextOption);
  170. $this->socket = stream_socket_client("udp://$this->remoteAddress", $errno, $errmsg,
  171. 30, STREAM_CLIENT_CONNECT, $context);
  172. } else {
  173. $this->socket = stream_socket_client("udp://$this->remoteAddress", $errno, $errmsg);
  174. }
  175. if (!$this->socket) {
  176. Worker::safeEcho((string)(new Exception($errmsg)));
  177. $this->eventLoop = null;
  178. return;
  179. }
  180. stream_set_blocking($this->socket, false);
  181. if ($this->onMessage) {
  182. $this->eventLoop->onReadable($this->socket, $this->baseRead(...));
  183. }
  184. $this->connected = true;
  185. // Try to emit onConnect callback.
  186. if ($this->onConnect) {
  187. try {
  188. ($this->onConnect)($this);
  189. } catch (Throwable $e) {
  190. $this->error($e);
  191. }
  192. }
  193. }
  194. }