Свой memcached. Часть 1
Сайты становятся все больше, информация уже не влазит на терабайтные харды, а пользователи еще более требовательны к задержкам и не готовы ждать даже пары секунд ответа от сервера. К нам на помощь спешат распределенные системы обработки информации.
Распеределенщина и все такое - это целый неизведанный мир, со своими законами. Нахрапам не возьмешь. Но нужно с чего-то начинать. Например, с отличного курса лекций.
Важной частью высоко нагруженных систем является кеширование. И очень часто, это кеш делается распределенным. Используется несколько серверов, на которых сохраняются кешированные данные.
Memcached - одни из самых ярких представителей подобных систем кеширования. Это прекрасный продукт, который работает и достаточно неплохо справляется со своими задачами.
И, если хочется разобраться как все устроено, то самый верный способ - это написать свой велосипед. Конечно, вряд ли наша поделка сможет так сразу тягаться с memcached. Но несколько практически примеров будут очень к стати.
Простейший сервер
Начнем с написания просто tcp сервера к которому можно обратиться по telnet. Такие сервера на Go с использованием пакета net
пишутся просто замечательно.
package main
import (
"log"
"net"
)
func main() {
ln, err := net.Listen("tcp", ":11212")
if err != nil {
log.Println(err)
}
for {
conn, err := ln.Accept()
if err != nil {
log.Println(err)
}
go connectionHandler(conn)
}
}
func connectionHandler(conn net.Conn) {
buf := make([]byte, 1024)
_, err := conn.Read(buf)
if err != nil {
fmt.Println("Error reading:", err.Error())
}
conn.Write([]byte("Message received."))
conn.Close()
}
Это очень простой сервер, который висит на 11212 порту, ждет когда мы ему что-то отправим. При получении сообщения отправляет нам в ответ "Message received." и закрывает соединение.
ln, err := net.Listen("tcp", ":11212")
Биндимся на порт под номером 11212.
for {
conn, err := ln.Accept()
if err != nil {
log.Println(err)
}
go connectionHandler(conn)
}
В бесконечном цикле ждем подключения. При новом подключении ln.Accept()
будет создавать новое соединение conn net.Conn
и функция connectionHandler(conn)
будет запушена в новой go-рутине. Обратите внимание, что запуск новых рутин обеспечивает конкурентный доступ к нашему серверу.
После подключения клиента, начинает работать функция connectionHandler
:
buf := make([]byte, 1024)
_, err := conn.Read(buf)
if err != nil {
fmt.Println("Error reading:", err.Error())
}
В этой функции создается буфер, затем в него сохраняются данные переданные от клиента.
conn.Write([]byte("Message received."))
conn.Close()
После получения сообщения от клиента, нас сервер сам шлет свое сообщение "Message received." и закрывает соединение.
Связаться с нашим сервером можно по telnet:
$ telnet localhost 11212
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hi
Message received.
Connection closed by foreign host.
Telnet пригодится нам для дальнейшего тестирования. По крайней мере, какое-то время, пока у нас не появится свой клиент.
Протокол
Раз уж мы взялись писать аналог memcached, то давайте постараемся реализовать его протокол. К 100% совместимости мы стремится не будем, но постараемся максимально приближено реализовать большинство команд.
- get - Чтение из кэша
get mysimplekey
- set - Запись в кэш. Пишет не проверяя есть ли в кэше значение с этим ключом:
set mysimplekey 0 100 50
- add - Запись в кэш. Записывает только тогда, когда в кеше нет значений с таким ключом:
add newkey 0 20 50
- replace - Замена значения записи в кеше. Работает по принципу add:
replace key 0 100 50
- append - Записать в кеш по указанному ключу данные, перед уже находящимися там данными:
append key 0 100 155
- prepend - Записать в кеш по указанному ключу данные, после уже находящихся там данных
prepend key 0 60 15
- incr - Увеличивает числовое значение ключа на указанную величину:
incr mykey 2
- decr - Уменьшает числовое значение ключа на указанную величину:
decr mykey 5
- delete - Удаляет значение из кеша по ключу:
delete mykey
- flush_all - Производит инвалидацю все записей в кэше:
flush_all
. Отложенная инвалидация на указанное кол-во секунд:flush_all 900
. - stats - Выводит общую статистику:
stats
. Статистика использования памяти:stats slabs
. Еще одна статистика использования памяти:stats malloc
Можно посмотреть список элементов в кеше:stats items
,stats detail [on|off|dump]
,stats sizes
. Сброс статистикиstats reset
- version - Показывает версию memcached:
version
- verbosity - Установка уровня детализации логирования:
verbosity
- quit - Закрыть телнет сессию:
quit
Полное описание протокола можно найти на github
Стоить заметить, что если нам не обязательна поддержка memcahced протокола и все клиенты будут написаны на Go, то можно было бы реализовать более специфический протокол. Более того, мы могли бы не заморачиваться над текстовым протоколом,а придумать что то более в стиле Go. Пример такого протокола рассмотрим в конце серии.
Основные команды
Давайте начнем с реализации двух базовых команд: get
и set
. Заодно, определимся с принципами работы.
Нужно научится разделать команды. Для этого в func connectionHandler(conn net.Conn)
у нас будет бесконечный цикл, в котором будут читаться данные из соединения. Эти данные будут проверятся на наличие команд.
func connectionHandler(conn net.Conn) {
for {
command, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
if err == io.EOF {
log.Println("Error io.EOF", err)
break
} else {
log.Println("Error reading:", err)
}
}
if strings.HasPrefix(command, "set") {
// выполняем команду set
}
if strings.HasPrefix(command, "get") {
// выполняем команду get
}
}
}
bufio.NewReader(conn).ReadString('\n')
постоянно пытаться прочитать строку из соединения до символа '\n'
. В зависимости от того как начинается эта строка будет срабатывать один из ифов. Давайте допишем немного кода, чтоб сервер возвращал данные в соединение сигнализируя о сработавшей команде.
if strings.HasPrefix(command, "set") {
conn.Write([]byte("answer: set command\n"))
}
if strings.HasPrefix(command, "get") {
conn.Write([]byte("answer: get command\n"))
}
Теперь попробуем подключится по telnet и выполнить пару команд:
telnet localhost 11212
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
set key
answer: set command
get key
answer: get command
set
Принцип работы с соединением должен быть ясен. Теперь можно описывать наши команды. Начнем с того, что команда должна получить и сохранить некоторую информацию - ключ и сами данные. Для команды установки кеша(set) можно определить такую структуру:
// Структура для команды set
type SetCommand struct {
Key string
Length int
Text string
Conn net.Conn
}
Key
- Ключ по которому будет сохраняться ключLength
- Количество данных в байтах, которые нужно сохранитьText
- Строка самой командыConn
- Объект соединения. В рамках выполнения команды необходимо будет отправлять ответы пользователю.
Со временем мы будем добавлять новые параметры команды и, соответственно, новые поля в структуре.
Всю логику этой команды можно реализовать внутри метода Run()
.
func (s *SetCommand) Run() {
err := ParseTextCommand(s.Text, s, func() error {
_, err := fmt.Sscanf(s.Text, "set %s %d\n", &s.Key, &s.Length)
return err
})
if err == nil {
data, _ := bufio.NewReader(s.Conn).ReadBytes('\n')
storage[s.Key] = data
s.Conn.Write([]byte("STORED\n"))
}
}
Переменная storage
- это переменная в глобальной области видимости которая имеет тип map[string][]byte
. По сути - это хранилище для нашего кеша. Она инициализируется в функции init()
нашего модуля.
var storage map[string][]byte
func init() {
storage = make(map[string][]byte)
}
Чтобы со всеми командами можно было единообразно работать нужно определить общий интерфейс. У всех наших команд будет как минимум метод Run()
. А значит нам подойдет интерфейс вида:
type Command interface {
Run()
}
Теперь мы можем использовать *SetCommand
как экземпляр с интерфейсом Command
. Обратите внимание, что я написал именно указатель, так как в нашем случае метод Run()
определен именно для указателя. Если мы создадим экземпляр команды таким образом:
set := SetCommand{
Text: command,
Conn: conn,
}
То не сможем его использовать как интерфейс Command
. Нужно получить именно указатель на структуру а не саму структуру:
set := &SetCommand{
Text: command,
Conn: conn,
}
Обратите внимание на выражение:
err := ParseTextCommand(s.Text, s, func() error {
_, err := fmt.Sscanf(s.Text, "set %s\n", &s.Key)
return err
})
ParseTextCommand
- это вспомогательная функция, которая обеспечивает разбор строки в поля структуры. Основная логика описана в замыкании, а внутри ParseTextCommand
выполняется вспомогательная работа(логирование, отлавливание ошибок и т.д).
В результате работы функции fmt.Sscanf(s.Text, "set %s\n", &s.Key)
в s.Key
записывается значение ключа. Теперь можно приступать к чтению основных данных для кеширования.
data, _ := bufio.NewReader(s.Conn).Peek(s.Length)
storage[s.Key] = data
s.Conn.Write([]byte("STORED\n"))
Нам нужно учесть возможные ошибки. Данных может быть отправлено больше или меньше чем указанно в поле s.Length
. Если меньше, то мы получим ошибку при вызове Peek(s.Length)
reader := bufio.NewReader(s.Conn)
data, err := reader.Peek(s.Length)
if (err != nil) {
s.Conn.Write([]byte(CLIENT_ERROR + "\n"))
return
}
А чтобы определить конец сообщения, договоримся что каждое сообщения должно завершаться комбинацией \r\n
- как это реализованно в самом memcached.
control, err := reader.Peek(s.Length + 2)
if (err != nil) {
s.Conn.Write([]byte(CLIENT_ERROR + "\n"))
return
}
if !strings.HasSuffix(string(control), "\r\n") {
s.Conn.Write([]byte(CLIENT_ERROR + "\n"))
return
}
На этом пока закончим функционал для команды set. В будущем нужно будет добавить время хранения кеша и поле флагов для более точного соответствия протоколу memcached.
get
Поняв общую логику написания команд значительно проще писать остальные команды. В get будет чуть меньше параметров и сама логика работы будет сравнительно проще. Напишем структуру с такими полями:
type GetCommand struct {
Name string
Key string
Text string
Conn net.Conn
}
А всю логику реализуем в методе Run()
, которая будет заключаться в получении значения по ключу. Обратите внимание, что в начале и в конце добавляются специальные данные.
func (g *GetCommand) Run() {
err := ParseTextCommand(g.Text, g, func() error {
_, err := fmt.Sscanf(g.Text, "get %s\n", &g.Key)
return err
})
if err == nil {
data, ok := storage[g.Key]
if ok {
g.Conn.Write([]byte("VALUE " + g.Key + "\r\n"))
g.Conn.Write(data)
g.Conn.Write([]byte("\r\n"))
g.Conn.Write([]byte("END\r\n"))
}
}
}
Есть один нюанс. На самом деле нам нужно запоминать и возвращать не только сами данные, но всякую и служебную информацию, такую как длину данных и время жизни кеша(на будущее). Кроме того, в стандартном проколе при записи кеша используется дополнительное полк ключей для команды set
Это решается довольно просто. Мы будем хранить в мапе storage
не просто набор байтов а структуры с дополнительной метаинформацией:
type Item struct {
Key string
Flags int32
Exptime int
Length int
Data []byte
}
Нам нужно немного поправить команду set, и заменить var storage map[string][]byte
на var storage map[string]Item
:
storage[s.Key] = Item{Key:s.Key, Length:s.Length, Data:data}
Поля Flags
и Exptime
мы пока не используем. Вносим изменения в метод Run()
для команды get:
g.Conn.Write([]byte("VALUE " + g.Key + " " + strconv.Itoa(data.Length) + "\r\n"))
g.Conn.Write(data.Data)
g.Conn.Write([]byte("\r\n"))
g.Conn.Write([]byte("END\r\n"))
Теперь можем собрать получившийся сервер и протестировать как он работает с помощью все того же telnet:
telnet localhost 11212
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
set key 4
nnnn
STORED
get key
VALUE key 4
nnnn
END
Все получилось, как и задумывалось.
Конкурентный мап
У нашего кеш-сервера есть один очень существенный недостаток - способ записи данных в память. Мы используем простую запись в map. Но наше приложение работает конкурентно, каждое соединение - это новая go-рутина. И при одновременном доступе к storage
из двух рутин могут возникнуть большие проблемы проблемы.
Чтобы избежать конфликтов при доступе к элементам переменной типа map, нам нужно написать свой интерфейс, с помощью которого мы будем выполнять необходимые манипуляции с элементами этого самого map.
Для обеспечения безопасного использования отображения мы вынесем его в отдельный пакет и сделаем не экспортируемым, доступ к которому можно будет получить только через каналы. Именно каналы будут обеспечивать очередность доступа к значениям map которое, в свою очередь, будет завернуто в не экспортируемый метод. Этот метод будет выполнять бесконечный цикл, блокирующийся до получения команд ("вставить это значение", "удалить этот элемент" и т.д.). Для начала рассмотрим интерфейс SafeMap
, потом разберемся с методами типа safeMap
, затем с функцией New()
из пакета safemap
и в конце с не экспортируемым методом safeMap.run()
.
type SafeMap interface {
Insert(string, interface{})
Delete(string)
Find(string) (interface{}, bool)
Len() int
Update(string, UpdateFunc)
Close() map[string]interface
interface{}
}
type UpdateFunc func(interface{}, bool) interface{}
Все эти методы реализуются типом safeMap
.
Тип UpdateFunc
определяет сигнатуру функции обновления: она будет рассматриваться после знакомства с методом Update()
ниже.
type safeMap chan commandData
type commandData struct {
action commandAction
key string
value interface{}
result chan<- interface{}
data chan<- map[string]interface{}
updater UpdateFunc
}
type commandAction int
const (
remove commandAction = iota
end
find
insert
length
update
)
safeMap
, по сути, это канал, в который можно посылать и из которого можно принимать значения типа commandData
. Когда в канал приходит значение commandData
, то оно определяет какую операцию нужно выполнять и какие данные использовать(например ключ). Подробности про поля увидим дальше.
Обратите внимание, что каналы result
и data
, объявлены как однонаправленные. Это значит что наш поточно-безопасный map может только посылать значения в них, но не может принимать из них. Дальше будет видно, что эти каналы создаются как двунаправленные, но в рамках нашего типа нет необходимости что то принимать из этих каналов, но их можно использовать для приема вне safeMap
.
func (sm safeMap) Insert(key string, value interface{}) {
sm <- commandData{action: insert, key: key, value: value}
}
Это ни что иное, как безопасный эквивалент инструкции m[key] = value
, где m
- значение типа map[string]interface{}
. Метод создает значение типа commandData
с командой insert
, указанным ключом key
и значением value
, и посылает его в поточно-ориентированный map, который, как было показано выше, имеет тип chan commandData
.
Когда будем рассматривать метод New()
из пакета safemap
увидим, что safeMap
возвращается функцией New()
как интерфейс SafeMap
и уже привязанным к go-рутине. Метод safeMap.run()
выполняется в отдельной рутине в рамках замыкания. Этот метод также содержит в себе реальный map, используемый для хранения элементов, и цикл for
, который производит итерации по элементам в канале safeMap
и выполняет команды, принимаемые из канала.
func (sm safeMap) Delete(key string) {
sm <- commandData{action: remove, key: key}
}
Этот метод посылает команду на удаление элемента с указанным ключом.
type findResult struct {
interface{}
value interface
found bool
}
func (sm safeMap) Find(key string) (value interface{}, found bool) {
reply := make(chan interface{})
sm <- commandData{action: find, key: key, result: reply}
result := (<-reply).(findResult)
return result.value, result.found
}
В методе safeMap.Find()
создается канал reply
, с помощью которого можно получить значение из нашего безопасного map. Для этого в методе посылается команда find
с нужным ключем и с указанием канала reply
. Так как это не буферизированный канал, то операция блокируется пока безопасный map не обработает все запросы. После отправки запроса, метод получает ответ в виде структуры findResult
. Из этой структуры мы можем получить необходимые поля и вернуть из как результат работы метода.
func (sm safeMap) Len() int {
reply := make(chan interface{})
sm <- commandData{action: length, result: reply}
return (<-reply).(int)
}
Принцип работы этого метода очень похож на Find()
. Аналогичным образом создается канал reply
, из которого забирается результат.
func (sm safeMap) Update(key string, updater UpdateFunc) {
sm <- commandData{action: update, key: key, updater: updater}
}
В методе создается и отправляется команда update
с указанным ключем key
и функцией updater
. В момент выполнения операции вызовется функция updater
в которую будет передано значение элемента по указанному ключи и булевый параметр, указывающий найдено ли такое значение в map или нет. Самому элементу будет присвоено новое значение, которое вернет функция updater
. Если элемента по такому ключу раньше не существовало, то будет создан новый элемент.
Важное замечание - если в updater
будут вызываться методы safeMap
, то есть вероятность взаимоблокировок. Причина этого в методе safemap.safeMap.run()
Казалось бы, у нас уже есть методы Insert()
, Delete()
и Find()
, зачем нам еще один метод? Его нужно использовать, когда возникает необходимость не просто записать новое значение, а изменить старое. Например, если у нас в map хранятся цены на товары и цена одного из товаров возросла на 5%. Если мы используем обычный map, то достаточно просто написать m[key] *= 1.05
- значение элемента будет увеличено на 5%, если такого элемента не существует, то создастся новый элемент с нулевым значением. В нашем случае, подобную операцию можно реализовать с помощью метода Update()
.
if price, found := priceMap.Find(part); found { // ОШИБКА!
priceMap.Insert(part, price.(float64)*1.05)
}
Проблема в том, что используя вызов двух методов для записи значения, мы нарушаем атомарность операции. Нет никакой уверенности, что между вызовом Find()
и Insert()
какая ни будь другая go-рутина не изменит значение.
Именно атомарность метода Update()
обеспечивает безопасное сохранение значения.
priceMap.Update(part, func(price interface{}, found bool) interface{} {
if found {
return price.(float64) * 1.05
}
return 0.0
})
Если элемент с указанным ключом отсутствует, будет создан новый элемент со значением 0.0. Иначе существующие значение будет увеличено на 5%.
func (sm safeMap) Close() map[string]interface{} {
reply := make(chan map[string]interface{})
sm <- commandData{action: end, data: reply}
return <-reply
}
Метод Close()
закрывает канал safeMap
внутри метода safeMap.run()
, после этого завершается цикл в методе safeMap.run()
. Затем, метод Close()
возвращает используемый map[string]interface{}
, который может быть использован в рамках программы. Это метод может быть вызван только один раз, несмотря на то, сколько go-рутин используют наш безопасный map.
Теперь глянем на функцию New()
из пакета safemap
. Эта функция создает значение типа safeMap
с интерфейсом SafeMap
. Этот интерфейс можно использовать вне модуля и внутри метода safeMap.run()
, в котором находится используемый канал и фактически map[string]interface{}
для хранения данных.
func New() SafeMap {
sm := make(safeMap) // тип safeMap chan commandData
go sm.run()
return sm
}
safeMap
- по сути это канал, поэтому для его создания и получения ссылки необходимо использовать встроенную функцию make()
. Сразу после создания safeMap
вызывается метод run()
, внутри которого создается обычный map. Метод run()
запускается в go-рутине. В конце функция New()
возвращает созданный safeMap
как экземпляр с интерфейсом SafeMap
.
func (sm safeMap) run() {
store := make(map[string]interface{})
for command := range sm {
switch command.action {
case insert:
store[command.key] = command.value
case remove:
delete(store, command.key)
case find:
value, found := store[command.key]
command.result <- findResult{value, found}
case length:
command.result <- len(store)
case update:
value, found := store[command.key]
store[command.key] = command.updater(value, found)
case end:
close(sm)
command.data <- store
}
}
}
После создания обычного map метод run()
запускает бесконечный цикл, который пытается получить значения из канала. Таким образом, цикл блокируется, если в канале нет ни одного значения.
В принципе, обработка команд очень проста - это все стандартные операции работы с map, кроме команды update
. В случае этой команды, элементу присваивается значение, которое вернет функция command.updater()
.
//...
case end:
close(sm)
command.data <- store
}
//...
Этот код срабатывает при вызове метода Close()
. Закрывается канал safeMap
, и реальный map отправляется в канал результатов.
Самое важное при написании функции command.updater()
- это избегать вызовов методов из safeMap
, так как произойдет блокировка. Обработка команды update
не может завершиться, пока command.updater()
не вернет управление, но если функция вызовет метод типа safeMap
, этот вызов заблокируется в ожидании завершения обработки текущей команды, и ни то, ни другое не смогут завершиться.
Теперь вернемся к нашему кеш серверу. Нам нужно изменить операцию добавления нового элемента в кеш и операцию получения данных из кеша. Начнем с нового определения нашей переменной storage
var storage safemap.SafeMap
func init() {
storage = safemap.New()
}
Теперь изменяем метод Run()
для команды set:
storage.Insert(s.Key, Item{Key: s.Key, Length: s.Length, Data: data})
И последним изменяем метод Run()
для команды get. Обратите внимание на выражение item := data.(Item)
- нам приходится кастовать тип. Приводить переменную типа interface{}
к типу Item{}
data, ok := storage.Find(g.Key)
item := data.(Item)
if ok {
// Необходимо для адекватного переноса, так как при считывании
// последний перенос не учитывался
g.Conn.Write([]byte("VALUE " + g.Key + " " + strconv.Itoa(item.Length) + "\r\n"))
g.Conn.Write(item.Data)
g.Conn.Write([]byte("\r\n"))
g.Conn.Write([]byte("END\r\n"))
}
Заключение
В этой части мы рассмотрели как реализовать сервер, способный выполнять команды по сети. Сделали небольшой прототип своего memcached и узнали как писать потокобезопасные программы, основанные на канал и рутинах.
В следующей части мы продолжим реализовывать команды из протокола memcached. Также, нам нужно будет написать клиентскую библиотеку и посмотреть, как этот кеш-сервер можно использовать совместно с распределенным приложением.
Исходный код примера можно посмотреть на гитхабе.
Примечания
Для статьи был использован код из примеров к книге "Programming in Go: Creating Applications for the 21st Century (Developer's Library)" Mark Summerfield.