Go concurrency design




Gregory Vinčić, 2023


Content

  • background and history
  • goroutines
  • channels
  • package context
  • package sync
  • go test bench
  • problem definition
  • letters challenge
  • sequential solution
  • concurrent solution
  • fixing bugs
  • using channels
  • sync and interrupt
  • compare performance

Follow along by cloning the examples with

$ git clone git@github.com:preferit/cotalk.git
$ cd cotalk

Background and history

Sir Charles Antony Richard Hoare (Tony Hoare). Born 1934 in Sri Lanka, studied at Oxford and in Moscow. His research spanned program correctness, sorting and programming languages. His work is freely accessible online and the Go channel construct is his concept.

Goroutines

channels

package sync

package sync // import "sync"

Package sync provides basic synchronization primitives such as mutual exclusion
locks. Other than the Once and WaitGroup types, most are intended for use by
low-level library routines. Higher-level synchronization is better done via
channels and communication.

Values containing the types defined in this package should not be copied.

type Cond struct{ ... }
    func NewCond(l Locker) *Cond
type Locker interface{ ... }
type Map struct{ ... }
type Mutex struct{ ... }
type Once struct{ ... }
type Pool struct{ ... }
type RWMutex struct{ ... }
type WaitGroup struct{ ... }
pkg.go.dev/sync

package context

package context

...

Programs that use Contexts should follow these rules

Do not store Contexts inside a struct type; instead, pass a Context
explicitly to each function that needs it. The Context should be the
first parameter, typically named ctx:

func DoSomething(ctx context.Context, arg Arg) error {
	// ... use ctx ...
}

Do not pass a nil Context, even if a function permits it. Pass
context.TODO if you are unsure about which Context to use.

Use context Values only for request-scoped data that transits
processes and APIs, not for passing optional parameters to functions.
		
var Canceled = errors.New("context canceled")
var DeadlineExceeded error = deadlineExceededError{}
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)
type CancelFunc func()
type Context interface{ ... }
    func Background() Context
    func TODO() Context
    func WithValue(parent Context, key, val any) Context
pkg.go.dev/context

go test -bench

  1 package cotalk
  2 
  3 import (
  4     "fmt"
  5     "testing"
  6     "time"
  7 )
  8 
  9 func BenchmarkX(b *testing.B) {
 10     fmt.Println("N is", b.N)
 11     for i := 0; i < b.N; i++ {
 12         X()
 13     }
 14 }
 15 
 16 func X() {
 17     time.Sleep(time.Millisecond)
 18 }
 19 
$ go test -bench=BenchmarkX -v
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkX N is 1 N is 100 N is 1100 BenchmarkX-16 1100 1095229 ns/op PASS ok github.com/preferit/cotalk 1.320s

Problem

  1 package cotalk
  2 
  3 import (
  4     "net/http"
  5 )
  6 
  7 // Problem defines the thing that needs to be solved
  8 type Problem interface {
  9 
 10     // Solve returns an error if the given algorithm does not solve
 11     // the problem.
 12     Solve(Algorithm) error
 13 }
 14 
 15 // Algorithm is any func that takes some work and returns it's result.
 16 // The validity of the result must be verified outside.
 17 type Algorithm func(work []*http.Request) (result []*http.Response)
 18 

