PHP模拟supervisor的进程管理

前言

模拟supervisor进程管理DEMO(简易实现)

截图:
模拟supervisor的进程管理

实现

1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程

不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。

知识点

代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。

代码

主进程代码:Process.php

<?php
require_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{
/** 
* 待启动的消费者数组
*/
protected $consumers = array();
protected $childPids = array();
const PPID_FILE = __DIR__ . '/process';
protected $serializerConsumer;
public function __construct()
{
$this->consumers = $this->getConsumers();
}
// 这里是个DEMO,实际可以用读取配置文件的方式。
public function getConsumers()
{
$consumer = new Consumer([
'program' => 'test',
'command' => '/usr/bin/php test.php',
'directory' => __DIR__,
'logfile' => __DIR__ . '/test.log',
'uniqid' => uniqid(),
'auto_restart' => false,
]);
return [
$consumer->uniqid => $consumer,
];
}
public function run()
{
if (empty($this->consumers)) {
// consumer empty
return;
}
if ($this->_notifyMaster()) {
// master alive
return;
}
$pid = pcntl_fork();
if ($pid < 0) {
exit;
} elseif ($pid > 0) {
exit;
}
if (!posix_setsid()) {
exit;
}
$stream = new StreamConnection('tcp://0.0.0.0:7865');
@cli_set_process_title('AMQP Master Process');
// 将主进程ID写入文件
file_put_contents(self::PPID_FILE, getmypid());
// master进程继续
while (true) {
$this->init();
pcntl_signal_dispatch();
$this->waitpid();
// 如果子进程被全部回收,则主进程退出
// if (empty($this->childPids)) {
//     $stream->close($stream->getSocket());
//     break;
// }
$stream->accept(function ($uniqid, $action) {
$this->handle($uniqid, $action);
return $this->display();
});
}
}
protected function init()
{
foreach ($this->consumers as &$c) {
switch ($c->state) {
case Consumer::RUNNING:
case Consumer::STOP:
break;
case Consumer::NOMINAL:
case Consumer::STARTING:
$this->fork($c);
break;
case Consumer::STOPING:
if ($c->pid && posix_kill($c->pid, SIGTERM)) {
$this->reset($c, Consumer::STOP);
}
break;
case Consumer::RESTART:
if (empty($c->pid)) {
$this->fork($c);
break;
}
if (posix_kill($c->pid, SIGTERM)) {
$this->reset($c, Consumer::STOP);
$this->fork($c);
}
break;
default:
break;
}
}
}
protected function reset(Consumer $c, $state)
{
$c->pid = '';
$c->uptime = '';
$c->state = $state;
$c->process = null;
}
protected function waitpid()
{
foreach ($this->childPids as $uniqid => $pid) {
$result = pcntl_waitpid($pid, $status, WNOHANG);
if ($result == $pid || $result == -1) {
unset($this->childPids[$uniqid]);
$c = &$this->consumers[$uniqid];
$state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;
$this->reset($c, $state);
}
}
}
/**
* 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程
*/
private function _notifyMaster()
{
$ppid = file_get_contents(self::PPID_FILE );
$isAlive = $this->checkProcessAlive($ppid);
if (!$isAlive) return false;
return true;
}
public function checkProcessAlive($pid)
{
if (empty($pid)) return false;
$pidinfo = `ps co pid {$pid} | xargs`;
$pidinfo = trim($pidinfo);
$pattern = "/.*?PID.*?(\\d+).*?/";
preg_match($pattern, $pidinfo, $matches);
return empty($matches) ? false : ($matches[1] == $pid ? true : false);
}
/**
* fork一个新的子进程
*/
protected function fork(Consumer $c)
{
$descriptorspec = [2 => ['file', $c->logfile, 'a'],];
$process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);
if ($process) {
$ret = proc_get_status($process);
if ($ret['running']) {
$c->state = Consumer::RUNNING;
$c->pid = $ret['pid'];
$c->process = $process;
$c->uptime = date('m-d H:i');
$this->childPids[$c->uniqid] = $ret['pid'];
} else {
$c->state = Consumer::EXITED;
proc_close($process);
}
} else {
$c->state = Consumer::ERROR;
}
return $c;
}
public function display()
{
$location = 'http://127.0.0.1:7865';
$basePath = Http::$basePath;
$scriptName = isset($_SERVER['SCRIPT_NAME']) &&
!empty($_SERVER['SCRIPT_NAME']) &&
$_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php';
if ($scriptName == '/index.html') {
return Http::status_301($location);
}
$sourcePath = $basePath . $scriptName;
if (!is_file($sourcePath)) {
return Http::status_404();
}
ob_start();
include $sourcePath;
$response = ob_get_contents();
ob_clean();
return Http::status_200($response);
}
public function handle($uniqid, $action)
{
if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {
return;
}
switch ($action) {
case 'refresh':
break;
case 'restartall':
$this->killall(true);
break;
case 'stopall':
$this->killall();
break;
case 'stop':
$c = &$this->consumers[$uniqid];
if ($c->state != Consumer::RUNNING) break;
$c->state = Consumer::STOPING;
break;
case 'start':
$c = &$this->consumers[$uniqid];
if ($c->state == Consumer::RUNNING) break;
$c->state = Consumer::STARTING;
break;
case 'restart':
$c = &$this->consumers[$uniqid];
$c->state = Consumer::RESTART;
break;
case 'copy':
$c = $this->consumers[$uniqid];
$newC = clone $c;
$newC->uniqid = uniqid('C');
$newC->state = Consumer::NOMINAL;
$newC->pid = '';
$this->consumers[$newC->uniqid] = $newC;
break;
default:
break;
}
}
protected function killall($restart = false)
{
foreach ($this->consumers as &$c) {
$c->state = $restart ? Consumer::RESTART : Consumer::STOPING;
}
}}$cli = new Process();$cli->run();

