老鬼 7 år sedan
förälder
incheckning
d7da281e14

+ 52 - 27
Connection/AsyncTcpConnection.php

@@ -75,7 +75,7 @@ class AsyncTcpConnection extends TcpConnection
     /**
      * Context option.
      *
-     * @var resource
+     * @var array
      */
     protected $_contextOption = null;
 
@@ -115,7 +115,7 @@ class AsyncTcpConnection extends TcpConnection
         if (!$address_info) {
             list($scheme, $this->_remoteAddress) = explode(':', $remote_address, 2);
             if (!$this->_remoteAddress) {
-                echo new \Exception('bad remote_address');
+                Worker::safeEcho(new \Exception('bad remote_address'));
             }
         } else {
             if (!isset($address_info['port'])) {
@@ -156,20 +156,20 @@ class AsyncTcpConnection extends TcpConnection
 
         // For statistics.
         self::$statistics['connection_count']++;
-        $this->maxSendBufferSize        = self::$defaultMaxSendBufferSize;
-        $this->_contextOption           = $context_option;
-        static::$connections[$this->id] = $this;
+        $this->maxSendBufferSize         = self::$defaultMaxSendBufferSize;
+        $this->_contextOption            = $context_option;
+        static::$connections[$this->_id] = $this;
     }
 
     /**
      * Do connect.
      *
-     * @return void 
+     * @return void
      */
     public function connect()
     {
         if ($this->_status !== self::STATUS_INITIAL && $this->_status !== self::STATUS_CLOSING &&
-             $this->_status !== self::STATUS_CLOSED) {
+            $this->_status !== self::STATUS_CLOSED) {
             return;
         }
         $this->_status           = self::STATUS_CONNECTING;
@@ -178,10 +178,10 @@ class AsyncTcpConnection extends TcpConnection
             // Open socket connection asynchronously.
             if ($this->_contextOption) {
                 $context = stream_context_create($this->_contextOption);
-                $this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteHost}:{$this->_remotePort}",
+                $this->_socket = stream_socket_client("tcp://{$this->_remoteHost}:{$this->_remotePort}",
                     $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT, $context);
             } else {
-                $this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteHost}:{$this->_remotePort}",
+                $this->_socket = stream_socket_client("tcp://{$this->_remoteHost}:{$this->_remotePort}",
                     $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT);
             }
         } else {
@@ -199,7 +199,7 @@ class AsyncTcpConnection extends TcpConnection
             }
             return;
         }
