Documentation

WorkerPool
in package
implements Iterator, Countable

The Worker Pool class runs worker processes in parallel

Table of Contents

Interfaces

Iterator
Countable

Constants

CHILD_TIMEOUT_SEC  = 10
Default child timeout in seconds

Properties

$childProcessTitleFormat  : string
$parentPid  : int
$parentProcessTitleFormat  : string
$resultPosition  : int
$results  : array<string|int, mixed>
$semaphore  : Semaphore
$signals  : array<string|int, mixed>
$worker  : WorkerInterface
$workerProcesses  : ProcessDetailsCollection|array<string|int, ProcessDetails>
$child_timeout_sec  : int
$created  : bool
$currentWorkerIndex  : int
$initialPoolSize  : int
$respawnAutomatically  : bool
$workerPoolSize  : int

Methods

__construct()  : mixed
The constructor
__destruct()  : mixed
The destructor
clearResults()  : WorkerPool
Clear all the results
count()  : int
Countable Method count
countResults()  : int
How many results did we receive?
create()  : WorkerPool
Creates the worker pool (forks the children)
current()  : null|WorkerPoolResult
Iterator Method current
destroy()  : WorkerPool
Destroy the WorkerPool with all its children
disableSemaphore()  : WorkerPool
Disables the semaphore feature in the workerpool
exitPhp()  : mixed
Terminates the current process
getBusyWorkers()  : int
Returns the number of busy workers
getChildProcessTitleFormat()  : string
Returns the process title of the child
getFreeAndBusyWorkers()  : array<string|int, mixed>
Returns the number of busy and free workers
getFreeWorkers()  : int
Returns the number of free workers
getNextResult()  : null|WorkerPoolResult
Shifts the next result from the result queue
getParentProcessTitleFormat()  : string
Returns the process title of the parent
getSemaphore()  : null|Semaphore
Gets the Semaphore, that will be used within the worker processes
getWorkerPoolSize()  : int
Returns the current size of the worker pool
hasResults()  : bool
Is there any result available?
key()  : int
Iterator Method key
next()  : void
Iterator Method next()
onShutDown()  : mixed
This runs on shutdown to prevent the system from semaphore leaks
respawnAutomatically()  : WorkerPool
Respawn workers automatically if they died
rewind()  : void
Iterator Method rewind()
run()  : int
Sends the input to the next free worker process
setChildProcessTitleFormat()  : WorkerPool
Sets the process title of the child
setChildTimeoutSec()  : WorkerPool
Sets default child timeout in seconds.
setParentProcessTitleFormat()  : WorkerPool
Sets the process title of the parent
setSemaphore()  : WorkerPool
Sets the Semaphore, that will be used within the worker processes
setWorkerPoolSize()  : WorkerPool
Sets the current size of the worker pool
signalHandler()  : mixed
Receives signals
tryWaitForOneFreeWorker()  : bool
Trys to wait for one free worker within a fixed timeout
valid()  : bool
Iterator Method valid()
waitForAllWorkers()  : void
Waits for all children to finish their worker
waitForOneFreeWorker()  : void
Waits for one free worker
collectWorkerResults()  : mixed
Collects the results form the workers and processes any pending signals
getNextFreeWorker()  : ProcessDetails
Get the pid of the next free worker
reaper()  : mixed
Child process reaper
runWorkerProcess()  : mixed
Run the worker process
createWorker()  : mixed
Creates the worker
respawnIfRequired()  : mixed

Constants

CHILD_TIMEOUT_SEC

Default child timeout in seconds

public mixed CHILD_TIMEOUT_SEC = 10

Properties

$childProcessTitleFormat

protected string $childProcessTitleFormat = '%basename%: Worker %i% of %class% [%state%]'

process title of the children

$parentPid

protected int $parentPid = 0

id of the parent

$parentProcessTitleFormat

protected string $parentProcessTitleFormat = '%basename%: Parent'

process title of the parent

$resultPosition

protected int $resultPosition = 0

number of received results

$results

protected array<string|int, mixed> $results = array()

received results from the workers

$semaphore

protected Semaphore $semaphore

the semaphore, that is used to synchronizd tasks across all processes

$signals

protected array<string|int, mixed> $signals = array(SIGCHLD, SIGTERM, SIGHUP, SIGUSR1, SIGINT)

signals, that should be watched

$child_timeout_sec

private int $child_timeout_sec = self::CHILD_TIMEOUT_SEC

Default child timeout in seconds

$created

private bool $created = FALSE

