Przeglądaj źródła

connection status

walkor 8 lat temu
rodzic
commit
709ba27802
3 zmienionych plików z 65 dodań i 21 usunięć
  1. 5 4
      Connection/AsyncTcpConnection.php
  2. 43 6
      Connection/TcpConnection.php
  3. 17 11
      Worker.php

+ 5 - 4
Connection/AsyncTcpConnection.php

@@ -128,7 +128,7 @@ class AsyncTcpConnection extends TcpConnection
             $scheme               = isset($address_info['scheme']) ? $address_info['scheme'] : 'tcp';
         }
 
-        $this->id             = self::$_idRecorder++;
+        $this->id = $this->_id = self::$_idRecorder++;
         // Check application layer protocol class.
         if (!isset(self::$_builtinTransports[$scheme])) {
             $scheme         = ucfirst($scheme);
@@ -145,8 +145,9 @@ class AsyncTcpConnection extends TcpConnection
 
         // For statistics.
         self::$statistics['connection_count']++;
-        $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
-        $this->_contextOption    = $context_option;
+        $this->maxSendBufferSize        = self::$defaultMaxSendBufferSize;
+        $this->_contextOption           = $context_option;
+        static::$connections[$this->id] = $this;
     }
 
     /**
@@ -285,7 +286,7 @@ class AsyncTcpConnection extends TcpConnection
             if ($this->_sendBuffer) {
                 Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
             }
-            $this->_status                = self::STATUS_ESTABLISH;
+            $this->_status                = self::STATUS_ESTABLISHED;
             $this->_remoteAddress         = $address;
             $this->_sslHandshakeCompleted = true;
 

+ 43 - 6
Connection/TcpConnection.php

@@ -48,7 +48,7 @@ class TcpConnection extends ConnectionInterface
      *
      * @var int
      */
-    const STATUS_ESTABLISH = 2;
+    const STATUS_ESTABLISHED = 2;
 
     /**
      * Status closing.
@@ -105,7 +105,7 @@ class TcpConnection extends ConnectionInterface
      *
      * @var \Workerman\Protocols\ProtocolInterface
      */
-    public $protocol = null;
+    public $protocol = 'tcp';
 
     /**
      * Transport (tcp/udp/unix/ssl).
@@ -211,7 +211,7 @@ class TcpConnection extends ConnectionInterface
      *
      * @var int
      */
-    protected $_status = self::STATUS_ESTABLISH;
+    protected $_status = self::STATUS_ESTABLISHED;
 
     /**
      * Remote address.
@@ -235,6 +235,26 @@ class TcpConnection extends ConnectionInterface
     protected $_sslHandshakeCompleted = false;
 
     /**
+     * All connection instances.
+     *
+     * @var array
+     */
+    public static $connections = array();
+
+    /**
+     * Status to string.
+     *
+     * @var array
+     */
+    public static $_statusToString = array(
+        self::STATUS_INITIAL     => 'INITIAL',
+        self::STATUS_CONNECTING  => 'CONNECTING',
+        self::STATUS_ESTABLISHED => 'ESTABLISHED',
+        self::STATUS_CLOSING     => 'CLOSING',
+        self::STATUS_CLOSED      => 'CLOSED',
+    );
+
+    /**
      * Construct.
      *
      * @param resource $socket
@@ -253,6 +273,22 @@ class TcpConnection extends ConnectionInterface
         Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
         $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
         $this->_remoteAddress    = $remote_address;
+        static::$connections[$this->id] = $this;
+    }
+
+    /**
+     * Get status.
+     *
+     * @param bool $raw_output
+     *
+     * @return int
+     */
+    public function getStatus($raw_output = true)
+    {
+        if ($raw_output) {
+            return $this->_status;
+        }
+        return self::$_statusToString[$this->_status];
     }
 
     /**
@@ -269,7 +305,7 @@ class TcpConnection extends ConnectionInterface
         }
 
         // Try to call protocol::encode($send_buffer) before sending.
-        if (false === $raw && $this->protocol) {
+        if (false === $raw && $this->protocol !== 'tcp') {
             $parser      = $this->protocol;
             $send_buffer = $parser::encode($send_buffer, $this);
             if ($send_buffer === '') {
@@ -277,7 +313,7 @@ class TcpConnection extends ConnectionInterface
             }
         }
 
-        if ($this->_status !== self::STATUS_ESTABLISH ||
+        if ($this->_status !== self::STATUS_ESTABLISHED ||
             ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true)
         ) {
             if ($this->_sendBuffer) {
@@ -543,7 +579,7 @@ class TcpConnection extends ConnectionInterface
         }
 
         // If the application layer protocol has been set up.
-        if ($this->protocol) {
+        if ($this->protocol !== 'tcp') {
             $parser = $this->protocol;
             while ($this->_recvBuffer !== '' && !$this->_isPaused) {
                 // The current packet length is known.
@@ -796,6 +832,7 @@ class TcpConnection extends ConnectionInterface
         if ($this->worker) {
             unset($this->worker->connections[$this->_id]);
         }
+        unset(static::$connections[$this->_id]);
         $this->_status = self::STATUS_CLOSED;
         // Try to emit onClose callback.
         if ($this->onClose) {

+ 17 - 11
Worker.php

@@ -33,7 +33,7 @@ class Worker
      *
      * @var string
      */
