亲宝软件园·资讯

展开

Go语言并发上下文

宇宙之一粟 人气:0

前言

相信大家以前在做阅读理解的时候,一定有从老师那里学一个技巧或者从参考答案看个:结合上下文。根据上下文我们能够找到有助于解题的相关信息,也能更加了解段落的思想。

在开发过程中,也有这个上下文(Context)的概念,而且上下文也必不可少,缺少上下文,就不能获取完整的程序信息。那么什么是程序中的上下文呢?

简单来说,就是在 API 之间或者函数调用之间,除了业务参数信息之外的额外信息。比如,服务器接收到客户端的 HTTP 请求之后,可以把客户端的 IP 地址和端口、客户端的身份信息、请求接收的时间、Trace ID 等信息放入到上下文中,这个上下文可以在后端的方法调用中传递。

1 Go 中的 Context

Golang 的上下文也是应用开发常用的并发控制工具。同理,上下文可以用于在程序中的 API 层或进程之间共享请求范围的数据,除此之外,Go 的 Context 库还提供取消信号(Cancel)以及超时机制(Timeout)。

Context 又被称为上下文,与 WaitGroup 不同的是,Context 对于派生 goroutine 有更强的控制力,可以管理多级的 goroutine。

但我们在 Go 中创建一个 goroutine 时,如果发生了一个错误,并且这个错误永远不会终止,而其他程序会继续进行。加入有一个不被调用的 goroutine 运行无限循环,如下所示:

package main
import "fmt"
func main() {
    dataCom := []string{"alex", "kyrie", "kobe"}
    go func(data []string) {
        // 模拟大量运算的死循环
    }(dataCom)
    // 其他代码正常执行
    fmt.Println("剩下的代码执行正常逻辑")
}

上面的例子并不完整,dataCom goroutine 可能会也可能不会成功处理数据。它可能会进入无限循环或导致错误。我们的其余代码将不知道发生了什么。

有多种方法可以解决这个问题。其中之一是使用通道向我们的主线程发送一个信号,表明这个 goroutine 花费的时间太长,应该取消它。

package main
import (
	"fmt"
	"time"
)
func main() {
	stopChannel := make(chan bool)
	dataCom := []string{"alex", "kyrie", "kobe"}
	go func(stopChannel chan bool) {
		go func(data []string) {
			// 大量的计算
		}(dataCom)
		for range time.After(2 * time.Second) {
			fmt.Println("此操作运行时间过长,取消中")
			stopChannel <- true
		}
	}(stopChannel)
	<-stopChannel
	// 其他代码正常执行
	fmt.Println("剩下的代码执行正常逻辑")
}

上面的逻辑很简单。我们正在使用一个通道向我们的主线程发出这个 goroutine 花费的时间太长的信号。但是同样的事情可以用 context 来完成,这正是 context 包存在的原因。

package main
import (
	"context"
	"fmt"
	"time"
)
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()
	dataCom := []string{"alex", "kyrie", "kobe"}
	go func() {
		go func(data []string) {
			// 大量的计算
		}(dataCom)
		for range time.After(2 * time.Second) {
			fmt.Println("此操作运行时间过长,取消中")
			cancel()
			return
		}
	}()
	select {
	case <-ctx.Done():
		fmt.Println("上下文被取消")
	}
}

2 Context 接口

Context 接口定义:

type Context interface {
  Deadline() (deadline time.Time, ok bool)
  Done &lt;-chan struct{}
  Err() error
  Value(key interface{}) interface{}
}

Context 接口定义了 4 个方法:

每次创建新上下文时,都会得到一个符合此接口的类型。上下文的真正实现隐藏在这个包和这个接口后面。这些是您可以创建的工厂类型的上下文:

context.TODO

context.Background

context.WithCancel

context.WithValue

context.WithTimeout

context.WithDeadline

3 Context Tree

在实际实现中,我们通常使用派生上下文。我们创建一个父上下文并将其传递到一个层,我们派生一个新的上下文,它添加一些额外的信息并将其再次传递到下一层,依此类推。通过这种方式,我们创建了一个从作为父级的根上下文开始的上下文树。

这种结构的优点是我们可以一次性控制所有上下文的取消。如果根信号关闭了上下文,它将在所有派生的上下文中传播,这些上下文可用于终止所有进程,立即释放所有内容。这使得上下文成为并发编程中非常强大的工具。

4 创建上下文

4.1 上下文创建函数

