Свой memcached. Часть 1

60 minute read

Сайты становятся все больше, информация уже не влазит на терабайтные харды, а пользователи еще более требовательны к задержкам и не готовы ждать даже пары секунд ответа от сервера. К нам на помощь спешат распределенные системы обработки информации.

Распеределенщина и все такое - это целый неизведанный мир, со своими законами. Нахрапам не возьмешь. Но нужно с чего-то начинать. Например, с отличного курса лекций.

Важной частью высоко нагруженных систем является кеширование. И очень часто, это кеш делается распределенным. Используется несколько серверов, на которых сохраняются кешированные данные.

Memcached - одни из самых ярких представителей подобных систем кеширования. Это прекрасный продукт, который работает и достаточно неплохо справляется со своими задачами.

И, если хочется разобраться как все устроено, то самый верный способ - это написать свой велосипед. Конечно, вряд ли наша поделка сможет так сразу тягаться с memcached. Но несколько практически примеров будут очень к стати.

Простейший сервер

Начнем с написания просто tcp сервера к которому можно обратиться по telnet. Такие сервера на Go с использованием пакета net пишутся просто замечательно.

package main

import (
    "log"
    "net"
)

func main() {
    ln, err := net.Listen("tcp", ":11212")
    if err != nil {
        log.Println(err)
    }
    for {
        conn, err := ln.Accept()
        if err != nil {
            log.Println(err)
        }
        go connectionHandler(conn)
    }
}

func connectionHandler(conn net.Conn) {
    buf := make([]byte, 1024)
    _, err := conn.Read(buf)
    if err != nil {
        fmt.Println("Error reading:", err.Error())
    }
    conn.Write([]byte("Message received."))
    conn.Close()
}

Это очень простой сервер, который висит на 11212 порту, ждет когда мы ему что-то отправим. При получении сообщения отправляет нам в ответ "Message received." и закрывает соединение.

    ln, err := net.Listen("tcp", ":11212")

Биндимся на порт под номером 11212.

for {
    conn, err := ln.Accept()
    if err != nil {
        log.Println(err)
    }
    go connectionHandler(conn)
}

В бесконечном цикле ждем подключения. При новом подключении ln.Accept() будет создавать новое соединение conn net.Conn и функция connectionHandler(conn) будет запушена в новой go-рутине. Обратите внимание, что запуск новых рутин обеспечивает конкурентный доступ к нашему серверу.

После подключения клиента, начинает работать функция connectionHandler:

buf := make([]byte, 1024)
_, err := conn.Read(buf)
if err != nil {
    fmt.Println("Error reading:", err.Error())
}

В этой функции создается буфер, затем в него сохраняются данные переданные от клиента.

conn.Write([]byte("Message received."))
conn.Close()

После получения сообщения от клиента, нас сервер сам шлет свое сообщение "Message received." и закрывает соединение.

Связаться с нашим сервером можно по telnet:

$ telnet localhost 11212
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hi
Message received.
Connection closed by foreign host.

Telnet пригодится нам для дальнейшего тестирования. По крайней мере, какое-то время, пока у нас не появится свой клиент.

Протокол

Раз уж мы взялись писать аналог memcached, то давайте постараемся реализовать его протокол. К 100% совместимости мы стремится не будем, но постараемся максимально приближено реализовать большинство команд.

  • get - Чтение из кэша get mysimplekey
  • set - Запись в кэш. Пишет не проверяя есть ли в кэше значение с этим ключом: set mysimplekey 0 100 50
  • add - Запись в кэш. Записывает только тогда, когда в кеше нет значений с таким ключом: add newkey 0 20 50
  • replace - Замена значения записи в кеше. Работает по принципу add: replace key 0 100 50
  • append - Записать в кеш по указанному ключу данные, перед уже находящимися там данными: append key 0 100 155
  • prepend - Записать в кеш по указанному ключу данные, после уже находящихся там данных prepend key 0 60 15
  • incr - Увеличивает числовое значение ключа на указанную величину: incr mykey 2
  • decr - Уменьшает числовое значение ключа на указанную величину: decr mykey 5
  • delete - Удаляет значение из кеша по ключу: delete mykey
  • flush_all - Производит инвалидацю все записей в кэше: flush_all. Отложенная инвалидация на указанное кол-во секунд: flush_all 900.
  • stats - Выводит общую статистику: stats. Статистика использования памяти: stats slabs. Еще одна статистика использования памяти: stats malloc Можно посмотреть список элементов в кеше: stats items, stats detail [on|off|dump], stats sizes. Сброс статистики stats reset
  • version - Показывает версию memcached: version
  • verbosity - Установка уровня детализации логирования: verbosity
  • quit - Закрыть телнет сессию: quit

