Browse Source

support onBufferFull onBufferDrain

walkor 10 years ago
parent
commit
d0359591fd

+ 81 - 4
Workerman/Connection/TcpConnection.php

@@ -63,6 +63,18 @@ class TcpConnection extends ConnectionInterface
     public $onError = null;
     public $onError = null;
     
     
     /**
     /**
+     * 当发送缓冲区满时,如果设置了$onBufferFull回调,则执行
+     * @var callback
+     */
+    public $onBufferFull = null;
+    
+    /**
+     * 当发送缓冲区被清空时,如果设置了$onBufferDrain回调,则执行
+     * @var callback
+     */
+    public $onBufferDrain = null;
+    
+    /**
      * 使用的应用层协议,是协议类的名称
      * 使用的应用层协议,是协议类的名称
      * 值类似于 Workerman\\Protocols\\Http
      * 值类似于 Workerman\\Protocols\\Http
      * @var string
      * @var string
@@ -135,6 +147,18 @@ class TcpConnection extends ConnectionInterface
      * @var string
      * @var string
      */
      */
     protected $_remoteAddress = '';
     protected $_remoteAddress = '';
+    
+    /**
+     * 是否是停止接收数据
+     * @var bool
+     */
+    protected $_isPaused = false;
+    
+    /**
+     * 应用层发送缓冲区是否已经满了
+     * @var bool
+     */
+    protected $_bufferIsFull = false;
 
 
     /**
     /**
      * 构造函数
      * 构造函数
@@ -215,17 +239,17 @@ class TcpConnection extends ConnectionInterface
         }
         }
         else
         else
         {
         {
-            // 检查发送缓冲区是否已满
-            if(self::$maxSendBufferSize <= strlen($this->_sendBuffer) + strlen($send_buffer))
+            // 缓冲区已经标记为,任然有数据发送,则丢弃数据包
+            if($this->_bufferIsFull)
             {
             {
                 // 为status命令统计发送失败次数
                 // 为status命令统计发送失败次数
                 self::$statistics['send_fail']++;
                 self::$statistics['send_fail']++;
                 // 如果有设置失败回调,则执行
                 // 如果有设置失败回调,则执行
                 if($this->onError)
                 if($this->onError)
                 {
                 {
-                    try 
+                    try
                     {
                     {
-                        call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full');
+                        call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
                     }
                     }
                     catch(Exception $e)
                     catch(Exception $e)
                     {
                     {
@@ -236,6 +260,22 @@ class TcpConnection extends ConnectionInterface
             }
             }
             // 将数据放入放缓冲区
             // 将数据放入放缓冲区
             $this->_sendBuffer .= $send_buffer;
             $this->_sendBuffer .= $send_buffer;
+            // 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调
+            if(self::$maxSendBufferSize <= strlen($this->_sendBuffer))
+            {
+                $this->_bufferIsFull = true;
+                if($this->onBufferFull)
+                {
+                    try
+                    {
+                        call_user_func($this->onBufferFull, $this);
+                    }
+                    catch(Exception $e)
+                    {
+                        echo $e;
+                    }
+                }
+            }
         }
         }
     }
     }
     
     
@@ -259,6 +299,7 @@ class TcpConnection extends ConnectionInterface
     
     
     /**
     /**
      * 获得对端端口
      * 获得对端端口
+     * @return int
      */
      */
     public function getRemotePort()
     public function getRemotePort()
     {
     {
@@ -273,6 +314,29 @@ class TcpConnection extends ConnectionInterface
         }
         }
         return $this->_remotePort;
         return $this->_remotePort;
     }
     }
+    
+    /**
+     * 暂停接收数据,一般用于控制上传流量
+     * @return void
+     */
+    public function pauseRecv()
+    {
+        Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
+        $this->_isPaused = true;
+    }
+    
+    /**
+     * 恢复接收数据,一般用户控制上传流量
+     * @return void
+     */
+    public function resumeRecv()
+    {
+        if($this->_isPaused == true)
+        {
+            Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
+            $this->_isPaused = false;
+        }
+    }
 
 
     /**
     /**
      * 当socket可读时的回调
      * 当socket可读时的回调
@@ -396,6 +460,19 @@ class TcpConnection extends ConnectionInterface
         {
         {
             Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
             Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
             $this->_sendBuffer = '';
             $this->_sendBuffer = '';
+            // 发送缓冲区的数据被发送完毕,尝试触发onBufferDrain回调
+            if($this->onBufferDrain)
+            {
+                try 
+                {
+                    call_user_func($this->onBufferDrain, $this);
+                }
+                catch(Exception $e)
+                {
+                    echo $e;
+                }
+            }
+            // 如果连接状态为关闭,则销毁连接
             if($this->_status == self::STATUS_CLOSING)
             if($this->_status == self::STATUS_CLOSING)
             {
             {
                 $this->destroy();
                 $this->destroy();

+ 13 - 7
Workerman/Protocols/Http.php

@@ -1,7 +1,7 @@
 <?php 
 <?php 
 namespace  Workerman\Protocols;
 namespace  Workerman\Protocols;
 
 
-use Workerman\Connection\ConnectionInterface;
+use Workerman\Connection\TcpConnection;
 
 
 /**
 /**
  * http protocol
  * http protocol
@@ -12,13 +12,19 @@ class Http implements \Workerman\Protocols\ProtocolInterface
     /**
     /**
      * 判断包长
      * 判断包长
      * @param string $recv_buffer
      * @param string $recv_buffer
-     * @param ConnectionInterface $connection
+     * @param TcpConnection $connection
      * @return int
      * @return int
      */
      */
