plugin/kubernetes: rate limits to api server (#7771)

Signed-off-by: pasteley <ceasebeing@gmail.com>
This commit is contained in:
pasteley
2025-12-16 05:06:16 +01:00
committed by GitHub
parent 0b420cd49f
commit 388cbc5187
4 changed files with 157 additions and 0 deletions

View File

@@ -34,6 +34,9 @@ kubernetes [ZONES...] {
endpoint URL
tls CERT KEY CACERT
kubeconfig KUBECONFIG [CONTEXT]
apiserver_qps QPS
apiserver_burst BURST
apiserver_max_inflight MAX
namespaces NAMESPACE...
labels EXPRESSION
pods POD-MODE
@@ -55,6 +58,12 @@ kubernetes [ZONES...] {
**[CONTEXT]** is optional, if not set, then the current context specified in kubeconfig will be used.
It supports TLS, username and password, or token-based authentication.
This option is ignored if connecting in-cluster (i.e., the endpoint is not specified).
* `apiserver_qps` **QPS** sets the maximum queries per second (QPS) rate limit for requests.
This allows you to control the rate at which the plugin sends requests to the API server to prevent overwhelming it.
* `apiserver_burst` **BURST** sets the maximum burst size for requests.
This allows temporary spikes in request rate up to this value, even if it exceeds the QPS limit.
* `apiserver_max_inflight` **MAX** sets the maximum number of concurrent in-flight requests.
This caps the total number of simultaneous requests the plugin can make to the API server.
* `namespaces` **NAMESPACE [NAMESPACE...]** only exposes the k8s namespaces listed.
If this option is omitted all namespaces are exposed
* `namespace_labels` **EXPRESSION** only expose the records for Kubernetes namespaces that match this label selector.

View File

@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net"
"net/http"
"runtime"
"strings"
"time"
@@ -50,6 +51,9 @@ type Kubernetes struct {
localIPs []net.IP
autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath.
startupTimeout time.Duration // startupTimeout set timeout of startup
apiQPS float32 // Maximum queries per second from the client to the API server
apiBurst int // Maximum burst for throttle
apiMaxInflight int // Maximum number of concurrent requests in flight to the API server
}
// Upstreamer is used to resolve CNAME or other external targets
@@ -265,6 +269,24 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
k.opts.namespaceSelector = selector
}
if k.apiQPS > 0 {
config.QPS = k.apiQPS
}
if k.apiBurst > 0 {
config.Burst = k.apiBurst
}
if k.apiMaxInflight > 0 {
existingWrap := config.WrapTransport
config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper {
if existingWrap != nil {
rt = existingWrap(rt)
}
return newMaxInflightRoundTripper(rt, k.apiMaxInflight)
}
}
k.opts.initPodCache = k.podMode == podModeVerified
k.opts.zones = k.Zones
@@ -671,3 +693,28 @@ func matchPortAndProtocol(aPort, bPort, aProtocol, bProtocol string) bool {
}
const coredns = "c" // used as a fake key prefix in msg.Service
// roundTripperFunc is an adapter to allow use of ordinary functions as http.RoundTrippers
type roundTripperFunc func(*http.Request) (*http.Response, error)
func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}
// newMaxInflightRoundTripper returns RoundTripper that limits the number of concurrent requests
func newMaxInflightRoundTripper(next http.RoundTripper, max int) http.RoundTripper {
if max <= 0 {
return next
}
sem := make(chan struct{}, max)
return roundTripperFunc(func(r *http.Request) (*http.Response, error) {
select {
case sem <- struct{}{}:
defer func() { <-sem }()
return next.RoundTrip(r)
case <-r.Context().Done():
return nil, r.Context().Err()
}
})
}

View File

@@ -244,6 +244,45 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) {
return nil, fmt.Errorf("failed to parse startup_timeout: %v, %s", args[0], err)
}
}
case "apiserver_qps":
args := c.RemainingArgs()
if len(args) != 1 {
return nil, c.ArgErr()
}
qps, err := strconv.ParseFloat(args[0], 32)
if err != nil {
return nil, c.Errf("invalid apiserver_qps %q: %v", args[0], err)
}
if qps < 0 {
return nil, c.Errf("apiserver_qps must be >= 0")
}
k8s.apiQPS = float32(qps)
case "apiserver_burst":
args := c.RemainingArgs()
if len(args) != 1 {
return nil, c.ArgErr()
}
burst, err := strconv.Atoi(args[0])
if err != nil {
return nil, c.Errf("invalid apiserver_burst %q: %v", args[0], err)
}
if burst < 0 {
return nil, c.Errf("apiserver_burst must be >= 0")
}
k8s.apiBurst = burst
case "apiserver_max_inflight":
args := c.RemainingArgs()
if len(args) != 1 {
return nil, c.ArgErr()
}
max, err := strconv.Atoi(args[0])
if err != nil {
return nil, c.Errf("invalid apiserver_max_inflight %q: %v", args[0], err)
}
if max < 0 {
return nil, c.Errf("apiserver_max_inflight must be >= 0")
}
k8s.apiMaxInflight = max
default:
return nil, c.Errf("unknown property '%s'", c.Val())
}

View File

@@ -731,3 +731,65 @@ func TestKubernetesParseMulticluster(t *testing.T) {
}
}
}
func TestKubernetesParseAPIRateLimiting(t *testing.T) {
tests := []struct {
input string
shouldErr bool
expectedErrContent string
expectedQPS float32
expectedBurst int
expectedMaxInf int
}{
{
`kubernetes coredns.local {
apiserver_qps 50.0
apiserver_burst 100
apiserver_max_inflight 25
}`,
false, "", 50.0, 100, 25,
},
{
`kubernetes coredns.local {
apiserver_qps -10
}`, true, "apiserver_qps must be >= 0", 0, 0, 0},
{
`kubernetes coredns.local {
apiserver_burst -5
}`, true, "apiserver_burst must be >= 0", 0, 0, 0},
{
`kubernetes coredns.local {
apiserver_max_inflight -1
}`, true, "apiserver_max_inflight must be >= 0", 0, 0, 0},
}
for i, test := range tests {
c := caddy.NewTestController("dns", test.input)
k8s, err := kubernetesParse(c)
if test.shouldErr && err == nil {
t.Errorf("Test %d: Expected error but got none for input '%s'", i, test.input)
continue
}
if !test.shouldErr && err != nil {
t.Errorf("Test %d: Expected no error but got: %v", i, err)
continue
}
if err != nil {
if !strings.Contains(err.Error(), test.expectedErrContent) {
t.Errorf("Test %d: Expected error to contain '%s', got: %v", i, test.expectedErrContent, err)
}
continue
}
if k8s.apiQPS != test.expectedQPS {
t.Errorf("Test %d: Expected apiQPS=%v, got %v", i, test.expectedQPS, k8s.apiQPS)
}
if k8s.apiBurst != test.expectedBurst {
t.Errorf("Test %d: Expected apiBurst=%v, got %v", i, test.expectedBurst, k8s.apiBurst)
}
if k8s.apiMaxInflight != test.expectedMaxInf {
t.Errorf("Test %d: Expected apiMaxInflight=%v, got %v", i, test.expectedMaxInf, k8s.apiMaxInflight)
}
}
}