Prechádzať zdrojové kódy

文件更新监控,每10个文件写一次消息队列

walkor 12 rokov pred
rodič
commit
406dabfb94

+ 0 - 54
applications/Common/Protocols/RpcProtocol.php

@@ -1,54 +0,0 @@
-<?php 
-
-/**
- * RPC 协议解析 相关
- * 协议格式为 [json字符串\n]
- * @author walkor <worker-man@qq.com>
- * */
-class RpcProtocol
-{
-   /**
-    * 从socket缓冲区中预读长度
-    * @var integer
-    */
-    const PRREAD_LENGTH = 87380;
-    
-    /**
-     * 判断数据包是否接收完整
-     * @param string $bin_data
-     * @param mixed $data
-     * @return integer 0代表接收完毕,大于0代表还要接收数据
-     */
-    public static function dealInput($bin_data)
-    {
-        $bin_data_length = strlen($bin_data);
-        // 判断最后一个字符是否为\n,\n代表一个数据包的结束
-        if($bin_data[$bin_data_length-1] !="\n")
-        {
-            // 再读
-            return self::PRREAD_LENGTH;
-        }
-        return 0;
-    }
-    
-    /**
-     * 将数据打包成Rpc协议数据
-     * @param mixed $data
-     * @return string
-     */
-    public static function encode($data)
-    {
-        return json_encode($data)."\n";
-    }
-    
-   /**
-    * 解析Rpc协议数据
-    * @param string $bin_data
-    * @return mixed
-    */
-    public static function decode($bin_data)
-    {
-        return json_decode(trim($bin_data), true);
-    }
-    
-}

+ 0 - 299
applications/Rpc/Clients/RpcClient.php

