分布式组件原理之熔断、限流组件实现


熔断、限流组件实现

Untitled

Untitled

Untitled

通过时间获取当前bucket 并更新  核心代码:

//  文件/sentinel-golang/core/stat/base/leap_array.go  
//  func currentBucketOfTime
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
   // 计算当前时间对应的窗口下标
   idx := la.calculateTimeIdx(now)
   // 计算当前时间对应的窗口的开始时间
   bucketStart := calculateStartTime(now, la.bucketLengthInMs)
 
   for {
     // 获取旧窗口
      old := la.array.get(idx)
      // 如果旧窗口==nil则初始化(正常不会执行这部分代码)
      if old == nil {
         newWrap := &BucketWrap{
            BucketStart: bucketStart,
            Value:       atomic.Value{},
         }
         newWrap.Value.Store(bg.NewEmptyBucket())
         if la.array.compareAndSet(idx, nil, newWrap) {
            return newWrap, nil
         } else {
            runtime.Gosched()
         }
      // 如果本次计算的开始时间等于旧窗口的开始时间,则认为窗口没有过期,直接返回
      } else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
         return old, nil
      //  如果本次计算的开始时间大于旧窗口的开始时间,则认为窗口过期尝试重置
      } else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
         if la.updateLock.TryLock() {
            old = bg.ResetBucketTo(old, bucketStart)
            la.updateLock.Unlock()
            return old, nil
         } else {
            runtime.Gosched()
         }
        ......
      }
}

总结

Sentienl Go 中实现底层指标本质是通过“时间轮”进行指标的数据统计和存储,在时间轮中借鉴slice的底层实现利用unsafe.Pointer和atomic配合对时间轮进行无锁的原子操作,极大的提升了性能。

Sentinel限流

核心流控参数介绍


Resource:               "POST:/group/inner/v1/group/create",  //资源名
TokenCalculateStrategy: flow.Direct,       //当前流量控制器的Token计算策略。Direct表示直接使用字段 Threshold 作为阈值;WarmUp表示使用预热方式计算Token的阈值。
ControlBehavior:        flow.Reject,   //表示流量控制器的控制策略;Reject表示超过阈值直接拒绝,Throttling表示匀速排队。
Threshold:              commonThreshold,   //表示流控阈值
StatIntervalInMs:       statIntervalInMs,  // 规则对应的流量控制器的独立统计结构的统计周期
MaxQueueingTimeMs:      maxQueueingTimeMs, //    匀速排队的最大等待时间,该字段仅仅对 Throttling ControlBehavior生效;
  • 当流量非均匀 波动时候 选择匀速排队的方式 实现流控和类似消息队列的削峰的功能 对应的是漏桶算法
  • 流控周期应适当 防止脉冲流量 击溃服务
  • 因此尽量采用 匀速排队的控制策略,平滑掉流量

流控使用实例

gin 路由请求流控
 
 
//添加流控规则
_, err = flow.LoadRules([]*flow.Rule{
        {
            Resource:               resName,
            TokenCalculateStrategy: flow.Direct,
            ControlBehavior:        flow.Reject,
            Threshold:              10,
            StatIntervalInMs:       1000,
            MaxQueueingTimeMs:      1000,
        },
    })
 
 
// 添加触发流控时要执行的代码
limiterBlockFallback := sentinelPlugin.WithBlockFallback(func(ctx *gin.Context) {
        ctx.AbortWithStatusJSON(http.StatusOK, stdout.NewStdResp(ctx, nil, -1, "超过流量限制啦!!!"))
    })
 
 
//路由应用中间件
rg := r.Group("/group/inner").Use(sentinelPlugin.SentinelMiddleware(limiterBlockFallback)){
    rg.POST("/v1/group/create", transport.CreateGroup)
}
 
 
 
 
 
 
 
 
代码块流控
 
 
//添加流控规则
_, err = flow.LoadRules([]*flow.Rule{
        {
            Resource:               resName,
            TokenCalculateStrategy: flow.Direct,
            ControlBehavior:        flow.Reject,
            Threshold:              10,
            StatIntervalInMs:       1000,
            MaxQueueingTimeMs:      1000,
        },
    })
 
 
 
 
// 流控
e, b := sentinel.Entry("some-test", sentinel.WithTrafficType(base.Inbound))
        if b != nil {
            atomic.AddInt64(counter.block, 1)
        } else {
            // Be sure the entry is exited finally.
            e.Exit()
            atomic.AddInt64(counter.pass, 1)
        }

系统自适应流控

  • 在实际场景中我们追求的目标是 在系统不被拖垮的情况下,提高系统的吞吐率,而不是 load 一定要到低于某个阈值

代码引入

import "github.com/alibaba/sentinel-golang/core/system"
 
// 自适应流控,启发因子为 load1 >= 8
_, err := system.LoadRules([]*system.SystemRule{
    {
        MetricType:system.Load,  //衡量标准
        TriggerCount:8.0,        //该维度下最大负载值 
        Strategy:system.BBR,     //自适应策略 brr
    },
})

几种模式:

文件地址:core/system/slot.go
 
 
MetricType:衡量标准几种选项
    // Load represents system load1 in Linux/Unix.
    Load MetricType = iota
    // AvgRT represents the average response time of all inbound requests.
    AvgRT
    // Concurrency represents the concurrency of all inbound requests.
    Concurrency
    // InboundQPS represents the QPS of all inbound requests.
    InboundQPS
    // CpuUsage represents the CPU usage percentage of the system.
    CpuUsage
 
 
 
 
 
TriggerCount:上述衡量标准对应的最大限值
 
 
Strategy: BBR基于TCP BBR思想的控制策略   或者无策略
 
 
 
 
具体做限制片段的实现:
 
 
func (s *AdaptiveSlot) doCheckRule(rule *Rule) (bool, string, float64) {
    var msg string
 
 
    threshold := rule.TriggerCount
    switch rule.MetricType {
    case InboundQPS:
        qps := stat.InboundNode().GetQPS(base.MetricEventPass)
        res := qps < threshold
        if !res {
            msg = "system qps check blocked"
        }
        return res, msg, qps
    case Concurrency:
        n := float64(stat.InboundNode().CurrentConcurrency())
        res := n < threshold
        if !res {
            msg = "system concurrency check blocked"
        }
        return res, msg, n
    case AvgRT:
        rt := stat.InboundNode().AvgRT()
        res := rt < threshold
        if !res {
            msg = "system avg rt check blocked"
        }
        return res, msg, rt
    case Load:
        l := system_metric.CurrentLoad()
        if l > threshold {
            if rule.Strategy != BBR || !checkBbrSimple() {
                msg = "system load check blocked"
                return false, msg, l
            }
        }
        return true, "", l
    case CpuUsage:
        c := system_metric.CurrentCpuUsage()
        if c > threshold {
            if rule.Strategy != BBR || !checkBbrSimple() {
                msg = "system cpu usage check blocked"
                return false, msg, c
            }
        }
        return true, "", c
    default:
        msg = "system undefined metric type, pass by default"
        return true, msg, 0.0
    }
}

文章作者: biturd
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 biturd !
  目录