跳转至

并发编程

类似 Java 需要 volatile 修饰,Go 没有保证一个协程的写入对另一个协程立即可见,除非使用同步原语(如互斥锁或原子操作)

var flag int32
atomic.StoreInt32(&flag, 1) // 写入(对其他 goroutine 可见)
value := atomic.LoadInt32(&flag) // 读取(获取最新值)

Channel

环形缓冲区

sendx / recvx 两个指针在缓冲数组中达到末尾后重新从起点开始

使用

接收(阻塞式):

<- ch // 丢弃
v := <-ch
v, ok := <-ch 

chan 为 nil 时也会发生阻塞

关闭

chan 关闭后,

  • chan 中还有数据:可以正常读出数据
  • chan 中没有数据了:不会发生阻塞,而是读出零值

特别地,使用 for range 持续读取 channel 时,若 chan 关闭,则循环退出

重复关闭 chan 会产生 panic,对已关闭的 chan 发送消息会产生 panic,关闭未初始化(没有通过 make 创建)的 chan 会产生 panic

Select

原子操作的特性

select {
case: a <-ch1
case: ch2 <-b
default
}

无 default 时会发生阻塞,直到某一种情况被满足

按序加锁,乱序轮询

context

接口定义

type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{} // 返回一个只读的 chan
    Err() error
    Value(key interface{}) interface{}
}

Done 这个 chan 不会被写入,只会被关闭(读一个被关闭,且无数据的 chan,会拿到一个零值)

context 包中提供的 context.Backgroundcontext.TODOcontext.WithDeadlinecontext.WithValue 函数会返回实现该接口的私有结构体

demo - 控制子 goroutine

子 goroutine 通过监听 ctx.Done 来接受父协程的控制

一个经典的样例:

var wg sync.WaitGroup

func worker1(ctx context.Context) {

    go worker2(ctx)

    for {
        fmt.Println("--worker1 is working")
        time.Sleep(time.Second)
        select {
        case <-ctx.Done():
            wg.Done()
            return
        default:
        }
    }
}


func worker2(ctx context.Context) {
    for {
        fmt.Println("--worker2 is working")
        time.Sleep(time.Second)
        select {
        case <-ctx.Done():
            wg.Done()
            return
        default:
        }
    }
}


func main() {
    wg.Add(2)

    ctx, cancel := context.WithCancel(context.Background())

    go worker1(ctx)

    go func() {
        time.Sleep(5 * time.Second)
        cancel()
    }()

    wg.Wait()

    fmt.Println("all work done")
}

创建 context

Background() 返回一个不为 nil,且为空的 Context 对象(没有超时时间,没有值)

func Background() Context {
    return background
}

func TODO() Context {
    return todo
}

这两种初始化方式均返回实现了 context.Context 的两个结构体对象

这两个函数只是互为别名。当前函数没有上下文作为入参时,一般使用 context.Background 来作为起始的上下文开始传递

With 系列函数

WithCancel

传入一个父 ctx,返回一个子 ctx 和一个可以被调用的取消函数

如果父context 的 Done 被关闭了,子context 会自动调用 cancel,也就是子 context 的 Done 也会被关闭

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

WithDeadline / WithTimeout

区别在于一个是到达绝对时间之后触发 Done(),一个是相对时间

如果父协程也有一个 deadline,则会取二者中更小的值作为子协程的 deadline

deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()

select {
case <-time.After(3 * time.Second):
    fmt.Println("操作完成")
case <-ctx.Done():
    fmt.Println("截止时间到达:", ctx.Err())
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // 确保资源释放

select {
case <-time.After(3 * time.Second):
    fmt.Println("操作完成")
case <-ctx.Done():
    fmt.Println("超时取消:", ctx.Err()) // 2秒后输出: "超时取消: context deadline exceeded"
}

最佳实践是,即使规定了超时时间,也需要使用 defer cancel(),这是因为调用 context.WithTimeout 时,底层会启动一个计时器,调用 cancel 可以在任务提前完成时,释放计时器相关资源

WithValue

适合存储令牌、TraceID 等

注意:

Go 的类型系统规定:即使底层类型相同,不同的命名类型也是不相等的。

这里 key 的类型,首先必须是可比较的(不能是 slice,map,func 等);其次不应该为 string 或者任何内置类型,以防止不同的包往 ctx 中存入了同名且同类型的 key 发生覆盖问题

每次调用 WithValue 会创建一个新的 context.Context,不会修改原本的(父 goroutine 的)

type TraceCode string
func demo(ctx context.Context) {
    traceCode, ok := ctx.Value(key).(string)
}
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "123456")
}

客户端超时取消示例

调用方:

type respData struct {
    resp *http.Response
    err  error
}

func doCall(ctx context.Context) {
    transport := http.Transport{
       DisableKeepAlives: true,     
    }
    client := http.Client{
        Transport: &transport,
    }

    respChan := make(chan *respData, 1)
    req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
    if err != nil {
        fmt.Printf("new requestg failed, err:%v\n", err)
        return
    }
    req = req.WithContext(ctx) // 使用带超时的ctx创建一个新的client request
    var wg sync.WaitGroup
    wg.Add(1)
    defer wg.Wait()
    go func() {
        resp, err := client.Do(req) // http.Client 若检测到超时,立刻返回
        fmt.Printf("client.do resp:%v, err:%v\n", resp, err)
        rd := &respData{
            resp: resp,
            err:  err,
        }
        respChan <- rd
        wg.Done()
    }()

    select {
    case <-ctx.Done():
        fmt.Println("call api timeout")
    case result := <-respChan:
        fmt.Println("call server api success")
        if result.err != nil {
            fmt.Printf("call server api failed, err:%v\n", result.err)
            return
        }
        defer result.resp.Body.Close()
        data, _ := ioutil.ReadAll(result.resp.Body)
        fmt.Printf("resp:%v\n", string(data))
    }
}

func main() {
    // 定义一个100毫秒的超时
    ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
    defer cancel() // 调用cancel释放子goroutine资源
    doCall(ctx)
}

sync.Mutex

sync.Mutex 是不可重入锁

基于 chann 保护共享资源

var (
    semaphere = make(chan struct{}, 1)
    balance int
)

func Deposit(amount int) {
    semaphere <- struct{}
    balance += amount
    <- semaphere
}

func QueryBalance() {
    semaphere <- struct{}
    ret := balance
    <- semaphere
    return ret
}

sync.Mutex 应用

var (
    mu sync.Mutex
    balance int
)

func Balance() int {
    mu.Lock()
    defer mu.Unlock()
    return balance
}

并发编程应用

多个协程交替打印 1 ~ 100

使用 sync.WaitGroup 保证主协程等待所有协程退出

chan

func print(number chan int, done chan struct{}, wg *sync.WaitGroup) {
    wg.Add(1)
    defer func() {
        wg.Done()
        fmt.Println("done")
    }()
    for {
        select {
        case x := <-number:
            fmt.Println(x)
            if x == 100 {
                close(done)
                return
            }
            number<- x + 1
        case <-done:
            return
        }
    }
}

func main() {
    number := make(chan int)
    done := make(chan struct{})
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        go print(number, done, &wg)
    }
    number<- 1
    wg.Wait()
}

mutex

func main() {
    x := 0
    var mu sync.Mutex
    var wg sync.WaitGroup
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                mu.Lock()
                if x == 100 {
                    mu.Unlock()
                    return
                }
                x += 1
                mu.Unlock()
            }
        }()
    }
    wg.Wait()
    fmt.Println(x)
}

注意:mutex 可以保证内存修改在不同协程间的可见性,可以不用 atomic