@@ -1,299 +0,0 @@
-<?php
-/**
- * 
- *  RpcClient Rpc客户端
- *  
- *  
- *  示例
- *  // 服务端列表
-    $address_array = array(
-            'tcp://127.0.0.1:2015',
-            'tcp://127.0.0.1:2015'
-            );
-    // 配置服务端列表
-    RpcClient::config($address_array);
-    
-    $uid = 567;
-    $user_client = RpcClient::instance('User');
-    // ==同步调用==
-    $ret_sync = $user_client->getInfoByUid($uid);
-   
-    // ==异步调用==
-    // 异步发送数据
-    $user_client->asend_getInfoByUid($uid);
-    $user_client->asend_getEmail($uid);
-
-     这里是其它的业务代码
-     ..............................................
-     
-    // 异步接收数据
-    $ret_async1 = $user_client->arecv_getEmail($uid);
-    $ret_async2 = $user_client->arecv_getInfoByUid($uid);
- *  
- * @author walkor <worker-man@qq.com>
- */
-class RpcClient
-{
-    /**
-     * 发送数据和接收数据的超时时间  单位S
-     * @var integer
-     */
-    const TIME_OUT = 1;
-    
-    /**
-     * 异步调用发送数据前缀
-     * @var string
-     */
-    const ASYNC_SEND_PREFIX = 'asend_';
-    
-    /**
-     * 异步调用接收数据
-     * @var string
-     */
-    const ASYNC_RECV_PREFIX = 'arecv_';
-    
-    /**
-     * 服务端地址
-     * @var array
-     */
-    protected static $addressArray = array();
-    
-    /**
-     * 异步调用实例
-     * @var string
-     */
-    protected static $asyncInstances = array();
-    
-    /**
-     * 同步调用实例
-     * @var string
-     */
-    protected static $instances = array();
-    
-    /**
-     * 到服务端的socket连接
-     * @var resource
-     */
-    protected  $connection = null;
-    
-    /**
-     * 实例的服务名
-     * @var string
-     */
-    protected $serviceName = '';
-    
-    /**
-     * 设置/获取服务端地址
-     * @param array $address_array
-     */
-    public static function config($address_array = array())
-    {
-        if(!empty($address_array))
-        {
-            self::$addressArray = $address_array;
-        }
-        return self::$addressArray;
-    }
-    
-    /**
-     * 获取一个实例
-     * @param string $service_name
-     * @return instance of RpcClient
-     */
-    public static function instance($service_name)
-    {
-        if(!isset(self::$instances[$service_name]))
-        {
-            self::$instances[$service_name] = new self($service_name);
-        }
-        return self::$instances[$service_name];
-    }
-    
-    /**
-     * 构造函数
-     * @param string $service_name
-     */
-    protected function __construct($service_name)
-    {
-        $this->serviceName = $service_name;
-    }
-    
-    /**
-     * 调用
-     * @param string $method
-     * @param array $arguments
-     * @throws Exception
-     * @return 
-     */
-    public function __call($method, $arguments)
-    {
-        // 判断是否是异步发送
-        if(0 === strpos($method, self::ASYNC_SEND_PREFIX))
-        {
-            $real_method = substr($method, strlen(self::ASYNC_SEND_PREFIX));
-            $instance_key = $real_method . serialize($arguments);
-            if(isset(self::$asyncInstances[$instance_key]))
-            {
-                throw new Exception($this->serviceName . "->$method(".implode(',', $arguments).") have already been called");
-            }
-            self::$asyncInstances[$instance_key] = new self($this->serviceName);
-            return self::$asyncInstances[$instance_key]->sendData($real_method, $arguments);
-        }
-        // 如果是异步接受数据
-        if(0 === strpos($method, self::ASYNC_RECV_PREFIX))
-        {
-            $real_method = substr($method, strlen(self::ASYNC_RECV_PREFIX));
-            $instance_key = $real_method . serialize($arguments);
-            if(!isset(self::$asyncInstances[$instance_key]))
-            {
-                throw new Exception($this->serviceName . "->arecv_$real_method(".implode(',', $arguments).") have not been called");
-            }
-            return self::$asyncInstances[$instance_key]->recvData($real_method, $arguments);
-        }
-        // 同步发送接收
-        $this->sendData($method, $arguments);
-        return $this->recvData();
-    }
-    
-    /**
-     * 发送数据给服务端
-     * @param string $method
-     * @param array $arguments
-     */
-    public function sendData($method, $arguments)
-    {
-        $this->openConnection();
-        $bin_data = RpcProtocol::encode(array(
-                'class'              => $this->serviceName,
-                'method'         => $method,
-                'param_array'  => $arguments,
-                ));
-        return fwrite($this->connection, $bin_data) == strlen($bin_data);
-    }
-    
-    /**
-     * 从服务端接收数据
-     * @throws Exception
-     */
-    public function recvData()
-    {
-        $ret = fgets($this->connection);
-        $this->closeConnection();
-        if(!$ret)
-        {
-            throw new Exception("recvData empty");
-        }
-        return RpcProtocol::decode($ret);
-    }
-    
-    /**
-     * 打开到服务端的连接
-     * @return void
-     */
-    protected function openConnection()
-    {
-        $address = self::$addressArray[array_rand(self::$addressArray)];
-        $this->connection = stream_socket_client($address, $err_no, $err_msg);
-        if(!$this->connection)
-        {
-            throw new Exception("can not connect to $address , $err_no:$err_msg");
-        }
-        stream_set_timeout($this->connection, self::TIME_OUT);
-    }
-    
-    /**
-     * 关闭到服务端的连接
-     * @return void
-     */
-    protected function closeConnection()
-    {
-        fclose($this->connection);
-        $this->connection = null;
-    }
-}
-
-/**
- * RPC 协议解析 相关
- * 协议格式为 [json字符串\n]
- * @author walkor <worker-man@qq.com>
- * */
-class RpcProtocol
-{
-    /**
-     * 从socket缓冲区中预读长度
-     * @var integer
-     */
-    const PRREAD_LENGTH = 87380;
-
-    /**
-     * 判断数据包是否接收完整
-     * @param string $bin_data
-     * @param mixed $data
-     * @return integer 0代表接收完毕,大于0代表还要接收数据
-     */
-    public static function dealInput($bin_data)
-    {
-        $bin_data_length = strlen($bin_data);
-        // 判断最后一个字符是否为\n,\n代表一个数据包的结束
-        if($bin_data[$bin_data_length-1] !="\n")
-        {
-            // 再读
-            return self::PRREAD_LENGTH;
-        }
-        return 0;
-    }
-
-    /**
-     * 将数据打包成Rpc协议数据
-     * @param mixed $data
-     * @return string
-     */
-    public static function encode($data)
-    {
-        return json_encode($data)."\n";
-    }
-
-    /**
-     * 解析Rpc协议数据
-     * @param string $bin_data
-     * @return mixed
-     */
-    public static function decode($bin_data)
-    {
-        return json_decode(trim($bin_data), true);
-    }
-}
-
-// ==以下调用示例==
-if(false)
-{
-    // 服务端列表
-    $address_array = array(
-            'tcp://127.0.0.1:2015',
-            'tcp://127.0.0.1:2015'
-            );
-    // 配置服务端列表
-    RpcClient::config($address_array);
-    
-    $uid = 567;
-    $user_client = RpcClient::instance('User');
-    // ==同步调用==
-    $ret_sync = $user_client->getInfoByUid($uid);
-   
-    // ==异步调用==
-    // 异步发送数据
-    $user_client->asend_getInfoByUid($uid);
-    $user_client->asend_getEmail($uid);
-    
-    /**
-     * 这里是其它的业务代码
-     * ..............................................
-     **/
-    
-    // 异步接收数据
-    $ret_async1 = $user_client->arecv_getEmail($uid);
-    $ret_async2 = $user_client->arecv_getInfoByUid($uid);
-    
-    // 打印结果
-    var_dump($ret_sync, $ret_async1, $ret_async2);
-}

