October 29, 2019 · Laravel PHP Design Pattern
PHP multithread development with Laravel 5+
Reference: Parallel Processing Multi tasking PHP
Thread
- Requires installing the pthreads extension.
- pthreads is not stable on PHP 7.
- PHP 5.6 works; see the GitHub step-by-step guide.
- Requires recompiling PHP itself with thread safety (ZTS) enabled and reinstalling it.
Fork
- Requires the pcntl extension (`pcntl_fork`) to be enabled.
- Not available through the Apache2 PHP gateway: for security reasons it remains unavailable even if you enable it in php.ini.
- Only available in PHP-CLI. Create a daemon and have it listen for calls from the application, exchanged through file contents or database record updates.
Example
Daemon Dispatcher
<?php
namespace App\Model\Crawler;
use Carbon\Carbon;
use App\Utility\Utility;
use App\Model\Crawler\Crawler;
/**
 * Long-running dispatcher that repeatedly queries the activated crawlers and
 * asks every crawler that is not yet registered as active to start listening.
 *
 * Implemented as a singleton: construction, cloning and unserialization are
 * blocked, and the single instance is obtained through getInstance().
 */
class DaemonDispatcher
{
    /**
     * Singleton instance, created lazily by getInstance().
     *
     * @var DaemonDispatcher|null
     */
    protected static $instance = null;

    /**
     * Disable direct construction; use getInstance() instead.
     */
    protected function __construct()
    {
    }

    /**
     * Disable cloning of the singleton.
     */
    protected function __clone()
    {
    }

    /**
     * Disable unserialization of the singleton.
     *
     * Declared public because PHP 8 warns about non-public magic methods.
     *
     * @throws \Exception Always.
     */
    public function __wakeup()
    {
        // Fully qualified on purpose: this file lives in a namespace and does
        // not import Exception, so the bare name would resolve to a class
        // inside that namespace and fail with "class not found".
        throw new \Exception('Cannot unserialize singleton.');
    }

    /**
     * Return the shared dispatcher, creating it on first use.
     *
     * @return static
     */
    public static function getInstance()
    {
        if (static::$instance === null) {
            static::$instance = new static();
        }

        return static::$instance;
    }

    /**
     * Poll forever for activated crawlers and start the ones that are not
     * listed in Crawler::$activatedCrawlersId.
     *
     * This method never returns.
     *
     * @return void
     */
    public function listen()
    {
        $ut = new Utility();
        $ut->info('Daemon Dispatcher started to listen.');

        while (true) {
            $crawlers = Crawler::where('isactivated', 1)->get();

            $activatedCount = count($crawlers);
            if ($activatedCount > 0) {
                $ut->info($activatedCount . ' crawlers activated.');
            }

            foreach ($crawlers as $crawler) {
                // NOTE(review): Crawler::$activatedCrawlersId is presumably
                // updated by Crawler::listen(); that code is not visible here.
                // The comparison is left non-strict because the id type
                // coming back from the database is not known from this file.
                if (!in_array($crawler->id, Crawler::$activatedCrawlersId)) {
                    $ut->info('Activating #' . $crawler->id . ' Crawler');
                    $crawler->listen();
                }

                // One-second pause after every crawler, activated or not.
                sleep(1);
            }

            // Pause between polling rounds.
            sleep(1);
        }
    }
}
Async Crawler
<?php
namespace App\Model\Crawler;
use App\Utility\Thread;
use App\Utility\Utili\Utilityy;
use App\Model\Crawler\CrawlJob;
/**
 * Runs a CrawlJob in a forked child process, using Thread for the process
 * handling.
 */
class AsyncCrawler extends Thread
{
    /**
     * A controlled CrawlJob for the AsyncCrawler.
     *
     * @var CrawlJob
     */
    protected $crawlJob;

    /**
     * Construct an AsyncCrawler.
     *
     * The job is also handed to the parent constructor, which stores it as
     * the runnable.
     *
     * @param CrawlJob $crawlJob A CrawlJob object with a completed data set (url, name, etc).
     *
     * @throws \Exception When the parent constructor reports that forking is not supported.
     */
    public function __construct(CrawlJob $crawlJob)
    {
        $this->crawlJob = $crawlJob;
        parent::__construct($crawlJob);
    }