is the pool created? (children forked)

$currentWorkerIndex

private int $currentWorkerIndex = 0

Current index for the last worker created in the pool

$initialPoolSize

private int $initialPoolSize

number of children initially in the pool

$respawnAutomatically

private bool $respawnAutomatically = false

Respawn dead workers automatically if set to TRUE

$workerPoolSize

private int $workerPoolSize = 2

number of children in the pool

Methods

__construct()

The constructor

public __construct() : mixed

__destruct()

The destructor

public __destruct() : mixed

countResults()

How many results did we receive?

public countResults() : int
Return values
int

the number of results

create()

Creates the worker pool (forks the children)

public create(WorkerInterface $worker) : WorkerPool

Please close all open resources before running this function. Child processes are going to close all open resources uppon exit, leaving the parent process behind with invalid resource handles.

Parameters
$worker : WorkerInterface

the worker, that runs future tasks

Tags
throws
RuntimeException
throws
WorkerPoolException
Return values
WorkerPool

destroy()

Destroy the WorkerPool with all its children

public destroy([int $maxWaitSecs = null ]) : WorkerPool
Parameters
$maxWaitSecs : int = null

a timeout to wait for the children, before killing them

Tags
throws
WorkerPoolException
Return values
WorkerPool

disableSemaphore()

Disables the semaphore feature in the workerpool

public disableSemaphore() : WorkerPool

Attention: You will lose the possibility to synchronize worker processes

Tags
throws
WorkerPoolException

in case the WorkerPool has already been created

throws
InvalidArgumentException

in case the semaphre hasn't been created

Return values
WorkerPool

exitPhp()

Terminates the current process

public exitPhp(int $code) : mixed
Parameters
$code : int

the exit code

getBusyWorkers()

Returns the number of busy workers

public getBusyWorkers() : int

PAY ATTENTION WHEN USING THIS FUNCTION WITH A SUBSEQUENT CALL OF getFreeWorkers(). IN THIS CASE THE SUM MIGHT NOT EQUAL TO THE CURRENT POOL SIZE. USE getFreeAndBusyWorkers() TO GET CONSISTENT RESULTS.

Return values
int

number of free workers

getChildProcessTitleFormat()

Returns the process title of the child

public getChildProcessTitleFormat() : string
Return values
string

the process title of the child

getFreeAndBusyWorkers()

Returns the number of busy and free workers

public getFreeAndBusyWorkers() : array<string|int, mixed>

This function collects all the information at once.

Return values
array<string|int, mixed>

with the keys 'free', 'busy', 'total'

getFreeWorkers()

Returns the number of free workers

public getFreeWorkers() : int

PAY ATTENTION WHEN USING THIS FUNCTION WITH A SUBSEQUENT CALL OF getBusyWorkers(). IN THIS CASE THE SUM MIGHT NOT EQUAL TO THE CURRENT POOL SIZE. USE getFreeAndBusyWorkers() TO GET CONSISTENT RESULTS.

Return values
int

number of free workers

getParentProcessTitleFormat()

Returns the process title of the parent

public getParentProcessTitleFormat() : string
Return values
string

the process title of the parent

getSemaphore()

Gets the Semaphore, that will be used within the worker processes

public getSemaphore() : null|Semaphore
Return values
null|Semaphore

$semaphore the Semaphore, that should be used for the workers

getWorkerPoolSize()

Returns the current size of the worker pool

public getWorkerPoolSize() : int

In case the pool hasn't yet been created, this method returns the value of the currently set pool size. In case of a created pool, this method reports the real pool size (number of alive worker processes).

Return values
int

the number of processes

hasResults()

Is there any result available?

public hasResults() : bool
Return values
bool

true, in case we have received some results

key()

Iterator Method key

public key() : int
Return values
int

returns the current key

next()

Iterator Method next()

public next() : void

onShutDown()

This runs on shutdown to prevent the system from semaphore leaks

public onShutDown() : mixed

respawnAutomatically()

Respawn workers automatically if they died

public respawnAutomatically([bool $respawn = true ]) : WorkerPool
Parameters
$respawn : bool = true
Return values
WorkerPool

rewind()

Iterator Method rewind()

public rewind() : void

run()

Sends the input to the next free worker process

public run(mixed $input) : int

This function blocks until a worker has finished its work. You can kill all child processes, so that the parent will be unblocked.

Parameters
$input : mixed

any serializable value

Tags
throws
WorkerPoolException
Return values
int

The PID of the processing worker process

