Răsfoiți Sursa

check sendbuffer full for websocket

walkor 9 ani în urmă
părinte
comite
86b5a1728a
3 a modificat fișierele cu 117 adăugiri și 19 ștergeri
  1. 39 18
      Connection/TcpConnection.php
  2. 39 0
      Protocols/Websocket.php
  3. 39 1
      Protocols/Ws.php

+ 39 - 18
Connection/TcpConnection.php

@@ -246,7 +246,14 @@ class TcpConnection extends ConnectionInterface
         }
 
         if ($this->_status === self::STATUS_INITIAL || $this->_status === self::STATUS_CONNECTING) {
+            if ($this->_sendBuffer) {
+                if ($this->bufferIsFull()) {
+                    self::$statistics['send_fail']++;
+                    return false;
+                }
+            }
             $this->_sendBuffer .= $send_buffer;
+            $this->checkBufferWillFull();
             return null;
         } elseif ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
             return false;
@@ -283,29 +290,18 @@ class TcpConnection extends ConnectionInterface
                 $this->_sendBuffer = $send_buffer;
             }
             Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
-            // Check if the send buffer is full.
-            $this->checkBufferIsFull();
+            // Check if the send buffer will be full.
+            $this->checkBufferWillFull();
             return null;
         } else {
-            // Buffer has been marked as full but still has data to send the packet is discarded.
-            if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
+            if ($this->bufferIsFull()) {
                 self::$statistics['send_fail']++;
-                if ($this->onError) {
-                    try {
-                        call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
-                    } catch (\Exception $e) {
-                        Worker::log($e);
-                        exit(250);
-                    } catch (\Error $e) {
-                        Worker::log($e);
-                        exit(250);
-                    }
-                }
                 return false;
             }
+
             $this->_sendBuffer .= $send_buffer;
             // Check if the send buffer is full.
-            $this->checkBufferIsFull();
+            $this->checkBufferWillFull();
         }
     }
 
@@ -568,11 +564,11 @@ class TcpConnection extends ConnectionInterface
     }
 
     /**
-     * Check whether the send buffer is full.
+     * Check whether the send buffer will be full.
      *
      * @return void
      */
-    protected function checkBufferIsFull()
+    protected function checkBufferWillFull()
     {
         if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
             if ($this->onBufferFull) {
@@ -590,6 +586,31 @@ class TcpConnection extends ConnectionInterface
     }
 
     /**
+     * Whether send buffer is full.
+     *
+     * @return bool
+     */
+    protected function bufferIsFull()
+    {
+        // Buffer has been marked as full but still has data to send then the packet is discarded.
+        if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
+            if ($this->onError) {
+                try {
+                    call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
+                } catch (\Exception $e) {
+                    Worker::log($e);
+                    exit(250);
+                } catch (\Error $e) {
+                    Worker::log($e);
+                    exit(250);
+                }
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
      * Destroy connection.
      *
      * @return void

+ 39 - 0
Protocols/Websocket.php

@@ -14,6 +14,7 @@
 namespace Workerman\Protocols;
 
 use Workerman\Connection\ConnectionInterface;
+use Workerman\Connection\TcpConnection;
 use Workerman\Worker;
 
 /**
@@ -179,6 +180,14 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                 }
             }
             $current_frame_length = $head_len + $data_len;
+
+            $total_package_size = strlen($connection->websocketDataBuffer) + $current_frame_length;
+            if ($total_package_size > TcpConnection::$maxPackageSize) {
+                echo "error package. package_length=$total_package_size\n";
+                $connection->close();
+                return 0;
+            }
+
             if ($is_fin_frame) {
                 return $current_frame_length;
             } else {
@@ -240,7 +249,37 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             if (empty($connection->tmpWebsocketData)) {
                 $connection->tmpWebsocketData = '';
             }
+            // If buffer has already full then discard the current package.
+            if (strlen($connection->tmpWebsocketData) > $connection->maxSendBufferSize) {
+                if ($connection->onError) {
+                    try {
+                        call_user_func($connection->onError, $connection, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
+                    } catch (\Exception $e) {
+                        Worker::log($e);
+                        exit(250);
+                    } catch (\Error $e) {
+                        Worker::log($e);
+                        exit(250);
+                    }
+                }
+                return '';
+            }
             $connection->tmpWebsocketData .= $encode_buffer;
+            // Check buffer is full.
+            if ($connection->maxSendBufferSize <= strlen($connection->tmpWebsocketData)) {
+                if ($connection->onBufferFull) {
+                    try {
+                        call_user_func($connection->onBufferFull, $connection);
+                    } catch (\Exception $e) {
+                        Worker::log($e);
+                        exit(250);
+                    } catch (\Error $e) {
+                        Worker::log($e);
+                        exit(250);
+                    }
+                }
+            }
+
             // Return empty string.
             return '';
         }

+ 39 - 1
Protocols/Ws.php

@@ -161,6 +161,14 @@ class Ws
             } else {
                 $current_frame_length = $data_len + 2;
             }
+
+            $total_package_size = strlen($connection->websocketDataBuffer) + $current_frame_length;
+            if ($total_package_size > TcpConnection::$maxPackageSize) {
+                echo "error package. package_length=$total_package_size\n";
+                $connection->close();
+                return 0;
+            }
+
             if ($is_fin_frame) {
                 return $current_frame_length;
             } else {
@@ -225,7 +233,36 @@ class Ws
             $frame .= $payload[$i] ^ $mask_key[$i % 4];
         }
         if ($connection->handshakeStep === 1) {
-            $connection->tmpWebsocketData = isset($connection->tmpWebsocketData) ? $connection->tmpWebsocketData . $frame : $frame;
+            // If buffer has already full then discard the current package.
+            if (strlen($connection->tmpWebsocketData) > $connection->maxSendBufferSize) {
+                if ($connection->onError) {
+                    try {
+                        call_user_func($connection->onError, $connection, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
+                    } catch (\Exception $e) {
+                        Worker::log($e);
+                        exit(250);
+                    } catch (\Error $e) {
+                        Worker::log($e);
+                        exit(250);
+                    }
+                }
+                return '';
+            }
+            $connection->tmpWebsocketData = $connection->tmpWebsocketData . $frame;
+            // Check buffer is full.
+            if ($connection->maxSendBufferSize <= strlen($connection->tmpWebsocketData)) {
+                if ($connection->onBufferFull) {
+                    try {
+                        call_user_func($connection->onBufferFull, $connection);
+                    } catch (\Exception $e) {
+                        Worker::log($e);
+                        exit(250);
+                    } catch (\Error $e) {
+                        Worker::log($e);
+                        exit(250);
+                    }
+                }
+            }
             return '';
         }
         return $frame;
@@ -331,6 +368,7 @@ class Ws
         $connection->handshakeStep               = 1;
         $connection->websocketCurrentFrameLength = 0;
         $connection->websocketDataBuffer         = '';
+        $connection->tmpWebsocketData            = '';
     }
 
     /**