在如今硬件非常发达的时代,并发编程(concurrent programming)变得非常重要,Go在并发编程比大多数语言更方便。

1. Goroutines

Go中没有多进程多线程的概念,而是使用goroutine的概念。为了方便理解,你暂时可以把goroutine理解为其他语言中的线程,后面我们会说二者之间有什么区别。使用go关键字就可以启动一个goroutine。看下面例子:

package main

import (
    "fmt"
    "time"
)

func sub_goroutine() {
    fmt.Println("I'm goroutine 1")
}

func main() {
    fmt.Println("In Main Goroutine")

    go sub_goroutine()

    go func() {
        fmt.Println("I'm goroutine 2")
    }()

    time.Sleep(time.Second * 1)
}

上面的程序中我们在main(main其实也是goroutine,类似于主进程的概念)中创建了两个goroutine,和线程一样,goroutine一旦被创建就会立刻去执行。如果在goroutine返回前,main就结束的话,那这些子goroutine也会被强制退出。上面程序中,我们为了保证main退出前两个goroutine执行完毕,我们在main中调用了sleep,当然这样并不能绝对保证goroutine之间的先后顺序,下面我们介绍一种可靠的机制来实现多个goroutine之间的先后顺序。

2. Channels

我们知道进程间通信和线程间通信有很多种方法,那goroutine间如何通信呢?没错,就是用channel(信道)。我们先简单介绍一下Channel,再看如何利用Channel在不同goroutine之间通信。关于Channel有如下要点:

  • 声明一个channel使用关键字chan,后面跟这个channel里面所传递的对象类型,创建channel使用make

    ch := make(chan int)    // ch has type 'chan int'

    上面创建了一个(只能)传递int类型元素的channel。

  • channel主要有三个操作:

    1. send。比如ch <- x表示将x发送到ch。
    2. receive。比如x = <-ch表示从ch中接收值并赋给变量x。
    3. close。我们可以使用close函数关闭一个channel:close(ch)。当一个channel被关闭的时候,表示不会再有数据发送到这个channel上面。如果向一个已经关闭的channel发送数据,到导致panic。如果从一个已经关闭的channel上面接收数据,会先将channel上面残留的数据全部接收,后面再接收时会收到该channel类型的零值。
  • channel是引用类型的。相同类型的channel可以使用==进行比较,如果他们引用了相同的数据结构,则结果为真,否则为假。
  • channel的零值是nil,在一个为nil的channel上面执行send或者receive将永远阻塞。

make创建channel时,可以接收第二个参数,表示创建一个容量为指定值得channel:

ch = make(chan int)
ch = make(chan int, 0)
ch = make(chan int, 3)

上面的例子中,前两行创建的channel称为Unbuffered Channel,最后一句创建的channel称为Buffered Channel,下面我们分别介绍这两种不同类型的Channel。

2.1 Unbuffered Channel

假设ch是一个Unbuffered Channel,那么我们在goroutine A中向ch中发送数据时,A会一直被阻塞,直到有另外一个goroutine从ch中读取数据,A才会继续往下执行。同理,如果A从ch中读取数据,那么A会一直被阻塞,直到有另外一个goroutine向ch中发送数据。所以可以看到,通过Unbuffered Channel,我们可以实现两个进程的同步(synchronize),所以Unbuffered Channel有时也被称为同步信道(synchronous channels)。

看一个例子:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(time.Millisecond)
            fmt.Println("Sub goroutine send: ", i)
            ch <- i
        }
        close(ch)
    }()

    for i := 0; i < 10; i++ {
        time.Sleep(time.Millisecond)
        fmt.Println("Main goroutine receive: ", <-ch)
    }
}

程序执行结果:

