本文旨在简洁地向您展示我自己花了很长时间组装起来的东西,从而节省您的时间和精力。希望您发现里面的内容是一个具有启发性的、实用的、真实场景的API并发案例。
API并发模式
如果您遵循下面列出的准则,就可以以最少的错误和工作量来构建一个高度并发的API。
1、使用go关键字来异步执行,可以达到并发的目的
2、创建和关闭专用通道,这样可以最大限度地减少内存泄漏和死锁的机会。
3、使用context.Context,停止不再需要的挂起请求。
4、使用指针代替channel返回结果,可以减少需要管理的channel数量。
5、通过<-chan error返回错误,您可以在返回响应之前等待阻塞操作完成。
就是这些!如果你坚持这5条规则,就可以写出可读的、整洁的Go代码,而且具有高度并发,不会出现死锁或内存泄漏。
代码示例
下面是使用这些规则的示例实现。希望可以说明如何写出易于阅读、测试和维护的高并发API代码。
API请求
步骤1:异步地发出所有API请求和阻塞操作。
// Piece 表示部分的响应结果
type Piece struct {
ID uint `json:"id"`
}
// getPiece 调用`GET /piece/:id`
func getPiece(ctx context.Context, id uint, piece *Piece) <-chan error {
out := make(chan error)
go func() {
// 关闭channel,合理管理内存
defer close(out)
// NewRequestWithContext在调用者取消ctx时会自动取消请求
req, err := http.NewRequestWithContext(
ctx,
"GET",
fmt.Sprintf("api.url.com/piece/%d", id),
nil,
)
if err != nil {
out <- err
return
}
// 发起请求
rsp, err := http.DefaultClient.Do(req)
if err != nil {
out <- err
return
} else if rsp.StatusCode != http.StatusOK {
out <- fmt.Errorf("%d: %s", rsp.StatusCode, rsp.Status)
return
}
// 将响应结果解析到piece
defer rsp.Body.Close()
if err := json.NewDecoder(rsp.Body).Decode(piece); err != nil {
out <- err
return
}
}()
return out
}
API响应
步骤2:将多个阻塞操作和API请求组合到一个响应结构体中
// Result是将并发检索的多个阻塞操作组合在一起
type Result struct {
FirstPiece *Piece `json:"firstPiece,omitempty"`
SecondPiece *Piece `json:"secondPiece,omitempty"`
ThirdPiece *Piece `json:"thirdPiece,omitempty"`
}
// GetResult提供API请求到处理程序
func GetResult(w http.ResponseWriter, r *http.Request) {
// 解析和验证请求参数…
// getResult立即停止如果http.Request被取消
var result Result
if err := <-getResult(r.Context(), &result); err != nil {
w.Write([]byte(err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
// 对响应结果进行序列号
bs, err := json.Marshal(&result)
if err != nil {
w.Write([]byte(err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
// 成功!
w.Write(bs)
w.WriteHeader(http.StatusOK)
}
// getResult 返回多个并发API调用的结果
func getResult(ctx context.Context, result *Result) <-chan error {
out := make(chan error)
go func() {
// 正确管理内存
defer close(out)
// 如果有一个请求失败,cancel func将允许我们停止所有挂起的请求
ctx, cancel := context.WithCancel(ctx)
// Merge将所有getPieces返回的errors统一到一个“<-chan error"中
//如果没有发生错误,Merge会等待所有<-chan error关闭
for err := range util.Merge(
getPiece(ctx, 1, result.FirstPiece),
getPiece(ctx, 2, result.SecondPiece),
getPiece(ctx, 3, result.ThirdPiece),
) {
if err != nil {
// 取消所有挂起的请求
cancel()
// 将错误传给调用者
out <- err
return
}
}
}()
return out
}
Merge函数
步骤3:实现一个聚合函数。即使你非常熟悉go,这里也可能是最复杂和最容易出错的部分。我建议直接复制黏贴这段代码到你自己的util包中。
package util
import (
"sync"
)
// 合并多个错误通道到一个错误通道中
func Merge(errChans ...<-chan error) <-chan error {
mergedChan := make(chan error)
// 创建WaitGroup等待所有的errChans关闭
var wg sync.WaitGroup
wg.Add(len(errChans))
go func() {
// 当所有的errchan都关闭时,关闭mergedChan
wg.Wait()
close(mergedChan)
}()
for i := range errChans {
go func(errChan <-chan error) {
// 等待每个errChan关闭
for err := range errChan {
if err != nil {
// 将每个errChan内容发送到mergedChan
mergedChan <- err
}
}
//通知WaitGroup其中一个errChans关闭
wg.Done()
}(errChans[i])
}
return mergedChan
}
总结
在Go中有许多方法来实现并发。根据作者经验,这是在构建API时实现并发的一种清晰而有效的方法,该API可以保持代码的整洁性并最小化内存管理错误。
希望这对Golang中的并发提供一个实用的说明。也希望在把这些信息拼凑在一起时,它能帮你节省一些时间和烦恼。