Файл: gapps/vendor/laravel/framework/src/Illuminate/Queue/Listener.php
Строк: 291
<?php
namespace IlluminateQueue;
use Closure;
use SymfonyComponentProcessProcess;
use SymfonyComponentProcessProcessUtils;
use SymfonyComponentProcessPhpExecutableFinder;
class Listener
{
/**
* The command working path.
*
* @var string
*/
protected $commandPath;
/**
* The environment the workers should run under.
*
* @var string
*/
protected $environment;
/**
* The amount of seconds to wait before polling the queue.
*
* @var int
*/
protected $sleep = 3;
/**
* The amount of times to try a job before logging it failed.
*
* @var int
*/
protected $maxTries = 0;
/**
* The queue worker command line.
*
* @var string
*/
protected $workerCommand;
/**
* The output handler callback.
*
* @var Closure|null
*/
protected $outputHandler;
/**
* Create a new queue listener.
*
* @param string $commandPath
* @return void
*/
public function __construct($commandPath)
{
$this->commandPath = $commandPath;
$this->workerCommand = $this->buildWorkerCommand();
}
/**
* Build the environment specific worker command.
*
* @return string
*/
protected function buildWorkerCommand()
{
$binary = ProcessUtils::escapeArgument((new PhpExecutableFinder)->find(false));
if (defined('HHVM_VERSION')) {
$binary .= ' --php';
}
if (defined('ARTISAN_BINARY')) {
$artisan = ProcessUtils::escapeArgument(ARTISAN_BINARY);
} else {
$artisan = 'artisan';
}
$command = 'queue:work %s --queue=%s --delay=%s --memory=%s --sleep=%s --tries=%s';
return "{$binary} {$artisan} {$command}";
}
/**
* Listen to the given queue connection.
*
* @param string $connection
* @param string $queue
* @param string $delay
* @param string $memory
* @param int $timeout
* @return void
*/
public function listen($connection, $queue, $delay, $memory, $timeout = 60)
{
$process = $this->makeProcess($connection, $queue, $delay, $memory, $timeout);
while (true) {
$this->runProcess($process, $memory);
}
}
/**
* Run the given process.
*
* @param SymfonyComponentProcessProcess $process
* @param int $memory
* @return void
*/
public function runProcess(Process $process, $memory)
{
$process->run(function ($type, $line) {
$this->handleWorkerOutput($type, $line);
});
// Once we have run the job we'll go check if the memory limit has been
// exceeded for the script. If it has, we will kill this script so a
// process manager will restart this with a clean slate of memory.
if ($this->memoryExceeded($memory)) {
$this->stop();
}
}
/**
* Create a new Symfony process for the worker.
*
* @param string $connection
* @param string $queue
* @param int $delay
* @param int $memory
* @param int $timeout
* @return SymfonyComponentProcessProcess
*/
public function makeProcess($connection, $queue, $delay, $memory, $timeout)
{
$string = $this->workerCommand;
// If the environment is set, we will append it to the command string so the
// workers will run under the specified environment. Otherwise, they will
// just run under the production environment which is not always right.
if (isset($this->environment)) {
$string .= ' --env='.ProcessUtils::escapeArgument($this->environment);
}
// Next, we will just format out the worker commands with all of the various
// options available for the command. This will produce the final command
// line that we will pass into a Symfony process object for processing.
$command = sprintf(
$string,
ProcessUtils::escapeArgument($connection),
ProcessUtils::escapeArgument($queue),
$delay,
$memory,
$this->sleep,
$this->maxTries
);
return new Process($command, $this->commandPath, null, null, $timeout);
}
/**
* Handle output from the worker process.
*
* @param int $type
* @param string $line
* @return void
*/
protected function handleWorkerOutput($type, $line)
{
if (isset($this->outputHandler)) {
call_user_func($this->outputHandler, $type, $line);
}
}
/**
* Determine if the memory limit has been exceeded.
*
* @param int $memoryLimit
* @return bool
*/
public function memoryExceeded($memoryLimit)
{
return (memory_get_usage() / 1024 / 1024) >= $memoryLimit;
}
/**
* Stop listening and bail out of the script.
*
* @return void
*/
public function stop()
{
die;
}
/**
* Set the output handler callback.
*
* @param Closure $outputHandler
* @return void
*/
public function setOutputHandler(Closure $outputHandler)
{
$this->outputHandler = $outputHandler;
}
/**
* Get the current listener environment.
*
* @return string
*/
public function getEnvironment()
{
return $this->environment;
}
/**
* Set the current environment.
*
* @param string $environment
* @return void
*/
public function setEnvironment($environment)
{
$this->environment = $environment;
}
/**
* Get the amount of seconds to wait before polling the queue.
*
* @return int
*/
public function getSleep()
{
return $this->sleep;
}
/**
* Set the amount of seconds to wait before polling the queue.
*
* @param int $sleep
* @return void
*/
public function setSleep($sleep)
{
$this->sleep = $sleep;
}
/**
* Set the amount of times to try a job before logging it failed.
*
* @param int $tries
* @return void
*/
public function setMaxTries($tries)
{
$this->maxTries = $tries;
}
}