Полное описание протокола можно найти на github

Стоить заметить, что если нам не обязательна поддержка memcahced протокола и все клиенты будут написаны на Go, то можно было бы реализовать более специфический протокол. Более того, мы могли бы не заморачиваться над текстовым протоколом,а придумать что то более в стиле Go. Пример такого протокола рассмотрим в конце серии.

Основные команды

Давайте начнем с реализации двух базовых команд: get и set. Заодно, определимся с принципами работы.

Нужно научится разделать команды. Для этого в func connectionHandler(conn net.Conn) у нас будет бесконечный цикл, в котором будут читаться данные из соединения. Эти данные будут проверятся на наличие команд.

func connectionHandler(conn net.Conn) {
    for {
        command, err := bufio.NewReader(conn).ReadString('\n')
        if err != nil {
            if err == io.EOF {
                log.Println("Error io.EOF", err)
                break
            } else {
                log.Println("Error reading:", err)
            }
        }
        if strings.HasPrefix(command, "set") {
            // выполняем команду set
        }
        if strings.HasPrefix(command, "get") {
            // выполняем команду get
        }
    }
}

bufio.NewReader(conn).ReadString('\n') постоянно пытаться прочитать строку из соединения до символа '\n'. В зависимости от того как начинается эта строка будет срабатывать один из ифов. Давайте допишем немного кода, чтоб сервер возвращал данные в соединение сигнализируя о сработавшей команде.

if strings.HasPrefix(command, "set") {
    conn.Write([]byte("answer: set command\n"))
}

if strings.HasPrefix(command, "get") {
    conn.Write([]byte("answer: get command\n"))
}

Теперь попробуем подключится по telnet и выполнить пару команд:

telnet localhost 11212
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
set key
answer: set command
get key
answer: get command

set

Принцип работы с соединением должен быть ясен. Теперь можно описывать наши команды. Начнем с того, что команда должна получить и сохранить некоторую информацию - ключ и сами данные. Для команды установки кеша(set) можно определить такую структуру:

// Структура для команды set
type SetCommand struct {
    Key     string
    Length  int
    Text    string
    Conn    net.Conn
}
  • Key - Ключ по которому будет сохраняться ключ
  • Length - Количество данных в байтах, которые нужно сохранить
  • Text - Строка самой команды
  • Conn - Объект соединения. В рамках выполнения команды необходимо будет отправлять ответы пользователю.

Со временем мы будем добавлять новые параметры команды и, соответственно, новые поля в структуре.

Всю логику этой команды можно реализовать внутри метода Run().

func (s *SetCommand) Run() {
    err := ParseTextCommand(s.Text, s, func() error {
        _, err := fmt.Sscanf(s.Text, "set %s %d\n", &s.Key, &s.Length)
        return err
    })

    if err == nil {
        data, _ := bufio.NewReader(s.Conn).ReadBytes('\n')
        storage[s.Key] = data
        s.Conn.Write([]byte("STORED\n"))
    }
}

Переменная storage - это переменная в глобальной области видимости которая имеет тип map[string][]byte. По сути - это хранилище для нашего кеша. Она инициализируется в функции init() нашего модуля.

var storage map[string][]byte

func init() {
    storage = make(map[string][]byte)
}

Чтобы со всеми командами можно было единообразно работать нужно определить общий интерфейс. У всех наших команд будет как минимум метод Run(). А значит нам подойдет интерфейс вида:

type Command interface {
    Run()
}

Теперь мы можем использовать *SetCommand как экземпляр с интерфейсом Command. Обратите внимание, что я написал именно указатель, так как в нашем случае метод Run() определен именно для указателя. Если мы создадим экземпляр команды таким образом:

set := SetCommand{
    Text: command,
    Conn: conn,
}

То не сможем его использовать как интерфейс Command. Нужно получить именно указатель на структуру а не саму структуру:

set := &SetCommand{
    Text: command,
    Conn: conn,
}

Обратите внимание на выражение:

err := ParseTextCommand(s.Text, s, func() error {
    _, err := fmt.Sscanf(s.Text, "set %s\n", &s.Key)
    return err
})

ParseTextCommand - это вспомогательная функция, которая обеспечивает разбор строки в поля структуры. Основная логика описана в замыкании, а внутри ParseTextCommand выполняется вспомогательная работа(логирование, отлавливание ошибок и т.д).

В результате работы функции fmt.Sscanf(s.Text, "set %s\n", &s.Key) в s.Key записывается значение ключа. Теперь можно приступать к чтению основных данных для кеширования.

