PHP MQTT Server

Latest version: pecl install openswoole-22.1.2 | composer require openswoole/core:22.1.5

A OpenSwoole MQTT Server is a TCP server with the MQTT protocol enabled in the server configuration.

The MQTT protocol is supported by both the OpenSwoole Server and Client. You can enable the MQTT protocol by setting open_mqtt_protocol to be true in the server configuration, see the example below.

The MQTT packages will be received at the Receive event callback function. Then you can parse the MQTT package based on your requirement and implement your own logic. A MQTT server is great for building Internet-of-Things (IoT) applications.

You can use the methods of OpenSwoole\Server within a MQTT Server.

MQTT Server Example

<?php
function decodeValue($data)
{
    return 256 * ord($data[0]) + ord($data[1]);
}

function decodeString($data)
{
    $length = decodeValue($data);

    return substr($data, 2, $length);
}

function mqttGetHeader($data)
{
    $byte = ord($data[0]);

    $header['type'] = ($byte & 0xF0) >> 4;
    $header['dup'] = ($byte & 0x08) >> 3;
    $header['qos'] = ($byte & 0x06) >> 1;
    $header['retain'] = $byte & 0x01;

    return $header;
}

function eventConnect($header, $data)
{
    $connect_info['protocol_name'] = decodeString($data);
    $offset = strlen($connect_info['protocol_name']) + 2;

    $connect_info['version'] = ord(substr($data, $offset, 1));
    $offset += 1;

    $byte = ord($data[$offset]);
    $connect_info['willRetain'] = ($byte & 0x20 == 0x20);
    $connect_info['willQos'] = ($byte & 0x18 >> 3);
    $connect_info['willFlag'] = ($byte & 0x04 == 0x04);
    $connect_info['cleanStart'] = ($byte & 0x02 == 0x02);
    $offset += 1;

    $connect_info['keepalive'] = decodeValue(substr($data, $offset, 2));
    $offset += 2;
    $connect_info['clientId'] = decodeString(substr($data, $offset));

    return $connect_info;
}

$server = new OpenSwoole\Server("127.0.0.1", 9501, OpenSwoole\Server::SIMPLE_MODE);

$server->set([
    'open_mqtt_protocol' => true,
    'worker_num' => 1,
]);

$server->on('Connect', function($server, $fd)
{
    echo "Client:Connect.\n";
});

$server->on('Receive', function($server, $fd, $fromId, $data)
{
    $header = mqttGetHeader($data);

    if ($header['type'] == 1)
    {
        $resp = chr(32) . chr(2) . chr(0) . chr(0);
        eventConnect($header, substr($data, 2));
        $server->send($fd, $resp);
    }
    else if ($header['type'] == 3)
    {
        $offset = 2;
        $topic = decodeString(substr($data, $offset));
        $offset += strlen($topic) + 2;
        $msg = substr($data, $offset);
        echo "client msg: $topic\n----------\n$msg\n";
    }

    echo "received length=" . strlen($data) . "\n";
});

$server->on('Close', function ($server, $fd)
{
    echo "Client: Close.\n";
});

$server->start();
Last updated on September 20, 2022