    /**
     * Replace the CrawlJob that the next start() call will run.
     *
     * @param CrawlJob $crawlJob A completed CrawlJob object.
     *
     * @return void
     */
    public function setCrawlJob(CrawlJob $crawlJob)
    {
        $this->crawlJob = $crawlJob;
    }

    /**
     * Start crawling in a forked child process.
     *
     * Delegates to Thread::start() so that the fork, the pid bookkeeping and
     * the error reporting all happen inside the class that owns that state.
     * For the duration of the call the runnable points at the job's crawl()
     * method; afterwards the previous runnable is restored so getRunnable()
     * keeps returning what it returned before.
     *
     * In the child process Thread::start() exits after the runnable finishes,
     * so only the parent process returns from this method.
     *
     * @return void
     *
     * @throws \Exception Thread::COULD_NOT_FORK|Thread::FUNCTION_NOT_CALLABLE
     */
    public function start()
    {
        $previousRunnable = $this->runnable;
        $this->runnable = array($this->crawlJob, 'crawl');

        try {
            parent::start();
        } finally {
            $this->runnable = $previousRunnable;
        }
    }
}
Threading
<?php
namespace App\Utility;
use \Exception;
/**
* Implements threading in PHP
*
* PHP version 5
*
* @category Threading
* @package None
* @author Tudor Barbu <[email protected]>
* @copyright 2009 MIT
* @license http://en.wikipedia.org/wiki/MIT_License MIT License
* @link http://blog.motane.lu/2009/01/02/multithreading-in-php/
*/
/**
* Thread class
*
* @category Threading
* @package None
* @author Tudor Barbu <[email protected]>
* @copyright 2009 MIT
* @license http://en.wikipedia.org/wiki/MIT_License MIT License
* @link http://blog.motane.lu/2009/01/02/multithreading-in-php/
*/
/**
 * Simulates a thread by forking the current process with pcntl_fork() and
 * running a callback in the child process.
 */
class Thread
{
    const FUNCTION_NOT_CALLABLE = 10;
    const COULD_NOT_FORK = 15;

    /**
     * Error messages indexed by error code.
     *
     * @var array
     */
    private $_errors = array(
        self::FUNCTION_NOT_CALLABLE => 'You must specify a valid function name that can be called from the current scope.',
        self::COULD_NOT_FORK => 'pcntl_fork() returned a status of -1. No new process was created'
    );

    /**
     * Callback that should run in the child process.
     *
     * @var callback
     */
    protected $runnable;

    /**
     * Process id of the forked child, or null while nothing has been started.
     *
     * Protected so that subclasses overriding start() can record the pid.
     *
     * @var integer|null
     */
    protected $_pid;

    /**
     * Throws an exception carrying the message registered for the error code.
     *
     * Protected so that subclasses overriding start() can report errors.
     *
     * @param integer $errorCode One of the class error constants
     *
     * @return void
     *
     * @throws Exception Always.
     */
    protected function fatalError($errorCode)
    {
        throw new Exception($this->getError($errorCode));
    }

    /**
     * Lists the pcntl functions this class needs that are not defined.
     *
     * @return array Names of the missing functions; empty when all exist
     */
    private static function missingFunctions()
    {
        $missing = array();
        foreach (array('pcntl_fork', 'pcntl_waitpid', 'pcntl_signal') as $function) {
            if (!function_exists($function)) {
                $missing[] = $function;
            }
        }

        return $missing;
    }

    /**
     * Checks if threading is supported by the current PHP configuration.
     *
     * Reports the result instead of terminating the script, so callers (and
     * the constructor) can decide how to react.
     *
     * @return boolean
     */
    public static function isAvailable()
    {
        return count(self::missingFunctions()) === 0;
    }

    /**
     * Class constructor - you can pass
     * the callback function as an argument
     *
     * @param callback $runnable Callback reference
     *
     * @throws Exception When a required pcntl function is missing, or when
     *                   setRunnable() rejects the callback.
     */
    public function __construct($runnable = null)
    {
        if (!self::isAvailable()) {
            throw new Exception(
                'Threads not supported. Please enable ' . implode(', ', self::missingFunctions())
            );
        }

        if ($runnable !== null) {
            $this->setRunnable($runnable);
        }
    }