The letter challenge

 13 // NewLetterChallenge returns a problem defined as
 14 //
 15 //   - get all letters using a list of requests
 16 //   - in the given order
 17 //
 18 // letters should be a space separated string of letters
 19 func NewLetterChallenge(letters string) *OrderedLetters {
 20     return &OrderedLetters{
 21         exp: letters,
 22     }
 23 }
 24 
 25 type OrderedLetters struct {
 26     url string
 27     exp string
 28 }
 29 
 30 func (p *OrderedLetters) Server() *httptest.Server {
 31     handler := func(w http.ResponseWriter, r *http.Request) {
 32         <-time.After(10 * time.Millisecond)
 33         w.Write([]byte(r.URL.Path[1:]))
 34     }
 35     srv := httptest.NewServer(http.HandlerFunc(handler))
 36     p.url = srv.URL
 37     return srv
 38 }
 39 
 40 func (p *OrderedLetters) Solve(alg Algorithm) error {
 41     // create the workload, ie. requests
 42     words := strings.Split(p.exp, " ")
 43     work := make([]*http.Request, len(words))
 44     for i, word := range words {
 45         work[i], _ = http.NewRequest("GET", p.url+"/"+word, http.NoBody)
 46     }
 47 
 48     // run the algorithm
 49     result := alg(work)
 50 
 51     // verify the result
 52     return p.verify(work, result)
 53 }
 54 
 55 func (p *OrderedLetters) verify(work []*http.Request, result []*http.Response) error {
 56     got := make([]string, 0, len(result))
 57     for _, resp := range result {
 58         if resp != nil {
 59             var buf bytes.Buffer
 60             io.Copy(&buf, resp.Body)
 61             resp.Body.Close()
 62             got = append(got, buf.String())
 63         } else {
 64             got = append(got, " ") // makes it easier to see
 65         }
 66     }
 67     if got := strings.Join(got, " "); p.exp != got {
 68         return fmt.Errorf("\nexp: %s\ngot: %s", p.exp, got)
 69     }
 70     return nil
 71 }
 72 

Verification

Each algorithm in these examples is tested like this

 10 const Letters = "0 1 2 3 4 5 6 7 8 9 a b c d e f"
 11 
 12 func BenchmarkAlg01(b *testing.B) {
 13     // setup problem outside the loop
 14     problem := NewLetterChallenge(Letters)
 15     srv := problem.Server()
 16     defer srv.Close()
 17     b.ResetTimer()
 18 
 19     for i := 0; i < b.N; i++ {
 20         if err := problem.Solve(Alg01); err != nil {
 21             b.Fatal(err)
 22         }
 23     }
 24 }
 25 

Sequential

Simple implementation though very low performance

  9 // Alg01 solves the work sequentially
 10 func Alg01(work []*http.Request) []*http.Response {
 11     result := make([]*http.Response, 0, len(work))
 12     for _, r := range work {
 13         resp, err := http.DefaultClient.Do(r)
 14         if err != nil {
 15             panic(err.Error())
 16         }
 17         result = append(result, resp)
 18     }
 19     return result
 20 }
 21 
$ go test -benchmem -bench=BenchmarkAlg01
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkAlg01-16 6 175604692 ns/op 279224 B/op 1980 allocs/op PASS ok github.com/preferit/cotalk 1.236s

Concurrent using sync.WaitGroup

 22 // Alg02 uses sync.WaitGroup to wait for all responses
 23 func Alg02(work []*http.Request) (result []*http.Response) {
 24     var wg sync.WaitGroup
 25     for _, r := range work {
 26         wg.Add(1)
 27         go func() {
 28             resp, _ := http.DefaultClient.Do(r)
 29             result = append(result, resp)
 30             wg.Done()
 31         }()
 32     }
 33     wg.Wait()
 34     return
 35 }
 36 
$ go test -benchmem -bench=BenchmarkAlg02
--- FAIL: BenchmarkAlg02 alg_test.go:22: exp: 0 1 2 3 4 5 6 7 8 9 a b c d e f got: f f f f f f f f f f f f f f f f FAIL exit status 1 FAIL github.com/preferit/cotalk 0.013s FAIL

Why does it fail?

Arguments are evaluated at calltime

 37 // Alg03 fixes reference problem inside loop
 38 func Alg03(work []*http.Request) (result []*http.Response) {
 39     var wg sync.WaitGroup
 40     for _, r := range work {
 41         wg.Add(1)
 42         go func(lr *http.Request) {
 43             // use local argument
 44             resp, _ := http.DefaultClient.Do(lr)
 45             result = append(result, resp)
 46             wg.Done()
 47         }(r) // make a copy of pointer with argument
 48     }
 49     wg.Wait()
 50     return
 51 }
 52 
$ go test -benchmem -bench=BenchmarkAlg03
--- FAIL: BenchmarkAlg03 alg_test.go:32: exp: 0 1 2 3 4 5 6 7 8 9 a b c d e f got: 1 0 5 8 6 2 e 7 d f b 4 c 9 3 FAIL exit status 1 FAIL github.com/preferit/cotalk 0.014s FAIL

You might get a different result; why does it still fail? and can the tooling help identify the problem, try

