Погружаемся в каналы
Перевод статьи “Diving Deep Into The Golang Channels".
В этой статье поговорим о реализации каналов в Go и связанных с ними операциях.
Конкурентность в Go это больше чем просто синтаксис. Это паттерн. Паттерн это способ решения типичных проблем при работе с конкурентностью когда необходима синхронизация.
В Go используется CSP(Communicating Sequential Processes) модель конкуренции и синхронизация достигается через использование каналов. Главная философия конкурентности в Go выражена одной фразой:
Не сообщайтесь через разделение памяти. Вместо этого разделяйте память для сообщения.
Go надеется, что вы будете поступать правильно. Поэтому в статье рассмотрим как можно следовать этой философии используя каналы.
Что такое каналы
1func goRoutineA(a <-chan int) {
2 val := <-a
3 fmt.Println("goRoutineA received the data", val)
4}
5func main() {
6 ch := make(chan int)
7 go goRoutineA(ch)
8 time.Sleep(time.Second * 1)
9}
Горутины блокируются на каналах до получения сообщения из канала. Канал должен уметь разблокировать рутины.
Небольшое отступлении. Если вы не очень хорошо понимаете как работает планировщик в Go, то вам стоит прочитать эту замечательную статью: https://morsmachine.dk/go-scheduler.
Структура канала
В Go структура канала(channel) это основной способ передачи сообщений между каналами. Как же эта структура выглядит после инициализации?
1ch := make(chan int, 3)
chan int {
qcount: 0,
dataqsiz: 3,
buf: *[3]int [0,0,0],
elemsize: 8,
closed: 0,
elemtype: *runtime._type {
size: 8,
ptrdata: 0,
hash: 4149441018,
tflag: tflagUncommon|tflagExtraStar|tflagNamed,
align: 8,
fieldalign: 8,
kind: 130,
alg: *(*runtime.typeAlg)(0x568eb0),
gcdata: *1,
str: 1015,
ptrToThis: 45376,
},
sendx: 0,
recvx: 0,
recvq: waitq<int> {
first: *sudog<int> nil,
lsat: *sudog<int> nil,
},
sendq: waitq<int> {
first: *sudog<int> nil,
last: *sudog<int> nil,
},
lock: runtime.mutex {key:0},
}
Выглядит неплохо, неплохо. Но что это все значит? И откуда берется эта структура? Давайте рассмотрим несколько важных типов до того как двигаться дальше.
hchan
Когда мы пишем make(chan int, 2)
то создается экземпляр структуры hchan
с такими полями:
1type hchan struct {
2 qcount uint // total data in the queue
3 dataqsiz uint // size of the circular queue
4 buf unsafe.Pointer // points to an array of dataqsiz elements
5 elemsize uint16
6 closed uint32
7 elemtype *_type // element type
8 sendx uint // send index
9 recvx uint // receive index
10 recvq waitq // list of recv waiters
11 sendq waitq // list of send waiters
12
13 // lock protects all fields in hchan, as well as several
14 // fields in sudogs blocked on this channel.
15 //
16 // Do not change another G's status while holding this lock
17 // (in particular, do not ready a G), as this can deadlock
18 // with stack shrinking.
19 lock mutex
20}
21
22type waitq struct {
23 first *sudog
24 last *sudog
25}
рассмотрим какие поля что обозначают в этой структуре:
dataqsize это размер буфера который мы указали при создании канала.
elemsize размер одного элемента в канале
buf циклическая очередь(циклический буфер) где сохраняются данные. Используется только в буферизированных каналах.
closed индикатор закрытого канала. При создании канала это поле 0. После вызова close
в это поле устанавливается 1.
sendx и recvx это поля для сохранения состояния буфера. Они указывают на позиции в массиве откуда должна происходить отправка или куда должны попадать новые данные.
recvq и sendq очереди заблокированных горутин, которые ожидают отправки в канал или чтение из него.
lock все отправки и получения должны быть защищены блокировкой
Пока еще непонятно, что за тип sudog
.
sudog
sudog это представление горутины которая стоит в очереди
1type sudog struct {
2 // The following fields are protected by the hchan.lock of the
3 // channel this sudog is blocking on. shrinkstack depends on
4 // this for sudogs involved in channel ops.
5
6 g *g
7
8 // isSelect indicates g is participating in a select, so
9 // g.selectDone must be CAS'd to win the wake-up race.
10 isSelect bool
11 next *sudog
12 prev *sudog
13 elem unsafe.Pointer // data element (may point to stack)
14
15 // The following fields are never accessed concurrently.
16 // For channels, waitlink is only accessed by g.
17 // For semaphores, all fields (including the ones above)
18 // are only accessed when holding a semaRoot lock.
19
20 acquiretime int64
21 releasetime int64
22 ticket uint32
23 parent *sudog // semaRoot binary tree
24 waitlink *sudog // g.waiting list or semaRoot
25 waittail *sudog // semaRoot
26 c *hchan // channel
27}
Давайте немного расширим наш пример с каналом и шаг за шагом посмотрим как он работает. Разберемся что делает его таким мощным инструментом.
1func goRoutineA(a <-chan int) {
2 val := <-a
3 fmt.Println("goRoutineA received the data", val)
4}
5
6func goRoutineB(a <-chan int) {
7 val := <-a
8 fmt.Println("goRoutineB received the data", val)
9}
10
11func main() {
12 ch := make(chan int)
13 go goRoutineA(ch)
14 go goRoutineB(ch)
15 ch <- 3
16 time.Sleep(time.Second * 1)
17}
Какой теперь станет структура канала? Что поменяется?
chan int {
qcount: 0,
dataqsiz: 0,
buf: *[0]int [],
elemsize: 8,
closed: 0,
elemtype: *runtime._type {
size: 8,
ptrdata: 0,
hash: 4149441018,
tflag: tflagUncommon|tflagExtraStar|tflagNamed,
align: 8,
fieldalign: 8,
kind: 130,
alg: *(*runtime.typeAlg)(0x568eb0),
gcdata: *1,
str: 1015,
ptrToThis: 45376,
},
sendx: 0,
recvx: 0,
recvq: waitq<int> {
first: *(*sudog<int>)(0xc000088000),
last: *(*sudog<int>)(0xc000088060),
},
sendq: waitq<int> {
first: *sudog<int> nil,
last: *sudog<int> nil,
},
lock: runtime.mutex {key:0},
}
Обратите внимание на поля recvq.first
и recvq.last
. recvq
сейчас содержит заблокированные горутины. В нашем примере goroutineA
и goroutineB
сначала пытаются читать данные из ch
. Но там ничего нет пока мы не пошлем в канал первые данные(ch <- 3
), поэтому горутины блокируются на операции чтения и в поля recvq.first
и recvq.last
сетятся объекты типа sudog
представляющие эти рутины.
По сути recvq
и sendq
это связанные списки, которые работают как показано на картинке:
Эти структуры играют важную роль в работе каналов. Давайте разберемся, что происходит при отправке данных в канал.
Отправка в канал
Во время отправки могут возникнуть разные ситуации: отправка в закрытый канал, неинициализированный и так далее. Давайте разберемся со всеми случаями.
Отправка в nil канал
1if c == nil {
2 // ...
3 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
4 throw("unreachable")
5}
Если мы попытаемся отправить данные в nil канал то горутина приостановит свою работу.
Отправка в закрытый канал
1if c.closed != 0 {
2 unlock(&c.lock)
3 panic(plainError("send on closed channel"))
4}
При отправке в закрытый канал мы получим панику.
Горутина блокируется. Данные передаются самой горутине
1if sg := c.recvq.dequeue(); sg != nil {
2 send(c, sg, ep, func() {unlock(&c.lock)}, 3)
3 return true
4}
В этом месте recvq
играет очень важную роль. Если в recvq
нет других рутин, то его можно считать ожидающим получателем и ближайшая операция записи сразу передает значение этому получателю. Это реализовано в функции send()
.
1func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
2 // ...
3 if sg.elem != nil {
4 sendDirect(c.elementype, sg, ep)
5 sg.elem = nil
6 }
7 gp := sg.g
8 gp.param = unsafe.Pointer(sg)
9 if sg.releasetime != 0 {
10 ag.releasetime = cputicks()
11 }
12 goready(gp, skip+1)
13}
Обратите внимание на goready(gp, skip+1)
. Горутина которая заблокировалась до получения данных
продолжит свою работу после вызова goready
- планировщик снова ее запустит.
Отправка в буфферизированный канал когда в буффере еще есть место
1if c.qcount < c.dataqsiz {
2 // Space is available in the channel buffer. Enqueue the element to send.
3 qp := chanbuf(c, c.sendx)
4 if raceenabled {
5 raceacquire(qp)
6 racerelease(qp)
7 }
8 typedmemmove(c.elemtype, qp, ep)
9 c.sendx++
10 if c.sendx == c.dataqsiz {
11 c.sendx = 0
12 }
13 c.qcount++
14 unlock(&c.lock)
15 return true
16}
chanbuf(c, i)
- доступ к нужному куску памяти. Чтобы определить есть ли свободное место сравниваем qcount
и dataqsiz
. Елемент ставится в очередь через копирование памяти на которую указывает ep
в буффер и нкремент счетчика sendx
Отправка в канал с заполненным буффером
1// Block on the channel. Some receiver will complete our operation for us.
2gp := getg()
3mysg := acquireSudog()
4mysg.releasetime = 0
5if t0 != 0 {
6 mysg.releasetime = -1
7}
8// No stack splits between assigning elem and enqueuing mysg
9// on gp.waiting where copystack can find it.
10mysg.elem = ep
11mysg.waitlink = nil
12mysg.g = gp
13mysg.isSelect = false
14mysg.c = c
15gp.waiting = mysg
16gp.param = nil
17c.sendq.enqueue(mysg)
18goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
Получаем объек горутины в текущем стеке. С помощью acquireSudog
паркуем горутину и добавлем ее sendq
канала.
Отправка в канал. Вводы
- Внутри канала активно используется структура
lock
- Запись может происходить напрямую через выбор ожидающей горутины из
recvq
и передачу сообщения непосредственно ей. - Если очередт с горутинами пустая, то пытаемся записать сообщение в буффер если он доступен и там есть место. Запись происходит через копирование данных из горутины в буфер.
- Если буфер заполнен, то данные сохраняются в структуре текущей горутины и горутина блокируется и ставится в очередь
sendq
Обратите внимание на последний пункт. Это актуально для небуферезированных каналов, даже если к них есть полу buf
. При отправке сообщения в такой канал оно сохранится в поле elem
структуры sudog
.
Давайте рассмотрим еще один пример:
1package main
2
3func goroutineA(c2 chan int) {
4 c2 <- 2
5}
6
7func main() {
8 c2 := make(chan int)
9 go goroutineA(c2)
10
11 for {
12 }
13}
Как выглядит структура c2
в рантайме?
chan int {
qcount: 0,
dataqsiz: 0,
buf: *[0]int [],
elemsize: 8,
closed: 0,
elemtype: *runtime._type {
size: 8,
ptrdata: 0,
hash: 4149441018,
tflag: tflagUncommon|tflagExtraStar|tflagNamed,
align: 8,
fieldalign: 8,
kind: 130,
alg: *(*runtime.typeAlg)(0x4bff90),
gcdata: *1,
str: 775,
ptrToThis: 28320,
},
sendx: 0,
recvx: 0,
recvq: waitq<int> {
first: *sudog<int> nil,
last: *sudog<int> nil,
},
sendq: waitq<int> {
first: *(*sudog<int>)(0xc000074000),
last: *(*sudog<int>)(0xc000074000),
},
lock: runtime.mutex {key:0},
}
Видно что когда мы отправляем в канал новое значение 2
, оно не попадает в буфер. Это значение
сохраняется в структуре sudog
. Когда горутина goroutineA
пытается отправить сообщение в канал - еще нет нт одного получателя. Горутина попадает в список sendq
и блокируется. Можно посмотреть как выглядит структура sendq
в рантайме:
p c2.sendq
waitq<int> {
first: *sudog<int> {
g: *(*runtime.g)(0xc000001080),
isSelect: false,
next: *runtime.sudog nil,
prev: *runtime.sudog nil,
elem: 2,
acquiretime: 0,
releasetime 0,
ticket: 0,
parent: *runtime.sudog nil,
waitlink: *runtime.sudog nil,
waittail: *runtime.sudog nil,
c: *(*runtime.hchan)(0xc00001e120),
}
}
Все значения в канал передаются по значению. Это важно запомнить. Давайте расмотрим такой пример.
1type user struct {
2 name string
3 age int8
4}
5
6var u = user{name:"Anku", age:25}
7var g := &g
8
9func modifyUser(pu *user) {
10 fmt.Println("modifyUser Receive Value", pu)
11 ou.name = "Anand"
12}
13
14func printUser(u <-chan *user) {
15 time.Sleep(2 * time.Second)
16 fmt.Println("printUser goroutine called", <-u)
17}
18
19func main() {
20 c := make(chan *user, 5)
21 c <- g
22 fmt.Println(g)
23 // modify g
24 g := &user{name: "Ankur Anand", age:100}
25 go printUser(c)
26 go modifyUser(g)
27 time.Sleep(time.Second * 5)
28 fmt.Println(g)
29}
Что выведет эта программа? Значения передаются через копирование. В нашем случае в канал будет скопировано значение g
.
Не сообщайтесь через разделение памяти. Вместо этого разделяйте память для сообщения
Вот что выведет программа:
&{Ankur 25}
modifyUser Received Value &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}
Операции чтения из канала
Операция чтения очень похожа на запись.
1func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
2 // raceenabled: don't need to check ep, as it is always on the stack
3 // or is new memory allocated by reflect.
4
5 if debugChan {
6 print("chanrecv: chan=", c, "\n")
7 }
8
9 if c == nil {
10 if !block {
11 return
12 }
13 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
14 throw("unreachable")
15 }
16 // ...
17 lock(&c.lock)
18
19 if c.closed != 0 && c.qcount == 0 {
20 if raceenabled {
21 raceacquire(c.raceaddr())
22 }
23 unlock(&c.lock)
24 if ep != nil {
25 typedmemclr(c.elemtype, ep)
26 }
27 return true, false
28 }
29
30 if sg := c.sendq.dequeue(); sg != nil {
31 // Found a waiting sender. If buffer is size 0, receive value
32 // directly from sender. Otherwise, receive from head of queue
33 // and add sender's value to the tail of the queue (both map to
34 // the same buffer slot because the queue is full).
35 recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
36 return true, true
37 }
38
39 if c.qcount > 0 {
40 qp := chanbuf(c, c.recvx)
41 //...
42 if ep != nil {
43 typedmemmove(c.elemtype, ep, qp)
44 }
45 typedmemclr(c.elemtype, qp)
46 c.recvx++
47 if c.recvx == c.dataqsiz {
48 c.recvx = 0
49 }
50 c.qcount--
51 unlock(&c.lock)
52 return true, true
53 }
54 // ...
55 // no sender available: block on this channel.
56 gp := getg()
57 mysg := acquireSudog()
58 mysg.releasetime = 0
59 if t0 != 0 {
60 mysg.releasetime = -1
61 }
62 // No stack splits between assigning elem and enqueuing mysg
63 // on gp.waiting where copystack can find it.
64 mysg.elem = ep
65 mysg.waitlink = nil
66 gp.waiting = mysg
67 mysg.g = gp
68 mysg.isSelect = false
69 mysg.c = c
70 gp.param = nil
71 c.recvq.enqueue(mysg)
72 goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
73 // ...
74}
Select
Объединение нескольких каналов.
1ch := make(chan int, 5)
2chs := make(chan string, 5)
3select {
4 case msg := <- ch:
5 fmt.Println("receive message", msg)
6 case msgs := <- chs:
7 fmt.Println("receive message", msgs)
8 default:
9 fmt.Println("no message received")
10}
Операции взаимно исключающие. Значит, нам нужно использовать блокировку всех используемых в селекте каналов. Блокировки приобретаются в зависимости от текущего кейса, а это значит что каналы блокируются не одновременно.
sellock(scases, lockorder)
Каждый scase
в массиве scases
это структура, которая содержит данные по операции в текущем кейсе и канал, для которого эта операция будет выполнятся.
1type scase struct {
2 c *hchan
3 elem unsafe.Pointer
4 kind uint16
5 pc uintptr
6 releasetime int64
7}
kind
это тип операции в кейсе и он может быть CaseRecv
, CaseSend
и CaseDefault
.
Порядок опроса расчитывается так чтобы задействовать каналы в псевдо случайном порядке. После этого каналы опрашиваются в расчитаном порядке. То как в каком порядке вы напишете кейсы в программе не имеет значения.
Генерация порядка опроса:
1for i := 1; i < ncases; i++ {
2 j := fastrandn(uint32(i+1))
3 pollorder[i] = pollorder[j]
4 pollorder[j] = uint16(i)
5}
Непосредственно проход по очереди опроса:
1for i := 0; i < ncases; i++ {
2 casi = int(pollorder[i])
3 cas = &scases[casi]
4 c = cas.c
5
6 switch cas.kind {
7 case caseNil:
8 continue
9
10 case caseRecv:
11 sg = c.sendq.dequeue()
12 if sg != nil {
13 goto recv
14 }
15 if c.qcount > 0 {
16 goto bufrecv
17 }
18 if c.closed != 0 {
19 goto rclose
20 }
21
22 case caseSend:
23 if raceenabled {
24 racereadpc(c.raceaddr(), cas.pc, chansendpc)
25 }
26 if c.closed != 0 {
27 goto sclose
28 }
29 sg = c.recvq.dequeue()
30 if sg != nil {
31 goto send
32 }
33 if c.qcount < c.dataqsiz {
34 goto bufsend
35 }
36
37 case caseDefault:
38 dfli = casi
39 dfl = cas
40 }
41}
select
может сработать без блокировки если в канале еть данные. Даже без прохода по всем каналам.
Если нет ни одного готового канала и кейса default
то горутина g
блокируется попадает в очередь доступности одного из каналов.
1gp = getg()
2// ...
3for _, casei := range lockorder {
4 casi = int(casei)
5 cas = &scases[casi]
6 if cas.kind == caseNil {
7 continue
8 }
9 c = cas.c
10 sg := acquireSudog()
11 sg.g = gp
12 sg.isSelect = true
13 // ...
14 switch cas.kind {
15 case caseRecv:
16 c.recvq.enqueue(sg)
17
18 case caseSend:
19 c.sendq.enqueue(sg)
20 }
21}
Поле ag.isSelect
указывает что горутина участвует в селекте
Операции получения, отправки и закрытия в селекте аналогичны обычным операциям получения, отправки и закрытия.
Заключение
Каналы это очень мощный и интересный механизм в Go. Но чтобы правильно их использовать нужно знать как они устроены. Надеюсь что в этой статье удалось опиать базовые принципы работы каналов.