如果您正在运行HTTP服务器,并希望对用户请求进行限制,那么你可能首选Didip Kerabat的Tollbooth库。该第三方库得到了很好的维护,具有一系列特性和清晰的API。
但是,如果您想要一些简单且轻量级的内容(或者只是想学习),那么自己开发中间件来处理流量限制并不太难。在这篇文章中,我将介绍如何使用x/time/rate包来实现限流,它提供了一个令牌桶限流算法(注意:Tollbooth也是使用这个算法)。
如果你想一起动手实践,先创建一个demo目录,包含两个文件limit.go和main.go,然后初始化一个新的go模块。像这样:
$ mkdir ratelimit-demo
$ cd ratelimit-demo
$ touch limit.go main.go
$ go mod init example.com/ratelimit-demo
让我们从创建一个全局限流器开始,它对HTTP服务器接收到的所有请求进行限速。打开limit.go文件添加如下代码:
package main
import (
"net/http"
"golang.org/x/time/rate"
)
var limiter = rate.NewLimiter(1, 3)
func limit(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if limiter.Allow() == false {
http.Error(w, http.StatusText(429), http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
在上面的代码中使用rate.NewLimiter()函数初始化并返回一个新的限流器。它的签名是这样的:
func NewLimiter(r Limit, b int) *Limiter
限流的定义:
限流器控制允许事件发生的频率。它实现了一个大小为b的“令牌桶”,最初是满的,然后以每秒r个令牌的速度消耗。
或者用另一种方式来描述它——限流器允许你平均每秒消费r个令牌,在任何一次“突发”中最多消费b个令牌。所以在上面的代码中,我们的限流器允许每秒消耗1个令牌,最大突发大小为3。
在limit中间件函数中,每当中间件接收到HTTP请求时,我们都会调用全局限流器的Allow()方法。如果桶中没有令牌了,Allow()将返回false,我们将向用户发送一个429 Too Many Requests响应。否则,调用Allow()将消耗桶中的一个令牌,并将控制权传递给链中的下一个处理程序。
值得注意的是,Allow()方法背后的代码是由互斥锁保护的,可以安全地并发使用。
我们把这个限流器使用到http服务当中。打开main.go文件设置一个简单的web服务器,使用限流器中间件如下:
package main
import (
"log"
"net/http"
)
func main() {
mux := http.NewServeMux()
mux.HandleFunc("/", okHandler)
// Wrap the servemux with the limit middleware.
log.Println("Listening on :4000...")
http.ListenAndServe(":4000", limit(mux))
}
func okHandler(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("OK"))
}
运行应用程序…
$ go run .
如果你连续快速地发出足够多的请求,最终会得到这样的响应:
curl -i localhost:4000
HTTP/1.1 429 Too Many Requests
Content-Type: text/plain; charset=utf-8
X-Content-Type-Options: nosniff
Date: Thu, 21 Dec 2017 19:25:52 GMT
Content-Length: 18
Too Many Requests
对每个用户限速
虽然在某些情况下使用一个全局限流器是有用的,但另一个常见的场景是基于IP地址或API key等标识符为每个用户实现一个限流器。在本文中,我们将使用IP地址作为标识符。
一种直观的方法是创建限流器map,使用每个用户的标识符作为map的key。
此时,您可能会考虑在Go 1.9中引入的sync.map类型。它本质上提供了一个并发安全的映射,设计用于多个goroutine访问,而没有竞争条件的风险。但有一点需要注意:
sync.map优化后多用于map的keys随时间稳定的,一些稳态存储或存储每个键本地化到一个goroutine。如果不遵循该特点可能使用普通map结合互斥锁会更高效。
在我们的特定用例中,map的键值将是用户的IP地址,因此每当有新用户访问我们的应用程序时,都会向map添加新的键。当用户长时间没有访问,我们还希望通过从map中删除旧的键来防止不必要的内存消耗。
因此,在我们的例子中,map的键不是稳定的,而由互斥锁保护的普通映射可能会执行得更好。
让我们更新下limit.go文件包含一个简单实现。我将保持代码结构的简洁如下:
package main
import (
"log"
"net"
"net/http"
"sync"
"golang.org/x/time/rate"
)
//创建一个map来保存每个访问者和互斥锁的限流器。
var visitors = make(map[string]*rate.Limiter)
var mu sync.Mutex
// 检索并返回当前访问者的限流器(如果它已经存在的话)。否则创建一个新的限流器并将其添加到访问者map中,使用IP地址作为key。
func getVisitor(ip string) *rate.Limiter {
mu.Lock()
defer mu.Unlock()
limiter, exists := visitors[ip]
if !exists {
limiter = rate.NewLimiter(1, 3)
visitors[ip] = limiter
}
return limiter
}
func limit(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// 获取当前用户的IP地址
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
log.Println(err.Error())
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
// 调用getVisitor函数获取当前用户的限流器。
limiter := getVisitor(ip)
if limiter.Allow() == false {
http.Error(w, http.StatusText(429), http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
从map中删除旧key-value
这样做有一个问题:只要应用程序还在运行,访问map就会继续无限制地增长。我们可以很简单地解决这个问题,记录每个访问者最后一次访问时间,并运行一个后台goroutine从映射中删除旧的条目(因此在运行过程中释放内存)。
package main
import (
"log"
"net"
"net/http"
"sync"
"time"
"golang.org/x/time/rate"
)
// 创建一个自定义visitor结构体,包含每个访问者的限流器和最后一次看到访问者的时间。
type visitor struct {
limiter *rate.Limiter
lastSeen time.Time
}
// 更改映射以保存类型visitor的值。
var visitors = make(map[string]*visitor)
var mu sync.Mutex
// 运行一个后台goroutine从访客map中删除旧的条目。
func init() {
go cleanupVisitors()
}
func getVisitor(ip string) *rate.Limiter {
mu.Lock()
defer mu.Unlock()
v, exists := visitors[ip]
if !exists {
limiter := rate.NewLimiter(1, 3)
// 在创建新访问者时,添加当前时间。
visitors[ip] = &visitor{limiter, time.Now()}
return limiter
}
// 更新访客最后一次出现的时间。
v.lastSeen = time.Now()
return v.limiter
}
// 每分钟检查map上有没有超过3分钟的访客,如果有删除。
func cleanupVisitors() {
for {
time.Sleep(time.Minute)
mu.Lock()
for ip, v := range visitors {
if time.Since(v.lastSeen) > 3*time.Minute {
delete(visitors, ip)
}
}
mu.Unlock()
}
}
func limit(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
log.Println(err.Error())
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
limiter := getVisitor(ip)
if limiter.Allow() == false {
http.Error(w, http.StatusText(429), http.StatusTooManyRequests)
return
}
next.ServeHTTP(w, r)
})
}
更多改进...
对于简单的应用程序,这段代码可以正常工作,但您可能希望根据您的需求进一步优化。例如:
- 如果你的服务是经过代理服务器的,检查请求头X-Forwarded-For或X-Real-IP。
- 将代码移植到独立的包中。
- 使限流器和清除设置在运行时可配置。
- 消除对全局变量的依赖,以便使用不同的设置创建不同的限流器。
- 使用sync.RWMutex减少map的竞争。