setChildProcessTitleFormat()

Sets the process title of the child

public setChildProcessTitleFormat(string $string) : WorkerPool

Listing permitted replacments %i% The Child's Number %basename% The base name of PHPSELF %fullname% The value of PHPSELF %class% The Worker's Classname %state% The Worker's State

Parameters
$string : string

the process title of the child

Tags
throws
WorkerPoolException

in case the WorkerPool has already been created

throws
DomainException

in case the $string value is not within the permitted range

Return values
WorkerPool

setChildTimeoutSec()

Sets default child timeout in seconds.

public setChildTimeoutSec(int $child_timeout_sec) : WorkerPool
Parameters
$child_timeout_sec : int
Return values
WorkerPool

setParentProcessTitleFormat()

Sets the process title of the parent

public setParentProcessTitleFormat(string $string) : WorkerPool

Listing permitted replacments %basename% The base name of PHPSELF %fullname% The value of PHPSELF %class% The WorkerPool's Classname

Parameters
$string : string

the process title of the parent

Tags
throws
WorkerPoolException

in case the WorkerPool has already been created

throws
DomainException

in case the $string value is not within the permitted range

Return values
WorkerPool

setSemaphore()

Sets the Semaphore, that will be used within the worker processes

public setSemaphore(Semaphore $semaphore) : WorkerPool
Parameters
$semaphore : Semaphore

the Semaphore, that should be used for the workers

Tags
throws
WorkerPoolException

in case the WorkerPool has already been created

throws
InvalidArgumentException

in case the semaphre hasn't been created

Return values
WorkerPool

setWorkerPoolSize()

Sets the current size of the worker pool

public setWorkerPoolSize(int $size) : WorkerPool
Parameters
$size : int

the new worker pool size

Tags
throws
WorkerPoolException

in case the WorkerPool has already been created

throws
InvalidArgumentException

in case the $size value is not within the permitted range

Return values
WorkerPool

signalHandler()

Receives signals

public signalHandler(int $signo) : mixed

DO NOT MANUALLY CALL THIS METHOD! pcntl_signal_dispatch() will be calling this method.

Parameters
$signo : int

the signal number

Tags
see
pcntl_signal_dispatch
see
pcntl_signal

tryWaitForOneFreeWorker()

Trys to wait for one free worker within a fixed timeout

public tryWaitForOneFreeWorker([int $timeout = 10 ]) : bool

This function blocks until a worker has finished its work or the timeout has been reached. You can kill hanging child processes, so that the parent will be unblocked. Note: the run method already blocks until a free worker is available.

Parameters
$timeout : int = 10

the timeout in seconds

Return values
bool

true, in case there is a free worker or false, in case the timeout has been reached

valid()

Iterator Method valid()

public valid() : bool
Return values
bool

true = there is a pending result

waitForAllWorkers()

Waits for all children to finish their worker

public waitForAllWorkers() : void

This function blocks until every worker has finished its work. You can kill hanging child processes, so that the parent will be unblocked.

waitForOneFreeWorker()

Waits for one free worker

public waitForOneFreeWorker() : void

This function blocks until a worker has finished its work. You can kill hanging child processes, so that the parent will be unblocked. Note: the run method already blocks until a free worker is available.

collectWorkerResults()

Collects the results form the workers and processes any pending signals

protected collectWorkerResults([int $sec = 0 ]) : mixed
Parameters
$sec : int = 0

timeout to wait for new results from the workers

Tags
throws
WorkerPoolException

getNextFreeWorker()

Get the pid of the next free worker

protected getNextFreeWorker() : ProcessDetails

This function blocks until a worker has finished its work. You can kill all child processes, so that the parent will be unblocked.

Tags
throws
WorkerPoolException
Return values
ProcessDetails

the pid of the next free child

reaper()

Child process reaper

protected reaper([int $pid = -1 ]) : mixed
Parameters
$pid : int = -1

the process id

Tags
see
pcntl_waitpid

runWorkerProcess()

Run the worker process

protected runWorkerProcess(WorkerInterface $worker, SimpleSocket $simpleSocket, int $i) : mixed
Parameters
$worker : WorkerInterface

the worker, that runs the tasks

$simpleSocket : SimpleSocket

the simpleSocket, that is used for the communication

$i : int

the number of the child

createWorker()

Creates the worker

private createWorker(int $i) : mixed
Parameters
$i : int
Tags
throws
RuntimeException

respawnIfRequired()

private respawnIfRequired() : mixed
Loading…
On this page

Search results