Quellcode durchsuchen

3.0.4 Timer support

walkor vor 10 Jahren
Ursprung
Commit
921c9a5d3d

+ 19 - 1
Workerman/Events/EventInterface.php

@@ -22,13 +22,25 @@ interface EventInterface
     const EV_SIGNAL = 4;
     
     /**
+     * 连续的定时事件
+     * @var int
+     */
+    const EV_TIMER = 8;
+    
+    /**
+     * 定时一次
+     * @var int 
+     */
+    const EV_TIMER_ONCE = 16;
+    
+    /**
      * 添加事件回调 
      * @param resource $fd
      * @param int $flag
      * @param callable $func
      * @return bool
      */
-    public function add($fd, $flag, $func);
+    public function add($fd, $flag, $func, $args = null);
     
     /**
      * 删除事件回调
@@ -39,6 +51,12 @@ interface EventInterface
     public function del($fd, $flag);
     
     /**
+     * 清除所有定时器
+     * @return void
+     */
+    public function clearAllTimer();
+    
+    /**
      * 事件循环
      * @return void
      */

+ 126 - 44
Workerman/Events/Libevent.php

@@ -10,19 +10,26 @@ class Libevent implements EventInterface
      * eventBase
      * @var object
      */
-    protected $eventBase = null;
+    protected $_eventBase = null;
     
     /**
      * 所有的事件
      * @var array
      */
-    protected $allEvents = array();
+    protected $_allEvents = array();
     
     /**
      * 所有的信号事件
      * @var array
      */
-    protected $eventSignal = array();
+    protected $_eventSignal = array();
+    
+    /**
+     * 所有的定时事件
+     * [func, args, event, flag, time_interval]
+     * @var array
+     */
+    protected $_eventTimer = array();
     
     /**
      * 构造函数
@@ -37,48 +44,75 @@ class Libevent implements EventInterface
      * 添加事件
      * @see EventInterface::add()
      */
