断路器是一种模式,可以在进行远程调用时保护接口不发生重复失败、挂起或超时。
在本文中,我将尝试实现一个简单的断路器。
断路器的三种状态:
- 关闭状态(Closed):断路器关闭,流量可以正常进入。此状态下断路器统计请求失败次数,如果达到了阈值就转为open状态。
- 打开/熔断状态(Open):断路器打开,即circuit-breaker熔断状态,此状态下拒绝所有流量,走降级逻辑。
-
半开状态(Half-Open):断路器半开状态,Open状态过一段时间(默认5s)转为此状态来尝试恢复。此状态时:允许有且仅一个请求进入,一旦请求成功就关闭断路器。请求失败就到Open状态(这样再过5秒才能转到半开状态)。
定义状态常量:
const (
StateClosed = iota
StateHalfOpen
StateOpen
)
断路器使用fails和stop通道来同步。Threshold字段是每秒错误个数。timeout字段单位是秒。
type CircuitBreaker struct {
Threshold int
Timeout int
state int
fails chan error
failureCount int
openDuration int
ticker *time.Ticker
stop chan struct{}
}
NewCircuitBreaker创建并返回一个新的断路器实例。需要设置阈值和超时参数。
func NewCircuitBreaker(threshold, timeout int) *CircuitBreaker {
return &CircuitBreaker{
Threshold: threshold,
Timeout: timeout,
}
}
Start方法启动断路器:
func (cb *CircuitBreaker) Start() {
cb.fails = make(chan error)
cb.ticker = time.NewTicker(time.Second)
cb.stop = make(chan struct{})
go func() {
defer close(cb.stop)
for {
select {
case err := <-cb.fails:
//忽略open状态下的错误
if cb.state == StateOpen {
continue
}
// 在半开状态下,如果没有错误时断路器转到closed状态
if cb.state == StateHalfOpen {
if err == nil {
cb.state = StateClosed
}
continue
}
// 在关闭状态下对零错误不做任何事情
if err == nil {
continue
}
// 在关闭状态下发送错误就增加失败计数。
cb.failureCount++
case <-cb.ticker.C:
// 半开状态下计时器超时不做任何事情
if cb.state == StateHalfOpen {
continue
}
// 在open状态下增加开路持续时间,并在每次计时器超时将断路器跳闸到半开路状态,尝试恢复
if cb.state == StateOpen {
cb.openDuration++
if cb.openDuration == cb.Timeout {
cb.state = StateHalfOpen
}
continue
}
// 如果失败计数达到阈值,断路器跳闸进入open状态,并在每次计时器超时在闭合状态重置开路持续时间
if cb.failureCount >= cb.Threshold {
cb.state = StateOpen
cb.openDuration = 0
}
// 在关闭状态下计时器超时就重置统计失败次数
cb.failureCount = 0
case <-cb.stop:
return
}
}
}()
}
Stop方法:停止断路器
func (cb *CircuitBreaker) Stop() {
cb.ticker.Stop()
cb.stop <- struct{}{}
<-cb.stop
cb.state = StateClosed
cb.failureCount = 0
close(cb.fails)
}
Fail方法,通知断路器调用接口失败:
func (cb *CircuitBreaker) Fail(err error) {
cb.fails <- err
}
State方法: 返回断路器状态。
func (cb *CircuitBreaker) State() int {
return cb.state
}
如果您运行以下代码并在一秒钟内执行三个或更多请求,断路器将跳至打开状态并返回404。
package main
import (
"errors"
"net/http"
circuitbreaker "github.com/ermanimer/design-patterns/circuit-breaker"
)
func main() {
// 创建一个新的断路器实例
cb := circuitbreaker.NewCircuitBreaker(3, 2)
//启动断路器
cb.Start()
defer cb.Stop()
// 定义一个http处理程序
http.HandleFunc("/sample-endpoint", func(rw http.ResponseWriter, r *http.Request) {
// 如果断路器不在闭合状态则返回错误
if cb.State() != circuitbreaker.StateClosed {
rw.WriteHeader(http.StatusNotFound)
return
}
// 模拟接口调用失败,通知断路器
err := sampleRemoteCall()
cb.Fail(err)
// 返回错误响应
rw.WriteHeader(http.StatusInternalServerError)
rw.Write([]byte(err.Error()))
})
// 启动http服务器
http.ListenAndServe(":8000", nil)
}
func sampleRemoteCall() error {
return errors.New("sample error")
}