    /**
     * Sets the callback
     *
     * @param callback $runnable Callback reference
     *
     * @return void
     *
     * @throws Exception When isRunnableOk() rejects the callback.
     */
    public function setRunnable($runnable)
    {
        if (!self::isRunnableOk($runnable)) {
            $this->fatalError(self::FUNCTION_NOT_CALLABLE);
        }

        $this->runnable = $runnable;
    }

    /**
     * Gets the callback
     *
     * @return callback
     */
    public function getRunnable()
    {
        return $this->runnable;
    }

    /**
     * Checks if the callback is ok.
     *
     * can be called statically
     *
     * Currently accepts every value: the is_callable() check was removed by
     * the author because existing callers hand over values that are not
     * callable. Callability is verified in start() instead.
     *
     * @param callback $runnable Callback
     *
     * @return boolean Always true
     */
    public static function isRunnableOk($runnable)
    {
        return true;
    }

    /**
     * Returns the process id (pid) of the simulated thread
     *
     * @return int|null Null while nothing has been started
     */
    public function getPid()
    {
        return $this->_pid;
    }

    /**
     * Checks if the child thread is alive
     *
     * @return boolean False when nothing has been started or the child has exited
     */
    public function isAlive()
    {
        // Without a recorded pid there is no child of ours to ask about;
        // handing null to pcntl_waitpid() would address other children.
        if ($this->_pid === null) {
            return false;
        }

        $status = 0;

        return pcntl_waitpid($this->_pid, $status, WNOHANG) === 0;
    }

    /**
     * Starts the thread, all the parameters are
     * passed to the callback function
     *
     * The parent process records the child pid and returns. The child process
     * runs the callback and then exits with status 0, so it never returns
     * from this method.
     *
     * @return void
     *
     * @throws Exception When the callback is not callable or the fork fails.
     */
    public function start()
    {
        // Fail in the calling process rather than forking a child that can
        // only emit a warning and exit.
        if (!is_callable($this->runnable)) {
            $this->fatalError(self::FUNCTION_NOT_CALLABLE);
        }

        $arguments = func_get_args();

        $pid = @pcntl_fork();
        if ($pid === -1) {
            $this->fatalError(self::COULD_NOT_FORK);
        }

        if ($pid) {
            // parent
            $this->_pid = $pid;
            return;
        }

        // child
        // Registered PHP signal handlers only run when signals are dispatched;
        // enable asynchronous dispatch where the function exists (PHP 7.1+).
        if (function_exists('pcntl_async_signals')) {
            pcntl_async_signals(true);
        }

        // A closure keeps the protected handler reachable no matter which
        // scope is executing when the signal arrives.
        pcntl_signal(SIGTERM, function ($signal) {
            $this->handleSignal($signal);
        });

        call_user_func_array($this->runnable, $arguments);

        exit(0);
    }

    /**
     * Attempts to stop the thread
     * returns true on success and false otherwise
     *
     * @param integer $signal SIGKILL or SIGTERM
     * @param boolean $wait   Wait until child has exited
     *
     * @return boolean True when the signal was sent, false when the thread
     *                 was not alive or the signal could not be sent
     */
    public function stop($signal = SIGKILL, $wait = false)
    {
        if (!$this->isAlive()) {
            return false;
        }

        $sent = posix_kill($this->_pid, $signal);

        if ($wait) {
            // pcntl_waitpid() takes the status by reference, so it needs a
            // real variable rather than an assignment expression.
            $status = 0;
            pcntl_waitpid($this->_pid, $status);
        }

        return $sent;
    }

    /**
     * Alias of stop();
     *
     * @param integer $signal SIGKILL or SIGTERM
     * @param boolean $wait   Wait until child has exited
     *
     * @return boolean Same result as stop()
     */
    public function kill($signal = SIGKILL, $wait = false)
    {
        return $this->stop($signal, $wait);
    }

    /**
     * Gets the error's message based on its id
     *
     * @param integer $code The error code
     *
     * @return string
     */
    public function getError($code)
    {
        if (isset($this->_errors[$code])) {
            return $this->_errors[$code];
        }

        return "No such error code $code ! Quit inventing errors!!!";
    }

    /**
     * Signal handler
     *
     * Ends the current process with status 0 on SIGTERM and ignores every
     * other signal.
     *
     * @param integer $signal Signal
     *
     * @return void
     */
    protected function handleSignal($signal)
    {
        if ($signal === SIGTERM) {
            exit(0);
        }
    }
}