data, _ := bufio.NewReader(s.Conn).Peek(s.Length)
storage[s.Key] = data
s.Conn.Write([]byte("STORED\n"))

Нам нужно учесть возможные ошибки. Данных может быть отправлено больше или меньше чем указанно в поле s.Length. Если меньше, то мы получим ошибку при вызове Peek(s.Length)

reader := bufio.NewReader(s.Conn)
data, err := reader.Peek(s.Length)

if (err != nil) {
    s.Conn.Write([]byte(CLIENT_ERROR + "\n"))
    return
}

А чтобы определить конец сообщения, договоримся что каждое сообщения должно завершаться комбинацией \r\n - как это реализованно в самом memcached.

control, err := reader.Peek(s.Length + 2)
if (err != nil) {
    s.Conn.Write([]byte(CLIENT_ERROR + "\n"))
    return
}

if !strings.HasSuffix(string(control), "\r\n") {
    s.Conn.Write([]byte(CLIENT_ERROR + "\n"))
    return
}

На этом пока закончим функционал для команды set. В будущем нужно будет добавить время хранения кеша и поле флагов для более точного соответствия протоколу memcached.

get

Поняв общую логику написания команд значительно проще писать остальные команды. В get будет чуть меньше параметров и сама логика работы будет сравнительно проще. Напишем структуру с такими полями:

type GetCommand struct {
    Name string
    Key  string
    Text string
    Conn net.Conn
}

А всю логику реализуем в методе Run(), которая будет заключаться в получении значения по ключу. Обратите внимание, что в начале и в конце добавляются специальные данные.

func (g *GetCommand) Run() {
    err := ParseTextCommand(g.Text, g, func() error {
        _, err := fmt.Sscanf(g.Text, "get %s\n", &g.Key)
        return err
    })

    if err == nil {
        data, ok := storage[g.Key]
        if ok {
            g.Conn.Write([]byte("VALUE " + g.Key + "\r\n"))
            g.Conn.Write(data)
            g.Conn.Write([]byte("\r\n"))
            g.Conn.Write([]byte("END\r\n"))
        }
    }
}

Есть один нюанс. На самом деле нам нужно запоминать и возвращать не только сами данные, но всякую и служебную информацию, такую как длину данных и время жизни кеша(на будущее). Кроме того, в стандартном проколе при записи кеша используется дополнительное полк ключей для команды set

Это решается довольно просто. Мы будем хранить в мапе storage не просто набор байтов а структуры с дополнительной метаинформацией:

type Item struct {
    Key     string
    Flags   int32
    Exptime int
    Length  int
    Data    []byte
}

Нам нужно немного поправить команду set, и заменить var storage map[string][]byte на var storage map[string]Item:

storage[s.Key] = Item{Key:s.Key, Length:s.Length, Data:data}

Поля Flags и Exptime мы пока не используем. Вносим изменения в метод Run() для команды get:

g.Conn.Write([]byte("VALUE " + g.Key + " " + strconv.Itoa(data.Length) + "\r\n"))
g.Conn.Write(data.Data)
g.Conn.Write([]byte("\r\n"))
g.Conn.Write([]byte("END\r\n"))

Теперь можем собрать получившийся сервер и протестировать как он работает с помощью все того же telnet:

telnet localhost 11212
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
set key 4
nnnn
STORED
get key
VALUE key 4
nnnn
END

Все получилось, как и задумывалось.

Конкурентный мап

У нашего кеш-сервера есть один очень существенный недостаток - способ записи данных в память. Мы используем простую запись в map. Но наше приложение работает конкурентно, каждое соединение - это новая go-рутина. И при одновременном доступе к storage из двух рутин могут возникнуть большие проблемы проблемы.

Чтобы избежать конфликтов при доступе к элементам переменной типа map, нам нужно написать свой интерфейс, с помощью которого мы будем выполнять необходимые манипуляции с элементами этого самого map.

Для обеспечения безопасного использования отображения мы вынесем его в отдельный пакет и сделаем не экспортируемым, доступ к которому можно будет получить только через каналы. Именно каналы будут обеспечивать очередность доступа к значениям map которое, в свою очередь, будет завернуто в не экспортируемый метод. Этот метод будет выполнять бесконечный цикл, блокирующийся до получения команд ("вставить это значение", "удалить этот элемент" и т.д.). Для начала рассмотрим интерфейс SafeMap, потом разберемся с методами типа safeMap, затем с функцией New() из пакета safemap и в конце с не экспортируемым методом safeMap.run().

type SafeMap interface {
    Insert(string, interface{})
    Delete(string)
    Find(string) (interface{}, bool)
    Len() int
    Update(string, UpdateFunc)
    Close() map[string]interface
    interface{}
}
type UpdateFunc func(interface{}, bool) interface{}