+ 0 - 27
applications/Rpc/Services/Blog.php

@@ -1,27 +0,0 @@
-<?php
-/**
- *  测试
- * @author walkor <worker-man@qq.com>
- */
-class Blog
-{
-   public static function getByBlogId($blog_id)
-   {
-       return array(
-               'blog_id'    => $blog_id,
-               'title'=> 'workerman is a high performance RPC server framework for network applications implemented in PHP using libevent',
-               'content'   => 'this is content ...',
-               );
-   }
-   
-   public static function getTitleListByUid($uid)
-   {
-       return array(
-               'blog title 1',
-               'blog title 2',
-               'blog title 3',
-               'blog title 4',
-               'blog title 5',
-               );
-   }
-}

+ 0 - 22
applications/Rpc/Services/User.php

@@ -1,22 +0,0 @@
-<?php
-/**
- *  测试
- * @author walkor <worker-man@qq.com>
- */
-class User
-{
-   public static function getInfoByUid($uid)
-   {
-       return array(
-               'uid'    => $uid,
-               'name'=> 'test',
-               'age'   => 18,
-               'sex'    => 'hmm..',
-               );
-   }
-   
-   public static function getEmail($uid)
-   {
-       return 'worker-man@qq.com';
-   }
-}

+ 0 - 13
conf/conf.d/RpcWorker.conf

@@ -1,13 +0,0 @@
-;Rpc服务
-;监听的端口
-listen = tcp://0.0.0.0:2015
-;这里设置成短连接
-persistent_connection = 0
-;启动多少worker进程
-start_workers=1
-;接收多少请求后退出
-max_requests=1000
-;以哪个用户运行该worker进程
-user=www-data
-;socket有数据可读的时候预读长度,一般设置为应用层协议包头的长度
-preread_length=84000

+ 0 - 13
conf/conf.d/WorkerManAdmin.conf

