Погружаемся в каналы

33 minute read

Перевод статьи “Diving Deep Into The Golang Channels".

В этой статье поговорим о реализации каналов в Go и связанных с ними операциях.

Конкурентность в Go это больше чем просто синтаксис. Это паттерн. Паттерн это способ решения типичных проблем при работе с конкурентностью когда необходима синхронизация.

В Go используется CSP(Communicating Sequential Processes) модель конкуренции и синхронизация достигается через использование каналов. Главная философия конкурентности в Go выражена одной фразой:

Не сообщайтесь через разделение памяти. Вместо этого разделяйте память для сообщения.

Go надеется, что вы будете поступать правильно. Поэтому в статье рассмотрим как можно следовать этой философии используя каналы.

Что такое каналы

1func goRoutineA(a <-chan int) {
2    val := <-a
3    fmt.Println("goRoutineA received the data", val)
4}
5func main() {
6    ch := make(chan int)
7    go goRoutineA(ch)
8    time.Sleep(time.Second * 1)
9}

Горутины блокируются на каналах до получения сообщения из канала. Канал должен уметь разблокировать рутины.

Небольшое отступлении. Если вы не очень хорошо понимаете как работает планировщик в Go, то вам стоит прочитать эту замечательную статью: https://morsmachine.dk/go-scheduler.

Структура канала

В Go структура канала(channel) это основной способ передачи сообщений между каналами. Как же эта структура выглядит после инициализации?

1ch := make(chan int, 3)
chan int {
    qcount: 0,
    dataqsiz: 3,
    buf: *[3]int [0,0,0],
    elemsize: 8,
    closed: 0,
    elemtype: *runtime._type {
        size: 8,
        ptrdata: 0,
        hash: 4149441018,
        tflag: tflagUncommon|tflagExtraStar|tflagNamed,
        align: 8,
        fieldalign: 8,
        kind: 130,
        alg: *(*runtime.typeAlg)(0x568eb0),
        gcdata: *1,
        str: 1015,
        ptrToThis: 45376,
    },
    sendx: 0,
    recvx: 0,
    recvq: waitq<int> {
        first: *sudog<int> nil,
        lsat: *sudog<int> nil,
    },
    sendq: waitq<int> {
        first: *sudog<int> nil,
        last: *sudog<int> nil,
    },
    lock: runtime.mutex {key:0},
}

Выглядит неплохо, неплохо. Но что это все значит? И откуда берется эта структура? Давайте рассмотрим несколько важных типов до того как двигаться дальше.

hchan

Когда мы пишем make(chan int, 2) то создается экземпляр структуры hchan с такими полями:

 1type hchan struct {
 2	qcount   uint           // total data in the queue
 3	dataqsiz uint           // size of the circular queue
 4	buf      unsafe.Pointer // points to an array of dataqsiz elements
 5	elemsize uint16
 6	closed   uint32
 7	elemtype *_type // element type
 8	sendx    uint   // send index
 9	recvx    uint   // receive index
10	recvq    waitq  // list of recv waiters
11	sendq    waitq  // list of send waiters
12
13	// lock protects all fields in hchan, as well as several
14	// fields in sudogs blocked on this channel.
15	//
16	// Do not change another G's status while holding this lock
17	// (in particular, do not ready a G), as this can deadlock
18	// with stack shrinking.
19	lock mutex
20}
21
22type waitq struct {
23	first *sudog
24	last  *sudog
25}

рассмотрим какие поля что обозначают в этой структуре:

dataqsize это размер буфера который мы указали при создании канала. elemsize размер одного элемента в канале buf циклическая очередь(циклический буфер) где сохраняются данные. Используется только в буферизированных каналах. closed индикатор закрытого канала. При создании канала это поле 0. После вызова close в это поле устанавливается 1. sendx и recvx это поля для сохранения состояния буфера. Они указывают на позиции в массиве откуда должна происходить отправка или куда должны попадать новые данные. recvq и sendq очереди заблокированных горутин, которые ожидают отправки в канал или чтение из него. lock все отправки и получения должны быть защищены блокировкой

