diff --git a/core/dnsserver/zdirectives.go b/core/dnsserver/zdirectives.go index 61d96c633..08b36475c 100644 --- a/core/dnsserver/zdirectives.go +++ b/core/dnsserver/zdirectives.go @@ -30,6 +30,7 @@ var Directives = []string{ "acl", "any", "chaos", + "traffic", "loadbalance", "cache", "rewrite", diff --git a/core/plugin/zplugin.go b/core/plugin/zplugin.go index 90267f29b..374d0b8fe 100644 --- a/core/plugin/zplugin.go +++ b/core/plugin/zplugin.go @@ -46,6 +46,7 @@ import ( _ "github.com/coredns/coredns/plugin/template" _ "github.com/coredns/coredns/plugin/tls" _ "github.com/coredns/coredns/plugin/trace" + _ "github.com/coredns/coredns/plugin/traffic" _ "github.com/coredns/coredns/plugin/transfer" _ "github.com/coredns/coredns/plugin/whoami" _ "github.com/coredns/federation" diff --git a/go.mod b/go.mod index 7e4178785..60b97b1e8 100644 --- a/go.mod +++ b/go.mod @@ -16,9 +16,11 @@ require ( github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/dnstap/golang-dnstap v0.0.0-20170829151710-2cf77a2b5e11 + github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 github.com/farsightsec/golang-framestream v0.0.0-20181102145529-8a0cb8ba8710 github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect github.com/golang/protobuf v1.3.2 + github.com/google/go-cmp v0.3.1 github.com/googleapis/gnostic v0.2.0 // indirect github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/imdario/mergo v0.3.7 // indirect diff --git a/go.sum b/go.sum index 0bcb3c4ab..be128c24a 100644 --- a/go.sum +++ b/go.sum @@ -124,7 +124,9 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 h1:4cmBvAEBNJaGARUEs3/suWRyfyBfhf7I60WBZq+bv2w= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= diff --git a/plugin.cfg b/plugin.cfg index 3f3dd85fd..f8e00a4e2 100644 --- a/plugin.cfg +++ b/plugin.cfg @@ -39,6 +39,7 @@ dnstap:dnstap acl:acl any:any chaos:chaos +traffic:traffic loadbalance:loadbalance cache:cache rewrite:rewrite diff --git a/plugin/traffic/HACKING.md b/plugin/traffic/HACKING.md new file mode 100644 index 000000000..5fd2faf0d --- /dev/null +++ b/plugin/traffic/HACKING.md @@ -0,0 +1,58 @@ +# Hacking on *traffic* + +Repos used: + + +: implements control plane, has testing stuff in pkg/test/main (iirc). + + +: implements client for xDS - much of this code has been reused here. + +I found these website useful while working on this. + +* https://github.com/envoyproxy/envoy/blob/master/api/API_OVERVIEW.md +* https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery.md +* This was *really* helpful: https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol to + show the flow of the protocol. + +# Testing + +Assuming you have envoyproxy/go-control-plane checked out somewhere, then: + +~~~ sh +% cd ~/src/github.com/envoyproxy/go-control-plane/pkg/test/main +% go build +% ./main --xds=ads --runtimes=2 -debug +~~~ + +This runs a binary from pkg/test/main. Now we're testing aDS. Everything is using gRPC with TLS +disabled: `grpc.WithInsecure()`. The test binary runs on port 18000 on localhost; all these things +are currently hardcoded in the *traffic* plugin. This will be factored out into config as some +point. Another thing that is hardcoded is the use of the "example.org" domain. + +Then for CoreDNS, check out the `traffic` branch, create a Corefile: + +~~~ Corefile +example.org { + traffic grpc://127.0.0.1:18000 { + id test-id + } + debug +} +~~~ + +Start CoreDNS (`coredns -conf Corefile -dns.port=1053`), and see logging/debugging flow by; the +test binary should also spew out a bunch of things. CoreDNS willl build up a list of cluster and +endpoints. Next you can query it: + +~~~ sh +% dig @localhost -p 1053 cluster-v0-0.example.org A +;; QUESTION SECTION: +;cluster-v0-0.example.org. IN A + +;; ANSWER SECTION: +cluster-v0-0.example.org. 5 IN A 127.0.0.1 +~~~ + +Note: the xds/test binary is a go-control-plane binary with added debugging that I'm using for +testing. diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md new file mode 100644 index 000000000..94f0a99bc --- /dev/null +++ b/plugin/traffic/README.md @@ -0,0 +1,129 @@ +# traffic + +## Name + +*traffic* - handout addresses according to assignments from Envoy's xDS. + +## Description + +The *traffic* plugin is a balancer that allows traffic steering, weighted responses +and draining of clusters. The cluster information is retrieved from a service +discovery manager that implements the service discovery protocols that Envoy +[implements](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol). + +A Cluster is defined as: "A group of logically similar endpoints that Envoy connects to." Each +cluster has a name, which *traffic* extends to be a domain name. See "Naming Clusters" below. + +The use case for this plugin is when a cluster has endpoints running in multiple (Kubernetes?) +clusters and you need to steer traffic to (or away) from these endpoints, i.e. endpoint A needs to +be upgraded, so all traffic to it is drained. Or the entire Kubernetes needs to upgraded, and *all* +endpoints need to be drained from it. + +*Traffic* discovers the endpoints via Envoy's xDS protocol. Endpoints and clusters are discovered +every 10 seconds. The plugin hands out responses that adhere to these assignments. Only endpoints +that are *healthy* are handed out. + +Each DNS response contains a single IP address that's considered the best one. *Traffic* will load +balance A and AAAA queries. The TTL on these answer is set to 5s. It will only return successful +responses either with an answer or otherwise a NODATA response. Queries for non-existent clusters +get a NXDOMAIN. + +The *traffic* plugin has no notion of draining, drop overload and anything that advanced, *it just +acts upon assignments*. This is means that if a endpoint goes down and *traffic* has not seen a new +assignment yet, it will still include this endpoint address in responses. + +## Syntax + +~~~ +traffic TO... +~~~ + +This enabled the *traffic* plugin, with a default node id of `coredns` and no TLS. + +* **TO...** are the Envoy control plane endpoint to connect to. This must start with `grpc://`. + +The extended syntax is available is you want more control. + +~~~ +traffic TO... { + server SERVER [SERVER]... + node ID + tls CERT KEY CA + tls_servername NAME +} +~~~ + +* node **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`. +* `tls` **CERT** **KEY** **CA** define the TLS properties for gRPC connection. If this is omitted an + insecure connection is attempted. From 0 to 3 arguments can be provided with the meaning as described below + + * `tls` - no client authentication is used, and the system CAs are used to verify the server certificate + * `tls` **CA** - no client authentication is used, and the file CA is used to verify the server certificate + * `tls` **CERT** **KEY** - client authentication is used with the specified cert/key pair. + The server certificate is verified with the system CAs. + * `tls` **CERT** **KEY** **CA** - client authentication is used with the specified cert/key pair. + The server certificate is verified using the specified CA file. + +* `tls_servername` **NAME** allows you to set a server name in the TLS configuration. This is needed + because *traffic* connects to an IP address, so it can't infer the server name from it. + +## Naming Clusters + +When a cluster is named this usually consists out of a single word, i.e. "cluster-v0", or "web". +The *traffic* plugins uses the name(s) specified in the Server Block to create fully qualified +domain names. For example if the Server Block specifies `lb.example.org` as one of the names, +and "cluster-v0" is one of the load balanced cluster, *traffic* will respond to query asking for +`cluster-v0.lb.example.org.` and the same goes for `web`; `web.lb.example.org`. + +## Metrics + +What metrics should we do? If any? Number of clusters? Number of endpoints and health? + +## Ready + +Should this plugin implement readiness? + +## Examples + +~~~ +lb.example.org { + traffic grpc://127.0.0.1:18000 { + node test-id + } + debug + log +} +~~~ + +This will load balance any names under `lb.example.org` using the data from the manager running on +localhost on port 18000. The node ID will be `test-id` and no TLS will be used. + +## Also See + +The following documents provide some background on Envoy's control plane. + + * + + * + + * + +## Bugs + +Priority and locality information from ClusterLoadAssignments is not used. + +Load reporting via xDS is not supported; this can be implemented, but there are some things that +make this difficult. A single (DNS) query is done by a resolver. Behind this resolver there may be +many clients that will use this reply, the responding server (CoreDNS) has no idea how many clients +use this resolver. So reporting a load of +1 on the CoreDNS side can be anything from 1 to 1000+, +making the load reporting highly inaccurate. + +Multiple **TO** addresses is not implemented. + +## TODO + +* metrics? +* more and better testing +* credentials (other than TLS) - how/what? +* is the protocol correctly implemented? Should we not have a 10s tick, but wait for responses from + the control plane? diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go new file mode 100644 index 000000000..bfd9683a4 --- /dev/null +++ b/plugin/traffic/setup.go @@ -0,0 +1,122 @@ +package traffic + +import ( + "crypto/tls" + "fmt" + "math/rand" + "strings" + "time" + + "github.com/coredns/coredns/core/dnsserver" + "github.com/coredns/coredns/plugin" + clog "github.com/coredns/coredns/plugin/pkg/log" + "github.com/coredns/coredns/plugin/pkg/parse" + pkgtls "github.com/coredns/coredns/plugin/pkg/tls" + "github.com/coredns/coredns/plugin/pkg/transport" + "github.com/coredns/coredns/plugin/traffic/xds" + + "github.com/caddyserver/caddy" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +var log = clog.NewWithPlugin("traffic") + +func init() { plugin.Register("traffic", setup) } + +func setup(c *caddy.Controller) error { + rand.Seed(int64(time.Now().Nanosecond())) + t, err := parseTraffic(c) + if err != nil { + return plugin.Error("traffic", err) + } + + dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { + t.Next = next + return t + }) + + c.OnStartup(func() error { + go t.c.Run() + return nil + }) + c.OnShutdown(func() error { return t.c.Stop() }) + return nil +} + +func parseTraffic(c *caddy.Controller) (*Traffic, error) { + node := "coredns" + toHosts := []string{} + t := &Traffic{} + var ( + err error + tlsConfig *tls.Config + tlsServerName string + ) + + t.origins = make([]string, len(c.ServerBlockKeys)) + for i := range c.ServerBlockKeys { + t.origins[i] = plugin.Host(c.ServerBlockKeys[i]).Normalize() + } + + for c.Next() { + args := c.RemainingArgs() + if len(args) < 1 { + return nil, c.ArgErr() + } + toHosts, err = parse.HostPortOrFile(args...) + if err != nil { + return nil, err + } + for i := range toHosts { + if !strings.HasPrefix(toHosts[i], transport.GRPC+"://") { + return nil, fmt.Errorf("not a %s scheme: %s", transport.GRPC, toHosts[i]) + } + // now cut the prefix off again, because the dialler needs to see normal address strings. All this + // grpc:// stuff is to enforce uniform across plugins and future proofing for other protocols. + toHosts[i] = toHosts[i][len(transport.GRPC+"://"):] + } + for c.NextBlock() { + switch c.Val() { + case "id": + args := c.RemainingArgs() + if len(args) != 1 { + return nil, c.ArgErr() + } + node = args[0] + case "tls": + args := c.RemainingArgs() + if len(args) > 3 { + return nil, c.ArgErr() + } + + tlsConfig, err = pkgtls.NewTLSConfigFromArgs(args...) + if err != nil { + return nil, err + } + case "tls_servername": + if !c.NextArg() { + return nil, c.ArgErr() + } + tlsServerName = c.Val() + default: + return nil, c.Errf("unknown property '%s'", c.Val()) + } + } + } + + opts := []grpc.DialOption{grpc.WithInsecure()} + if tlsConfig != nil { + if tlsServerName != "" { + tlsConfig.ServerName = tlsServerName + } + opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))} + } + + // TODO: only the first host is used, need to figure out how to reconcile multiple upstream providers. + if t.c, err = xds.New(toHosts[0], node, opts...); err != nil { + return nil, err + } + + return t, nil +} diff --git a/plugin/traffic/setup_test.go b/plugin/traffic/setup_test.go new file mode 100644 index 000000000..99403fbcd --- /dev/null +++ b/plugin/traffic/setup_test.go @@ -0,0 +1,53 @@ +package traffic + +import ( + "testing" + + "github.com/caddyserver/caddy" +) + +func TestSetup(t *testing.T) { + /* + c := caddy.NewTestController("dns", `traffic grpc://bla`) + if err := setup(c); err != nil { + t.Fatalf("Test 1, expected no errors, but got: %q", err) + } + */ +} + +func TestParseTraffic(t *testing.T) { + tests := []struct { + input string + shouldErr bool + }{ + // ok + {`traffic grpc://127.0.0.1:18000 { + id test-id + }`, false}, + + // fail + {`traffic`, true}, + {`traffic tls://1.1.1.1`, true}, + {`traffic { + id bla bla + }`, true}, + {`traffic { + node + }`, true}, + } + for i, test := range tests { + c := caddy.NewTestController("dns", test.input) + _, err := parseTraffic(c) + if test.shouldErr && err == nil { + t.Errorf("Test %v: Expected error but found nil", i) + continue + } else if !test.shouldErr && err != nil { + t.Errorf("Test %v: Expected no error but found error: %v", i, err) + continue + } + + if test.shouldErr { + continue + } + } +} diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go new file mode 100644 index 000000000..0090bd780 --- /dev/null +++ b/plugin/traffic/traffic.go @@ -0,0 +1,93 @@ +package traffic + +import ( + "context" + "strings" + "time" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/plugin/traffic/xds" + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" +) + +// Traffic is a plugin that load balances according to assignments. +type Traffic struct { + c *xds.Client + id string + origins []string + + Next plugin.Handler +} + +// ServeDNS implements the plugin.Handler interface. +func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { + state := request.Request{Req: r, W: w} + + cluster := "" + for _, o := range t.origins { + if strings.HasSuffix(state.Name(), o) { + cluster, _ = dnsutil.TrimZone(state.Name(), o) + state.Zone = o + break + } + } + m := new(dns.Msg) + m.SetReply(r) + m.Authoritative = true + + addr, ok := t.c.Select(cluster) + if !ok { + m.Ns = soa(state.Zone) + m.Rcode = dns.RcodeNameError + w.WriteMsg(m) + return 0, nil + } + + if addr == nil { + log.Debugf("No (healthy) endpoints found for %q", cluster) + m.Ns = soa(state.Zone) + w.WriteMsg(m) + return 0, nil + } + + switch state.QType() { + case dns.TypeA: + if addr.To4() == nil { // it's an IPv6 address, return nodata in that case. + m.Ns = soa(state.Zone) + break + } + m.Answer = []dns.RR{&dns.A{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, A: addr}} + + case dns.TypeAAAA: + if addr.To4() != nil { // it's an IPv4 address, return nodata in that case. + m.Ns = soa(state.Zone) + break + } + m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, AAAA: addr}} + default: + m.Ns = soa(state.Zone) + } + + w.WriteMsg(m) + return 0, nil +} + +// soa returns a synthetic so for this zone. +func soa(z string) []dns.RR { + return []dns.RR{&dns.SOA{ + Hdr: dns.RR_Header{Name: z, Rrtype: dns.TypeSOA, Class: dns.ClassINET, Ttl: 5}, + Ns: dnsutil.Join("ns", z), + Mbox: dnsutil.Join("coredns", z), + Serial: uint32(time.Now().UTC().Unix()), + Refresh: 14400, + Retry: 3600, + Expire: 604800, + Minttl: 5, + }} +} + +// Name implements the plugin.Handler interface. +func (t *Traffic) Name() string { return "traffic" } diff --git a/plugin/traffic/traffic_test.go b/plugin/traffic/traffic_test.go new file mode 100644 index 000000000..5db7cc451 --- /dev/null +++ b/plugin/traffic/traffic_test.go @@ -0,0 +1,130 @@ +package traffic + +import ( + "context" + "testing" + + "github.com/coredns/coredns/plugin/pkg/dnstest" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/plugin/test" + "github.com/coredns/coredns/plugin/traffic/xds" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + "github.com/miekg/dns" + "google.golang.org/grpc" +) + +func TestTraffic(t *testing.T) { + c, err := xds.New("127.0.0.1:0", "test-id", grpc.WithInsecure()) + if err != nil { + t.Fatal(err) + } + tr := &Traffic{c: c, origins: []string{"lb.example.org."}} + + tests := []struct { + cla *xdspb.ClusterLoadAssignment + cluster string + qtype uint16 + rcode int + answer string // address value of the A/AAAA record. + ns bool // should there be a ns section. + }{ + { + cla: &xdspb.ClusterLoadAssignment{}, + cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true, + }, + { + cla: &xdspb.ClusterLoadAssignment{}, + cluster: "web", qtype: dns.TypeSRV, rcode: dns.RcodeSuccess, ns: true, + }, + { + cla: &xdspb.ClusterLoadAssignment{}, + cluster: "does-not-exist", qtype: dns.TypeA, rcode: dns.RcodeNameError, ns: true}, + { + cla: &xdspb.ClusterLoadAssignment{ + ClusterName: "web", + Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_HEALTHY}}), + }, + cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.1", + }, + { + cla: &xdspb.ClusterLoadAssignment{ + ClusterName: "web", + Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_UNKNOWN}}), + }, + cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true, + }, + } + + ctx := context.TODO() + + for i, tc := range tests { + a := xds.NewAssignment() + a.SetClusterLoadAssignment("web", tc.cla) // web is our cluster + c.SetAssignments(a) + + m := new(dns.Msg) + cl := dnsutil.Join(tc.cluster, tr.origins[0]) + m.SetQuestion(cl, tc.qtype) + + rec := dnstest.NewRecorder(&test.ResponseWriter{}) + _, err := tr.ServeDNS(ctx, rec, m) + if err != nil { + t.Errorf("Test %d: Expected no error, but got %q", i, err) + } + if rec.Msg.Rcode != tc.rcode { + t.Errorf("Test %d: Expected no rcode %d, but got %d", i, tc.rcode, rec.Msg.Rcode) + } + if tc.ns && len(rec.Msg.Ns) == 0 { + t.Errorf("Test %d: Expected authority section, but got none", i) + } + if tc.answer != "" && len(rec.Msg.Answer) == 0 { + t.Fatalf("Test %d: Expected answer section, but got none", i) + } + if tc.answer != "" { + record := rec.Msg.Answer[0] + addr := "" + switch x := record.(type) { + case *dns.A: + addr = x.A.String() + case *dns.AAAA: + addr = x.AAAA.String() + } + if tc.answer != addr { + t.Errorf("Test %d: Expected answer %s, but got %s", i, tc.answer, addr) + } + + } + + } +} + +type EndpointHealth struct { + Address string + Health corepb.HealthStatus +} + +func endpoints(e []EndpointHealth) []*endpointpb.LocalityLbEndpoints { + ep := make([]*endpointpb.LocalityLbEndpoints, len(e)) + for i := range e { + ep[i] = &endpointpb.LocalityLbEndpoints{ + LbEndpoints: []*endpointpb.LbEndpoint{{ + HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ + Endpoint: &endpointpb.Endpoint{ + Address: &corepb.Address{ + Address: &corepb.Address_SocketAddress{ + SocketAddress: &corepb.SocketAddress{ + Address: e[i].Address, + }, + }, + }, + }, + }, + HealthStatus: e[i].Health, + }}, + } + } + return ep +} diff --git a/plugin/traffic/xds/assignment.go b/plugin/traffic/xds/assignment.go new file mode 100644 index 000000000..c8d1a5798 --- /dev/null +++ b/plugin/traffic/xds/assignment.go @@ -0,0 +1,116 @@ +package xds + +import ( + "math/rand" + "net" + "sync" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" +) + +type assignment struct { + mu sync.RWMutex + cla map[string]*xdspb.ClusterLoadAssignment +} + +// NewAssignment returns a pointer to an assignment. +func NewAssignment() *assignment { + return &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)} +} + +// SetClusterLoadAssignment sets the assignment for the cluster to cla. +func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { + // If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry. + a.mu.Lock() + defer a.mu.Unlock() + _, ok := a.cla[cluster] + if !ok { + a.cla[cluster] = cla + return + } + if cla == nil { + return + } + a.cla[cluster] = cla + +} + +// ClusterLoadAssignment returns the assignment for the cluster or nil if there is none. +func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { + a.mu.RLock() + cla, ok := a.cla[cluster] + a.mu.RUnlock() + if !ok { + return nil + } + return cla +} + +func (a *assignment) clusters() []string { + a.mu.RLock() + defer a.mu.RUnlock() + clusters := make([]string, len(a.cla)) + i := 0 + for k := range a.cla { + clusters[i] = k + i++ + } + return clusters +} + +// Select selects a backend from cluster load assignments, using weighted random selection. It only selects +// backends that are reporting healthy. +func (a *assignment) Select(cluster string) (net.IP, bool) { + cla := a.ClusterLoadAssignment(cluster) + if cla == nil { + return nil, false + } + + total := 0 + healthy := 0 + for _, ep := range cla.Endpoints { + for _, lb := range ep.GetLbEndpoints() { + if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + continue + } + total += int(lb.GetLoadBalancingWeight().GetValue()) + healthy++ + } + } + if healthy == 0 { + return nil, true + } + + if total == 0 { + // all weights are 0, randomly select one of the endpoints. + r := rand.Intn(healthy) + i := 0 + for _, ep := range cla.Endpoints { + for _, lb := range ep.GetLbEndpoints() { + if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + continue + } + if r == i { + return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()), true + } + i++ + } + } + return nil, true + } + + r := rand.Intn(total) + 1 + for _, ep := range cla.Endpoints { + for _, lb := range ep.GetLbEndpoints() { + if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + continue + } + r -= int(lb.GetLoadBalancingWeight().GetValue()) + if r <= 0 { + return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()), true + } + } + } + return nil, true +} diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go new file mode 100644 index 000000000..5b3d736a5 --- /dev/null +++ b/plugin/traffic/xds/client.go @@ -0,0 +1,227 @@ +/* +This package contains code copied from github.com/grpc/grpc-co. The license for that code is: + +Copyright 2019 gRPC authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package xds implements a bidirectional stream to an envoy ADS management endpoint. It will stream +// updates (CDS and EDS) from there to help load balance responses to DNS clients. +package xds + +import ( + "context" + "fmt" + "net" + "os" + "sync" + "time" + + "github.com/coredns/coredns/coremain" + clog "github.com/coredns/coredns/plugin/pkg/log" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + "github.com/golang/protobuf/ptypes" + structpb "github.com/golang/protobuf/ptypes/struct" + "google.golang.org/grpc" +) + +var log = clog.NewWithPlugin("traffic: xds") + +const ( + cdsURL = "type.googleapis.com/envoy.api.v2.Cluster" + edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" +) + +type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient + +// Client talks to the grpc manager's endpoint to get load assignments. +type Client struct { + cc *grpc.ClientConn + ctx context.Context + assignments *assignment // assignments contains the current clusters and endpoints. + node *corepb.Node + cancel context.CancelFunc + stop chan struct{} + mu sync.RWMutex + + version map[string]string + nonce map[string]string +} + +// New returns a new client that's dialed to addr using node as the local identifier. +func New(addr, node string, opts ...grpc.DialOption) (*Client, error) { + cc, err := grpc.Dial(addr, opts...) + if err != nil { + return nil, err + } + hostname, _ := os.Hostname() + c := &Client{cc: cc, node: &corepb.Node{Id: node, + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "HOSTNAME": { + Kind: &structpb.Value_StringValue{StringValue: hostname}, + }, + }, + }, + BuildVersion: coremain.CoreVersion, + }, + } + c.assignments = &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)} + c.version, c.nonce = make(map[string]string), make(map[string]string) + c.ctx, c.cancel = context.WithCancel(context.Background()) + + return c, nil +} + +// Stop stops all goroutines and closes the connection to the upstream manager. +func (c *Client) Stop() error { c.cancel(); return c.cc.Close() } + +// Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager. +func (c *Client) Run() { + for { + select { + case <-c.ctx.Done(): + return + default: + } + + cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc) + stream, err := cli.StreamAggregatedResources(c.ctx) + if err != nil { + log.Debug(err) + time.Sleep(2 * time.Second) // grpc's client.go does more spiffy exp. backoff, do we really need that? + continue + } + + done := make(chan struct{}) + go func() { + if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil { + log.Debug(err) + } + tick := time.NewTicker(10 * time.Second) + for { + select { + case <-tick.C: + // send empty list for cluster discovery every 10 seconds + if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil { + log.Debug(err) + } + + case <-done: + tick.Stop() + return + } + } + }() + + if err := c.Receive(stream); err != nil { + log.Warning(err) + } + close(done) + } +} + +// clusterDiscovery sends a cluster DiscoveryRequest on the stream. +func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clusters []string) error { + req := &xdspb.DiscoveryRequest{ + Node: c.node, + TypeUrl: cdsURL, + ResourceNames: clusters, // empty for all + VersionInfo: version, + ResponseNonce: nonce, + } + return stream.Send(req) +} + +// endpointDiscovery sends a endpoint DiscoveryRequest on the stream. +func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clusters []string) error { + req := &xdspb.DiscoveryRequest{ + Node: c.node, + TypeUrl: edsURL, + ResourceNames: clusters, + VersionInfo: version, + ResponseNonce: nonce, + } + return stream.Send(req) +} + +// Receive receives from the stream, it handled both cluster and endpoint DiscoveryResponses. +func (c *Client) Receive(stream adsStream) error { + for { + resp, err := stream.Recv() + if err != nil { + return err + } + + switch resp.GetTypeUrl() { + case cdsURL: + a := NewAssignment() + for _, r := range resp.GetResources() { + var any ptypes.DynamicAny + if err := ptypes.UnmarshalAny(r, &any); err != nil { + log.Debugf("Failed to unmarshal cluster discovery: %s", err) + continue + } + cluster, ok := any.Message.(*xdspb.Cluster) + if !ok { + continue + } + a.SetClusterLoadAssignment(cluster.GetName(), nil) + } + log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL), a.clusters()) + // set our local administration and ack the reply. Empty version would signal NACK. + c.SetNonce(cdsURL, resp.GetNonce()) + c.SetVersion(cdsURL, resp.GetVersionInfo()) + c.SetAssignments(a) + c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters()) + + // now kick off discovery for endpoints + if err := c.endpointDiscovery(stream, c.Version(edsURL), c.Nonce(edsURL), a.clusters()); err != nil { + log.Debug(err) + } + case edsURL: + for _, r := range resp.GetResources() { + var any ptypes.DynamicAny + if err := ptypes.UnmarshalAny(r, &any); err != nil { + log.Debugf("Failed to unmarshal endpoint discovery: %s", err) + continue + } + cla, ok := any.Message.(*xdspb.ClusterLoadAssignment) + if !ok { + continue + } + c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla) + } + log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL), c.assignments.clusters()) + // set our local administration and ack the reply. Empty version would signal NACK. + c.SetNonce(edsURL, resp.GetNonce()) + c.SetVersion(edsURL, resp.GetVersionInfo()) + + default: + return fmt.Errorf("unknown response URL for discovery: %q", resp.GetTypeUrl()) + } + } +} + +// Select returns an address that is deemed to be the correct one for this cluster. The returned +// boolean indicates if the cluster exists. +func (c *Client) Select(cluster string) (net.IP, bool) { + if cluster == "" { + return nil, false + } + return c.assignments.Select(cluster) +} diff --git a/plugin/traffic/xds/fields.go b/plugin/traffic/xds/fields.go new file mode 100644 index 000000000..9dac60cbe --- /dev/null +++ b/plugin/traffic/xds/fields.go @@ -0,0 +1,37 @@ +package xds + +func (c *Client) Assignments() *assignment { + c.mu.RLock() + defer c.mu.RUnlock() + return c.assignments +} + +func (c *Client) SetAssignments(a *assignment) { + c.mu.Lock() + defer c.mu.Unlock() + c.assignments = a +} + +func (c *Client) Version(typeURL string) string { + c.mu.RLock() + defer c.mu.RUnlock() + return c.version[typeURL] +} + +func (c *Client) SetVersion(typeURL, a string) { + c.mu.Lock() + defer c.mu.Unlock() + c.version[typeURL] = a +} + +func (c *Client) Nonce(typeURL string) string { + c.mu.RLock() + defer c.mu.RUnlock() + return c.nonce[typeURL] +} + +func (c *Client) SetNonce(typeURL, n string) { + c.mu.Lock() + defer c.mu.Unlock() + c.nonce[typeURL] = n +}