并发编程
类似 Java 需要 volatile 修饰,Go 没有保证一个协程的写入对另一个协程立即可见,除非使用同步原语(如互斥锁或原子操作)
var flag int32
atomic.StoreInt32(&flag, 1) // 写入(对其他 goroutine 可见)
value := atomic.LoadInt32(&flag) // 读取(获取最新值)
Channel¶
环形缓冲区¶
sendx / recvx 两个指针在缓冲数组中达到末尾后重新从起点开始
使用¶
接收(阻塞式):
chan 为 nil 时也会发生阻塞
关闭¶
chan 关闭后,
- chan 中还有数据:可以正常读出数据
- chan 中没有数据了:不会发生阻塞,而是读出零值
特别地,使用 for range 持续读取 channel 时,若 chan 关闭,则循环退出
重复关闭 chan 会产生 panic,对已关闭的 chan 发送消息会产生 panic,关闭未初始化(没有通过 make 创建)的 chan 会产生 panic
Select¶
原子操作的特性
无 default 时会发生阻塞,直到某一种情况被满足
按序加锁,乱序轮询
context¶
接口定义¶
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{} // 返回一个只读的 chan
Err() error
Value(key interface{}) interface{}
}
Done 这个 chan 不会被写入,只会被关闭(读一个被关闭,且无数据的 chan,会拿到一个零值)
context 包中提供的 context.Background、context.TODO、context.WithDeadline 和 context.WithValue 函数会返回实现该接口的私有结构体
demo - 控制子 goroutine¶
子 goroutine 通过监听 ctx.Done 来接受父协程的控制
一个经典的样例:
var wg sync.WaitGroup
func worker1(ctx context.Context) {
go worker2(ctx)
for {
fmt.Println("--worker1 is working")
time.Sleep(time.Second)
select {
case <-ctx.Done():
wg.Done()
return
default:
}
}
}
func worker2(ctx context.Context) {
for {
fmt.Println("--worker2 is working")
time.Sleep(time.Second)
select {
case <-ctx.Done():
wg.Done()
return
default:
}
}
}
func main() {
wg.Add(2)
ctx, cancel := context.WithCancel(context.Background())
go worker1(ctx)
go func() {
time.Sleep(5 * time.Second)
cancel()
}()
wg.Wait()
fmt.Println("all work done")
}
创建 context¶
Background() 返回一个不为 nil,且为空的 Context 对象(没有超时时间,没有值)
这两种初始化方式均返回实现了 context.Context 的两个结构体对象
这两个函数只是互为别名。当前函数没有上下文作为入参时,一般使用 context.Background 来作为起始的上下文开始传递
With 系列函数¶
WithCancel¶
传入一个父 ctx,返回一个子 ctx 和一个可以被调用的取消函数
如果父context 的 Done 被关闭了,子context 会自动调用 cancel,也就是子 context 的 Done 也会被关闭
WithDeadline / WithTimeout¶
区别在于一个是到达绝对时间之后触发 Done(),一个是相对时间
如果父协程也有一个 deadline,则会取二者中更小的值作为子协程的 deadline
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
select {
case <-time.After(3 * time.Second):
fmt.Println("操作完成")
case <-ctx.Done():
fmt.Println("截止时间到达:", ctx.Err())
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // 确保资源释放
select {
case <-time.After(3 * time.Second):
fmt.Println("操作完成")
case <-ctx.Done():
fmt.Println("超时取消:", ctx.Err()) // 2秒后输出: "超时取消: context deadline exceeded"
}
最佳实践是,即使规定了超时时间,也需要使用 defer cancel(),这是因为调用 context.WithTimeout 时,底层会启动一个计时器,调用 cancel 可以在任务提前完成时,释放计时器相关资源
WithValue¶
适合存储令牌、TraceID 等
注意:
Go 的类型系统规定:即使底层类型相同,不同的命名类型也是不相等的。
这里 key 的类型,首先必须是可比较的(不能是 slice,map,func 等);其次不应该为 string 或者任何内置类型,以防止不同的包往 ctx 中存入了同名且同类型的 key 发生覆盖问题
每次调用 WithValue 会创建一个新的 context.Context,不会修改原本的(父 goroutine 的)
type TraceCode string
func demo(ctx context.Context) {
traceCode, ok := ctx.Value(key).(string)
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "123456")
}
客户端超时取消示例¶
调用方:
type respData struct {
resp *http.Response
err error
}
func doCall(ctx context.Context) {
transport := http.Transport{
DisableKeepAlives: true,
}
client := http.Client{
Transport: &transport,
}
respChan := make(chan *respData, 1)
req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)
if err != nil {
fmt.Printf("new requestg failed, err:%v\n", err)
return
}
req = req.WithContext(ctx) // 使用带超时的ctx创建一个新的client request
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
go func() {
resp, err := client.Do(req) // http.Client 若检测到超时,立刻返回
fmt.Printf("client.do resp:%v, err:%v\n", resp, err)
rd := &respData{
resp: resp,
err: err,
}
respChan <- rd
wg.Done()
}()
select {
case <-ctx.Done():
fmt.Println("call api timeout")
case result := <-respChan:
fmt.Println("call server api success")
if result.err != nil {
fmt.Printf("call server api failed, err:%v\n", result.err)
return
}
defer result.resp.Body.Close()
data, _ := ioutil.ReadAll(result.resp.Body)
fmt.Printf("resp:%v\n", string(data))
}
}
func main() {
// 定义一个100毫秒的超时
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel() // 调用cancel释放子goroutine资源
doCall(ctx)
}
sync.Mutex¶
sync.Mutex 是不可重入锁
基于 chann 保护共享资源¶
var (
semaphere = make(chan struct{}, 1)
balance int
)
func Deposit(amount int) {
semaphere <- struct{}
balance += amount
<- semaphere
}
func QueryBalance() {
semaphere <- struct{}
ret := balance
<- semaphere
return ret
}
sync.Mutex 应用¶
并发编程应用¶
多个协程交替打印 1 ~ 100
使用 sync.WaitGroup 保证主协程等待所有协程退出
chan
func print(number chan int, done chan struct{}, wg *sync.WaitGroup) {
wg.Add(1)
defer func() {
wg.Done()
fmt.Println("done")
}()
for {
select {
case x := <-number:
fmt.Println(x)
if x == 100 {
close(done)
return
}
number<- x + 1
case <-done:
return
}
}
}
func main() {
number := make(chan int)
done := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
go print(number, done, &wg)
}
number<- 1
wg.Wait()
}
mutex
func main() {
x := 0
var mu sync.Mutex
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
mu.Lock()
if x == 100 {
mu.Unlock()
return
}
x += 1
mu.Unlock()
}
}()
}
wg.Wait()
fmt.Println(x)
}
注意:mutex 可以保证内存修改在不同协程间的可见性,可以不用 atomic