walkor пре 10 година
родитељ
комит
a0fa0eac11
2 измењених фајлова са 33 додато и 5 уклоњено
  1. 6 5
      Connection/AsyncTcpConnection.php
  2. 27 0
      Connection/TcpConnection.php

+ 6 - 5
Connection/AsyncTcpConnection.php

@@ -106,11 +106,11 @@ class AsyncTcpConnection extends TcpConnection
      */
     public function checkConnection($socket)
     {
-        // 删除连接可写监听
-        Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
         // 需要判断两次连接是否已经断开
         if(!feof($this->_socket) && !feof($this->_socket) && is_resource($this->_socket))
         {
+            // 删除连接可写监听
+            Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
             // 设置非阻塞
             stream_set_blocking($this->_socket, 0);
             // 监听可读事件
@@ -138,11 +138,12 @@ class AsyncTcpConnection extends TcpConnection
         }
         else
         {
-            $this->_status = self::STATUS_CLOSED;
-            // 关闭socket
-            @fclose($this->_socket);
             // 连接未建立成功
             $this->emitError(WORKERMAN_CONNECT_FAIL, 'connect fail');
+            // 触发onClsoe
+            $this->destroy();
+            // 清理onConnect回调
+            $this->onConnect = null;
         }
     }
 }

+ 27 - 0
Connection/TcpConnection.php

@@ -542,6 +542,31 @@ class TcpConnection extends ConnectionInterface
     }
     
     /**
+     * 管道重定向
+     * @return void
+     */
+    public function pipe($dest)
+    {
+        $source = $this;
+        $this->onMessage = function($source, $data)use($dest)
+        {
+            $dest->send($data);
+        };
+        $this->onClose = function($source)use($dest)
+        {
+            $dest->destroy();
+        };
+        $dest->onBufferFull = function($dest)use($source)
+        {
+            $source->pauseRecv();
+        };
+        $dest->onBufferDrain = function($dest)use($source)
+        {
+            $source->resumeRecv();
+        };
+    }
+    
+    /**
      * 从缓冲区中消费掉$length长度的数据
      * @param int $length
      * @return void
@@ -642,6 +667,8 @@ class TcpConnection extends ConnectionInterface
                echo $e;
            }
        }
+       // 清理回调,避免内存泄露
+       $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
     }
     
     /**