Golang并发编程

GO并发编程(Concurrency)

通过通信共享内存(Share by communicating)

并发编程是一个广泛的话题,这里只讨论一些与 Go 语言相关的重点。

在许多环境中,并发编程的难点在于如何正确访问共享变量。Go 语言鼓励一种不同的方法:

通过通道(channel)传递共享值,而不是让多个执行线程主动共享内存。

在任何给定时间,只有一个 goroutine 可以访问该值。通过设计,数据竞争不会发生。为了鼓励这种思维方式,我们将其简化为一个口号:

不要通过共享内存来通信;相反,通过通信来共享内存。

这种方法有时可能被过度使用。例如,引用计数可能最好通过在一个整数变量周围加锁来实现。但作为一种高级方法,使用通道来控制访问可以更容易编写清晰、正确的程序。

这种模型的一个思考方式是考虑一个在单个 CPU 上运行的典型单线程程序。它不需要同步原语。

现在运行另一个这样的实例;它也不需要同步。现在让这两个实例进行通信;如果通信本身就是同步器,那么仍然不需要其他同步。例如,Unix 管道完美地符合这种模型。尽管 Go 的并发方法源自 Hoare 的通信顺序进程(CSP),但它也可以被视为 Unix 管道的一种类型安全的泛化。

Goroutines

它们被称为 goroutines,因为现有的术语(线程、协程、进程等)传达了不准确的含义。Goroutine 有一个简单的模型:它是一个与同一地址空间中的其他 goroutine 并发执行的函数。它是轻量级的,成本几乎只比分配栈空间多一点。而且栈一开始很小,所以它们很便宜,并且根据需要分配(和释放)堆存储来增长。

Goroutines 被多路复用到多个操作系统线程上,因此如果一个 goroutine 阻塞(例如等待 I/O),其他 goroutine 可以继续运行。它们的设计隐藏了许多线程创建和管理的复杂性。

在函数或方法调用前加上 go 关键字,可以在一个新的 goroutine 中运行该调用。当调用完成时,goroutine 会静默退出。(效果类似于 Unix shell 的 & 符号,用于在后台运行命令。)

1
go list.Sort()  // 并发运行 list.Sort;不等待它完成。

在 goroutine 调用中,函数字面量(匿名函数)非常方便。

1
2
3
4
5
6
func Announce(message string, delay time.Duration) {
go func() {
time.Sleep(delay)
fmt.Println(message)
}() // 注意括号 - 必须调用函数。
}

在 Go 中,函数字面量是闭包:实现确保函数引用的变量在它们活跃时一直存在。

这些例子不太实用,因为函数没有办法通知完成。为此,我们需要通道。

通道(Channels)

与映射(map)一样,通道是用 make 分配的,结果值是对底层数据结构的引用。如果提供了一个可选的整数参数,它会设置通道的缓冲区大小。默认值为零,表示无缓冲或同步通道。

1
2
3
ci := make(chan int)            // 无缓冲的整数通道
cj := make(chan int, 0) // 无缓冲的整数通道
cs := make(chan *os.File, 100) // 缓冲的文件指针通道

无缓冲通道将通信(值的交换)与同步(保证两个计算(goroutine)处于已知状态)结合在一起。

使用通道有很多好的习惯用法。这里有一个例子。在上一节中,我们在后台启动了一个排序。通道可以让启动的 goroutine 等待排序完成。

1
2
3
4
5
6
7
8
c := make(chan int)  // 分配一个通道。
// 在 goroutine 中启动排序;完成后,通过通道发送信号。
go func() {
list.Sort()
c <- 1 // 发送信号;值不重要。
}()
doSomethingForAWhile()
<-c // 等待排序完成;丢弃发送的值。

接收者总是阻塞,直到有数据可以接收。如果通道是无缓冲的,发送者会阻塞,直到接收者接收到值。如果通道有缓冲区,发送者只会在值被复制到缓冲区之前阻塞;如果缓冲区已满,这意味着等待某个接收者检索一个值。