Пока еще непонятно, что за тип sudog.

sudog

sudog это представление горутины которая стоит в очереди

 1type sudog struct {
 2	// The following fields are protected by the hchan.lock of the
 3	// channel this sudog is blocking on. shrinkstack depends on
 4	// this for sudogs involved in channel ops.
 5
 6	g *g
 7
 8	// isSelect indicates g is participating in a select, so
 9	// g.selectDone must be CAS'd to win the wake-up race.
10	isSelect bool
11	next     *sudog
12	prev     *sudog
13	elem     unsafe.Pointer // data element (may point to stack)
14
15	// The following fields are never accessed concurrently.
16	// For channels, waitlink is only accessed by g.
17	// For semaphores, all fields (including the ones above)
18	// are only accessed when holding a semaRoot lock.
19
20	acquiretime int64
21	releasetime int64
22	ticket      uint32
23	parent      *sudog // semaRoot binary tree
24	waitlink    *sudog // g.waiting list or semaRoot
25	waittail    *sudog // semaRoot
26	c           *hchan // channel
27}

Давайте немного расширим наш пример с каналом и шаг за шагом посмотрим как он работает. Разберемся что делает его таким мощным инструментом.

 1func goRoutineA(a <-chan int) {
 2    val := <-a
 3    fmt.Println("goRoutineA received the data", val)
 4}
 5
 6func goRoutineB(a <-chan int) {
 7    val := <-a
 8    fmt.Println("goRoutineB received the data", val)
 9}
10
11func main() {
12    ch := make(chan int)
13    go goRoutineA(ch)
14    go goRoutineB(ch)
15    ch <- 3
16    time.Sleep(time.Second * 1)
17}

Какой теперь станет структура канала? Что поменяется?

chan int {
    qcount: 0,
    dataqsiz: 0,
    buf: *[0]int [],
    elemsize: 8,
    closed: 0,
    elemtype: *runtime._type {
        size: 8,
        ptrdata: 0,
        hash: 4149441018,
        tflag: tflagUncommon|tflagExtraStar|tflagNamed,
        align: 8,
        fieldalign: 8,
        kind: 130,
        alg: *(*runtime.typeAlg)(0x568eb0),
        gcdata: *1,
        str: 1015,
        ptrToThis: 45376,
    },
    sendx: 0,
    recvx: 0,
    recvq: waitq<int> {
        first: *(*sudog<int>)(0xc000088000),
        last: *(*sudog<int>)(0xc000088060),
    },
    sendq: waitq<int> {
        first: *sudog<int> nil,
        last: *sudog<int> nil,
    },
    lock: runtime.mutex {key:0},
}

Обратите внимание на поля recvq.first и recvq.last. recvq сейчас содержит заблокированные горутины. В нашем примере goroutineA и goroutineB сначала пытаются читать данные из ch. Но там ничего нет пока мы не пошлем в канал первые данные(ch <- 3), поэтому горутины блокируются на операции чтения и в поля recvq.first и recvq.last сетятся объекты типа sudog представляющие эти рутины.

По сути recvq и sendq это связанные списки, которые работают как показано на картинке:

Эти структуры играют важную роль в работе каналов. Давайте разберемся, что происходит при отправке данных в канал.

Отправка в канал

Во время отправки могут возникнуть разные ситуации: отправка в закрытый канал, неинициализированный и так далее. Давайте разберемся со всеми случаями.

Отправка в nil канал

1if c == nil {
2    // ...
3    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
4    throw("unreachable")
5}

Если мы попытаемся отправить данные в nil канал то горутина приостановит свою работу.

Отправка в закрытый канал

1if c.closed != 0 {
2    unlock(&c.lock)
3    panic(plainError("send on closed channel"))
4}

При отправке в закрытый канал мы получим панику.

Горутина блокируется. Данные передаются самой горутине