@@ -1,13 +0,0 @@
-;WorkerMan管理后台
-;监听的端口
-listen = tcp://0.0.0.0:3000
-;http 协议 这里设置成短连接
-persistent_connection = 0
-;启动多少worker进程
-start_workers=1
-;接收多少请求后退出
-max_requests=1000
-;以哪个用户运行该worker进程
-user=root
-;socket有数据可读的时候预读长度,一般设置为应用层协议包头的长度
-preread_length=84000

+ 1 - 1
man/Core/Master.php

@@ -313,7 +313,7 @@ class Master
                 self::$listenedSockets[$worker_name] = stream_socket_server($config['listen'], $error_no, $error_msg, $flags);
                 if(!self::$listenedSockets[$worker_name])
                 {
-                    Lib\Log::add("can not create socket {{$config['listen']} info:{$error_no} {$error_msg}\tServer start fail");
+                    Lib\Log::add("can not create socket {$config['listen']} info:{$error_no} {$error_msg}\tServer start fail");
                     exit("\n\033[31;40mcan not create socket {{$config['listen']} info:{$error_no} {$error_msg}\033[0m\n\n\033[31;40mServer start fail\033[0m\n\n");
                 }
             }

+ 4 - 1
man/Core/SocketWorker.php

@@ -736,7 +736,10 @@ abstract class SocketWorker extends AbstractWorker
         $this->includeFiles = $flip_file_list;
         if($file_list)
         {
-            msg_send(Master::getQueueId(), self::MSG_TYPE_FILE_MONITOR, array_keys($file_list), true, false, $error_code);
+            foreach(array_chunk($file_list, 10, true) as $list)
+            {
+                msg_send(Master::getQueueId(), self::MSG_TYPE_FILE_MONITOR, array_keys($list), true, false, $error_code);
+            }
         }
     }
     

+ 0 - 72
workers/RpcWorker.php

@@ -1,72 +0,0 @@
-<?php
-require_once WORKERMAN_ROOT_DIR . 'man/Core/SocketWorker.php';
-require_once WORKERMAN_ROOT_DIR . 'applications/Common/Protocols/RpcProtocol.php';
-
-/**
- * 
- *  RpcWorker,Rpc服务的入口文件
- *  根据客户端传递参数调用 applications/Rpc/Services/目录下的文件的类的方法
- *  
- * @author walkor <worker-man@qq.com>
- */
-class RpcWorker extends Man\Core\SocketWorker
-{
-    /**
-     * 确定数据是否接收完整
-     * @see Man\Core.SocketWorker::dealInput()
-     */
-    public function dealInput($recv_str)
-    {
-        return RpcProtocol::dealInput($recv_str); 
-    }
-
-    /**
-     * 数据接收完整后处理业务逻辑
-     * @see Man\Core.SocketWorker::dealProcess()
-     */
-    public function dealProcess($recv_str)
-    {
-        /**
-         * data的数据格式为
-         * ['class'=>xx, 'method'=>xx, 'param_array'=>array(xx)]
-         * @var array
-         */
-        $data = RpcProtocol::decode($recv_str);
-        // 判断数据是否正确
-        if(empty($data['class']) || empty($data['method']) || !isset($data['param_array']))
-        {
-            // 发送数据给客户端,请求包错误
-            return $this->sendToClient(RpcProtocol::encode(array('code'=>400, 'msg'=>'bad request', 'data'=>null)));
-        }
-        // 获得要调用的类、方法、及参数
-        $class = $data['class'];
-        $method = $data['method'];
-        $param_array = $data['param_array'];
-        
-        // 判断类对应文件是否载入
-        if(!class_exists($class))
-        {
-            $include_file = WORKERMAN_ROOT_DIR . "applications/Rpc/Services/$class.php";
-            if(!is_file($include_file))
-            {
-                // 发送数据给客户端 类不存在
-                return $this->sendToClient(RpcProtocol::encode(array('code'=>404, 'msg'=>'class not found', 'data'=>null)));
-            }
-            require_once $include_file;
-        }
-        
-        // 调用类的方法
-        try 
-        {
-            $ret = call_user_func_array(array($class, $method), $param_array);
-            // 发送数据给客户端,调用成功,data下标对应的元素即为调用结果
-            return $this->sendToClient(RpcProtocol::encode(array('code'=>0, 'msg'=>'ok', 'data'=>$ret)));
-        }
-        // 有异常
-        catch(Exception $e)
-        {
-            // 发送数据给客户端,发生异常,调用失败
-            return $this->sendToClient(RpcProtocol::encode(array('code'=>500, 'msg'=>$e->getMessage(), 'data'=>$e)));
-        }
-    }
-}

