walkor hace 2 años
padre
commit
1c6b44e799
Se han modificado 1 ficheros con 29 adiciones y 41 borrados
  1. 29 41
      src/Events/Swow.php

+ 29 - 41
src/Events/Swow.php

@@ -127,23 +127,28 @@ class Swow implements EventInterface
      */
     public function onReadable($stream, $func)
     {
-        if (isset($this->readEvents[(int) $stream])) {
+        $fd = (int) $stream;
+        if (isset($this->readEvents[$fd])) {
             $this->offReadable($stream);
         }
-        $this->readEvents[(int) $stream] = Coroutine::run(function () use ($stream, $func): void {
+        Coroutine::run(function () use ($stream, $func, $fd): void {
             try {
+                $this->readEvents[$fd] = Coroutine::getCurrent();
                 while (true) {
                     $rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP);
+                    if (!isset($this->readEvents[$fd]) || $this->readEvents[$fd] !== Coroutine::getCurrent()) {
+                        break;
+                    }
                     if ($rEvent !== STREAM_POLLNONE) {
                         $func($stream);
                     }
                     if ($rEvent !== STREAM_POLLIN) {
-                        $this->offReadable($stream, bySelf: true);
+                        $this->offReadable($stream);
                         break;
                     }
                 }
             } catch (RuntimeException) {
-                $this->offReadable($stream, bySelf: true);
+                $this->offReadable($stream);
             }
         });
         return true;
@@ -152,20 +157,10 @@ class Swow implements EventInterface
     /**
      * {@inheritdoc}
      */
-    public function offReadable($stream, bool $bySelf = false)
+    public function offReadable($stream)
     {
-        $fd = (int) $stream;
-        if (!isset($this->readEvents[$fd])) {
-            return;
-        }
-        if (!$bySelf) {
-            $coroutine = $this->readEvents[$fd];
-            if (!$coroutine->isExecuting()) {
-                return;
-            }
-            $coroutine->kill();
-        }
-        unset($this->readEvents[$fd]);
+        // 在当前协程执行 $coroutine->kill() 会导致不可预知问题,所以没有使用$coroutine->kill()
+        unset($this->readEvents[(int) $stream]);
     }
 
     /**
@@ -173,23 +168,28 @@ class Swow implements EventInterface
      */
     public function onWritable($stream, $func)
     {
-        if (isset($this->writeEvents[(int) $stream])) {
+        $fd = (int) $stream;
+        if (isset($this->writeEvents[$fd])) {
             $this->offWritable($stream);
         }
-        $this->writeEvents[(int) $stream] = Coroutine::run(function () use ($stream, $func): void {
+        Coroutine::run(function () use ($stream, $func, $fd): void {
             try {
+                $this->writeEvents[$fd] = Coroutine::getCurrent();
                 while (true) {
                     $rEvent = stream_poll_one($stream, STREAM_POLLOUT | STREAM_POLLHUP);
+                    if (!isset($this->writeEvents[$fd]) || $this->writeEvents[$fd] !== Coroutine::getCurrent()) {
+                        break;
+                    }
                     if ($rEvent !== STREAM_POLLNONE) {
                         $func($stream);
                     }
                     if ($rEvent !== STREAM_POLLOUT) {
-                        $this->offWritable($stream, bySelf: true);
+                        $this->offWritable($stream);
                         break;
                     }
                 }
             } catch (RuntimeException) {
-                $this->offWritable($stream, bySelf: true);
+                $this->offWritable($stream);
             }
         });
         return true;
@@ -198,20 +198,9 @@ class Swow implements EventInterface
     /**
      * {@inheritdoc}
      */
-    public function offWritable($stream, bool $bySelf = false)
+    public function offWritable($stream)
     {
-        $fd = (int) $stream;
-        if (!isset($this->writeEvents[$fd])) {
-            return;
-        }
-        if (!$bySelf) {
-            $coroutine = $this->writeEvents[$fd];
-            if (!$coroutine->isExecuting()) {
-                return;
-            }
-            $coroutine->kill();
-        }
-        unset($this->writeEvents[$fd]);
+        unset($this->writeEvents[(int) $stream]);
     }
 
     /**
@@ -219,18 +208,19 @@ class Swow implements EventInterface
      */
     public function onSignal($signal, $func)
     {
-        if (isset($this->signalListener[$signal])) {
-            return false;
-        }
-        $coroutine = Coroutine::run(static function () use ($signal, $func): void {
+        Coroutine::run(function () use ($signal, $func): void {
+            $this->signalListener[$signal] = Coroutine::getCurrent();
             while (1) {
                 try {
                     Signal::wait($signal);
+                    if (!isset($this->signalListener[$signal]) ||
+                        $this->signalListener[$signal] !== Coroutine::getCurrent()) {
+                        break;
+                    }
                     $func($signal);
                 } catch (SignalException) {}
             }
         });
-        $this->signalListener[$signal] = $coroutine;
         return true;
     }
 
@@ -242,7 +232,6 @@ class Swow implements EventInterface
         if (!isset($this->signalListener[$signal])) {
             return false;
         }
-        $this->signalListener[$signal]->kill();
         unset($this->signalListener[$signal]);
         return true;
     }
@@ -263,7 +252,6 @@ class Swow implements EventInterface
     public function stop()
     {
         Coroutine::getMain()->kill();
-        Signal::kill(getmypid(), Signal::INT);
     }
 
     public function destroy()