Модели конкурентности в Go. Конвейеры.

61 minute read

Перевод статьи из официального блога Go "Go Concurrency Patterns: Pipelines and cancellation". Автор Sameer Ajmani

Примечания.

Давайте сразу договоримся, как переводить "сoncurrency". В русском языке аналога пока нет. Самое близкое - это "параллельность". Но нам это не подходит, потому что с технической стороны "сoncurrency" это совсем не "параллельность". Давайте будем называть это "конкурентность". Такая же проблема и с "pipeline". Как мне кажется, наиболее точный аналог это "конвейер".

Введение

Примитивы конкурентности в Go позволяют построить эффективные конвейеры потоковой обработки данных, которые эффективно используют CPU и I/O. В этой статье описаны приемы создания таких конвейеров, тонкости использования и некоторые способы решения возникающих проблем.

Что такое конвейеры(pipeline)?

В Go нет четкого понятия для конвейера, это всего лишь один из видов параллельного программирования. Неформальное определение конвейера - это ряд этапов, связанных между собой каналами, где каждый этап это набор go-рутин выполняющих определенную функцию. На каждом этапе выполняются определенные действия:

  • Получить значения с предыдущего этапа.
  • Выполняются какие либо действия над этими значениями. Как правило создаются новые значения.
  • Значения отправляются на следующий этап через выходные каналы.

На всех этапах может быть сколько угодно входящих и выходящих каналов, кроме первого и последнего, на которых есть только выходящий и входящий канал. Первый этап иногда называют источник(source) или отправитель(producer). Последний этап называется потребитель(consumer, sink).

Мы начнем с простого примера конвейера для быстрого понимания принципов и идей. Позже разберем более реалистичный пример использования такого подхода.

Возведение в квадрат

Рассмотри трехэтапный конвейер.

Превый этап. gen это функция, которая преобразует список целых чисел в канал, который посылает числа из этого списка. Внутри этой функции запускается go-рутина, которая отправляет целые числа в канал и закрывает этот канал, когда все числа отправлены:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

Второй этап. sq забирает числа из канала и возвращает новый канал, который отдает квадрат каждого полученного числа. После того как входящий канал закрыт и все значения на этом шаге отправлены в исходящий канал, то исходящий канал закрывается:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

Функция main оперирует каналами и реализует последний этап. Принимает значения, полученные на втором этапе, и выводит каждое, пока канал не закроется:

func main() {
    // Создаем необходимые каналы.
    c := gen(2, 3)
    out := sq(c)

    // Выводим значения.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

Функция sq принимает и возвращает каналы одинакового типа. А это значит, что мы можем компоновать эти функции сколько угодно. Кроме того, мы можем переписать функцию main с использованием range:

func main() {
    // Создаем необходимые каналы и выводим значения.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 затем 81
    }
}

Fan-out, fan-in

Прим. "fan" здесь, вероятно, стоит переводить как "лопастной вентилятор" или, с технической стороны, как "револьверный барабан".

Несколько функций могут читать из одного канала пока он не закроется. Это называется fan-out. Такой подход дает возможность распределять задачи между так называемыми воркерами(исполнителями).

Функция может читать из нескольких входных каналов, мультиплексировать все в один канал и обрабатывать, пока входные каналы не будут закрыты. Это называется fan-in.

Мы можем изменить наш пример так, чтобы запускались два экземпляра sq. Каждый экземпляр читает данные из входного канала. Мы добавим еще одну функцию - merge. Эта функция будет реализовывать fan-in для наших результатов:

func main() {
    in := gen(2, 3)

    // Распределяем работу между двумя воркерами для считывания данных из `in`.
    c1 := sq(in)
    c2 := sq(in)

    // Объединяем вывод из c1 и c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 затем 9, или 9 затем 4
    }
}

Функция merge преобразует несколько каналов в один канал, запуская go-рутину для каждого входного канала. Внутри этих рутин значения копируются в один выходной канал. После того как все go-рутины, формирующие выходной канал, запущены, стартует еще одна go-рутина, котора нужна для закрытия выходного канала после отправки в него всех данных.

