Jelajahi Sumber

add connection status

walkor 8 tahun lalu
induk
melakukan
42623cabe4
4 mengubah file dengan 319 tambahan dan 4 penghapusan
  1. 42 0
      Connection/ConnectionInterface.php
  2. 118 3
      Connection/TcpConnection.php
  3. 76 0
      Connection/UdpConnection.php
  4. 83 1
      Worker.php

+ 42 - 0
Connection/ConnectionInterface.php

@@ -74,6 +74,48 @@ abstract class  ConnectionInterface
     abstract public function getRemotePort();
 
     /**
+     * Get remote address.
+     *
+     * @return string
+     */
+    abstract public function getRemoteAddress();
+
+    /**
+     * Get remote IP.
+     *
+     * @return string
+     */
+    abstract public function getLocalIp();
+
+    /**
+     * Get remote port.
+     *
+     * @return int
+     */
+    abstract public function getLocalPort();
+
+    /**
+     * Get remote address.
+     *
+     * @return string
+     */
+    abstract public function getLocalAddress();
+
+    /**
+     * Is ipv4.
+     *
+     * @return bool
+     */
+    abstract public function isIPv4();
+
+    /**
+     * Is ipv6.
+     *
+     * @return bool
+     */
+    abstract public function isIPv6();
+
+    /**
      * Close connection.
      *
      * @param $data

+ 118 - 3
Connection/TcpConnection.php

@@ -122,6 +122,20 @@ class TcpConnection extends ConnectionInterface
     public $worker = null;
 
     /**
+     * Bytes read.
+     *
+     * @var int
+     */
+    public $bytesRead = 0;
+
+    /**
+     * Bytes written.
+     *
+     * @var int
+     */
+    public $bytesWritten = 0;
+
+    /**
      * Connection->id.
      *
      * @var int
@@ -214,7 +228,7 @@ class TcpConnection extends ConnectionInterface
     protected $_isPaused = false;
 
     /**
-     * SSL handshake completed or not
+     * SSL handshake completed or not.
      *
      * @var bool
      */