Sub goroutine send:  0
Main goroutine receive:  0
Sub goroutine send:  1
Main goroutine receive:  1
Sub goroutine send:  2
Main goroutine receive:  2
Sub goroutine send:  3
Main goroutine receive:  3
Sub goroutine send:  4
Main goroutine receive:  4
Sub goroutine send:  5
Main goroutine receive:  5
Sub goroutine send:  6
Main goroutine receive:  6
Sub goroutine send:  7
Main goroutine receive:  7
Sub goroutine send:  8
Main goroutine receive:  8
Sub goroutine send:  9
Main goroutine receive:  9
[Finished in 0.2s]

上面的例子中我们在Main goroutine中从ch中接收数据,在Sub goroutine中发送数据,可以看到收发是同步的。(PS:这里在收发中个sleep 1毫秒是因为CPU的执行速度远远大于IO速度,当我们从ch接收数据后,发送端就不再阻塞,就可以马上再发送,并且打印日志,但此时接收端可能还在打印输出,所以可能最终的打印就会有些错位,没有那么直观。所以我们增加一个sleep来消除这个问题)

当然,从这个例子中我们预先知道发送了10次,所以我们也就接收10次,但如果我们不知道发送了多少次,那接收端如何知道发送端是否已经发送结束了呢?有同学可能会说当我们接收到零值的时候。显然这个是行不通的,比如上面的例子中,ch中元素类型的零值就是整数0,那如果发送端发送的数据也是0的话,接收端是无法判断出发送端发送的数据是0,还是因为发送端已经关闭了channel而导致接收到0.当然,Go设计者早就考虑到了这个问题,所以提供了'comma, ok'机制来解决这个问题。我们从channel接收数据时,可以获得一个bool值,如果channel已经关闭并且已经没有数据可以再接收,那这个bool值就为false,否则为true,这样我们通过这个bool值就可以判断上面的情景了。这里我们更改一下上面的例子:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(time.Millisecond)
            fmt.Println("Sub goroutine send: ", i)
            ch <- i
        }
        close(ch)
    }()

    var (
        temp int
        ok   bool
    )

    for {
        if temp, ok = <-ch; !ok {
            fmt.Println("Channel has been closed and drained.")
            break
        }
        fmt.Println("Main goroutine receive: ", temp)
        time.Sleep(time.Millisecond)
    }

    return
}

除此以外,还可以使用更加方便的range loop来遍历channel,当channel里面的所有元素被读取完后,循环会自动退出。看下面的例子:

package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)

    go func() {
        for i := 0; i < 10; i++ {
            time.Sleep(time.Millisecond)
            fmt.Println("Sub goroutine send: ", i)
            ch <- i
        }
        close(ch)
    }()

    for temp := range ch {
        fmt.Println("Main goroutine receive: ", temp)
        time.Sleep(time.Millisecond)
    }

    return
}

最后关于Unbuffered Channel还有以下注意点:

  • channel使用完以后可以不用显式的关闭,程序结束时会自动关掉。但是如果我们想告诉接收端所有数据已经发送完毕的话,那我们就需要在所有数据发送后显式的关闭channel。但是关闭已经关闭的channel会导致panic。
  • 我们可以声明只用于发送(chan <- type,send-only channel)或者只用于接收(<- chan type,receive-only channel)的channel,这在函数传参是往往非常有用。这种单向的channel我们称之为Unidirectional Channel。

2.2 Buffered Channel

Buffered Channel有点像消息队列,其大小在使用make创建的时候由第二个参数指定。它和Unbuffered Channel的区别在于它没有被填满之前是非阻塞的,比如一个容量为100的Buffered Channel,我们可以一直往里面发送数据,在channel达到100个元素之前,发送是不会被阻塞的。当满100后,发送就会被阻塞,此时,接收端接收一个,就可以再发送一个。接收端也是相同的道理,只有当里面没有元素时才会阻塞。

我们可以使用len函数获取当前channel中元素的个数,可以使用cap函数获取channel的容量。

看个例子:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int, 4)

    fmt.Println("chan cap:", cap(ch))
    fmt.Println("chan len:", len(ch))

    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println("Sub goroutine send: ", i)
            ch <- i
        }
        close(ch)
    }()

    for temp := range ch {
        fmt.Println("Main goroutine receive: ", temp)
    }

    return
}

