From 388cbc5187bd00d09c8c91bc7afdf5b8483d3aea Mon Sep 17 00:00:00 2001 From: pasteley Date: Tue, 16 Dec 2025 05:06:16 +0100 Subject: [PATCH] plugin/kubernetes: rate limits to api server (#7771) Signed-off-by: pasteley --- plugin/kubernetes/README.md | 9 +++++ plugin/kubernetes/kubernetes.go | 47 +++++++++++++++++++++++++ plugin/kubernetes/setup.go | 39 +++++++++++++++++++++ plugin/kubernetes/setup_test.go | 62 +++++++++++++++++++++++++++++++++ 4 files changed, 157 insertions(+) diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md index 4273a9ea0..92c161000 100644 --- a/plugin/kubernetes/README.md +++ b/plugin/kubernetes/README.md @@ -34,6 +34,9 @@ kubernetes [ZONES...] { endpoint URL tls CERT KEY CACERT kubeconfig KUBECONFIG [CONTEXT] + apiserver_qps QPS + apiserver_burst BURST + apiserver_max_inflight MAX namespaces NAMESPACE... labels EXPRESSION pods POD-MODE @@ -55,6 +58,12 @@ kubernetes [ZONES...] { **[CONTEXT]** is optional, if not set, then the current context specified in kubeconfig will be used. It supports TLS, username and password, or token-based authentication. This option is ignored if connecting in-cluster (i.e., the endpoint is not specified). +* `apiserver_qps` **QPS** sets the maximum queries per second (QPS) rate limit for requests. + This allows you to control the rate at which the plugin sends requests to the API server to prevent overwhelming it. +* `apiserver_burst` **BURST** sets the maximum burst size for requests. + This allows temporary spikes in request rate up to this value, even if it exceeds the QPS limit. +* `apiserver_max_inflight` **MAX** sets the maximum number of concurrent in-flight requests. + This caps the total number of simultaneous requests the plugin can make to the API server. * `namespaces` **NAMESPACE [NAMESPACE...]** only exposes the k8s namespaces listed. If this option is omitted all namespaces are exposed * `namespace_labels` **EXPRESSION** only expose the records for Kubernetes namespaces that match this label selector. diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index 5bf064c3c..8bd72db69 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net" + "net/http" "runtime" "strings" "time" @@ -50,6 +51,9 @@ type Kubernetes struct { localIPs []net.IP autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath. startupTimeout time.Duration // startupTimeout set timeout of startup + apiQPS float32 // Maximum queries per second from the client to the API server + apiBurst int // Maximum burst for throttle + apiMaxInflight int // Maximum number of concurrent requests in flight to the API server } // Upstreamer is used to resolve CNAME or other external targets @@ -265,6 +269,24 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o k.opts.namespaceSelector = selector } + if k.apiQPS > 0 { + config.QPS = k.apiQPS + } + + if k.apiBurst > 0 { + config.Burst = k.apiBurst + } + + if k.apiMaxInflight > 0 { + existingWrap := config.WrapTransport + config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { + if existingWrap != nil { + rt = existingWrap(rt) + } + return newMaxInflightRoundTripper(rt, k.apiMaxInflight) + } + } + k.opts.initPodCache = k.podMode == podModeVerified k.opts.zones = k.Zones @@ -671,3 +693,28 @@ func matchPortAndProtocol(aPort, bPort, aProtocol, bProtocol string) bool { } const coredns = "c" // used as a fake key prefix in msg.Service + +// roundTripperFunc is an adapter to allow use of ordinary functions as http.RoundTrippers +type roundTripperFunc func(*http.Request) (*http.Response, error) + +func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { + return f(r) +} + +// newMaxInflightRoundTripper returns RoundTripper that limits the number of concurrent requests +func newMaxInflightRoundTripper(next http.RoundTripper, max int) http.RoundTripper { + if max <= 0 { + return next + } + sem := make(chan struct{}, max) + + return roundTripperFunc(func(r *http.Request) (*http.Response, error) { + select { + case sem <- struct{}{}: + defer func() { <-sem }() + return next.RoundTrip(r) + case <-r.Context().Done(): + return nil, r.Context().Err() + } + }) +} diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go index cc73e0fe5..a58ac8a27 100644 --- a/plugin/kubernetes/setup.go +++ b/plugin/kubernetes/setup.go @@ -244,6 +244,45 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) { return nil, fmt.Errorf("failed to parse startup_timeout: %v, %s", args[0], err) } } + case "apiserver_qps": + args := c.RemainingArgs() + if len(args) != 1 { + return nil, c.ArgErr() + } + qps, err := strconv.ParseFloat(args[0], 32) + if err != nil { + return nil, c.Errf("invalid apiserver_qps %q: %v", args[0], err) + } + if qps < 0 { + return nil, c.Errf("apiserver_qps must be >= 0") + } + k8s.apiQPS = float32(qps) + case "apiserver_burst": + args := c.RemainingArgs() + if len(args) != 1 { + return nil, c.ArgErr() + } + burst, err := strconv.Atoi(args[0]) + if err != nil { + return nil, c.Errf("invalid apiserver_burst %q: %v", args[0], err) + } + if burst < 0 { + return nil, c.Errf("apiserver_burst must be >= 0") + } + k8s.apiBurst = burst + case "apiserver_max_inflight": + args := c.RemainingArgs() + if len(args) != 1 { + return nil, c.ArgErr() + } + max, err := strconv.Atoi(args[0]) + if err != nil { + return nil, c.Errf("invalid apiserver_max_inflight %q: %v", args[0], err) + } + if max < 0 { + return nil, c.Errf("apiserver_max_inflight must be >= 0") + } + k8s.apiMaxInflight = max default: return nil, c.Errf("unknown property '%s'", c.Val()) } diff --git a/plugin/kubernetes/setup_test.go b/plugin/kubernetes/setup_test.go index 097569c1c..39d942662 100644 --- a/plugin/kubernetes/setup_test.go +++ b/plugin/kubernetes/setup_test.go @@ -731,3 +731,65 @@ func TestKubernetesParseMulticluster(t *testing.T) { } } } + +func TestKubernetesParseAPIRateLimiting(t *testing.T) { + tests := []struct { + input string + shouldErr bool + expectedErrContent string + expectedQPS float32 + expectedBurst int + expectedMaxInf int + }{ + { + `kubernetes coredns.local { + apiserver_qps 50.0 + apiserver_burst 100 + apiserver_max_inflight 25 +}`, + false, "", 50.0, 100, 25, + }, + { + `kubernetes coredns.local { + apiserver_qps -10 +}`, true, "apiserver_qps must be >= 0", 0, 0, 0}, + { + `kubernetes coredns.local { + apiserver_burst -5 +}`, true, "apiserver_burst must be >= 0", 0, 0, 0}, + { + `kubernetes coredns.local { + apiserver_max_inflight -1 +}`, true, "apiserver_max_inflight must be >= 0", 0, 0, 0}, + } + + for i, test := range tests { + c := caddy.NewTestController("dns", test.input) + k8s, err := kubernetesParse(c) + + if test.shouldErr && err == nil { + t.Errorf("Test %d: Expected error but got none for input '%s'", i, test.input) + continue + } + if !test.shouldErr && err != nil { + t.Errorf("Test %d: Expected no error but got: %v", i, err) + continue + } + if err != nil { + if !strings.Contains(err.Error(), test.expectedErrContent) { + t.Errorf("Test %d: Expected error to contain '%s', got: %v", i, test.expectedErrContent, err) + } + continue + } + + if k8s.apiQPS != test.expectedQPS { + t.Errorf("Test %d: Expected apiQPS=%v, got %v", i, test.expectedQPS, k8s.apiQPS) + } + if k8s.apiBurst != test.expectedBurst { + t.Errorf("Test %d: Expected apiBurst=%v, got %v", i, test.expectedBurst, k8s.apiBurst) + } + if k8s.apiMaxInflight != test.expectedMaxInf { + t.Errorf("Test %d: Expected apiMaxInflight=%v, got %v", i, test.expectedMaxInf, k8s.apiMaxInflight) + } + } +}