当前位置: 首页 > news >正文

client-go限速之QPS、Burst 和 RateLimiter

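QPS、Burst这两个参数是rest.Config里的配置项;operator中的rest.Config由ctrl.GetConfigOrDie()获得。
(注意:controller-runtime的GetConfig在QPS为0时会先将其设为QPS=20、Burst=30,因此通过ctrl.GetConfigOrDie()得到的配置通常不会落到下面的client-go默认值,请以所用版本的源码为准。)
client-go中QPS、Burst的默认值(配置值为0时生效)为: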
k8s.io/client-go@v0.25.0/rest/config.go
// Default client-side rate-limit settings. RESTClientForConfigAndClient
// substitutes them when Config.QPS / Config.Burst are left at zero.
const (
	DefaultQPS   float32 = 5.0
	DefaultBurst int     = 10
)

operator的main.go

// The manager is built from the rest.Config returned by ctrl.GetConfigOrDie();
// that config carries the QPS / Burst / RateLimiter settings discussed here.
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
	// ...
})

controller-runtime代码链条

// sigs.k8s.io/controller-runtime@v0.13.0/alias.go
NewManager = manager.New

// sigs.k8s.io/controller-runtime@v0.13.0/pkg/manager/manager.go
// New (abridged excerpt; elided parts are marked with "...") rejects a nil
// config, applies option defaults, builds the cluster, and then creates the
// event recorder provider from the same rest.Config and the cluster's HTTP client.
func New(config *rest.Config, options Options) (Manager, error) {
	if config == nil {
		return nil, errors.New("must specify Config")
	}
	// Set default values for options fields
	options = setOptionsDefaults(options)
	// ...
	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
		// ...
	})
	// The rest.Config (including QPS/Burst) is handed on to the recorder provider.
	recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
	// ...
}

// sigs.k8s.io/controller-runtime@v0.13.0/pkg/internal/recorder/recorder.go
// NewProvider create a new Provider instance.
func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {if httpClient == nil {panic("httpClient must not be nil")}corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient)if err != nil {return nil, fmt.Errorf("failed to init client: %w", err)}p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")}return p, nil
}

client-go代码链条

// k8s.io/client-go@v0.25.0/kubernetes/clientset.go
// NewForConfigAndClient creates a new CoreV1Client for the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*CoreV1Client, error) {config := *csetConfigDefaults(&config)client, err := rest.RESTClientForConfigAndClient(&config, h)if err != nil {return nil, err}return &CoreV1Client{client}, nil
}// k8s.io/client-go@v0.25.0/rest/config.go
// RESTClientForConfigAndClient returns a RESTClient that satisfies the requested attributes on a
// client Config object.
// Unlike RESTClientFor, RESTClientForConfigAndClient allows to pass an http.Client that is shared
// between all the API Groups and Versions.
// Note that the http client takes precedence over the transport values configured.
// The http client defaults to the `http.DefaultClient` if nil.
func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {if config.GroupVersion == nil {return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")}if config.NegotiatedSerializer == nil {return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")}baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)if err != nil {return nil, err}rateLimiter := config.RateLimiterif rateLimiter == nil {qps := config.QPSif config.QPS == 0.0 {qps = DefaultQPS}burst := config.Burstif config.Burst == 0 {burst = DefaultBurst}if qps > 0 {rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)}}var gv schema.GroupVersionif config.GroupVersion != nil {gv = *config.GroupVersion}clientContent := ClientContentConfig{AcceptContentTypes: config.AcceptContentTypes,ContentType:        config.ContentType,GroupVersion:       gv,Negotiator:         runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),}restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, httpClient)return restClient, err
}// k8s.io/client-go@v0.25.0/util/flowcontrol/throttle.go
// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
// smoothed qps rate of 'qps'.
// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
// The maximum number of tokens in the bucket is capped at 'burst'.
func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {limiter := rate.NewLimiter(rate.Limit(qps), burst)return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps)
}// k8s.io/client-go@v0.25.0/rest/client.go
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {base := *baseURLif !strings.HasSuffix(base.Path, "/") {base.Path += "/"}base.RawQuery = ""base.Fragment = ""return &RESTClient{base:             &base,versionedAPIPath: versionedAPIPath,content:          requestClientContentConfigProvider{base: scrubCBORContentConfigIfDisabled(config)},createBackoffMgr: readExpBackoffConfig,rateLimiter:      rateLimiter,Client:           client,}, nil
}// k8s.io/client-go@v0.25.0/rest/client.go
// GetRateLimiter returns rate limiter for a given client, or nil if it's called on a nil client
func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {if c == nil {return nil}return c.rateLimiter
}// k8s.io/client-go@v0.25.0/rest/client.go
// Interface captures the set of operations for generically interacting with Kubernetes REST apis.
type Interface interface {GetRateLimiter() flowcontrol.RateLimiterVerb(verb string) *RequestPost() *RequestPut() *RequestPatch(pt types.PatchType) *RequestGet() *RequestDelete() *RequestAPIVersion() schema.GroupVersion
}// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources.  The server should return a decodable API resource
// object, or an api.Status object which contains information about the reason for
// any failure.
//
// Most consumers should use client.New() to get a Kubernetes API client.
type RESTClient struct {
	// base is the root URL for all invocations of the client
	base *url.URL
	// versionedAPIPath is a path segment connecting the base URL to the resource root
	versionedAPIPath string

	// content describes how a RESTClient encodes and decodes responses.
	content ClientContentConfig

	// creates BackoffManager that is passed to requests.
	createBackoffMgr func() BackoffManager

	// rateLimiter is shared among all requests created by this client unless specifically
	// overridden.
	rateLimiter flowcontrol.RateLimiter

	// warningHandler is shared among all requests created by this client.
	// If not set, defaultWarningHandler is used.
	warningHandler WarningHandler

	// Set specific behavior of the client.  If not set http.DefaultClient will be used.
	Client *http.Client
}

// k8s.io/client-go@v0.25.0/rest/request.go
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {c *RESTClientwarningHandler WarningHandlerrateLimiter flowcontrol.RateLimiterbackoff     BackoffManagertimeout     time.DurationmaxRetries  int// generic components accessible via method settersverb       stringpathPrefix stringsubpath    stringparams     url.Valuesheaders    http.Header// structural elements of the request that are part of the Kubernetes API conventionsnamespace    stringnamespaceSet boolresource     stringresourceName stringsubresource  string// outputerr  errorbody io.ReaderretryFn requestRetryFunc
}
// tryThrottle calls tryThrottleWithInfo with ctx and an empty string and
// returns its error. Both Stream and request call it before sending anything.
func (r *Request) tryThrottle(ctx context.Context) error {
	return r.tryThrottleWithInfo(ctx, "")
}
// Stream formats and executes the request, and offers streaming of the response.
// Returns io.ReadCloser which could be used for streaming of the response, or an error
// Any non-2xx http status code causes an error.  If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error.  Otherwise, we create an error that lists the http status and the content of the response.
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {if r.err != nil {return nil, r.err}if err := r.tryThrottle(ctx); err != nil {return nil, err}client := r.c.Clientif client == nil {client = http.DefaultClient}retry := r.retryFn(r.maxRetries)url := r.URL().String()for {if err := retry.Before(ctx, r); err != nil {return nil, err}req, err := r.newHTTPRequest(ctx)if err != nil {return nil, err}if r.body != nil {req.Body = ioutil.NopCloser(r.body)}resp, err := client.Do(req)updateURLMetrics(ctx, r, resp, err)retry.After(ctx, r, resp, err)if err != nil {// we only retry on an HTTP response with 'Retry-After' headerreturn nil, err}switch {case (resp.StatusCode >= 200) && (resp.StatusCode < 300):handleWarnings(resp.Header, r.warningHandler)return resp.Body, nildefault:done, transformErr := func() (bool, error) {defer resp.Body.Close()if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {return false, nil}result := r.transformResponse(resp, req)if err := result.Error(); err != nil {return true, err}return true, fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))}()if done {return nil, transformErr}}}
}
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {// Metrics for total request latencystart := time.Now()defer func() {metrics.RequestLatency.Observe(ctx, r.verb, *r.URL(), time.Since(start))}()if r.err != nil {klog.V(4).Infof("Error in request: %v", r.err)return r.err}if err := r.requestPreflightCheck(); err != nil {return err}client := r.c.Clientif client == nil {client = http.DefaultClient}// Throttle the first try before setting up the timeout configured on the// client. We don't want a throttled client to return timeouts to callers// before it makes a single request.if err := r.tryThrottle(ctx); err != nil {return err}if r.timeout > 0 {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, r.timeout)defer cancel()}isErrRetryableFunc := func(req *http.Request, err error) bool {// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.// Thus in case of "GET" operations, we simply retry it.// We are not automatically retrying "write" operations, as they are not idempotent.if req.Method != "GET" {return false}// For connection errors and apiserver shutdown errors retry.if net.IsConnectionReset(err) || net.IsProbableEOF(err) {return true}return false}// Right now we make about ten retry attempts if we get a Retry-After response.retry := r.retryFn(r.maxRetries)for {if err := retry.Before(ctx, r); err != nil {return retry.WrapPreviousError(err)}req, err := r.newHTTPRequest(ctx)if err != nil {return err}resp, err := client.Do(req)updateURLMetrics(ctx, r, resp, err)// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.// https://pkg.go.dev/net/http#Requestif req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))}retry.After(ctx, r, resp, err)done := func() bool {defer readAndCloseResponseBody(resp)// if the server returns an error in err, the 
response will be nil.f := func(req *http.Request, resp *http.Response) {if resp == nil {return}fn(req, resp)}if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {return false}f(req, resp)return true}()if done {return retry.WrapPreviousError(err)}}
}

 

 

http://www.hskmm.com/?act=detail&tid=13355

相关文章:

  • 三度蝉联Gartner SASE领导者:唯一厂商的技术实力解析
  • 基础命令
  • 水水水 || CSP-S 2025 初赛
  • python实现网站登录会话脚本 - wanghongwei
  • HCM 性能优化函数
  • Nginx配置里alias和root的区别
  • 国产DevOps生态崛起:Gitee如何赋能企业数字化转型
  • 【OpenCV】10 图像滤波
  • 基于java+springboot的社区居民诊疗健康管理系统(源代码+文档+讲解视频) - 指南
  • 时序数据库IoTDB的六大实用场景盘点 - 指南
  • 50系GPU上安装MMCV
  • K8S的CoreDns配置文件添加域名解析
  • 20250308_信安一把梭_web
  • nodify_介绍及安装
  • MQTT协议(消息队列遥测传输)
  • 如何构建embeding 的就是pytorch 中
  • 萤石设备视频接入平台EasyCVR国标GB28181视频平台整合铁路抑尘喷洒智能视频监控方案
  • 【低代码平台之应用构建展示】数智化贸易订单管理平台
  • C# 第 17天 028 029接口,依赖反转,单元测试
  • 2025年项目管理软件革命:AI与空间计算如何重塑企业协作范式
  • Threading 串行VS并发
  • parallel index
  • C语言 第三讲:分支和循环(上) - 教程
  • 中间件专题:Redis
  • 微信个人号开发API/文档/教程
  • 微指令控制器基本原理
  • 一个拒绝过度设计的 .NET 快速开发框架:开箱即用,专注干活
  • 个人微信号二次开发API调用、微信API接口
  • 2025.9.21+7 [未完]
  • VisualStudio-Python-工具指南-全-