-    public function add($fd, $flag, $func)
+    public function add($fd, $flag, $func, $args=null)
     {
         $fd_key = (int)$fd;
         
-        if ($flag == self::EV_SIGNAL)
-        {
-            $real_flag = EV_SIGNAL | EV_PERSIST;
-            $this->_eventSignal[$fd_key] = event_new();
-            if(!event_set($this->_eventSignal[$fd_key], $fd, $real_flag, $func, null))
-            {
-                return false;
-            }
-            if(!event_base_set($this->_eventSignal[$fd_key], $this->_eventBase))
-            {
-                return false;
-            }
-            if(!event_add($this->_eventSignal[$fd_key]))
-            {
-                return false;
-            }
-            return true;
-        }
-        
-        $real_flag = $flag == self::EV_READ ? EV_READ | EV_PERSIST : EV_WRITE | EV_PERSIST;
-        
-        $this->_allEvents[$fd_key][$flag] = event_new();
-        
-        if(!event_set($this->_allEvents[$fd_key][$flag], $fd, $real_flag, $func, null))
-        {
-            return false;
-        }
-        
-        if(!event_base_set($this->_allEvents[$fd_key][$flag], $this->_eventBase))
+        switch($flag)
         {
-            return false;
+            case self::EV_SIGNAL:
+                $real_flag = EV_SIGNAL | EV_PERSIST;
+                $this->_eventSignal[$fd_key] = event_new();
+                if(!event_set($this->_eventSignal[$fd_key], $fd, $real_flag, $func, null))
+                {
+                    return false;
+                }
+                if(!event_base_set($this->_eventSignal[$fd_key], $this->_eventBase))
+                {
+                    return false;
+                }
+                if(!event_add($this->_eventSignal[$fd_key]))
+                {
+                    return false;
+                }
+                return true;
+            case self::EV_TIMER:
+            case self::EV_TIMER_ONCE:
+                $event = event_new();
+                $timer_id = (int)$event;
+                if(!event_set($event, 0, EV_TIMEOUT, array($this, 'timerCallback'), $timer_id))
+                {
+                    return false;
+                }
+                
+                if(!event_base_set($event, $this->_eventBase))
+                {
+                    return false;
+                }
+                
+                $time_interval = $fd_key*1000000;
+                if(!event_add($event, $time_interval))
+                {
+                    return false;
+                }
+                $this->_eventTimer[$timer_id] = array($func, (array)$args, $event, $flag, $time_interval);
+                return $timer_id;
+                
+            default :
+                $real_flag = $flag == self::EV_READ ? EV_READ | EV_PERSIST : EV_WRITE | EV_PERSIST;
+                
+                $event = event_new();
+                
+                if(!event_set($event, $fd, $real_flag, $func, null))
+                {
+                    return false;
+                }
+                
+                if(!event_base_set($event, $this->_eventBase))
+                {
+                    return false;
+                }
+                
+                if(!event_add($event))
+                {
+                    return false;
+                }
+                
+                $this->_allEvents[$fd_key][$flag] = $event;
+                
+                return true;
         }
         
-        if(!event_add($this->_allEvents[$fd_key][$flag]))
-        {
-            return false;
-        }
-        return true;
     }
     
     /**
@@ -90,26 +124,74 @@ class Libevent implements EventInterface
         $fd_key = (int)$fd;
         switch($flag)
         {
-            case EventInterface::EV_READ:
-            case EventInterface::EV_WRITE:
+            case self::EV_READ:
+            case self::EV_WRITE:
                 if(isset($this->_allEvents[$fd_key][$flag]))
                 {
                     event_del($this->_allEvents[$fd_key][$flag]);
+                    unset($this->_allEvents[$fd_key][$flag]);
                 }
-                unset($this->_allEvents[$fd_key][$flag]);
                 if(empty($this->_allEvents[$fd_key]))
                 {
                     unset($this->_allEvents[$fd_key]);
                 }
-            case  EventInterface::EV_SIGNAL:
+                break;
+            case  self::EV_SIGNAL:
                 if(isset($this->_eventSignal[$fd_key]))
                 {
                     event_del($this->_eventSignal[$fd_key]);
+                    unset($this->_eventSignal[$fd_key]);
+                }
+                break;
+            case self::EV_TIMER:
+            case self::EV_TIMER_ONCE:
+                if(isset($this->_eventTimer[$fd_key]))
+                {
+                    event_del($this->_eventTimer[$fd_key][2]);
+                    unset($this->_eventTimer[$fd_key]);
                 }
-                unset($this->_eventSignal[$fd_key]);
+                break;
         }
         return true;
     }
+    
+    /**
+     * 定时器回调
+     * @param null $_null
+     * @param null $_null
+     * @param int $timer_id
+     */
+    protected function timerCallback($_null, $_null, $timer_id)
+    {
+        // 如果是连续的定时任务,再把任务加进去
+        if($this->_eventTimer[$timer_id][3] == self::EV_TIMER)
+        {
+            event_add($this->_eventTimer[$timer_id][2], $this->_eventTimer[$timer_id][4]);
+        }
+        try 
+        {
+            // 执行任务
+            call_user_func_array($this->_eventTimer[$timer_id][0], $this->_eventTimer[$timer_id][1]);
+        }
+        catch(\Exception $e)
+        {
+            echo $e;
+        }
+    }
+    
+    /**
+     * 删除所有定时器
+     * @return void
+     */
+    public function clearAllTimer()
+    {
+        foreach($this->_eventTimer as $task_data)
+        {
+            event_del($task_data[2]);
+        }
+        $this->_eventTimer = array();
+    }
+     
 
     /**
      * 事件循环

+ 116 - 10
Workerman/Events/Select.php

@@ -28,6 +28,32 @@ class Select implements EventInterface
     protected $_writeFds = array();
     
     /**
+     * 任务调度器,最大堆
+     * {['data':timer_id, 'priority':run_timestamp], ..}
+     * @var SplPriorityQueue
+     */
+    protected $_scheduler = null;
+    
+    /**
+     * 定时任务
+     * [[func, args, flag, timer_interval], ..]
+     * @var array
+     */
+    protected $_task = array();
+    
+    /**
+     * 定时器id
+     * @var int
+     */
+    protected $_timerId = 1;
+    
+    /**
+     * select超时时间,单位:微妙
+     * @var int
+     */
+    protected $_selectTimeout = 100000000;
+    
+    /**
      * 构造函数
      * @return void
      */