Consumer消费者对象

<?php
require_once __DIR__ . '/BaseObject.php';class Consumer extends BaseObject{
/** 开启多少个消费者 */
public $numprocs = 1;
/** 当前配置的唯一标志 */
public $program;
/** 执行的命令 */
public $command;
/** 当前工作的目录 */
public $directory;
/** 通过 $qos $queueName $duplicate 生成的 $queue */
public $queue;
/** 程序执行日志记录 */
public $logfile = '';
/** 消费进程的唯一ID */
public $uniqid;
/** 进程IDpid */
public $pid;
/** 进程状态 */
public $state = self::NOMINAL;
/** 自启动 */
public $auto_restart = false;
public $process;
/** 启动时间 */
public $uptime;
const RUNNING = 'running';
const STOP = 'stoped';
const NOMINAL = 'nominal';
const RESTART = 'restart';
const STOPING = 'stoping';
const STARTING = 'stating';
const ERROR = 'error';
const BLOCKED = 'blocked';
const EXITED = 'exited';
const FATEL = 'fatel';}

stream相关代码:StreamConnection.php

<?php
class StreamConnection{
protected $socket;
protected $timeout = 2; //s
protected $client;
public function __construct($host)
{
$this->socket = $this->connect($host);
}
public function connect($host)
{
$socket = stream_socket_server($host, $errno, $errstr);
if (!$socket) {
exit('stream error');
}
stream_set_timeout($socket, $this->timeout);
stream_set_chunk_size($socket, 1024);
stream_set_blocking($socket, false);
$this->client = [$socket];
return $socket;
}
public function accept(Closure $callback)
{
$read = $this->client;
if (stream_select($read, $write, $except, 1) < 1) return;
if (in_array($this->socket, $read)) {
$cs = stream_socket_accept($this->socket);
$this->client[] = $cs;
}
foreach ($read as $s) {
if ($s == $this->socket) continue;
$header = fread($s, 1024);
if (empty($header)) {
$index = array_search($s, $this->client);
if ($index)
unset($this->client[$index]);
$this->close($s);
continue;
}
Http::parse_http($header);
$uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : '';
$action = isset($_GET['action']) ? $_GET['action'] : '';
$response = $callback($uniqid, $action);
$this->write($s, $response);
$index = array_search($s, $this->client);
if ($index)
unset($this->client[$index]);
$this->close($s);
}
}
public function write($socket, $response)
{
$ret = fwrite($socket, $response, strlen($response));
}
public function close($socket)
{
$flag = fclose($socket);
}
public function getSocket()
{
return $this->socket;
}}