Отправка данных в закрытый канал спровоцирует панику. Поэтому, очень важно убедиться, что все данные отправлены до вызова close. В нашем случае используется sync.WaitGroup. Этот способ обеспечивает простую синхронизацию:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Запуск go-рутины для каждого входного канала из `cs`. `output`
    // копирует значения из входного канала `с` пока `с` не будет 
    // закрыт. Затем вызывается `wg.Done`.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Запуск go-рутины, которая закроет `out` канал после 
    // завершения всех  `output` go-рутин. Этот код должен 
    // выполняться только после вызова `wg.Add`.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

Быстрая остановка(Stopping short)

До этого мы рассматривали паттерны в которых четко обозначены этапы:

  • Закрыть исходящие каналы, после выполнения всех операций.
  • Получить значения из входящих каналов, пока они не будут закрыты.

Такой подход позволяет на каждом этапе получать значения с помощью range и гарантирует, что все go-рутины завершатся после того как все значения будут отправлены.

Но в реальном мире нам не всегда нужно ожидать все отправленные значения. Иногда это может быть нюансами дизайна, когда приемнику нужны только часть значений. Чаще всего, такое происходит, если есть ошибки на более раннем этапе. В таком случае, нам не нужно ждать, пока все значения будут получены и можно прекратить их обработку на более раннем шаге.

В нашем примере конвейера, если на каком-то шаге не получается получить значения, то все попытки отправить новые значения будут заблокированы:

    // Получаем первое значение из выходного канала.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Так как мы не получаем второе значение и `out`,
    // то рутина зависает при попытке отправки чего либо.
}

Налицо утечка ресурсов. Go-рутина потребляет память и ресурсы. Все это хранится на стеке самой рутины и не будет подчищено сборщиком мусора, пока go-рутина не завершится.

Нам нужно так организовать передачу по конвейеру, чтобы обеспечить выход из функции, даже когда на более нижних уровнях значения не забираются. Один из способов реализации - это создание буфера. Буфер содержит фиксированное количество значений. Если в буфере есть место, то операция отправки завершается не ожидая получения:

c := make(chan int, 2) // размер буфера 2
c <- 1  // успешная передача
c <- 2  // успешная передача
c <- 3  // блокируется пока другая go-рутина не прочитает из канала <-c

Когда количество отправляемых данных известно, использование буфера может упростить код. Например, можем переписать функцию gen так, чтобы список значений передавался в буферизированный канал. В таком случае, нам не нужно создавать новую go-рутину:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

Вернемся к нашим заблокированным go-рутинам в нашем конвейере. Можем использовать буфер для нашего канала, полученного после мультиплексирования:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // добавляем место
    // ... остальное без изменений ...

Хотя это и решает проблему с заблокированными go-рутинами в конкретном примере, в целом это плохой код. Выбор размера буфера сильно зависит от количества значений в каналах которые мы получаем в merge и количества значений которые будут получены из результирующего канала. Это довольно костыльная схема. Если на вход передать больше значений или прочитать меньше значений, то go-рутины снова будут заблокированы.

Вместо этого, нам нужен механизм, который будет сигнализировать на верхний уровень, что мы прекратили получать значения на текущем уровне.

Явная отмена

Когда в main решено прекратить получать значения из канала out, то нужно как то сообщить go-рутине на верхнем уровне что нужно перестать посылать сообщения. Это возможно с помощью отправки сообщений в специальный канал done. В нашем случае отправляется два значения, так как есть вероятность двух блокировок:

func main() {
    in := gen(2, 3)

    // Распределяем работу `sq` между двумя go-рутинами 
    // которые считывают данные из `in`.
    c1 := sq(in)
    c2 := sq(in)

    // Получаем первое значение из выходного канала.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Сообщаем отправителям, что мы закончили.
    done <- struct{}{}
    done <- struct{}{}
}