@@ -40,13 +66,16 @@ class Select implements EventInterface
             stream_set_blocking($this->channel[0], 0);
             $this->_readFds[0] = $this->channel[0];
         }
+        // 初始化优先队列(最大堆)
+        $this->_scheduler = new \SplPriorityQueue();
+        $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
     }
     
     /**
      * 添加事件及处理函数
      * @see Events\EventInterface::add()
      */
-    public function add($fd, $flag, $func)
+    public function add($fd, $flag, $func, $args = null)
     {
         // key
         $fd_key = (int)$fd;
@@ -64,6 +93,14 @@ class Select implements EventInterface
                 $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
                 pcntl_signal($fd, array($this, 'signalHandler'));
                 break;
+            case self::EV_TIMER:
+            case self::EV_TIMER_ONCE:
+                // $fd 为 定时的时间间隔,单位为秒,支持小数,能精确到0.001秒
+                $run_time = microtime(true)+$fd_key;
+                $this->_scheduler->insert($this->_timerId, -$run_time);
+                $this->_task[$this->_timerId] = array($func, $args, $flag, $fd_key);
+                $this->tick();
+                return $this->_timerId++;
         }
         
         return true;
@@ -93,21 +130,89 @@ class Select implements EventInterface
                 {
                     unset($this->_allEvents[$fd_key]);
                 }
-                break;
+                return true;
             case self::EV_WRITE:
                 unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
                 if(empty($this->_allEvents[$fd_key]))
                 {
                     unset($this->_allEvents[$fd_key]);
                 }
-                break;
+                return true;
             case self::EV_SIGNAL:
                 unset($this->_signalEvents[$fd_key]);
                 pcntl_signal($fd, SIG_IGN);
                 break;
+            case self::EV_TIMER:
+            case self::EV_TIMER_ONCE;
+                // $fd_key为要删除的定时器id,即timerId
+                unset($this->_task[$fd_key]);
+                return true;
         }
-        return true;
+        return false;;
+    }
+    
+    /**
+     * 检查是否有可执行的定时任务,有的话执行
+     * @return void
+     */
+    protected function tick()
+    {
+        while(!$this->_scheduler->isEmpty())
+        {
+            $scheduler_data = $this->_scheduler->top();
+            $timer_id = $scheduler_data['data'];
+            $next_run_time = -$scheduler_data['priority'];
+            $time_now = microtime(true);
+            if($time_now >= $next_run_time)
+            {
+                $this->_scheduler->extract();
+                
+                // 如果任务不存在,则是对应的定时器已经删除
+                if(!isset($this->_task[$timer_id]))
+                {
+                    continue;
+                }
+                
+                // 任务数据[func, args, flag, timer_interval]
+                $task_data = $this->_task[$timer_id];
+                // 如果是持续的定时任务,再把任务加到定时队列
+                if($task_data[2] == self::EV_TIMER)
+                {
+                    $next_run_time = $time_now+$task_data[3];
+                    $this->_scheduler->insert($timer_id, -$next_run_time);
+                }
+                // 尝试执行任务
+                try
+                {
+                    call_user_func($task_data[0], $task_data[1]);
+                }
+                catch(\Exception $e)
+                {
+                    echo $e;
+                }
+                continue;
+            }
+            else
+            {
+                // 设定超时时间
+                $this->_selectTimeout = ($next_run_time - $time_now)*1000000;
+                return;
+            }
+        }
+        $this->_selectTimeout = 100000000;
     }