$ go test -benchmem -bench=BenchmarkAlg03 -race -count 1

Protect concurrent writes with sync.Mutex

 53 // Alg04 synchronizes writes accross go routines
 54 func Alg04(work []*http.Request) (result []*http.Response) {
 55     var wg sync.WaitGroup
 56     var m sync.Mutex
 57     for _, r := range work {
 58         wg.Add(1)
 59         go func(lr *http.Request) {
 60             resp, _ := http.DefaultClient.Do(lr)
 61             // protect result
 62             m.Lock()
 63             result = append(result, resp)
 64             m.Unlock()
 65             wg.Done()
 66         }(r)
 67     }
 68     wg.Wait()
 69     return
 70 }
 71 
$ go test -benchmem -bench=BenchmarkAlg04
--- FAIL: BenchmarkAlg04 alg_test.go:42: exp: 0 1 2 3 4 5 6 7 8 9 a b c d e f got: d 1 c 5 4 3 8 7 e 2 a 9 f 6 b 0 FAIL exit status 1 FAIL github.com/preferit/cotalk 0.014s FAIL

Why does it fail?

Sort results

 72 // Alg05 fix order
 73 func Alg05(work []*http.Request) (result []*http.Response) {
 74     var wg sync.WaitGroup
 75     var m sync.Mutex
 76     result = make([]*http.Response, len(work))
 77     for i, r := range work {
 78         wg.Add(1)
 79         go func(i int, lr *http.Request) {
 80             resp, _ := http.DefaultClient.Do(lr)
 81             // protect result
 82             m.Lock()
 83             result[i] = resp
 84             m.Unlock()
 85             wg.Done()
 86         }(i, r)
 87     }
 88     wg.Wait()
 89     return
 90 }
 91 
$ go test -benchmem -bench=BenchmarkAlg05
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkAlg05-16 100 13281838 ns/op 289424 B/op 2018 allocs/op PASS ok github.com/preferit/cotalk 1.344s

Improved performance paid with complexity

Comparing the sequential working algorithm to the working concurrent one, tests reveal a substantial improvement.

$ go test -benchmem -bench="(Alg01|Alg05)$"
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkAlg1-16 6 175609134 ns/op 276349 B/op 1976 allocs/op BenchmarkAlg5-16 87 13308022 ns/op 292652 B/op 2021 allocs/op PASS ok github.com/preferit/cotalk 2.413s

Using channel

 92 // Alg06 uses channel to synchronize responses
 93 func Alg06(work []*http.Request) (result []*http.Response) {
 94     c := make(chan *http.Response)
 95     for _, r := range work {
 96         go func(lr *http.Request) {
 97             resp, _ := http.DefaultClient.Do(lr)
 98             c <- resp // write to channel
 99         }(r)
100     }
101     for range work {
102         resp := <-c // read from channel
103         result = append(result, resp)
104     }
105     return
106 }
107 
$ go test -benchmem -bench=BenchmarkAlg06
--- FAIL: BenchmarkAlg06 alg_test.go:62: exp: 0 1 2 3 4 5 6 7 8 9 a b c d e f got: f c b 9 d a 0 3 4 2 5 7 e 1 6 8 FAIL exit status 1 FAIL github.com/preferit/cotalk 0.014s FAIL

Correct order using channel

108 // Alg07 uses channel to synchronize responses with ordered result
109 func Alg07(work []*http.Request) (result []*http.Response) {
110     type m struct {
111         index int
112         *http.Response
113     }
114     c := make(chan m)
115     result = make([]*http.Response, len(work))
116     for i, r := range work {
117         go func(i int, lr *http.Request) {
118             resp, _ := http.DefaultClient.Do(lr)
119             c <- m{i, resp} // write to channel
120         }(i, r)
121     }
122     for range work {
123         v := <-c // read from channel
124         result[v.index] = v.Response
125     }
126     return
127 }
128 
$ go test -benchmem -bench=BenchmarkAlg07
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkAlg07-16 100 13411328 ns/op 293160 B/op 2025 allocs/op PASS ok github.com/preferit/cotalk 1.357s

There is still a bug in this code, do you see it?

Clean up resources