1if sg := c.recvq.dequeue(); sg != nil {
2    send(c, sg, ep, func() {unlock(&c.lock)}, 3)
3    return true
4}

В этом месте recvqиграет очень важную роль. Если в recvq нет других рутин, то его можно считать ожидающим получателем и ближайшая операция записи сразу передает значение этому получателю. Это реализовано в функции send().

 1func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 2    // ...
 3    if sg.elem != nil {
 4        sendDirect(c.elementype, sg, ep)
 5        sg.elem = nil
 6    }
 7    gp := sg.g
 8    gp.param = unsafe.Pointer(sg)
 9    if sg.releasetime != 0 {
10        ag.releasetime = cputicks()
11    }
12    goready(gp, skip+1)
13}

Обратите внимание на goready(gp, skip+1). Горутина которая заблокировалась до получения данных продолжит свою работу после вызова goready - планировщик снова ее запустит.

Отправка в буфферизированный канал когда в буффере еще есть место

 1if c.qcount < c.dataqsiz {
 2    // Space is available in the channel buffer. Enqueue the element to send.
 3    qp := chanbuf(c, c.sendx)
 4    if raceenabled {
 5        raceacquire(qp)
 6        racerelease(qp)
 7    }
 8    typedmemmove(c.elemtype, qp, ep)
 9    c.sendx++
10    if c.sendx == c.dataqsiz {
11        c.sendx = 0
12    }
13    c.qcount++
14    unlock(&c.lock)
15    return true
16}

chanbuf(c, i) - доступ к нужному куску памяти. Чтобы определить есть ли свободное место сравниваем qcount и dataqsiz. Елемент ставится в очередь через копирование памяти на которую указывает ep в буффер и нкремент счетчика sendx

Отправка в канал с заполненным буффером

 1// Block on the channel. Some receiver will complete our operation for us.
 2gp := getg()
 3mysg := acquireSudog()
 4mysg.releasetime = 0
 5if t0 != 0 {
 6    mysg.releasetime = -1
 7}
 8// No stack splits between assigning elem and enqueuing mysg
 9// on gp.waiting where copystack can find it.
10mysg.elem = ep
11mysg.waitlink = nil
12mysg.g = gp
13mysg.isSelect = false
14mysg.c = c
15gp.waiting = mysg
16gp.param = nil
17c.sendq.enqueue(mysg)
18goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)

Получаем объек горутины в текущем стеке. С помощью acquireSudog паркуем горутину и добавлем ее sendq канала.

Отправка в канал. Вводы

  • Внутри канала активно используется структура lock
  • Запись может происходить напрямую через выбор ожидающей горутины из recvq и передачу сообщения непосредственно ей.
  • Если очередт с горутинами пустая, то пытаемся записать сообщение в буффер если он доступен и там есть место. Запись происходит через копирование данных из горутины в буфер.
  • Если буфер заполнен, то данные сохраняются в структуре текущей горутины и горутина блокируется и ставится в очередь sendq

Обратите внимание на последний пункт. Это актуально для небуферезированных каналов, даже если к них есть полу buf. При отправке сообщения в такой канал оно сохранится в поле elem структуры sudog.

Давайте рассмотрим еще один пример:

 1package main
 2
 3func goroutineA(c2 chan int) {
 4    c2 <- 2
 5}
 6
 7func main() {
 8    c2 := make(chan int)
 9    go goroutineA(c2)
10
11    for {
12    }
13}

Как выглядит структура c2 в рантайме?

chan int {
    qcount: 0,
    dataqsiz: 0,
    buf: *[0]int [],
    elemsize: 8,
    closed: 0,
    elemtype: *runtime._type {
        size: 8,
        ptrdata: 0,
        hash: 4149441018,
        tflag: tflagUncommon|tflagExtraStar|tflagNamed,
        align: 8,
        fieldalign: 8,
        kind: 130,
        alg: *(*runtime.typeAlg)(0x4bff90),
        gcdata: *1,
        str: 775,
        ptrToThis: 28320,
    },
    sendx: 0,
    recvx: 0,
    recvq: waitq<int> {
        first: *sudog<int> nil,
        last: *sudog<int> nil,
    },
    sendq: waitq<int> {
        first: *(*sudog<int>)(0xc000074000),
        last: *(*sudog<int>)(0xc000074000),
    },
    lock: runtime.mutex {key:0},
}