Все эти методы реализуются типом safeMap.

Тип UpdateFunc определяет сигнатуру функции обновления: она будет рассматриваться после знакомства с методом Update() ниже.

type safeMap chan commandData
type commandData struct {
    action  commandAction
    key     string
    value   interface{}
    result  chan<- interface{}
    data    chan<- map[string]interface{}
    updater UpdateFunc
}

type commandAction int
const (
    remove commandAction = iota
    end
    find
    insert
    length
    update
)

safeMap, по сути, это канал, в который можно посылать и из которого можно принимать значения типа commandData. Когда в канал приходит значение commandData, то оно определяет какую операцию нужно выполнять и какие данные использовать(например ключ). Подробности про поля увидим дальше.

Обратите внимание, что каналы result и data, объявлены как однонаправленные. Это значит что наш поточно-безопасный map может только посылать значения в них, но не может принимать из них. Дальше будет видно, что эти каналы создаются как двунаправленные, но в рамках нашего типа нет необходимости что то принимать из этих каналов, но их можно использовать для приема вне safeMap.

func (sm safeMap) Insert(key string, value interface{}) {
    sm <- commandData{action: insert, key: key, value: value}
}

Это ни что иное, как безопасный эквивалент инструкции m[key] = value, где m - значение типа map[string]interface{}. Метод создает значение типа commandData с командой insert, указанным ключом key и значением value, и посылает его в поточно-ориентированный map, который, как было показано выше, имеет тип chan commandData.

Когда будем рассматривать метод New() из пакета safemap увидим, что safeMap возвращается функцией New() как интерфейс SafeMap и уже привязанным к go-рутине. Метод safeMap.run() выполняется в отдельной рутине в рамках замыкания. Этот метод также содержит в себе реальный map, используемый для хранения элементов, и цикл for, который производит итерации по элементам в канале safeMap и выполняет команды, принимаемые из канала.

func (sm safeMap) Delete(key string) {
    sm <- commandData{action: remove, key: key}
}

Этот метод посылает команду на удаление элемента с указанным ключом.

type findResult struct {
    interface{}
    value interface
    found bool
}
func (sm safeMap) Find(key string) (value interface{}, found bool) {
    reply := make(chan interface{})
    sm <- commandData{action: find, key: key, result: reply}
    result := (<-reply).(findResult)
    return result.value, result.found
}

В методе safeMap.Find() создается канал reply, с помощью которого можно получить значение из нашего безопасного map. Для этого в методе посылается команда find с нужным ключем и с указанием канала reply. Так как это не буферизированный канал, то операция блокируется пока безопасный map не обработает все запросы. После отправки запроса, метод получает ответ в виде структуры findResult. Из этой структуры мы можем получить необходимые поля и вернуть из как результат работы метода.

func (sm safeMap) Len() int {
    reply := make(chan interface{})
    sm <- commandData{action: length, result: reply}
    return (<-reply).(int)
}

Принцип работы этого метода очень похож на Find(). Аналогичным образом создается канал reply, из которого забирается результат.

func (sm safeMap) Update(key string, updater UpdateFunc) {
    sm <- commandData{action: update, key: key, updater: updater}
}

В методе создается и отправляется команда update с указанным ключем key и функцией updater. В момент выполнения операции вызовется функция updater в которую будет передано значение элемента по указанному ключи и булевый параметр, указывающий найдено ли такое значение в map или нет. Самому элементу будет присвоено новое значение, которое вернет функция updater. Если элемента по такому ключу раньше не существовало, то будет создан новый элемент.

Важное замечание - если в updater будут вызываться методы safeMap, то есть вероятность взаимоблокировок. Причина этого в методе safemap.safeMap.run()

Казалось бы, у нас уже есть методы Insert(), Delete() и Find(), зачем нам еще один метод? Его нужно использовать, когда возникает необходимость не просто записать новое значение, а изменить старое. Например, если у нас в map хранятся цены на товары и цена одного из товаров возросла на 5%. Если мы используем обычный map, то достаточно просто написать m[key] *= 1.05 - значение элемента будет увеличено на 5%, если такого элемента не существует, то создастся новый элемент с нулевым значением. В нашем случае, подобную операцию можно реализовать с помощью метода Update().

if price, found := priceMap.Find(part); found { // ОШИБКА!
    priceMap.Insert(part, price.(float64)*1.05)
}

Проблема в том, что используя вызов двух методов для записи значения, мы нарушаем атомарность операции. Нет никакой уверенности, что между вызовом Find() и Insert() какая ни будь другая go-рутина не изменит значение.

