浏览代码

Compatible windows system

李亮 8 年之前
父节点
当前提交
1a1b5e7fec
共有 4 个文件被更改,包括 82 次插入20 次删除
  1. 9 1
      Connection/AsyncTcpConnection.php
  2. 7 0
      Events/EventInterface.php
  3. 6 4
      Events/React/StreamSelectLoop.php
  4. 60 15
      Events/Select.php

+ 9 - 1
Connection/AsyncTcpConnection.php

@@ -184,6 +184,10 @@ class AsyncTcpConnection extends TcpConnection
         }
         // Add socket to global event loop waiting connection is successfully established or faild. 
         Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection'));
+        // For windows.
+        if(DIRECTORY_SEPARATOR === '\\') {
+            Worker::$globalEvent->add($this->_socket, EventInterface::EV_EXCEPT, array($this, 'checkConnection'));
+        }
     }
 
     /**
@@ -201,7 +205,7 @@ class AsyncTcpConnection extends TcpConnection
             $this->_reconnectTimer = Timer::add($after, array($this, 'connect'), null, false);
             return;
         }
-        return $this->connect();
+        $this->connect();
     }
 
     /**
@@ -255,6 +259,10 @@ class AsyncTcpConnection extends TcpConnection
      */
     public function checkConnection($socket)
     {
+        // Remove EV_EXPECT for windows.
+        if(DIRECTORY_SEPARATOR === '\\') {
+            Worker::$globalEvent->del($socket, EventInterface::EV_EXCEPT);
+        }
         // Check socket state.
         if ($address = stream_socket_get_name($socket, true)) {
             // Remove write listener.

+ 7 - 0
Events/EventInterface.php

@@ -30,6 +30,13 @@ interface EventInterface
     const EV_WRITE = 2;
 
     /**
+     * Except event
+     *
+     * @var int
+     */
+    const EV_EXCEPT = 3;
+
+    /**
      * Signal event.
      *
      * @var int

+ 6 - 4
Events/React/StreamSelectLoop.php

@@ -113,7 +113,7 @@ class StreamSelectLoop extends \React\EventLoop\StreamSelectLoop
      */
     public function addSignal($signal, $callback)
     {
-        if(PHP_EOL !== "\r\n") {
+        if(DIRECTORY_SEPARATOR === '/') {
             pcntl_signal($signal, $callback);
         }
     }
@@ -125,7 +125,7 @@ class StreamSelectLoop extends \React\EventLoop\StreamSelectLoop
      */
     public function removeSignal($signal)
     {
-        if(PHP_EOL !== "\r\n") {
+        if(DIRECTORY_SEPARATOR === '/') {
             pcntl_signal($signal, SIG_IGN);
         }
     }
@@ -146,13 +146,15 @@ class StreamSelectLoop extends \React\EventLoop\StreamSelectLoop
         if ($read || $write) {
             $except = null;
             // Calls signal handlers for pending signals
-            pcntl_signal_dispatch();
+            if(DIRECTORY_SEPARATOR === '/') {
+                pcntl_signal_dispatch();
+            }
             // suppress warnings that occur, when stream_select is interrupted by a signal
             return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
         }
 
         // Calls signal handlers for pending signals
-        if(PHP_EOL !== "\r\n") {
+        if(DIRECTORY_SEPARATOR === '/') {
             pcntl_signal_dispatch();
         }
         $timeout && usleep($timeout);

+ 60 - 15
Events/Select.php

@@ -47,6 +47,13 @@ class Select implements EventInterface
     protected $_writeFds = array();
 
     /**
+     * Fds waiting for except event.
+     *
+     * @var array
+     */
+    protected $_exceptFds = array();
+
+    /**
      * Timer scheduler.
      * {['data':timer_id, 'priority':run_timestamp], ..}
      *
@@ -89,8 +96,9 @@ class Select implements EventInterface
     public function __construct()
     {
         // Create a pipeline and put into the collection of the read to read the descriptor to avoid empty polling.
-        $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
-        if ($this->channel) {
+        $this->channel = stream_socket_pair(DIRECTORY_SEPARATOR === '/' ? STREAM_PF_UNIX : STREAM_PF_INET,
+            STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
+        if($this->channel) {
             stream_set_blocking($this->channel[0], 0);
             $this->_readFds[0] = $this->channel[0];
         }
@@ -115,7 +123,16 @@ class Select implements EventInterface
                 $this->_allEvents[$fd_key][$flag] = array($func, $fd);
                 $this->_writeFds[$fd_key]         = $fd;
                 break;
+            case self::EV_EXCEPT:
+                $fd_key = (int)$fd;
+                $this->_allEvents[$fd_key][$flag] = array($func, $fd);
+                $this->_exceptFds[$fd_key] = $fd;
+                break;
             case self::EV_SIGNAL:
+                // Windows not support signal.
+                if(DIRECTORY_SEPARATOR !== '/') {
+                    return false;
+                }
                 $fd_key                              = (int)$fd;
                 $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
                 pcntl_signal($fd, array($this, 'signalHandler'));
@@ -161,7 +178,17 @@ class Select implements EventInterface
                     unset($this->_allEvents[$fd_key]);
                 }
                 return true;
+            case self::EV_EXCEPT:
+                unset($this->_allEvents[$fd_key][$flag], $this->_exceptFds[$fd_key]);
+                if(empty($this->_allEvents[$fd_key]))
+                {
+                    unset($this->_allEvents[$fd_key]);
+                }
+                return true;
             case self::EV_SIGNAL:
+                if(DIRECTORY_SEPARATOR !== '/') {
+                    return false;
+                }
                 unset($this->_signalEvents[$fd_key]);
                 pcntl_signal($fd, SIG_IGN);
                 break;
@@ -227,13 +254,17 @@ class Select implements EventInterface
     {
         $e = null;
         while (1) {
-            // Calls signal handlers for pending signals
-            pcntl_signal_dispatch();
+            if(DIRECTORY_SEPARATOR === '/') {
+                // Calls signal handlers for pending signals
+                pcntl_signal_dispatch();
+            }
 
             $read  = $this->_readFds;
             $write = $this->_writeFds;
+            $except = $this->_writeFds;
+
             // Waiting read/write/signal/timeout events.
-            $ret = @stream_select($read, $write, $e, 0, $this->_selectTimeout);
+            $ret = @stream_select($read, $write, $except, 0, $this->_selectTimeout);
 
             if (!$this->_scheduler->isEmpty()) {
                 $this->tick();
@@ -243,19 +274,33 @@ class Select implements EventInterface
                 continue;
             }
 
-            foreach ($read as $fd) {
-                $fd_key = (int)$fd;
-                if (isset($this->_allEvents[$fd_key][self::EV_READ])) {
-                    call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0],
-                        array($this->_allEvents[$fd_key][self::EV_READ][1]));
+            if ($read) {
+                foreach ($read as $fd) {
+                    $fd_key = (int)$fd;
+                    if (isset($this->_allEvents[$fd_key][self::EV_READ])) {
+                        call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0],
+                            array($this->_allEvents[$fd_key][self::EV_READ][1]));
+                    }
                 }
             }
 
-            foreach ($write as $fd) {
-                $fd_key = (int)$fd;
-                if (isset($this->_allEvents[$fd_key][self::EV_WRITE])) {
-                    call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0],
-                        array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
+            if ($write) {
+                foreach ($write as $fd) {
+                    $fd_key = (int)$fd;
+                    if (isset($this->_allEvents[$fd_key][self::EV_WRITE])) {
+                        call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0],
+                            array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
+                    }
+                }
+            }
+
+            if($except) {
+                foreach($except as $fd) {
+                    $fd_key = (int) $fd;
+                    if(isset($this->_allEvents[$fd_key][self::EV_EXCEPT])) {
+                        call_user_func_array($this->_allEvents[$fd_key][self::EV_EXCEPT][0],
+                            array($this->_allEvents[$fd_key][self::EV_EXCEPT][1]));
+                    }
                 }
             }
         }