Websockets, ZMQ и Go

21 minute read

Риалтайм - это стильно, модно, молодежно. Самый риалтаймистый риалтайм - это вебсокеты. Для работы с ними уже много всего написанного на PHP, Nodejs и Python. Тот же Ratchet, к примеру. Однако, нам нужны серьезные RPS и тут PHP нам не подходит. Кроме того, приложение на Go будет с уже встроенным вебсервером, а, например, с PHP нам придется что-то выдумывать с nginx, или (не дай Бог) запускать PHP как демона. Сами понимаете - PHP не тру вей для написания долгоработающих приложений.

Таким образом, выходит что Go один из лучших кандидатов для написания вебсокет-сервиса. К тому же, есть уже много готовых пакетов, реализующих поддержку вебсокетов в Go.

Но как написать стабильное приложение, при условии что часть проекта уже написана на PHP? Например, нам нужно реализовать чат для пользователей, зарегистрированных на сайте, или в реальном времени отображать сообщения на странице. Для этого нам нужен некоторый механизм взаимодействия между PHP кодом и Go сервисом. В этом случае нам нужно продумать надежный механизм взаимодействия между частями нашего приложения.

Бэкенд

Начинаем с создания простого http сервера. Все что нам нужно есть в коробочной поставке самого Go.

http.HandleFunc("/ws", handler)
err = http.ListenAndServe(wsaddr, nil)
if err != nil {
    log.Fatal("ListenAndServe: ", err)
}

handler - Это хендлер, который нам нужно реализовать для соединения с вебсокетом. wsaddr - url, на который будут вешаться соединения. Этот url, в нашем случае, берется из конфига.

Примеров и статей по написанию веб сервера на Go великое множество, не политесь нагуглить, если у вас что-то не получилось.

ZMQ

Есть множество вариантов взаимодействия между Go сервисом и PHP кодом. Например, мы можем писать сообщения в базу, а затем читать от туда в Go через определенные промежутки времени. Не самый быстрый способ. Еще один вариант - написать tcp клиент/серверное приложение, однако, написать действительно надежное приложение тот еще челендж. В конце концов, можем взаимодействовать через REST API, но опят же, большой скорости тут не стоит ожидать.

К счастью, все уже написано до нас. Нам достаточно воспользоваться одним из вариантов очередей, например ZMQ. Для своих нужд вы можете выбрать что ни будь другое, например RabitMQ. В моем случае ZMQ удовлетворяет всем требованиям и есть один большой плюс - для PHP реализованна нативная поддержка в виде сишного расширения.

Работа с ZMQ в Go не сложнее чем написание http сервера. Создаем новый сокет, подключаемся по адресу из конфига(zmqaddr) и подписываемся на сообщения от транспорта. zmqsubject - это название канала, по которому будут приходить сообщения, берется из конфига.

responder, _ = zmq.NewSocket(zmq.SUB)
defer responder.Close()
responder.Connect(zmqaddr)
responder.SetSubscribe(zmqsubject)

Устанавливаем соединение и ждем собщения от брокера. Для работы с вебсокетами используем пакет websocket из gorillatoolkit.

Хендлер, который обслуживает вебсокет-соединение будет выглядеть так:

func handler(w http.ResponseWriter, r *http.Request) {

    conn, err := websocket.Upgrade(w, r, nil, 1024, 1024)
    if _, ok := err.(websocket.HandshakeError); ok {
        http.Error(w, "Not a websocket handshake", 400)
        return
    } else if err != nil {
        log.Println(err)
        return
    }

    log.Println("Start...")
    for {
        msg, _ := responder.RecvMessage(0)
        log.Println("Received ", msg)    
        if err := conn.WriteMessage(websocket.TextMessage, []byte(msg[1])); err != nil {
            log.Println(err)
            return
        }
    }
}

Начинаем с создания веб-сокета. Вызов websocket.Upgrade апгрейдит текущее соединение, переводит его на уровень работы с вебсокетами:

conn, err := websocket.Upgrade(w, r, nil, 1024, 1024)

