plugin/forward: use dynamic read timeout (#1659)

- each proxy stores average RTT (round trip time) of last rttCount queries.
   For now, I assigned the value 4 to rttCount
 - the read timeout is calculated as doubled average RTT, but it cannot
   exceed default timeout
 - initial avg RTT is set to a half of default timeout, so initial timeout
   is equal to default timeout
 - the RTT for failed read is considered equal to default timeout, so any
   failed read will lead to increasing average RTT (up to default timeout)
 - dynamic timeouts will let us react faster on lost UDP packets
 - in future, we may develop a low-latency forward policy based on
   collected RTT values of proxies
This commit is contained in:
Ruslan Drozhdzh
2018-04-11 09:50:06 +03:00
committed by Miek Gieben
parent 5a546f743e
commit a20b4fe2de
2 changed files with 24 additions and 1 deletions

View File

@@ -7,6 +7,7 @@ package forward
import ( import (
"io" "io"
"strconv" "strconv"
"sync/atomic"
"time" "time"
"github.com/coredns/coredns/request" "github.com/coredns/coredns/request"
@@ -15,6 +16,19 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
) )
func (p *Proxy) readTimeout() time.Duration {
rtt := time.Duration(atomic.LoadInt64(&p.avgRtt))
if rtt < timeout/2 {
return 2 * rtt
}
return timeout
}
func (p *Proxy) updateRtt(newRtt time.Duration) {
rtt := time.Duration(atomic.LoadInt64(&p.avgRtt))
atomic.AddInt64(&p.avgRtt, int64((newRtt-rtt)/rttCount))
}
func (p *Proxy) connect(ctx context.Context, state request.Request, forceTCP, metric bool) (*dns.Msg, error) { func (p *Proxy) connect(ctx context.Context, state request.Request, forceTCP, metric bool) (*dns.Msg, error) {
start := time.Now() start := time.Now()
@@ -35,6 +49,7 @@ func (p *Proxy) connect(ctx context.Context, state request.Request, forceTCP, me
} }
conn.SetWriteDeadline(time.Now().Add(timeout)) conn.SetWriteDeadline(time.Now().Add(timeout))
reqTime := time.Now()
if err := conn.WriteMsg(state.Req); err != nil { if err := conn.WriteMsg(state.Req); err != nil {
conn.Close() // not giving it back conn.Close() // not giving it back
if err == io.EOF && cached { if err == io.EOF && cached {
@@ -43,9 +58,10 @@ func (p *Proxy) connect(ctx context.Context, state request.Request, forceTCP, me
return nil, err return nil, err
} }
conn.SetReadDeadline(time.Now().Add(timeout)) conn.SetReadDeadline(time.Now().Add(p.readTimeout()))
ret, err := conn.ReadMsg() ret, err := conn.ReadMsg()
if err != nil { if err != nil {
p.updateRtt(timeout)
conn.Close() // not giving it back conn.Close() // not giving it back
if err == io.EOF && cached { if err == io.EOF && cached {
return nil, errCachedClosed return nil, errCachedClosed
@@ -53,6 +69,8 @@ func (p *Proxy) connect(ctx context.Context, state request.Request, forceTCP, me
return nil, err return nil, err
} }
p.updateRtt(time.Since(reqTime))
p.Yield(conn) p.Yield(conn)
if metric { if metric {
@@ -68,3 +86,5 @@ func (p *Proxy) connect(ctx context.Context, state request.Request, forceTCP, me
return ret, nil return ret, nil
} }
const rttCount = 4

View File

@@ -22,6 +22,8 @@ type Proxy struct {
// health checking // health checking
probe *up.Probe probe *up.Probe
fails uint32 fails uint32
avgRtt int64
} }
// NewProxy returns a new proxy. // NewProxy returns a new proxy.
@@ -31,6 +33,7 @@ func NewProxy(addr string, tlsConfig *tls.Config) *Proxy {
fails: 0, fails: 0,
probe: up.New(), probe: up.New(),
transport: newTransport(addr, tlsConfig), transport: newTransport(addr, tlsConfig),
avgRtt: int64(timeout / 2),
} }
p.client = dnsClient(tlsConfig) p.client = dnsClient(tlsConfig)
return p return p