Http响应代码:Http.php

<?php
class Http{
public static $basePath = __DIR__ . '/views';
public static $max_age = 120; //秒
/*
*  函数:     parse_http
*  描述:     解析http协议
*/
public static function parse_http($http)
{
// 初始化
$_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =  array();
$GLOBALS['HTTP_RAW_POST_DATA'] = '';
// 需要设置的变量名
$_SERVER = array(
'QUERY_STRING' => '',
'REQUEST_METHOD' => '',
'REQUEST_URI' => '',
'SERVER_PROTOCOL' => '',
'SERVER_SOFTWARE' => '',
'SERVER_NAME' => '',
'HTTP_HOST' => '',
'HTTP_USER_AGENT' => '',
'HTTP_ACCEPT' => '',
'HTTP_ACCEPT_LANGUAGE' => '',
'HTTP_ACCEPT_ENCODING' => '',
'HTTP_COOKIE' => '',
'HTTP_CONNECTION' => '',
'REMOTE_ADDR' => '',
'REMOTE_PORT' => '0',
'SCRIPT_NAME' => '',
'HTTP_REFERER' => '',
'CONTENT_TYPE' => '',
'HTTP_IF_NONE_MATCH' => '',
);
// 将header分割成数组
list($http_header, $http_body) = explode("\\r\\n\\r\\n", $http, 2);
$header_data = explode("\\r\\n", $http_header);
list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);
unset($header_data[0]);
foreach ($header_data as $content) {
// \\r\\n\\r\\n
if (empty($content)) {
continue;
}
list($key, $value) = explode(':', $content, 2);
$key = strtolower($key);
$value = trim($value);
switch ($key) {
case 'host':
$_SERVER['HTTP_HOST'] = $value;
$tmp = explode(':', $value);
$_SERVER['SERVER_NAME'] = $tmp[0];
if (isset($tmp[1])) {
$_SERVER['SERVER_PORT'] = $tmp[1];
}
break;
case 'cookie':
$_SERVER['HTTP_COOKIE'] = $value;
parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
break;
case 'user-agent':
$_SERVER['HTTP_USER_AGENT'] = $value;
break;
case 'accept':
$_SERVER['HTTP_ACCEPT'] = $value;
break;
case 'accept-language':
$_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value;
break;
case 'accept-encoding':
$_SERVER['HTTP_ACCEPT_ENCODING'] = $value;
break;
case 'connection':
$_SERVER['HTTP_CONNECTION'] = $value;
break;
case 'referer':
$_SERVER['HTTP_REFERER'] = $value;
break;
case 'if-modified-since':
$_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value;
break;
case 'if-none-match':
$_SERVER['HTTP_IF_NONE_MATCH'] = $value;
break;
case 'content-type':
if (!preg_match('/boundary="?(\\S+)"?/', $value, $match)) {
$_SERVER['CONTENT_TYPE'] = $value;
} else {
$_SERVER['CONTENT_TYPE'] = 'multipart/form-data';
$http_post_boundary = '--' . $match[1];
}
break;
}
}
// script_name
$_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);
// QUERY_STRING
$_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
if ($_SERVER['QUERY_STRING']) {
// $GET
parse_str($_SERVER['QUERY_STRING'], $_GET);
} else {
$_SERVER['QUERY_STRING'] = '';
}
// REQUEST
$_REQUEST = array_merge($_GET, $_POST);
return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES);
}
public static function status_404()
{
return <<<EOFHTTP/1.1 404 OK
content-type: text/htmlEOF;
}
public static function status_301($location)
{
return <<<EOFHTTP/1.1 301 Moved Permanently
Content-Length: 0
Content-Type: text/plain
Location: $locationCache-Control: no-cacheEOF;
}
public static function status_304()
{
return <<<EOFHTTP/1.1 304 Not Modified
Content-Length: 0EOF;
}
public static function status_200($response)
{
$contentType = $_SERVER['CONTENT_TYPE'];
$length = strlen($response);
$header = '';
if ($contentType)
$header = 'Cache-Control: max-age=180';
return <<<EOFHTTP/1.1 200 OK
Content-Type: $contentTypeContent-Length: $length$header$responseEOF;
}}