-    const VERSION = '3.4.7';
+    const VERSION = '3.4.8';
 
     /**
      * Status starting.
@@ -215,7 +215,7 @@ class Worker
      *
      * @var Protocols\ProtocolInterface
      */
-    public $protocol = '';
+    public $protocol = 'tcp';
 
     /**
      * Root path for autoload.
@@ -988,6 +988,8 @@ class Worker
      *
      * @param int $worker_id
      * @param int $pid
+     *
+     * @return integer
      */
     protected static function getId($worker_id, $pid)
     {
@@ -1195,6 +1197,7 @@ class Worker
             Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false);
         } // For child processes.
         else {
+            reset(self::$_workers);
             $worker = current(self::$_workers);
             // Try to emit onWorkerReload callback.
             if ($worker->onWorkerReload) {
@@ -1325,7 +1328,8 @@ class Worker
     {
         // For master process.
         if (self::$_masterPid === posix_getpid()) {
-            file_put_contents(self::$_statisticsFile, "Trans   ipv4   ipv6   Recv-Q       Send-Q       Bytes-R      Bytes-W      Local Address          Foreign Address        PID     ID        Protocol     Worker\n", FILE_APPEND);
+            file_put_contents(self::$_statisticsFile, "------------------------------------------------------------ WORKERMAN CONNECTION STATUS --------------------------------------------------------------------------------\n", FILE_APPEND);
+            file_put_contents(self::$_statisticsFile, "Trans   ipv4   ipv6   Recv-Q       Send-Q       Bytes-R      Bytes-W      Local Address          Foreign Address        Status      PID     ID        Protocol     Worker\n", FILE_APPEND);
             chmod(self::$_statisticsFile, 0722);
             foreach (self::getAllWorkerPids() as $worker_pid) {
                 posix_kill($worker_pid, SIGIO);
@@ -1353,9 +1357,12 @@ class Worker
 
         $pid = posix_getpid();
         $str = '';
+        reset(self::$_workers);
+        $current_worker = current(self::$_workers);
+        $default_worker_name = $current_worker->name;
+
         /** @var Worker $worker */
-        foreach(self::$_workers as $worker) {
-            foreach($worker->connections as $connection) {
+        foreach(TcpConnection::$connections as $connection) {
                 /** @var Connection\TcpConnection $connection */
                 $transport = $connection->transport;
                 $ipv4 = $connection->isIpV4() ? ' 1' : ' 0';
@@ -1364,6 +1371,7 @@ class Worker
                 $send_q = $bytes_format($connection->getSendBufferQueueSize());
                 $local_address = $connection->getLocalAddress();
                 $remote_address = $connection->getRemoteAddress();
+                $state = $connection->getStatus(false);
                 $bytes_read = $bytes_format($connection->bytesRead);
                 $bytes_written = $bytes_format($connection->bytesWritten);
                 $id = $connection->id;
@@ -1373,18 +1381,16 @@ class Worker
                 } else {
                     $protocol = $connection->protocol;
                 }
-
+                $worker_name = isset($connection->worker) ? $connection->worker->name : $default_worker_name;
                 $str .= str_pad($transport, 8).str_pad($ipv4, 7).str_pad($ipv6, 7)
                     .str_pad($recv_q, 13).str_pad($send_q, 13).str_pad($bytes_read, 13).str_pad($bytes_written, 13)
-                    .str_pad($local_address, 22).' '.str_pad($remote_address, 22).' '.str_pad($pid, 8).str_pad($id, 10)
-                    .str_pad($protocol, 12).' '.$worker->name."\n" ;
-            }
+                    .str_pad($local_address, 22).' '.str_pad($remote_address, 22).' ' . str_pad($state, 12) . str_pad($pid, 8).str_pad($id, 10)
+                    .str_pad($protocol, 12).' '.$worker_name."\n" ;
+
         }
         if ($str) {
             file_put_contents(self::$_statisticsFile, $str, FILE_APPEND);
         }
-
-        reset(self::$_workers);
     }
 
     /**