Отправка в go-рутине в merge немного изменена, туда добавилась конструкция select. Такая структура будет работать, если можно отправить данные в канал out или можно получить данные из done. Тип, который используются для отправки/получения в канале done, пустая структура. Это самый "легкий" тип, который работает как индикатор, что отправка должна прерваться. В output go-рутине продолжает выполняться цикл по входящим каналам c и верхние этапы не блокируются. Позже мы рассмотрим как совсем остановить этот цикл.

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Готовим go-рутину для каждого входного канала из `cs`. В этой
    // рутине копируются значения из `c` пока он не закроется. Или
    // принимаются значение из `done`. Затем вызывается `wg.Done()`
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... остальное не меняется ...

Такой подход тоже имеет некоторые недостатки. Каждый приемник, который находится на нижнем уровне, должен знать, сколько потенциально заблокированных отправителей находятся на верхнем уровне и передавать сигнал завершения для конкретных отправителей. Однако, подсчет количества отправителей это утомительная задача. К тому же, такой подсчет подвержен ошибкам.

Нам нужен способ сообщить неизвестному числу go-рутин о прекращении передачи значений на более низкий уровень. В Go это можно реализовать путем закрытия канала. Операция получения значения на закрытом канале выполняется немедленно и всегда возвращает нулевое значение.

Это означает, что main может разблокировать всех отправителей просто закрыв канал done. Это напоминает отправку широковещательного сообщения. Мы расширим наши конвейерные функции так, чтобы они принимали канал done как параметр и организуем закрытие этого канала с помощью defer выражение, которое сработает как только завершится main. Закрытие этого канала будет сигналом для остановки конвейера.

func main() {
    // Подготавливаем канал `done`, который будет общим для всего
    // конвейера и закрытие этого канала с помощью `defer` будет 
    // сигналом завершения для всех go-рутин.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Распределяем `sq` между двумя go-рутинами, 
    // которые считывают данные из `in`.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Забираем первое значение из `output`.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 или 9

    // Будет вызвано отложенное закрытие канала.
}

Теперь каждый шаг нашего конвейера может быть завершенным, как только done буде закрыт. Go-рутина output в функции merge может завершится без полной выборки данных из входного канала так как, она "знает", что отправитель на более верхнем уровне(sq) прекратил посылать данные как только done закрылся. output обеспечивает вызов wg.Done с помощью defer.

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Готовим go-рутину для каждого входного канала из `cs`. В этой
    // рутине копируются значения из `c` пока он или `done` не закроются.
    // Затем вызывается `wg.Done()`
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... остальной код без изменений ...

Кроме того, даже функция sq может выйти, как только done будет закрыт. sq реализует закрытие канала out с помощью все того же defer:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

Сформулируем основные принципы построения конвейеров:

  • Исходящие каналы закрываются на своем этапе когда вся отправка завершена.
  • На всех этапах происходит получение из каналов пока эти каналы не закрыты и отправители не заблокированы.

В конвейер поддерживается неблокируемость или через использование буферизированного канала, или с помощью отправки сообщения, что нам больше не нужно получать сообщения.

Хеширование файлов

Давайте рассмотрим более реалистичный пример конвейера.

MD5 это алгоритм для создания "отпечатков" или дайджестов сообщений(message digest). Этот алгоритм можно использовать для создания проверочной суммы файлов. Консольная программа md5sum выводит хешированные значения для списка файлов.

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

Наша программа будет работать аналогично md5sum, только в качестве аргумента будет принимать директорию и выводить хеши для всех файлов в директории с сортировкой по имени.

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

Функция main в нашем приложении использует вспомогательную функцию MD5All, которая возвращает map в котором имена файла это ключи, а хеши это значения. Затем этот map сортируется и в консоль выводится результат:

func main() {
    // Считаем MD5 хеш для всех файлов в указанной директории,
    // затем отображение отсортированных результатов.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

Функция MD5All этот самое интересное в нашем примере. В файле serial.go эта функция реализована без использования конкурентности и просто получает хеш для каждого файла из дерева.


// MD5All читает все файлы в дереве с помощью `filepath.Walk` начиная с `root` // и возвращает `map` в котором ключи это путь к файлу, а значения - // хеш содержимого. Если при обходе директории или чтении файла // возникает ошибка, то она возвращается из функции. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil }

Параллельный обход

В parallel.go, переделанная функция MD5All которая работает как двухуровневый конвейер. Первый этап это функция sumFiles в которой реализован обход по дереву, хеширование каждого файла в отдельной go-рутине и отправка результата в канал в виде значения типа result:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles возвращает два канала. Первый это канал для результатов results. И второй - для ошибок работы filepath.Walk. Функция обхода запускает новую функцию для обработки файла, затем проверяет done. Если канал done закрыт функция обхода завершается немедленно:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // Для каждого файла запускается новая go-рутина, которая подсчитывает
    // хеш и отправляет результат в `c`. Отправляем результат 
    // `filepath.Walk` в `errc`.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Завершаем обход, если канал `done` закрыт.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Обход закончен. Это значит все вызовы `wg.Add` завершены. 
        // Запускаем go-рутину для закрытия канала `c` как только 
        // все результаты отправлены.
        go func() {
            wg.Wait()
            close(c)
        }()
        // `errc` буферизированный канал.
        errc <- err
    }()
    return c, errc
}

MD5All получает хеши из c. И MD5All завершается немедленно в случае получения ошибки и закрывает канал done с помощью defer:

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // `MD5All` закрывает канал `done` когда завершается. 
    // Это может произойти до получения 
    // всех значений из `c` и `errc`.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Ограниченный параллелизм

Функция MD5All в из файла parallel.go запускает новую go-рутину для хеширования каждого файла. В папке с большим количеством файлов это может вызвать проблемы с потреблением памяти.

Мы можем сократить выделение памяти с помощью ограничения количества параллельно обрабатываемых файлов. В bounded.go мы реализуем этот подход, создавая фиксированное количество go-рутин для чтения файлов. Наш конвейер теперь будет трехэтапный: обходим дерево, считаем хеши файлов, и собираем эти хеши.

Первый этап - функция walkFiles которая собирает пути файлов в дереве:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Закрываем канал для путей после обхода дерева.
        defer close(paths)
        // Канал `errc` буферизированный.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

На втором шаге стартует функция digester запускает фиксированное число хеширующих go-рутин, которые получают имена файлов из paths и отправляют результаты results в канал c:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

В отличии от предыдущего примера, функция digester не закрывает свой выходной канал после того как go-рутины отправят всю информацию. Вместо этого, функция MD5All обеспечивает закрытие всех каналов после завершения всех запущенных digester:

    // Запускаем фиксированное количество go-рутин 
    // для чтения и хеширования файлов
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

Мы могли бы делать отдельный канал для каждого вызова digester. Но тогда нам понадобилась бы еще одна go-рутина для fan-in.

На последнем этапе мы собираем все results из канала c, затем проверяем наличие ошибок в канале errc. Мы не можем проверить канал errc раньше, до этого места, потому что walkFiles просто заблокирует отправку сообщений:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

Заключения

В этой статье показаны технологии обработки данных с использованием конвейеров в Go. Отмена работы может оказаться нетривиальной задачей, так как на каждом этапе могут возникнуть блокировки и следующие этапы не смогут получить данные. В статье был показан пример как закрытие канала может транслировать сигнал "done" для всех go-рутин запущенных в потоке.

Что почитать:

  • Go Concurrency Patterns (видео) презентация базовых примитивов конкурентного программирования на Go и несколько способов их применения.
  • Роб Пайк. Advanced Go Concurrency Patterns (видео) рассматриваются более комплексные примитивы.
  • Статья Squinting at Power Series от Douglas McIlroy's в которой показано, как элегпнтно можно использовать Go конкурентность для комплексных вычислений.