ソースを参照

nonblock write

walkor 12 年 前
コミット
139d7b0bcc
3 ファイル変更65 行追加13 行削除
  1. 1 1
      man/Core/Events/Libevent.php
  2. 1 1
      man/Core/Events/Select.php
  3. 63 11
      man/Core/SocketWorker.php

+ 1 - 1
man/Core/Events/Libevent.php

@@ -67,7 +67,7 @@ class Libevent implements BaseEvent
             return true;
         }
         
-        $real_flag = EV_READ | EV_PERSIST;
+        $real_flag = $flag == self::EV_READ ? EV_READ | EV_PERSIST : EV_WRITE | EV_PERSIST;
         
         // 创建一个用于监听的event
         $this->allEvents[$fd_key][$flag] = event_new();

+ 1 - 1
man/Core/Events/Select.php

@@ -198,7 +198,7 @@ class Select implements BaseEvent
                 $fd_key = (int) $fd;
                 if(isset($this->allEvents[$fd_key][self::EV_WRITE]))
                 {
-                    // 留空
+                    call_user_func_array($this->allEvents[$fd_key][self::EV_WRITE]['func'], array($this->allEvents[$fd_key][self::EV_WRITE]['fd'], self::EV_WRITE,  $this->allEvents[$fd_key][self::EV_WRITE]['args']));
                 }
             }
         }

+ 63 - 11
man/Core/SocketWorker.php

@@ -1,5 +1,7 @@
 <?php
 namespace Man\Core;
+use Man\Core\Events\BaseEvent;
+
 require_once WORKERMAN_ROOT_DIR . 'man/Core/Events/Select.php';
 require_once WORKERMAN_ROOT_DIR . 'man/Core/AbstractWorker.php';
 require_once WORKERMAN_ROOT_DIR . 'man/Core/Lib/Config.php';
@@ -52,12 +54,18 @@ abstract class SocketWorker extends AbstractWorker
     protected $connections = array();
     
     /**
-     * worker的所有读buffer
+     * 客户端连接的读buffer
      * @var array
      */
     protected $recvBuffers = array();
     
     /**
+     *  客户端连接的写buffer
+     * @var array
+     */
+    protected $sendBuffers = array();
+    
+    /**
      * 当前处理的fd
      * @var integer
      */
@@ -357,7 +365,7 @@ abstract class SocketWorker extends AbstractWorker
             
             // 客户端提前断开链接
             $this->statusInfo['client_close']++;
-            // 如果该链接对应的buffer有数据,说明生错误
+            // 如果该链接对应的buffer有数据,说明生错误
             if(!empty($this->recvBuffers[$fd]['buf']))
             {
                 $this->notice("CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
@@ -399,7 +407,10 @@ abstract class SocketWorker extends AbstractWorker
             else
             {
                 // 关闭链接
-                $this->closeClient($fd);
+                if(empty($this->sendBuffers[$fd]))
+                {
+                    $this->closeClient($fd);
+                }
             }
         }
         // 出错
@@ -419,11 +430,6 @@ abstract class SocketWorker extends AbstractWorker
         // 检查是否是关闭状态或者是否到达请求上限
         if($this->workerStatus == self::STATUS_SHUTDOWN || $this->statusInfo['total_request'] >= $this->maxRequests)
         {
-            // 关闭链接
-            if($this->isPersistentConnection)
-            {
-                $this->closeClient($fd);
-            }
             // 停止服务
             $this->stop();
             // EXIT_WAIT_TIME秒后退出进程
@@ -442,8 +448,9 @@ abstract class SocketWorker extends AbstractWorker
         if($this->protocol != 'udp')
         {
             $this->event->del($this->connections[$fd], Events\BaseEvent::EV_READ);
+            $this->event->del($this->connections[$fd], Events\BaseEvent::EV_WRITE);
             fclose($this->connections[$fd]);
-            unset($this->connections[$fd], $this->recvBuffers[$fd]);
+            unset($this->connections[$fd], $this->recvBuffers[$fd], $this->sendBuffers[$fd]);
         }
     }
     
@@ -517,20 +524,65 @@ abstract class SocketWorker extends AbstractWorker
         // tcp
         if($this->protocol != 'udp')
         {
+            $send_len = @stream_socket_sendto($this->connections[$this->currentDealFd], $str_to_send);
+            if($send_len === strlen($str_to_send))
+            {
+                return true;
+            }
+            if($send_len > 0)
+            {
+                $this->sendBuffers[$this->currentDealFd] = substr($str_to_send, $send_len);
+            }
+            else
+            {
+                $this->sendBuffers[$this->currentDealFd] = $str_to_send;
+            }
+            $this->event->add($this->connections[$this->currentDealFd],  Events\BaseEvent::EV_WRITE, array($this, 'tcpWriteToClient'), array($this->currentDealFd));
+            /*
             // tcp 如果一次没写完(一般是缓冲区满的情况),则阻塞写
             if(!$this->blockWrite($this->connections[$this->currentDealFd], $str_to_send, 500))
             {
                 $this->notice('sendToClient fail ,Data length = ' . strlen($str_to_send));
                 $this->statusInfo['send_fail']++;
                 return false;
-            }
-            return true;
+            }*/
+            return null;
         }
         // udp 直接发送,要求数据包不能超过65515
        return strlen($str_to_send) == stream_socket_sendto($this->mainSocket, $str_to_send, 0, $this->currentClientAddress);
     }
     
     /**
+     * 向客户端socket写数据
+     * @param resource $fd
+     * @param string $bin_data
+     */
+    public function tcpWriteToClient($fd)
+    {
+        if(empty($this->connections[$fd]))
+        {
+            $this->notice("tcpWriteToClient \$this->connections[$fd] is null");
+            return false;
+        }
+        
+        $send_len = @stream_socket_sendto($this->connections[$fd], $this->sendBuffers[$fd]);
+        if($send_len === strlen($this->sendBuffers[$fd]))
+        {
+            if(!$this->isPersistentConnection)
+            {
+                return $this->closeClient($fd);
+            }
+            $this->event->del($this->connections[$fd], BaseEvent::EV_WRITE);
+            return;
+        }
+        
+        if($send_len > 0)
+        {
+            $this->sendBuffers[$fd] = substr($this->sendBuffers[$fd], $send_len);
+        }
+    }
+    
+    /**
      * 向fd写数据,如果socket缓冲区满了,则改用阻塞模式写数据
      * @param resource $fd
      * @param string $str_to_write