|
|
@@ -1,10 +1,9 @@
|
|
|
<?php
|
|
|
namespace Man\Core;
|
|
|
-use Man\Core\Events\BaseEvent;
|
|
|
|
|
|
-require_once WORKERMAN_ROOT_DIR . 'Core/Events/Select.php';
|
|
|
-require_once WORKERMAN_ROOT_DIR . 'Core/AbstractWorker.php';
|
|
|
-require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Config.php';
|
|
|
+use \Man\Core\Events\BaseEvent;
|
|
|
+use \Man\Core\Lib\Task;
|
|
|
+use \Man\Core\Lib\Config;
|
|
|
|
|
|
/**
|
|
|
* SocketWorker 监听某个端口,对外提供网络服务的worker
|
|
|
@@ -120,10 +119,16 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
protected $prereadLength = 4;
|
|
|
|
|
|
/**
|
|
|
- * 该进程使用的php文件
|
|
|
+ * 哪些文件不监控
|
|
|
* @var array
|
|
|
*/
|
|
|
- protected $includeFiles = array();
|
|
|
+ protected $monitorExcludePaths = array();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 哪些文件在监控中
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
+ protected $filesToMonitor = array();
|
|
|
|
|
|
/**
|
|
|
* 统计信息
|
|
|
@@ -169,13 +174,13 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
$this->workerName = $worker_name ? $worker_name : get_class($this);
|
|
|
|
|
|
// 是否开启长连接
|
|
|
- $this->isPersistentConnection = (bool)Lib\Config::get( $this->workerName . '.persistent_connection');
|
|
|
+ $this->isPersistentConnection = (bool)Config::get( $this->workerName . '.persistent_connection');
|
|
|
// 最大请求数,超过这个数则安全重启,如果没有配置则使用PHP_INT_MAX
|
|
|
- $this->maxRequests = (int)Lib\Config::get( $this->workerName . '.max_requests');
|
|
|
+ $this->maxRequests = (int)Config::get( $this->workerName . '.max_requests');
|
|
|
$this->maxRequests = $this->maxRequests <= 0 ? PHP_INT_MAX : $this->maxRequests;
|
|
|
|
|
|
// 预读数据长度,长连接需要设置此项
|
|
|
- $preread_length = (int)Lib\Config::get( $this->workerName . '.preread_length');
|
|
|
+ $preread_length = (int)Config::get( $this->workerName . '.preread_length');
|
|
|
if($preread_length > 0)
|
|
|
{
|
|
|
$this->prereadLength = $preread_length;
|
|
|
@@ -186,13 +191,13 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
}
|
|
|
|
|
|
// 接收缓冲区大小限制
|
|
|
- if(($max_recv_buffer_size = Lib\Config::get($this->workerName . '.max_recv_buffer_size')) && $max_recv_buffer_size > 0)
|
|
|
+ if(($max_recv_buffer_size = Config::get($this->workerName . '.max_recv_buffer_size')) && $max_recv_buffer_size > 0)
|
|
|
{
|
|
|
$this->maxRecvBufferSize = $max_recv_buffer_size;
|
|
|
}
|
|
|
|
|
|
// 发送缓冲区大小限制
|
|
|
- if(($max_send_buffer_size = Lib\Config::get($this->workerName . '.max_send_buffer_size')) && $max_send_buffer_size > 0)
|
|
|
+ if(($max_send_buffer_size = Config::get($this->workerName . '.max_send_buffer_size')) && $max_send_buffer_size > 0)
|
|
|
{
|
|
|
$this->maxSendBufferSize = $max_send_buffer_size;
|
|
|
}
|
|
|
@@ -213,7 +218,16 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
$this->event = new $this->eventLoopName();
|
|
|
|
|
|
// 初始化任务系统
|
|
|
- \Man\Core\Lib\Task::init($this->event);
|
|
|
+ Task::init($this->event);
|
|
|
+
|
|
|
+ // 哪些目录或者文件不被监控
|
|
|
+ if(Config::get('workerman.debug') && !Config::get($this->workerName . '.no_debug') && !Config::get($this->workerName . '.no_reload'))
|
|
|
+ {
|
|
|
+ // 保存排除监控的目录或者文件
|
|
|
+ $this->monitorExcludePaths = $this->getExcludePaths();
|
|
|
+ // 添加任务
|
|
|
+ Task::add(1, array($this, 'checkFilesModify'));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -236,12 +250,12 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
if($this->protocol == 'udp')
|
|
|
{
|
|
|
// 添加读udp事件
|
|
|
- $this->event->add($this->mainSocket, Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
|
|
|
+ $this->event->add($this->mainSocket, BaseEvent::EV_READ, array($this, 'recvUdp'));
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
// 添加accept事件
|
|
|
- $ret = $this->event->add($this->mainSocket, Events\BaseEvent::EV_READ, array($this, 'accept'));
|
|
|
+ $ret = $this->event->add($this->mainSocket, BaseEvent::EV_READ, array($this, 'accept'));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -255,10 +269,17 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
* 停止服务
|
|
|
* @return void
|
|
|
*/
|
|
|
- public function stop()
|
|
|
+ public function stop($force = false)
|
|
|
{
|
|
|
// 触发该worker进程onStop事件
|
|
|
$this->onStop();
|
|
|
+
|
|
|
+ // 强制退出
|
|
|
+ if($force)
|
|
|
+ {
|
|
|
+ $this->workerStatus = self::STATUS_SHUTDOWN;
|
|
|
+ exit(0);
|
|
|
+ }
|
|
|
|
|
|
// 标记这个worker开始停止服务
|
|
|
if($this->workerStatus != self::STATUS_SHUTDOWN)
|
|
|
@@ -266,7 +287,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
// 停止接收连接
|
|
|
if($this->mainSocket)
|
|
|
{
|
|
|
- $this->event->del($this->mainSocket, Events\BaseEvent::EV_READ);
|
|
|
+ $this->event->del($this->mainSocket, BaseEvent::EV_READ);
|
|
|
fclose($this->mainSocket);
|
|
|
}
|
|
|
$this->workerStatus = self::STATUS_SHUTDOWN;
|
|
|
@@ -277,6 +298,11 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
{
|
|
|
exit(0);
|
|
|
}
|
|
|
+
|
|
|
+ // EXIT_WAIT_TIME秒后退出进程
|
|
|
+ Task::add(self::EXIT_WAIT_TIME, function(){
|
|
|
+ exit(0);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -292,7 +318,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
stream_set_blocking($this->mainSocket, 0);
|
|
|
// 获取协议
|
|
|
$mata_data = stream_get_meta_data($socket);
|
|
|
- $this->protocol = substr($mata_data['stream_type'], 0, 3);
|
|
|
+ list($this->protocol) = explode('_' ,$mata_data['stream_type']);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -303,7 +329,6 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
public function setEventLoopName($event_loop_name)
|
|
|
{
|
|
|
$this->eventLoopName = "\\Man\\Core\\Events\\".$event_loop_name;
|
|
|
- require_once WORKERMAN_ROOT_DIR . 'Core/Events/'.ucfirst(str_replace('WORKERMAN', '', $event_loop_name)).'.php';
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -331,7 +356,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
|
|
|
// 非阻塞
|
|
|
stream_set_blocking($this->connections[$fd], 0);
|
|
|
- $this->event->add($this->connections[$fd], Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
|
|
|
+ $this->event->add($this->connections[$fd], BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
|
|
|
return $new_connection;
|
|
|
}
|
|
|
|
|
|
@@ -465,8 +490,6 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
{
|
|
|
// 停止服务
|
|
|
$this->stop();
|
|
|
- // EXIT_WAIT_TIME秒后退出进程
|
|
|
- pcntl_alarm(self::EXIT_WAIT_TIME);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -484,8 +507,8 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
// udp忽略
|
|
|
if($this->protocol != 'udp' && isset($this->connections[$fd]))
|
|
|
{
|
|
|
- $this->event->del($this->connections[$fd], Events\BaseEvent::EV_READ);
|
|
|
- $this->event->del($this->connections[$fd], Events\BaseEvent::EV_WRITE);
|
|
|
+ $this->event->del($this->connections[$fd], BaseEvent::EV_READ);
|
|
|
+ $this->event->del($this->connections[$fd], BaseEvent::EV_WRITE);
|
|
|
fclose($this->connections[$fd]);
|
|
|
}
|
|
|
unset($this->connections[$fd], $this->recvBuffers[$fd], $this->sendBuffers[$fd]);
|
|
|
@@ -497,18 +520,16 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
*/
|
|
|
protected function installSignal()
|
|
|
{
|
|
|
- // 闹钟信号
|
|
|
- $this->event->add(SIGALRM, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 终止进程信号
|
|
|
- $this->event->add(SIGINT, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
+ $this->event->add(SIGINT, BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 平滑重启信号
|
|
|
- $this->event->add(SIGHUP, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
+ $this->event->add(SIGHUP, BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 报告进程状态
|
|
|
- $this->event->add(SIGUSR1, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
+ $this->event->add(SIGUSR1, BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 报告该进程使用的文件
|
|
|
- $this->event->add(SIGUSR2, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
+ $this->event->add(SIGUSR2, BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 关闭标准输入输出
|
|
|
- $this->event->add(SIGTTOU, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
+ $this->event->add(SIGTTOU, BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
|
|
|
// 设置忽略信号
|
|
|
pcntl_signal(SIGTTIN, SIG_IGN);
|
|
|
@@ -526,39 +547,26 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
{
|
|
|
switch($signal)
|
|
|
{
|
|
|
- // 时钟处理函数
|
|
|
- case SIGALRM:
|
|
|
- // 停止服务后EXIT_WAIT_TIME秒还没退出则强制退出
|
|
|
- if($this->workerStatus == self::STATUS_SHUTDOWN)
|
|
|
- {
|
|
|
- exit(0);
|
|
|
- }
|
|
|
- break;
|
|
|
// 停止该进程
|
|
|
case SIGINT:
|
|
|
$this->stop();
|
|
|
- // EXIT_WAIT_TIME秒后退出进程
|
|
|
- pcntl_alarm(self::EXIT_WAIT_TIME);
|
|
|
break;
|
|
|
// 平滑重启
|
|
|
case SIGHUP:
|
|
|
$this->onReload();
|
|
|
// 如果配置了no_reload则不重启该进程
|
|
|
- if(\Man\Core\Lib\Config::get($this->workerName.'.no_reload'))
|
|
|
+ if(Config::get($this->workerName.'.no_reload'))
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
|
$this->stop();
|
|
|
- // EXIT_WAIT_TIME秒后退出进程
|
|
|
- pcntl_alarm(self::EXIT_WAIT_TIME);
|
|
|
break;
|
|
|
// 报告进程状态
|
|
|
case SIGUSR1:
|
|
|
$this->writeStatusToQueue();
|
|
|
break;
|
|
|
- // 报告进程使用的php文件
|
|
|
+ // 预留空
|
|
|
case SIGUSR2:
|
|
|
- $this->writeFilesListToQueue();
|
|
|
break;
|
|
|
// FileMonitor检测到终端已经关闭,向此进程发送SIGTTOU信号,关闭此进程的标准输入输出
|
|
|
case SIGTTOU:
|
|
|
@@ -617,7 +625,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
{
|
|
|
return false;
|
|
|
}
|
|
|
- $this->event->add($this->connections[$this->currentDealFd], Events\BaseEvent::EV_WRITE, array($this, 'tcpWriteToClient'));
|
|
|
+ $this->event->add($this->connections[$this->currentDealFd], BaseEvent::EV_WRITE, array($this, 'tcpWriteToClient'));
|
|
|
return null;
|
|
|
}
|
|
|
// udp 直接发送,要求数据包不能超过65515
|
|
|
@@ -663,6 +671,11 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
*/
|
|
|
public function getRemoteAddress($fd = null)
|
|
|
{
|
|
|
+ if($this->protocol === 'unix')
|
|
|
+ {
|
|
|
+ return '';
|
|
|
+ }
|
|
|
+
|
|
|
if(empty($fd) && $this->protocol !== 'udp')
|
|
|
{
|
|
|
if(!isset($this->connections[$this->currentDealFd]))
|
|
|
@@ -689,6 +702,10 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
*/
|
|
|
public function getRemoteIp($fd = null)
|
|
|
{
|
|
|
+ if($this->protocol === 'unix')
|
|
|
+ {
|
|
|
+ return '';
|
|
|
+ }
|
|
|
$ip = '';
|
|
|
$address= $this->getRemoteAddress($fd);
|
|
|
if($address)
|
|
|
@@ -705,6 +722,10 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
*/
|
|
|
public function getRemotePort($fd = null)
|
|
|
{
|
|
|
+ if($this->protocol === 'unix')
|
|
|
+ {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
$port = 0;
|
|
|
$address= $this->getRemoteAddress($fd);
|
|
|
if($address)
|
|
|
@@ -759,29 +780,6 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 开发环境将当前进程使用的文件写入消息队列,用于FileMonitor监控文件更新
|
|
|
- * @return void
|
|
|
- */
|
|
|
- protected function writeFilesListToQueue()
|
|
|
- {
|
|
|
- if(!Master::getQueueId())
|
|
|
- {
|
|
|
- return;
|
|
|
- }
|
|
|
- $error_code = 0;
|
|
|
- $flip_file_list = array_flip(get_included_files());
|
|
|
- $file_list = array_diff_key($flip_file_list, $this->includeFiles);
|
|
|
- $this->includeFiles = $flip_file_list;
|
|
|
- if($file_list)
|
|
|
- {
|
|
|
- foreach(array_chunk($file_list, 10, true) as $list)
|
|
|
- {
|
|
|
- @msg_send(Master::getQueueId(), self::MSG_TYPE_FILE_MONITOR, array_keys($list), true, false, $error_code);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
* 是否所有任务都已经完成
|
|
|
* @return bool
|
|
|
*/
|
|
|
@@ -816,6 +814,78 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
return empty($this->connections);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 检查文件更新时间,如果有更改则平滑重启服务(开发的时候用到)
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
+ public function checkFilesModify()
|
|
|
+ {
|
|
|
+ // 当前进程使用的所有php文件
|
|
|
+ $include_files = get_included_files();
|
|
|
+ // 清除文件缓存
|
|
|
+ clearstatcache();
|
|
|
+ foreach($include_files as $file)
|
|
|
+ {
|
|
|
+ // 是新文件,尝试加入监控列表
|
|
|
+ if(!isset($this->filesToMonitor[$file]))
|
|
|
+ {
|
|
|
+ $is_exclude_file = false;
|
|
|
+ foreach($this->monitorExcludePaths as $path)
|
|
|
+ {
|
|
|
+ // 是被排除的文件
|
|
|
+ if(0 === strpos($file, $path))
|
|
|
+ {
|
|
|
+ $is_exclude_file = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 不是排除文件则加入监控列表
|
|
|
+ if(!$is_exclude_file)
|
|
|
+ {
|
|
|
+ if($mtime = @filemtime($file))
|
|
|
+ {
|
|
|
+ $this->filesToMonitor[$file] = $mtime;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 是已经加入监控列表的文件,查看文件更改时间
|
|
|
+ $mtime_now = @filemtime($file);
|
|
|
+ if(false === $mtime_now)
|
|
|
+ {
|
|
|
+ unset($this->filesToMonitor[$file]);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ // 文件有修改,退出进程以便重新加载文件
|
|
|
+ if($this->filesToMonitor[$file] != $mtime_now)
|
|
|
+ {
|
|
|
+ $this->notice("$file updated and reload workers");
|
|
|
+ $this->stop(true);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 那些路径不监控更新
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ protected function getExcludePaths()
|
|
|
+ {
|
|
|
+ $config_exclude_path = Config::get('Monitor.exclude_path');
|
|
|
+ // 被排除的路径
|
|
|
+ $exclude_path = array();
|
|
|
+ // 因为配置可能会被更改,所以每次都会重新从配置中查找排除路径
|
|
|
+ foreach($config_exclude_path as $path)
|
|
|
+ {
|
|
|
+ if($real_path = realpath($path))
|
|
|
+ {
|
|
|
+ $exclude_path[] = $real_path;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return (array)$exclude_path;
|
|
|
+ }
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* 该worker进程开始服务的时候会触发一次,可以在这里做一些全局的事情
|