Add an atomic.Bool to singleflight prefetching (#7963)

Also updated plugin to document single-flighting

Signed-off-by: Ryan Brewster <rpb@anthropic.com>
This commit is contained in:
rpb-ant
2026-03-30 16:18:24 -04:00
committed by GitHub
parent 0ba8e3c850
commit 20626a7464
4 changed files with 113 additions and 4 deletions

View File

@@ -60,6 +60,9 @@ cache [TTL] [ZONES...] {
**DURATION** defaults to 1m. Prefetching will happen when the TTL drops below **PERCENTAGE**,
which defaults to `10%`, or at latest 1 second before TTL expiration. Values should be in the range `[10%, 90%]`.
Note the percent sign is mandatory. **PERCENTAGE** is treated as an `int`.
Concurrent requests that trigger a prefetch for the same cache entry dispatch at most one
background fetch, so prefetch load scales with the number of distinct eligible entries rather
than request rate.
* `serve_stale`, when serve\_stale is set, cache will always serve an expired entry to a client if there is one * `serve_stale`, when serve\_stale is set, cache will always serve an expired entry to a client if there is one
available as long as it has not been expired for longer than **DURATION** (default 1 hour). By default, the _cache_ plugin will available as long as it has not been expired for longer than **DURATION** (default 1 hour). By default, the _cache_ plugin will
attempt to refresh the cache entry after sending the expired cache entry to the client. The attempt to refresh the cache entry after sending the expired cache entry to the client. The
@@ -69,6 +72,8 @@ cache [TTL] [ZONES...] {
checking to see if the entry is available from the source. **REFRESH_MODE** defaults to `immediate`. Setting this checking to see if the entry is available from the source. **REFRESH_MODE** defaults to `immediate`. Setting this
value to `verify` can lead to increased latency when serving stale responses, but will prevent stale entries value to `verify` can lead to increased latency when serving stale responses, but will prevent stale entries
from ever being served if an updated response can be retrieved from the source. from ever being served if an updated response can be retrieved from the source.
In `immediate` mode, concurrent requests for the same expired entry dispatch at most one
background refresh.
* `servfail` cache SERVFAIL responses for **DURATION**. Setting **DURATION** to 0 will disable caching of SERVFAIL * `servfail` cache SERVFAIL responses for **DURATION**. Setting **DURATION** to 0 will disable caching of SERVFAIL
responses. If this option is not set, SERVFAIL responses will be cached for 5 seconds. **DURATION** may not be responses. If this option is not set, SERVFAIL responses will be cached for 5 seconds. **DURATION** may not be
greater than 5 minutes. greater than 5 minutes.

View File

@@ -55,13 +55,11 @@ func (c *Cache) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg)
// Adjust the time to get a 0 TTL in the reply built from a stale item. // Adjust the time to get a 0 TTL in the reply built from a stale item.
now = now.Add(time.Duration(ttl) * time.Second) now = now.Add(time.Duration(ttl) * time.Second)
if !c.verifyStale { if !c.verifyStale {
cw := newPrefetchResponseWriter(server, rc, do, cd, c) c.tryPrefetch(ctx, i, server, rc, do, cd, now)
go c.doPrefetch(ctx, cw, i, now)
} }
servedStale.WithLabelValues(server, c.zonesMetricLabel, c.viewMetricLabel).Inc() servedStale.WithLabelValues(server, c.zonesMetricLabel, c.viewMetricLabel).Inc()
} else if c.shouldPrefetch(i, now) { } else if c.shouldPrefetch(i, now) {
cw := newPrefetchResponseWriter(server, rc, do, cd, c) c.tryPrefetch(ctx, i, server, rc, do, cd, now)
go c.doPrefetch(ctx, cw, i, now)
} }
if i.wildcard != "" { if i.wildcard != "" {
@@ -91,6 +89,20 @@ func wildcardFunc(ctx context.Context) func() string {
} }
} }
// tryPrefetch starts a background prefetch for i unless one is already in
// flight. Winning the compare-and-swap on i.refreshing grants exclusive
// ownership of the refresh, so concurrent hits on the same item spawn at
// most one goroutine and prefetch load tracks the number of distinct
// eligible keys rather than the request rate.
func (c *Cache) tryPrefetch(ctx context.Context, i *item, server string, req *dns.Msg, do, cd bool, now time.Time) {
	// Losers of the race return immediately; the winner's deferred
	// Store(false) re-arms the flag once its refresh attempt finishes.
	if i.refreshing.CompareAndSwap(false, true) {
		writer := newPrefetchResponseWriter(server, req, do, cd, c)
		go func() {
			defer i.refreshing.Store(false)
			c.doPrefetch(ctx, writer, i, now)
		}()
	}
}
func (c *Cache) doPrefetch(ctx context.Context, cw *ResponseWriter, i *item, now time.Time) { func (c *Cache) doPrefetch(ctx context.Context, cw *ResponseWriter, i *item, now time.Time) {
// Use a fresh metadata map to avoid concurrent writes to the original request's metadata. // Use a fresh metadata map to avoid concurrent writes to the original request's metadata.
ctx = metadata.ContextWithMetadata(ctx) ctx = metadata.ContextWithMetadata(ctx)

View File

@@ -2,6 +2,7 @@ package cache
import ( import (
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/coredns/coredns/plugin/cache/freq" "github.com/coredns/coredns/plugin/cache/freq"
@@ -25,6 +26,14 @@ type item struct {
stored time.Time stored time.Time
*freq.Freq *freq.Freq
// refreshing is set via CAS when a prefetch goroutine is dispatched for
// this item and cleared when it returns, bounding in-flight prefetches
// per item to one. A successful prefetch replaces this item in the cache
// with a new one (zero-valued refreshing); the deferred clear matters
// only when the prefetch fails and this item remains cached, so the next
// hit can retry.
refreshing atomic.Bool
} }
func newItem(m *dns.Msg, now time.Time, d time.Duration) *item { func newItem(m *dns.Msg, now time.Time, d time.Duration) *item {

View File

@@ -3,6 +3,8 @@ package cache
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"sync/atomic"
"testing" "testing"
"time" "time"
@@ -209,6 +211,87 @@ type verification struct {
fetch bool fetch bool
} }
// TestPrefetchDedup verifies that concurrent hits on a single cache item
// dispatch at most one prefetch goroutine, on both the serve_stale and
// shouldPrefetch paths. See https://github.com/coredns/coredns/issues/7904.
func TestPrefetchDedup(t *testing.T) {
	for _, tc := range []struct {
		name       string
		staleUpTo  time.Duration
		prefetch   int
		percentage int
		hitAt      time.Duration // all concurrent hits land here
	}{
		// serve_stale path: at 110s the 100s-TTL record is expired but
		// within staleUpTo, so every hit serves stale and may refresh.
		{name: "serve_stale", staleUpTo: time.Hour, hitAt: 110 * time.Second},
		// prefetch path: at 70s the remaining TTL (30s) is below 50% of
		// the original 100s, so every hit is prefetch-eligible.
		{name: "prefetch", prefetch: 1, percentage: 50, hitAt: 70 * time.Second},
	} {
		t.Run(tc.name, func(t *testing.T) {
			const N = 200
			// upstream counts how many requests reach the next plugin:
			// one for the initial populate, then one per prefetch spawned.
			var upstream atomic.Int32
			release := make(chan struct{})
			done := make(chan struct{}, N)
			c := New()
			c.staleUpTo = tc.staleUpTo
			c.prefetch = tc.prefetch
			c.percentage = tc.percentage
			c.duration = time.Minute
			c.Next = plugin.HandlerFunc(func(_ context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
				n := upstream.Add(1)
				if n > 1 {
					// Block the prefetch so all concurrent hits race against
					// the in-flight flag, not a completed refresh.
					<-release
					defer func() { done <- struct{}{} }()
				}
				m := new(dns.Msg)
				m.SetReply(r)
				m.Response = true
				m.Answer = []dns.RR{test.A("dedup.example.org. 100 IN A 127.0.0.1")}
				w.WriteMsg(m)
				return dns.RcodeSuccess, nil
			})
			// Pin the cache's clock so eligibility is controlled exactly by
			// tc.hitAt rather than wall-clock timing.
			t0 := time.Now().UTC()
			c.now = func() time.Time { return t0 }
			req := new(dns.Msg)
			req.SetQuestion("dedup.example.org.", dns.TypeA)
			c.ServeDNS(context.TODO(), &test.ResponseWriter{}, req)
			if upstream.Load() != 1 {
				t.Fatalf("initial populate: want 1 upstream call, got %d", upstream.Load())
			}
			// Fire N concurrent hits while the item is prefetch-eligible.
			// Without dedup each would spawn its own prefetch goroutine;
			// with dedup only the CAS winner spawns one, and the remaining
			// N-1 hits serve from cache without touching upstream.
			c.now = func() time.Time { return t0.Add(tc.hitAt) }
			var wg sync.WaitGroup
			wg.Add(N)
			for range N {
				go func() {
					defer wg.Done()
					req := new(dns.Msg)
					req.SetQuestion("dedup.example.org.", dns.TypeA)
					c.ServeDNS(context.TODO(), &test.ResponseWriter{}, req)
				}()
			}
			wg.Wait()
			// All hits have returned; unblock the (single) prefetch and wait
			// for it to signal completion before counting upstream calls.
			close(release)
			select {
			case <-done:
			case <-time.After(2 * time.Second):
				t.Fatal("prefetch goroutine never completed")
			}
			if got := upstream.Load(); got != 2 {
				t.Fatalf("want exactly 2 upstream calls (populate + 1 deduped prefetch), got %d", got)
			}
		})
	}
}
// prefetchHandler is a fake plugin implementation which returns a single A // prefetchHandler is a fake plugin implementation which returns a single A
// record with the given qname and ttl. The returned IP address starts at // record with the given qname and ttl. The returned IP address starts at
// 127.0.0.1 and is incremented on every request. // 127.0.0.1 and is incremented on every request.