程序执行结果:

chan cap: 4
chan len: 0
Sub goroutine send:  0
Sub goroutine send:  1
Sub goroutine send:  2
Sub goroutine send:  3
Sub goroutine send:  4
Sub goroutine send:  5
Main goroutine receive:  0
Main goroutine receive:  1
Main goroutine receive:  2
Main goroutine receive:  3
Main goroutine receive:  4
Main goroutine receive:  5
Sub goroutine send:  6
Sub goroutine send:  7
Sub goroutine send:  8
Sub goroutine send:  9
Main goroutine receive:  6
Main goroutine receive:  7
Main goroutine receive:  8
Main goroutine receive:  9
[Finished in 0.2s]

3. Select

Go也提供了多路复用机制——select,语法如下:

select {
case 场景1:
    // ...
case 场景2:
    //...
case 场景n:
    //...
default:    // 可选的
    //...
}

上面的每种场景都必须指定从某个channel读取数据或者向某个channel发送数据(也就是说select是配合channel用的)。

  1. 没有default语句的情况下,select会一直阻塞到某个case可以处理,即成功发送或成功接收到数据。如果有多个case同时ready,就会随机从中选一个进行处理。当然,select也可以不包含任何case,此时,select将永远阻塞。
  2. 如果有default语句的话,select将是非阻塞的:如果所有的case都没有ready,将直接执行default语句。

看两个例子:

阻塞型:

package main

import "time"
import "fmt"

func main() {
    c1 := make(chan string)
    c2 := make(chan string)
    
    go func() {
        time.Sleep(time.Second * 1)
        c1 <- "one"
    }()
    
    go func() {
        time.Sleep(time.Second * 2)
        c2 <- "two"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}

程序执行结果:

received one
received two
[Finished in 2.4s]

非阻塞型:

package main

import "time"
import "fmt"

func main() {
    c1 := make(chan string)
    c2 := make(chan string)
    
    go func() {
        time.Sleep(time.Second * 1)
        c1 <- "one"
    }()
    
    go func() {
        time.Sleep(time.Second * 2)
        c2 <- "two"
    }()
    
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        default:
            fmt.Println("Non-blocking")
        }
    }
}

程序运行结果:

Non-blocking
Non-blocking
[Finished in 0.3s]

4. 锁

锁是并发程序中必不可少的一部分,Go的sync包提供了排它锁sync.Mutex和读写锁sync.RWMutex.除此以外,还提供了一个保证函数只执行一次的函数sync.Once.下面我们分别介绍。

4.1 sync.Mutex

排它锁的使用很简单,利用sync.Mutex,我们可以保证同一时刻只有一个goroutine可以访问临界区。看个例子:

不使用锁:

package main

import (
    "fmt"
    // "sync"
)

func main() {
    done := make(chan bool, 100)
    var a int

    for i := 0; i < 100; i++ {
        go func() {
            for j := 0; j < 10000; j++ {
                a++
            }
            done <- true
        }()
    }

    for i := 0; i < 100; i++ {
        <-done
    }
    fmt.Println("a:", a)
}

这里我们创建100个goroutine,每个goroutine里面循环10000,每次都对a加1。抛开并发的概念的话,最终a的值应该是100*10000=1000000.但实际我们运行程序却会发现每次运行a的值都不一样,且都比1000000小。这个就是因为100个goroutine并发执行的结果。下面我们对操作a的地方都加上锁:

package main

import (
    "fmt"
    "sync"
)

func main() {
    done := make(chan bool, 100)
    var mu sync.Mutex
    var a int

    for i := 0; i < 100; i++ {
        go func() {
            for j := 0; j < 10000; j++ {
                mu.Lock()
                a++
                mu.Unlock()
            }
            done <- true
        }()
    }

    for i := 0; i < 100; i++ {
        <-done
    }
    fmt.Println("a:", a)
}