+    
+    /**
+     * 删除所有定时器
+     * @return void
+     */
+    public function clearAllTimer()
+    {
+        $this->_scheduler = new \SplPriorityQueue();
+        $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
+        $this->_task = array();
+    }
+    
     /**
      * 主循环
      * @see Events\EventInterface::loop()
@@ -123,12 +228,7 @@ class Select implements EventInterface
             $read = $this->_readFds;
             $write = $this->_writeFds;
             // 等待可读或者可写事件
-            if(!@stream_select($read, $write, $e, 60))
-            {
-                // 可能是被信号打断,尝试执行信号处理函数
-                pcntl_signal_dispatch();
-                continue;
-            }
+            @stream_select($read, $write, $e, 0, $this->_selectTimeout);
             
             // 这些描述符可读,执行对应描述符的读回调函数
             if($read)
@@ -155,6 +255,12 @@ class Select implements EventInterface
                     }
                 }
             }
+            
+            // 尝试执行定时任务
+            if(!$this->_scheduler->isEmpty())
+            {
+                $this->tick();
+            }
         }
     }
 }

+ 47 - 14
Workerman/Lib/Timer.php

@@ -19,14 +19,21 @@ use \Exception;
 class Timer 
 {
     /**
+     * 基于ALARM信号的任务
      * [
-     *   run_time => [[$func, $args, $persistent, timelong],[$func, $args, $persistent, timelong],..]],
-     *   run_time => [[$func, $args, $persistent, timelong],[$func, $args, $persistent, timelong],..]],
+     *   run_time => [[$func, $args, $persistent, time_interval],[$func, $args, $persistent, time_interval],..]],
+     *   run_time => [[$func, $args, $persistent, time_interval],[$func, $args, $persistent, time_interval],..]],
      *   .. 
      * ]
      * @var array
      */
-    protected static $tasks = array();
+    protected static $_tasks = array();
+    
+    /**
+     * event
+     * @var event
+     */
+    protected static $_event = null;
     
     
     /**
@@ -37,7 +44,7 @@ class Timer
     {
         if($event)
         {
-            $event->add(SIGALRM, EventInterface::EV_SIGNAL, array('\Workerman\Lib\Timer', 'signalHandle'));
+            self::$_event = $event;
         }
         else 
         {
@@ -51,8 +58,11 @@ class Timer
      */
     public static function signalHandle()
     {
-        pcntl_alarm(1);
-        self::tick();
+        if(!self::$_event)
+        {
+            pcntl_alarm(1);
+            self::tick();
+        }
     }
     
     
@@ -67,26 +77,33 @@ class Timer
     {
         if($time_interval <= 0)
         {
+            echo new Exception("bad time_interval");
             return false;
         }
+        
+        if(self::$_event)
+        {
+            return self::$_event->add($time_interval, $persistent ? EventInterface::EV_TIMER : EventInterface::EV_TIMER_ONCE , $func, $args);
+        }
+        
         if(!is_callable($func))
         {
             echo new Exception("not callable");
             return false;
         }
         
-        if(empty(self::$tasks))
+        if(empty(self::$_tasks))
         {
             pcntl_alarm(1);
         }
         
         $time_now = time();
         $run_time = $time_now + $time_interval;
-        if(!isset(self::$tasks[$run_time]))
+        if(!isset(self::$_tasks[$run_time]))
         {
-            self::$tasks[$run_time] = array();
+            self::$_tasks[$run_time] = array();
         }
-        self::$tasks[$run_time][] = array($func, $args, $persistent, $time_interval);
+        self::$_tasks[$run_time][] = array($func, $args, $persistent, $time_interval);
         return true;
     }
     
@@ -97,14 +114,14 @@ class Timer
      */
     public static function tick()
     {
-        if(empty(self::$tasks))
+        if(empty(self::$_tasks))
         {
             pcntl_alarm(0);
             return;
         }
         
         $time_now = time();
-        foreach (self::$tasks as $run_time=>$task_data)
+        foreach (self::$_tasks as $run_time=>$task_data)
         {
             if($time_now >= $run_time)
             {
@@ -127,17 +144,33 @@ class Timer
                         self::add($time_interval, $task_func, $task_args);
                     }
                 }
-                unset(self::$tasks[$run_time]);
+                unset(self::$_tasks[$run_time]);
             }
         }
     }
     
     /**
+     * 删除定时器
+     * @param $timer_id
+     */
+    public static function del($timer_id)
+    {
+        if(self::$_event)
+        {
+            return self::$_event->del($timer_id, EventInterface::EV_TIMER);
+        }
+    }
+    
+    /**
      * 删除所有定时
      */
     public static function delAll()
     {
-        self::$tasks = array();
+        self::$_tasks = array();
         pcntl_alarm(0);
+        if(self::$_event)
+        {
+            self::$_event->clearAllTimer();
+        }
     }
 }

+ 1 - 1
Workerman/Worker.php

@@ -21,7 +21,7 @@ class Worker
      * 版本号
      * @var string
      */
-    const VERSION = '3.0.3';
+    const VERSION = '3.0.4';
     
     /**
      * 状态 启动中