Browse Source

fix baseWrite broken pipe

walkor 10 years ago
parent
commit
5018369da3
1 changed files with 98 additions and 94 deletions
  1. 98 94
      Connection/TcpConnection.php

+ 98 - 94
Connection/TcpConnection.php

@@ -260,7 +260,7 @@ class TcpConnection extends ConnectionInterface
             else
             {
                 // 如果连接断开
-                if(feof($this->_socket) || !is_resource($this->_socket))
+                if(!is_resource($this->_socket) || feof($this->_socket))
                 {
                     // status统计发送失败次数
                     self::$statistics['send_fail']++;
@@ -384,37 +384,61 @@ class TcpConnection extends ConnectionInterface
      */
     public function baseRead($socket)
     {
-        if(!is_resource($socket) || feof($socket))
+        $read_data = false;
+        while(1)
+        {
+            $buffer = fread($socket, self::READ_BUFFER_SIZE);
+            if($buffer === '' || $buffer === false)
+            {
+                break;
+            }
+            $read_data = true;
+            $this->_recvBuffer .= $buffer;
+        }
+        
+        // 没有读到数据时检查连接是否断开
+        if(!$read_data && (!is_resource($socket) || feof($socket)))
         {
             $this->destroy();
             return;
         }
-       while(1)
-       {
-           $buffer = fread($socket, self::READ_BUFFER_SIZE);
-           if($buffer === '' || $buffer === false)
-           {
-               break;
-           }
-           $this->_recvBuffer .= $buffer; 
-       }
        
-       if($this->_recvBuffer)
-       {
-           if(!$this->onMessage)
-           {
-               $this->_recvBuffer = '';
-               return ;
-           }
-           
-           // 如果设置了协议
-           if($this->protocol)
+        if(!$this->_recvBuffer)
+        {
+            return;
+        }
+        
+        if(!$this->onMessage)
+        {
+            $this->_recvBuffer = '';
+            return ;
+        }
+       
+        // 如果设置了协议
+        if($this->protocol)
+        {
+           $parser = $this->protocol;
+           while($this->_recvBuffer && !$this->_isPaused)
            {
-               $parser = $this->protocol;
-               while($this->_recvBuffer && !$this->_isPaused)
+               // 当前包的长度已知
+               if($this->_currentPackageLength)
+               {
+                   // 数据不够一个包
+                   if($this->_currentPackageLength > strlen($this->_recvBuffer))
+                   {
+                       break;
+                   }
+               }
+               else
                {
-                   // 当前包的长度已知
-                   if($this->_currentPackageLength)
+                   // 获得当前包长
+                   $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
+                   // 数据不够,无法获得包长
+                   if($this->_currentPackageLength === 0)
+                   {
+                       break;
+                   }
+                   elseif($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize)
                    {
                        // 数据不够一个包
                        if($this->_currentPackageLength > strlen($this->_recvBuffer))
@@ -422,75 +446,57 @@ class TcpConnection extends ConnectionInterface
                            break;
                        }
                    }
+                   // 包错误
                    else
                    {
-                       // 获得当前包长
-                       $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
-                       // 数据不够,无法获得包长
-                       if($this->_currentPackageLength === 0)
-                       {
-                           break;
-                       }
-                       elseif($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize)
-                       {
-                           // 数据不够一个包
-                           if($this->_currentPackageLength > strlen($this->_recvBuffer))
-                           {
-                               break;
-                           }
-                       }
-                       // 包错误
-                       else
-                       {
-                           $this->close('error package. package_length='.var_export($this->_currentPackageLength, true));
-                           return;
-                       }
-                   }
-                   
-                   // 数据足够一个包长
-                   self::$statistics['total_request']++;
-                   // 当前包长刚好等于buffer的长度
-                   if(strlen($this->_recvBuffer) === $this->_currentPackageLength)
-                   {
-                       $one_request_buffer = $this->_recvBuffer;
-                       $this->_recvBuffer = '';
-                   }
-                   else
-                   {
-                       // 从缓冲区中获取一个完整的包
-                       $one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
-                       // 将当前包从接受缓冲区中去掉
-                       $this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
-                   }
-                   // 重置当前包长为0
-                   $this->_currentPackageLength = 0;
-                   // 处理数据包
-                   try
-                   {
-                       call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
-                   }
-                   catch(Exception $e)
-                   {
-                       self::$statistics['throw_exception']++;
-                       echo $e;
+                       $this->close('error package. package_length='.var_export($this->_currentPackageLength, true));
+                       return;
                    }
                }
-               return;
-           }
-           // 没有设置协议,则直接把接收的数据当做一个包处理
-           self::$statistics['total_request']++;
-           try 
-           {
-               call_user_func($this->onMessage, $this, $this->_recvBuffer);
-           }
-           catch(Exception $e)
-           {
-               self::$statistics['throw_exception']++;
-               echo $e;
+               
+               // 数据足够一个包长
+               self::$statistics['total_request']++;
+               // 当前包长刚好等于buffer的长度
+               if(strlen($this->_recvBuffer) === $this->_currentPackageLength)
+               {
+                   $one_request_buffer = $this->_recvBuffer;
+                   $this->_recvBuffer = '';
+               }
+               else
+               {
+                   // 从缓冲区中获取一个完整的包
+                   $one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
+                   // 将当前包从接受缓冲区中去掉
+                   $this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
+               }
+               // 重置当前包长为0
+               $this->_currentPackageLength = 0;
+               // 处理数据包
+               try
+               {
+                   call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
+               }
+               catch(Exception $e)
+               {
+                   self::$statistics['throw_exception']++;
+                   echo $e;
+               }
            }
-           // 清空缓冲区
-           $this->_recvBuffer = '';
-       }
+           return;
+        }
+        // 没有设置协议,则直接把接收的数据当做一个包处理
+        self::$statistics['total_request']++;
+        try 
+        {
+           call_user_func($this->onMessage, $this, $this->_recvBuffer);
+        }
+        catch(Exception $e)
+        {
+           self::$statistics['throw_exception']++;
+           echo $e;
+        }
+        // 清空缓冲区
+        $this->_recvBuffer = '';
     }
 
     /**
@@ -527,13 +533,11 @@ class TcpConnection extends ConnectionInterface
         {
            $this->_sendBuffer = substr($this->_sendBuffer, $len);
         }
+        // 可写但是写失败,说明连接断开
         else
         {
-           if(feof($this->_socket) || !is_resource($this->_socket))
-           {
-               self::$statistics['send_fail']++;
-               $this->destroy();
-           }
+            self::$statistics['send_fail']++;
+            $this->destroy();
         }
     }