-    public static function input($recv_buffer, ConnectionInterface $connection)
+    public static function input($recv_buffer, TcpConnection $connection)
     {
     {
         if(!strpos($recv_buffer, "\r\n\r\n"))
         if(!strpos($recv_buffer, "\r\n\r\n"))
         {
         {
+            // 无法获得包长,避免客户端传递超大头部的数据包
+            if(strlen($recv_buffer)>=TcpConnection::$maxPackageSize)
+            {
+                $connection->close();
+                return 0;
+            }
             return 0;
             return 0;
         }
         }
         
         
@@ -46,10 +52,10 @@ class Http implements \Workerman\Protocols\ProtocolInterface
     /**
     /**
      * 从http数据包中解析$_POST、$_GET、$_COOKIE等 
      * 从http数据包中解析$_POST、$_GET、$_COOKIE等 
      * @param string $recv_buffer
      * @param string $recv_buffer
-     * @param ConnectionInterface $connection
+     * @param TcpConnection $connection
      * @return void
      * @return void
      */
      */
-    public static function decode($recv_buffer, ConnectionInterface $connection)
+    public static function decode($recv_buffer, TcpConnection $connection)
     {
     {
         // 初始化
         // 初始化
         $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =  array();
         $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =  array();
@@ -193,10 +199,10 @@ class Http implements \Workerman\Protocols\ProtocolInterface
     /**
     /**
      * 编码,增加HTTP头
      * 编码,增加HTTP头
      * @param string $content
      * @param string $content
-     * @param ConnectionInterface $connection
+     * @param TcpConnection $connection
      * @return string
      * @return string
      */
      */
-    public static function encode($content, ConnectionInterface $connection)
+    public static function encode($content, TcpConnection $connection)
     {
     {
         // 没有http-code默认给个
         // 没有http-code默认给个
         if(!isset(HttpCache::$header['Http-Code']))
         if(!isset(HttpCache::$header['Http-Code']))

+ 10 - 1
Workerman/Protocols/Text.php

@@ -1,5 +1,7 @@
 <?php 
 <?php 
 namespace Workerman\Protocols;
 namespace Workerman\Protocols;
+use \Workerman\Connection\TcpConnection;
+
 /**
 /**
  * Text协议
  * Text协议
  * 以换行为请求结束标记
  * 以换行为请求结束标记
@@ -13,8 +15,15 @@ class Text
      * 如果能够得到包长,则返回包的长度,否则返回0继续等待数据
      * 如果能够得到包长,则返回包的长度,否则返回0继续等待数据
      * @param string $buffer
      * @param string $buffer
      */
      */
-    public static function input($buffer)
+    public static function input($buffer ,TcpConnection $connection)
     {
     {
+        // 由于没有包头,无法预先知道包长,不能无限制的接收数据,
+        // 所以需要判断当前接收的数据是否超过限定值
+        if(strlen($buffer)>=TcpConnection::$maxPackageSize)
+        {
+            $connection->close();
+            return 0;
+        }
         // 获得换行字符"\n"位置
         // 获得换行字符"\n"位置
         $pos = strpos($buffer, "\n");
         $pos = strpos($buffer, "\n");
         // 没有换行符,无法得知包长,返回0继续等待数据
         // 没有换行符,无法得知包长,返回0继续等待数据

+ 14 - 0
Workerman/Worker.php

@@ -124,6 +124,18 @@ class Worker
     public $onError = null;
     public $onError = null;
     
     
     /**
     /**
+     * 当连接的发送缓冲区满时,如果设置了$onBufferFull回调,则执行
+     * @var callback
+     */
+    public $onBufferFull = null;
+    
+    /**
+     * 当链接的发送缓冲区被清空时,如果设置了$onBufferDrain回调,则执行
+     * @var callback
+     */
+    public $onBufferDrain = null;
+    
+    /**
      * 当前进程退出时(由于平滑重启或者服务停止导致),如果设置了此回调,则运行
      * 当前进程退出时(由于平滑重启或者服务停止导致),如果设置了此回调,则运行
      * @var callback
      * @var callback
      */
      */
@@ -1202,6 +1214,8 @@ class Worker
         $connection->onMessage = $this->onMessage;
         $connection->onMessage = $this->onMessage;
         $connection->onClose = $this->onClose;
         $connection->onClose = $this->onClose;
         $connection->onError = $this->onError;
         $connection->onError = $this->onError;
+        $connection->onBufferDrain = $this->onBufferDrain;
+        $connection->onBufferFull = $this->onBufferFull;
         
         
         // 如果有设置连接回调,则执行
         // 如果有设置连接回调,则执行
         if($this->onConnect)
         if($this->onConnect)