|
@@ -16,8 +16,6 @@ declare(strict_types=1);
|
|
|
|
|
|
|
|
namespace Workerman\Events;
|
|
namespace Workerman\Events;
|
|
|
|
|
|
|
|
-use SplPriorityQueue;
|
|
|
|
|
-use Throwable;
|
|
|
|
|
use function count;
|
|
use function count;
|
|
|
use function max;
|
|
use function max;
|
|
|
use function microtime;
|
|
use function microtime;
|
|
@@ -32,6 +30,7 @@ class Select implements EventInterface
|
|
|
{
|
|
{
|
|
|
/**
|
|
/**
|
|
|
* Running.
|
|
* Running.
|
|
|
|
|
+ *
|
|
|
* @var bool
|
|
* @var bool
|
|
|
*/
|
|
*/
|
|
|
protected bool $running = true;
|
|
protected bool $running = true;
|
|
@@ -39,47 +38,47 @@ class Select implements EventInterface
|
|
|
/**
|
|
/**
|
|
|
* All listeners for read/write event.
|
|
* All listeners for read/write event.
|
|
|
*
|
|
*
|
|
|
- * @var array
|
|
|
|
|
|
|
+ * @var array<int, callable>
|
|
|
*/
|
|
*/
|
|
|
protected array $readEvents = [];
|
|
protected array $readEvents = [];
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* All listeners for read/write event.
|
|
* All listeners for read/write event.
|
|
|
*
|
|
*
|
|
|
- * @var array
|
|
|
|
|
|
|
+ * @var array<int, callable>
|
|
|
*/
|
|
*/
|
|
|
protected array $writeEvents = [];
|
|
protected array $writeEvents = [];
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * @var array
|
|
|
|
|
|
|
+ * @var array<int, callable>
|
|
|
*/
|
|
*/
|
|
|
protected array $exceptEvents = [];
|
|
protected array $exceptEvents = [];
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Event listeners of signal.
|
|
* Event listeners of signal.
|
|
|
*
|
|
*
|
|
|
- * @var array
|
|
|
|
|
|
|
+ * @var array<int, callable>
|
|
|
*/
|
|
*/
|
|
|
protected array $signalEvents = [];
|
|
protected array $signalEvents = [];
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Fds waiting for read event.
|
|
* Fds waiting for read event.
|
|
|
*
|
|
*
|
|
|
- * @var array
|
|
|
|
|
|
|
+ * @var array<int, resource>
|
|
|
*/
|
|
*/
|
|
|
protected array $readFds = [];
|
|
protected array $readFds = [];
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Fds waiting for write event.
|
|
* Fds waiting for write event.
|
|
|
*
|
|
*
|
|
|
- * @var array
|
|
|
|
|
|
|
+ * @var array<int, resource>
|
|
|
*/
|
|
*/
|
|
|
protected array $writeFds = [];
|
|
protected array $writeFds = [];
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Fds waiting for except event.
|
|
* Fds waiting for except event.
|
|
|
*
|
|
*
|
|
|
- * @var array
|
|
|
|
|
|
|
+ * @var array<int, resource>
|
|
|
*/
|
|
*/
|
|
|
protected array $exceptFds = [];
|
|
protected array $exceptFds = [];
|
|
|
|
|
|
|
@@ -87,9 +86,9 @@ class Select implements EventInterface
|
|
|
* Timer scheduler.
|
|
* Timer scheduler.
|
|
|
* {['data':timer_id, 'priority':run_timestamp], ..}
|
|
* {['data':timer_id, 'priority':run_timestamp], ..}
|
|
|
*
|
|
*
|
|
|
- * @var SplPriorityQueue
|
|
|
|
|
|
|
+ * @var \SplPriorityQueue
|
|
|
*/
|
|
*/
|
|
|
- protected SplPriorityQueue $scheduler;
|
|
|
|
|
|
|
+ protected \SplPriorityQueue $scheduler;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* All timer event listeners.
|
|
* All timer event listeners.
|
|
@@ -116,16 +115,15 @@ class Select implements EventInterface
|
|
|
/**
|
|
/**
|
|
|
* @var ?callable
|
|
* @var ?callable
|
|
|
*/
|
|
*/
|
|
|
- protected $errorHandler;
|
|
|
|
|
|
|
+ protected $errorHandler = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Construct.
|
|
* Construct.
|
|
|
*/
|
|
*/
|
|
|
public function __construct()
|
|
public function __construct()
|
|
|
{
|
|
{
|
|
|
- // Init SplPriorityQueue.
|
|
|
|
|
- $this->scheduler = new SplPriorityQueue();
|
|
|
|
|
- $this->scheduler->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
|
|
|
|
|
|
|
+ $this->scheduler = new \SplPriorityQueue();
|
|
|
|
|
+ $this->scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -242,10 +240,11 @@ class Select implements EventInterface
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* On except.
|
|
* On except.
|
|
|
|
|
+ *
|
|
|
* @param resource $stream
|
|
* @param resource $stream
|
|
|
- * @param $func
|
|
|
|
|
|
|
+ * @param callable $func
|
|
|
*/
|
|
*/
|
|
|
- public function onExcept($stream, $func): void
|
|
|
|
|
|
|
+ public function onExcept($stream, callable $func): void
|
|
|
{
|
|
{
|
|
|
$fdKey = (int)$stream;
|
|
$fdKey = (int)$stream;
|
|
|
$this->exceptEvents[$fdKey] = $func;
|
|
$this->exceptEvents[$fdKey] = $func;
|
|
@@ -254,6 +253,7 @@ class Select implements EventInterface
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Off except.
|
|
* Off except.
|
|
|
|
|
+ *
|
|
|
* @param resource $stream
|
|
* @param resource $stream
|
|
|
* @return bool
|
|
* @return bool
|
|
|
*/
|
|
*/
|
|
@@ -276,7 +276,7 @@ class Select implements EventInterface
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
$this->signalEvents[$signal] = $func;
|
|
$this->signalEvents[$signal] = $func;
|
|
|
- pcntl_signal($signal, $this->signalHandler(...));
|
|
|
|
|
|
|
+ pcntl_signal($signal, fn () => $this->safeCall($this->signalEvents[$signal], [$signal]));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -296,20 +296,10 @@ class Select implements EventInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Signal handler.
|
|
|
|
|
- *
|
|
|
|
|
- * @param int $signal
|
|
|
|
|
- */
|
|
|
|
|
- public function signalHandler(int $signal): void
|
|
|
|
|
- {
|
|
|
|
|
- $this->signalEvents[$signal]($signal);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
* Tick for timer.
|
|
* Tick for timer.
|
|
|
*
|
|
*
|
|
|
* @return void
|
|
* @return void
|
|
|
- * @throws Throwable
|
|
|
|
|
|
|
+ * @throws \Throwable
|
|
|
*/
|
|
*/
|
|
|
protected function tick(): void
|
|
protected function tick(): void
|
|
|
{
|
|
{
|
|
@@ -320,6 +310,7 @@ class Select implements EventInterface
|
|
|
$nextRunTime = -$schedulerData['priority'];
|
|
$nextRunTime = -$schedulerData['priority'];
|
|
|
$timeNow = microtime(true);
|
|
$timeNow = microtime(true);
|
|
|
$this->selectTimeout = (int)(($nextRunTime - $timeNow) * 1000000);
|
|
$this->selectTimeout = (int)(($nextRunTime - $timeNow) * 1000000);
|
|
|
|
|
+
|
|
|
if ($this->selectTimeout <= 0) {
|
|
if ($this->selectTimeout <= 0) {
|
|
|
$this->scheduler->extract();
|
|
$this->scheduler->extract();
|
|
|
|
|
|
|
@@ -335,12 +326,7 @@ class Select implements EventInterface
|
|
|
} else {
|
|
} else {
|
|
|
unset($this->eventTimer[$timerId]);
|
|
unset($this->eventTimer[$timerId]);
|
|
|
}
|
|
}
|
|
|
- try {
|
|
|
|
|
- $taskData[0](...$taskData[1]);
|
|
|
|
|
- } catch (Throwable $e) {
|
|
|
|
|
- $this->error($e);
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ $this->safeCall($taskData[0], $taskData[1]);
|
|
|
} else {
|
|
} else {
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
@@ -363,8 +349,8 @@ class Select implements EventInterface
|
|
|
*/
|
|
*/
|
|
|
public function deleteAllTimer(): void
|
|
public function deleteAllTimer(): void
|
|
|
{
|
|
{
|
|
|
- $this->scheduler = new SplPriorityQueue();
|
|
|
|
|
- $this->scheduler->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
|
|
|
|
|
|
|
+ $this->scheduler = new \SplPriorityQueue();
|
|
|
|
|
+ $this->scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
|
|
|
$this->eventTimer = [];
|
|
$this->eventTimer = [];
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -377,11 +363,12 @@ class Select implements EventInterface
|
|
|
$read = $this->readFds;
|
|
$read = $this->readFds;
|
|
|
$write = $this->writeFds;
|
|
$write = $this->writeFds;
|
|
|
$except = $this->exceptFds;
|
|
$except = $this->exceptFds;
|
|
|
- if ($read || $write || $except) {
|
|
|
|
|
|
|
+ if (!empty($read) || !empty($write) || !empty($except)) {
|
|
|
// Waiting read/write/signal/timeout events.
|
|
// Waiting read/write/signal/timeout events.
|
|
|
try {
|
|
try {
|
|
|
@stream_select($read, $write, $except, 0, $this->selectTimeout);
|
|
@stream_select($read, $write, $except, 0, $this->selectTimeout);
|
|
|
- } catch (Throwable) {
|
|
|
|
|
|
|
+ } catch (\Throwable) {
|
|
|
|
|
+ // do nothing
|
|
|
}
|
|
}
|
|
|
} else {
|
|
} else {
|
|
|
$this->selectTimeout >= 1 && usleep($this->selectTimeout);
|
|
$this->selectTimeout >= 1 && usleep($this->selectTimeout);
|
|
@@ -394,21 +381,21 @@ class Select implements EventInterface
|
|
|
foreach ($read as $fd) {
|
|
foreach ($read as $fd) {
|
|
|
$fdKey = (int)$fd;
|
|
$fdKey = (int)$fd;
|
|
|
if (isset($this->readEvents[$fdKey])) {
|
|
if (isset($this->readEvents[$fdKey])) {
|
|
|
- $this->readEvents[$fdKey]($fd);
|
|
|
|
|
|
|
+ $this->safeCall($this->readEvents[$fdKey], [$fd]);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
foreach ($write as $fd) {
|
|
foreach ($write as $fd) {
|
|
|
$fdKey = (int)$fd;
|
|
$fdKey = (int)$fd;
|
|
|
if (isset($this->writeEvents[$fdKey])) {
|
|
if (isset($this->writeEvents[$fdKey])) {
|
|
|
- $this->writeEvents[$fdKey]($fd);
|
|
|
|
|
|
|
+ $this->safeCall($this->writeEvents[$fdKey], [$fd]);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
foreach ($except as $fd) {
|
|
foreach ($except as $fd) {
|
|
|
$fdKey = (int)$fd;
|
|
$fdKey = (int)$fd;
|
|
|
if (isset($this->exceptEvents[$fdKey])) {
|
|
if (isset($this->exceptEvents[$fdKey])) {
|
|
|
- $this->exceptEvents[$fdKey]($fd);
|
|
|
|
|
|
|
+ $this->safeCall($this->exceptEvents[$fdKey], [$fd]);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -429,8 +416,13 @@ class Select implements EventInterface
|
|
|
foreach ($this->signalEvents as $signal => $item) {
|
|
foreach ($this->signalEvents as $signal => $item) {
|
|
|
$this->offsignal($signal);
|
|
$this->offsignal($signal);
|
|
|
}
|
|
}
|
|
|
- $this->readFds = $this->writeFds = $this->exceptFds = $this->readEvents
|
|
|
|
|
- = $this->writeEvents = $this->exceptEvents = $this->signalEvents = [];
|
|
|
|
|
|
|
+ $this->readFds = [];
|
|
|
|
|
+ $this->writeFds = [];
|
|
|
|
|
+ $this->exceptFds = [];
|
|
|
|
|
+ $this->readEvents = [];
|
|
|
|
|
+ $this->writeEvents = [];
|
|
|
|
|
+ $this->exceptEvents = [];
|
|
|
|
|
+ $this->signalEvents = [];
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -458,15 +450,20 @@ class Select implements EventInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * @param Throwable $e
|
|
|
|
|
|
|
+ * @param callable $func
|
|
|
|
|
+ * @param array $args
|
|
|
* @return void
|
|
* @return void
|
|
|
- * @throws Throwable
|
|
|
|
|
*/
|
|
*/
|
|
|
- public function error(Throwable $e): void
|
|
|
|
|
|
|
+ private function safeCall(callable $func, array $args = []): void
|
|
|
{
|
|
{
|
|
|
- if (!$this->errorHandler) {
|
|
|
|
|
- throw new $e;
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ $func(...$args);
|
|
|
|
|
+ } catch (\Throwable $e) {
|
|
|
|
|
+ if ($this->errorHandler === null) {
|
|
|
|
|
+ echo $e;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ ($this->errorHandler)($e);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
- ($this->errorHandler)($e);
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|