浏览代码

support Protocol::onConnect Protocol::onClose

walkor 9 年之前
父节点
当前提交
3fc35c30cb
共有 4 个文件被更改,包括 53 次插入21 次删除
  1. 9 7
      Connection/AsyncTcpConnection.php
  2. 12 0
      Connection/TcpConnection.php
  3. 1 1
      Protocols/Websocket.php
  4. 31 13
      Protocols/Ws.php

+ 9 - 7
Connection/AsyncTcpConnection.php

@@ -176,7 +176,7 @@ class AsyncTcpConnection extends TcpConnection
     public function checkConnection($socket)
     {
         // Check socket state.
-        if (stream_socket_get_name($socket, true)) {
+        if ($address = stream_socket_get_name($socket, true)) {
             // Remove write listener.
             Worker::$globalEvent->del($socket, EventInterface::EV_WRITE);
             // Nonblocking.
@@ -195,10 +195,12 @@ class AsyncTcpConnection extends TcpConnection
                 Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
             }
             $this->_status        = self::STATUS_ESTABLISH;
-            $this->_remoteAddress = stream_socket_get_name($socket, true);
-            if (is_callable(array($this->protocol, 'onConnect'))) {
+            $this->_remoteAddress = $address;
+
+            // Try to emit onConnect callback.
+            if ($this->onConnect) {
                 try {
-                    call_user_func(array($this->protocol, 'onConnect'), $this);
+                    call_user_func($this->onConnect, $this);
                 } catch (\Exception $e) {
                     Worker::log($e);
                     exit(250);
@@ -207,10 +209,10 @@ class AsyncTcpConnection extends TcpConnection
                     exit(250);
                 }
             }
-            // Try to emit onConnect callback.
-            if ($this->onConnect) {
+            // Try to emit protocol::onConnect
+            if (method_exists($this->protocol, 'onConnect')) {
                 try {
-                    call_user_func($this->onConnect, $this);
+                    call_user_func(array($this->protocol, 'onConnect'), $this);
                 } catch (\Exception $e) {
                     Worker::log($e);
                     exit(250);

+ 12 - 0
Connection/TcpConnection.php

@@ -619,6 +619,18 @@ class TcpConnection extends ConnectionInterface
                 exit(250);
             }
         }
+        // Try to emit protocol::onClose
+        if (method_exists($this->protocol, 'onClose')) {
+            try {
+                call_user_func(array($this->protocol, 'onClose'), $this);
+            } catch (\Exception $e) {
+                Worker::log($e);
+                exit(250);
+            } catch (\Error $e) {
+                Worker::log($e);
+                exit(250);
+            }
+        }
         if ($this->_status === self::STATUS_CLOSED) {
             // Cleaning up the callback to avoid memory leaks.
             $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;

+ 1 - 1
Protocols/Websocket.php

@@ -154,7 +154,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                     break;
                 // Wrong opcode. 
                 default :
-                    echo "error opcode $opcode and close websocket connection\n";
+                    echo "error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n";
                     $connection->close();
                     return 0;
             }

+ 31 - 13
Protocols/Ws.php

@@ -2,6 +2,7 @@
 namespace Workerman\Protocols;
 
 use Workerman\Worker;
+use Workerman\Lib\Timer;
 
 /**
  * Websocket protocol for client.
@@ -39,7 +40,7 @@ class Ws
     public static function input($buffer, $connection)
     {
         if (empty($connection->handshakeStep)) {
-            echo "recv data before handshake\n";
+            echo "recv data before handshake. Buffer:" . bin2hex($buffer) . "\n";
             return false;
         }
         // Recv handshake response
@@ -140,7 +141,7 @@ class Ws
                     break;
                 // Wrong opcode. 
                 default :
-                    echo "error opcode $opcode and close websocket connection\n";
+                    echo "error opcode $opcode and close websocket connection. Buffer:" . $buffer . "\n";
                     $connection->close();
                     return 0;
             }
@@ -195,6 +196,9 @@ class Ws
      */
     public static function encode($payload, $connection)
     {
+        if (empty($connection->websocketType)) {
+            $connection->websocketType = self::BINARY_TYPE_BLOB;
+        }
         $payload = (string)$payload;
         if (empty($connection->handshakeStep)) {
             self::sendHandshake($connection);
@@ -281,11 +285,27 @@ class Ws
      */
     public static function onConnect($connection)
     {
-        $connection->handshakeStep = null;
         self::sendHandshake($connection);
     }
 
     /**
+     * Clean
+     *
+     * @param $connection
+     */
+    public static function onClose($connection)
+    {
+        $connection->handshakeStep               = null;
+        $connection->websocketCurrentFrameLength = 0;
+        $connection->tmpWebsocketData            = '';
+        $connection->websocketDataBuffer         = '';
+        if (!empty($connection->websocketPingTimer)) {
+            Timer::del($connection->websocketPingTimer);
+            $connection->websocketPingTimer = null;
+        }
+    }
+
+    /**
      * Send websocket handshake.
      *
      * @param \Workerman\Connection\TcpConnection $connection
@@ -311,9 +331,6 @@ class Ws
         $connection->handshakeStep               = 1;
         $connection->websocketCurrentFrameLength = 0;
         $connection->websocketDataBuffer         = '';
-        if (empty($connection->websocketType)) {
-            $connection->websocketType = self::BINARY_TYPE_BLOB;
-        }
     }
 
     /**
@@ -329,11 +346,11 @@ class Ws
         if ($pos) {
             // handshake complete
             $connection->handshakeStep = 2;
-            $handshake_respnse_length = $pos + 4;
+            $handshake_response_length = $pos + 4;
             // Try to emit onWebSocketConnect callback.
             if (isset($connection->onWebSocketConnect)) {
                 try {
-                    call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_respnse_length));
+                    call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_response_length));
                 } catch (\Exception $e) {
                     Worker::log($e);
                     exit(250);
@@ -344,20 +361,21 @@ class Ws
             }
             // Headbeat.
             if (!empty($connection->websocketPingInterval)) {
-                $connection->websocketPingTimer = \Workerman\Lib\Timer::add($connection->websocketPingInterval, function() use ($connection){
+                $connection->websocketPingTimer = Timer::add($connection->websocketPingInterval, function() use ($connection){
                     if (false === $connection->send(pack('H*', '8900'), true)) {
-                        \Workerman\Lib\Timer::del($connection->websocketPingTimer);
+                        Timer::del($connection->websocketPingTimer);
+                        $connection->websocketPingTimer = null;
                     }
                 });
             }
 
-            $connection->consumeRecvBuffer($handshake_respnse_length);
+            $connection->consumeRecvBuffer($handshake_response_length);
             if (!empty($connection->tmpWebsocketData)) {
                 $connection->send($connection->tmpWebsocketData, true);
                 $connection->tmpWebsocketData = '';
             }
-            if (strlen($buffer > $handshake_respnse_length)) {
-                return self::input(substr($buffer, $handshake_respnse_length));
+            if (strlen($buffer > $handshake_response_length)) {
+                return self::input(substr($buffer, $handshake_response_length), $connection);
             }
         }
         return 0;