熔断、限流组件实现
通过时间获取当前bucket 并更新 核心代码:
// 文件/sentinel-golang/core/stat/base/leap_array.go
// func currentBucketOfTime
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
// 计算当前时间对应的窗口下标
idx := la.calculateTimeIdx(now)
// 计算当前时间对应的窗口的开始时间
bucketStart := calculateStartTime(now, la.bucketLengthInMs)
for {
// 获取旧窗口
old := la.array.get(idx)
// 如果旧窗口==nil则初始化(正常不会执行这部分代码)
if old == nil {
newWrap := &BucketWrap{
BucketStart: bucketStart,
Value: atomic.Value{},
}
newWrap.Value.Store(bg.NewEmptyBucket())
if la.array.compareAndSet(idx, nil, newWrap) {
return newWrap, nil
} else {
runtime.Gosched()
}
// 如果本次计算的开始时间等于旧窗口的开始时间,则认为窗口没有过期,直接返回
} else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
return old, nil
// 如果本次计算的开始时间大于旧窗口的开始时间,则认为窗口过期尝试重置
} else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
if la.updateLock.TryLock() {
old = bg.ResetBucketTo(old, bucketStart)
la.updateLock.Unlock()
return old, nil
} else {
runtime.Gosched()
}
......
}
}
总结
Sentienl Go 中实现底层指标本质是通过“时间轮”进行指标的数据统计和存储,在时间轮中借鉴slice的底层实现利用unsafe.Pointer和atomic配合对时间轮进行无锁的原子操作,极大的提升了性能。
Sentinel限流
核心流控参数介绍
Resource: "POST:/group/inner/v1/group/create", //资源名
TokenCalculateStrategy: flow.Direct, //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值。
ControlBehavior: flow.Reject, //表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。
Threshold: commonThreshold, //表示流控阈值
StatIntervalInMs: statIntervalInMs, // 规则对应的流量控制器的独立统计结构的统计周期
MaxQueueingTimeMs: maxQueueingTimeMs, // 匀速排队的最大等待时间,该字段仅仅对 Throttling ControlBehavior生效;
- 当流量非均匀 波动时候 选择匀速排队的方式 实现流控和类似消息队列的削峰的功能 对应的是漏桶算法
- 流控周期应适当 防止脉冲流量 击溃服务
- 因此尽量采用 匀速排队的控制策略,平滑掉流量
流控使用实例
gin 路由请求流控
//添加流控规则
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: resName,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Threshold: 10,
StatIntervalInMs: 1000,
MaxQueueingTimeMs: 1000,
},
})
// 添加触发流控时要执行的代码
limiterBlockFallback := sentinelPlugin.WithBlockFallback(func(ctx *gin.Context) {
ctx.AbortWithStatusJSON(http.StatusOK, stdout.NewStdResp(ctx, nil, -1, "超过流量限制啦!!!"))
})
//路由应用中间件
rg := r.Group("/group/inner").Use(sentinelPlugin.SentinelMiddleware(limiterBlockFallback)){
rg.POST("/v1/group/create", transport.CreateGroup)
}
代码块流控
//添加流控规则
_, err = flow.LoadRules([]*flow.Rule{
{
Resource: resName,
TokenCalculateStrategy: flow.Direct,
ControlBehavior: flow.Reject,
Threshold: 10,
StatIntervalInMs: 1000,
MaxQueueingTimeMs: 1000,
},
})
// 流控
e, b := sentinel.Entry("some-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
atomic.AddInt64(counter.block, 1)
} else {
// Be sure the entry is exited finally.
e.Exit()
atomic.AddInt64(counter.pass, 1)
}
系统自适应流控
- 在实际场景中我们追求的目标是 在系统不被拖垮的情况下,提高系统的吞吐率,而不是 load 一定要到低于某个阈值
代码引入
import "github.com/alibaba/sentinel-golang/core/system"
// 自适应流控,启发因子为 load1 >= 8
_, err := system.LoadRules([]*system.SystemRule{
{
MetricType:system.Load, //衡量标准
TriggerCount:8.0, //该维度下最大负载值
Strategy:system.BBR, //自适应策略 brr
},
})
几种模式:
文件地址:core/system/slot.go
MetricType:衡量标准几种选项
// Load represents system load1 in Linux/Unix.
Load MetricType = iota
// AvgRT represents the average response time of all inbound requests.
AvgRT
// Concurrency represents the concurrency of all inbound requests.
Concurrency
// InboundQPS represents the QPS of all inbound requests.
InboundQPS
// CpuUsage represents the CPU usage percentage of the system.
CpuUsage
TriggerCount:上述衡量标准对应的最大限值
Strategy: BBR基于TCP BBR思想的控制策略 或者无策略
具体做限制片段的实现:
func (s *AdaptiveSlot) doCheckRule(rule *Rule) (bool, string, float64) {
var msg string
threshold := rule.TriggerCount
switch rule.MetricType {
case InboundQPS:
qps := stat.InboundNode().GetQPS(base.MetricEventPass)
res := qps < threshold
if !res {
msg = "system qps check blocked"
}
return res, msg, qps
case Concurrency:
n := float64(stat.InboundNode().CurrentConcurrency())
res := n < threshold
if !res {
msg = "system concurrency check blocked"
}
return res, msg, n
case AvgRT:
rt := stat.InboundNode().AvgRT()
res := rt < threshold
if !res {
msg = "system avg rt check blocked"
}
return res, msg, rt
case Load:
l := system_metric.CurrentLoad()
if l > threshold {
if rule.Strategy != BBR || !checkBbrSimple() {
msg = "system load check blocked"
return false, msg, l
}
}
return true, "", l
case CpuUsage:
c := system_metric.CurrentCpuUsage()
if c > threshold {
if rule.Strategy != BBR || !checkBbrSimple() {
msg = "system cpu usage check blocked"
return false, msg, c
}
}
return true, "", c
default:
msg = "system undefined metric type, pass by default"
return true, msg, 0.0
}
}