WorkerPool
in package
implements
Iterator, Countable
The Worker Pool class runs worker processes in parallel
Table of Contents
Interfaces
- Iterator
- Countable
Constants
- CHILD_TIMEOUT_SEC = 10
- Default child timeout in seconds
Properties
- $childProcessTitleFormat : string
- $parentPid : int
- $parentProcessTitleFormat : string
- $resultPosition : int
- $results : array<string|int, mixed>
- $semaphore : Semaphore
- $signals : array<string|int, mixed>
- $worker : WorkerInterface
- $workerProcesses : ProcessDetailsCollection|array<string|int, ProcessDetails>
- $child_timeout_sec : int
- $created : bool
- $currentWorkerIndex : int
- $initialPoolSize : int
- $respawnAutomatically : bool
- $workerPoolSize : int
Methods
- __construct() : mixed
- The constructor
- __destruct() : mixed
- The destructor
- clearResults() : WorkerPool
- Clear all the results
- count() : int
- Countable Method count
- countResults() : int
- How many results did we receive?
- create() : WorkerPool
- Creates the worker pool (forks the children)
- current() : null|WorkerPoolResult
- Iterator Method current
- destroy() : WorkerPool
- Destroy the WorkerPool with all its children
- disableSemaphore() : WorkerPool
- Disables the semaphore feature in the workerpool
- exitPhp() : mixed
- Terminates the current process
- getBusyWorkers() : int
- Returns the number of busy workers
- getChildProcessTitleFormat() : string
- Returns the process title of the child
- getFreeAndBusyWorkers() : array<string|int, mixed>
- Returns the number of busy and free workers
- getFreeWorkers() : int
- Returns the number of free workers
- getNextResult() : null|WorkerPoolResult
- Shifts the next result from the result queue
- getParentProcessTitleFormat() : string
- Returns the process title of the parent
- getSemaphore() : null|Semaphore
- Gets the Semaphore, that will be used within the worker processes
- getWorkerPoolSize() : int
- Returns the current size of the worker pool
- hasResults() : bool
- Is there any result available?
- key() : int
- Iterator Method key
- next() : void
- Iterator Method next()
- onShutDown() : mixed
- This runs on shutdown to prevent the system from semaphore leaks
- respawnAutomatically() : WorkerPool
- Respawn workers automatically if they died
- rewind() : void
- Iterator Method rewind()
- run() : int
- Sends the input to the next free worker process
- setChildProcessTitleFormat() : WorkerPool
- Sets the process title of the child
- setChildTimeoutSec() : WorkerPool
- Sets default child timeout in seconds.
- setParentProcessTitleFormat() : WorkerPool
- Sets the process title of the parent
- setSemaphore() : WorkerPool
- Sets the Semaphore, that will be used within the worker processes
- setWorkerPoolSize() : WorkerPool
- Sets the current size of the worker pool
- signalHandler() : mixed
- Receives signals
- tryWaitForOneFreeWorker() : bool
- Trys to wait for one free worker within a fixed timeout
- valid() : bool
- Iterator Method valid()
- waitForAllWorkers() : void
- Waits for all children to finish their worker
- waitForOneFreeWorker() : void
- Waits for one free worker
- collectWorkerResults() : mixed
- Collects the results form the workers and processes any pending signals
- getNextFreeWorker() : ProcessDetails
- Get the pid of the next free worker
- reaper() : mixed
- Child process reaper
- runWorkerProcess() : mixed
- Run the worker process
- createWorker() : mixed
- Creates the worker
- respawnIfRequired() : mixed
Constants
CHILD_TIMEOUT_SEC
Default child timeout in seconds
public
mixed
CHILD_TIMEOUT_SEC
= 10
Properties
$childProcessTitleFormat
protected
string
$childProcessTitleFormat
= '%basename%: Worker %i% of %class% [%state%]'
process title of the children
$parentPid
protected
int
$parentPid
= 0
id of the parent
$parentProcessTitleFormat
protected
string
$parentProcessTitleFormat
= '%basename%: Parent'
process title of the parent
$resultPosition
protected
int
$resultPosition
= 0
number of received results
$results
protected
array<string|int, mixed>
$results
= array()
received results from the workers
$semaphore
protected
Semaphore
$semaphore
the semaphore, that is used to synchronizd tasks across all processes
$signals
protected
array<string|int, mixed>
$signals
= array(SIGCHLD, SIGTERM, SIGHUP, SIGUSR1, SIGINT)
signals, that should be watched
$worker
protected
WorkerInterface
$worker
the worker class, that is used to run the tasks
$workerProcesses
protected
ProcessDetailsCollection|array<string|int, ProcessDetails>
$workerProcesses
Collection of the worker processes
$child_timeout_sec
private
int
$child_timeout_sec
= self::CHILD_TIMEOUT_SEC
Default child timeout in seconds
$created
private
bool
$created
= FALSE
is the pool created? (children forked)
$currentWorkerIndex
private
int
$currentWorkerIndex
= 0
Current index for the last worker created in the pool
$initialPoolSize
private
int
$initialPoolSize
number of children initially in the pool
$respawnAutomatically
private
bool
$respawnAutomatically
= false
Respawn dead workers automatically if set to TRUE
$workerPoolSize
private
int
$workerPoolSize
= 2
number of children in the pool
Methods
__construct()
The constructor
public
__construct() : mixed
__destruct()
The destructor
public
__destruct() : mixed
clearResults()
Clear all the results
public
clearResults() : WorkerPool
Return values
WorkerPoolcount()
Countable Method count
public
count() : int
Tags
Return values
int —the number of results
countResults()
How many results did we receive?
public
countResults() : int
Return values
int —the number of results
create()
Creates the worker pool (forks the children)
public
create(WorkerInterface $worker) : WorkerPool
Please close all open resources before running this function. Child processes are going to close all open resources uppon exit, leaving the parent process behind with invalid resource handles.
Parameters
- $worker : WorkerInterface
-
the worker, that runs future tasks
Tags
Return values
WorkerPoolcurrent()
Iterator Method current
public
current() : null|WorkerPoolResult
Return values
null|WorkerPoolResult —gets the current result
destroy()
Destroy the WorkerPool with all its children
public
destroy([int $maxWaitSecs = null ]) : WorkerPool
Parameters
- $maxWaitSecs : int = null
-
a timeout to wait for the children, before killing them
Tags
Return values
WorkerPooldisableSemaphore()
Disables the semaphore feature in the workerpool
public
disableSemaphore() : WorkerPool
Attention: You will lose the possibility to synchronize worker processes
Tags
Return values
WorkerPoolexitPhp()
Terminates the current process
public
exitPhp(int $code) : mixed
Parameters
- $code : int
-
the exit code
getBusyWorkers()
Returns the number of busy workers
public
getBusyWorkers() : int
PAY ATTENTION WHEN USING THIS FUNCTION WITH A SUBSEQUENT CALL OF getFreeWorkers(). IN THIS CASE THE SUM MIGHT NOT EQUAL TO THE CURRENT POOL SIZE. USE getFreeAndBusyWorkers() TO GET CONSISTENT RESULTS.
Return values
int —number of free workers
getChildProcessTitleFormat()
Returns the process title of the child
public
getChildProcessTitleFormat() : string
Return values
string —the process title of the child
getFreeAndBusyWorkers()
Returns the number of busy and free workers
public
getFreeAndBusyWorkers() : array<string|int, mixed>
This function collects all the information at once.
Return values
array<string|int, mixed> —with the keys 'free', 'busy', 'total'
getFreeWorkers()
Returns the number of free workers
public
getFreeWorkers() : int
PAY ATTENTION WHEN USING THIS FUNCTION WITH A SUBSEQUENT CALL OF getBusyWorkers(). IN THIS CASE THE SUM MIGHT NOT EQUAL TO THE CURRENT POOL SIZE. USE getFreeAndBusyWorkers() TO GET CONSISTENT RESULTS.
Return values
int —number of free workers
getNextResult()
Shifts the next result from the result queue
public
getNextResult() : null|WorkerPoolResult
Return values
null|WorkerPoolResult —gets the next result
getParentProcessTitleFormat()
Returns the process title of the parent
public
getParentProcessTitleFormat() : string
Return values
string —the process title of the parent
getSemaphore()
Gets the Semaphore, that will be used within the worker processes
public
getSemaphore() : null|Semaphore
Return values
null|Semaphore —$semaphore the Semaphore, that should be used for the workers
getWorkerPoolSize()
Returns the current size of the worker pool
public
getWorkerPoolSize() : int
In case the pool hasn't yet been created, this method returns the value of the currently set pool size. In case of a created pool, this method reports the real pool size (number of alive worker processes).
Return values
int —the number of processes
hasResults()
Is there any result available?
public
hasResults() : bool
Return values
bool —true, in case we have received some results
key()
Iterator Method key
public
key() : int
Return values
int —returns the current key
next()
Iterator Method next()
public
next() : void
onShutDown()
This runs on shutdown to prevent the system from semaphore leaks
public
onShutDown() : mixed
respawnAutomatically()
Respawn workers automatically if they died
public
respawnAutomatically([bool $respawn = true ]) : WorkerPool
Parameters
- $respawn : bool = true
Return values
WorkerPoolrewind()
Iterator Method rewind()
public
rewind() : void
run()
Sends the input to the next free worker process
public
run(mixed $input) : int
This function blocks until a worker has finished its work. You can kill all child processes, so that the parent will be unblocked.
Parameters
- $input : mixed
-
any serializable value
Tags
Return values
int —The PID of the processing worker process
setChildProcessTitleFormat()
Sets the process title of the child
public
setChildProcessTitleFormat(string $string) : WorkerPool
Listing permitted replacments %i% The Child's Number %basename% The base name of PHPSELF %fullname% The value of PHPSELF %class% The Worker's Classname %state% The Worker's State
Parameters
- $string : string
-
the process title of the child
Tags
Return values
WorkerPoolsetChildTimeoutSec()
Sets default child timeout in seconds.
public
setChildTimeoutSec(int $child_timeout_sec) : WorkerPool
Parameters
- $child_timeout_sec : int
Return values
WorkerPoolsetParentProcessTitleFormat()
Sets the process title of the parent
public
setParentProcessTitleFormat(string $string) : WorkerPool
Listing permitted replacments %basename% The base name of PHPSELF %fullname% The value of PHPSELF %class% The WorkerPool's Classname
Parameters
- $string : string
-
the process title of the parent
Tags
Return values
WorkerPoolsetSemaphore()
Sets the Semaphore, that will be used within the worker processes
public
setSemaphore(Semaphore $semaphore) : WorkerPool
Parameters
- $semaphore : Semaphore
-
the Semaphore, that should be used for the workers
Tags
Return values
WorkerPoolsetWorkerPoolSize()
Sets the current size of the worker pool
public
setWorkerPoolSize(int $size) : WorkerPool
Parameters
- $size : int
-
the new worker pool size
Tags
Return values
WorkerPoolsignalHandler()
Receives signals
public
signalHandler(int $signo) : mixed
DO NOT MANUALLY CALL THIS METHOD! pcntl_signal_dispatch() will be calling this method.
Parameters
- $signo : int
-
the signal number
Tags
tryWaitForOneFreeWorker()
Trys to wait for one free worker within a fixed timeout
public
tryWaitForOneFreeWorker([int $timeout = 10 ]) : bool
This function blocks until a worker has finished its work or the timeout has been reached. You can kill hanging child processes, so that the parent will be unblocked. Note: the run method already blocks until a free worker is available.
Parameters
- $timeout : int = 10
-
the timeout in seconds
Return values
bool —true, in case there is a free worker or false, in case the timeout has been reached
valid()
Iterator Method valid()
public
valid() : bool
Return values
bool —true = there is a pending result
waitForAllWorkers()
Waits for all children to finish their worker
public
waitForAllWorkers() : void
This function blocks until every worker has finished its work. You can kill hanging child processes, so that the parent will be unblocked.
waitForOneFreeWorker()
Waits for one free worker
public
waitForOneFreeWorker() : void
This function blocks until a worker has finished its work. You can kill hanging child processes, so that the parent will be unblocked. Note: the run method already blocks until a free worker is available.
collectWorkerResults()
Collects the results form the workers and processes any pending signals
protected
collectWorkerResults([int $sec = 0 ]) : mixed
Parameters
- $sec : int = 0
-
timeout to wait for new results from the workers
Tags
getNextFreeWorker()
Get the pid of the next free worker
protected
getNextFreeWorker() : ProcessDetails
This function blocks until a worker has finished its work. You can kill all child processes, so that the parent will be unblocked.
Tags
Return values
ProcessDetails —the pid of the next free child
reaper()
Child process reaper
protected
reaper([int $pid = -1 ]) : mixed
Parameters
- $pid : int = -1
-
the process id
Tags
runWorkerProcess()
Run the worker process
protected
runWorkerProcess(WorkerInterface $worker, SimpleSocket $simpleSocket, int $i) : mixed
Parameters
- $worker : WorkerInterface
-
the worker, that runs the tasks
- $simpleSocket : SimpleSocket
-
the simpleSocket, that is used for the communication
- $i : int
-
the number of the child
createWorker()
Creates the worker
private
createWorker(int $i) : mixed
Parameters
- $i : int
Tags
respawnIfRequired()
private
respawnIfRequired() : mixed