我们可以从现有的上下文中创建或派生上下文。顶层(根)上下文是使用 BackgroundTODO 方法创建的,而派生上下文是使用 WithCancel、WithDeadline、WithTimeout 或 WithValue 方法创建的。

所有派生的上下文方法都返回一个取消函数 CancelFunc,但 WithValue 除外,因为它与取消无关。调用 CancelFunc 会取消子项及其子项,删除父项对子项的引用,并停止任何关联的计时器。调用 CancelFunc 失败会泄漏子项及其子项,直到父项被取消或计时器触发。

此函数返回一个空上下文。这通常只应在主请求处理程序或顶级请求处理程序中使用。这可用于为主函数、初始化、测试以及后续层或其他 goroutine 派生上下文的时候。

ctx, cancel := context.Background()

此函数返回一个非 nil 的、空的上下文。没有任何值、不会被 cancel,不会超时,也没有截止日期。但是,这也应该仅在您不确定要使用什么上下文或者该函数还不能用于接收上下文时,可以使用这个方法,并且将在将来需要添加时使用。

ctx, cancel := context.TODO()

这个函数接受一个上下文并返回一个派生的上下文,其中值 val 与 key 相关联,并与上下文一起经过上下文树。

WithValue 方法其实是创建了一个类型为 valueCtx 的上下文,它的类型定义如下:

type valueCtx struct {
    Context
    key, val interface{}
}

这意味着一旦你得到一个带有值的上下文,任何从它派生的上下文都会得到这个值。该值是不可变的,因此是线程安全的。

提供的键必须是可比较的,并且不应该是字符串类型或任何其他内置类型,以避免使用上下文的包之间发生冲突。 WithValue 的用户应该为键定义自己的类型。

为避免在分配给 interface{} 时进行分配,上下文键通常具有具体类型 struct{}。或者,导出的上下文键变量的静态类型应该是指针或接口。

package main
import (
  "context"
  "fmt"
)
type contextKey string
func main() {
  var authToken contextKey = "auth_token"
  ctx := context.WithValue(context.Background(), authToken, "Hello123456")
  fmt.Println(ctx.Value(authToken))
}

运行该代码:

$ go run .           
Hello123456

此函数接收父上下文并返回派生上下文,返回 parent 的副本,只是副本中的 Done Channel 是新建的对象,它的类型是 cancelCtx。在这个派生上下文中,添加了一个新的 Done channel,该 channel 在调用 cancel 函数或父上下文的 Done 通道关闭时关闭。

要记住的一件事是,我们永远不应该在不同的函数或层之间传递这个 cancel ,因为它可能会导致意想不到的结果。创建派生上下文的函数应该只调用取消函数。

下面是一个使用 Done 通道演示 goroutine 泄漏的示例:

package main
import (
  "context"
  "fmt"
  "math/rand"
  "time"
)
func main() {
  rand.Seed(time.Now().UnixNano())
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()
  for char := range randomCharGenerator(ctx) {
    generatedChar := string(char)
    fmt.Printf("%v\n", generatedChar)
    if generatedChar == "o" {
      break
    }
  }
}
func randomCharGenerator(ctx context.Context) <-chan int {
  char := make(chan int)
  seedChar := int('a')
  go func() {
    for {
      select {
      case <-ctx.Done():
        fmt.Printf("found %v", seedChar)
        return
      case char <- seedChar:
        seedChar = 'a' + rand.Intn(26)
      }
    }
  }()
  return char
}

运行结果:

$ go run .           
a
m
q
c
l
t
o

此函数从其父级返回派生上下文,返回一个 parent 的副本。

当期限超过或调用取消函数时,该派生上下文将被取消。例如,您可以创建一个在未来某个时间自动取消的上下文,并将其传递给子函数。当该上下文由于截止日期用完而被取消时,所有获得该上下文的函数都会收到通知停止工作并返回。如果 parent 的截止日期已经早于 d,则上下文的 Done 通道已经关闭。

下面是我们正在读取一个大文件的示例,该文件的截止时间为当前时间 2 毫秒。我们将获得 2 毫秒的输出,然后将关闭上下文并退出程序。

package main
import (
    "bufio"
    "context"
    "fmt"
    "log"
    "os"
    "time"
)
func main() {
    // context with deadline after 2 millisecond
    ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Millisecond))
    defer cancel()
    lineRead := make(chan string)
    var fileName = "sample-file.txt"
    file, err := os.Open(fileName)
    if err != nil {
        log.Fatalf("failed opening file: %s", err)
    }
    scanner := bufio.NewScanner(file)
    scanner.Split(bufio.ScanLines)
    // goroutine to read file line by line and passing to channel to print
    go func() {
        for scanner.Scan() {
            lineRead <- scanner.Text()
        }
        close(lineRead)
        file.Close()
    }()