待执行的脚本:test.php

<?php
while(true) {
file_put_contents(__DIR__  .  '/test.log', date('Y-m-d H:i:s'));
sleep(1);}

在当前目录下的视图页面:
|- Process.php
|- Http.php
|- StreamConnection.php
|- Consumer.php
|- BaseObject.php
|- views/

更多编程相关知识,请访问:编程教学!!

关于PHP模拟supervisor的进程管理的文章就分享到这,如果对你有帮助欢迎继续关注我们哦

本文来自投稿,不代表重蔚自留地立场,如若转载,请注明出处https://www.cwhello.com/44691.html

如有侵犯您的合法权益请发邮件951076433@qq.com联系删除

(0)
php学习php学习订阅用户
上一篇 2022年6月27日
下一篇 2022年6月27日

相关推荐

  • 重蔚自留地php学习第45天——序列化-自定义自动加载-迭代

    回顾 面向对象三大特性 封装:隐藏数据实现,提供外部调用的方法 继承:实现代码的重用,提高效率 多态:方法的重载,PHP不支持多态   PHP继承:extends 如果一个类是用来被实例化的,那么尽可能的将内容私有…

    2019年1月11日 我php路线
    0366
  • php.ini与phpinfo()的用处详解

    phpinfophp -i 和 phpinfo() 可以展示出 phpinfo 信息,展示当前PHP环境的上下文信息;Compiler : PHP的编译器版本PHP Version: PHP版本Loaded Configuration File : 当前环境的PHP配置文件路径Thread Safety: 是否…

    2022年6月27日
    0276
  • PHP中Memcached缓存技术的实现和原理。

    Memcached是一种高速缓存系统,被广泛应用于Web服务器和其他需要缓存数据的场合。在PHP开发中,Memcached常用于提升应用程序的性能和优化数据库访问。本文将介绍Memcached缓存技术的实现和原理。一、Memcached的基…

    2023年5月21日
    00
  • PHP中的最佳ORM框架。

    随着Web应用程序的不断增长和复杂性的提高,数据访问层也变得越来越重要。ORM(Object Relational Mapping)框架已经成为现代Web应用程序的必备工具之一。ORM框架帮助开发者将数据存储和查询的复杂性抽象化,使得开发…

    2023年5月28日
    02
  • PHP与数据库日志管理的集成

    随着互联网技术的发展,越来越多的网站或应用程序需要在后台对数据库进行管理和维护。而在这个过程中,如何记录和分析日志是非常重要的一环。PHP作为最流行的Web开发语言之一,在日志记录方面也拥有着强大的功能。…

    2023年5月19日
    01
  • 使用PHP和Haskell进行函数式编程。

    随着互联网的发展,编程语言也随之不断地更新和完善。如今,各种编程语言层出不穷,而其中PHP和Haskell这两种编程语言,都在开发者中备受关注。PHP是一种十分流行的服务器端脚本语言,用于快速开发Web应用程序。PHP…

    2023年5月30日
    00
  • 经验分享宝塔面板 php。

    宝塔面板是一款方便的服务器管理工具,支持多种语言,包括PHP。它可以帮助用户轻松管理服务器、网站和数据库,提高运维效率。 宝塔面板轻松部署PHP项目完善解决困扰多年的问题 在互联网时代,越来越多的人开始接触…

    2024年7月6日
    00
  • PHP回调函数以及匿名函数用法与概念详解(基础篇)

    1、回调函数 PHP的回调函数其实和C、Java等语言的回调函数的作用是一模一样的,都是在主线程执行的过程中,突然跳去执行设置的回调函数;回调函数执行完毕之后,再回到主线程处理接下来的流程而在php调用回调函数,…

    2018年8月30日
    0295

联系我们

QQ:951076433

在线咨询:点击这里给我发消息邮件:951076433@qq.com工作时间:周一至周五,9:30-18:30,节假日休息