Merge branch 'traffic' of github.com:coredns/coredns into traffic

Author: Miek Gieben
Date: 2020-01-18 07:22:32 +01:00
45 changed files with 356 additions and 284 deletions

View File

@@ -178,7 +178,7 @@ func (e *Etcd) TTL(kv *mvccpb.KeyValue, serv *msg.Service) uint32 {
// shouldInclude returns true if the service should be included in a list of records, given the qType. Of all the
// currently supported lookup types, the only ones that allow an empty Host field in the service are TXT records
// which resolve directly. If a TXT record is being resolved by CNAME, then we expect the Host field to have a
// which resolve directly. If a TXT record is being resolved by CNAME, then we expect the Host field to have a
// value while the TXT field will be empty.
func shouldInclude(serv *msg.Service, qType uint16) bool {
return (qType == dns.TypeTXT && serv.Text != "") || serv.Host != ""
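
A hypothetical in-package check of the rule the comment describes; it assumes it sits in the same package as `shouldInclude` (the etcd plugin), and the test name is illustrative:

~~~ go
package etcd

import (
	"testing"

	"github.com/coredns/coredns/plugin/etcd/msg"
	"github.com/miekg/dns"
)

func TestShouldIncludeSketch(t *testing.T) {
	cases := []struct {
		serv  msg.Service
		qType uint16
		want  bool
	}{
		{msg.Service{Text: "v=spf1 -all"}, dns.TypeTXT, true}, // TXT resolving directly: empty Host is fine
		{msg.Service{}, dns.TypeTXT, false},                   // no Host and no Text: nothing to include
		{msg.Service{Host: "10.0.0.1"}, dns.TypeA, true},      // any other type needs a Host
		{msg.Service{Text: "some text"}, dns.TypeA, false},    // Text alone does not satisfy an A lookup
	}
	for i, c := range cases {
		if got := shouldInclude(&c.serv, c.qType); got != c.want {
			t.Errorf("case %d: got %t, want %t", i, got, c.want)
		}
	}
}
~~~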

View File

@@ -12,8 +12,8 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/debug"
"github.com/coredns/coredns/plugin/pkg/policy"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/plugin/pkg/policy"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"

View File

@@ -8,8 +8,8 @@ import (
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/metrics"
"github.com/coredns/coredns/plugin/pkg/policy"
"github.com/coredns/coredns/plugin/pkg/parse"
"github.com/coredns/coredns/plugin/pkg/policy"
pkgtls "github.com/coredns/coredns/plugin/pkg/tls"
"github.com/coredns/coredns/plugin/pkg/transport"

View File

@@ -7,10 +7,10 @@
## Description
By just using *log* you dump all queries (and parts of the reply) on standard output. Options exist
to tweak the output a little. The date/time prefix on log lines is RFC3339 formatted with
milliseconds.
to tweak the output a little. Note that for busy servers logging will incur a performance hit.
Note that for busy servers logging will incur a performance hit.
Enabling or disabling the *log* plugin only affects query logging; any other logging from
CoreDNS will show up regardless.
## Syntax
@@ -18,8 +18,7 @@ Note that for busy servers logging will incur a performance hit.
log
~~~
* With no arguments, a query log entry is written to *stdout* in the common log format for all requests
With no arguments, a query log entry is written to *stdout* in the common log format for all requests.
Or if you want/need slightly more control:
~~~ txt
@@ -47,11 +46,11 @@ The classes of responses have the following meaning:
* `denial`: either NXDOMAIN or nodata responses (Name exists, type does not). A nodata response
sets the return code to NOERROR.
* `error`: SERVFAIL, NOTIMP, REFUSED, etc. Anything that indicates the remote server is not willing to
resolve the request.
resolve the request.
* `all`: the default - nothing is specified. Using this class means that all messages will be
logged, whatever else is mixed in with "all".
If no class is specified, it defaults to *all*.
If no class is specified, it defaults to `all`.
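
For example, a minimal sketch of a *log* stanza that only records denials and errors for all zones, assuming the standard `class` sub-directive:

~~~ txt
log . {
    class denial error
}
~~~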
## Log Format

View File

@@ -4,12 +4,23 @@ import (
"fmt"
"net"
"os"
"strings"
"github.com/coredns/coredns/plugin/pkg/transport"
"github.com/miekg/dns"
)
// stripZone strips the IPv6 zone identifier from host, but preserves any port that comes after the zone.
func stripZone(host string) string {
if strings.Contains(host, "%") {
lastPercent := strings.LastIndex(host, "%")
newHost := host[:lastPercent]
return newHost
}
return host
}
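
A standalone sketch mirroring the helper above, just to show its effect on zoned and unzoned inputs; the real helper lives in CoreDNS's parse package:

~~~ go
package main

import (
	"fmt"
	"strings"
)

// stripZone mirrors the helper above: cut at the last '%', if any.
func stripZone(host string) string {
	if i := strings.LastIndex(host, "%"); i >= 0 {
		return host[:i]
	}
	return host
}

func main() {
	fmt.Println(stripZone("fe80::1%ens3")) // fe80::1
	fmt.Println(stripZone("fe80::1"))      // fe80::1 (unchanged)
	fmt.Println(stripZone("127.0.0.1"))    // 127.0.0.1 (unchanged)
}
~~~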
// HostPortOrFile parses the strings in s; each string can either be an
// address, [scheme://]address:port or a filename. The address part is checked
// and in the case of a filename a resolv.conf-like file is assumed and parsed and
@@ -21,10 +32,11 @@ func HostPortOrFile(s ...string) ([]string, error) {
trans, host := Transport(h)
addr, _, err := net.SplitHostPort(host)
if err != nil {
// Parse didn't work, it is not an addr:port combo
if net.ParseIP(host) == nil {
// Not an IP address.
hostNoZone := stripZone(host)
if net.ParseIP(hostNoZone) == nil {
ss, err := tryFile(host)
if err == nil {
servers = append(servers, ss...)
@@ -47,8 +59,7 @@ func HostPortOrFile(s ...string) ([]string, error) {
continue
}
if net.ParseIP(addr) == nil {
// Not an IP address.
if net.ParseIP(stripZone(addr)) == nil {
ss, err := tryFile(host)
if err == nil {
servers = append(servers, ss...)

View File

@@ -34,6 +34,26 @@ func TestHostPortOrFile(t *testing.T) {
"127.0.0.1:53",
false,
},
{
"fe80::1",
"[fe80::1]:53",
false,
},
{
"fe80::1%ens3",
"[fe80::1%ens3]:53",
false,
},
{
"[fd01::1]:153",
"[fd01::1]:153",
false,
},
{
"[fd01::1%ens3]:153",
"[fd01::1%ens3]:153",
false,
},
}
err := ioutil.WriteFile("resolv.conf", []byte("nameserver 127.0.0.1\n"), 0600)
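
A minimal usage sketch of `parse.HostPortOrFile`, matching the test cases above: the zoned IPv6 address keeps its zone and gets the default port 53, while an explicit host:port passes through unchanged.

~~~ go
package main

import (
	"fmt"

	"github.com/coredns/coredns/plugin/pkg/parse"
)

func main() {
	servers, err := parse.HostPortOrFile("fe80::1%ens3", "127.0.0.1:53")
	if err != nil {
		panic(err)
	}
	// Expected (per the tests above): [fe80::1%ens3]:53 and 127.0.0.1:53.
	fmt.Println(servers)
}
~~~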

View File

@@ -11,8 +11,8 @@ import (
"sync"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/plugin/pkg/uniq"
"github.com/coredns/coredns/plugin/pkg/reuseport"
"github.com/coredns/coredns/plugin/pkg/uniq"
)
var (

View File

@@ -32,7 +32,7 @@ it do key or algorithm rollovers - it just signs.
Both these dates are only checked on the SOA's signature(s).
* Create RRSIGs that have an inception of -3 hours (minus a jitter between 0 and 18 hours)
and an expiration of +32 days for every given DNSKEY.
and an expiration of +32 days (plus a jitter between 0 and 5 days) for every given DNSKEY (see the timing sketch after this list).
* Add NSEC records for all names in the zone. The TTL for these is the negative cache TTL from the
SOA record.
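
A small sketch of how inception and expiration could be derived per the rule above; it mirrors the described policy rather than reusing the plugin's own helper:

~~~ go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	now := time.Now()
	// Inception: now minus 3 hours, minus a random jitter of up to 18 hours.
	jitterIncep := time.Duration(rand.Float64() * float64(18*time.Hour))
	// Expiration: now plus 32 days, plus a random jitter of up to 5 days.
	jitterExpir := time.Duration(rand.Float64() * float64(5*24*time.Hour))

	inception := now.Add(-3 * time.Hour).Add(-jitterIncep)
	expiration := now.Add(32 * 24 * time.Hour).Add(jitterExpir)

	fmt.Println("inception: ", inception.UTC().Format(time.RFC3339))
	fmt.Println("expiration:", expiration.UTC().Format(time.RFC3339))
}
~~~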

View File

@@ -23,7 +23,7 @@ func setup(c *caddy.Controller) error {
c.OnStartup(sign.OnStartup)
c.OnStartup(func() error {
for _, signer := range sign.signers {
go signer.refresh(DurationRefreshHours)
go signer.refresh(durationRefreshHours)
}
return nil
})
@@ -64,12 +64,13 @@ func parse(c *caddy.Controller) (*Sign, error) {
signers := make([]*Signer, len(origins))
for i := range origins {
signers[i] = &Signer{
dbfile: dbfile,
origin: plugin.Host(origins[i]).Normalize(),
jitter: time.Duration(float32(DurationJitter) * rand.Float32()),
directory: "/var/lib/coredns",
stop: make(chan struct{}),
signedfile: fmt.Sprintf("db.%ssigned", origins[i]), // origins[i] is a fqdn, so it ends with a dot, hence %ssigned.
dbfile: dbfile,
origin: plugin.Host(origins[i]).Normalize(),
jitterIncep: time.Duration(float32(durationInceptionJitter) * rand.Float32()),
jitterExpir: time.Duration(float32(durationExpirationDayJitter) * rand.Float32()),
directory: "/var/lib/coredns",
stop: make(chan struct{}),
signedfile: fmt.Sprintf("db.%ssigned", origins[i]), // origins[i] is a fqdn, so it ends with a dot, hence %ssigned.
}
}

View File

@@ -26,12 +26,13 @@ func (s *Sign) OnStartup() error {
// Various duration constants for signing of the zones.
const (
DurationExpireDays = 7 * 24 * time.Hour // max time allowed before expiration
DurationResignDays = 6 * 24 * time.Hour // if the last sign happened this long ago, sign again
DurationSignatureExpireDays = 32 * 24 * time.Hour // sign for 32 days
DurationRefreshHours = 5 * time.Hour // check zones every 5 hours
DurationJitter = -18 * time.Hour // default max jitter
DurationSignatureInceptionHours = -3 * time.Hour // -(2+1) hours, be sure to catch daylight saving time and such, jitter is subtracted
durationExpireDays = 7 * 24 * time.Hour // max time allowed before expiration
durationResignDays = 6 * 24 * time.Hour // if the last sign happened this long ago, sign again
durationSignatureExpireDays = 32 * 24 * time.Hour // sign for 32 days
durationRefreshHours = 5 * time.Hour // check zones every 5 hours
durationInceptionJitter = -18 * time.Hour // default max jitter for the inception
durationExpirationDayJitter = 5 * 24 * time.Hour // default max jitter for the expiration
durationSignatureInceptionHours = -3 * time.Hour // -(2+1) hours, be sure to catch daylight saving time and such, jitter is subtracted
)
const timeFmt = "2006-01-02T15:04:05.000Z07:00"

View File

@@ -18,11 +18,12 @@ var log = clog.NewWithPlugin("sign")
// Signer holds the data needed to sign a zone file.
type Signer struct {
keys []Pair
origin string
dbfile string
directory string
jitter time.Duration
keys []Pair
origin string
dbfile string
directory string
jitterIncep time.Duration
jitterExpir time.Duration
signedfile string
stop chan struct{}
@@ -42,7 +43,7 @@ func (s *Signer) Sign(now time.Time) (*file.Zone, error) {
mttl := z.Apex.SOA.Minttl
ttl := z.Apex.SOA.Header().Ttl
inception, expiration := lifetime(now, s.jitter)
inception, expiration := lifetime(now, s.jitterIncep, s.jitterExpir)
z.Apex.SOA.Serial = uint32(now.Unix())
for _, pair := range s.keys {
@@ -143,8 +144,8 @@ func resign(rd io.Reader, now time.Time) (why error) {
}
incep, _ := time.Parse("20060102150405", dns.TimeToString(x.Inception))
// If too long ago, resign.
if now.Sub(incep) >= 0 && now.Sub(incep) > DurationResignDays {
return fmt.Errorf("inception %q was more than: %s ago from %s: %s", incep.Format(timeFmt), DurationResignDays, now.Format(timeFmt), now.Sub(incep))
if now.Sub(incep) >= 0 && now.Sub(incep) > durationResignDays {
return fmt.Errorf("inception %q was more than: %s ago from %s: %s", incep.Format(timeFmt), durationResignDays, now.Format(timeFmt), now.Sub(incep))
}
// Inception hasn't even started yet.
if now.Sub(incep) < 0 {
@@ -152,8 +153,8 @@ func resign(rd io.Reader, now time.Time) (why error) {
}
expire, _ := time.Parse("20060102150405", dns.TimeToString(x.Expiration))
if expire.Sub(now) < DurationExpireDays {
return fmt.Errorf("expiration %q is less than: %s away from %s: %s", expire.Format(timeFmt), DurationExpireDays, now.Format(timeFmt), expire.Sub(now))
if expire.Sub(now) < durationExpireDays {
return fmt.Errorf("expiration %q is less than: %s away from %s: %s", expire.Format(timeFmt), durationExpireDays, now.Format(timeFmt), expire.Sub(now))
}
}
i++
@@ -173,7 +174,7 @@ func signAndLog(s *Signer, why error) {
z, err := s.Sign(now)
log.Infof("Signing %q because %s", s.origin, why)
if err != nil {
log.Warningf("Error signing %q with key tags %q in %s: %s, next: %s", s.origin, keyTag(s.keys), time.Since(now), err, now.Add(DurationRefreshHours).Format(timeFmt))
log.Warningf("Error signing %q with key tags %q in %s: %s, next: %s", s.origin, keyTag(s.keys), time.Since(now), err, now.Add(durationRefreshHours).Format(timeFmt))
return
}
@@ -181,7 +182,7 @@ func signAndLog(s *Signer, why error) {
log.Warningf("Error signing %q: failed to move zone file into place: %s", s.origin, err)
return
}
log.Infof("Successfully signed zone %q in %q with key tags %q and %d SOA serial, elapsed %f, next: %s", s.origin, filepath.Join(s.directory, s.signedfile), keyTag(s.keys), z.Apex.SOA.Serial, time.Since(now).Seconds(), now.Add(DurationRefreshHours).Format(timeFmt))
log.Infof("Successfully signed zone %q in %q with key tags %q and %d SOA serial, elapsed %f, next: %s", s.origin, filepath.Join(s.directory, s.signedfile), keyTag(s.keys), z.Apex.SOA.Serial, time.Since(now).Seconds(), now.Add(durationRefreshHours).Format(timeFmt))
}
// refresh checks every val whether some zones need to be resigned.
@@ -202,8 +203,8 @@ func (s *Signer) refresh(val time.Duration) {
}
}
func lifetime(now time.Time, jitter time.Duration) (uint32, uint32) {
incep := uint32(now.Add(DurationSignatureInceptionHours).Add(jitter).Unix())
expir := uint32(now.Add(DurationSignatureExpireDays).Unix())
func lifetime(now time.Time, jitterInception, jitterExpiration time.Duration) (uint32, uint32) {
incep := uint32(now.Add(durationSignatureInceptionHours).Add(jitterInception).Unix())
expir := uint32(now.Add(durationSignatureExpireDays).Add(jitterExpiration).Unix())
return incep, expir
}

View File

@@ -1,4 +1,4 @@
Hacking on *traffic*
# Hacking on *traffic*
Repos used:
@@ -12,7 +12,8 @@ I found these websites useful while working on this.
* https://github.com/envoyproxy/envoy/blob/master/api/API_OVERVIEW.md
* https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery.md
* This was *really* helpful: https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol
* This was *really* helpful: https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol to
show the flow of the protocol.
# Testing
@@ -42,16 +43,8 @@ example.org {
Start CoreDNS (`coredns -conf Corefile -dns.port=1053`), and watch the logging/debugging flow by; the
test binary should also spew out a bunch of things. CoreDNS will build up a list of clusters and
endpoints. Next you can query it:
~~~ sh
% dig @localhost -p 1053 cluster-v0-0.example.org A
;; QUESTION SECTION:
;cluster-v0-0.example.org. IN A
;; ANSWER SECTION:
cluster-v0-0.example.org. 5 IN A 127.0.0.1
~~~
endpoints. Next you can query it. Note that none of the endpoints are HEALTHY, so you'll mostly get NODATA
responses instead of actual records.
Note: the xds/test binary is a go-control-plane binary with added debugging that I'm using for
testing.

View File

@@ -20,10 +20,13 @@ be upgraded, so all traffic to it is drained. Or the entire Kubernetes needs to
endpoints need to be drained from it.
*Traffic* discovers the endpoints via Envoy's xDS protocol. Endpoints and clusters are discovered
every 10 seconds. The plugin hands out responses that adhere to these assignments. Each DNS response
contains a single IP address that's considered the best one. *Traffic* will load balance A and AAAA
queries. The TTL on these answers is set to 5s. It will only return successful responses, either with
an answer or otherwise a NODATA response. Queries for non-existent clusters get an NXDOMAIN.
every 10 seconds. The plugin hands out responses that adhere to these assignments. Only endpoints
that are *healthy* are handed out.
Each DNS response contains a single IP address that's considered the best one. *Traffic* will load
balance A and AAAA queries. The TTL on these answers is set to 5s. It will only return successful
responses, either with an answer or otherwise a NODATA response. Queries for non-existent clusters
get an NXDOMAIN.
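
For example, assuming a cluster named `web` under the origin `lb.example.org` (the names used elsewhere in this branch) and a healthy endpoint at 127.0.0.1, a lookup would look roughly like:

~~~ sh
% dig @localhost -p 1053 web.lb.example.org A
;; QUESTION SECTION:
;web.lb.example.org. IN A
;; ANSWER SECTION:
web.lb.example.org. 5 IN A 127.0.0.1
~~~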
The *traffic* plugin has no notion of draining, drop overload or anything that advanced; *it just
acts upon assignments*. This means that if an endpoint goes down and *traffic* has not seen a new
@@ -74,12 +77,11 @@ and "cluster-v0" is one of the load balanced clusters, *traffic* will respond to
## Metrics
What metrics should we do?
What metrics should we do? If any? Number of clusters? Number of endpoints and health?
## Ready
Should this plugin implement readyness?
Should this plugin implement readiness?
## Examples
@@ -108,8 +110,7 @@ The following documents provide some background on Envoy's control plane.
## Bugs
Priority from ClusterLoadAssignments is not used. Locality is also not used. Health status of the
endpoints is ignored (for now).
Priority and locality information from ClusterLoadAssignments is not used.
Load reporting via xDS is not supported; this can be implemented, but there are some things that
make this difficult. A single (DNS) query is done by a resolver. Behind this resolver there may be
@@ -121,9 +122,8 @@ Multiple **TO** addresses is not implemented.
## TODO
* acking responses
* correctly tracking versions and pruning old clusters.
* metrics?
* how to exactly deal with health status from the endpoints.
* testing
* more and better testing
* credentials (other than TLS) - how/what?
* is the protocol correctly implemented? Should we not have a 10s tick, but wait for responses from
the control plane?

View File

@@ -20,12 +20,19 @@ func TestParseTraffic(t *testing.T) {
input string
shouldErr bool
}{
// ok
{`traffic grpc://127.0.0.1:18000 {
id test-id
}`, false},
// fail
{`traffic`, true},
{`traffic tls://1.1.1.1`, true},
{`traffic {
id bla bla
}`, true},
{`traffic {
node bla bla
node
}`, true},
}
for i, test := range tests {

View File

@@ -28,17 +28,12 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
cluster := ""
for _, o := range t.origins {
println(o, state.Name())
if strings.HasSuffix(state.Name(), o) {
cluster, _ = dnsutil.TrimZone(state.Name(), o)
state.Zone = o
break
}
}
if cluster == "" {
return plugin.NextOrFailure(t.Name(), t.Next, ctx, w, r)
}
m := new(dns.Msg)
m.SetReply(r)
m.Authoritative = true
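
The cluster name is simply the query name with the matching origin trimmed off; a tiny sketch of that step using `dnsutil.TrimZone` (the origin and query names here are illustrative):

~~~ go
package main

import (
	"fmt"

	"github.com/coredns/coredns/plugin/pkg/dnsutil"
)

func main() {
	// Query "web.lb.example.org." against origin "lb.example.org." leaves the
	// cluster name "web".
	cluster, err := dnsutil.TrimZone("web.lb.example.org.", "lb.example.org.")
	if err != nil {
		panic(err)
	}
	fmt.Println(cluster) // web
}
~~~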

View File

@@ -1,123 +1,144 @@
package traffic
/*
func TestTraffic(t *testing.T) {
rm := Traffic{Next: handler()}
import (
"context"
"testing"
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/coredns/coredns/plugin/test"
"github.com/coredns/coredns/plugin/traffic/xds"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/miekg/dns"
"google.golang.org/grpc"
)
func TestTraffic(t *testing.T) {
c, err := xds.New("127.0.0.1:0", "test-id", grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}
tr := &Traffic{c: c, origins: []string{"lb.example.org."}}
// the first X records must be cnames after this test
tests := []struct {
answer []dns.RR
extra []dns.RR
cnameAnswer int
cnameExtra int
addressAnswer int
addressExtra int
mxAnswer int
mxExtra int
cla *xdspb.ClusterLoadAssignment
cluster string
qtype uint16
rcode int
answer string // address value of the A/AAAA record.
ns bool // should there be a ns section.
}{
{
answer: []dns.RR{
test.CNAME("cname1.region2.skydns.test. 300 IN CNAME cname2.region2.skydns.test."),
test.CNAME("cname2.region2.skydns.test. 300 IN CNAME cname3.region2.skydns.test."),
test.CNAME("cname5.region2.skydns.test. 300 IN CNAME cname6.region2.skydns.test."),
test.CNAME("cname6.region2.skydns.test. 300 IN CNAME endpoint.region2.skydns.test."),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.1"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx1.region2.skydns.test."),
test.MX("mx.region2.skydns.test. 300 IN MX 2 mx2.region2.skydns.test."),
test.MX("mx.region2.skydns.test. 300 IN MX 3 mx3.region2.skydns.test."),
},
cnameAnswer: 4,
addressAnswer: 1,
mxAnswer: 3,
cla: &xdspb.ClusterLoadAssignment{},
cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true,
},
{
answer: []dns.RR{
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.1"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx1.region2.skydns.test."),
test.CNAME("cname.region2.skydns.test. 300 IN CNAME endpoint.region2.skydns.test."),
},
cnameAnswer: 1,
addressAnswer: 1,
mxAnswer: 1,
cla: &xdspb.ClusterLoadAssignment{},
cluster: "web", qtype: dns.TypeSRV, rcode: dns.RcodeSuccess, ns: true,
},
{
answer: []dns.RR{
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx1.region2.skydns.test."),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.1"),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.2"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx2.region2.skydns.test."),
test.CNAME("cname2.region2.skydns.test. 300 IN CNAME cname3.region2.skydns.test."),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.3"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx3.region2.skydns.test."),
cla: &xdspb.ClusterLoadAssignment{},
cluster: "does-not-exist", qtype: dns.TypeA, rcode: dns.RcodeNameError, ns: true,
},
// healthy backend
{
cla: &xdspb.ClusterLoadAssignment{
ClusterName: "web",
Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_HEALTHY}}),
},
extra: []dns.RR{
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.1"),
test.AAAA("endpoint.region2.skydns.test. 300 IN AAAA ::1"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx1.region2.skydns.test."),
test.CNAME("cname2.region2.skydns.test. 300 IN CNAME cname3.region2.skydns.test."),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx2.region2.skydns.test."),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.3"),
test.AAAA("endpoint.region2.skydns.test. 300 IN AAAA ::2"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx3.region2.skydns.test."),
cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.1",
},
// unknown backend
{
cla: &xdspb.ClusterLoadAssignment{
ClusterName: "web",
Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_UNKNOWN}}),
},
cnameAnswer: 1,
cnameExtra: 1,
addressAnswer: 3,
addressExtra: 4,
mxAnswer: 3,
mxExtra: 3,
cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true,
},
// unknown backend and healthy backend
{
cla: &xdspb.ClusterLoadAssignment{
ClusterName: "web",
Endpoints: endpoints([]EndpointHealth{
{"127.0.0.1", corepb.HealthStatus_UNKNOWN},
{"127.0.0.2", corepb.HealthStatus_HEALTHY},
}),
},
cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.2",
},
}
rec := dnstest.NewRecorder(&test.ResponseWriter{})
ctx := context.TODO()
for i, test := range tests {
req := new(dns.Msg)
req.SetQuestion("region2.skydns.test.", dns.TypeSRV)
req.Answer = test.answer
req.Extra = test.extra
for i, tc := range tests {
a := xds.NewAssignment()
a.SetClusterLoadAssignment("web", tc.cla) // web is our cluster
c.SetAssignments(a)
_, err := rm.ServeDNS(context.TODO(), rec, req)
m := new(dns.Msg)
cl := dnsutil.Join(tc.cluster, tr.origins[0])
m.SetQuestion(cl, tc.qtype)
rec := dnstest.NewRecorder(&test.ResponseWriter{})
_, err := tr.ServeDNS(ctx, rec, m)
if err != nil {
t.Errorf("Test %d: Expected no error, but got %s", i, err)
continue
t.Errorf("Test %d: Expected no error, but got %q", i, err)
}
if rec.Msg.Rcode != tc.rcode {
t.Errorf("Test %d: Expected no rcode %d, but got %d", i, tc.rcode, rec.Msg.Rcode)
}
if tc.ns && len(rec.Msg.Ns) == 0 {
t.Errorf("Test %d: Expected authority section, but got none", i)
}
if tc.answer != "" && len(rec.Msg.Answer) == 0 {
t.Fatalf("Test %d: Expected answer section, but got none", i)
}
if tc.answer != "" {
record := rec.Msg.Answer[0]
addr := ""
switch x := record.(type) {
case *dns.A:
addr = x.A.String()
case *dns.AAAA:
addr = x.AAAA.String()
}
if tc.answer != addr {
t.Errorf("Test %d: Expected answer %s, but got %s", i, tc.answer, addr)
}
}
cname, address, mx, sorted := countRecords(rec.Msg.Answer)
if !sorted {
t.Errorf("Test %d: Expected CNAMEs, then AAAAs, then MX in Answer, but got mixed", i)
}
if cname != test.cnameAnswer {
t.Errorf("Test %d: Expected %d CNAMEs in Answer, but got %d", i, test.cnameAnswer, cname)
}
if address != test.addressAnswer {
t.Errorf("Test %d: Expected %d A/AAAAs in Answer, but got %d", i, test.addressAnswer, address)
}
if mx != test.mxAnswer {
t.Errorf("Test %d: Expected %d MXs in Answer, but got %d", i, test.mxAnswer, mx)
}
cname, address, mx, sorted = countRecords(rec.Msg.Extra)
if !sorted {
t.Errorf("Test %d: Expected CNAMEs, then AAAAs, then MX in Extra, but got mixed", i)
}
if cname != test.cnameExtra {
t.Errorf("Test %d: Expected %d CNAMEs in Extra, but got %d", i, test.cnameAnswer, cname)
}
if address != test.addressExtra {
t.Errorf("Test %d: Expected %d A/AAAAs in Extra, but got %d", i, test.addressAnswer, address)
}
if mx != test.mxExtra {
t.Errorf("Test %d: Expected %d MXs in Extra, but got %d", i, test.mxAnswer, mx)
}
}
}
func handler() plugin.Handler {
return plugin.HandlerFunc(func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
w.WriteMsg(r)
return dns.RcodeSuccess, nil
})
type EndpointHealth struct {
Address string
Health corepb.HealthStatus
}
func endpoints(e []EndpointHealth) []*endpointpb.LocalityLbEndpoints {
ep := make([]*endpointpb.LocalityLbEndpoints, len(e))
for i := range e {
ep[i] = &endpointpb.LocalityLbEndpoints{
LbEndpoints: []*endpointpb.LbEndpoint{{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Address: &corepb.Address_SocketAddress{
SocketAddress: &corepb.SocketAddress{
Address: e[i].Address,
},
},
},
},
},
HealthStatus: e[i].Health,
}},
}
}
return ep
}
*/

View File

@@ -6,15 +6,21 @@ import (
"sync"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
type assignment struct {
mu sync.RWMutex
cla map[string]*xdspb.ClusterLoadAssignment
version int // not sure what to do with this and if we should discard all clusters.
mu sync.RWMutex
cla map[string]*xdspb.ClusterLoadAssignment
}
func (a *assignment) setClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) {
// NewAssignment returns a pointer to an assignment.
func NewAssignment() *assignment {
return &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
}
// SetClusterLoadAssignment sets the assignment for the cluster to cla.
func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) {
// If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry.
a.mu.Lock()
defer a.mu.Unlock()
@@ -30,7 +36,8 @@ func (a *assignment) setClusterLoadAssignment(cluster string, cla *xdspb.Cluster
}
func (a *assignment) clusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment {
// ClusterLoadAssignment returns the assignment for the cluster or nil if there is none.
func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment {
a.mu.RLock()
cla, ok := a.cla[cluster]
a.mu.RUnlock()
@@ -52,55 +59,58 @@ func (a *assignment) clusters() []string {
return clusters
}
// Select selects a backend from cla, using weighted random selection. It only selects
// Select selects a backend from cluster load assignments, using weighted random selection. It only selects
// backends that are reporting healthy.
func (a *assignment) Select(cluster string) (net.IP, bool) {
cla := a.clusterLoadAssignment(cluster)
cla := a.ClusterLoadAssignment(cluster)
if cla == nil {
return nil, false
}
total := 0
i := 0
healthy := 0
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
// if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
// continue
// }
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue
}
total += int(lb.GetLoadBalancingWeight().GetValue())
i++
healthy++
}
}
if healthy == 0 {
return nil, true
}
if total == 0 {
// all weights are 0, randomly select one of the endpoints.
r := rand.Intn(i)
r := rand.Intn(healthy)
i := 0
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
// if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
// continue
// }
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue
}
if r == i {
return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()), true
}
i++
}
}
return nil
return nil, true
}
r := rand.Intn(total) + 1
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
// if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
// continue
// }
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue
}
r -= int(lb.GetLoadBalancingWeight().GetValue())
if r <= 0 {
return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()), true
}
}
}
return nil, false
return nil, true
}
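
The weighted pick itself is simple: sum the weights of the healthy endpoints, draw r in [1, total], and walk the list subtracting weights until r drops to zero; all-zero weights fall back to a uniform pick. A self-contained sketch of that idea with plain structs instead of the xDS types:

~~~ go
package main

import (
	"fmt"
	"math/rand"
)

type endpoint struct {
	addr    string
	weight  int
	healthy bool
}

// pick returns a healthy address chosen by weighted random selection, and
// false when no healthy endpoint exists.
func pick(eps []endpoint) (string, bool) {
	total, healthy := 0, 0
	for _, e := range eps {
		if !e.healthy {
			continue
		}
		total += e.weight
		healthy++
	}
	if healthy == 0 {
		return "", false
	}
	if total == 0 { // all weights are zero: uniform pick among the healthy ones
		r := rand.Intn(healthy)
		i := 0
		for _, e := range eps {
			if !e.healthy {
				continue
			}
			if i == r {
				return e.addr, true
			}
			i++
		}
		return "", true // unreachable in practice: healthy > 0 guarantees a hit above
	}
	r := rand.Intn(total) + 1
	for _, e := range eps {
		if !e.healthy {
			continue
		}
		r -= e.weight
		if r <= 0 {
			return e.addr, true
		}
	}
	return "", true
}

func main() {
	eps := []endpoint{
		{"10.0.0.1", 1, true},
		{"10.0.0.2", 3, true},
		{"10.0.0.3", 5, false}, // unhealthy, never selected
	}
	fmt.Println(pick(eps)) // 10.0.0.2 roughly three times as often as 10.0.0.1
}
~~~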

View File

@@ -52,12 +52,14 @@ type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClien
type Client struct {
cc *grpc.ClientConn
ctx context.Context
assignments *assignment
assignments *assignment // assignments contains the current clusters and endpoints.
node *corepb.Node
cancel context.CancelFunc
stop chan struct{}
mu sync.RWMutex
nonce string
version map[string]string
nonce map[string]string
}
// New returns a new client that's dialed to addr using node as the local identifier.
@@ -79,6 +81,7 @@ func New(addr, node string, opts ...grpc.DialOption) (*Client, error) {
},
}
c.assignments = &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
c.version, c.nonce = make(map[string]string), make(map[string]string)
c.ctx, c.cancel = context.WithCancel(context.Background())
return c, nil
@@ -106,13 +109,15 @@ func (c *Client) Run() {
done := make(chan struct{})
go func() {
tick := time.NewTicker(1 * time.Second)
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
log.Debug(err)
}
tick := time.NewTicker(10 * time.Second)
for {
select {
case <-tick.C:
// send empty list for cluster discovery again and again
log.Debugf("Requesting cluster list, nonce %q:", c.Nonce())
if err := c.clusterDiscovery(stream, "", c.Nonce(), []string{}); err != nil {
// send empty list for cluster discovery every 10 seconds
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
log.Debug(err)
}
@@ -124,7 +129,7 @@ func (c *Client) Run() {
}()
if err := c.Receive(stream); err != nil {
log.Debug(err)
log.Warning(err)
}
close(done)
}
@@ -164,7 +169,7 @@ func (c *Client) Receive(stream adsStream) error {
switch resp.GetTypeUrl() {
case cdsURL:
a := &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
a := NewAssignment()
for _, r := range resp.GetResources() {
var any ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &any); err != nil {
@@ -175,24 +180,18 @@ func (c *Client) Receive(stream adsStream) error {
if !ok {
continue
}
a.setClusterLoadAssignment(cluster.GetName(), nil)
a.SetClusterLoadAssignment(cluster.GetName(), nil)
}
log.Debugf("Cluster discovery processed with %d resources", len(resp.GetResources()))
// ack the CDS proto with what we've got (an empty version would be a NACK)
if err := c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters()); err != nil {
log.Debug(err)
continue
}
// need to figure out how to handle the versions and nonces exactly.
c.SetNonce(resp.GetNonce())
log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL), a.clusters())
// set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(cdsURL, resp.GetNonce())
c.SetVersion(cdsURL, resp.GetVersionInfo())
c.SetAssignments(a)
c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters())
// now kick off discovery for endpoints
if err := c.endpointDiscovery(stream, "", resp.GetNonce(), a.clusters()); err != nil {
if err := c.endpointDiscovery(stream, c.Version(edsURL), c.Nonce(edsURL), a.clusters()); err != nil {
log.Debug(err)
continue
}
case edsURL:
for _, r := range resp.GetResources() {
@@ -205,10 +204,12 @@ func (c *Client) Receive(stream adsStream) error {
if !ok {
continue
}
c.assignments.setClusterLoadAssignment(cla.GetClusterName(), cla)
// ack the bloody thing
c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla)
}
log.Debugf("Endpoint discovery processed with %d resources", len(resp.GetResources()))
log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL), c.assignments.clusters())
// set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(edsURL, resp.GetNonce())
c.SetVersion(edsURL, resp.GetVersionInfo())
default:
return fmt.Errorf("unknown response URL for discovery: %q", resp.GetTypeUrl())
@@ -218,4 +219,9 @@ func (c *Client) Receive(stream adsStream) error {
// Select returns an address that is deemed to be the correct one for this cluster. The returned
// boolean indicates if the cluster exists.
func (c *Client) Select(cluster string) (net.IP, bool) { return c.assignments.Select(cluster) }
func (c *Client) Select(cluster string) (net.IP, bool) {
if cluster == "" {
return nil, false
}
return c.assignments.Select(cluster)
}

View File

@@ -1,17 +1,5 @@
package xds
func (c *Client) Nonce() string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.nonce
}
func (c *Client) SetNonce(n string) {
c.mu.Lock()
defer c.mu.Unlock()
c.nonce = n
}
func (c *Client) Assignments() *assignment {
c.mu.RLock()
defer c.mu.RUnlock()
@@ -23,3 +11,27 @@ func (c *Client) SetAssignments(a *assignment) {
defer c.mu.Unlock()
c.assignments = a
}
func (c *Client) Version(typeURL string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.version[typeURL]
}
func (c *Client) SetVersion(typeURL, a string) {
c.mu.Lock()
defer c.mu.Unlock()
c.version[typeURL] = a
}
func (c *Client) Nonce(typeURL string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.nonce[typeURL]
}
func (c *Client) SetNonce(typeURL, n string) {
c.mu.Lock()
defer c.mu.Unlock()
c.nonce[typeURL] = n
}