outer:
    for {
        // printing file line by line until deadline is reached
        select {
        case <-ctx.Done():
            fmt.Println("process stopped. reason: ", ctx.Err())
            break outer
        case line := <-lineRead:
            fmt.Println(line)
        }
    }
}

这个函数类似于 context.WithDeadline。不同之处在于它将持续时间作为输入而不是时间对象。此函数返回一个派生上下文,如果调用取消函数或超过超时持续时间,该上下文将被取消。

WithTimeout 的实现是:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    // 当前时间+timeout就是deadline
    return WithDeadline(parent, time.Now().Add(timeout))
}

WithTimeout 返回 WithDeadline(parent, time.Now().Add(timeout))

package main
import (
    "bufio"
    "context"
    "fmt"
    "log"
    "os"
    "time"
)
func main() {
    // context with deadline after 2 millisecond
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
    defer cancel()
    lineRead := make(chan string)
    var fileName = "sample-file.txt"
    file, err := os.Open(fileName)
    if err != nil {
        log.Fatalf("failed opening file: %s", err)
    }
    scanner := bufio.NewScanner(file)
    scanner.Split(bufio.ScanLines)
    // goroutine to read file line by line and passing to channel to print
    go func() {
        for scanner.Scan() {
            lineRead <- scanner.Text()
        }
        close(lineRead)
        file.Close()
    }()
outer:
    for {
        // printing file line by line until deadline is reached
        select {
        case <-ctx.Done():
            fmt.Println("process stopped. reason: ", ctx.Err())
            break outer
        case line := <-lineRead:
            fmt.Println(line)
        }
    }
}

如果父上下文的 Done 通道关闭,它最终将关闭所有派生的 Done 通道(所有后代),如:

package main
import (
    "context"
    "fmt"
    "time"
)
func main() {
    c := make(chan string)
    go func() {
        time.Sleep(1 * time.Second)
        c <- "one"
    }()
    ctx1 := context.Context(context.Background())
    ctx2, cancel2 := context.WithTimeout(ctx1, 2*time.Second)
    ctx3, cancel3 := context.WithTimeout(ctx2, 10*time.Second) // derives from ctx2
    ctx4, cancel4 := context.WithTimeout(ctx2, 3*time.Second)  // derives from ctx2
    ctx5, cancel5 := context.WithTimeout(ctx4, 5*time.Second)  // derives from ctx4
    cancel2()
    defer cancel3()
    defer cancel4()
    defer cancel5()
    select {
    case <-ctx3.Done():
        fmt.Println("ctx3 closed! error: ", ctx3.Err())
    case <-ctx4.Done():
        fmt.Println("ctx4 closed! error: ", ctx4.Err())
    case <-ctx5.Done():
        fmt.Println("ctx5 closed! error: ", ctx5.Err())
    case msg := <-c:
        fmt.Println("received", msg)
    }
}

在这里,由于我们在创建其他派生上下文后立即关闭 ctx2,因此所有其他上下文也会立即关闭,随机打印 ctx3、ctx4 和 ctx5 关闭消息。 ctx5 是从 ctx4 派生的,由于 ctx2 关闭的级联效应,它正在关闭。尝试多次运行,您会看到不同的结果。

使用 Background 或 TODO 方法创建的上下文没有取消、值或截止日期。

package main
import (
    "context"
    "fmt"
)
func main() {
    ctx := context.Background()
    _, ok := ctx.Deadline()
    if !ok {
        fmt.Println("no dealine is set")
    }
    done := ctx.Done()
    if done == nil {
        fmt.Println("channel is nil")
    }
}

4.2 Context 使用规范

func DoSomething(ctx context.Context, arg Arg) error {
    // ... use ctx ...
}

4.3 Context 使用场景

5 总结

Context 是在 Go 中进行并发编程时最重要的工具之一。上下文的主要作用是在多个 Goroutine 或者模块之间同步取消信号或者截止日期,用于减少对资源的消耗和长时间占用,避免资源浪费。标准库中的 database/sql、os/exec、net、net/http 等包中都使用到了 Context。

参考链接:

Go Concurrency Patterns: Context

Simplifying context in Go

加载全部内容

相关教程
猜你喜欢
用户评论