Именно атомарность метода Update() обеспечивает безопасное сохранение значения.

priceMap.Update(part, func(price interface{}, found bool) interface{} {
    if found {
        return price.(float64) * 1.05
    }
    return 0.0
})

Если элемент с указанным ключом отсутствует, будет создан новый элемент со значением 0.0. Иначе существующие значение будет увеличено на 5%.

func (sm safeMap) Close() map[string]interface{} {
    reply := make(chan map[string]interface{})
    sm <- commandData{action: end, data: reply}
    return <-reply
}

Метод Close() закрывает канал safeMap внутри метода safeMap.run(), после этого завершается цикл в методе safeMap.run(). Затем, метод Close() возвращает используемый map[string]interface{}, который может быть использован в рамках программы. Это метод может быть вызван только один раз, несмотря на то, сколько go-рутин используют наш безопасный map.

Теперь глянем на функцию New() из пакета safemap. Эта функция создает значение типа safeMap с интерфейсом SafeMap. Этот интерфейс можно использовать вне модуля и внутри метода safeMap.run(), в котором находится используемый канал и фактически map[string]interface{} для хранения данных.

func New() SafeMap {
    sm := make(safeMap) // тип safeMap chan commandData
    go sm.run()
    return sm
}

safeMap - по сути это канал, поэтому для его создания и получения ссылки необходимо использовать встроенную функцию make(). Сразу после создания safeMap вызывается метод run(), внутри которого создается обычный map. Метод run() запускается в go-рутине. В конце функция New() возвращает созданный safeMap как экземпляр с интерфейсом SafeMap.

func (sm safeMap) run() {
    store := make(map[string]interface{})
    for command := range sm {
        switch command.action {
        case insert:
            store[command.key] = command.value
        case remove:
            delete(store, command.key)
        case find:
            value, found := store[command.key]
            command.result <- findResult{value, found}
        case length:
            command.result <- len(store)
        case update:
            value, found := store[command.key]
            store[command.key] = command.updater(value, found)
        case end:
            close(sm)
            command.data <- store
        }
    }
}

После создания обычного map метод run() запускает бесконечный цикл, который пытается получить значения из канала. Таким образом, цикл блокируется, если в канале нет ни одного значения.

В принципе, обработка команд очень проста - это все стандартные операции работы с map, кроме команды update. В случае этой команды, элементу присваивается значение, которое вернет функция command.updater().

//...
case end:
    close(sm)
    command.data <- store
}
//...

Этот код срабатывает при вызове метода Close(). Закрывается канал safeMap, и реальный map отправляется в канал результатов.

Самое важное при написании функции command.updater() - это избегать вызовов методов из safeMap, так как произойдет блокировка. Обработка команды update не может завершиться, пока command.updater() не вернет управление, но если функция вызовет метод типа safeMap, этот вызов заблокируется в ожидании завершения обработки текущей команды, и ни то, ни другое не смогут завершиться.

Теперь вернемся к нашему кеш серверу. Нам нужно изменить операцию добавления нового элемента в кеш и операцию получения данных из кеша. Начнем с нового определения нашей переменной storage

var storage safemap.SafeMap

func init() {
    storage = safemap.New()
}

Теперь изменяем метод Run() для команды set:

storage.Insert(s.Key, Item{Key: s.Key, Length: s.Length, Data: data})

И последним изменяем метод Run() для команды get. Обратите внимание на выражение item := data.(Item) - нам приходится кастовать тип. Приводить переменную типа interface{} к типу Item{}

data, ok := storage.Find(g.Key)
item := data.(Item)
if ok {
    // Необходимо для адекватного переноса, так как при считывании
    // последний перенос не учитывался
    g.Conn.Write([]byte("VALUE " + g.Key + " " + strconv.Itoa(item.Length) + "\r\n"))
    g.Conn.Write(item.Data)
    g.Conn.Write([]byte("\r\n"))
    g.Conn.Write([]byte("END\r\n"))
}

Заключение

В этой части мы рассмотрели как реализовать сервер, способный выполнять команды по сети. Сделали небольшой прототип своего memcached и узнали как писать потокобезопасные программы, основанные на канал и рутинах.

В следующей части мы продолжим реализовывать команды из протокола memcached. Также, нам нужно будет написать клиентскую библиотеку и посмотреть, как этот кеш-сервер можно использовать совместно с распределенным приложением.

Исходный код примера можно посмотреть на гитхабе.

Примечания

Для статьи был использован код из примеров к книге "Programming in Go: Creating Applications for the 21st Century (Developer's Library)" Mark Summerfield.