这样同一时刻只有一个goroutine可以操作a,便解决了并发的问题,多次运行程序a的值都是1000000。当然我们发现程序执行的时间也变长了。

sync.Mutex类型只有Lock和Unlock两个函数,且默认值(零值)为unlock状态。Lock和Unlock必须成对。

4.2 sync.RWMutex

sync.RWMutex为读写锁,所谓读写锁就是对于临界区资源,可以有多个人同时去读,但同一时刻只能有一个人写。即只存在两种状态:①一个人或多个人读临界区资源,没有人写临界区资源②只有1个人写临界区资源且无人读临界区资源。简单说就是读和读不互斥,但读和写互斥。

sync.RWMutex类型有四个主要函数(假设rw为sync.RWMutex类型的变量):

  1. rw.Lock():锁住rw用于写,如果已经有人锁住rw,不论是读或者写,则操作被阻塞。
  2. rw.Unlock():解锁rw,只能与rw.Lock配合使用。
  3. rw.RLock():锁住rw用于读。
  4. rw.RUlock():解锁rw,只能与rw.RLock配合使用。

关于读写锁,需要注意必须配合使用,即Lock和Unlock成对使用(读),RLock和RUlock成对使用(写)。

这里我们举一个简单的例子:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    done := make(chan bool, 6)

    var rw sync.RWMutex

    go func() {
        rw.RLock()
        fmt.Println("Enter goroutine 1")
        time.Sleep(time.Second * 5)
        fmt.Println("Exit goroutine 1")
        rw.RUnlock()

        done <- true
    }()

    go func() {
        rw.RLock()
        fmt.Println("Enter goroutine 2")
        time.Sleep(time.Second * 5)
        fmt.Println("Exit goroutine 2")
        rw.RUnlock()

        done <- true
    }()

    go func() {
        rw.RLock()
        fmt.Println("Enter goroutine 3")
        time.Sleep(time.Second * 5)
        fmt.Println("Exit goroutine 3")
        rw.RUnlock()

        done <- true
    }()

    go func() {
        rw.Lock()
        fmt.Println("Enter goroutine 4")
        time.Sleep(time.Second * 5)
        fmt.Println("Exit goroutine 4")
        rw.Unlock()

        done <- true
    }()

    go func() {
        rw.Lock()
        fmt.Println("Enter goroutine 5")
        time.Sleep(time.Second * 5)
        fmt.Println("Exit goroutine 5")
        rw.Unlock()

        done <- true
    }()
    go func() {
        rw.Lock()
        fmt.Println("Enter goroutine 6")
        time.Sleep(time.Second * 5)
        fmt.Println("Exit goroutine 6")
        rw.Unlock()

        done <- true
    }()

    for i := 0; i < 6; i++ {
        <-done
    }

}

上面的例子中,前3个goroutine都申请的是读锁,所以他们几个是可以同时进入临界区的,而后三个申请的都是写锁,它们要进入临界区的条件是没有任何人在临界区中,且它进入临界区后,其他所有人都不能再进入临界区。

关于Go的两种锁(排他锁和读写锁)我们需要注意以下两点:

  1. 只有加锁后才可以解锁,否则将导致运行时错误(runtime error)。比如,如果一个锁mu(排他锁)或者rw(读写锁)并没有调用过Lock/RLock,我们就在上面调用Unlock/RUlock的话,将导致运行时错误。所以,锁必须成对使用。
  2. 锁虽然必须成对使用,但可以在这个goroutine中加锁后,在另外一个goroutine里面解锁。即锁状态不是和goroutine绑定的。

4.3 sync.Once

这个锁的典型使用场景为:我们有一个程序有许多goroutine,但是他们都共享了一个全局资源,这个资源可以由任意一个且只能有一个goroutine初始化,而且这个初始化一般还是比较耗时的。这种情况下,我们就可以将初始化的动作写成一个函数,然后使用这个锁去调用这个函数,那么这个函数将只被执行一次。