Видно что когда мы отправляем в канал новое значение 2, оно не попадает в буфер. Это значение сохраняется в структуре sudog. Когда горутина goroutineA пытается отправить сообщение в канал - еще нет нт одного получателя. Горутина попадает в список sendq и блокируется. Можно посмотреть как выглядит структура sendq в рантайме:

p c2.sendq
waitq<int> {
    first: *sudog<int> {
        g: *(*runtime.g)(0xc000001080),
        isSelect: false,
        next: *runtime.sudog nil,
        prev: *runtime.sudog nil,
        elem: 2,
        acquiretime: 0,
	    releasetime 0,
        ticket: 0,
        parent: *runtime.sudog nil,
        waitlink: *runtime.sudog nil,
        waittail: *runtime.sudog nil,
        c: *(*runtime.hchan)(0xc00001e120),
    }
}

Все значения в канал передаются по значению. Это важно запомнить. Давайте расмотрим такой пример.

 1type user struct {
 2    name string
 3    age int8
 4}
 5
 6var u = user{name:"Anku", age:25}
 7var g := &g
 8
 9func modifyUser(pu *user) {
10    fmt.Println("modifyUser Receive Value", pu)
11    ou.name = "Anand"
12}
13
14func printUser(u <-chan *user) {
15    time.Sleep(2 * time.Second)
16    fmt.Println("printUser goroutine called", <-u)   
17}
18
19func main() {
20    c := make(chan *user, 5)
21    c <- g
22    fmt.Println(g)
23    // modify g
24    g := &user{name: "Ankur Anand", age:100}
25    go printUser(c)
26    go modifyUser(g)
27    time.Sleep(time.Second * 5)
28    fmt.Println(g)
29}

Что выведет эта программа? Значения передаются через копирование. В нашем случае в канал будет скопировано значение g.

Не сообщайтесь через разделение памяти. Вместо этого разделяйте память для сообщения

Вот что выведет программа:

&{Ankur 25}
modifyUser Received Value &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}

Операции чтения из канала

Операция чтения очень похожа на запись.

 1func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
 2	// raceenabled: don't need to check ep, as it is always on the stack
 3	// or is new memory allocated by reflect.
 4
 5	if debugChan {
 6		print("chanrecv: chan=", c, "\n")
 7	}
 8
 9	if c == nil {
10		if !block {
11			return
12		}
13		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
14		throw("unreachable")
15    }
16    // ...
17    lock(&c.lock)
18
19	if c.closed != 0 && c.qcount == 0 {
20		if raceenabled {
21			raceacquire(c.raceaddr())
22		}
23		unlock(&c.lock)
24		if ep != nil {
25			typedmemclr(c.elemtype, ep)
26		}
27		return true, false
28	}
29
30	if sg := c.sendq.dequeue(); sg != nil {
31		// Found a waiting sender. If buffer is size 0, receive value
32		// directly from sender. Otherwise, receive from head of queue
33		// and add sender's value to the tail of the queue (both map to
34		// the same buffer slot because the queue is full).
35		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
36		return true, true
37	}
38
39	if c.qcount > 0 {
40        qp := chanbuf(c, c.recvx)
41        //...
42        if ep != nil {
43			typedmemmove(c.elemtype, ep, qp)
44		}
45		typedmemclr(c.elemtype, qp)
46		c.recvx++
47		if c.recvx == c.dataqsiz {
48			c.recvx = 0
49		}
50		c.qcount--
51		unlock(&c.lock)
52		return true, true
53    }
54    // ...
55    // no sender available: block on this channel.
56	gp := getg()
57	mysg := acquireSudog()
58	mysg.releasetime = 0
59	if t0 != 0 {
60		mysg.releasetime = -1
61	}
62	// No stack splits between assigning elem and enqueuing mysg
63	// on gp.waiting where copystack can find it.
64	mysg.elem = ep
65	mysg.waitlink = nil
66	gp.waiting = mysg
67	mysg.g = gp
68	mysg.isSelect = false
69	mysg.c = c
70	gp.param = nil
71	c.recvq.enqueue(mysg)
72    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
73    // ...
74}