@@ -283,11 +297,13 @@ class TcpConnection extends ConnectionInterface
             $len = @fwrite($this->_socket, $send_buffer, 8192);
             // send successful.
             if ($len === strlen($send_buffer)) {
+                $this->bytesWritten += $len;
                 return true;
             }
             // Send only part of the data.
             if ($len > 0) {
                 $this->_sendBuffer = substr($send_buffer, $len);
+                $this->bytesWritten += $len;
             } else {
                 // Connection closed?
                 if (!is_resource($this->_socket) || feof($this->_socket)) {
@@ -333,7 +349,7 @@ class TcpConnection extends ConnectionInterface
     {
         $pos = strrpos($this->_remoteAddress, ':');
         if ($pos) {
-            return trim(substr($this->_remoteAddress, 0, $pos), '[]');
+            return substr($this->_remoteAddress, 0, $pos);
         }
         return '';
     }
@@ -352,6 +368,102 @@ class TcpConnection extends ConnectionInterface
     }
 
     /**
+     * Get remote address.
+     *
+     * @return string
+     */
+    public function getRemoteAddress()
+    {
+        return $this->_remoteAddress;
+    }
+
+    /**
+     * Get local IP.
+     *
+     * @return string
+     */
+    public function getLocalIp()
+    {
+        $address = $this->getLocalAddress();
+        $pos = strrpos($address, ':');
+        if (!$pos) {
+            return '';
+        }
+        return substr($address, 0, $pos);
+    }
+
+    /**
+     * Get local port.
+     *
+     * @return int
+     */
+    public function getLocalPort()
+    {
+        $address = $this->getLocalAddress();
+        $pos = strrpos($address, ':');
+        if (!$pos) {
+            return 0;
+        }
+        return (int)substr(strrchr($address, ':'), 1);
+    }
+
+    /**
+     * Get local address.
+     *
+     * @return string
+     */
+    public function getLocalAddress()
+    {
+        return (string)@stream_socket_get_name($this->_socket, false);
+    }
+
+    /**
+     * Get send buffer queue size.
+     *
+     * @return integer
+     */
+    public function getSendBufferQueueSize()
+    {
+        return strlen($this->_sendBuffer);
+    }
+
+    /**
+     * Get recv buffer queue size.
+     *
+     * @return integer
+     */
+    public function getRecvBufferQueueSize()
+    {
+        return strlen($this->_recvBuffer);
+    }
+
+    /**
+     * Is ipv4.
+     *
+     * return bool.
+     */
+    public function isIpV4()
+    {
+        if ($this->transport === 'unix') {
+            return false;
+        }
+        return strpos($this->getRemoteIp(), ':') === false;
+    }
+
+    /**
+     * Is ipv6.
+     *
+     * return bool.
+     */
+    public function isIpV6()
+    {
+        if ($this->transport === 'unix') {
+            return false;
+        }
+        return strpos($this->getRemoteIp(), ':') !== false;
+    }
+
+    /**
      * Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
      *
      * @return void
@@ -417,7 +529,7 @@ class TcpConnection extends ConnectionInterface
             return;
         }
 
-        $buffer = fread($socket, self::READ_BUFFER_SIZE);
+        $buffer = @fread($socket, self::READ_BUFFER_SIZE);
 
         // Check connection closed.
         if ($buffer === '' || $buffer === false) {
@@ -426,6 +538,7 @@ class TcpConnection extends ConnectionInterface
                 return;
             }
         } else {
+            $this->bytesRead += strlen($buffer);
             $this->_recvBuffer .= $buffer;
         }
 
@@ -521,6 +634,7 @@ class TcpConnection extends ConnectionInterface
     {
         $len = @fwrite($this->_socket, $this->_sendBuffer, 8192);
         if ($len === strlen($this->_sendBuffer)) {
+            $this->bytesWritten += $len;
             Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
             $this->_sendBuffer = '';
             // Try to emit onBufferDrain callback when the send buffer becomes empty. 
@@ -541,6 +655,7 @@ class TcpConnection extends ConnectionInterface
             return true;
         }
         if ($len > 0) {
+            $this->bytesWritten += $len;
             $this->_sendBuffer = substr($this->_sendBuffer, $len);
         } else {
             self::$statistics['send_fail']++;

+ 76 - 0
Connection/UdpConnection.php

@@ -99,6 +99,82 @@ class UdpConnection extends ConnectionInterface
     }
 
     /**
+     * Get remote address.
+     *
+     * @return string
+     */
+    public function getRemoteAddress()
+    {
+        return $this->_remoteAddress;
+    }
+
+    /**
+     * Get local IP.
+     *
+     * @return string
+     */
+    public function getLocalIp()
+    {
+        $address = $this->getLocalAddress();
+        $pos = strrpos($address, ':');
+        if (!$pos) {
+            return '';
+        }
+        return substr($address, 0, $pos);
+    }
+
+    /**
+     * Get local port.
+     *
+     * @return int
+     */
+    public function getLocalPort()
+    {
+        $address = $this->getLocalAddress();
+        $pos = strrpos($address, ':');
+        if (!$pos) {
+            return 0;
+        }
+        return (int)substr(strrchr($address, ':'), 1);
+    }
+
+    /**
+     * Get local address.
+     *
+     * @return string
+     */
+    public function getLocalAddress()
+    {
+        return (string)@stream_socket_get_name($this->_socket, false);
+    }
+
+    /**
+     * Is ipv4.
+     *
+     * return bool.
+     */
+    public function isIpV4()
+    {
+        if ($this->transport === 'unix') {
+            return false;
+        }
+        return strpos($this->getRemoteIp(), ':') === false;
+    }
+
+    /**
+     * Is ipv6.
+     *
+     * return bool.
+     */
+    public function isIpV6()
+    {
+        if ($this->transport === 'unix') {
+            return false;
+        }
+        return strpos($this->getRemoteIp(), ':') !== false;
+    }
+
+    /**
      * Close connection.
      *
      * @param mixed $data

+ 83 - 1
Worker.php

@@ -629,6 +629,7 @@ class Worker
             'restart',
             'reload',
             'status',
+            'connections',
         );
         if (!isset($argv[1]) || !in_array($argv[1], $available_commands)) {
             exit("Usage: php yourfile.php {" . implode('|', $available_commands) . "}\n");
@@ -671,11 +672,13 @@ class Worker
                 }
                 break;
             case 'status':
+            case 'connections':
                 if (is_file(self::$_statisticsFile)) {
                     @unlink(self::$_statisticsFile);
                 }
                 // Master process will send status signal to all child processes.
-                posix_kill($master_pid, SIGUSR2);
+                $signal = $command === 'status' ? SIGUSR2 : SIGIO;
+                posix_kill($master_pid, $signal);
                 // Waiting amoment.
                 usleep(500000);
                 // Display statisitcs data from a disk file.
@@ -735,6 +738,8 @@ class Worker
         pcntl_signal(SIGUSR1, array('\Workerman\Worker', 'signalHandler'), false);
         // status
         pcntl_signal(SIGUSR2, array('\Workerman\Worker', 'signalHandler'), false);
+        // connection status
+        pcntl_signal(SIGIO, array('\Workerman\Worker', 'signalHandler'), false);
         // ignore
         pcntl_signal(SIGPIPE, SIG_IGN, false);
     }
@@ -758,6 +763,8 @@ class Worker
         self::$globalEvent->add(SIGUSR1, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
         // reinstall  status signal handler
         self::$globalEvent->add(SIGUSR2, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
+        // reinstall connection status signal handler
+        self::$globalEvent->add(SIGIO, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
     }
 
     /**
@@ -781,6 +788,10 @@ class Worker
             case SIGUSR2:
                 self::writeStatisticsToStatusFile();
                 break;
+            // Show connection status.
+            case SIGIO:
+                self::writeConnectionsStatisticsToStatusFile();
+                break;
         }
     }
 
@@ -1306,6 +1317,77 @@ class Worker
     }
 
     /**
+     * Write statistics data to disk.
+     *
+     * @return void
+     */
+    protected static function writeConnectionsStatisticsToStatusFile()
+    {
+        // For master process.
+        if (self::$_masterPid === posix_getpid()) {
+            file_put_contents(self::$_statisticsFile, "Trans   ipv4   ipv6   Recv-Q       Send-Q       Bytes-R      Bytes-W      Local Address          Foreign Address        PID     ID        Protocol     Worker\n", FILE_APPEND);
+            chmod(self::$_statisticsFile, 0722);
+            foreach (self::getAllWorkerPids() as $worker_pid) {
+                posix_kill($worker_pid, SIGIO);
+            }
+            return;
+        }
+
+        // For child processes.
+        $bytes_format = function($bytes)
+        {
+            if($bytes > 1024*1024*1024*1024) {
+                return round($bytes/(1024*1024*1024*1024), 1)."TB";
+            }
+            if($bytes > 1024*1024*1024) {
+                return round($bytes/(1024*1024*1024), 1)."GB";
+            }
+            if($bytes > 1024*1024) {
+                return round($bytes/(1024*1024), 1)."MB";
+            }
+            if($bytes > 1024) {
+                return round($bytes/(1024), 1)."KB";
+            }
+            return $bytes."B";
+        };
+
+        $pid = posix_getpid();
+        $str = '';
+        /** @var Worker $worker */
+        foreach(self::$_workers as $worker) {
+            foreach($worker->connections as $connection) {
+                /** @var Connection\TcpConnection $connection */
+                $transport = $connection->transport;
+                $ipv4 = $connection->isIpV4() ? ' 1' : ' 0';
+                $ipv6 = $connection->isIpV6() ? ' 1' : ' 0';
+                $recv_q = $bytes_format($connection->getRecvBufferQueueSize());
+                $send_q = $bytes_format($connection->getSendBufferQueueSize());
+                $local_address = $connection->getLocalAddress();
+                $remote_address = $connection->getRemoteAddress();
+                $bytes_read = $bytes_format($connection->bytesRead);
+                $bytes_written = $bytes_format($connection->bytesWritten);
+                $id = $connection->id;
+                $pos = strrpos($connection->protocol, '\\');
+                if ($pos) {
+                    $protocol = substr($connection->protocol, $pos+1);
+                } else {
+                    $protocol = $connection->protocol;
+                }
+
+                $str .= str_pad($transport, 8).str_pad($ipv4, 7).str_pad($ipv6, 7)
+                    .str_pad($recv_q, 13).str_pad($send_q, 13).str_pad($bytes_read, 13).str_pad($bytes_written, 13)
+                    .str_pad($local_address, 22).' '.str_pad($remote_address, 22).' '.str_pad($pid, 8).str_pad($id, 10)
+                    .str_pad($protocol, 12).' '.$worker->name."\n" ;
+            }
+        }
+        if ($str) {
+            file_put_contents(self::$_statisticsFile, $str, FILE_APPEND);
+        }
+
+        reset(self::$_workers);
+    }
+
+    /**
      * Check errors when current process exited.
      *
      * @return void