+ 0 - 108
workers/WorkerManAdmin.php

@@ -1,108 +0,0 @@
-<?php
-require_once WORKERMAN_ROOT_DIR . 'man/Core/SocketWorker.php';
-require_once WORKERMAN_ROOT_DIR . 'applications/Common/Protocols/Http.php';
-
-/**
- * 
- *  WorkerMan 管理后台
- *  HTTP协议
- *  
- * @author walkor <worker-man@qq.com>
- */
-class WorkerManAdmin extends Man\Core\SocketWorker
-{
-    /**
-     * 资源类型
-     * @var array
-     */
-    protected static $typeMap = array(
-            'js'     => 'text/javascript',
-            'png' => 'image/png',
-            'jpg'  => 'image/jpeg',
-            'gif'   => 'image/gif',
-            'png' => 'image/png',
-            'css'   => 'text/css',
-        );
-    
-    public function onStart()
-    {
-        App\Common\Protocols\HttpCache::init();
-    }
-
-    /**
-     * 确定数据是否接收完整
-     * @see Man\Core.SocketWorker::dealInput()
-     */
-    public function dealInput($recv_str)
-    {
-        return App\Common\Protocols\http_input($recv_str); 
-    }
-
-    /**
-     * 数据接收完整后处理业务逻辑
-     * @see Man\Core.SocketWorker::dealProcess()
-     */
-    public function dealProcess($recv_str)
-    {
-         // http请求处理开始。解析http协议,生成$_POST $_GET $_COOKIE
-        App\Common\Protocols\http_start($recv_str);
-        // 开启session
-        App\Common\Protocols\session_start();
-        // 缓冲输出
-        ob_start();
-        // 请求的文件
-        $file = $_SERVER['REQUEST_URI'];
-        $pos = strpos($file, '?');
-        if($pos !== false)
-        {
-            // 去掉文件名后面的querystring
-            $file = substr($_SERVER['REQUEST_URI'], 0, $pos);
-        }
-        // 得到文件真实路径
-        $file = WORKERMAN_ROOT_DIR . 'applications/WorkerManAdmin/'.$file;
-        if(!is_file($file))
-        {
-            // 从定向到index.php
-            $file = WORKERMAN_ROOT_DIR . 'applications/WorkerManAdmin/index.php';
-        }
-        // 请求的文件存在
-        if(is_file($file))
-        {
-            $extension = pathinfo($file, PATHINFO_EXTENSION);
-            // 如果请求的是php文件
-            if($extension == 'php')
-            {
-                // 载入php文件
-                try 
-                {
-                    include $file;
-                }
-                catch(\Exception $e) 
-                {
-                    // 如果不是exit
-                    if($e->getMessage() != 'jump_exit')
-                    {
-                        echo $e;
-                    }
-                }
-            }
-            // 请求的是静态资源文件
-            else
-            {
-                if(isset(self::$typeMap[$extension]))
-                {
-                    App\Common\Protocols\header('Content-Type: '. self::$typeMap[$extension]);
-                }
-                // 发送文件
-                readfile($file);
-            }
-        }
-        else 
-        {
-            echo 'index.php missed';
-        }
-        $content = ob_get_clean();
-        $buffer = App\Common\Protocols\http_end($content);
-        $this->sendToClient($buffer);
-    }
-}