Process Pool

Swoole\Process\Pool manages a pool of Linux worker processes. The workers can consume messages from a message queue such as Redis, or messages sent by a client. They can also execute native Linux commands.

Methods

  • Swoole\Process\Pool::__construct
  • Swoole\Process\Pool->set
  • Swoole\Process\Pool->on
  • Swoole\Process\Pool->getProcess
  • Swoole\Process\Pool->listen
  • Swoole\Process\Pool->start
  • Swoole\Process\Pool->shutdown

    Example:

    <?php
    
    $workerNum = 10;
    $pool = new Swoole\Process\Pool($workerNum);
    
    $pool->on("WorkerStart", function ($pool, $workerId) {
        echo "Worker#{$workerId} is started\n";
        $redis = new Redis();
        $redis->pconnect('127.0.0.1', 6379);
        $key = "key1";
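        while (true) {
            // Block for up to 2 seconds waiting for a message
            $msgs = $redis->brpop($key, 2);
            if (empty($msgs)) {
                // Timed out with nothing to process; wait again
                continue;
            }
            var_dump($msgs);
        }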
    });
    
    $pool->on("WorkerStop", function ($pool, $workerId) {
        echo "Worker#{$workerId} is stopped\n";
    });
    
    $pool->start();
    
    Swoole\Process\Pool::__construct (int $worker_num, int $ipc_type = 0, int $msgqueue_key = 0, bool $enable_coroutine = false)

    Create a pool of processes. New task messages can reach the workers in three ways: through an external message queue such as Redis, through the system message queue, or through a socket.

    $ipc_type:

      • SWOOLE_IPC_NONE: disable IPC.
      • SWOOLE_IPC_MSGQUEUE: use the system message queue for IPC; new task messages are added to the queue and picked up by the worker pool.
      • SWOOLE_IPC_SOCKET: use a socket for IPC.
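
    With SWOOLE_IPC_MSGQUEUE, another process can hand tasks to the pool by writing to the same System V message queue. The following is a minimal sketch of such a producer, not a definitive implementation. It assumes the sysvmsg extension is available, and both the key 0x7000001 and the helper name pushTask() are hypothetical; the key only has to match the $msgqueue_key passed to the constructor.

```php
<?php
declare(strict_types=1);

// Hypothetical key; it must match the $msgqueue_key given to the pool, e.g.
// new Swoole\Process\Pool(2, SWOOLE_IPC_MSGQUEUE, QUEUE_KEY);
const QUEUE_KEY = 0x7000001;

/**
 * Push one task into the System V queue that the workers read from.
 * pushTask() is an illustrative helper, not part of Swoole.
 */
function pushTask(int $queueKey, string $payload): bool
{
    $queue = msg_get_queue($queueKey);
    if ($queue === false) {
        return false; // the queue could not be opened or created
    }
    // serialize = false sends the raw bytes; blocking = false makes the call
    // fail immediately instead of waiting when the queue is full.
    return msg_send($queue, 1, $payload, false, false, $errorCode);
}
```

    A producer script would then call pushTask(QUEUE_KEY, json_encode(['data' => 'hello'])) for each task, and the pool would be expected to deliver the payload to its Message callback.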

    Swoole\Process\Pool->onWorkerStart(Swoole\Process\Pool $pool, int $workerId)

    The WorkerStart event fires in a worker process when it starts. Register the callback with $pool->on("WorkerStart", ...).

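    Swoole\Process\Pool->onWorkerStop(Swoole\Process\Pool $pool, int $workerId)

    The WorkerStop event fires when a worker process stops.

    Swoole\Process\Pool->onMessage(Swoole\Process\Pool $pool, string $data)

    The Message event fires when a worker receives a message over the IPC channel.

    Swoole\Process\Pool->listen(string $host, int $port = 0, int $backlog = 2048)

    Listen on a TCP address or a Unix socket. This applies when the pool uses SWOOLE_IPC_SOCKET. Example:
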
    <?php
    $pool->listen('127.0.0.1', 8089);
    $pool->listen('unix:/tmp/php.sock');
    
    Swoole\Process\Pool->write(string $data)

    Send data back to the peer through the IPC channel. Example:

    <?php
    $pool = new Swoole\Process\Pool(2, SWOOLE_IPC_SOCKET);
    
    $pool->on("Message", function ($pool, $message) {
        echo "Message: {$message}\n";
        $pool->write("hello ");
        $pool->write("world ");
        $pool->write("\n");
    });
    
    $pool->listen('127.0.0.1', 8089);
    $pool->start();
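
    A Message callback usually has to parse the payload before replying. The sketch below keeps that logic in a plain function so it can be checked without starting a pool. It assumes the payload is the JSON document sent by a client (fields data and uid) and that the callback receives it with the length header already removed; buildReply() is a hypothetical helper, not part of Swoole.

```php
<?php
declare(strict_types=1);

/**
 * Turn one raw message payload into a reply string.
 * buildReply() is an illustrative helper, not part of Swoole.
 */
function buildReply(string $payload): string
{
    $msg = json_decode($payload, true);
    // Reject anything that is not a JSON object carrying both fields
    if (!is_array($msg) || !isset($msg['data'], $msg['uid'])) {
        return "error: malformed message\n";
    }
    return sprintf("uid %d sent: %s\n", $msg['uid'], $msg['data']);
}

// Inside the pool it could be wired up like this:
// $pool->on("Message", function ($pool, $message) {
//     $pool->write(buildReply($message));
// });
```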
    

    Client-side example. Each message is prefixed with its length as a 4-byte big-endian integer:

    <?php
    $fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
    $msg = json_encode(['data' => 'hello', 'uid' => 1991]);
    fwrite($fp, pack('N', strlen($msg)) . $msg);
    sleep(1);
    $data = fread($fp, 8192);
    var_dump(substr($data, 4, unpack('N', substr($data, 0, 4))[1]));
    fclose($fp);
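
    The client above frames each message with a 4-byte big-endian length header and relies on sleep(1) plus a single fread() to collect the reply, which can return a partial frame. The following sketch shows one way to read a frame completely. The helper names encodeFrame(), readExactly() and readFrame() are hypothetical and not part of Swoole; the framing itself is taken from the pack('N', ...) and unpack('N', ...) calls in the example.

```php
<?php
declare(strict_types=1);

/** Prefix a payload with its length as a 4-byte big-endian integer. */
function encodeFrame(string $payload): string
{
    return pack('N', strlen($payload)) . $payload;
}

/** Read exactly $length bytes from a stream, or return null on EOF or error. */
function readExactly($stream, int $length): ?string
{
    $buffer = '';
    while (strlen($buffer) < $length) {
        $chunk = fread($stream, $length - strlen($buffer));
        if ($chunk === false || $chunk === '') {
            return null; // stream ended before the requested bytes arrived
        }
        $buffer .= $chunk;
    }
    return $buffer;
}

/** Read one frame: a 4-byte header followed by that many payload bytes. */
function readFrame($stream): ?string
{
    $header = readExactly($stream, 4);
    if ($header === null) {
        return null;
    }
    $length = unpack('N', $header)[1];
    return readExactly($stream, $length);
}
```

    With these helpers the client could send with fwrite($fp, encodeFrame($msg)) and receive with var_dump(readFrame($fp)), without the fixed one-second delay.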
    
    Swoole\Process\Pool->start()

    Start the process pool.

    Swoole\Process\Pool->getProcess(int $worker_id)

    Get the Swoole\Process object of a worker. When $worker_id is omitted, the current worker's process is returned. Example:

    <?php
    $workerNum = 10;
    $pool = new Swoole\Process\Pool($workerNum);
    
    $pool->on("WorkerStart", function ($pool, $workerId) {
        $process = $pool->getProcess();
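        // exec() replaces the worker process with the command;
        // /bin/sh needs -c to run a command string.
        $process->exec("/bin/sh", ["-c", "ls -l"]);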
    });
    
    $pool->on("WorkerStop", function ($pool, $workerId) {
        echo "Worker#{$workerId} is stopped\n";
    });
    
    $pool->start();