QPS 和 Burst 是 rest.Config 中的两个客户端限流配置项;operator 使用的 rest.Config 由 ctrl.GetConfigOrDie() 获得。
QPS、Burst的默认值为:
k8s.io/client-go@v0.25.0/rest/config.go
// DefaultQPS and DefaultBurst are the fallback values used by
// RESTClientForConfigAndClient when rest.Config leaves QPS / Burst at zero.
const (
	DefaultQPS   float32 = 5.0
	DefaultBurst int     = 10
)
operator的main.go
// ctrl.GetConfigOrDie() supplies the *rest.Config (including QPS and Burst)
// that the manager is built from.
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ /* ... */ })
controller-runtime代码链条
// sigs.k8s.io/controller-runtime@v0.13.0/alias.go NewManager = manager.New// sigs.k8s.io/controller-runtime@v0.13.0/pkg/cluster/cluster.go func New(config *rest.Config, options Options) (Manager, error) {if config == nil {return nil, errors.New("must specify Config")}// Set default values for options fieldsoptions = setOptionsDefaults(options)// ... cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {//... }recorderProvider, err := options.newRecorderProvider(config, cluster.GetHTTPClient(), cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)// ... }// sigs.k8s.io/controller-runtime@v0.13.0/pkg/internal/recorder/recorder.go // NewProvider create a new Provider instance. func NewProvider(config *rest.Config, httpClient *http.Client, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster EventBroadcasterProducer) (*Provider, error) {if httpClient == nil {panic("httpClient must not be nil")}corev1Client, err := corev1client.NewForConfigAndClient(config, httpClient)if err != nil {return nil, fmt.Errorf("failed to init client: %w", err)}p := &Provider{scheme: scheme, logger: logger, makeBroadcaster: makeBroadcaster, evtClient: corev1Client.Events("")}return p, nil }
client-go代码链条
// k8s.io/client-go@v0.25.0/kubernetes/clientset.go // NewForConfigAndClient creates a new CoreV1Client for the given config and http client. // Note the http client provided takes precedence over the configured transport values. func NewForConfigAndClient(c *rest.Config, h *http.Client) (*CoreV1Client, error) {config := *csetConfigDefaults(&config)client, err := rest.RESTClientForConfigAndClient(&config, h)if err != nil {return nil, err}return &CoreV1Client{client}, nil }// k8s.io/client-go@v0.25.0/rest/config.go // RESTClientForConfigAndClient returns a RESTClient that satisfies the requested attributes on a // client Config object. // Unlike RESTClientFor, RESTClientForConfigAndClient allows to pass an http.Client that is shared // between all the API Groups and Versions. // Note that the http client takes precedence over the transport values configured. // The http client defaults to the `http.DefaultClient` if nil. func RESTClientForConfigAndClient(config *Config, httpClient *http.Client) (*RESTClient, error) {if config.GroupVersion == nil {return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")}if config.NegotiatedSerializer == nil {return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")}baseURL, versionedAPIPath, err := DefaultServerUrlFor(config)if err != nil {return nil, err}rateLimiter := config.RateLimiterif rateLimiter == nil {qps := config.QPSif config.QPS == 0.0 {qps = DefaultQPS}burst := config.Burstif config.Burst == 0 {burst = DefaultBurst}if qps > 0 {rateLimiter = flowcontrol.NewTokenBucketRateLimiter(qps, burst)}}var gv schema.GroupVersionif config.GroupVersion != nil {gv = *config.GroupVersion}clientContent := ClientContentConfig{AcceptContentTypes: config.AcceptContentTypes,ContentType: config.ContentType,GroupVersion: gv,Negotiator: runtime.NewClientNegotiator(config.NegotiatedSerializer, gv),}restClient, err := NewRESTClient(baseURL, versionedAPIPath, clientContent, rateLimiter, 
httpClient)return restClient, err }// k8s.io/client-go@v0.25.0/util/flowcontrol/throttle.go // NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach. // The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a // smoothed qps rate of 'qps'. // The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'. // The maximum number of tokens in the bucket is capped at 'burst'. func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {limiter := rate.NewLimiter(rate.Limit(qps), burst)return newTokenBucketRateLimiterWithClock(limiter, clock.RealClock{}, qps) }// k8s.io/client-go@v0.25.0/rest/client.go // NewRESTClient creates a new RESTClient. This client performs generic REST functions // such as Get, Put, Post, and Delete on specified paths. func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {base := *baseURLif !strings.HasSuffix(base.Path, "/") {base.Path += "/"}base.RawQuery = ""base.Fragment = ""return &RESTClient{base: &base,versionedAPIPath: versionedAPIPath,content: requestClientContentConfigProvider{base: scrubCBORContentConfigIfDisabled(config)},createBackoffMgr: readExpBackoffConfig,rateLimiter: rateLimiter,Client: client,}, nil }// k8s.io/client-go@v0.25.0/rest/client.go // GetRateLimiter returns rate limiter for a given client, or nil if it's called on a nil client func (c *RESTClient) GetRateLimiter() flowcontrol.RateLimiter {if c == nil {return nil}return c.rateLimiter }// k8s.io/client-go@v0.25.0/rest/client.go // Interface captures the set of operations for generically interacting with Kubernetes REST apis. 
type Interface interface {
	// GetRateLimiter exposes the client's rate limiter; *RESTClient implements
	// it by returning its rateLimiter field (nil for a nil client).
	GetRateLimiter() flowcontrol.RateLimiter
	Verb(verb string) *Request
	Post() *Request
	Put() *Request
	Patch(pt types.PatchType) *Request
	Get() *Request
	Delete() *Request
	APIVersion() schema.GroupVersion
}

// RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources. The server should return a decodable API resource
// object, or an api.Status object which contains information about the reason for
// any failure.
//
// Most consumers should use client.New() to get a Kubernetes API client.
type RESTClient struct {
	// base is the root URL for all invocations of the client
	base *url.URL
	// versionedAPIPath is a path segment connecting the base URL to the resource root
	versionedAPIPath string
	// content describes how a RESTClient encodes and decodes responses.
	content ClientContentConfig
	// creates BackoffManager that is passed to requests.
	createBackoffMgr func() BackoffManager
	// rateLimiter is shared among all requests created by this client unless specifically
	// overridden. It is populated from the rateLimiter argument of NewRESTClient.
	rateLimiter flowcontrol.RateLimiter
	// warningHandler is shared among all requests created by this client.
	// If not set, defaultWarningHandler is used.
	warningHandler WarningHandler
	// Set specific behavior of the client. If not set http.DefaultClient will be used.
	Client *http.Client
}

// k8s.io/client-go@v0.25.0/rest/request.go
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
	// c is the RESTClient this request belongs to; Stream and request read
	// c.Client from it to obtain the http.Client.
	c *RESTClient
	warningHandler WarningHandler
	// NOTE(review): presumably initialised from RESTClient.rateLimiter when the
	// request is created; the constructor is not part of this excerpt.
	rateLimiter flowcontrol.RateLimiter
	backoff     BackoffManager
	timeout     time.Duration
	maxRetries  int
	// generic components accessible via method setters
	verb       string
	pathPrefix string
	subpath    string
	params     url.Values
	headers    http.Header
	// structural elements of the request that are part of the Kubernetes API conventions
	namespace    string
	namespaceSet bool
	resource     string
	resourceName string
	subresource  string
	// output
	err  error
	body io.Reader
	// retryFn is called with maxRetries to build the per-call retry helper
	// used by Stream and request.
	retryFn requestRetryFunc
}

// tryThrottle delegates to tryThrottleWithInfo with an empty info string.
// Stream and request each call it once, before their retry loop.
func (r *Request) tryThrottle(ctx context.Context) error {
	return r.tryThrottleWithInfo(ctx, "")
}