129 // Alg08 uses channel to synchronize responses with ordered result
130 func Alg08(work []*http.Request) (result []*http.Response) {
131     type m struct {
132         index int
133         *http.Response
134     }
135     c := make(chan m)
136     defer close(c) // make sure you clean up when done
137     result = make([]*http.Response, len(work))
138     for i, r := range work {
139         go func(i int, lr *http.Request) {
140             resp, _ := http.DefaultClient.Do(lr)
141             c <- m{i, resp} // write to channel
142         }(i, r)
143     }
144     for range work {
145         v := <-c // read from channel
146         result[v.index] = v.Response
147     }
148     return
149 }
150 
$ go test -benchmem -bench=BenchmarkAlg08
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkAlg08-16 99 13400211 ns/op 292396 B/op 2022 allocs/op PASS ok github.com/preferit/cotalk 1.356s

Interrupt

151 // Alg09 returns when all work is done or context is cancelled
152 func Alg09(ctx context.Context, work []*http.Request) (result []*http.Response) {
153     type m struct {
154         index int
155         *http.Response
156     }
157     c := make(chan m)
158     complete := make(chan struct{})
159     defer close(c) // make sure you clean up when done
160     result = make([]*http.Response, len(work))
161     go func() {
162         defer close(complete)
163         for i, r := range work {
164             go func(i int, lr *http.Request) {
165                 resp, _ := http.DefaultClient.Do(lr)
166                 c <- m{i, resp} // write to channel
167             }(i, r)
168         }
169         for range work {
170             v := <-c // read from channel
171             result[v.index] = v.Response
172         }
173     }()
174     select {
175     case <-ctx.Done():
176         // interrupted
177     case <-complete:
178     }
179     return
180 }
181 
$ go test -benchmem -bench=BenchmarkAlg09
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkAlg09-16 panic: send on closed channel goroutine 499 [running]: github.com/preferit/cotalk.Alg9.func1.1(0x3, 0xc00040a780?) /home/gregory/src/github.com/preferit/cotalk/alg.go:162 +0x55 created by github.com/preferit/cotalk.Alg9.func1 /home/gregory/src/github.com/preferit/cotalk/alg.go:160 +0x8e exit status 2 FAIL github.com/preferit/cotalk 0.054s FAIL

Respect context cancellation

182 // Alg10 returns when all work is done or context is cancelled
183 func Alg10(ctx context.Context, work []*http.Request) (result []*http.Response) {
184     type m struct {
185         index int
186         *http.Response
187     }
188     c := make(chan m)
189     complete := make(chan struct{})
190     defer close(c) // make sure you clean up when done
191     result = make([]*http.Response, len(work))
192     go func() {
193         defer close(complete)
194         for i, r := range work {
195             go func(i int, lr *http.Request) {
196                 resp, _ := http.DefaultClient.Do(lr)
197                 select {
198                 case <-ctx.Done():
199                 default:
200                     c <- m{i, resp} // write to channel
201                 }
202             }(i, r)
203         }
204         for range work {
205             v := <-c // read from channel
206             result[v.index] = v.Response
207         }
208     }()
209     select {
210     case <-ctx.Done():
211         // interrupted
212     case <-complete:
213     }
214     return
215 }
216 
$ go test -benchmem -bench=BenchmarkAlg10
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkAlg10-16 --- FAIL: BenchmarkAlg10-16 alg_test.go:123: exp: 0 1 2 3 4 5 6 7 8 9 a b c d e f got: 0 7 b f FAIL exit status 1 FAIL github.com/preferit/cotalk 0.074s FAIL

Compare all

In this example using channels and sync package primitives seem to yield more or less the same result. There performance boost would be to try and minimize number of allocations. But that is out of scope for this talk.

$ go test -benchmem -bench="(Alg01|Alg05|Alg08)$"
goos: linux goarch: amd64 pkg: github.com/preferit/cotalk cpu: Intel(R) Xeon(R) E-2288G CPU @ 3.70GHz BenchmarkAlg1-16 6 175310048 ns/op 277350 B/op 1982 allocs/op BenchmarkAlg5-16 88 13202327 ns/op 293016 B/op 2026 allocs/op BenchmarkAlg8-16 91 13449404 ns/op 289586 B/op 2019 allocs/op PASS ok github.com/preferit/cotalk 3.652s

Go concurrency design summary