缓冲通道可以像信号量一样使用,例如限制吞吐量。在这个例子中,传入的请求被传递给 handle,它向通道发送一个值,处理请求,然后从通道接收一个值,以便为下一个消费者准备好“信号量”。通道缓冲区的容量限制了同时调用 process 的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
var sem = make(chan int, MaxOutstanding)

func handle(r *Request) {
sem <- 1 // 等待活动队列排空。
process(r) // 可能需要很长时间。
<-sem // 完成;启用下一个请求运行。
}

func Serve(queue chan *Request) {
for {
req := <-queue
go handle(req) // 不要等待 handle 完成。
}
}

一旦 MaxOutstanding 个处理程序正在执行 process,任何更多的处理程序都会阻塞,尝试向已满的通道缓冲区发送数据,直到现有的处理程序之一完成并从缓冲区接收数据。

不过,这个设计有一个问题:Serve 为每个传入的请求创建一个新的 goroutine,即使在任何时刻只有 MaxOutstanding 个 goroutine 可以运行。因此,如果请求来得太快,程序可能会消耗无限的资源。我们可以通过修改 Serve 来限制 goroutine 的创建来解决这个问题:

1
2
3
4
5
6
7
8
9
func Serve(queue chan *Request) {
for req := range queue {
sem <- 1
go func() {
process(req)
<-sem
}()
}
}

(注意,在 Go 1.22 之前的版本中,这段代码有一个 bug:循环变量在所有 goroutine 之间共享。详情请参阅 Go wiki。)

另一种管理资源的方法是启动固定数量的 handle goroutine,它们都从请求通道中读取。goroutine 的数量限制了同时调用 process 的数量。这个 Serve 函数还接受一个通道,用于通知它退出;在启动 goroutine 后,它会阻塞接收该通道。

1
2
3
4
5
6
7
8
9
10
11
12
13
func handle(queue chan *Request) {
for r := range queue {
process(r)
}
}

func Serve(clientRequests chan *Request, quit chan bool) {
// 启动处理程序
for i := 0; i < MaxOutstanding; i++ {
go handle(clientRequests)
}
<-quit // 等待被告知退出。
}

通道的通道(Channels of channels)

Go 最重要的特性之一是通道是一等值,可以像其他值一样分配和传递。这个特性的一个常见用途是实现安全的并行多路分解。

在上一节的例子中,handle 是一个理想化的请求处理程序,但我们没有定义它处理的类型。如果该类型包含一个用于回复的通道,每个客户端都可以提供自己的答案路径。以下是 Request 类型的示意定义。

1
2
3
4
5
type Request struct {
args []int
f func([]int) int
resultChan chan int
}

客户端提供一个函数及其参数,以及请求对象中的一个通道,用于接收答案。

1
2
3
4
5
6
7
8
9
10
11
12
func sum(a []int) (s int) {
for _, v := range a {
s += v
}
return
}

request := &Request{[]int{3, 4, 5}, sum, make(chan int)}
// 发送请求
clientRequests <- request
// 等待响应。
fmt.Printf("answer: %d\n", <-request.resultChan)

在服务器端,处理函数是唯一改变的部分。

1
2
3
4
5
func handle(queue chan *Request) {
for req := range queue {
req.resultChan <- req.f(req.args)
}
}

显然,要使它更现实,还有很多工作要做,但这段代码是一个限速、并行、非阻塞 RPC 系统的框架,而且没有使用任何互斥锁。

并行化(Parallelization)

这些思想的另一个应用是在多个 CPU 核心上并行化计算。如果计算可以分解为可以独立执行的单独部分,那么它可以并行化,并使用通道来通知每个部分何时完成。

假设我们有一个对向量中的项目执行昂贵操作的需求,并且每个项目的操作值是独立的,如这个理想化的例子所示。

1
2
3
4
5
6
7
8
9
type Vector []float64