Select

Объединение нескольких каналов.

 1ch := make(chan int, 5)
 2chs := make(chan string, 5)
 3select {
 4    case msg := <- ch:
 5        fmt.Println("receive message", msg)
 6    case msgs := <- chs:
 7        fmt.Println("receive message", msgs)
 8    default:
 9        fmt.Println("no message received")
10}

Операции взаимно исключающие. Значит, нам нужно использовать блокировку всех используемых в селекте каналов. Блокировки приобретаются в зависимости от текущего кейса, а это значит что каналы блокируются не одновременно.

sellock(scases, lockorder)

Каждый scase в массиве scases это структура, которая содержит данные по операции в текущем кейсе и канал, для которого эта операция будет выполнятся.

1type scase struct {
2    c           *hchan
3    elem        unsafe.Pointer
4    kind        uint16
5    pc          uintptr
6    releasetime int64
7}

kind это тип операции в кейсе и он может быть CaseRecv, CaseSend и CaseDefault.

Порядок опроса расчитывается так чтобы задействовать каналы в псевдо случайном порядке. После этого каналы опрашиваются в расчитаном порядке. То как в каком порядке вы напишете кейсы в программе не имеет значения.

Генерация порядка опроса:

1for i := 1; i < ncases; i++ {
2    j := fastrandn(uint32(i+1))
3    pollorder[i] = pollorder[j]
4    pollorder[j] = uint16(i)
5}

Непосредственно проход по очереди опроса:

 1for i := 0; i < ncases; i++ {
 2    casi = int(pollorder[i])
 3    cas = &scases[casi]
 4    c = cas.c
 5
 6    switch cas.kind {
 7    case caseNil:
 8        continue
 9
10    case caseRecv:
11        sg = c.sendq.dequeue()
12        if sg != nil {
13            goto recv
14        }
15        if c.qcount > 0 {
16            goto bufrecv
17        }
18        if c.closed != 0 {
19            goto rclose
20        }
21
22    case caseSend:
23        if raceenabled {
24            racereadpc(c.raceaddr(), cas.pc, chansendpc)
25        }
26        if c.closed != 0 {
27            goto sclose
28        }
29        sg = c.recvq.dequeue()
30        if sg != nil {
31            goto send
32        }
33        if c.qcount < c.dataqsiz {
34            goto bufsend
35        }
36
37    case caseDefault:
38        dfli = casi
39        dfl = cas
40    }
41}

select может сработать без блокировки если в канале еть данные. Даже без прохода по всем каналам.

Если нет ни одного готового канала и кейса default то горутина g блокируется попадает в очередь доступности одного из каналов.

 1gp = getg()
 2// ...
 3for _, casei := range lockorder {
 4    casi = int(casei)
 5    cas = &scases[casi]
 6    if cas.kind == caseNil {
 7        continue
 8    }
 9    c = cas.c
10    sg := acquireSudog()
11    sg.g = gp
12    sg.isSelect = true
13    // ...
14    switch cas.kind {
15    case caseRecv:
16        c.recvq.enqueue(sg)
17
18    case caseSend:
19        c.sendq.enqueue(sg)
20    }
21}

Поле ag.isSelect указывает что горутина участвует в селекте

Операции получения, отправки и закрытия в селекте аналогичны обычным операциям получения, отправки и закрытия.

Заключение

Каналы это очень мощный и интересный механизм в Go. Но чтобы правильно их использовать нужно знать как они устроены. Надеюсь что в этой статье удалось опиать базовые принципы работы каналов.

comments powered by Disqus