在如今硬件非常发达的时代,并发编程(concurrent programming)变得非常重要,Go在并发编程比大多数语言更方便。
1. Goroutines
Go中没有多进程多线程的概念,而是使用goroutine的概念。为了方便理解,你暂时可以把goroutine理解为其他语言中的线程,后面我们会说二者之间有什么区别。使用go
关键字就可以启动一个goroutine。看下面例子:
package main
import (
"fmt"
"time"
)
func sub_goroutine() {
fmt.Println("I'm goroutine 1")
}
func main() {
fmt.Println("In Main Goroutine")
go sub_goroutine()
go func() {
fmt.Println("I'm goroutine 2")
}()
time.Sleep(time.Second * 1)
}
上面的程序中我们在main(main其实也是goroutine,类似于主进程的概念)中创建了两个goroutine,和线程一样,goroutine一旦被创建就会立刻去执行。如果在goroutine返回前,main就结束的话,那这些子goroutine也会被强制退出。上面程序中,我们为了保证main退出前两个goroutine执行完毕,我们在main中调用了sleep,当然这样并不能绝对保证goroutine之间的先后顺序,下面我们介绍一种可靠的机制来实现多个goroutine之间的先后顺序。
2. Channels
我们知道进程间通信和线程间通信有很多种方法,那goroutine间如何通信呢?没错,就是用channel(信道)。我们先简单介绍一下Channel,再看如何利用Channel在不同goroutine之间通信。关于Channel有如下要点:
声明一个channel使用关键字
chan
,后面跟这个channel里面所传递的对象类型,创建channel使用make
。ch := make(chan int) // ch has type 'chan int'
上面创建了一个(只能)传递int类型元素的channel。
channel主要有三个操作:
- send。比如
ch <- x
表示将x发送到ch。 - receive。比如
x = <-ch
表示从ch中接收值并赋给变量x。 - close。我们可以使用
close
函数关闭一个channel:close(ch)
。当一个channel被关闭的时候,表示不会再有数据发送到这个channel上面。如果向一个已经关闭的channel发送数据,到导致panic。如果从一个已经关闭的channel上面接收数据,会先将channel上面残留的数据全部接收,后面再接收时会收到该channel类型的零值。
- send。比如
- channel是引用类型的。相同类型的channel可以使用
==
进行比较,如果他们引用了相同的数据结构,则结果为真,否则为假。 - channel的零值是nil,在一个为nil的channel上面执行send或者receive将永远阻塞。
make创建channel时,可以接收第二个参数,表示创建一个容量为指定值得channel:
ch = make(chan int)
ch = make(chan int, 0)
ch = make(chan int, 3)
上面的例子中,前两行创建的channel称为Unbuffered Channel,最后一句创建的channel称为Buffered Channel,下面我们分别介绍这两种不同类型的Channel。
2.1 Unbuffered Channel
假设ch是一个Unbuffered Channel,那么我们在goroutine A中向ch中发送数据时,A会一直被阻塞,直到有另外一个goroutine从ch中读取数据,A才会继续往下执行。同理,如果A从ch中读取数据,那么A会一直被阻塞,直到有另外一个goroutine向ch中发送数据。所以可以看到,通过Unbuffered Channel,我们可以实现两个进程的同步(synchronize),所以Unbuffered Channel有时也被称为同步信道(synchronous channels)。
看一个例子:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
fmt.Println("Sub goroutine send: ", i)
ch <- i
}
close(ch)
}()
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
fmt.Println("Main goroutine receive: ", <-ch)
}
}
程序执行结果:
Sub goroutine send: 0
Main goroutine receive: 0
Sub goroutine send: 1
Main goroutine receive: 1
Sub goroutine send: 2
Main goroutine receive: 2
Sub goroutine send: 3
Main goroutine receive: 3
Sub goroutine send: 4
Main goroutine receive: 4
Sub goroutine send: 5
Main goroutine receive: 5
Sub goroutine send: 6
Main goroutine receive: 6
Sub goroutine send: 7
Main goroutine receive: 7
Sub goroutine send: 8
Main goroutine receive: 8
Sub goroutine send: 9
Main goroutine receive: 9
[Finished in 0.2s]
上面的例子中我们在Main goroutine中从ch中接收数据,在Sub goroutine中发送数据,可以看到收发是同步的。(PS:这里在收发中个sleep 1毫秒是因为CPU的执行速度远远大于IO速度,当我们从ch接收数据后,发送端就不再阻塞,就可以马上再发送,并且打印日志,但此时接收端可能还在打印输出,所以可能最终的打印就会有些错位,没有那么直观。所以我们增加一个sleep来消除这个问题)
当然,从这个例子中我们预先知道发送了10次,所以我们也就接收10次,但如果我们不知道发送了多少次,那接收端如何知道发送端是否已经发送结束了呢?有同学可能会说当我们接收到零值的时候。显然这个是行不通的,比如上面的例子中,ch中元素类型的零值就是整数0,那如果发送端发送的数据也是0的话,接收端是无法判断出发送端发送的数据是0,还是因为发送端已经关闭了channel而导致接收到0.当然,Go设计者早就考虑到了这个问题,所以提供了'comma, ok'机制来解决这个问题。我们从channel接收数据时,可以获得一个bool值,如果channel已经关闭并且已经没有数据可以再接收,那这个bool值就为false,否则为true,这样我们通过这个bool值就可以判断上面的情景了。这里我们更改一下上面的例子:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
fmt.Println("Sub goroutine send: ", i)
ch <- i
}
close(ch)
}()
var (
temp int
ok bool
)
for {
if temp, ok = <-ch; !ok {
fmt.Println("Channel has been closed and drained.")
break
}
fmt.Println("Main goroutine receive: ", temp)
time.Sleep(time.Millisecond)
}
return
}
除此以外,还可以使用更加方便的range loop
来遍历channel,当channel里面的所有元素被读取完后,循环会自动退出。看下面的例子:
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
go func() {
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond)
fmt.Println("Sub goroutine send: ", i)
ch <- i
}
close(ch)
}()
for temp := range ch {
fmt.Println("Main goroutine receive: ", temp)
time.Sleep(time.Millisecond)
}
return
}
最后关于Unbuffered Channel还有以下注意点:
- channel使用完以后可以不用显式的关闭,程序结束时会自动关掉。但是如果我们想告诉接收端所有数据已经发送完毕的话,那我们就需要在所有数据发送后显式的关闭channel。但是关闭已经关闭的channel会导致panic。
- 我们可以声明只用于发送(
chan <- type
,send-only channel)或者只用于接收(<- chan type
,receive-only channel)的channel,这在函数传参是往往非常有用。这种单向的channel我们称之为Unidirectional Channel。
2.2 Buffered Channel
Buffered Channel有点像消息队列,其大小在使用make创建的时候由第二个参数指定。它和Unbuffered Channel的区别在于它没有被填满之前是非阻塞的,比如一个容量为100的Buffered Channel,我们可以一直往里面发送数据,在channel达到100个元素之前,发送是不会被阻塞的。当满100后,发送就会被阻塞,此时,接收端接收一个,就可以再发送一个。接收端也是相同的道理,只有当里面没有元素时才会阻塞。
我们可以使用len
函数获取当前channel中元素的个数,可以使用cap
函数获取channel的容量。
看个例子:
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 4)
fmt.Println("chan cap:", cap(ch))
fmt.Println("chan len:", len(ch))
go func() {
for i := 0; i < 10; i++ {
fmt.Println("Sub goroutine send: ", i)
ch <- i
}
close(ch)
}()
for temp := range ch {
fmt.Println("Main goroutine receive: ", temp)
}
return
}
程序执行结果:
chan cap: 4
chan len: 0
Sub goroutine send: 0
Sub goroutine send: 1
Sub goroutine send: 2
Sub goroutine send: 3
Sub goroutine send: 4
Sub goroutine send: 5
Main goroutine receive: 0
Main goroutine receive: 1
Main goroutine receive: 2
Main goroutine receive: 3
Main goroutine receive: 4
Main goroutine receive: 5
Sub goroutine send: 6
Sub goroutine send: 7
Sub goroutine send: 8
Sub goroutine send: 9
Main goroutine receive: 6
Main goroutine receive: 7
Main goroutine receive: 8
Main goroutine receive: 9
[Finished in 0.2s]
3. Select
Go也提供了多路复用机制——select,语法如下:
select {
case 场景1:
// ...
case 场景2:
//...
case 场景n:
//...
default: // 可选的
//...
}
上面的每种场景都必须指定从某个channel读取数据或者向某个channel发送数据(也就是说select是配合channel用的)。
- 没有default语句的情况下,select会一直阻塞到某个case可以处理,即成功发送或成功接收到数据。如果有多个case同时ready,就会随机从中选一个进行处理。当然,select也可以不包含任何case,此时,select将永远阻塞。
- 如果有default语句的话,select将是非阻塞的:如果所有的case都没有ready,将直接执行default语句。
看两个例子:
阻塞型:
package main
import "time"
import "fmt"
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(time.Second * 1)
c1 <- "one"
}()
go func() {
time.Sleep(time.Second * 2)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
}
}
}
程序执行结果:
received one
received two
[Finished in 2.4s]
非阻塞型:
package main
import "time"
import "fmt"
func main() {
c1 := make(chan string)
c2 := make(chan string)
go func() {
time.Sleep(time.Second * 1)
c1 <- "one"
}()
go func() {
time.Sleep(time.Second * 2)
c2 <- "two"
}()
for i := 0; i < 2; i++ {
select {
case msg1 := <-c1:
fmt.Println("received", msg1)
case msg2 := <-c2:
fmt.Println("received", msg2)
default:
fmt.Println("Non-blocking")
}
}
}
程序运行结果:
Non-blocking
Non-blocking
[Finished in 0.3s]
4. 锁
锁是并发程序中必不可少的一部分,Go的sync包提供了排它锁sync.Mutex
和读写锁sync.RWMutex
.除此以外,还提供了一个保证函数只执行一次的函数sync.Once
.下面我们分别介绍。
4.1 sync.Mutex
排它锁的使用很简单,利用sync.Mutex,我们可以保证同一时刻只有一个goroutine可以访问临界区。看个例子:
不使用锁:
package main
import (
"fmt"
// "sync"
)
func main() {
done := make(chan bool, 100)
var a int
for i := 0; i < 100; i++ {
go func() {
for j := 0; j < 10000; j++ {
a++
}
done <- true
}()
}
for i := 0; i < 100; i++ {
<-done
}
fmt.Println("a:", a)
}
这里我们创建100个goroutine,每个goroutine里面循环10000,每次都对a加1。抛开并发的概念的话,最终a的值应该是100*10000=1000000.但实际我们运行程序却会发现每次运行a的值都不一样,且都比1000000小。这个就是因为100个goroutine并发执行的结果。下面我们对操作a的地方都加上锁:
package main
import (
"fmt"
"sync"
)
func main() {
done := make(chan bool, 100)
var mu sync.Mutex
var a int
for i := 0; i < 100; i++ {
go func() {
for j := 0; j < 10000; j++ {
mu.Lock()
a++
mu.Unlock()
}
done <- true
}()
}
for i := 0; i < 100; i++ {
<-done
}
fmt.Println("a:", a)
}
这样同一时刻只有一个goroutine可以操作a,便解决了并发的问题,多次运行程序a的值都是1000000。当然我们发现程序执行的时间也变长了。
sync.Mutex类型只有Lock和Unlock两个函数,且默认值(零值)为unlock状态。Lock和Unlock必须成对。
4.2 sync.RWMutex
sync.RWMutex为读写锁,所谓读写锁就是对于临界区资源,可以有多个人同时去读,但同一时刻只能有一个人写。即只存在两种状态:①一个人或多个人读临界区资源,没有人写临界区资源②只有1个人写临界区资源且无人读临界区资源。简单说就是读和读不互斥,但读和写互斥。
sync.RWMutex类型有四个主要函数(假设rw为sync.RWMutex类型的变量):
- rw.Lock():锁住rw用于写,如果已经有人锁住rw,不论是读或者写,则操作被阻塞。
- rw.Unlock():解锁rw,只能与rw.Lock配合使用。
- rw.RLock():锁住rw用于读。
- rw.RUlock():解锁rw,只能与rw.RLock配合使用。
关于读写锁,需要注意必须配合使用,即Lock和Unlock成对使用(读),RLock和RUlock成对使用(写)。
这里我们举一个简单的例子:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
done := make(chan bool, 6)
var rw sync.RWMutex
go func() {
rw.RLock()
fmt.Println("Enter goroutine 1")
time.Sleep(time.Second * 5)
fmt.Println("Exit goroutine 1")
rw.RUnlock()
done <- true
}()
go func() {
rw.RLock()
fmt.Println("Enter goroutine 2")
time.Sleep(time.Second * 5)
fmt.Println("Exit goroutine 2")
rw.RUnlock()
done <- true
}()
go func() {
rw.RLock()
fmt.Println("Enter goroutine 3")
time.Sleep(time.Second * 5)
fmt.Println("Exit goroutine 3")
rw.RUnlock()
done <- true
}()
go func() {
rw.Lock()
fmt.Println("Enter goroutine 4")
time.Sleep(time.Second * 5)
fmt.Println("Exit goroutine 4")
rw.Unlock()
done <- true
}()
go func() {
rw.Lock()
fmt.Println("Enter goroutine 5")
time.Sleep(time.Second * 5)
fmt.Println("Exit goroutine 5")
rw.Unlock()
done <- true
}()
go func() {
rw.Lock()
fmt.Println("Enter goroutine 6")
time.Sleep(time.Second * 5)
fmt.Println("Exit goroutine 6")
rw.Unlock()
done <- true
}()
for i := 0; i < 6; i++ {
<-done
}
}
上面的例子中,前3个goroutine都申请的是读锁,所以他们几个是可以同时进入临界区的,而后三个申请的都是写锁,它们要进入临界区的条件是没有任何人在临界区中,且它进入临界区后,其他所有人都不能再进入临界区。
关于Go的两种锁(排他锁和读写锁)我们需要注意以下两点:
- 只有加锁后才可以解锁,否则将导致运行时错误(runtime error)。比如,如果一个锁mu(排他锁)或者rw(读写锁)并没有调用过Lock/RLock,我们就在上面调用Unlock/RUlock的话,将导致运行时错误。所以,锁必须成对使用。
- 锁虽然必须成对使用,但可以在这个goroutine中加锁后,在另外一个goroutine里面解锁。即锁状态不是和goroutine绑定的。
4.3 sync.Once
这个锁的典型使用场景为:我们有一个程序有许多goroutine,但是他们都共享了一个全局资源,这个资源可以由任意一个且只能有一个goroutine初始化,而且这个初始化一般还是比较耗时的。这种情况下,我们就可以将初始化的动作写成一个函数,然后使用这个锁去调用这个函数,那么这个函数将只被执行一次。
这里我们用一个例子进行说明,例子中,有一个全局的map,里面记录了图标和图标所对应的图片,程序(该程序包含多个goroutine)启动时,任意一个goroutine需要检查这个map是否已经被初始化,若干没有,就初始化这个map,但是因为图片很多,初始化动作比较慢。很显然,因为map是个全局资源,这个初始化动作只能由一个goroutine去完成。下面看我们是怎么去实现的:
package main
import (
"fmt"
"image"
"sync"
"time"
)
var icons map[string]image.Image
var loadIconsOnce sync.Once
func loadIcon(name string) {
// do something
return
}
func loadIcons() {
icons = map[string]image.Image{
"a.png": loadIcon("a.png"),
"b.png": loadIcon("b.png"),
"c.png": loadIcon("c.png"),
...
}
}
func Icon(name string) image.Image {
loadIconsOnce.Do(loadIcons)
return icons[name]
}
我们声明了一个sync.Once变量,该类型有一个函数Do,其参数为一个不带任何参数,也没有返回值的函数。它的特性是:如果有多个人调用xxx.Do(f),那么只有第一个调用的人会执行函数f。对于这种场景,我们在其他语言中的,我们一般是使用一个排它锁去实现,没错,Go中的sync.Once底层实现也是借助排它锁和一个无符号整数变量实现的。Go的源代码如下:
// Do calls the function f if and only if Do is being called for the
// first time for this instance of Once. In other words, given
// var once Once
// if once.Do(f) is called multiple times, only the first call will invoke f,
// even if f has a different value in each invocation. A new instance of
// Once is required for each function to execute.
//
// Do is intended for initialization that must be run exactly once. Since f
// is niladic, it may be necessary to use a function literal to capture the
// arguments to a function to be invoked by Do:
// config.once.Do(func() { config.init(filename) })
//
// Because no call to Do returns until the one call to f returns, if f causes
// Do to be called, it will deadlock.
//
// If f panics, Do considers it to have returned; future calls of Do return
// without calling f.
//
func (o *Once) Do(f func()) {
if atomic.LoadUint32(&o.done) == 1 {
return
}
// Slow-path.
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
}
5. Goroutine和Thread
从前面的介绍中,我们可以看到宏观上Goroutine和Thread几乎没有区别。现在我们从“微观”看看二者的区别。
5.1 栈(stack)的区别
我们知道OS的thread都有一个固定大小的栈,一般为2MB。而goroutine的栈是可变大小的,一般初始值2KB,随着使用可以动态扩展,最大可至1GB。这样有什么好处呢?
- 一般的线程可能用不到2MB大小的栈,这对于资源是一个浪费,限制了系统thread的个数。而goroutine栈初始值为2KB,很节省资源。
- 对于一些深递归函数,2MB的栈空间很可能不够用,栈溢出导致段错误。而Goroutine的栈大小是动态调整的。所以总体而言,goroutine栈大小的机制对于资源利用的更加充分。
5.2 调度的区别
thread是系统级别的,是由内核负责调度的。线程切换是需要进行上下文切换(context switch),而且需要更新调度器的数据结构。这个切换过程是比较缓慢的(当然线程级的切换比进程切换还是快不少的),其中可能还涉及CPU与内存的结构(可了解CPU与内存的亲和性)。
而goroutine是应用级的,是有goroutine自己的调度器调度的(使用的是“m:n scheduling”算法)。首先它是应用级的,所以多个goroutine之间的切换比thread之间的切换更加轻量级,因为不涉及内核级别的上下文切换。而且它的调度算法也尽量避免了goroutine在多个CPU核之间的切换,有点类似于进程/线程绑核后的效果。
一般如果一个机器有N个CPU,那goroutine的调度器会一次将go程序运行在N个thread上面。当然我们也可以使用GOMAXPROCS来指定我们所要使用的CPU核数(不能超出机器实际的核数)。
简单理解就是,系统底层依旧是thread,但goroutine有自己的调度器,这个调度器会起n(n为GOMAXPROCS的值或者CPU的核数)个thread,然后它会将go程序分布在这n个thread上去执行,且保证同一个goroutine以及相关的代码一直只在一个thread上面运行,这样就可以尽量避免内核级别的线程切换,从而提高了效率。
这里推荐一个一篇关于Go调度器的文章《The Go scheduler》。
线程和协程只有内核态的切换吗?用户态的切换可以讲讲吗?线程和协程通信时如果访问同一资源的处理区别可以讲下吗?
我没太明白你的意思,如果只是在Go里面的话,是没有线程这个概念的。而所谓的goroutine(或者说协程)是用户态的,它实际运行时Go的调度器会将它调度到线程上面去运行,但是不是一对一的关系,具体的调度是由调度器去控制,而且也受
GOMAXPROCS
的控制,一般都是很多个goroutine运行在一组线程上面。所以,如果在Go里面的话,我理解是没有线程和协程相互切换的场景,因为二者没有在一个层级上面,不是平行的关系,自然也就不会有线程和协程访问同一资源的情况了。当然这些只是我自己的理解。另外,其实goroutine和协程(coroutines)不完全是一个概念/模型,二者还是有些差别,可参考https://golang.org/doc/effective_go.html#goroutines。
欢迎继续交流讨论~