-        // Add socket to global event loop waiting connection is successfully established or faild. 
+        // Add socket to global event loop waiting connection is successfully established or faild.
         Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection'));
         // For windows.
         if(DIRECTORY_SEPARATOR === '\\') {
@@ -213,8 +213,10 @@ class AsyncTcpConnection extends TcpConnection
      * @param int $after
      * @return void
      */
-    public function reConnect($after = 0) {
-        $this->_status = self::STATUS_INITIAL;
+    public function reconnect($after = 0)
+    {
+        $this->_status                   = self::STATUS_INITIAL;
+        static::$connections[$this->_id] = $this;
         if ($this->_reconnectTimer) {
             Timer::del($this->_reconnectTimer);
         }
@@ -226,9 +228,19 @@ class AsyncTcpConnection extends TcpConnection
     }
 
     /**
+     * CancelReconnect.
+     */
+    public function cancelReconnect()
+    {
+        if ($this->_reconnectTimer) {
+            Timer::del($this->_reconnectTimer);
+        }
+    }
+
+    /**
      * Get remote address.
      *
-     * @return string 
+     * @return string
      */
     public function getRemoteHost()
     {
@@ -274,37 +286,50 @@ class AsyncTcpConnection extends TcpConnection
      * @param resource $socket
      * @return void
      */
-    public function checkConnection($socket)
+    public function checkConnection()
     {
+        if ($this->_status != self::STATUS_CONNECTING) {
+            return;
+        }
+
         // Remove EV_EXPECT for windows.
         if(DIRECTORY_SEPARATOR === '\\') {
-            Worker::$globalEvent->del($socket, EventInterface::EV_EXCEPT);
+            Worker::$globalEvent->del($this->_socket, EventInterface::EV_EXCEPT);
         }
+
         // Check socket state.
-        if ($address = stream_socket_get_name($socket, true)) {
-            // Remove write listener.
-            Worker::$globalEvent->del($socket, EventInterface::EV_WRITE);
+        if ($address = stream_socket_get_name($this->_socket, true)) {
             // Nonblocking.
-            stream_set_blocking($socket, 0);
+            stream_set_blocking($this->_socket, 0);
             // Compatible with hhvm
             if (function_exists('stream_set_read_buffer')) {
-                stream_set_read_buffer($socket, 0);
+                stream_set_read_buffer($this->_socket, 0);
             }
             // Try to open keepalive for tcp and disable Nagle algorithm.
             if (function_exists('socket_import_stream') && $this->transport === 'tcp') {
-                $raw_socket = socket_import_stream($socket);
+                $raw_socket = socket_import_stream($this->_socket);
                 socket_set_option($raw_socket, SOL_SOCKET, SO_KEEPALIVE, 1);
                 socket_set_option($raw_socket, SOL_TCP, TCP_NODELAY, 1);
             }
-            // Register a listener waiting read event.
-            Worker::$globalEvent->add($socket, EventInterface::EV_READ, array($this, 'baseRead'));
-            // There are some data waiting to send.
-            if ($this->_sendBuffer) {
-                Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
+
+            // Remove write listener.
+            Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
+
+            // SSL handshake.
+            if ($this->transport === 'ssl') {
+                $this->_sslHandshakeCompleted = $this->doSslHandshake($this->_socket);
+            } else {
+                // There are some data waiting to send.
+                if ($this->_sendBuffer) {
+                    Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
+                }
             }
+
+            // Register a listener waiting read event.
+            Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
+
             $this->_status                = self::STATUS_ESTABLISHED;
             $this->_remoteAddress         = $address;
-            $this->_sslHandshakeCompleted = true;
 
             // Try to emit onConnect callback.
             if ($this->onConnect) {

+ 206 - 0
Connection/AsyncUdpConnection.php

@@ -0,0 +1,206 @@
+<?php
+/**
+ * This file is part of workerman.
+ *
+ * Licensed under The MIT License
+ * For full copyright and license information, please see the MIT-LICENSE.txt
+ * Redistributions of files must retain the above copyright notice.
+ *
+ * @author    walkor<walkor@workerman.net>
+ * @copyright walkor<walkor@workerman.net>
+ * @link      http://www.workerman.net/
+ * @license   http://www.opensource.org/licenses/mit-license.php MIT License
+ */
+namespace Workerman\Connection;
+
+use Workerman\Events\EventInterface;
+use Workerman\Worker;
+use Exception;
+
+/**
+ * AsyncTcpConnection.
+ */
+class AsyncUdpConnection extends UdpConnection
+{
+    /**
+     * Emitted when socket connection is successfully established.
+     *
+     * @var callback
+     */
+    public $onConnect = null;
+
+    /**
+     * Emitted when socket connection closed.
+     *
+     * @var callback
+     */
+    public $onClose = null;
+
+    /**
+     * Connected or not.
+     *
+     * @var bool
+     */
+    protected $connected = false;
+
+    /**
+     * Context option.
+     *
+     * @var array
+     */
+    protected $_contextOption = null;
+
+    /**
+     * Construct.
+     *
+     * @param string $remote_address
+     * @throws Exception
+     */
+    public function __construct($remote_address, $context_option = null)
+    {
+        // Get the application layer communication protocol and listening address.
+        list($scheme, $address) = explode(':', $remote_address, 2);
+        // Check application layer protocol class.
+        if ($scheme !== 'udp') {
+            $scheme         = ucfirst($scheme);
+            $this->protocol = '\\Protocols\\' . $scheme;
+            if (!class_exists($this->protocol)) {
+                $this->protocol = "\\Workerman\\Protocols\\$scheme";
+                if (!class_exists($this->protocol)) {
+                    throw new Exception("class \\Protocols\\$scheme not exist");
+                }
+            }
+        }
+        
+        $this->_remoteAddress = substr($address, 2);
+        $this->_contextOption = $context_option;
+    }
+    
+    /**
+     * For udp package.
+     *
+     * @param resource $socket
+     * @return bool
+     */
+    public function baseRead($socket)
+    {
+        $recv_buffer = stream_socket_recvfrom($socket, Worker::MAX_UDP_PACKAGE_SIZE, 0, $remote_address);
+        if (false === $recv_buffer || empty($remote_address)) {
+            return false;
+        }
+        
+        if ($this->onMessage) {
+            if ($this->protocol) {
+                $parser      = $this->protocol;
+                $recv_buffer = $parser::decode($recv_buffer, $this);
+            }
+            ConnectionInterface::$statistics['total_request']++;
+            try {
+                call_user_func($this->onMessage, $this, $recv_buffer);
+            } catch (\Exception $e) {
+                Worker::log($e);
+                exit(250);
+            } catch (\Error $e) {
+                Worker::log($e);
+                exit(250);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Sends data on the connection.
+     *
+     * @param string $send_buffer
+     * @param bool   $raw
+     * @return void|boolean
+     */
+    public function send($send_buffer, $raw = false)
+    {
+        if (false === $raw && $this->protocol) {
+            $parser      = $this->protocol;
+            $send_buffer = $parser::encode($send_buffer, $this);
+            if ($send_buffer === '') {
+                return null;
+            }
+        }
+        if ($this->connected === false) {
+            $this->connect();
+        }
+        return strlen($send_buffer) === stream_socket_sendto($this->_socket, $send_buffer, 0);
+    }
+    
+    
+    /**
+     * Close connection.
+     *
+     * @param mixed $data
+     * @param bool $raw
+     *
+     * @return bool
+     */
+    public function close($data = null, $raw = false)
+    {
+        if ($data !== null) {
+            $this->send($data, $raw);
+        }
+        Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
+        fclose($this->_socket);
+        $this->connected = false;
+        // Try to emit onClose callback.
+        if ($this->onClose) {
+            try {
+                call_user_func($this->onClose, $this);
+            } catch (\Exception $e) {
+                Worker::log($e);
+                exit(250);
+            } catch (\Error $e) {
+                Worker::log($e);
+                exit(250);
+            }
+        }
+        $this->onConnect = $this->onMessage = $this->onClose = null;
+        return true;
+    }
+
+    /**
+     * Connect.
+     *
+     * @return void
+     */
+    public function connect()
+    {
+        if ($this->connected === true) {
+            return;
+        }
+        if ($this->_contextOption) {
+            $context = stream_context_create($this->_contextOption);
+            $this->_socket = stream_socket_client("udp://{$this->_remoteAddress}", $errno, $errmsg,
+                30, STREAM_CLIENT_CONNECT, $context);
+        } else {
+            $this->_socket = stream_socket_client("udp://{$this->_remoteAddress}", $errno, $errmsg);
+        }
+
+        if (!$this->_socket) {
+            Worker::safeEcho(new \Exception($errmsg));
+            return;
+        }
+        if ($this->onMessage) {
+            Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
+        }
+        $this->connected = true;
+        // Try to emit onConnect callback.
+        if ($this->onConnect) {
+            try {
+                call_user_func($this->onConnect, $this);
+            } catch (\Exception $e) {
+                Worker::log($e);
+                exit(250);
+            } catch (\Error $e) {
+                Worker::log($e);
+                exit(250);
+            }
+        }
+    }
+
+}

+ 108 - 48
Connection/TcpConnection.php

@@ -260,6 +260,7 @@ class TcpConnection extends ConnectionInterface
      *
      * @param string $name
      * @param array  $arguments
+     * @return void
      */
     public function __call($name, $arguments) {
         // Try to emit custom function within protocol
@@ -273,10 +274,7 @@ class TcpConnection extends ConnectionInterface
                 Worker::log($e);
                 exit(250);
             }
-	} else {
-	    trigger_error('Call to undefined method '.__CLASS__.'::'.$name.'()', E_USER_ERROR);
-	}
-
+        }
     }
 
     /**
@@ -299,8 +297,8 @@ class TcpConnection extends ConnectionInterface
             stream_set_read_buffer($this->_socket, 0);
         }
         Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
-        $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
-        $this->_remoteAddress    = $remote_address;
+        $this->maxSendBufferSize        = self::$defaultMaxSendBufferSize;
+        $this->_remoteAddress           = $remote_address;
         static::$connections[$this->id] = $this;
     }
 
@@ -324,7 +322,7 @@ class TcpConnection extends ConnectionInterface
      *
      * @param string $send_buffer
      * @param bool  $raw
-     * @return void|bool|null
+     * @return bool|null
      */
     public function send($send_buffer, $raw = false)
     {
@@ -355,10 +353,17 @@ class TcpConnection extends ConnectionInterface
             return null;
         }
 
-
         // Attempt to send data directly.
         if ($this->_sendBuffer === '') {
-            $len = @fwrite($this->_socket, $send_buffer, 8192);
+            if ($this->transport === 'ssl') {
+                Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
+                $this->_sendBuffer = $send_buffer;
+                $this->checkBufferWillFull();
+                return null;
+            }
+            set_error_handler(function(){});
+            $len = fwrite($this->_socket, $send_buffer);
+            restore_error_handler();
             // send successful.
             if ($len === strlen($send_buffer)) {
                 $this->bytesWritten += $len;
@@ -552,6 +557,8 @@ class TcpConnection extends ConnectionInterface
         }
     }
 
+
+
     /**
      * Base read handler.
      *
@@ -563,37 +570,19 @@ class TcpConnection extends ConnectionInterface
     {
         // SSL handshake.
         if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
-            $ret = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_SSLv2_SERVER | 
-					       STREAM_CRYPTO_METHOD_SSLv23_SERVER);
-            // Negotiation has failed.
-            if(false === $ret) {
-                if (!feof($socket)) {
-                    echo "\nSSL Handshake fail. \nBuffer:".bin2hex(fread($socket, 8182))."\n";
+            if ($this->doSslHandshake($socket)) {
+                $this->_sslHandshakeCompleted = true;
+                if ($this->_sendBuffer) {
+                    Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
                 }
-                return $this->destroy();
-            } elseif(0 === $ret) {
-                // There isn't enough data and should try again.
+            } else {
                 return;
             }
-            if (isset($this->onSslHandshake)) {
-                try {
-                    call_user_func($this->onSslHandshake, $this);
-                } catch (\Exception $e) {
-                    Worker::log($e);
-                    exit(250);
-                } catch (\Error $e) {
-                    Worker::log($e);
-                    exit(250);
-                }
-            }
-            $this->_sslHandshakeCompleted = true;
-            if ($this->_sendBuffer) {
-                Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
-            }
-            return;
         }
 
-        $buffer = @fread($socket, self::READ_BUFFER_SIZE);
+        set_error_handler(function(){});
+        $buffer = fread($socket, self::READ_BUFFER_SIZE);
+        restore_error_handler();
 
         // Check connection closed.
         if ($buffer === '' || $buffer === false) {
@@ -618,18 +607,22 @@ class TcpConnection extends ConnectionInterface
                     }
                 } else {
                     // Get current package length.
+                    set_error_handler(function($code, $msg, $file, $line){
+                        Worker::safeEcho("$msg in file $file on line $line\n");
+                    });
                     $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
+                    restore_error_handler();
                     // The packet length is unknown.
                     if ($this->_currentPackageLength === 0) {
                         break;
-                    } elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize) {
+                    } elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= static::$maxPackageSize) {
                         // Data is not enough for a package.
                         if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
                             break;
                         }
                     } // Wrong package.
                     else {
-                        echo 'error package. package_length=' . var_export($this->_currentPackageLength, true);
+                        Worker::safeEcho('error package. package_length=' . var_export($this->_currentPackageLength, true));
                         $this->destroy();
                         return;
                     }
@@ -696,12 +689,18 @@ class TcpConnection extends ConnectionInterface
      */
     public function baseWrite()
     {
-        $len = @fwrite($this->_socket, $this->_sendBuffer, 8192);
+        set_error_handler(function(){});
+        if ($this->transport === 'ssl') {
+            $len = fwrite($this->_socket, $this->_sendBuffer, 8192);
+        } else {
+            $len = fwrite($this->_socket, $this->_sendBuffer);
+        }
+        restore_error_handler();
         if ($len === strlen($this->_sendBuffer)) {
             $this->bytesWritten += $len;
             Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
             $this->_sendBuffer = '';
-            // Try to emit onBufferDrain callback when the send buffer becomes empty. 
+            // Try to emit onBufferDrain callback when the send buffer becomes empty.
             if ($this->onBufferDrain) {
                 try {
                     call_user_func($this->onBufferDrain, $this);
@@ -728,6 +727,54 @@ class TcpConnection extends ConnectionInterface
     }
 
     /**
+     * SSL handshake.
+     *
+     * @param $socket
+     * @return bool
+     */
+    public function doSslHandshake($socket){
+        if (feof($socket)) {
+            $this->destroy();
+            return false;
+        }
+        $async = $this instanceof AsyncTcpConnection;
+        if($async){
+            $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
+        }else{
+            $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER;
+        }
+
+        // Hidden error.
+        set_error_handler(function($errno, $errstr, $file){
+            if (!Worker::$daemonize) {
+                Worker::safeEcho("SSL handshake error: $errstr \n");
+            }
+        });
+        $ret     = stream_socket_enable_crypto($socket, true, $type);
+        restore_error_handler();
+        // Negotiation has failed.
+        if (false === $ret) {
+            $this->destroy();
+            return false;
+        } elseif (0 === $ret) {
+            // There isn't enough data and should try again.
+            return false;
+        }
+        if (isset($this->onSslHandshake)) {
+            try {
+                call_user_func($this->onSslHandshake, $this);
+            } catch (\Exception $e) {
+                Worker::log($e);
+                exit(250);
+            } catch (\Error $e) {
+                Worker::log($e);
+                exit(250);
+            }
+        }
+        return true;
+    }
+
+    /**
      * This method pulls all the data out of a readable stream, and writes it to the supplied destination.
      *
      * @param TcpConnection $dest
@@ -839,6 +886,16 @@ class TcpConnection extends ConnectionInterface
         }
         return false;
     }
+    
+    /**
+     * Whether send buffer is Empty.
+     *
+     * @return bool
+     */
+    public function bufferIsEmpty()
+    {
+    	return empty($this->_sendBuffer);
+    }
 
     /**
      * Destroy connection.
@@ -854,13 +911,12 @@ class TcpConnection extends ConnectionInterface
         // Remove event listener.
         Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
         Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
+
         // Close socket.
-        @fclose($this->_socket);
-        // Remove from worker->connections.
-        if ($this->worker) {
-            unset($this->worker->connections[$this->_id]);
-        }
-        unset(static::$connections[$this->_id]);
+        set_error_handler(function(){});
+        fclose($this->_socket);
+        restore_error_handler();
+
         $this->_status = self::STATUS_CLOSED;
         // Try to emit onClose callback.
         if ($this->onClose) {
@@ -875,7 +931,7 @@ class TcpConnection extends ConnectionInterface
             }
         }
         // Try to emit protocol::onClose
-        if (is_object($this->protocol) && method_exists($this->protocol, 'onClose')) {
+        if ($this->protocol && method_exists($this->protocol, 'onClose')) {
             try {
                 call_user_func(array($this->protocol, 'onClose'), $this);
             } catch (\Exception $e) {
@@ -889,6 +945,11 @@ class TcpConnection extends ConnectionInterface
         if ($this->_status === self::STATUS_CLOSED) {
             // Cleaning up the callback to avoid memory leaks.
             $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
+            // Remove from worker->connections.
+            if ($this->worker) {
+                unset($this->worker->connections[$this->_id]);
+            }
+            unset(static::$connections[$this->_id]);
         }
     }
 
@@ -911,8 +972,7 @@ class TcpConnection extends ConnectionInterface
             }
 
             if(0 === self::$statistics['connection_count']) {
-                Worker::$globalEvent->destroy();
-                exit(0);
+                Worker::stopAll();
             }
         }
     }

+ 15 - 5
Events/Event.php

@@ -57,7 +57,12 @@ class Event implements EventInterface
      */
     public function __construct()
     {
-        $this->_eventBase = new \EventBase();
+        if (class_exists('\\\\EventBase', false)) {
+            $class_name = '\\\\EventBase';
+        } else {
+            $class_name = '\EventBase';
+        }
+        $this->_eventBase = new $class_name();
     }
    
     /**
@@ -65,11 +70,16 @@ class Event implements EventInterface
      */
     public function add($fd, $flag, $func, $args=array())
     {
+        if (class_exists('\\\\Event', false)) {
+            $class_name = '\\\\Event';
+        } else {
+            $class_name = '\Event';
+        }
         switch ($flag) {
             case self::EV_SIGNAL:
 
                 $fd_key = (int)$fd;
-                $event = \Event::signal($this->_eventBase, $fd, $func);
+                $event = $class_name::signal($this->_eventBase, $fd, $func);
                 if (!$event||!$event->add()) {
                     return false;
                 }
@@ -80,7 +90,7 @@ class Event implements EventInterface
             case self::EV_TIMER_ONCE:
 
                 $param = array($func, (array)$args, $flag, $fd, self::$_timerId);
-                $event = new \Event($this->_eventBase, -1, \Event::TIMEOUT|\Event::PERSIST, array($this, "timerCallback"), $param);
+                $event = new $class_name($this->_eventBase, -1, $class_name::TIMEOUT|$class_name::PERSIST, array($this, "timerCallback"), $param);
                 if (!$event||!$event->addTimer($fd)) {
                     return false;
                 }
@@ -89,8 +99,8 @@ class Event implements EventInterface
                 
             default :
                 $fd_key = (int)$fd;
-                $real_flag = $flag === self::EV_READ ? \Event::READ | \Event::PERSIST : \Event::WRITE | \Event::PERSIST;
-                $event = new \Event($this->_eventBase, $fd, $real_flag, $func, $fd);
+                $real_flag = $flag === self::EV_READ ? $class_name::READ | $class_name::PERSIST : $class_name::WRITE | $class_name::PERSIST;
+                $event = new $class_name($this->_eventBase, $fd, $real_flag, $func, $fd);
                 if (!$event||!$event->add()) {
                     return false;
                 }

+ 262 - 0
Events/React/Base.php

@@ -0,0 +1,262 @@
+<?php
+/**
+ * This file is part of workerman.
+ *
+ * Licensed under The MIT License
+ * For full copyright and license information, please see the MIT-LICENSE.txt
+ * Redistributions of files must retain the above copyright notice.
+ *
+ * @author    walkor<walkor@workerman.net>
+ * @copyright walkor<walkor@workerman.net>
+ * @link      http://www.workerman.net/
+ * @license   http://www.opensource.org/licenses/mit-license.php MIT License
+ */
+namespace Workerman\Events\React;
+use Workerman\Events\EventInterface;
+use React\EventLoop\TimerInterface;
+
+/**
+ * Class StreamSelectLoop
+ * @package Workerman\Events\React
+ */
+class Base implements \React\EventLoop\LoopInterface
+{
+    /**
+     * @var array
+     */
+    protected $_timerIdMap = array();
+
+    /**
+     * @var int
+     */
+    protected $_timerIdIndex = 0;
+
+    /**
+     * @var array
+     */
+    protected $_signalHandlerMap = array();
+
+    /**
+     * @var \React\EventLoop\LoopInterface
+     */
+    protected $_eventLoop = null;
+
+    /**
+     * Base constructor.
+     */
+    public function __construct()
+    {
+        $this->_eventLoop = new \React\EventLoop\StreamSelectLoop();
+    }
+
+    /**
+     * Add event listener to event loop.
+     *
+     * @param $fd
+     * @param $flag
+     * @param $func
+     * @param array $args
+     * @return bool
+     */
+    public function add($fd, $flag, $func, $args = array())
+    {
+        $args = (array)$args;
+        switch ($flag) {
+            case EventInterface::EV_READ:
+                return $this->addReadStream($fd, $func);
+            case EventInterface::EV_WRITE:
+                return $this->addWriteStream($fd, $func);
+            case EventInterface::EV_SIGNAL:
+                if (isset($this->_signalHandlerMap[$fd])) {
+                    $this->removeSignal($fd, $this->_signalHandlerMap[$fd]);
+                }
+                $this->_signalHandlerMap[$fd] = $func;
+                return $this->addSignal($fd, $func);
+            case EventInterface::EV_TIMER:
+                $timer_obj = $this->addPeriodicTimer($fd, function() use ($func, $args) {
+                    call_user_func_array($func, $args);
+                });
+                $this->_timerIdMap[++$this->_timerIdIndex] = $timer_obj;
+                return $this->_timerIdIndex;
+            case EventInterface::EV_TIMER_ONCE:
+                $index = ++$this->_timerIdIndex;
+                $timer_obj = $this->addTimer($fd, function() use ($func, $args, $index) {
+                    $this->del($index,EventInterface::EV_TIMER_ONCE);
+                    call_user_func_array($func, $args);
+                });
+                $this->_timerIdMap[$index] = $timer_obj;
+                return $this->_timerIdIndex;
+        }
+        return false;
+    }
+
+    /**
+     * Remove event listener from event loop.
+     *
+     * @param mixed $fd
+     * @param int   $flag
+     * @return bool
+     */
+    public function del($fd, $flag)
+    {
+        switch ($flag) {
+            case EventInterface::EV_READ:
+                return $this->removeReadStream($fd);
+            case EventInterface::EV_WRITE:
+                return $this->removeWriteStream($fd);
+            case EventInterface::EV_SIGNAL:
+                if (!isset($this->_eventLoop[$fd])) {
+                    return false;
+                }
+                $func = $this->_eventLoop[$fd];
+                unset($this->_eventLoop[$fd]);
+                return $this->removeSignal($fd, $func);
+
+            case EventInterface::EV_TIMER:
+            case EventInterface::EV_TIMER_ONCE:
+                if (isset($this->_timerIdMap[$fd])){
+                    $timer_obj = $this->_timerIdMap[$fd];
+                    unset($this->_timerIdMap[$fd]);
+                    $this->cancelTimer($timer_obj);
+                    return true;
+                }
+        }
+        return false;
+    }
+
+
+    /**
+     * Main loop.
+     *
+     * @return void
+     */
+    public function loop()
+    {
+        $this->run();
+    }
+
+
+    /**
+     * Destroy loop.
+     *
+     * @return void
+     */
+    public function destroy()
+    {
+
+    }
+
+    /**
+     * Get timer count.
+     *
+     * @return integer
+     */
+    public function getTimerCount()
+    {
+        return count($this->_timerIdMap);
+    }
+
+    /**
+     * @param resource $stream
+     * @param callable $listener
+     */
+    public function addReadStream($stream, $listener)
+    {
+        return $this->_eventLoop->addReadStream($stream, $listener);
+    }
+
+    /**
+     * @param resource $stream
+     * @param callable $listener
+     */
+    public function addWriteStream($stream, $listener)
+    {
+        return $this->_eventLoop->addWriteStream($stream, $listener);
+    }
+
+    /**
+     * @param resource $stream
+     */
+    public function removeReadStream($stream)
+    {
+        return $this->_eventLoop->removeReadStream($stream);
+    }
+
+    /**
+     * @param resource $stream
+     */
+    public function removeWriteStream($stream)
+    {
+        return $this->_eventLoop->removeWriteStream($stream);
+    }
+
+    /**
+     * @param float|int $interval
+     * @param callable $callback
+     * @return \React\EventLoop\Timer\Timer|TimerInterface
+     */
+    public function addTimer($interval, $callback)
+    {
+        return $this->_eventLoop->addTimer($interval, $callback);
+    }
+
+    /**
+     * @param float|int $interval
+     * @param callable $callback
+     * @return \React\EventLoop\Timer\Timer|TimerInterface
+     */
+    public function addPeriodicTimer($interval, $callback)
+    {
+        return $this->_eventLoop->addPeriodicTimer($interval, $callback);
+    }
+
+    /**
+     * @param TimerInterface $timer
+     */
+    public function cancelTimer(TimerInterface $timer)
+    {
+        return $this->_eventLoop->cancelTimer($timer);
+    }
+
+    /**
+     * @param callable $listener
+     */
+    public function futureTick($listener)
+    {
+        return $this->_eventLoop->futureTick($listener);
+    }
+
+    /**
+     * @param int $signal
+     * @param callable $listener
+     */
+    public function addSignal($signal, $listener)
+    {
+        return $this->_eventLoop->addSignal($signal, $listener);
+    }
+
+    /**
+     * @param int $signal
+     * @param callable $listener
+     */
+    public function removeSignal($signal, $listener)
+    {
+        return $this->_eventLoop->removeSignal($signal, $listener);
+    }
+
+    /**
+     * Run.
+     */
+    public function run()
+    {
+        return $this->_eventLoop->run();
+    }
+
+    /**
+     * Stop.
+     */
+    public function stop()
+    {
+        return $this->_eventLoop->stop();
+    }
+}

+ 2 - 161
Events/React/ExtEventLoop.php

@@ -12,175 +12,16 @@
  * @license   http://www.opensource.org/licenses/mit-license.php MIT License
  */
 namespace Workerman\Events\React;
-use Workerman\Events\EventInterface;
 
 /**
  * Class ExtEventLoop
  * @package Workerman\Events\React
  */
-class ExtEventLoop extends \React\EventLoop\ExtEventLoop
+class ExtEventLoop extends Base
 {
-    /**
-     * Event base.
-     *
-     * @var EventBase
-     */
-    protected $_eventBase = null;
 
-    /**
-     * All signal Event instances.
-     *
-     * @var array
-     */
-    protected $_signalEvents = array();
-
-    /**
-     * @var array
-     */
-    protected $_timerIdMap = array();
-
-    /**
-     * @var int
-     */
-    protected $_timerIdIndex = 0;
-
-    /**
-     * Add event listener to event loop.
-     *
-     * @param $fd
-     * @param $flag
-     * @param $func
-     * @param array $args
-     * @return bool
-     */
-    public function add($fd, $flag, $func, $args = array())
-    {
-        $args = (array)$args;
-        switch ($flag) {
-            case EventInterface::EV_READ:
-                return $this->addReadStream($fd, $func);
-            case EventInterface::EV_WRITE:
-                return $this->addWriteStream($fd, $func);
-            case EventInterface::EV_SIGNAL:
-                return $this->addSignal($fd, $func);
-            case EventInterface::EV_TIMER:
-                $timer_id = ++$this->_timerIdIndex;
-                $timer_obj = $this->addPeriodicTimer($fd, function() use ($func, $args) {
-                    call_user_func_array($func, $args);
-                });
-                $this->_timerIdMap[$timer_id] = $timer_obj;
-                return $timer_id;
-            case EventInterface::EV_TIMER_ONCE:
-                $timer_id = ++$this->_timerIdIndex;
-                $timer_obj = $this->addTimer($fd, function() use ($func, $args, $timer_id) {
-                    unset($this->_timerIdMap[$timer_id]);
-                    call_user_func_array($func, $args);
-                });
-                $this->_timerIdMap[$timer_id] = $timer_obj;
-                return $timer_id;
-        }
-        return false;
-    }
-
-    /**
-     * Remove event listener from event loop.
-     *
-     * @param mixed $fd
-     * @param int   $flag
-     * @return bool
-     */
-    public function del($fd, $flag)
-    {
-        switch ($flag) {
-            case EventInterface::EV_READ:
-                return $this->removeReadStream($fd);
-            case EventInterface::EV_WRITE:
-                return $this->removeWriteStream($fd);
-            case EventInterface::EV_SIGNAL:
-                return $this->removeSignal($fd);
-            case EventInterface::EV_TIMER:
-            case EventInterface::EV_TIMER_ONCE;
-                if (isset($this->_timerIdMap[$fd])){
-                    $timer_obj = $this->_timerIdMap[$fd];
-                    unset($this->_timerIdMap[$fd]);
-                    $this->cancelTimer($timer_obj);
-                    return true;
-                }
-        }
-        return false;
-    }
-
-
-    /**
-     * Main loop.
-     *
-     * @return void
-     */
-    public function loop()
-    {
-        $this->run();
-    }
-
-    /**
-     * Construct
-     */
     public function __construct()
     {
-        parent::__construct();
-        $class = new \ReflectionClass('\React\EventLoop\ExtEventLoop');
-        $property = $class->getProperty('eventBase');
-        $property->setAccessible(true);
-        $this->_eventBase = $property->getValue($this);
-    }
-
-    /**
-     * Add signal handler.
-     *
-     * @param $signal
-     * @param $callback
-     * @return bool
-     */
-    public function addSignal($signal, $callback)
-    {
-        $event = \Event::signal($this->_eventBase, $signal, $callback);
-        if (!$event||!$event->add()) {
-            return false;
-        }
-        $this->_signalEvents[$signal] = $event;
-    }
-
-    /**
-     * Remove signal handler.
-     *
-     * @param $signal
-     */
-    public function removeSignal($signal)
-    {
-        if (isset($this->_signalEvents[$signal])) {
-            $this->_signalEvents[$signal]->del();
-            unset($this->_signalEvents[$signal]);
-        }
-    }
-
-    /**
-     * Destroy loop.
-     *
-     * @return void
-     */
-    public function destroy()
-    {
-        foreach ($this->_signalEvents as $event) {
-            $event->del();
-        }
-    }
-
-    /**
-     * Get timer count.
-     *
-     * @return integer
-     */
-    public function getTimerCount()
-    {
-        return count($this->_timerIdMap);
+        $this->_eventLoop = new \React\EventLoop\ExtEventLoop();
     }
 }

+ 27 - 0
Events/React/ExtLibEventLoop.php

@@ -0,0 +1,27 @@
+<?php
+/**
+ * This file is part of workerman.
+ *
+ * Licensed under The MIT License
+ * For full copyright and license information, please see the MIT-LICENSE.txt
+ * Redistributions of files must retain the above copyright notice.
+ *
+ * @author    walkor<walkor@workerman.net>
+ * @copyright walkor<walkor@workerman.net>
+ * @link      http://www.workerman.net/
+ * @license   http://www.opensource.org/licenses/mit-license.php MIT License
+ */
+namespace Workerman\Events\React;
+use Workerman\Events\EventInterface;
+
+/**
+ * Class ExtLibEventLoop
+ * @package Workerman\Events\React
+ */
+class ExtLibEventLoop extends Base
+{
+    public function __construct()
+    {
+        $this->_eventLoop = new \React\EventLoop\ExtLibeventLoop();
+    }
+}

+ 0 - 187
Events/React/LibEventLoop.php

@@ -1,187 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author    walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link      http://www.workerman.net/
- * @license   http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace Workerman\Events\React;
-use Workerman\Events\EventInterface;
-
-/**
- * Class LibEventLoop
- * @package Workerman\Events\React
- */
-class LibEventLoop extends \React\EventLoop\LibEventLoop
-{
-    /**
-     * Event base.
-     *
-     * @var event_base resource
-     */
-    protected $_eventBase = null;
-
-    /**
-     * All signal Event instances.
-     *
-     * @var array
-     */
-    protected $_signalEvents = array();
-
-    /**
-     * @var array
-     */
-    protected $_timerIdMap = array();
-
-    /**
-     * @var int
-     */
-    protected $_timerIdIndex = 0;
-
-    /**
-     * Add event listener to event loop.
-     *
-     * @param $fd
-     * @param $flag
-     * @param $func
-     * @param array $args
-     * @return bool
-     */
-    public function add($fd, $flag, $func, $args = array())
-    {
-        $args = (array)$args;
-        switch ($flag) {
-            case EventInterface::EV_READ:
-                return $this->addReadStream($fd, $func);
-            case EventInterface::EV_WRITE:
-                return $this->addWriteStream($fd, $func);
-            case EventInterface::EV_SIGNAL:
-                return $this->addSignal($fd, $func);
-            case EventInterface::EV_TIMER:
-                $timer_id = ++$this->_timerIdIndex;
-                $timer_obj = $this->addPeriodicTimer($fd, function() use ($func, $args) {
-                    call_user_func_array($func, $args);
-                });
-                $this->_timerIdMap[$timer_id] = $timer_obj;
-                return $timer_id;
-            case EventInterface::EV_TIMER_ONCE:
-                $timer_id = ++$this->_timerIdIndex;
-                $timer_obj = $this->addTimer($fd, function() use ($func, $args, $timer_id) {
-                    unset($this->_timerIdMap[$timer_id]);
-                    call_user_func_array($func, $args);
-                });
-                $this->_timerIdMap[$timer_id] = $timer_obj;
-                return $timer_id;
-        }
-        return false;
-    }
-
-    /**
-     * Remove event listener from event loop.
-     *
-     * @param mixed $fd
-     * @param int   $flag
-     * @return bool
-     */
-    public function del($fd, $flag)
-    {
-        switch ($flag) {
-            case EventInterface::EV_READ:
-                return $this->removeReadStream($fd);
-            case EventInterface::EV_WRITE:
-                return $this->removeWriteStream($fd);
-            case EventInterface::EV_SIGNAL:
-                return $this->removeSignal($fd);
-            case EventInterface::EV_TIMER:
-            case EventInterface::EV_TIMER_ONCE;
-                if (isset($this->_timerIdMap[$fd])){
-                    $timer_obj = $this->_timerIdMap[$fd];
-                    unset($this->_timerIdMap[$fd]);
-                    $this->cancelTimer($timer_obj);
-                    return true;
-                }
-        }
-        return false;
-    }
-
-
-    /**
-     * Main loop.
-     *
-     * @return void
-     */
-    public function loop()
-    {
-        $this->run();
-    }
-
-    /**
-     * Construct.
-     */
-    public function __construct()
-    {
-        parent::__construct();
-        $class = new \ReflectionClass('\React\EventLoop\LibEventLoop');
-        $property = $class->getProperty('eventBase');
-        $property->setAccessible(true);
-        $this->_eventBase = $property->getValue($this);
-    }
-
-    /**
-     * Add signal handler.
-     *
-     * @param $signal
-     * @param $callback
-     * @return bool
-     */
-    public function addSignal($signal, $callback)
-    {
-        $event = event_new();
-        $this->_signalEvents[$signal] = $event;
-        event_set($event, $signal, EV_SIGNAL | EV_PERSIST, $callback);
-        event_base_set($event, $this->_eventBase);
-        event_add($event);
-    }
-
-    /**
-     * Remove signal handler.
-     *
-     * @param $signal
-     */
-    public function removeSignal($signal)
-    {
-        if (isset($this->_signalEvents[$signal])) {
-            $event = $this->_signalEvents[$signal];
-            event_del($event);
-            unset($this->_signalEvents[$signal]);
-        }
-    }
-
-    /**
-     * Destroy loop.
-     *
-     * @return void
-     */
-    public function destroy()
-    {
-        foreach ($this->_signalEvents as $event) {
-            event_del($event);
-        }
-    }
-
-    /**
-     * Get timer count.
-     *
-     * @return integer
-     */
-    public function getTimerCount()
-    {
-        return count($this->_timerIdMap);
-    }
-}

+ 3 - 163
Events/React/StreamSelectLoop.php

@@ -12,175 +12,15 @@
  * @license   http://www.opensource.org/licenses/mit-license.php MIT License
  */
 namespace Workerman\Events\React;
-use Workerman\Events\EventInterface;
 
 /**
  * Class StreamSelectLoop
  * @package Workerman\Events\React
  */
-class StreamSelectLoop extends \React\EventLoop\StreamSelectLoop
+class StreamSelectLoop extends Base
 {
-    /**
-     * @var array
-     */
-    protected $_timerIdMap = array();
-
-    /**
-     * @var int
-     */
-    protected $_timerIdIndex = 0;
-
-    /**
-     * Add event listener to event loop.
-     *
-     * @param $fd
-     * @param $flag
-     * @param $func
-     * @param array $args
-     * @return bool
-     */
-    public function add($fd, $flag, $func, $args = array())
-    {
-        $args = (array)$args;
-        switch ($flag) {
-            case EventInterface::EV_READ:
-                return $this->addReadStream($fd, $func);
-            case EventInterface::EV_WRITE:
-                return $this->addWriteStream($fd, $func);
-            case EventInterface::EV_SIGNAL:
-                return $this->addSignal($fd, $func);
-            case EventInterface::EV_TIMER:
-                $timer_obj = $this->addPeriodicTimer($fd, function() use ($func, $args) {
-                    call_user_func_array($func, $args);
-                });
-                $this->_timerIdMap[++$this->_timerIdIndex] = $timer_obj;
-                return $this->_timerIdIndex;
-            case EventInterface::EV_TIMER_ONCE:
-                $index = ++$this->_timerIdIndex;
-                $timer_obj = $this->addTimer($fd, function() use ($func, $args, $index) {
-                    $this->del($index,EventInterface::EV_TIMER_ONCE);
-                    call_user_func_array($func, $args);
-                });
-                $this->_timerIdMap[$index] = $timer_obj;
-                return $this->_timerIdIndex;
-        }
-        return false;
-    }
-
-    /**
-     * Remove event listener from event loop.
-     *
-     * @param mixed $fd
-     * @param int   $flag
-     * @return bool
-     */
-    public function del($fd, $flag)
-    {
-        switch ($flag) {
-            case EventInterface::EV_READ:
-                return $this->removeReadStream($fd);
-            case EventInterface::EV_WRITE:
-                return $this->removeWriteStream($fd);
-            case EventInterface::EV_SIGNAL:
-                return $this->removeSignal($fd);
-            case EventInterface::EV_TIMER:
-            case EventInterface::EV_TIMER_ONCE;
-                if (isset($this->_timerIdMap[$fd])){
-                    $timer_obj = $this->_timerIdMap[$fd];
-                    unset($this->_timerIdMap[$fd]);
-                    $this->cancelTimer($timer_obj);
-                    return true;
-                }
-        }
-        return false;
-    }
-
-
-    /**
-     * Main loop.
-     *
-     * @return void
-     */
-    public function loop()
-    {
-        $this->run();
-    }
-
-    /**
-     * Add signal handler.
-     *
-     * @param $signal
-     * @param $callback
-     * @return bool
-     */
-    public function addSignal($signal, $callback)
-    {
-        if(DIRECTORY_SEPARATOR === '/') {
-            pcntl_signal($signal, $callback);
-        }
-    }
-
-    /**
-     * Remove signal handler.
-     *
-     * @param $signal
-     */
-    public function removeSignal($signal)
-    {
-        if(DIRECTORY_SEPARATOR === '/') {
-            pcntl_signal($signal, SIG_IGN);
-        }
-    }
-
-    /**
-     * Emulate a stream_select() implementation that does not break when passed
-     * empty stream arrays.
-     *
-     * @param array        &$read   An array of read streams to select upon.
-     * @param array        &$write  An array of write streams to select upon.
-     * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
-     *
-     * @return integer|false The total number of streams that are ready for read/write.
-     * Can return false if stream_select() is interrupted by a signal.
-     */
-    protected function streamSelect(array &$read, array &$write, $timeout)
-    {
-        if ($read || $write) {
-            $except = null;
-            // Calls signal handlers for pending signals
-            if(DIRECTORY_SEPARATOR === '/') {
-                pcntl_signal_dispatch();
-            }
-            // suppress warnings that occur, when stream_select is interrupted by a signal
-            return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
-        }
-
-        // Calls signal handlers for pending signals
-        if(DIRECTORY_SEPARATOR === '/') {
-            pcntl_signal_dispatch();
-        }
-        $timeout && usleep($timeout);
-
-        return 0;
-    }
-
-    /**
-     * Destroy loop.
-     *
-     * @return void
-     */
-    public function destroy()
-    {
-
-    }
-
-    /**
-     * Get timer count.
-     *
-     * @return integer
-     */
-    public function getTimerCount()
+    public function __construct()
     {
-        return count($this->_timerIdMap);
+        $this->_eventLoop = new \React\EventLoop\StreamSelectLoop();
     }
 }

+ 15 - 6
Events/Select.php

@@ -114,14 +114,20 @@ class Select implements EventInterface
     {
         switch ($flag) {
             case self::EV_READ:
-                $fd_key                           = (int)$fd;
-                $this->_allEvents[$fd_key][$flag] = array($func, $fd);
-                $this->_readFds[$fd_key]          = $fd;
-                break;
             case self::EV_WRITE:
+                $count = $flag === self::EV_READ ? count($this->_readFds) : count($this->_writeFds);
+                if ($count >= 1024) {
+                    echo "Warning: system call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.\n";
+                } else if (DIRECTORY_SEPARATOR !== '/' && $count >= 256) {
+                    echo "Warning: system call select exceeded the maximum number of connections 256.\n";
+                }
                 $fd_key                           = (int)$fd;
                 $this->_allEvents[$fd_key][$flag] = array($func, $fd);
-                $this->_writeFds[$fd_key]         = $fd;
+                if ($flag === self::EV_READ) {
+                    $this->_readFds[$fd_key] = $fd;
+                } else {
+                    $this->_writeFds[$fd_key] = $fd;
+                }
                 break;
             case self::EV_EXCEPT:
                 $fd_key = (int)$fd;
@@ -268,7 +274,10 @@ class Select implements EventInterface
             $except = $this->_exceptFds;
 
             // Waiting read/write/signal/timeout events.
-            $ret = @stream_select($read, $write, $except, 0, $this->_selectTimeout);
+            set_error_handler(function(){});
+            $ret = stream_select($read, $write, $except, 0, $this->_selectTimeout);
+            restore_error_handler();
+
 
             if (!$this->_scheduler->isEmpty()) {
                 $this->tick();

+ 216 - 0
Events/Swoole.php

@@ -0,0 +1,216 @@
+<?php
+/**
+ * This file is part of workerman.
+ *
+ * Licensed under The MIT License
+ * For full copyright and license information, please see the MIT-LICENSE.txt
+ * Redistributions of files must retain the above copyright notice.
+ *
+ * @author    Ares<aresrr#qq.com>
+ * @link      http://www.workerman.net/
+ * @link      https://github.com/ares333/Workerman
+ * @license   http://www.opensource.org/licenses/mit-license.php MIT License
+ */
+namespace Workerman\Events;
+
+use Swoole\Event;
+use Swoole\Timer;
+
+class Swoole implements EventInterface
+{
+
+    protected $_timer = array();
+
+    protected $_timerOnceMap = array();
+
+    protected $_fd = array();
+
+    // milisecond
+    public static $signalDispatchInterval = 200;
+
+    protected $_hasSignal = false;
+
+    /**
+     *
+     * {@inheritdoc}
+     *
+     * @see \Workerman\Events\EventInterface::add()
+     */
+    public function add($fd, $flag, $func, $args = null)
+    {
+        if (! isset($args)) {
+            $args = array();
+        }
+        switch ($flag) {
+            case self::EV_SIGNAL:
+                $res = pcntl_signal($fd, $func, false);
+                if (! $this->_hasSignal && $res) {
+                    Timer::tick(static::$signalDispatchInterval,
+                        function () {
+                            pcntl_signal_dispatch();
+                        });
+                    $this->_hasSignal = true;
+                }
+                return $res;
+            case self::EV_TIMER:
+            case self::EV_TIMER_ONCE:
+                $method = self::EV_TIMER == $flag ? 'tick' : 'after';
+                $mapId = count($this->_timerOnceMap);
+                $timer_id = Timer::$method($fd * 1000,
+                    function ($timer_id = null) use ($func, $args, $mapId) {
+                        call_user_func_array($func, $args);
+                        // EV_TIMER_ONCE
+                        if (! isset($timer_id)) {
+                            // may be deleted in $func
+                            if (array_key_exists($mapId, $this->_timerOnceMap)) {
+                                $timer_id = $this->_timerOnceMap[$mapId];
+                                unset($this->_timer[$timer_id],
+                                    $this->_timerOnceMap[$mapId]);
+                            }
+                        }
+                    });
+                if ($flag == self::EV_TIMER_ONCE) {
+                    $this->_timerOnceMap[$mapId] = $timer_id;
+                    $this->_timer[$timer_id] = $mapId;
+                } else {
+                    $this->_timer[$timer_id] = null;
+                }
+                return $timer_id;
+            case self::EV_READ:
+            case self::EV_WRITE:
+                $fd_key = (int) $fd;
+                if (! isset($this->_fd[$fd_key])) {
+                    if ($flag == self::EV_READ) {
+                        $res = Event::add($fd, $func, null, SWOOLE_EVENT_READ);
+                        $fd_type = SWOOLE_EVENT_READ;
+                    } else {
+                        $res = Event::add($fd, null, $func, SWOOLE_EVENT_WRITE);
+                        $fd_type = SWOOLE_EVENT_WRITE;
+                    }
+                    if ($res) {
+                        $this->_fd[$fd_key] = $fd_type;
+                    }
+                } else {
+                    $fd_val = $this->_fd[$fd_key];
+                    $res = true;
+                    if ($flag == self::EV_READ) {
+                        if (($fd_val & SWOOLE_EVENT_READ) != SWOOLE_EVENT_READ) {
+                            $res = Event::set($fd, $func, null,
+                                SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
+                            $this->_fd[$fd_key] |= SWOOLE_EVENT_READ;
+                        }
+                    } else {
+                        if (($fd_val & SWOOLE_EVENT_WRITE) != SWOOLE_EVENT_WRITE) {
+                            $res = Event::set($fd, null, $func,
+                                SWOOLE_EVENT_READ | SWOOLE_EVENT_WRITE);
+                            $this->_fd[$fd_key] |= SWOOLE_EVENT_WRITE;
+                        }
+                    }
+                }
+                return $res;
+        }
+    }
+
+    /**
+     *
+     * {@inheritdoc}
+     *
+     * @see \Workerman\Events\EventInterface::del()
+     */
+    public function del($fd, $flag)
+    {
+        switch ($flag) {
+            case self::EV_SIGNAL:
+                return pcntl_signal($fd, SIG_IGN, false);
+            case self::EV_TIMER:
+            case self::EV_TIMER_ONCE:
+                // already remove in EV_TIMER_ONCE callback.
+                if (! array_key_exists($fd, $this->_timer)) {
+                    return true;
+                }
+                $res = Timer::clear($fd);
+                if ($res) {
+                    $mapId = $this->_timer[$fd];
+                    if (isset($mapId)) {
+                        unset($this->_timerOnceMap[$mapId]);
+                    }
+                    unset($this->_timer[$fd]);
+                }
+                return $res;
+            case self::EV_READ:
+            case self::EV_WRITE:
+                $fd_key = (int) $fd;
+                if (isset($this->_fd[$fd_key])) {
+                    $fd_val = $this->_fd[$fd_key];
+                    if ($flag == self::EV_READ) {
+                        $flag_remove = ~ SWOOLE_EVENT_READ;
+                    } else {
+                        $flag_remove = ~ SWOOLE_EVENT_WRITE;
+                    }
+                    $fd_val &= $flag_remove;
+                    if (0 === $fd_val) {
+                        $res = Event::del($fd);
+                        if ($res) {
+                            unset($this->_fd[$fd_key]);
+                        }
+                    } else {
+                        $res = Event::set($fd, null, null, $fd_val);
+                        if ($res) {
+                            $this->_fd[$fd_key] = $fd_val;
+                        }
+                    }
+                } else {
+                    $res = true;
+                }
+                return $res;
+        }
+    }
+
+    /**
+     *
+     * {@inheritdoc}
+     *
+     * @see \Workerman\Events\EventInterface::clearAllTimer()
+     */
+    public function clearAllTimer()
+    {
+        foreach (array_keys($this->_timer) as $v) {
+            Timer::clear($v);
+        }
+        $this->_timer = array();
+        $this->_timerOnceMap = array();
+    }
+
+    /**
+     *
+     * {@inheritdoc}
+     *
+     * @see \Workerman\Events\EventInterface::loop()
+     */
+    public function loop()
+    {
+        Event::wait();
+    }
+
+    /**
+     *
+     * {@inheritdoc}
+     *
+     * @see \Workerman\Events\EventInterface::destroy()
+     */
+    public function destroy()
+    {
+        //Event::exit();
+    }
+
+    /**
+     *
+     * {@inheritdoc}
+     *
+     * @see \Workerman\Events\EventInterface::getTimerCount()
+     */
+    public function getTimerCount()
+    {
+        return count($this->_timer);
+    }
+}

+ 4 - 0
Lib/Constants.php

@@ -31,6 +31,10 @@ define('WORKERMAN_CONNECT_FAIL', 1);
 // For onError callback.
 define('WORKERMAN_SEND_FAIL', 2);
 
+// Define OS Type
+define('OS_TYPE_LINUX', 'linux');
+define('OS_TYPE_WINDOWS', 'windows');
+
 // Compatible with php7
 if(!class_exists('Error'))
 {

+ 4 - 3
Lib/Timer.php

@@ -14,6 +14,7 @@
 namespace Workerman\Lib;
 
 use Workerman\Events\EventInterface;
+use Workerman\Worker;
 use Exception;
 
 /**
@@ -85,7 +86,7 @@ class Timer
     public static function add($time_interval, $func, $args = array(), $persistent = true)
     {
         if ($time_interval <= 0) {
-            echo new Exception("bad time_interval");
+            Worker::safeEcho(new Exception("bad time_interval"));
             return false;
         }
 
@@ -95,7 +96,7 @@ class Timer
         }
 
         if (!is_callable($func)) {
-            echo new Exception("not callable");
+            Worker::safeEcho(new Exception("not callable"));
             return false;
         }
 
@@ -136,7 +137,7 @@ class Timer
                     try {
                         call_user_func_array($task_func, $task_args);
                     } catch (\Exception $e) {
-                        echo $e;
+                        Worker::safeEcho($e);
                     }
                     if ($persistent) {
                         self::add($time_interval, $task_func, $task_args);

+ 128 - 15
Protocols/Http.php

@@ -38,7 +38,7 @@ class Http
     {
         if (!strpos($recv_buffer, "\r\n\r\n")) {
             // Judge whether the package length exceeds the limit.
-            if (strlen($recv_buffer) >= TcpConnection::$maxPackageSize) {
+            if (strlen($recv_buffer) >= $connection::$maxPackageSize) {
                 $connection->close();
                 return 0;
             }
@@ -73,7 +73,7 @@ class Http
             $content_length = isset($match[1]) ? $match[1] : 0;
             return $content_length + strlen($header) + 4;
         }
-        return 0;
+        return $method === 'DELETE' ? strlen($header) + 4 : 0;
     }
 
     /**
@@ -106,6 +106,7 @@ class Http
             'HTTP_ACCEPT_ENCODING' => '',
             'HTTP_COOKIE'          => '',
             'HTTP_CONNECTION'      => '',
+            'CONTENT_TYPE'         => '',
             'REMOTE_ADDR'          => '',
             'REMOTE_PORT'          => '0',
             'REQUEST_TIME'         => time()
@@ -158,9 +159,17 @@ class Http
                 case 'CONTENT_LENGTH':
                     $_SERVER['CONTENT_LENGTH'] = $value;
                     break;
+                case 'UPGRADE':
+					if($value=='websocket'){
+						$connection->protocol = "\\Workerman\\Protocols\\Websocket";
+						return \Workerman\Protocols\Websocket::input($recv_buffer,$connection);
+					}
+                    break;
             }
         }
-
+		if(isset($_SERVER['HTTP_ACCEPT_ENCODING']) && strpos($_SERVER['HTTP_ACCEPT_ENCODING'], 'gzip') !== FALSE){
+			HttpCache::$gzip = true;
+		}
         // Parse $_POST.
         if ($_SERVER['REQUEST_METHOD'] === 'POST') {
             if (isset($_SERVER['CONTENT_TYPE'])) {
@@ -178,6 +187,17 @@ class Http
             }
         }
 
+        // Parse other HTTP action parameters
+        if ($_SERVER['REQUEST_METHOD'] != 'GET' && $_SERVER['REQUEST_METHOD'] != "POST") {
+            $data = array();
+            if ($_SERVER['CONTENT_TYPE'] === "application/x-www-form-urlencoded") {
+                parse_str($http_body, $data);
+            } elseif ($_SERVER['CONTENT_TYPE'] === "application/json") {
+                $data = json_decode($http_body, true);
+            }
+            $_REQUEST = array_merge($_REQUEST, $data);
+        }
+
         // HTTP_RAW_REQUEST_DATA HTTP_RAW_POST_DATA
         $GLOBALS['HTTP_RAW_REQUEST_DATA'] = $GLOBALS['HTTP_RAW_POST_DATA'] = $http_body;
 
@@ -190,8 +210,13 @@ class Http
             $_SERVER['QUERY_STRING'] = '';
         }
 
-        // REQUEST
-        $_REQUEST = array_merge($_GET, $_POST);
+        if (is_array($_POST)) {
+            // REQUEST
+            $_REQUEST = array_merge($_GET, $_POST, $_REQUEST);
+        } else {
+            // REQUEST
+            $_REQUEST = array_merge($_GET, $_REQUEST);
+        }
 
         // REMOTE_ADDR REMOTE_PORT
         $_SERVER['REMOTE_ADDR'] = $connection->getRemoteIp();
@@ -232,7 +257,10 @@ class Http
                 $header .= $item . "\r\n";
             }
         }
-
+		if(HttpCache::$gzip && isset($connection->gzip) && $connection->gzip){
+			$header .= "Content-Encoding: gzip\r\n";
+			$content = gzencode($content,$connection->gzip);
+		}
         // header
         $header .= "Server: workerman/" . Worker::VERSION . "\r\nContent-Length: " . strlen($content) . "\r\n\r\n";
 
@@ -331,6 +359,84 @@ class Http
     }
 
     /**
+     * sessionCreateId
+     *
+     * @return string
+     */
+    public static function sessionCreateId()
+    {
+        mt_srand();
+        return bin2hex(pack('d', microtime(true)) . pack('N',mt_rand(0, 2147483647)));
+    }
+
+    /**
+     * sessionId
+     *
+     * @param string  $id
+     *
+     * @return string|null
+     */
+    public static function sessionId($id = null)
+    {
+        if (PHP_SAPI != 'cli') {
+            return $id ? session_id($id) : session_id();
+        }
+        if (static::sessionStarted() && HttpCache::$instance->sessionFile) {
+            return str_replace('sess_', '', basename(HttpCache::$instance->sessionFile));
+        }
+        return '';
+    }
+
+    /**
+     * sessionName
+     *
+     * @param string  $name
+     *
+     * @return string
+     */
+    public static function sessionName($name = null)
+    {
+        if (PHP_SAPI != 'cli') {
+            return $name ? session_name($name) : session_name();
+        }
+        $session_name = HttpCache::$sessionName;
+        if ($name && ! static::sessionStarted()) {
+            HttpCache::$sessionName = $name;
+        }
+        return $session_name;
+    }
+
+    /**
+     * sessionSavePath
+     *
+     * @param string  $path
+     *
+     * @return void
+     */
+    public static function sessionSavePath($path = null)
+    {
+        if (PHP_SAPI != 'cli') {
+            return $path ? session_save_path($path) : session_save_path();
+        }
+        if ($path && is_dir($path) && is_writable($path) && !static::sessionStarted()) {
+            HttpCache::$sessionPath = $path;
+        }
+        return HttpCache::$sessionPath;
+    }
+
+    /**
+     * sessionStarted
+     *
+     * @return bool
+     */
+    public static function sessionStarted()
+    {
+        if (!HttpCache::$instance) return false;
+
+        return HttpCache::$instance->sessionStarted;
+    }
+
+    /**
      * sessionStart
      *
      * @return bool
@@ -344,18 +450,18 @@ class Http
         self::tryGcSessions();
 
         if (HttpCache::$instance->sessionStarted) {
-            echo "already sessionStarted\n";
+            Worker::safeEcho("already sessionStarted\n");
             return true;
         }
         HttpCache::$instance->sessionStarted = true;
         // Generate a SID.
-        if (!isset($_COOKIE[HttpCache::$sessionName]) || !is_file(HttpCache::$sessionPath . '/ses' . $_COOKIE[HttpCache::$sessionName])) {
-            $file_name = tempnam(HttpCache::$sessionPath, 'ses');
-            if (!$file_name) {
-                return false;
+        if (!isset($_COOKIE[HttpCache::$sessionName]) || !is_file(HttpCache::$sessionPath . '/sess_' . $_COOKIE[HttpCache::$sessionName])) {
+            // Create a unique session_id and the associated file name.
+            while (true) {
+                $session_id = static::sessionCreateId();
+                if (!is_file($file_name = HttpCache::$sessionPath . '/sess_' . $session_id)) break;
             }
             HttpCache::$instance->sessionFile = $file_name;
-            $session_id                       = substr(basename($file_name), strlen('ses'));
             return self::setcookie(
                 HttpCache::$sessionName
                 , $session_id
@@ -367,7 +473,7 @@ class Http
             );
         }
         if (!HttpCache::$instance->sessionFile) {
-            HttpCache::$instance->sessionFile = HttpCache::$sessionPath . '/ses' . $_COOKIE[HttpCache::$sessionName];
+            HttpCache::$instance->sessionFile = HttpCache::$sessionPath . '/sess_' . $_COOKIE[HttpCache::$sessionName];
         }
         // Read session from session file.
         if (HttpCache::$instance->sessionFile) {
@@ -555,6 +661,7 @@ class HttpCache
      */
     public static $instance             = null;
     public static $header               = array();
+    public static $gzip                 = false;
     public static $sessionPath          = '';
     public static $sessionName          = '';
     public static $sessionGcProbability = 1;
@@ -565,8 +672,14 @@ class HttpCache
 
     public static function init()
     {
-        self::$sessionName = ini_get('session.name');
-        self::$sessionPath = @session_save_path();
+        if (!self::$sessionName) {
+            self::$sessionName = ini_get('session.name');
+        }
+
+        if (!self::$sessionPath) {
+            self::$sessionPath = @session_save_path();
+        }
+
         if (!self::$sessionPath || strpos(self::$sessionPath, 'tcp://') === 0) {
             self::$sessionPath = sys_get_temp_dir();
         }

+ 1 - 1
Protocols/Text.php

@@ -30,7 +30,7 @@ class Text
     public static function input($buffer, TcpConnection $connection)
     {
         // Judge whether the package length exceeds the limit.
-        if (strlen($buffer) >= TcpConnection::$maxPackageSize) {
+        if (strlen($buffer) >= $connection::$maxPackageSize) {
             $connection->close();
             return 0;
         }

+ 95 - 64
Protocols/Websocket.php

@@ -48,7 +48,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
         // Receive length.
         $recv_len = strlen($buffer);
         // We need more data.
-        if ($recv_len < 2) {
+        if ($recv_len < 6) {
             return 0;
         }
 
@@ -70,6 +70,13 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             $data_len     = $secondbyte & 127;
             $is_fin_frame = $firstbyte >> 7;
             $masked       = $secondbyte >> 7;
+
+            if (!$masked) {
+                Worker::safeEcho("frame not masked so close the connection\n");
+                $connection->close();
+                return 0;
+            }
+
             $opcode       = $firstbyte & 0xf;
             switch ($opcode) {
                 case 0x0:
@@ -95,64 +102,18 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                         }
                     } // Close connection.
                     else {
-                        $connection->close();
+                        $connection->close("\x88\x02\x27\x10", true);
                     }
                     return 0;
                 // Ping package.
                 case 0x9:
-                    // Try to emit onWebSocketPing callback.
-                    if (isset($connection->onWebSocketPing) || isset($connection->worker->onWebSocketPing)) {
-                        try {
-                            call_user_func(isset($connection->onWebSocketPing)?$connection->onWebSocketPing:$connection->worker->onWebSocketPing, $connection);
-                        } catch (\Exception $e) {
-                            Worker::log($e);
-                            exit(250);
-                        } catch (\Error $e) {
-                            Worker::log($e);
-                            exit(250);
-                        }
-                    } // Send pong package to client.
-                    else {
-                        $connection->send(pack('H*', '8a00'), true);
-                    }
-
-                    // Consume data from receive buffer.
-                    if (!$data_len) {
-                        $head_len = $masked ? 6 : 2;
-                        $connection->consumeRecvBuffer($head_len);
-                        if ($recv_len > $head_len) {
-                            return static::input(substr($buffer, $head_len), $connection);
-                        }
-                        return 0;
-                    }
                     break;
                 // Pong package.
                 case 0xa:
-                    // Try to emit onWebSocketPong callback.
-                    if (isset($connection->onWebSocketPong) || isset($connection->worker->onWebSocketPong)) {
-                        try {
-                            call_user_func(isset($connection->onWebSocketPong)?$connection->onWebSocketPong:$connection->worker->onWebSocketPong, $connection);
-                        } catch (\Exception $e) {
-                            Worker::log($e);
-                            exit(250);
-                        } catch (\Error $e) {
-                            Worker::log($e);
-                            exit(250);
-                        }
-                    }
-                    //  Consume data from receive buffer.
-                    if (!$data_len) {
-                        $head_len = $masked ? 6 : 2;
-                        $connection->consumeRecvBuffer($head_len);
-                        if ($recv_len > $head_len) {
-                            return static::input(substr($buffer, $head_len), $connection);
-                        }
-                        return 0;
-                    }
                     break;
                 // Wrong opcode. 
                 default :
-                    echo "error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n";
+                    Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n");
                     $connection->close();
                     return 0;
             }
@@ -179,13 +140,63 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             $current_frame_length = $head_len + $data_len;
 
             $total_package_size = strlen($connection->websocketDataBuffer) + $current_frame_length;
-            if ($total_package_size > TcpConnection::$maxPackageSize) {
-                echo "error package. package_length=$total_package_size\n";
+            if ($total_package_size > $connection::$maxPackageSize) {
+                Worker::safeEcho("error package. package_length=$total_package_size\n");
                 $connection->close();
                 return 0;
             }
 
             if ($is_fin_frame) {
+                if ($opcode === 0x9) {
+                    if ($recv_len >= $current_frame_length) {
+                        $ping_data = static::decode(substr($buffer, 0, $current_frame_length), $connection);
+                        $connection->consumeRecvBuffer($current_frame_length);
+                        $tmp_connection_type = isset($connection->websocketType) ? $connection->websocketType : static::BINARY_TYPE_BLOB;
+                        $connection->websocketType = "\x8a";
+                        if (isset($connection->onWebSocketPing) || isset($connection->worker->onWebSocketPing)) {
+                            try {
+                                call_user_func(isset($connection->onWebSocketPing)?$connection->onWebSocketPing:$connection->worker->onWebSocketPing, $connection, $ping_data);
+                            } catch (\Exception $e) {
+                                Worker::log($e);
+                                exit(250);
+                            } catch (\Error $e) {
+                                Worker::log($e);
+                                exit(250);
+                            }
+                        } else {
+                            $connection->send($ping_data);
+                        }
+                        $connection->websocketType = $tmp_connection_type;
+                        if ($recv_len > $current_frame_length) {
+                            return static::input(substr($buffer, $current_frame_length), $connection);
+                        }
+                    }
+                    return 0;
+                } else if ($opcode === 0xa) {
+                    if ($recv_len >= $current_frame_length) {
+                        $pong_data = static::decode(substr($buffer, 0, $current_frame_length), $connection);
+                        $connection->consumeRecvBuffer($current_frame_length);
+                        $tmp_connection_type = isset($connection->websocketType) ? $connection->websocketType : static::BINARY_TYPE_BLOB;
+                        $connection->websocketType = "\x8a";
+                        // Try to emit onWebSocketPong callback.
+                        if (isset($connection->onWebSocketPong) || isset($connection->worker->onWebSocketPong)) {
+                            try {
+                                call_user_func(isset($connection->onWebSocketPong)?$connection->onWebSocketPong:$connection->worker->onWebSocketPong, $connection, $pong_data);
+                            } catch (\Exception $e) {
+                                Worker::log($e);
+                                exit(250);
+                            } catch (\Error $e) {
+                                Worker::log($e);
+                                exit(250);
+                            }
+                        }
+                        $connection->websocketType = $tmp_connection_type;
+                        if ($recv_len > $current_frame_length) {
+                            return static::input(substr($buffer, $current_frame_length), $connection);
+                        }
+                    }
+                    return 0;
+                }
                 return $current_frame_length;
             } else {
                 $connection->websocketCurrentFrameLength = $current_frame_length;
@@ -293,7 +304,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
      */
     public static function decode($buffer, ConnectionInterface $connection)
     {
-        $masks = $data = $decoded = null;
+        $masks = $data = $decoded = '';
         $len = ord($buffer[1]) & 127;
         if ($len === 126) {
             $masks = substr($buffer, 4, 4);
@@ -357,10 +368,8 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             $handshake_message .= "Upgrade: websocket\r\n";
             $handshake_message .= "Sec-WebSocket-Version: 13\r\n";
             $handshake_message .= "Connection: Upgrade\r\n";
-            $handshake_message .= "Server: workerman/".Worker::VERSION."\r\n";
-            $handshake_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
-            // Mark handshake complete..
-            $connection->websocketHandshake = true;
+            $handshake_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n";
+
             // Websocket data buffer.
             $connection->websocketDataBuffer = '';
             // Current websocket frame length.
@@ -369,18 +378,14 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             $connection->websocketCurrentFrameBuffer = '';
             // Consume handshake data.
             $connection->consumeRecvBuffer($header_length);
-            // Send handshake response.
-            $connection->send($handshake_message, true);
 
-            // There are data waiting to be sent.
-            if (!empty($connection->tmpWebsocketData)) {
-                $connection->send($connection->tmpWebsocketData, true);
-                $connection->tmpWebsocketData = '';
-            }
             // blob or arraybuffer
             if (empty($connection->websocketType)) {
                 $connection->websocketType = static::BINARY_TYPE_BLOB;
             }
+
+            $has_server_header = false;
+
             // Try to emit onWebSocketConnect callback.
             if (isset($connection->onWebSocketConnect) || isset($connection->worker->onWebSocketConnect)) {
                 static::parseHttpHeader($buffer);
@@ -397,10 +402,36 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                     $connection->session = \GatewayWorker\Lib\Context::sessionEncode($_SESSION);
                 }
                 $_GET = $_SERVER = $_SESSION = $_COOKIE = array();
+
+                if (isset($connection->headers)) {
+                    if (is_array($connection->headers))  {
+                        foreach ($connection->headers as $header) {
+                            if (strpos($header, 'Server:') === 0) {
+                                $has_server_header = true;
+                            }
+                            $handshake_message .= "$header\r\n";
+                        }
+                    } else {
+                        $handshake_message .= "$connection->headers\r\n";
+                    }
+                }
+            }
+            if (!$has_server_header) {
+                $handshake_message .= "Server: workerman/".Worker::VERSION."\r\n";
+            }
+            $handshake_message .= "\r\n";
+            // Send handshake response.
+            $connection->send($handshake_message, true);
+            // Mark handshake complete..
+            $connection->websocketHandshake = true;
+            // There are data waiting to be sent.
+            if (!empty($connection->tmpWebsocketData)) {
+                $connection->send($connection->tmpWebsocketData, true);
+                $connection->tmpWebsocketData = '';
             }
             if (strlen($buffer) > $header_length) {
                 return static::input(substr($buffer, $header_length), $connection);
-            } 
+            }
             return 0;
         } // Is flash policy-file-request.
         elseif (0 === strpos($buffer, '<polic')) {

+ 103 - 86
Protocols/Ws.php

@@ -46,7 +46,7 @@ class Ws
     public static function input($buffer, $connection)
     {
         if (empty($connection->handshakeStep)) {
-            echo "recv data before handshake. Buffer:" . bin2hex($buffer) . "\n";
+            Worker::safeEcho("recv data before handshake. Buffer:" . bin2hex($buffer) . "\n");
             return false;
         }
         // Recv handshake response
@@ -71,6 +71,13 @@ class Ws
             $data_len     = $secondbyte & 127;
             $is_fin_frame = $firstbyte >> 7;
             $masked       = $secondbyte >> 7;
+
+            if ($masked) {
+                Worker::safeEcho("frame masked so close the connection\n");
+                $connection->close();
+                return 0;
+            }
+
             $opcode       = $firstbyte & 0xf;
 
             switch ($opcode) {
@@ -102,64 +109,19 @@ class Ws
                     return 0;
                 // Ping package.
                 case 0x9:
-                    // Try to emit onWebSocketPing callback.
-                    if (isset($connection->onWebSocketPing)) {
-                        try {
-                            call_user_func($connection->onWebSocketPing, $connection);
-                        } catch (\Exception $e) {
-                            Worker::log($e);
-                            exit(250);
-                        } catch (\Error $e) {
-                            Worker::log($e);
-                            exit(250);
-                        }
-                    } // Send pong package to client.
-                    else {
-                        $connection->send(pack('H*', '8a00'), true);
-                    }
-                    // Consume data from receive buffer.
-                    if (!$data_len) {
-                        $head_len = $masked ? 6 : 2;
-                        $connection->consumeRecvBuffer($head_len);
-                        if ($recv_len > $head_len) {
-                            return self::input(substr($buffer, $head_len), $connection);
-                        }
-                        return 0;
-                    }
                     break;
                 // Pong package.
                 case 0xa:
-                    // Try to emit onWebSocketPong callback.
-                    if (isset($connection->onWebSocketPong)) {
-                        try {
-                            call_user_func($connection->onWebSocketPong, $connection);
-                        } catch (\Exception $e) {
-                            Worker::log($e);
-                            exit(250);
-                        } catch (\Error $e) {
-                            Worker::log($e);
-                            exit(250);
-                        }
-                    }
-                    //  Consume data from receive buffer.
-                    if (!$data_len) {
-                        $head_len = $masked ? 6 : 2;
-                        $connection->consumeRecvBuffer($head_len);
-                        if ($recv_len > $head_len) {
-                            return self::input(substr($buffer, $head_len), $connection);
-                        }
-                        return 0;
-                    }
                     break;
-                // Wrong opcode. 
+                // Wrong opcode.
                 default :
-                    echo "error opcode $opcode and close websocket connection. Buffer:" . $buffer . "\n";
+                    Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . $buffer . "\n");
                     $connection->close();
                     return 0;
             }
             // Calculate packet length.
             if ($data_len === 126) {
-                if (strlen($buffer) < 6) {
+                if (strlen($buffer) < 4) {
                     return 0;
                 }
                 $pack = unpack('nn/ntotal_len', $buffer);
@@ -175,13 +137,64 @@ class Ws
             }
 
             $total_package_size = strlen($connection->websocketDataBuffer) + $current_frame_length;
-            if ($total_package_size > TcpConnection::$maxPackageSize) {
-                echo "error package. package_length=$total_package_size\n";
+            if ($total_package_size > $connection::$maxPackageSize) {
+                Worker::safeEcho("error package. package_length=$total_package_size\n");
                 $connection->close();
                 return 0;
             }
 
             if ($is_fin_frame) {
+                if ($opcode === 0x9) {
+                    if ($recv_len >= $current_frame_length) {
+                        $ping_data = static::decode(substr($buffer, 0, $current_frame_length), $connection);
+                        $connection->consumeRecvBuffer($current_frame_length);
+                        $tmp_connection_type = isset($connection->websocketType) ? $connection->websocketType : static::BINARY_TYPE_BLOB;
+                        $connection->websocketType = "\x8a";
+                        if (isset($connection->onWebSocketPing)) {
+                            try {
+                                call_user_func($connection->onWebSocketPing, $connection, $ping_data);
+                            } catch (\Exception $e) {
+                                Worker::log($e);
+                                exit(250);
+                            } catch (\Error $e) {
+                                Worker::log($e);
+                                exit(250);
+                            }
+                        } else {
+                            $connection->send($ping_data);
+                        }
+                        $connection->websocketType = $tmp_connection_type;
+                        if ($recv_len > $current_frame_length) {
+                            return static::input(substr($buffer, $current_frame_length), $connection);
+                        }
+                    }
+                    return 0;
+
+                } else if ($opcode === 0xa) {
+                    if ($recv_len >= $current_frame_length) {
+                        $pong_data = static::decode(substr($buffer, 0, $current_frame_length), $connection);
+                        $connection->consumeRecvBuffer($current_frame_length);
+                        $tmp_connection_type = isset($connection->websocketType) ? $connection->websocketType : static::BINARY_TYPE_BLOB;
+                        $connection->websocketType = "\x8a";
+                        // Try to emit onWebSocketPong callback.
+                        if (isset($connection->onWebSocketPong)) {
+                            try {
+                                call_user_func($connection->onWebSocketPong, $connection, $pong_data);
+                            } catch (\Exception $e) {
+                                Worker::log($e);
+                                exit(250);
+                            } catch (\Error $e) {
+                                Worker::log($e);
+                                exit(250);
+                            }
+                        }
+                        $connection->websocketType = $tmp_connection_type;
+                        if ($recv_len > $current_frame_length) {
+                            return static::input(substr($buffer, $current_frame_length), $connection);
+                        }
+                    }
+                    return 0;
+                }
                 return $current_frame_length;
             } else {
                 $connection->websocketCurrentFrameLength = $current_frame_length;
@@ -289,31 +302,14 @@ class Ws
      */
     public static function decode($bytes, $connection)
     {
-        $masked = ord($bytes[1]) >> 7;
-        $data_length = $masked ? ord($bytes[1]) & 127 : ord($bytes[1]);
-        $decoded_data = '';
-        if ($masked === true) {
-            if ($data_length === 126) {
-                $mask = substr($bytes, 4, 4);
-                $coded_data = substr($bytes, 8);
-            } else if ($data_length === 127) {
-                $mask = substr($bytes, 10, 4);
-                $coded_data = substr($bytes, 14);
-            } else {
-                $mask = substr($bytes, 2, 4);
-                $coded_data = substr($bytes, 6);
-            }
-            for ($i = 0; $i < strlen($coded_data); $i++) {
-                $decoded_data .= $coded_data[$i] ^ $mask[$i % 4];
-            }
+        $data_length = ord($bytes[1]);
+
+        if ($data_length === 126) {
+            $decoded_data = substr($bytes, 4);
+        } else if ($data_length === 127) {
+            $decoded_data = substr($bytes, 10);
         } else {
-            if ($data_length === 126) {
-                $decoded_data = substr($bytes, 4);
-            } else if ($data_length === 127) {
-                $decoded_data = substr($bytes, 10);
-            } else {
-                $decoded_data = substr($bytes, 2);
-            }
+            $decoded_data = substr($bytes, 2);
         }
         if ($connection->websocketCurrentFrameLength) {
             $connection->websocketDataBuffer .= $decoded_data;
@@ -358,7 +354,7 @@ class Ws
      * Send websocket handshake.
      *
      * @param \Workerman\Connection\TcpConnection $connection
-     * @return void 
+     * @return void
      */
     public static function sendHandshake($connection)
     {
@@ -369,14 +365,26 @@ class Ws
         $port = $connection->getRemotePort();
         $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
         // Handshake header.
+        $connection->websocketSecKey = base64_encode(md5(mt_rand(), true));
+        $userHeader = '';
+        if (!empty($connection->wsHttpHeader)) {
+            if (is_array($connection->wsHttpHeader)){
+                foreach($connection->wsHttpHeader as $k=>$v){
+                    $userHeader .= "$k: $v\r\n";
+                }
+            }else{
+                $userHeader .= $connection->wsHttpHeader;
+            }
+            $userHeader = "\r\n".trim($userHeader);
+        }
         $header = 'GET ' . $connection->getRemoteURI() . " HTTP/1.1\r\n".
         "Host: $host\r\n".
         "Connection: Upgrade\r\n".
         "Upgrade: websocket\r\n".
         "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
-	(isset($connection->WSClientProtocol)?"Sec-WebSocket-Protocol: ".$connection->WSClientProtocol."\r\n":'').
+        (isset($connection->WSClientProtocol)?"Sec-WebSocket-Protocol: ".$connection->WSClientProtocol."\r\n":'').
         "Sec-WebSocket-Version: 13\r\n".
-        "Sec-WebSocket-Key: " . base64_encode(md5(mt_rand(), true)) . "\r\n\r\n";
+        "Sec-WebSocket-Key: " . $connection->websocketSecKey . $userHeader . "\r\n\r\n";
         $connection->send($header, true);
         $connection->handshakeStep               = 1;
         $connection->websocketCurrentFrameLength = 0;
@@ -395,16 +403,25 @@ class Ws
     {
         $pos = strpos($buffer, "\r\n\r\n");
         if ($pos) {
-            // handshake complete
+            //checking Sec-WebSocket-Accept
+            if (preg_match("/Sec-WebSocket-Accept: *(.*?)\r\n/i", $buffer, $match)) {
+                if ($match[1] !== base64_encode(sha1($connection->websocketSecKey . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true))) {
+                    Worker::safeEcho("Sec-WebSocket-Accept not match. Header:\n" . substr($buffer, 0, $pos) . "\n");
+                    $connection->close();
+                    return 0;
+                }
+            } else {
+                Worker::safeEcho("Sec-WebSocket-Accept not found. Header:\n" . substr($buffer, 0, $pos) . "\n");
+                $connection->close();
+                return 0;
+            }
 
-	    // Get WebSocket subprotocol (if specified by server)
-    	    $header = explode("\r\n", substr($buffer, 0, $pos));
-	    foreach ($header as $hrow) {
-		if (preg_match("#^(.+?)\:(.+?)$#", $hrow, $m) && ($m[1] == "Sec-WebSocket-Protocol")) {
-		    $connection->WSServerProtocol = trim($m[2]);
-		}
+            // handshake complete
 
-	    }
+            // Get WebSocket subprotocol (if specified by server)
+            if (preg_match("/Sec-WebSocket-Protocol: *(.*?)\r\n/i", $buffer, $match)) {
+                $connection->WSServerProtocol = trim($match[1]);
+            }
 
             $connection->handshakeStep = 2;
             $handshake_response_length = $pos + 4;

+ 3 - 11
README.md

@@ -406,23 +406,15 @@ use Workerman\Worker;
 
 $worker = new Worker('tcp://0.0.0.0:6161');
 
-$worker->onWorkerStart = function() {
-    global   $client;
-    $loop    = Worker::getEventLoop();
-    $factory = new React\Dns\Resolver\Factory();
-    $dns     = $factory->createCached('8.8.8.8', $loop);
-    $factory = new React\HttpClient\Factory();
-    $client = $factory->create($loop, $dns);
-};
-
 $worker->onMessage = function($connection, $host) {
-    global     $client;
+    $loop    = Worker::getEventLoop();
+    $client  = new \React\HttpClient\Client($loop);
     $request = $client->request('GET', trim($host));
     $request->on('error', function(Exception $e) use ($connection) {
         $connection->send($e);
     });
     $request->on('response', function ($response) use ($connection) {
-        $response->on('data', function ($data, $response) use ($connection) {
+        $response->on('data', function ($data) use ($connection) {
             $connection->send($data);
         });
     });

+ 20 - 10
WebServer.php

@@ -47,12 +47,15 @@ class WebServer extends Worker
      * Add virtual host.
      *
      * @param string $domain
-     * @param string $root_path
+     * @param string $config
      * @return void
      */
-    public function addRoot($domain, $root_path)
+    public function addRoot($domain, $config)
     {
-        $this->serverRoot[$domain] = $root_path;
+	if (is_string($config)) {
+            $config = array('root' => $config);
+	}
+        $this->serverRoot[$domain] = $config;
     }
 
     /**
@@ -89,7 +92,7 @@ class WebServer extends Worker
     public function onWorkerStart()
     {
         if (empty($this->serverRoot)) {
-            echo new \Exception('server root not set, please use WebServer::addRoot($domain, $root_path) to set server root path');
+            Worker::safeEcho(new \Exception('server root not set, please use WebServer::addRoot($domain, $root_path) to set server root path'));
             exit(250);
         }
 
@@ -148,7 +151,7 @@ class WebServer extends Worker
     public function onMessage($connection)
     {
         // REQUEST_URI.
-        $workerman_url_info = parse_url($_SERVER['REQUEST_URI']);
+        $workerman_url_info = parse_url('http://'.$_SERVER['HTTP_HOST'].$_SERVER['REQUEST_URI']);
         if (!$workerman_url_info) {
             Http::header('HTTP/1.1 400 Bad Request');
             $connection->close('<h1>400 Bad Request</h1>');
@@ -164,10 +167,12 @@ class WebServer extends Worker
             $workerman_file_extension = 'php';
         }
 
-        $workerman_root_dir = isset($this->serverRoot[$_SERVER['SERVER_NAME']]) ? $this->serverRoot[$_SERVER['SERVER_NAME']] : current($this->serverRoot);
-
+        $workerman_siteConfig = isset($this->serverRoot[$_SERVER['SERVER_NAME']]) ? $this->serverRoot[$_SERVER['SERVER_NAME']] : current($this->serverRoot);
+		$workerman_root_dir = $workerman_siteConfig['root'];
         $workerman_file = "$workerman_root_dir/$workerman_path";
-
+		if(isset($workerman_siteConfig['additionHeader'])){
+			Http::header($workerman_siteConfig['additionHeader']);
+		}
         if ($workerman_file_extension === 'php' && !is_file($workerman_file)) {
             $workerman_file = "$workerman_root_dir/index.php";
             if (!is_file($workerman_file)) {
@@ -204,7 +209,7 @@ class WebServer extends Worker
                 } catch (\Exception $e) {
                     // Jump_exit?
                     if ($e->getMessage() != 'jump_exit') {
-                        echo $e;
+                        Worker::safeEcho($e);
                     }
                 }
                 $content = ob_get_clean();
@@ -223,7 +228,12 @@ class WebServer extends Worker
         } else {
             // 404
             Http::header("HTTP/1.1 404 Not Found");
-            $connection->close('<html><head><title>404 File not found</title></head><body><center><h3>404 Not Found</h3></center></body></html>');
+			if(isset($workerman_siteConfig['custom404']) && file_exists($workerman_siteConfig['custom404'])){
+				$html404 = file_get_contents($workerman_siteConfig['custom404']);
+			}else{
+				$html404 = '<html><head><title>404 File not found</title></head><body><center><h3>404 Not Found</h3></center></body></html>';
+			}
+            $connection->close($html404);
             return;
         }
     }

+ 356 - 129
Worker.php

@@ -33,7 +33,7 @@ class Worker
      *
      * @var string
      */
-    const VERSION = '3.5.4';
+    const VERSION = '3.5.14';
 
     /**
      * Status starting.
@@ -85,6 +85,13 @@ class Worker
     const MAX_UDP_PACKAGE_SIZE = 65535;
 
     /**
+     * The safe distance for columns adjacent
+     *
+     * @var int
+     */
+    const UI_SAFE_LENGTH = 4;
+
+    /**
      * Worker id.
      *
      * @var int
@@ -227,11 +234,17 @@ class Worker
     /**
      * Pause accept new connections or not.
      *
-     * @var string
+     * @var bool
      */
     protected $_pauseAccept = true;
 
     /**
+     * Is worker stopping ?
+     * @var bool
+     */
+    public $stopping = false;
+
+    /**
      * Daemonize.
      *
      * @var bool
@@ -318,7 +331,7 @@ class Worker
     /**
      * All worker instances.
      *
-     * @var array
+     * @var Worker[]
      */
     protected static $_workers = array();
 
@@ -375,6 +388,27 @@ class Worker
     protected static $_maxUserNameLength = 12;
 
     /**
+     * Maximum length of the Proto names.
+     *
+     * @var int
+     */
+    protected static $_maxProtoNameLength = 4;
+
+    /**
+     * Maximum length of the Processes names.
+     *
+     * @var int
+     */
+    protected static $_maxProcessesNameLength = 9;
+
+    /**
+     * Maximum length of the Status names.
+     *
+     * @var int
+     */
+    protected static $_maxStatusNameLength = 1;
+
+    /**
      * The file to store status info of current worker process.
      *
      * @var string
@@ -393,7 +427,7 @@ class Worker
      *
      * @var string
      */
-    protected static $_OS = 'linux';
+    protected static $_OS = OS_TYPE_LINUX;
 
     /**
      * Processes for windows.
@@ -419,7 +453,8 @@ class Worker
      */
     protected static $_availableEventLoops = array(
         'libevent' => '\Workerman\Events\Libevent',
-        'event'    => '\Workerman\Events\Event'
+        'event'    => '\Workerman\Events\Event',
+        'swoole'   => '\Workerman\Events\Swoole'
     );
 
     /**
@@ -442,6 +477,18 @@ class Worker
     protected static $_gracefulStop = false;
 
     /**
+     * Standard output stream
+     * @var resource
+     */
+    protected static $_outputStream = null;
+
+    /**
+     * If $outputStream support decorated
+     * @var bool
+     */
+    protected static $_outputDecorated = null;
+
+    /**
      * Run all worker instances.
      *
      * @return void
@@ -473,7 +520,7 @@ class Worker
             exit("only run in command line mode \n");
         }
         if (DIRECTORY_SEPARATOR === '\\') {
-            self::$_OS = 'windows';
+            self::$_OS = OS_TYPE_WINDOWS;
         }
     }
 
@@ -484,6 +531,10 @@ class Worker
      */
     protected static function init()
     {
+        set_error_handler(function($code, $msg, $file, $line){
+            Worker::safeEcho("$msg in file $file on line $line\n");
+        });
+
         // Start file.
         $backtrace        = debug_backtrace();
         static::$_startFile = $backtrace[count($backtrace) - 1]['file'];
@@ -530,7 +581,7 @@ class Worker
      */
     protected static function initWorkers()
     {
-        if (static::$_OS !== 'linux') {
+        if (static::$_OS !== OS_TYPE_LINUX) {
             return;
         }
         foreach (static::$_workers as $worker) {
@@ -539,33 +590,27 @@ class Worker
                 $worker->name = 'none';
             }
 
-            // Get maximum length of worker name.
-            $worker_name_length = strlen($worker->name);
-            if (static::$_maxWorkerNameLength < $worker_name_length) {
-                static::$_maxWorkerNameLength = $worker_name_length;
-            }
-
-            // Get maximum length of socket name.
-            $socket_name_length = strlen($worker->getSocketName());
-            if (static::$_maxSocketNameLength < $socket_name_length) {
-                static::$_maxSocketNameLength = $socket_name_length;
-            }
-
             // Get unix user of the worker process.
-            if (static::$_OS === 'linux') {
-                if (empty($worker->user)) {
-                    $worker->user = static::getCurrentUser();
-                } else {
-                    if (posix_getuid() !== 0 && $worker->user != static::getCurrentUser()) {
-                        static::log('Warning: You must have the root privileges to change uid and gid.');
-                    }
+            if (empty($worker->user)) {
+                $worker->user = static::getCurrentUser();
+            } else {
+                if (posix_getuid() !== 0 && $worker->user != static::getCurrentUser()) {
+                    static::log('Warning: You must have the root privileges to change uid and gid.');
                 }
             }
 
-            // Get maximum length of unix user name.
-            $user_name_length = strlen($worker->user);
-            if (static::$_maxUserNameLength < $user_name_length) {
-                static::$_maxUserNameLength = $user_name_length;
+            // Socket name.
+            $worker->socket = $worker->getSocketName();
+
+            // Status name.
+            $worker->status = '<g> [OK] </g>';
+
+            // Get clolumn mapping for UI
+            foreach(static::getUiColumns() as $column_name => $prop){
+                !isset($worker->{$prop}) && $worker->{$prop}= 'NNNN';
+                $prop_length = strlen($worker->{$prop});
+                $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
+                static::$$key = max(static::$$key, $prop_length);
             }
 
             // Listen.
@@ -603,6 +648,7 @@ class Worker
     {
         foreach (static::$_workers as $worker_id => $worker) {
             $new_id_map = array();
+            $worker->count = $worker->count <= 0 ? 1 : $worker->count;
             for($key = 0; $key < $worker->count; $key++) {
                 $new_id_map[$key] = isset(static::$_idMap[$worker_id][$key]) ? static::$_idMap[$worker_id][$key] : 0;
             }
@@ -629,30 +675,51 @@ class Worker
     protected static function displayUI()
     {
         global $argv;
-        if (isset($argv[1])) {
-            // Support not display ui: php serv.php start/restart -q , php serv.php start/restart -d -q
-            if ( (isset($argv[2]) && ($argv[2] === '-q')) || (isset($argv[2]) && ($argv[2] === '-d') && isset($argv[3]) && ($argv[3] === '-q')) ) {
-                return;
-            }
+        if (in_array('-q', $argv)) {
+            return;
         }
-        static::safeEcho("\033[1A\n\033[K-----------------------\033[47;30m WORKERMAN \033[0m-----------------------------\r\n\033[0m");
-        static::safeEcho('Workerman version:'. static::VERSION. "          PHP version:". PHP_VERSION. "\r\n");
-        static::safeEcho("------------------------\033[47;30m WORKERS \033[0m-------------------------------\r\n");
-        if (static::$_OS !== 'linux') {
+        if (static::$_OS !== OS_TYPE_LINUX) {
+            static::safeEcho("----------------------- WORKERMAN -----------------------------\r\n");
+            static::safeEcho('Workerman version:'. static::VERSION. "          PHP version:". PHP_VERSION. "\r\n");
+            static::safeEcho("------------------------ WORKERS -------------------------------\r\n");
             static::safeEcho("worker               listen                              processes status\r\n");
             return;
         }
-        static::safeEcho("\033[47;30muser\033[0m". str_pad('',
-                static::$_maxUserNameLength + 2 - strlen('user')). "\033[47;30mworker\033[0m". str_pad('',
-                static::$_maxWorkerNameLength + 2 - strlen('worker')). "\033[47;30mlisten\033[0m". str_pad('',
-                static::$_maxSocketNameLength + 2 - strlen('listen')). "\033[47;30mprocesses\033[0m \033[47;30m". "status\033[0m\n");
 
+        //show version
+        $line_version = 'Workerman version:' . static::VERSION . str_pad('PHP version:', 22, ' ', STR_PAD_LEFT) . PHP_VERSION . PHP_EOL;
+        !defined('LINE_VERSIOIN_LENGTH') && define('LINE_VERSIOIN_LENGTH', strlen($line_version));
+        $total_length = static::getSingleLineTotalLength();
+        $line_one = '<n>' . str_pad('<w> WORKERMAN </w>', $total_length + strlen('<w></w>'), '-', STR_PAD_BOTH) . '</n>'. PHP_EOL;
+        $line_two = str_pad('<w> WORKERS </w>' , $total_length  + strlen('<w></w>'), '-', STR_PAD_BOTH) . PHP_EOL;
+        static::safeEcho($line_one . $line_version . $line_two);
+
+        //Show title
+        $title = '';
+        foreach(static::getUiColumns() as $column_name => $prop){
+            $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
+            //just keep compatible with listen name 
+            $column_name == 'socket' && $column_name = 'listen';
+            $title.= "<w>{$column_name}</w>"  .  str_pad('', static::$$key + static::UI_SAFE_LENGTH - strlen($column_name));
+        }
+        $title && static::safeEcho($title . PHP_EOL);
+
+        //Show content
         foreach (static::$_workers as $worker) {
-            static::safeEcho(str_pad($worker->user, static::$_maxUserNameLength + 2). str_pad($worker->name,
-                    static::$_maxWorkerNameLength + 2). str_pad($worker->getSocketName(),
-                    static::$_maxSocketNameLength + 2). str_pad(' ' . $worker->count, 9). " \033[32;40m [OK] \033[0m\n");
+            $content = '';
+            foreach(static::getUiColumns() as $column_name => $prop){
+                $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
+                preg_match_all("/(<n>|<\/n>|<w>|<\/w>|<g>|<\/g>)/is", $worker->{$prop}, $matches);
+                $place_holder_length = !empty($matches) ? strlen(implode('', $matches[0])) : 0;
+                $content .= str_pad($worker->{$prop}, static::$$key + static::UI_SAFE_LENGTH + $place_holder_length);
+            }
+            $content && static::safeEcho($content . PHP_EOL);
         }
-        static::safeEcho("----------------------------------------------------------------\n");
+
+        //Show last line
+        $line_last = str_pad('', static::getSingleLineTotalLength(), '-') . PHP_EOL;
+        $content && static::safeEcho($line_last);
+
         if (static::$daemonize) {
             static::safeEcho("Input \"php $argv[0] stop\" to stop. Start success.\n\n");
         } else {
@@ -661,14 +728,56 @@ class Worker
     }
 
     /**
+     * Get UI columns to be shown in terminal
+     *
+     * 1. $column_map: array('ui_column_name' => 'clas_property_name')
+     * 2. Consider move into configuration in future
+     *
+     * @return array
+     */
+    public static function getUiColumns()
+    {
+        $column_map = array(
+            'proto'     =>  'transport',
+            'user'      =>  'user',
+            'worker'    =>  'name',
+            'socket'    =>  'socket',
+            'processes' =>  'count',
+            'status'    =>  'status',
+        );
+
+        return $column_map;
+    }
+
+    /**
+     * Get single line total length for ui
+     *
+     * @return int
+     */
+    public static function getSingleLineTotalLength()
+    {
+        $total_length = 0;
+
+        foreach(static::getUiColumns() as $column_name => $prop){
+            $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
+            $total_length += static::$$key + static::UI_SAFE_LENGTH;
+        }
+
+        //keep beauty when show less colums
+        !defined('LINE_VERSIOIN_LENGTH') && define('LINE_VERSIOIN_LENGTH', 0);
+        $total_length <= LINE_VERSIOIN_LENGTH && $total_length = LINE_VERSIOIN_LENGTH;
+
+        return $total_length;
+    }
+
+    /**
      * Parse command.
-     * php yourfile.php start | stop | restart | reload | status [-d]
      *
      * @return void
      */
     protected static function parseCommand()
     {
-        if (static::$_OS !== 'linux') {
+        if (static::$_OS !== OS_TYPE_LINUX) {
             return;
         }
         global $argv;
@@ -682,8 +791,11 @@ class Worker
             'status',
             'connections',
         );
-        $usage = "Usage: php yourfile.php {" . implode('|', $available_commands) . "} [-d]\n";
+        $usage = "Usage: php yourfile <command> [mode]\nCommands: \nstart\t\tStart worker in DEBUG mode.\n\t\tUse mode -d to start in DAEMON mode.\nstop\t\tStop worker.\n\t\tUse mode -g to stop gracefully.\nrestart\t\tRestart workers.\n\t\tUse mode -d to start in DAEMON mode.\n\t\tUse mode -g to stop gracefully.\nreload\t\tReload codes.\n\t\tUse mode -g to reload gracefully.\nstatus\t\tGet worker status.\n\t\tUse mode -d to show live status.\nconnections\tGet worker connections.\n";
         if (!isset($argv[1]) || !in_array($argv[1], $available_commands)) {
+            if (isset($argv[1])) {
+                static::safeEcho('Unknown command: ' . $argv[1] . "\n");
+            }
             exit($usage);
         }
 
@@ -704,7 +816,7 @@ class Worker
 
         // Get master process PID.
         $master_pid      = is_file(static::$pidFile) ? file_get_contents(static::$pidFile) : 0;
-        $master_is_alive = $master_pid && @posix_kill($master_pid, 0) && posix_getpid() != $master_pid;
+        $master_is_alive = $master_pid && posix_kill($master_pid, 0) && posix_getpid() != $master_pid;
         // Master is still alive?
         if ($master_is_alive) {
             if ($command === 'start') {
@@ -734,37 +846,39 @@ class Worker
                     sleep(1);
                     // Clear terminal.
                     if ($command2 === '-d') {
-                        echo "\33[H\33[2J\33(B\33[m";
+                        static::safeEcho("\33[H\33[2J\33(B\33[m", true);
                     }
                     // Echo status data.
-                    echo static::formatStatusData();
+                    static::safeEcho(static::formatStatusData());
                     if ($command2 !== '-d') {
                         exit(0);
                     }
-                    echo "\nPress Ctrl+C to quit.\n\n";
+                    static::safeEcho("\nPress Ctrl+C to quit.\n\n");
                 }
                 exit(0);
             case 'connections':
-                if (is_file(static::$_statisticsFile)) {
-                    @unlink(static::$_statisticsFile);
+                if (is_file(static::$_statisticsFile) && is_writable(static::$_statisticsFile)) {
+                    unlink(static::$_statisticsFile);
                 }
                 // Master process will send SIGIO signal to all child processes.
                 posix_kill($master_pid, SIGIO);
                 // Waiting amoment.
                 usleep(500000);
                 // Display statisitcs data from a disk file.
-                @readfile(static::$_statisticsFile);
+                if(is_readable(static::$_statisticsFile)) {
+                    readfile(static::$_statisticsFile);
+                }
                 exit(0);
             case 'restart':
             case 'stop':
                 if ($command2 === '-g') {
                     static::$_gracefulStop = true;
                     $sig = SIGTERM;
-                    static::log("Workerman[$start_file] is gracefully stoping ...");
+                    static::log("Workerman[$start_file] is gracefully stopping ...");
                 } else {
                     static::$_gracefulStop = false;
                     $sig = SIGINT;
-                    static::log("Workerman[$start_file] is stoping ...");
+                    static::log("Workerman[$start_file] is stopping ...");
                 }
                 // Send stop signal to master process.
                 $master_pid && posix_kill($master_pid, $sig);
@@ -804,6 +918,9 @@ class Worker
                 posix_kill($master_pid, $sig);
                 exit;
             default :
+                if (isset($command)) {
+                    static::safeEcho('Unknown command: ' . $command . "\n");
+                }
                 exit($usage);
         }
     }
@@ -816,7 +933,10 @@ class Worker
     protected static function formatStatusData()
     {
         static $total_request_cache = array();
-        $info = @file(static::$_statisticsFile, FILE_IGNORE_NEW_LINES);
+        if (!is_readable(static::$_statisticsFile)) {
+            return '';
+        }
+        $info = file(static::$_statisticsFile, FILE_IGNORE_NEW_LINES);
         if (!$info) {
             return '';
         }
@@ -827,6 +947,14 @@ class Worker
         unset($info[0]);
         $data_waiting_sort = array();
         $read_process_status = false;
+        $total_requests = 0;
+        $total_qps = 0;
+        $total_connections = 0;
+        $total_fails = 0;
+        $total_memory = 0;
+        $total_timers = 0;
+        $maxLen1 = static::$_maxSocketNameLength;
+        $maxLen2 = static::$_maxWorkerNameLength;
         foreach($info as $key => $value) {
             if (!$read_process_status) {
                 $status_str .= $value . "\n";
@@ -838,8 +966,15 @@ class Worker
             if(preg_match('/^[0-9]+/', $value, $pid_math)) {
                 $pid = $pid_math[0];
                 $data_waiting_sort[$pid] = $value;
-                if(preg_match('/^\S+?\s+?\S+?\s+?\S+?\s+?\S+?\s+?\S+?\s+?\S+?\s+?\S+?\s+?(\S+?)\s+?/', $value, $match)) {
-                    $current_total_request[$pid] = $match[1];
+                if(preg_match('/^\S+?\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?/', $value, $match)) {
+                    $total_memory += intval(str_ireplace('M','',$match[1]));
+                    $maxLen1 = max($maxLen1,strlen($match[2]));
+                    $maxLen2 = max($maxLen2,strlen($match[3]));
+                    $total_connections += intval($match[4]);
+                    $total_fails += intval($match[5]);
+                    $total_timers += intval($match[6]);
+                    $current_total_request[$pid] = $match[7];
+                    $total_requests += intval($match[7]);
                 }
             }
         }
@@ -857,10 +992,18 @@ class Worker
                 $qps = 0;
             } else {
                 $qps = $current_total_request[$pid] - $total_request_cache[$pid];
+                $total_qps += $qps;
             }
             $status_str .= $data_waiting_sort[$pid]. " " . str_pad($qps, 6) ." [idle]\n";
         }
         $total_request_cache = $current_total_request;
+        $status_str .= "----------------------------------------------PROCESS STATUS---------------------------------------------------\n";
+        $status_str .= "Summary\t" . str_pad($total_memory.'M', 7) . " "
+            . str_pad('-', $maxLen1) . " "
+            . str_pad('-', $maxLen2) . " "
+            . str_pad($total_connections, 11) . " " . str_pad($total_fails, 9) . " "
+            . str_pad($total_timers, 7) . " " . str_pad($total_requests, 13) . " "
+            . str_pad($total_qps,6)." [Summary] \n";
         return $status_str;
     }
 
@@ -872,7 +1015,7 @@ class Worker
      */
     protected static function installSignal()
     {
-        if (static::$_OS !== 'linux') {
+        if (static::$_OS !== OS_TYPE_LINUX) {
             return;
         }
         // stop
@@ -898,7 +1041,7 @@ class Worker
      */
     protected static function reinstallSignal()
     {
-        if (static::$_OS !== 'linux') {
+        if (static::$_OS !== OS_TYPE_LINUX) {
             return;
         }
         // uninstall stop signal handler
@@ -919,7 +1062,7 @@ class Worker
         static::$globalEvent->add(SIGUSR1, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
         // reinstall graceful reload signal handler
         static::$globalEvent->add(SIGQUIT, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
-        // reinstall  status signal handler
+        // reinstall status signal handler
         static::$globalEvent->add(SIGUSR2, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
         // reinstall connection status signal handler
         static::$globalEvent->add(SIGIO, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
@@ -972,7 +1115,7 @@ class Worker
      */
     protected static function daemonize()
     {
-        if (!static::$daemonize || static::$_OS !== 'linux') {
+        if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) {
             return;
         }
         umask(0);
@@ -1001,17 +1144,24 @@ class Worker
      */
     public static function resetStd()
     {
-        if (!static::$daemonize || static::$_OS !== 'linux') {
+        if (!static::$daemonize || static::$_OS !== OS_TYPE_LINUX) {
             return;
         }
         global $STDOUT, $STDERR;
         $handle = fopen(static::$stdoutFile, "a");
         if ($handle) {
             unset($handle);
-            @fclose(STDOUT);
-            @fclose(STDERR);
+            set_error_handler(function(){});
+            fclose($STDOUT);
+            fclose($STDERR);
+            fclose(STDOUT);
+            fclose(STDERR);
             $STDOUT = fopen(static::$stdoutFile, "a");
             $STDERR = fopen(static::$stdoutFile, "a");
+            // change output stream
+            static::$_outputStream = null;
+            static::outputStream($STDOUT);
+            restore_error_handler();
         } else {
             throw new Exception('can not open stdoutFile ' . static::$stdoutFile);
         }
@@ -1024,11 +1174,11 @@ class Worker
      */
     protected static function saveMasterPid()
     {
-        if (static::$_OS !== 'linux') {
+        if (static::$_OS !== OS_TYPE_LINUX) {
             return;
         }
         static::$_masterPid = posix_getpid();
-        if (false === @file_put_contents(static::$pidFile, static::$_masterPid)) {
+        if (false === file_put_contents(static::$pidFile, static::$_masterPid)) {
             throw new Exception('can not save pid to ' . static::$pidFile);
         }
     }
@@ -1044,6 +1194,10 @@ class Worker
             return static::$eventLoopClass;
         }
 
+        if (!class_exists('\Swoole\Event')) {
+            unset(static::$_availableEventLoops['swoole']);
+        }
+        
         $loop_name = '';
         foreach (static::$_availableEventLoops as $name=>$class) {
             if (extension_loaded($name)) {
@@ -1056,7 +1210,7 @@ class Worker
             if (interface_exists('\React\EventLoop\LoopInterface')) {
                 switch ($loop_name) {
                     case 'libevent':
-                        static::$eventLoopClass = '\Workerman\Events\React\LibEventLoop';
+                        static::$eventLoopClass = '\Workerman\Events\React\ExtLibEventLoop';
                         break;
                     case 'event':
                         static::$eventLoopClass = '\Workerman\Events\React\ExtEventLoop';
@@ -1097,7 +1251,7 @@ class Worker
      */
     protected static function forkWorkers()
     {
-        if (static::$_OS === 'linux') {
+        if (static::$_OS === OS_TYPE_LINUX) {
             static::forkWorkersForLinux();
         } else {
             static::forkWorkersForWindows();
@@ -1123,7 +1277,6 @@ class Worker
                 }
             }
 
-            $worker->count = $worker->count <= 0 ? 1 : $worker->count;
             while (count(static::$_pidMap[$worker->workerId]) < $worker->count) {
                 static::forkOneWorkerForLinux($worker);
             }
@@ -1135,16 +1288,16 @@ class Worker
      *
      * @return void
      */
-    public static function forkWorkersForWindows()
+    protected static function forkWorkersForWindows()
     {
         $files = static::getStartFilesForWindows();
         global $argv;
-        if(isset($argv[1]) && $argv[1] === '-q')
+        if(in_array('-q', $argv) || count($files) === 1)
         {
             if(count(static::$_workers) > 1)
             {
-                echo "@@@ Error: multi workers init in one php file are not support @@@\r\n";
-                echo "@@@ Please visit http://wiki.workerman.net/Multi_woker_for_win @@@\r\n";
+                static::safeEcho("@@@ Error: multi workers init in one php file are not support @@@\r\n");
+                static::safeEcho("@@@ Please visit http://wiki.workerman.net/Multi_woker_for_win @@@\r\n");
             }
             elseif(count(static::$_workers) <= 0)
             {
@@ -1156,7 +1309,7 @@ class Worker
             $worker = current(static::$_workers);
 
             // Display UI.
-            echo str_pad($worker->name, 21) . str_pad($worker->getSocketName(), 36) . str_pad($worker->count, 10) . "[ok]\n";
+            static::safeEcho(str_pad($worker->name, 21) . str_pad($worker->getSocketName(), 36) . str_pad($worker->count, 10) . "[ok]\n");
             $worker->listen();
             $worker->run();
             exit("@@@child exit@@@\r\n");
@@ -1182,11 +1335,6 @@ class Worker
         $files = array();
         foreach($argv as $file)
         {
-            $ext = pathinfo($file, PATHINFO_EXTENSION );
-            if($ext !== 'php')
-            {
-                continue;
-            }
             if(is_file($file))
             {
                 $files[$file] = $file;
@@ -1221,9 +1369,9 @@ class Worker
             static::$globalEvent = new Select();
             Timer::init(static::$globalEvent);
         }
-        $timer_id = Timer::add(1, function()use($std_handler)
+        $timer_id = Timer::add(0.1, function()use($std_handler)
         {
-            echo fread($std_handler, 65535);
+            Worker::safeEcho(fread($std_handler, 65535));
         });
 
         // 保存子进程句柄
@@ -1246,15 +1394,15 @@ class Worker
             {
                 if(!$status['running'])
                 {
-                    echo "process $start_file terminated and try to restart\n";
+                    static::safeEcho("process $start_file terminated and try to restart\n");
                     Timer::del($timer_id);
-                    @proc_close($process);
+                    proc_close($process);
                     static::forkOneWorkerForWindows($start_file);
                 }
             }
             else
             {
-                echo "proc_get_status fail\n";
+                static::safeEcho("proc_get_status fail\n");
             }
         }
     }
@@ -1280,6 +1428,8 @@ class Worker
             static::$_idMap[$worker->workerId][$id]   = $pid;
         } // For child processes.
         elseif (0 === $pid) {
+            srand();
+            mt_srand();
             if ($worker->reusePort) {
                 $worker->listen();
             }
@@ -1287,7 +1437,13 @@ class Worker
                 static::resetStd();
             }
             static::$_pidMap  = array();
-            static::$_workers = array($worker->workerId => $worker);
+            // Remove other listener.
+            foreach(static::$_workers as $key => $one_worker) {
+                if ($one_worker->workerId !== $worker->workerId) {
+                    $one_worker->unlisten();
+                    unset(static::$_workers[$key]);
+                }
+            }
             Timer::delAll();
             static::setProcessTitle('WorkerMan: worker process  ' . $worker->name . ' ' . $worker->getSocketName());
             $worker->setUserAndGroup();
@@ -1356,13 +1512,15 @@ class Worker
      */
     protected static function setProcessTitle($title)
     {
+        set_error_handler(function(){});
         // >=php 5.5
         if (function_exists('cli_set_process_title')) {
-            @cli_set_process_title($title);
+            cli_set_process_title($title);
         } // Need proctitle when php<=5.5 .
         elseif (extension_loaded('proctitle') && function_exists('setproctitle')) {
-            @setproctitle($title);
+            setproctitle($title);
         }
+        restore_error_handler();
     }
 
     /**
@@ -1372,7 +1530,7 @@ class Worker
      */
     protected static function monitorWorkers()
     {
-        if (static::$_OS === 'linux') {
+        if (static::$_OS === OS_TYPE_LINUX) {
             static::monitorWorkersForLinux();
         } else {
             static::monitorWorkersForWindows();
@@ -1416,7 +1574,7 @@ class Worker
                         unset(static::$_pidMap[$worker_id][$pid]);
 
                         // Mark id is available.
-                        $id                            = static::getId($worker_id, $pid);
+                        $id                              = static::getId($worker_id, $pid);
                         static::$_idMap[$worker_id][$id] = 0;
 
                         break;
@@ -1430,18 +1588,13 @@ class Worker
                         unset(static::$_pidsToRestart[$pid]);
                         static::reload();
                     }
-                } else {
-                    // If shutdown state and all child processes exited then master process exit.
-                    if (!static::getAllWorkerPids()) {
-                        static::exitAndClearAll();
-                    }
-                }
-            } else {
-                // If shutdown state and all child processes exited then master process exit.
-                if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
-                    static::exitAndClearAll();
                 }
             }
+
+            // If shutdown state and all child processes exited then master process exit.
+            if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
+                static::exitAndClearAll();
+            }
         }
     }
 
@@ -1452,7 +1605,7 @@ class Worker
      */
     protected static function monitorWorkersForWindows()
     {
-        Timer::add(0.5, "\\Workerman\\Worker::checkWorkerStatusForWindows");
+        Timer::add(1, "\\Workerman\\Worker::checkWorkerStatusForWindows");
 
         static::$globalEvent->loop();
     }
@@ -1594,6 +1747,7 @@ class Worker
                     Timer::add(static::KILL_WORKER_TIMER_TIME, 'posix_kill', array($worker_pid, SIGKILL), false);
                 }
             }
+            Timer::add(1, "\\Workerman\\Worker::checkIfChildRunning");
             // Remove statistics file.
             if (is_file(static::$_statisticsFile)) {
                 @unlink(static::$_statisticsFile);
@@ -1602,16 +1756,36 @@ class Worker
         else {
             // Execute exit.
             foreach (static::$_workers as $worker) {
-                $worker->stop();
+                if(!$worker->stopping){
+                    $worker->stop();
+                    $worker->stopping = true;
+                }
             }
             if (!static::$_gracefulStop || ConnectionInterface::$statistics['connection_count'] <= 0) {
-                static::$globalEvent->destroy();
+                static::$_workers = array();
+                if (static::$globalEvent) {
+                    static::$globalEvent->destroy();
+                }
                 exit(0);
             }
         }
     }
 
     /**
+     * check if child processes is really running
+     */
+    public static function checkIfChildRunning()
+    {
+        foreach (static::$_pidMap as $worker_id => $worker_pid_array) {
+            foreach ($worker_pid_array as $pid => $worker_pid) {
+                if (!posix_kill($pid, 0)) {
+                    unset(static::$_pidMap[$worker_id][$pid]);
+                }
+            }
+        }
+    }
+
+    /**
      * Get process status.
      *
      * @return number
@@ -1697,6 +1871,7 @@ class Worker
         }
 
         // For child processes.
+        reset(static::$_workers);
         /** @var \Workerman\Worker $worker */
         $worker            = current(static::$_workers);
         $worker_status_str = posix_getpid() . "\t" . str_pad(round(memory_get_usage(true) / (1024 * 1024), 2) . "M", 7)
@@ -1796,7 +1971,7 @@ class Worker
     public static function checkErrors()
     {
         if (static::STATUS_SHUTDOWN != static::$_status) {
-            $error_msg = static::$_OS === 'linx' ? 'Worker['. posix_getpid() .'] process terminated' : 'Worker process terminated';
+            $error_msg = static::$_OS === OS_TYPE_LINUX ? 'Worker['. posix_getpid() .'] process terminated' : 'Worker process terminated';
             $errors    = error_get_last();
             if ($errors && ($errors['type'] === E_ERROR ||
                     $errors['type'] === E_PARSE ||
@@ -1866,19 +2041,62 @@ class Worker
             static::safeEcho($msg);
         }
         file_put_contents((string)static::$logFile, date('Y-m-d H:i:s') . ' ' . 'pid:'
-            . (static::$_OS === 'linux' ? posix_getpid() : 1) . ' ' . $msg, FILE_APPEND | LOCK_EX);
+            . (static::$_OS === OS_TYPE_LINUX ? posix_getpid() : 1) . ' ' . $msg, FILE_APPEND | LOCK_EX);
     }
 
     /**
      * Safe Echo.
-     *
      * @param $msg
+     * @param bool $decorated
+     * @return bool
+     */
+    public static function safeEcho($msg, $decorated = false)
+    {
+        $stream = static::outputStream();
+        if (!$stream) {
+            return false;
+        }
+        if (!$decorated) {
+            $line = $white = $green = $end = '';
+            if (static::$_outputDecorated) {
+                $line = "\033[1A\n\033[K";
+                $white = "\033[47;30m";
+                $green = "\033[32;40m";
+                $end = "\033[0m";
+            }
+            $msg = str_replace(array('<n>', '<w>', '<g>'), array($line, $white, $green), $msg);
+            $msg = str_replace(array('</n>', '</w>', '</g>'), $end, $msg);
+        } elseif (!static::$_outputDecorated) {
+            return false;
+        }
+        fwrite($stream, $msg);
+        fflush($stream);
+        return true;
+    }
+
+    /**
+     * @param null $stream
+     * @return bool|resource
      */
-    public static function safeEcho($msg)
+    private static function outputStream($stream = null)
     {
-        if (!function_exists('posix_isatty') || posix_isatty(STDOUT)) {
-            echo $msg;
+        if (!$stream) {
+            $stream = static::$_outputStream ? static::$_outputStream : STDOUT;
+        }
+        if (!$stream || !is_resource($stream) || 'stream' !== get_resource_type($stream)) {
+            return false;
         }
+        $stat = fstat($stream);
+        if (($stat['mode'] & 0170000) === 0100000) {
+            // file
+            static::$_outputDecorated = false;
+        } else {
+            static::$_outputDecorated =
+                static::$_OS === OS_TYPE_LINUX &&
+                function_exists('posix_isatty') &&
+                posix_isatty($stream);
+        }
+        return static::$_outputStream = $stream;
     }
 
     /**
@@ -1890,7 +2108,7 @@ class Worker
     public function __construct($socket_name = '', $context_option = array())
     {
         // Save all worker instances.
-        $this->workerId                  = spl_object_hash($this);
+        $this->workerId                    = spl_object_hash($this);
         static::$_workers[$this->workerId] = $this;
         static::$_pidMap[$this->workerId]  = array();
 
@@ -1975,9 +2193,11 @@ class Worker
 
             // Try to open keepalive for tcp and disable Nagle algorithm.
             if (function_exists('socket_import_stream') && static::$_builtinTransports[$this->transport] === 'tcp') {
+                set_error_handler(function(){});
                 $socket = socket_import_stream($this->_mainSocket);
-                @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
-                @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
+                socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
+                socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
+                restore_error_handler();
             }
 
             // Non blocking.
@@ -1995,7 +2215,9 @@ class Worker
     public function unlisten() {
         $this->pauseAccept();
         if ($this->_mainSocket) {
-            @fclose($this->_mainSocket);
+            set_error_handler(function(){});
+            fclose($this->_mainSocket);
+            restore_error_handler();
             $this->_mainSocket = null;
         }
     }
@@ -2076,6 +2298,8 @@ class Worker
             $this->onMessage = function () {};
         }
 
+        restore_error_handler();
+        
         // Try to emit onWorkerStart callback.
         if ($this->onWorkerStart) {
             try {
@@ -2126,8 +2350,6 @@ class Worker
         }
         // Clear callback.
         $this->onMessage = $this->onClose = $this->onError = $this->onBufferDrain = $this->onBufferFull = null;
-        // Remove worker instance from static::$_workers.
-        unset(static::$_workers[$this->workerId]);
     }
 
     /**
@@ -2139,7 +2361,10 @@ class Worker
     public function acceptConnection($socket)
     {
         // Accept a connection on server socket.
-        $new_socket = @stream_socket_accept($socket, 0, $remote_address);
+        set_error_handler(function(){});
+        $new_socket = stream_socket_accept($socket, 0, $remote_address);
+        restore_error_handler();
+
         // Thundering herd.
         if (!$new_socket) {
             return;
@@ -2179,7 +2404,9 @@ class Worker
      */
     public function acceptUdpConnection($socket)
     {
+        set_error_handler(function(){});
         $recv_buffer = stream_socket_recvfrom($socket, static::MAX_UDP_PACKAGE_SIZE, 0, $remote_address);
+        restore_error_handler();
         if (false === $recv_buffer || empty($remote_address)) {
             return false;
         }
@@ -2187,16 +2414,16 @@ class Worker
         $connection           = new UdpConnection($socket, $remote_address);
         $connection->protocol = $this->protocol;
         if ($this->onMessage) {
-            if ($this->protocol !== null) {
-                /** @var \Workerman\Protocols\ProtocolInterface $parser */
-                $parser      = $this->protocol;
-                $recv_buffer = $parser::decode($recv_buffer, $connection);
-                // Discard bad packets.
-                if ($recv_buffer === false)
-                    return true;
-            }
-            ConnectionInterface::$statistics['total_request']++;
             try {
+                if ($this->protocol !== null) {
+                    /** @var \Workerman\Protocols\ProtocolInterface $parser */
+                    $parser      = $this->protocol;
+                    $recv_buffer = $parser::decode($recv_buffer, $connection);
+                    // Discard bad packets.
+                    if ($recv_buffer === false)
+                        return true;
+                }
+                ConnectionInterface::$statistics['total_request']++;
                 call_user_func($this->onMessage, $connection, $recv_buffer);
             } catch (\Exception $e) {
                 static::log($e);