// 对 v[i], v[i+1] ... 直到 v[n-1] 应用操作。
func (v Vector) DoSome(i, n int, u Vector, c chan int) {
for ; i < n; i++ {
v[i] += u.Op(v[i])
}
c <- 1 // 发送信号表示这部分已完成
}

我们在循环中独立启动这些部分,每个 CPU 一个。它们可以以任何顺序完成,但这并不重要;我们只需在启动所有 goroutine 后通过排空通道来计算完成信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
const numCPU = 4 // CPU 核心数

func (v Vector) DoAll(u Vector) {
c := make(chan int, numCPU) // 缓冲是可选的,但明智的。
for i := 0; i < numCPU; i++ {
go v.DoSome(i*len(v)/numCPU, (i+1)*len(v)/numCPU, u, c)
}
// 排空通道。
for i := 0; i < numCPU; i++ {
<-c // 等待一个任务完成
}
// 全部完成。
}

与其为 numCPU 创建一个常量值,我们可以询问运行时什么值是合适的。函数 runtime.NumCPU 返回机器中的硬件 CPU 核心数,因此我们可以这样写:

1
var numCPU = runtime.NumCPU()

还有一个函数 runtime.GOMAXPROCS,它报告(或设置)用户指定的 Go 程序可以同时运行的 CPU 核心数。它默认为 runtime.NumCPU 的值,但可以通过设置同名的 shell 环境变量或调用该函数并传递一个正数来覆盖。传递零只是查询该值。因此,如果我们想尊重用户的资源请求,我们应该这样写:

1
var numCPU = runtime.GOMAXPROCS(0)

确保不要混淆并发(将程序结构化为独立执行的组件)和并行(在多个 CPU 上并行执行计算以提高效率)的概念。尽管 Go 的并发特性可以使一些问题容易结构化为并行计算,但 Go 是一种并发语言,而不是并行语言,并非所有并行化问题都适合 Go 的模型。有关区别的讨论,请参阅此博客文章中引用的演讲。

漏桶(Leaky buffer)

并发编程的工具甚至可以使非并发的思想更容易表达。这里有一个从 RPC 包中抽象出来的例子。客户端 goroutine 循环从某个源(可能是网络)接收数据。为了避免分配和释放缓冲区,它保留一个空闲列表,并使用一个缓冲通道来表示它。如果通道为空,则分配一个新的缓冲区。一旦消息缓冲区准备好,它就会被发送到 serverChan 上的服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var freeList = make(chan *Buffer, 100)
var serverChan = make(chan *Buffer)

func client() {
for {
var b *Buffer
// 如果有可用的缓冲区,则获取;否则分配一个新的。
select {
case b = <-freeList:
// 获取一个;无需做更多操作。
default:
// 没有空闲的,所以分配一个新的。
b = new(Buffer)
}
load(b) // 从网络读取下一条消息。
serverChan <- b // 发送到服务器。
}
}

服务器循环接收来自客户端的每条消息,处理它,并将缓冲区返回到空闲列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
func server() {
for {
b := <-serverChan // 等待工作。
process(b)
// 如果有空间,则重用缓冲区。
select {
case freeList <- b:
// 缓冲区在空闲列表上;无需做更多操作。
default:
// 空闲列表已满,继续执行。
}
}
}

客户端尝试从 freeList 中检索一个缓冲区;如果没有可用的,则分配一个新的。服务器的发送到 freeListb 放回空闲列表,除非列表已满,在这种情况下,缓冲区会被丢弃,由垃圾回收器回收。(select 语句中的 default 子句在没有其他 case 准备就绪时执行,这意味着 select 永远不会阻塞。)这个实现在几行代码中构建了一个漏桶空闲列表,依赖于缓冲通道和垃圾回收器进行簿记。


Golang并发编程
https://mfzzf.github.io/2025/03/18/golang-concurrency/
作者
Mzzf
发布于
2025年3月18日
许可协议