这里我们用一个例子进行说明,例子中,有一个全局的map,里面记录了图标和图标所对应的图片,程序(该程序包含多个goroutine)启动时,任意一个goroutine需要检查这个map是否已经被初始化,若干没有,就初始化这个map,但是因为图片很多,初始化动作比较慢。很显然,因为map是个全局资源,这个初始化动作只能由一个goroutine去完成。下面看我们是怎么去实现的:

package main

import (
    "fmt"
    "image"
    "sync"
    "time"
)

var icons map[string]image.Image
var loadIconsOnce sync.Once

func loadIcon(name string) {
    // do something
    return
}

func loadIcons() {
    icons = map[string]image.Image{
        "a.png": loadIcon("a.png"),
        "b.png": loadIcon("b.png"),
        "c.png": loadIcon("c.png"),
        ...
    }
}

func Icon(name string) image.Image {
    loadIconsOnce.Do(loadIcons)
    return icons[name]
}

我们声明了一个sync.Once变量,该类型有一个函数Do,其参数为一个不带任何参数,也没有返回值的函数。它的特性是:如果有多个人调用xxx.Do(f),那么只有第一个调用的人会执行函数f。对于这种场景,我们在其他语言中的,我们一般是使用一个排它锁去实现,没错,Go中的sync.Once底层实现也是借助排它锁和一个无符号整数变量实现的。Go的源代码如下:

// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
//  var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
//  config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
    if atomic.LoadUint32(&o.done) == 1 {
        return
    }
    // Slow-path.
    o.m.Lock()
    defer o.m.Unlock()
    if o.done == 0 {
        defer atomic.StoreUint32(&o.done, 1)
        f()
    }
}

5. Goroutine和Thread

从前面的介绍中,我们可以看到宏观上Goroutine和Thread几乎没有区别。现在我们从“微观”看看二者的区别。

5.1 栈(stack)的区别

我们知道OS的thread都有一个固定大小的栈,一般为2MB。而goroutine的栈是可变大小的,一般初始值2KB,随着使用可以动态扩展,最大可至1GB。这样有什么好处呢?

  1. 一般的线程可能用不到2MB大小的栈,这对于资源是一个浪费,限制了系统thread的个数。而goroutine栈初始值为2KB,很节省资源。
  2. 对于一些深递归函数,2MB的栈空间很可能不够用,栈溢出导致段错误。而Goroutine的栈大小是动态调整的。所以总体而言,goroutine栈大小的机制对于资源利用的更加充分。

5.2 调度的区别

thread是系统级别的,是由内核负责调度的。线程切换是需要进行上下文切换(context switch),而且需要更新调度器的数据结构。这个切换过程是比较缓慢的(当然线程级的切换比进程切换还是快不少的),其中可能还涉及CPU与内存的结构(可了解CPU与内存的亲和性)。

而goroutine是应用级的,是有goroutine自己的调度器调度的(使用的是“m:n scheduling”算法)。首先它是应用级的,所以多个goroutine之间的切换比thread之间的切换更加轻量级,因为不涉及内核级别的上下文切换。而且它的调度算法也尽量避免了goroutine在多个CPU核之间的切换,有点类似于进程/线程绑核后的效果。

一般如果一个机器有N个CPU,那goroutine的调度器会一次将go程序运行在N个thread上面。当然我们也可以使用GOMAXPROCS来指定我们所要使用的CPU核数(不能超出机器实际的核数)。

简单理解就是,系统底层依旧是thread,但goroutine有自己的调度器,这个调度器会起n(n为GOMAXPROCS的值或者CPU的核数)个thread,然后它会将go程序分布在这n个thread上去执行,且保证同一个goroutine以及相关的代码一直只在一个thread上面运行,这样就可以尽量避免内核级别的线程切换,从而提高了效率。

这里推荐一个一篇关于Go调度器的文章《The Go scheduler》。