// Stream formats and executes the request, and offers streaming of the response.
// Returns io.ReadCloser which could be used for streaming of the response, or an error
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
func (r *Request) Stream(ctx context.Context) (io.ReadCloser, error) {
	// Surface any error stored while the request was being built.
	if r.err != nil {
		return nil, r.err
	}
	// Throttling happens once, before the retry loop; iterations of the loop
	// below do not call tryThrottle again.
	if err := r.tryThrottle(ctx); err != nil {
		return nil, err
	}
	client := r.c.Client
	if client == nil {
		client = http.DefaultClient
	}
	retry := r.retryFn(r.maxRetries)
	// Computed once up front; only used in the fallback error message below.
	url := r.URL().String()
	for {
		if err := retry.Before(ctx, r); err != nil {
			return nil, err
		}
		req, err := r.newHTTPRequest(ctx)
		if err != nil {
			return nil, err
		}
		if r.body != nil {
			req.Body = ioutil.NopCloser(r.body)
		}
		resp, err := client.Do(req)
		updateURLMetrics(ctx, r, resp, err)
		retry.After(ctx, r, resp, err)
		if err != nil {
			// we only retry on an HTTP response with 'Retry-After' header
			return nil, err
		}
		switch {
		case (resp.StatusCode >= 200) && (resp.StatusCode < 300):
			// Success: the caller owns resp.Body and is responsible for closing it.
			handleWarnings(resp.Header, r.warningHandler)
			return resp.Body, nil
		default:
			// Non-2xx: the closure closes the body on every path, then either
			// signals another loop iteration (done == false) or yields the
			// final error.
			done, transformErr := func() (bool, error) {
				defer resp.Body.Close()
				if retry.IsNextRetry(ctx, r, req, resp, err, neverRetryError) {
					return false, nil
				}
				result := r.transformResponse(resp, req)
				if err := result.Error(); err != nil {
					return true, err
				}
				return true, fmt.Errorf("%d while accessing %v: %s", result.statusCode, url, string(result.body))
			}()
			if done {
				return nil, transformErr
			}
		}
	}
}

// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
	// Metrics for total request latency
	// start is taken before throttling, so the observed latency includes any
	// time spent in tryThrottle as well as all retries.
	start := time.Now()
	defer func() {
		metrics.RequestLatency.Observe(ctx, r.verb, *r.URL(), time.Since(start))
	}()
	if r.err != nil {
		klog.V(4).Infof("Error in request: %v", r.err)
		return r.err
	}
	if err := r.requestPreflightCheck(); err != nil {
		return err
	}
	client := r.c.Client
	if client == nil {
		client = http.DefaultClient
	}
	// Throttle the first try before setting up the timeout configured on the
	// client. We don't want a throttled client to return timeouts to callers
	// before it makes a single request.
	if err := r.tryThrottle(ctx); err != nil {
		return err
	}
	// From here on ctx carries the per-request timeout, if one is configured.
	if r.timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, r.timeout)
		defer cancel()
	}
	isErrRetryableFunc := func(req *http.Request, err error) bool {
		// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
		// Thus in case of "GET" operations, we simply retry it.
		// We are not automatically retrying "write" operations, as they are not idempotent.
		if req.Method != "GET" {
			return false
		}
		// For connection errors and apiserver shutdown errors retry.
		if net.IsConnectionReset(err) || net.IsProbableEOF(err) {
			return true
		}
		return false
	}
	// Right now we make about ten retry attempts if we get a Retry-After response.
	retry := r.retryFn(r.maxRetries)
	for {
		if err := retry.Before(ctx, r); err != nil {
			return retry.WrapPreviousError(err)
		}
		req, err := r.newHTTPRequest(ctx)
		if err != nil {
			return err
		}
		resp, err := client.Do(req)
		updateURLMetrics(ctx, r, resp, err)
		// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
		// https://pkg.go.dev/net/http#Request
		if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
			metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
		}
		retry.After(ctx, r, resp, err)
		// The closure scopes the deferred body cleanup to a single attempt: it
		// runs after fn on the final attempt and also when another retry follows.
		done := func() bool {
			defer readAndCloseResponseBody(resp)
			// if the server returns an error in err, the response will be nil.
			f := func(req *http.Request, resp *http.Response) {
				if resp == nil {
					return
				}
				fn(req, resp)
			}
			if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
				return false
			}
			f(req, resp)
			return true
		}()
		if done {
			return retry.WrapPreviousError(err)
		}
	}
}