From 20626a74641de7bcd07d8ec69d7a60ed8b273682 Mon Sep 17 00:00:00 2001 From: rpb-ant Date: Mon, 30 Mar 2026 16:18:24 -0400 Subject: [PATCH] Add an atomic.Bool to singleflight prefetching (#7963) Also updated plugin to document single-flighting Signed-off-by: Ryan Brewster --- plugin/cache/README.md | 5 +++ plugin/cache/handler.go | 20 +++++++-- plugin/cache/item.go | 9 ++++ plugin/cache/prefetch_test.go | 83 +++++++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 4 deletions(-) diff --git a/plugin/cache/README.md b/plugin/cache/README.md index 209219188..90dfaa6e3 100644 --- a/plugin/cache/README.md +++ b/plugin/cache/README.md @@ -60,6 +60,9 @@ cache [TTL] [ZONES...] { **DURATION** defaults to 1m. Prefetching will happen when the TTL drops below **PERCENTAGE**, which defaults to `10%`, or latest 1 second before TTL expiration. Values should be in the range `[10%, 90%]`. Note the percent sign is mandatory. **PERCENTAGE** is treated as an `int`. + Concurrent requests that trigger a prefetch for the same cache entry dispatch at most one + background fetch, so prefetch load scales with the number of distinct eligible entries rather + than request rate. * `serve_stale`, when serve\_stale is set, cache will always serve an expired entry to a client if there is one available as long as it has not been expired for longer than **DURATION** (default 1 hour). By default, the _cache_ plugin will attempt to refresh the cache entry after sending the expired cache entry to the client. The @@ -69,6 +72,8 @@ cache [TTL] [ZONES...] { checking to see if the entry is available from the source. **REFRESH_MODE** defaults to `immediate`. Setting this value to `verify` can lead to increased latency when serving stale responses, but will prevent stale entries from ever being served if an updated response can be retrieved from the source. + In `immediate` mode, concurrent requests for the same expired entry dispatch at most one + background refresh. * `servfail` cache SERVFAIL responses for **DURATION**. Setting **DURATION** to 0 will disable caching of SERVFAIL responses. If this option is not set, SERVFAIL responses will be cached for 5 seconds. **DURATION** may not be greater than 5 minutes. diff --git a/plugin/cache/handler.go b/plugin/cache/handler.go index 4e8b1450e..96c65384f 100644 --- a/plugin/cache/handler.go +++ b/plugin/cache/handler.go @@ -55,13 +55,11 @@ func (c *Cache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) // Adjust the time to get a 0 TTL in the reply built from a stale item. now = now.Add(time.Duration(ttl) * time.Second) if !c.verifyStale { - cw := newPrefetchResponseWriter(server, rc, do, cd, c) - go c.doPrefetch(ctx, cw, i, now) + c.tryPrefetch(ctx, i, server, rc, do, cd, now) } servedStale.WithLabelValues(server, c.zonesMetricLabel, c.viewMetricLabel).Inc() } else if c.shouldPrefetch(i, now) { - cw := newPrefetchResponseWriter(server, rc, do, cd, c) - go c.doPrefetch(ctx, cw, i, now) + c.tryPrefetch(ctx, i, server, rc, do, cd, now) } if i.wildcard != "" { @@ -91,6 +89,20 @@ func wildcardFunc(ctx context.Context) func() string { } } +// tryPrefetch dispatches a background prefetch for i if one is not already in +// flight. The CAS on i.refreshing ensures at most one prefetch goroutine per +// item, so prefetch load scales with distinct stale keys rather than QPS. +func (c *Cache) tryPrefetch(ctx context.Context, i *item, server string, req *dns.Msg, do, cd bool, now time.Time) { + if !i.refreshing.CompareAndSwap(false, true) { + return + } + cw := newPrefetchResponseWriter(server, req, do, cd, c) + go func() { + defer i.refreshing.Store(false) + c.doPrefetch(ctx, cw, i, now) + }() +} + func (c *Cache) doPrefetch(ctx context.Context, cw *ResponseWriter, i *item, now time.Time) { // Use a fresh metadata map to avoid concurrent writes to the original request's metadata. ctx = metadata.ContextWithMetadata(ctx) diff --git a/plugin/cache/item.go b/plugin/cache/item.go index c41574c25..f1aaeb910 100644 --- a/plugin/cache/item.go +++ b/plugin/cache/item.go @@ -2,6 +2,7 @@ package cache import ( "strings" + "sync/atomic" "time" "github.com/coredns/coredns/plugin/cache/freq" @@ -25,6 +26,14 @@ type item struct { stored time.Time *freq.Freq + + // refreshing is set via CAS when a prefetch goroutine is dispatched for + // this item and cleared when it returns, bounding in-flight prefetches + // per item to one. A successful prefetch replaces this item in the cache + // with a new one (zero-valued refreshing); the deferred clear matters + // only when the prefetch fails and this item remains cached, so the next + // hit can retry. + refreshing atomic.Bool } func newItem(m *dns.Msg, now time.Time, d time.Duration) *item { diff --git a/plugin/cache/prefetch_test.go b/plugin/cache/prefetch_test.go index afecdc868..4819cd684 100644 --- a/plugin/cache/prefetch_test.go +++ b/plugin/cache/prefetch_test.go @@ -3,6 +3,8 @@ package cache import ( "context" "fmt" + "sync" + "sync/atomic" "testing" "time" @@ -209,6 +211,87 @@ type verification struct { fetch bool } +// TestPrefetchDedup verifies that concurrent hits on a single cache item +// dispatch at most one prefetch goroutine, on both the serve_stale and +// shouldPrefetch paths. See https://github.com/coredns/coredns/issues/7904. +func TestPrefetchDedup(t *testing.T) { + for _, tc := range []struct { + name string + staleUpTo time.Duration + prefetch int + percentage int + hitAt time.Duration // all concurrent hits land here + }{ + {name: "serve_stale", staleUpTo: time.Hour, hitAt: 110 * time.Second}, + {name: "prefetch", prefetch: 1, percentage: 50, hitAt: 70 * time.Second}, + } { + t.Run(tc.name, func(t *testing.T) { + const N = 200 + var upstream atomic.Int32 + release := make(chan struct{}) + done := make(chan struct{}, N) + + c := New() + c.staleUpTo = tc.staleUpTo + c.prefetch = tc.prefetch + c.percentage = tc.percentage + c.duration = time.Minute + c.Next = plugin.HandlerFunc(func(_ context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { + n := upstream.Add(1) + if n > 1 { + // Block the prefetch so all concurrent hits race against + // the in-flight flag, not a completed refresh. + <-release + defer func() { done <- struct{}{} }() + } + m := new(dns.Msg) + m.SetReply(r) + m.Response = true + m.Answer = []dns.RR{test.A("dedup.example.org. 100 IN A 127.0.0.1")} + w.WriteMsg(m) + return dns.RcodeSuccess, nil + }) + + t0 := time.Now().UTC() + c.now = func() time.Time { return t0 } + req := new(dns.Msg) + req.SetQuestion("dedup.example.org.", dns.TypeA) + c.ServeDNS(context.TODO(), &test.ResponseWriter{}, req) + if upstream.Load() != 1 { + t.Fatalf("initial populate: want 1 upstream call, got %d", upstream.Load()) + } + + // Fire N concurrent hits while the item is prefetch-eligible. + // Without dedup each would spawn its own prefetch goroutine; + // with dedup only the CAS winner spawns one, and the remaining + // N-1 hits serve from cache without touching upstream. + c.now = func() time.Time { return t0.Add(tc.hitAt) } + var wg sync.WaitGroup + wg.Add(N) + for range N { + go func() { + defer wg.Done() + req := new(dns.Msg) + req.SetQuestion("dedup.example.org.", dns.TypeA) + c.ServeDNS(context.TODO(), &test.ResponseWriter{}, req) + }() + } + wg.Wait() + + close(release) + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("prefetch goroutine never completed") + } + + if got := upstream.Load(); got != 2 { + t.Fatalf("want exactly 2 upstream calls (populate + 1 deduped prefetch), got %d", got) + } + }) + } +} + // prefetchHandler is a fake plugin implementation which returns a single A // record with the given qname and ttl. The returned IP address starts at // 127.0.0.1 and is incremented on every request.