Go 并发模式详解
Go 的并发模型基于 CSP(Communicating Sequential Processes),核心是 goroutine 和 channel。
Goroutine
Goroutine 是 Go 的轻量级线程,创建成本极低:
go func() {
fmt.Println("Hello from goroutine")
}()
一个程序可以轻松创建数十万个 goroutine。
Channel
Channel 是 goroutine 之间通信的管道:
ch := make(chan string)
go func() {
ch <- "hello"
}()
msg := <-ch
fmt.Println(msg)
常见并发模式
Fan-Out / Fan-In
多个 goroutine 读取同一个 channel(fan-out),多个 channel 的结果汇聚到一个 channel(fan-in):
func fanIn(channels ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan string) {
defer wg.Done()
for v := range c {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
Pipeline
将处理流程拆分为多个阶段,每个阶段通过 channel 连接:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// 使用
c := gen(2, 3, 4)
out := sq(sq(c))
Worker Pool
固定数量的 worker 处理任务:
func worker(id int, jobs <-chan int, results chan<- int) {
for j := range jobs {
results <- j * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
for j := 1; j <= 9; j++ {
jobs <- j
}
close(jobs)
for a := 1; a <= 9; a++ {
<-results
}
}
总结
Go 的并发模型简洁而强大。用好 goroutine 和 channel,可以写出高效且易读的并发程序。