Go 并发模式详解

Go 的并发模型基于 CSP(Communicating Sequential Processes),核心是 goroutine 和 channel。

Goroutine

Goroutine 是 Go 的轻量级线程,创建成本极低:

go func() {
    fmt.Println("Hello from goroutine")
}()

一个程序可以轻松创建数十万个 goroutine。

Channel

Channel 是 goroutine 之间通信的管道:

ch := make(chan string)

go func() {
    ch <- "hello"
}()

msg := <-ch
fmt.Println(msg)

常见并发模式

Fan-Out / Fan-In

多个 goroutine 读取同一个 channel(fan-out),多个 channel 的结果汇聚到一个 channel(fan-in):

func fanIn(channels ...<-chan string) <-chan string {
    var wg sync.WaitGroup
    out := make(chan string)

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan string) {
            defer wg.Done()
            for v := range c {
                out <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Pipeline

将处理流程拆分为多个阶段,每个阶段通过 channel 连接:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// 使用
c := gen(2, 3, 4)
out := sq(sq(c))

Worker Pool

固定数量的 worker 处理任务:

func worker(id int, jobs <-chan int, results chan<- int) {
    for j := range jobs {
        results <- j * 2
    }
}

func main() {
    jobs := make(chan int, 100)
    results := make(chan int, 100)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    for j := 1; j <= 9; j++ {
        jobs <- j
    }
    close(jobs)

    for a := 1; a <= 9; a++ {
        <-results
    }
}

总结

Go 的并发模型简洁而强大。用好 goroutine 和 channel,可以写出高效且易读的并发程序。

并发