Это как бы рукопожатие между клиентом и сервером. Более подробно о протоколе можно почитать в той же википедии.

Далее создается бесконечный цикл, который блокируется в ожидании получения сообщения из ZMQ очереди.

msg, _ := responder.RecvMessage(0)

Как только в очередь приходит новое сообщение, тут же отдаем его в вебсокет.

if err := conn.WriteMessage(websocket.TextMessage, []byte(msg[1])); err != nil {
    log.Println(err)
    return
}

Кроме все этого, рядом с приложением лежит файлик config.yml, который парситься приложением. Настройки очень банальные и понятные

Кстати, для тестирования нашего приложения можно использовать инструмент telsocket.org. Это как телнет, только для вебсокетов.

Конфиг

Как вы успели заметить, много параметров в коде задается с помощью конфига. Это обычный YAML файл, который лежит в корне нашего приложения config.yaml. Для работы с ним используется пакет github.com/kylelemons/go-gypsy/yaml

file := "config.yaml"
config, err := yaml.ReadFile(file)
if err != nil {
    log.Fatal("Error load config", err)
}

wsaddr, _ = config.Get("ws.address")
zmqaddr, _ = config.Get("zmq.address")
zmqsubject, _ = config.Get("zmq.subject")  

В этом примере ошибки игнорируются. Не забывайте их проверять в своих реальных приложениях.

JS Клиент

Окей, с бекендом мы справились. Давайте теперь напишем клиентский код, который будет получать сообщения и отображать их на HTML страничке.

В качестве примера будем использовать API браузера, в моем случае хрома. Конечно, работать это будет только в нормальных браузерах. Для поддержки более старых версий используйте дополнительные библиотеки и инструменты.

Создадим файл index.html:

try {
    var sock = new WebSocket("ws://localhost:8080/ws");
    console.log("Websocket - status: " + sock.readyState);
    sock.onopen = function(m) { 
        console.log("CONNECTION opened..." + this.readyState);
    }

    sock.onmessage = function(m) { 
        console.log("message: " + m.data);
    }

    sock.onerror = function(m) {
        console.log("Error occured sending..." + m.data);
    }

    sock.onclose = function(m) { 
        console.log("Disconnected - status " + this.readyState);
    }
} catch(exception) {
    console.log(exception);
}

Создаем вебсокет-соединение, подключаемся к серверу.

var sock = new WebSocket("ws://localhost:8080/ws");

Все приходящие соединения будут в консоле:

sock.onmessage = function(m) { 
    console.log("message: " + m.data);
}

Для тестирования можно просто запустить встроенный PHP сервер и посмотреть на наш файлик index.html

$ php -S localhost:12345

PHP Приложение.

В конце концов, перейдем к тому ради чего все это затевалось. Давайте напишем пример минимального PHP приложения, которое отправляет сообщения в очередь.

Расширение для ZMQ нативное и его можно установить из pecl репозиториев.

Инструкция по установке есть на cайте ZMQ.

Максимально простое PHP приложение, с использование очереди выглядит как то так:

<?php

if (count($argv) < 2) {
    echo "usage: php ./zmq.php <message>\n";
}

$message = $argv[1];

$context = new ZMQContext();
$socket = $context->getSocket(ZMQ::SOCKET_PUB, 'message');
$socket->bind("tcp://127.0.0.1:5563");

echo "Sending: ".$message."\n";

$socket->sendmulti(array("message", $message));
?>

Обратите внимание, как мы отправляем сообщение:

$socket->sendmulti(array("message", $message));

Первый элемент массива строка message - тема сообщения. Когда мы в Go подписываемся на получение сообщений, то указываем zmqsubject, значение которой и есть строка message. Таким образом, мы будем получать только сообщения с правильно указанной темой.

Использовать режим SOCKET_PUB не самый хороший вариант, но для начала сойдет.

На этом можно закончить этот небольшой обзор вебсокетов и очередей. Экспериментируйте с очередями и вебсокетами в своих приложениях и делитесь опытом.

Исходники к статье можно найти на github.

Почитать