Dedup policy implement between grpc and proxy plugin (#3537)

Signed-off-by: zouyee <zounengren@cmss.chinamobile.com>
This commit is contained in:
Zou Nengren
2019-12-17 16:15:31 +08:00
committed by Miek Gieben
parent acb75ea904
commit 5e04c27238
7 changed files with 104 additions and 142 deletions

View File

@@ -12,6 +12,7 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/debug"
"github.com/coredns/coredns/plugin/pkg/policy"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/request"
@@ -25,7 +26,7 @@ var log = clog.NewWithPlugin("forward")
// of proxies each representing one upstream proxy.
type Forward struct {
proxies []*Proxy
p Policy
p policy.Policy
hcInterval time.Duration
from string
@@ -43,7 +44,7 @@ type Forward struct {
// New returns a new Forward.
func New() *Forward {
f := &Forward{maxfails: 2, tlsConfig: new(tls.Config), expire: defaultExpire, p: new(random), from: ".", hcInterval: hcInterval}
f := &Forward{maxfails: 2, tlsConfig: new(tls.Config), expire: defaultExpire, p: new(policy.Random), from: ".", hcInterval: hcInterval}
return f
}
@@ -91,8 +92,8 @@ func (f *Forward) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
}
// All upstream proxies are dead, assume healthcheck is completely broken and randomly
// select an upstream to connect to.
r := new(random)
proxy = r.List(f.proxies)[0]
r := new(policy.Random)
proxy = r.List(f.proxies)[0].([]*Proxy)[0]
HealthcheckBrokenCount.Add(1)
}
@@ -188,7 +189,12 @@ func (f *Forward) ForceTCP() bool { return f.opts.forceTCP }
func (f *Forward) PreferUDP() bool { return f.opts.preferUDP }
// List returns a set of proxies to be used for this client depending on the policy in f.
func (f *Forward) List() []*Proxy { return f.p.List(f.proxies) }
func (f *Forward) List() []*Proxy {
if len(f.p.List(f.proxies)) == 1 {
return f.p.List(f.proxies)[0].([]*Proxy)
}
return nil
}
var (
// ErrNoHealthy means no healthy proxies left.

View File

@@ -1,64 +0,0 @@
package forward
import (
"math/rand"
"sync/atomic"
)
// Policy defines a policy we use for selecting upstreams.
type Policy interface {
List([]*Proxy) []*Proxy
String() string
}
// random is a policy that implements random upstream selection.
type random struct{}
func (r *random) String() string { return "random" }
func (r *random) List(p []*Proxy) []*Proxy {
switch len(p) {
case 1:
return p
case 2:
if rand.Int()%2 == 0 {
return []*Proxy{p[1], p[0]} // swap
}
return p
}
perms := rand.Perm(len(p))
rnd := make([]*Proxy, len(p))
for i, p1 := range perms {
rnd[i] = p[p1]
}
return rnd
}
// roundRobin is a policy that selects hosts based on round robin ordering.
type roundRobin struct {
robin uint32
}
func (r *roundRobin) String() string { return "round_robin" }
func (r *roundRobin) List(p []*Proxy) []*Proxy {
poolLen := uint32(len(p))
i := atomic.AddUint32(&r.robin, 1) % poolLen
robin := []*Proxy{p[i]}
robin = append(robin, p[:i]...)
robin = append(robin, p[i+1:]...)
return robin
}
// sequential is a policy that selects hosts based on sequential ordering.
type sequential struct{}
func (r *sequential) String() string { return "sequential" }
func (r *sequential) List(p []*Proxy) []*Proxy {
return p
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/metrics"
"github.com/coredns/coredns/plugin/pkg/policy"
"github.com/coredns/coredns/plugin/pkg/parse"
pkgtls "github.com/coredns/coredns/plugin/pkg/tls"
"github.com/coredns/coredns/plugin/pkg/transport"
@@ -202,11 +203,11 @@ func parseBlock(c *caddy.Controller, f *Forward) error {
}
switch x := c.Val(); x {
case "random":
f.p = &random{}
f.p = &policy.Random{}
case "round_robin":
f.p = &roundRobin{}
f.p = &policy.RoundRobin{}
case "sequential":
f.p = &sequential{}
f.p = &policy.Sequential{}
default:
return c.Errf("unknown policy '%s'", x)
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/debug"
"github.com/coredns/coredns/plugin/pkg/policy"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
@@ -17,7 +18,7 @@ import (
// It has a list of proxies each representing one upstream proxy.
type GRPC struct {
proxies []*Proxy
p Policy
p policy.Policy
from string
ignored []string
@@ -93,7 +94,7 @@ func (g *GRPC) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
// NewGRPC returns a new GRPC.
func newGRPC() *GRPC {
g := &GRPC{
p: new(random),
p: new(policy.Random),
}
return g
}
@@ -126,6 +127,11 @@ func (g *GRPC) isAllowedDomain(name string) bool {
}
// List returns a set of proxies to be used for this client depending on the policy in p.
func (g *GRPC) list() []*Proxy { return g.p.List(g.proxies) }
func (g *GRPC) list() []*Proxy {
if len(g.p.List(g.proxies)) == 1 {
return g.p.List(g.proxies)[0].([]*Proxy)
}
return nil
}
const defaultTimeout = 5 * time.Second

View File

@@ -1,64 +0,0 @@
package grpc
import (
"math/rand"
"sync/atomic"
)
// Policy defines a policy we use for selecting upstreams.
type Policy interface {
List([]*Proxy) []*Proxy
String() string
}
// random is a policy that implements random upstream selection.
type random struct{}
func (r *random) String() string { return "random" }
func (r *random) List(p []*Proxy) []*Proxy {
switch len(p) {
case 1:
return p
case 2:
if rand.Int()%2 == 0 {
return []*Proxy{p[1], p[0]} // swap
}
return p
}
perms := rand.Perm(len(p))
rnd := make([]*Proxy, len(p))
for i, p1 := range perms {
rnd[i] = p[p1]
}
return rnd
}
// roundRobin is a policy that selects hosts based on round robin ordering.
type roundRobin struct {
robin uint32
}
func (r *roundRobin) String() string { return "round_robin" }
func (r *roundRobin) List(p []*Proxy) []*Proxy {
poolLen := uint32(len(p))
i := atomic.AddUint32(&r.robin, 1) % poolLen
robin := []*Proxy{p[i]}
robin = append(robin, p[:i]...)
robin = append(robin, p[i+1:]...)
return robin
}
// sequential is a policy that selects hosts based on sequential ordering.
type sequential struct{}
func (r *sequential) String() string { return "sequential" }
func (r *sequential) List(p []*Proxy) []*Proxy {
return p
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/metrics"
"github.com/coredns/coredns/plugin/pkg/parse"
"github.com/coredns/coredns/plugin/pkg/policy"
pkgtls "github.com/coredns/coredns/plugin/pkg/tls"
"github.com/caddyserver/caddy"
@@ -132,11 +133,11 @@ func parseBlock(c *caddy.Controller, g *GRPC) error {
}
switch x := c.Val(); x {
case "random":
g.p = &random{}
g.p = &policy.Random{}
case "round_robin":
g.p = &roundRobin{}
g.p = &policy.RoundRobin{}
case "sequential":
g.p = &sequential{}
g.p = &policy.Sequential{}
default:
return c.Errf("unknown policy '%s'", x)
}

View File

@@ -0,0 +1,76 @@
package policy
import (
"math/rand"
"sync/atomic"
)
// Policy defines a policy we use for selecting upstreams.
type Policy interface {
List(policy ...interface{}) []interface{}
String() string
}
// Random is a policy that implements random upstream selection.
type Random struct{}
var _ Policy = &Random{}
// String returns the name of policy Random
func (r *Random) String() string { return "random" }
// List returns a set of proxies to be used for this client depending on Random policy.
func (r *Random) List(p ...interface{}) []interface{} {
switch len(p) {
case 1:
return p
case 2:
if rand.Int()%2 == 0 {
return []interface{}{p[1], p[0]} // swap
}
return p
}
perms := rand.Perm(len(p))
rnd := make([]interface{}, len(p))
for i, p1 := range perms {
rnd[i] = p[p1]
}
return rnd
}
// RoundRobin is a policy that selects hosts based on round robin ordering.
type RoundRobin struct {
robin uint32
}
var _ Policy = &RoundRobin{}
// String returns the name of policy RoundRobin
func (r *RoundRobin) String() string { return "round_robin" }
// List returns a set of proxies to be used for this client depending on RoundRobin policy.
func (r *RoundRobin) List(p ...interface{}) []interface{} {
poolLen := uint32(len(p))
i := atomic.AddUint32(&r.robin, 1) % poolLen
robin := []interface{}{p[i]}
robin = append(robin, p[:i]...)
robin = append(robin, p[i+1:]...)
return robin
}
// Sequential is a policy that selects hosts based on sequential ordering.
type Sequential struct{}
var _ Policy = &Sequential{}
// String returns the name of policy Sequential
func (r *Sequential) String() string { return "sequential" }
// List returns a set of proxies without filter.
func (r *Sequential) List(p ...interface{}) []interface{} {
return p
}