Swoole Process Pool

Swoole\Process\Pool manages a pool of Linux worker processes. The workers can consume messages from a message queue such as Redis or messages sent by a client, and they can also execute native Linux commands.

Example:

<?php

$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);

$pool->on("WorkerStart", function ($pool, $workerId) {
    echo "Worker#{$workerId} is started\n";
    $redis = new Redis();
    $redis->pconnect('127.0.0.1', 6379);
    $key = "key1";
    while (true) {
        // brpop blocks for up to 2 seconds waiting for an item on the list
        $msgs = $redis->brpop($key, 2);
        if (empty($msgs)) {
            continue;
        }
        var_dump($msgs);
    }
});

$pool->on("WorkerStop", function ($pool, $workerId) {
    echo "Worker#{$workerId} is stopped\n";
});

$pool->start();

Swoole\Process\Pool::__construct(int $worker_num, int $ipc_type = 0, int $msgqueue_key = 0, bool $enable_coroutine = false)

Create a pool of processes. There are three ways to deliver new task messages to the worker pool: through an external message queue such as Redis, through the system message queue, or through a socket.

$ipc_type:

SWOOLE_IPC_NONE: disable IPC.

SWOOLE_IPC_MSGQUEUE: use the system message queue for IPC, to add new task messages to the worker pool.

SWOOLE_IPC_SOCKET: use a socket for IPC.
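As an illustration of the SWOOLE_IPC_MSGQUEUE mode, the sketch below shows how a separate producer script might push task messages onto a System V message queue using PHP's sysvmsg extension. It assumes the pool was created with something like new Swoole\Process\Pool(2, SWOOLE_IPC_MSGQUEUE, $key), where $key is a queue key of your choosing. The function name pushTasks is hypothetical, and the message type 1 is an arbitrary placeholder, since the text does not say whether workers filter by message type.

```php
<?php
// Hypothetical producer helper; not part of Swoole. Requires the sysvmsg extension.

/**
 * Push each task onto the System V message queue identified by $queueKey.
 * Returns the number of messages the queue accepted.
 */
function pushTasks(int $queueKey, array $tasks, int $messageType = 1): int
{
    // msg_get_queue() creates the queue if it does not exist yet.
    $queue = msg_get_queue($queueKey);
    if ($queue === false) {
        return 0;
    }

    $sent = 0;
    foreach ($tasks as $task) {
        // Passing false for $serialize sends the raw string rather than
        // PHP-serialized data (assumed here so the payload stays plain text).
        if (msg_send($queue, $messageType, (string) $task, false)) {
            $sent++;
        }
    }
    return $sent;
}
```

A producer would call, for example, pushTasks($key, ['task 1', 'task 2']) with the same key that was passed as $msgqueue_key to the pool constructor.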

Swoole\Process\Pool->onWorkerStart(Swoole\Process\Pool $pool, int $workerId)

The WorkerStart event fires when a worker process or task worker process starts.

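Swoole\Process\Pool->onWorkerStop(Swoole\Process\Pool $pool, int $workerId)

The WorkerStop event fires when a worker process stops.

Swoole\Process\Pool->onMessage(Swoole\Process\Pool $pool, string $data)

The Message event fires when a worker receives a message through the IPC channel; $data holds the message payload.

Swoole\Process\Pool->listen(string $host, int $port = 0, int $backlog = 2048)

Listen on a TCP address or a Unix domain socket for incoming messages, example:

<?php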
$pool->listen('127.0.0.1', 8089);
$pool->listen('unix:/tmp/php.sock');

Swoole\Process\Pool->write(string $data)

Send data back to the peer through the IPC channel, example:

<?php
$pool = new Swoole\Process\Pool(2, SWOOLE_IPC_SOCKET);

$pool->on("Message", function ($pool, $message) {
    echo "Message: {$message}\n";
    $pool->write("hello ");
    $pool->write("world ");
    $pool->write("\n");
});

$pool->listen('127.0.0.1', 8089);
$pool->start();

Client side example:

<?php
$fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
$msg = json_encode(['data' => 'hello', 'uid' => 1991]);
fwrite($fp, pack('N', strlen($msg)) . $msg);
sleep(1);
$data = fread($fp, 8192);
var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));
fclose($fp);
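
The client above frames each message as a 4-byte big-endian length followed by the payload, and it decodes the reply the same way. A minimal sketch of that framing as reusable helpers follows. The names encodeFrame, readExactly and readFrame are hypothetical and not part of Swoole, and the sketch assumes the server replies with a single frame in the same format, as the unpack call in the client suggests. Reading exactly the number of bytes announced in the header avoids relying on sleep(1) and a single fread.

```php
<?php
// Hypothetical helpers illustrating length-prefixed framing; not part of Swoole.

/** Prefix a payload with its length as a 4-byte big-endian unsigned integer. */
function encodeFrame(string $payload): string
{
    return pack('N', strlen($payload)) . $payload;
}

/** Read exactly $length bytes from a stream, or return null if it ends early. */
function readExactly($stream, int $length): ?string
{
    $buffer = '';
    while (strlen($buffer) < $length) {
        $chunk = fread($stream, $length - strlen($buffer));
        if ($chunk === false || $chunk === '') {
            return null; // stream closed or timed out before the data was complete
        }
        $buffer .= $chunk;
    }
    return $buffer;
}

/** Read one length-prefixed frame and return its payload, or null on a short read. */
function readFrame($stream): ?string
{
    $header = readExactly($stream, 4);
    if ($header === null) {
        return null;
    }
    $length = unpack('N', $header)[1];
    return readExactly($stream, $length);
}
```

With these helpers the client could send with fwrite($fp, encodeFrame($msg)) and receive with readFrame($fp).
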
Swoole\Process\Pool->start()

Start the process pool.

Swoole\Process\Pool->getProcess(int $worker_id = -1)

Get the Swoole\Process object of a worker process; when no worker ID is given, the current worker is returned. Example:

<?php
$workerNum = 10;
$pool = new Swoole\Process\Pool($workerNum);

$pool->on("WorkerStart", function ($pool, $workerId) {
    $process = $pool->getProcess();
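    // exec() replaces the worker process with the given command;
    // sh needs -c to run "ls -l" as a command string rather than a script file
    $process->exec("/bin/sh", ["-c", "ls -l"]);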
});

$pool->on("WorkerStop", function ($pool, $workerId) {
    echo "Worker#{$workerId} is stopped\n";
});

$pool->start();