Jelajahi Sumber

Merge pull request #1 from walkor/master

update
linkec 7 tahun lalu
induk
melakukan
90f8d1799e

+ 7 - 7
Connection/AsyncTcpConnection.php

@@ -288,15 +288,18 @@ class AsyncTcpConnection extends TcpConnection
      */
     public function checkConnection()
     {
-        if ($this->_status != self::STATUS_CONNECTING) {
-            return;
-        }
-
         // Remove EV_EXPECT for windows.
         if(DIRECTORY_SEPARATOR === '\\') {
             Worker::$globalEvent->del($this->_socket, EventInterface::EV_EXCEPT);
         }
 
+        // Remove write listener.
+        Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
+
+        if ($this->_status != self::STATUS_CONNECTING) {
+            return;
+        }
+
         // Check socket state.
         if ($address = stream_socket_get_name($this->_socket, true)) {
             // Nonblocking.
@@ -312,9 +315,6 @@ class AsyncTcpConnection extends TcpConnection
                 socket_set_option($raw_socket, SOL_TCP, TCP_NODELAY, 1);
             }
 
-            // Remove write listener.
-            Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
-
             // SSL handshake.
             if ($this->transport === 'ssl') {
                 $this->_sslHandshakeCompleted = $this->doSslHandshake($this->_socket);

+ 3 - 0
Connection/AsyncUdpConnection.php

@@ -185,6 +185,9 @@ class AsyncUdpConnection extends UdpConnection
             Worker::safeEcho(new \Exception($errmsg));
             return;
         }
+        
+        stream_set_blocking($this->_socket, false);
+        
         if ($this->onMessage) {
             Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
         }

+ 13 - 3
Connection/TcpConnection.php

@@ -165,11 +165,18 @@ class TcpConnection extends ConnectionInterface
     public static $defaultMaxSendBufferSize = 1048576;
 
     /**
-     * Maximum acceptable packet size.
+     * Sets the maximum acceptable packet size for the current connection.
      *
      * @var int
      */
-    public static $maxPackageSize = 10485760;
+    public $maxPackageSize = 1048576;
+    
+    /**
+     * Default maximum acceptable packet size.
+     *
+     * @var int
+     */
+    public static $defaultMaxPackageSize = 10485760;
 
     /**
      * Id recorder.
@@ -298,6 +305,7 @@ class TcpConnection extends ConnectionInterface
         }
         Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
         $this->maxSendBufferSize        = self::$defaultMaxSendBufferSize;
+        $this->maxPackageSize           = self::$defaultMaxPackageSize;
         $this->_remoteAddress           = $remote_address;
         static::$connections[$this->id] = $this;
     }
@@ -615,7 +623,7 @@ class TcpConnection extends ConnectionInterface
                     // The packet length is unknown.
                     if ($this->_currentPackageLength === 0) {
                         break;
-                    } elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= static::$maxPackageSize) {
+                    } elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= $this->maxPackageSize) {
                         // Data is not enough for a package.
                         if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
                             break;
@@ -827,6 +835,8 @@ class TcpConnection extends ConnectionInterface
         }
         if ($this->_sendBuffer === '') {
             $this->destroy();
+        } else {
+            $this->pauseRecv();
         }
     }
 

+ 0 - 1
Events/Select.php

@@ -262,7 +262,6 @@ class Select implements EventInterface
      */
     public function loop()
     {
-        $e = null;
         while (1) {
             if(DIRECTORY_SEPARATOR === '/') {
                 // Calls signal handlers for pending signals

+ 1 - 0
Lib/Timer.php

@@ -14,6 +14,7 @@
 namespace Workerman\Lib;
 
 use Workerman\Events\EventInterface;
+use Workerman\Worker;
 use Exception;
 
 /**

+ 2 - 2
Protocols/Http.php

@@ -38,7 +38,7 @@ class Http
     {
         if (!strpos($recv_buffer, "\r\n\r\n")) {
             // Judge whether the package length exceeds the limit.
-            if (strlen($recv_buffer) >= $connection::$maxPackageSize) {
+            if (strlen($recv_buffer) >= $connection->maxPackageSize) {
                 $connection->close();
                 return 0;
             }
@@ -565,7 +565,7 @@ class Http
                                 'file_data' => $boundary_value,
                                 'file_size' => strlen($boundary_value),
                             );
-                            continue;
+                            continue 2;
                         } // Is post field.
                         else {
                             // Parse $_POST.

+ 2 - 2
Protocols/Text.php

@@ -30,7 +30,7 @@ class Text
     public static function input($buffer, TcpConnection $connection)
     {
         // Judge whether the package length exceeds the limit.
-        if (strlen($buffer) >= $connection::$maxPackageSize) {
+        if (strlen($buffer) >= $connection->maxPackageSize) {
             $connection->close();
             return 0;
         }
@@ -65,6 +65,6 @@ class Text
     public static function decode($buffer)
     {
         // Remove "\n"
-        return trim($buffer);
+        return rtrim($buffer, "\r\n");
     }
 }

+ 1 - 1
Protocols/Websocket.php

@@ -140,7 +140,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             $current_frame_length = $head_len + $data_len;
 
             $total_package_size = strlen($connection->websocketDataBuffer) + $current_frame_length;
-            if ($total_package_size > $connection::$maxPackageSize) {
+            if ($total_package_size > $connection->maxPackageSize) {
                 Worker::safeEcho("error package. package_length=$total_package_size\n");
                 $connection->close();
                 return 0;

+ 13 - 11
Protocols/Ws.php

@@ -137,7 +137,7 @@ class Ws
             }
 
             $total_package_size = strlen($connection->websocketDataBuffer) + $current_frame_length;
-            if ($total_package_size > $connection::$maxPackageSize) {
+            if ($total_package_size > $connection->maxPackageSize) {
                 Worker::safeEcho("error package. package_length=$total_package_size\n");
                 $connection->close();
                 return 0;
@@ -366,25 +366,27 @@ class Ws
         $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
         // Handshake header.
         $connection->websocketSecKey = base64_encode(md5(mt_rand(), true));
-        $userHeader = '';
-        if (!empty($connection->wsHttpHeader)) {
-            if (is_array($connection->wsHttpHeader)){
-                foreach($connection->wsHttpHeader as $k=>$v){
-                    $userHeader .= "$k: $v\r\n";
+        $user_header = isset($connection->headers) ? $connection->headers :
+            (isset($connection->wsHttpHeader) ? $connection->wsHttpHeader : null);
+        $user_header_str = '';
+        if (!empty($user_header)) {
+            if (is_array($user_header)){
+                foreach($user_header as $k=>$v){
+                    $user_header_str .= "$k: $v\r\n";
                 }
-            }else{
-                $userHeader .= $connection->wsHttpHeader;
+            } else {
+                $user_header_str .= $user_header;
             }
-            $userHeader = "\r\n".trim($userHeader);
+            $user_header_str = "\r\n".trim($user_header_str);
         }
         $header = 'GET ' . $connection->getRemoteURI() . " HTTP/1.1\r\n".
-        "Host: $host\r\n".
+        (!preg_match("/\nHost:/i", $user_header_str) ? "Host: $host\r\n" : '').
         "Connection: Upgrade\r\n".
         "Upgrade: websocket\r\n".
         "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
         (isset($connection->WSClientProtocol)?"Sec-WebSocket-Protocol: ".$connection->WSClientProtocol."\r\n":'').
         "Sec-WebSocket-Version: 13\r\n".
-        "Sec-WebSocket-Key: " . $connection->websocketSecKey . $userHeader . "\r\n\r\n";
+        "Sec-WebSocket-Key: " . $connection->websocketSecKey . $user_header_str . "\r\n\r\n";
         $connection->send($header, true);
         $connection->handshakeStep               = 1;
         $connection->websocketCurrentFrameLength = 0;

+ 151 - 49
Worker.php

@@ -33,7 +33,7 @@ class Worker
      *
      * @var string
      */
-    const VERSION = '3.5.14';
+    const VERSION = '3.5.16';
 
     /**
      * Status starting.
@@ -85,6 +85,13 @@ class Worker
     const MAX_UDP_PACKAGE_SIZE = 65535;
 
     /**
+     * The safe distance for columns adjacent
+     *
+     * @var int
+     */
+    const UI_SAFE_LENGTH = 4;
+
+    /**
      * Worker id.
      *
      * @var int
@@ -329,7 +336,7 @@ class Worker
     protected static $_workers = array();
 
     /**
-     * All worker porcesses pid.
+     * All worker processes pid.
      * The format is like this [worker_id=>[pid=>pid, pid=>pid, ..], ..]
      *
      * @var array
@@ -381,6 +388,27 @@ class Worker
     protected static $_maxUserNameLength = 12;
 
     /**
+     * Maximum length of the Proto names.
+     *
+     * @var int
+     */
+    protected static $_maxProtoNameLength = 4;
+
+    /**
+     * Maximum length of the Processes names.
+     *
+     * @var int
+     */
+    protected static $_maxProcessesNameLength = 9;
+
+    /**
+     * Maximum length of the Status names.
+     *
+     * @var int
+     */
+    protected static $_maxStatusNameLength = 1;
+
+    /**
      * The file to store status info of current worker process.
      *
      * @var string
@@ -425,8 +453,9 @@ class Worker
      */
     protected static $_availableEventLoops = array(
         'libevent' => '\Workerman\Events\Libevent',
-        'event'    => '\Workerman\Events\Event',
-        'swoole'   => '\Workerman\Events\Swoole'
+        'event'    => '\Workerman\Events\Event'
+        // Temporarily removed swoole because it is not stable enough  
+        //'swoole'   => '\Workerman\Events\Swoole'
     );
 
     /**
@@ -562,18 +591,6 @@ class Worker
                 $worker->name = 'none';
             }
 
-            // Get maximum length of worker name.
-            $worker_name_length = strlen($worker->name);
-            if (static::$_maxWorkerNameLength < $worker_name_length) {
-                static::$_maxWorkerNameLength = $worker_name_length;
-            }
-
-            // Get maximum length of socket name.
-            $socket_name_length = strlen($worker->getSocketName());
-            if (static::$_maxSocketNameLength < $socket_name_length) {
-                static::$_maxSocketNameLength = $socket_name_length;
-            }
-
             // Get unix user of the worker process.
             if (empty($worker->user)) {
                 $worker->user = static::getCurrentUser();
@@ -583,10 +600,18 @@ class Worker
                 }
             }
 
-            // Get maximum length of unix user name.
-            $user_name_length = strlen($worker->user);
-            if (static::$_maxUserNameLength < $user_name_length) {
-                static::$_maxUserNameLength = $user_name_length;
+            // Socket name.
+            $worker->socket = $worker->getSocketName();
+
+            // Status name.
+            $worker->status = '<g> [OK] </g>';
+
+            // Get column mapping for UI
+            foreach(static::getUiColumns() as $column_name => $prop){
+                !isset($worker->{$prop}) && $worker->{$prop}= 'NNNN';
+                $prop_length = strlen($worker->{$prop});
+                $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
+                static::$$key = max(static::$$key, $prop_length);
             }
 
             // Listen.
@@ -661,19 +686,41 @@ class Worker
             static::safeEcho("worker               listen                              processes status\r\n");
             return;
         }
-        static::safeEcho("<n>-----------------------<w> WORKERMAN </w>-----------------------------</n>\r\n");
-        static::safeEcho('Workerman version:'. static::VERSION. "          PHP version:". PHP_VERSION. "\r\n");
-        static::safeEcho("------------------------<w> WORKERS </w>-------------------------------\r\n");
-        static::safeEcho("<w>proto</w>    <w>user</w>". str_pad('',
-                static::$_maxUserNameLength + 2 - strlen('user')). "<w>worker</w>". str_pad('',
-                static::$_maxWorkerNameLength + 2 - strlen('worker')). "<w>listen</w>". str_pad('',
-                static::$_maxSocketNameLength + 2 - strlen('listen')). "<w>processes</w> <w>status</w>\n");
+
+        //show version
+        $line_version = 'Workerman version:' . static::VERSION . str_pad('PHP version:', 22, ' ', STR_PAD_LEFT) . PHP_VERSION . PHP_EOL;
+        !defined('LINE_VERSIOIN_LENGTH') && define('LINE_VERSIOIN_LENGTH', strlen($line_version));
+        $total_length = static::getSingleLineTotalLength();
+        $line_one = '<n>' . str_pad('<w> WORKERMAN </w>', $total_length + strlen('<w></w>'), '-', STR_PAD_BOTH) . '</n>'. PHP_EOL;
+        $line_two = str_pad('<w> WORKERS </w>' , $total_length  + strlen('<w></w>'), '-', STR_PAD_BOTH) . PHP_EOL;
+        static::safeEcho($line_one . $line_version . $line_two);
+
+        //Show title
+        $title = '';
+        foreach(static::getUiColumns() as $column_name => $prop){
+            $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
+            //just keep compatible with listen name 
+            $column_name == 'socket' && $column_name = 'listen';
+            $title.= "<w>{$column_name}</w>"  .  str_pad('', static::$$key + static::UI_SAFE_LENGTH - strlen($column_name));
+        }
+        $title && static::safeEcho($title . PHP_EOL);
+
+        //Show content
         foreach (static::$_workers as $worker) {
-            static::safeEcho(str_pad($worker->transport,9). str_pad($worker->user, static::$_maxUserNameLength + 2). str_pad($worker->name,
-                    static::$_maxWorkerNameLength + 2). str_pad($worker->getSocketName(),
-                    static::$_maxSocketNameLength + 2). str_pad(' ' . $worker->count, 9). " <g> [OK] </g>\n");
+            $content = '';
+            foreach(static::getUiColumns() as $column_name => $prop){
+                $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
+                preg_match_all("/(<n>|<\/n>|<w>|<\/w>|<g>|<\/g>)/is", $worker->{$prop}, $matches);
+                $place_holder_length = !empty($matches) ? strlen(implode('', $matches[0])) : 0;
+                $content .= str_pad($worker->{$prop}, static::$$key + static::UI_SAFE_LENGTH + $place_holder_length);
+            }
+            $content && static::safeEcho($content . PHP_EOL);
         }
-        static::safeEcho("----------------------------------------------------------------\n");
+
+        //Show last line
+        $line_last = str_pad('', static::getSingleLineTotalLength(), '-') . PHP_EOL;
+        $content && static::safeEcho($line_last);
+
         if (static::$daemonize) {
             static::safeEcho("Input \"php $argv[0] stop\" to stop. Start success.\n\n");
         } else {
@@ -682,6 +729,49 @@ class Worker
     }
 
     /**
+     * Get UI columns to be shown in terminal
+     *
+     * 1. $column_map: array('ui_column_name' => 'clas_property_name')
+     * 2. Consider move into configuration in future
+     *
+     * @return array
+     */
+    public static function getUiColumns()
+    {
+        $column_map = array(
+            'proto'     =>  'transport',
+            'user'      =>  'user',
+            'worker'    =>  'name',
+            'socket'    =>  'socket',
+            'processes' =>  'count',
+            'status'    =>  'status',
+        );
+
+        return $column_map;
+    }
+
+    /**
+     * Get single line total length for ui
+     *
+     * @return int
+     */
+    public static function getSingleLineTotalLength()
+    {
+        $total_length = 0;
+
+        foreach(static::getUiColumns() as $column_name => $prop){
+            $key = '_max' . ucfirst(strtolower($column_name)) . 'NameLength';
+            $total_length += static::$$key + static::UI_SAFE_LENGTH;
+        }
+
+        //keep beauty when show less colums
+        !defined('LINE_VERSIOIN_LENGTH') && define('LINE_VERSIOIN_LENGTH', 0);
+        $total_length <= LINE_VERSIOIN_LENGTH && $total_length = LINE_VERSIOIN_LENGTH;
+
+        return $total_length;
+    }
+
+    /**
      * Parse command.
      *
      * @return void
@@ -1105,7 +1195,7 @@ class Worker
             return static::$eventLoopClass;
         }
 
-        if (!class_exists('\Swoole\Event')) {
+        if (!class_exists('\Swoole\Event', false)) {
             unset(static::$_availableEventLoops['swoole']);
         }
         
@@ -1339,6 +1429,8 @@ class Worker
             static::$_idMap[$worker->workerId][$id]   = $pid;
         } // For child processes.
         elseif (0 === $pid) {
+            srand();
+            mt_srand();
             if ($worker->reusePort) {
                 $worker->listen();
             }
@@ -1497,18 +1589,13 @@ class Worker
                         unset(static::$_pidsToRestart[$pid]);
                         static::reload();
                     }
-                } else {
-                    // If shutdown state and all child processes exited then master process exit.
-                    if (!static::getAllWorkerPids()) {
-                        static::exitAndClearAll();
-                    }
-                }
-            } else {
-                // If shutdown state and all child processes exited then master process exit.
-                if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
-                    static::exitAndClearAll();
                 }
             }
+
+            // If shutdown state and all child processes exited then master process exit.
+            if (static::$_status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
+                static::exitAndClearAll();
+            }
         }
     }
 
@@ -2161,8 +2248,7 @@ class Worker
             if ($this->transport !== 'udp') {
                 static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
             } else {
-                static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
-                    array($this, 'acceptUdpConnection'));
+                static::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptUdpConnection'));
             }
             $this->_pauseAccept = false;
         }
@@ -2332,13 +2418,29 @@ class Worker
                 if ($this->protocol !== null) {
                     /** @var \Workerman\Protocols\ProtocolInterface $parser */
                     $parser      = $this->protocol;
-                    $recv_buffer = $parser::decode($recv_buffer, $connection);
-                    // Discard bad packets.
-                    if ($recv_buffer === false)
-                        return true;
+                    if(method_exists($parser,'input')){
+                        while($recv_buffer !== ''){
+                            $len = $parser::input($recv_buffer, $connection);
+                            if($len == 0)
+                                return true;
+                            $package = substr($recv_buffer,0,$len);
+                            $recv_buffer = substr($recv_buffer,$len);
+                            $data = $parser::decode($package,$connection);
+                            if ($data === false)
+                                continue;
+                            call_user_func($this->onMessage, $connection, $data);
+                        }
+                    }else{
+                        $data = $parser::decode($recv_buffer, $connection);
+                        // Discard bad packets.
+                        if ($data === false)
+                            return true;
+                        call_user_func($this->onMessage, $connection, $data);
+                    }
+                }else{
+                    call_user_func($this->onMessage, $connection, $recv_buffer);
                 }
                 ConnectionInterface::$statistics['total_request']++;
-                call_user_func($this->onMessage, $connection, $recv_buffer);
             } catch (\Exception $e) {
                 static::log($e);
                 exit(250);