From 9433da1a67d75dccb63502b0c20e9b9c7bf8be6e Mon Sep 17 00:00:00 2001 From: Miek Gieben Date: Sat, 5 Oct 2019 11:45:45 +0100 Subject: [PATCH] Add new plugin: traffic Traffic is a plugin that communicates via the xDS protocol to an Envoy control plane. Using the data from this control plane it hands out IP addresses. This allows you (via controlling the data in the control plane) to drain or send more traffic to specific endpoints. The plugin itself only acts upon this data; it doesn't do anything fancy by itself. Code used here is copied from grpc-go and other places, this is clearly marked in the source files. Signed-off-by: Miek Gieben --- core/dnsserver/zdirectives.go | 1 + core/plugin/zplugin.go | 1 + go.mod | 2 + go.sum | 2 + plugin.cfg | 1 + plugin/traffic/HACKING.md | 58 ++++++++ plugin/traffic/README.md | 129 ++++++++++++++++++ plugin/traffic/setup.go | 122 +++++++++++++++++ plugin/traffic/setup_test.go | 53 ++++++++ plugin/traffic/traffic.go | 93 +++++++++++++ plugin/traffic/traffic_test.go | 130 ++++++++++++++++++ plugin/traffic/xds/assignment.go | 116 ++++++++++++++++ plugin/traffic/xds/client.go | 227 +++++++++++++++++++++++++++++++ plugin/traffic/xds/fields.go | 37 +++++ 14 files changed, 972 insertions(+) create mode 100644 plugin/traffic/HACKING.md create mode 100644 plugin/traffic/README.md create mode 100644 plugin/traffic/setup.go create mode 100644 plugin/traffic/setup_test.go create mode 100644 plugin/traffic/traffic.go create mode 100644 plugin/traffic/traffic_test.go create mode 100644 plugin/traffic/xds/assignment.go create mode 100644 plugin/traffic/xds/client.go create mode 100644 plugin/traffic/xds/fields.go diff --git a/core/dnsserver/zdirectives.go b/core/dnsserver/zdirectives.go index 61d96c633..08b36475c 100644 --- a/core/dnsserver/zdirectives.go +++ b/core/dnsserver/zdirectives.go @@ -30,6 +30,7 @@ var Directives = []string{ "acl", "any", "chaos", + "traffic", "loadbalance", "cache", "rewrite", diff --git a/core/plugin/zplugin.go 
b/core/plugin/zplugin.go index 90267f29b..374d0b8fe 100644 --- a/core/plugin/zplugin.go +++ b/core/plugin/zplugin.go @@ -46,6 +46,7 @@ import ( _ "github.com/coredns/coredns/plugin/template" _ "github.com/coredns/coredns/plugin/tls" _ "github.com/coredns/coredns/plugin/trace" + _ "github.com/coredns/coredns/plugin/traffic" _ "github.com/coredns/coredns/plugin/transfer" _ "github.com/coredns/coredns/plugin/whoami" _ "github.com/coredns/federation" diff --git a/go.mod b/go.mod index 7e4178785..60b97b1e8 100644 --- a/go.mod +++ b/go.mod @@ -16,9 +16,11 @@ require ( github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 // indirect github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect github.com/dnstap/golang-dnstap v0.0.0-20170829151710-2cf77a2b5e11 + github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 github.com/farsightsec/golang-framestream v0.0.0-20181102145529-8a0cb8ba8710 github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect github.com/golang/protobuf v1.3.2 + github.com/google/go-cmp v0.3.1 github.com/googleapis/gnostic v0.2.0 // indirect github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 github.com/imdario/mergo v0.3.7 // indirect diff --git a/go.sum b/go.sum index 0bcb3c4ab..be128c24a 100644 --- a/go.sum +++ b/go.sum @@ -124,7 +124,9 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 h1:4cmBvAEBNJaGARUEs3/suWRyfyBfhf7I60WBZq+bv2w= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod 
h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= diff --git a/plugin.cfg b/plugin.cfg index 3f3dd85fd..f8e00a4e2 100644 --- a/plugin.cfg +++ b/plugin.cfg @@ -39,6 +39,7 @@ dnstap:dnstap acl:acl any:any chaos:chaos +traffic:traffic loadbalance:loadbalance cache:cache rewrite:rewrite diff --git a/plugin/traffic/HACKING.md b/plugin/traffic/HACKING.md new file mode 100644 index 000000000..5fd2faf0d --- /dev/null +++ b/plugin/traffic/HACKING.md @@ -0,0 +1,58 @@ +# Hacking on *traffic* + +Repos used: + + +: implements control plane, has testing stuff in pkg/test/main (iirc). + + +: implements client for xDS - much of this code has been reused here. + +I found these website useful while working on this. + +* https://github.com/envoyproxy/envoy/blob/master/api/API_OVERVIEW.md +* https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery.md +* This was *really* helpful: https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol to + show the flow of the protocol. + +# Testing + +Assuming you have envoyproxy/go-control-plane checked out somewhere, then: + +~~~ sh +% cd ~/src/github.com/envoyproxy/go-control-plane/pkg/test/main +% go build +% ./main --xds=ads --runtimes=2 -debug +~~~ + +This runs a binary from pkg/test/main. Now we're testing aDS. Everything is using gRPC with TLS +disabled: `grpc.WithInsecure()`. The test binary runs on port 18000 on localhost; all these things +are currently hardcoded in the *traffic* plugin. This will be factored out into config as some +point. 
Another thing that is hardcoded is the use of the "example.org" domain. + +Then for CoreDNS, check out the `traffic` branch, create a Corefile: + +~~~ Corefile +example.org { + traffic grpc://127.0.0.1:18000 { + id test-id + } + debug +} +~~~ + +Start CoreDNS (`coredns -conf Corefile -dns.port=1053`), and see logging/debugging flow by; the +test binary should also spew out a bunch of things. CoreDNS will build up a list of clusters and +endpoints. Next you can query it: + +~~~ sh +% dig @localhost -p 1053 cluster-v0-0.example.org A +;; QUESTION SECTION: +;cluster-v0-0.example.org. IN A + +;; ANSWER SECTION: +cluster-v0-0.example.org. 5 IN A 127.0.0.1 +~~~ + +Note: the xds/test binary is a go-control-plane binary with added debugging that I'm using for +testing. diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md new file mode 100644 index 000000000..94f0a99bc --- /dev/null +++ b/plugin/traffic/README.md @@ -0,0 +1,129 @@ +# traffic + +## Name + +*traffic* - hand out addresses according to assignments from Envoy's xDS. + +## Description + +The *traffic* plugin is a balancer that allows traffic steering, weighted responses +and draining of clusters. The cluster information is retrieved from a service +discovery manager that implements the service discovery protocols that Envoy +[implements](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol). + +A Cluster is defined as: "A group of logically similar endpoints that Envoy connects to." Each +cluster has a name, which *traffic* extends to be a domain name. See "Naming Clusters" below. + +The use case for this plugin is when a cluster has endpoints running in multiple (Kubernetes?) +clusters and you need to steer traffic to (or away) from these endpoints, i.e. endpoint A needs to +be upgraded, so all traffic to it is drained. Or the entire Kubernetes needs to be upgraded, and *all* +endpoints need to be drained from it. + +*Traffic* discovers the endpoints via Envoy's xDS protocol. 
Endpoints and clusters are discovered +every 10 seconds. The plugin hands out responses that adhere to these assignments. Only endpoints +that are *healthy* are handed out. + +Each DNS response contains a single IP address that's considered the best one. *Traffic* will load +balance A and AAAA queries. The TTL on these answers is set to 5s. It will only return successful +responses either with an answer or otherwise a NODATA response. Queries for non-existent clusters +get a NXDOMAIN. + +The *traffic* plugin has no notion of draining, drop overload and anything that advanced, *it just +acts upon assignments*. This means that if an endpoint goes down and *traffic* has not seen a new +assignment yet, it will still include this endpoint address in responses. + +## Syntax + +~~~ +traffic TO... +~~~ + +This enables the *traffic* plugin, with a default node id of `coredns` and no TLS. + +* **TO...** are the Envoy control plane endpoints to connect to. This must start with `grpc://`. + +The extended syntax is available if you want more control. + +~~~ +traffic TO... { + server SERVER [SERVER]... + id ID + tls CERT KEY CA + tls_servername NAME +} +~~~ + +* `id` **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`. +* `tls` **CERT** **KEY** **CA** define the TLS properties for gRPC connection. If this is omitted an + insecure connection is attempted. From 0 to 3 arguments can be provided with the meaning as described below + + * `tls` - no client authentication is used, and the system CAs are used to verify the server certificate + * `tls` **CA** - no client authentication is used, and the file CA is used to verify the server certificate + * `tls` **CERT** **KEY** - client authentication is used with the specified cert/key pair. + The server certificate is verified with the system CAs. + * `tls` **CERT** **KEY** **CA** - client authentication is used with the specified cert/key pair. 
+ The server certificate is verified using the specified CA file. + +* `tls_servername` **NAME** allows you to set a server name in the TLS configuration. This is needed + because *traffic* connects to an IP address, so it can't infer the server name from it. + +## Naming Clusters + +When a cluster is named this usually consists of a single word, e.g. "cluster-v0", or "web". +The *traffic* plugin uses the name(s) specified in the Server Block to create fully qualified +domain names. For example if the Server Block specifies `lb.example.org` as one of the names, +and "cluster-v0" is one of the load balanced clusters, *traffic* will respond to queries asking for +`cluster-v0.lb.example.org.` and the same goes for `web`; `web.lb.example.org`. + +## Metrics + +What metrics should we do? If any? Number of clusters? Number of endpoints and health? + +## Ready + +Should this plugin implement readiness? + +## Examples + +~~~ +lb.example.org { + traffic grpc://127.0.0.1:18000 { + id test-id + } + debug + log +} +~~~ + +This will load balance any names under `lb.example.org` using the data from the manager running on +localhost on port 18000. The node ID will be `test-id` and no TLS will be used. + +## Also See + +The following documents provide some background on Envoy's control plane. + + * + + * + + * + +## Bugs + +Priority and locality information from ClusterLoadAssignments is not used. + +Load reporting via xDS is not supported; this can be implemented, but there are some things that +make this difficult. A single (DNS) query is done by a resolver. Behind this resolver there may be +many clients that will use this reply, the responding server (CoreDNS) has no idea how many clients +use this resolver. So reporting a load of +1 on the CoreDNS side can be anything from 1 to 1000+, +making the load reporting highly inaccurate. + +Multiple **TO** addresses are not implemented. + +## TODO + +* metrics? +* more and better testing +* credentials (other than TLS) - how/what? 
+* is the protocol correctly implemented? Should we not have a 10s tick, but wait for responses from
+  the control plane?
diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go
new file mode 100644
index 000000000..bfd9683a4
--- /dev/null
+++ b/plugin/traffic/setup.go
@@ -0,0 +1,133 @@
+package traffic
+
+import (
+	"crypto/tls"
+	"fmt"
+	"math/rand"
+	"strings"
+	"time"
+
+	"github.com/coredns/coredns/core/dnsserver"
+	"github.com/coredns/coredns/plugin"
+	clog "github.com/coredns/coredns/plugin/pkg/log"
+	"github.com/coredns/coredns/plugin/pkg/parse"
+	pkgtls "github.com/coredns/coredns/plugin/pkg/tls"
+	"github.com/coredns/coredns/plugin/pkg/transport"
+	"github.com/coredns/coredns/plugin/traffic/xds"
+
+	"github.com/caddyserver/caddy"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
+)
+
+var log = clog.NewWithPlugin("traffic")
+
+func init() { plugin.Register("traffic", setup) }
+
+// setup parses the traffic directive, adds the plugin to the server's plugin chain and ties the
+// xDS client to the server's lifetime: Run is started on startup, Stop is called on shutdown.
+func setup(c *caddy.Controller) error {
+	// Nanosecond() is only the sub-second part of the clock; UnixNano gives a far wider seed.
+	rand.Seed(time.Now().UnixNano())
+	t, err := parseTraffic(c)
+	if err != nil {
+		return plugin.Error("traffic", err)
+	}
+
+	dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
+		t.Next = next
+		return t
+	})
+
+	c.OnStartup(func() error {
+		go t.c.Run()
+		return nil
+	})
+	c.OnShutdown(func() error { return t.c.Stop() })
+	return nil
+}
+
+// parseTraffic parses the traffic directive and returns a Traffic holding an xDS client for the
+// first control plane endpoint given. Every endpoint must use the grpc:// scheme. The optional
+// block accepts the id, tls and tls_servername properties; any other property is an error.
+func parseTraffic(c *caddy.Controller) (*Traffic, error) {
+	node := "coredns"
+	var toHosts []string
+	t := &Traffic{}
+	var (
+		err           error
+		tlsConfig     *tls.Config
+		tlsServerName string
+	)
+
+	t.origins = make([]string, len(c.ServerBlockKeys))
+	for i := range c.ServerBlockKeys {
+		t.origins[i] = plugin.Host(c.ServerBlockKeys[i]).Normalize()
+	}
+
+	for c.Next() {
+		args := c.RemainingArgs()
+		if len(args) < 1 {
+			return nil, c.ArgErr()
+		}
+		toHosts, err = parse.HostPortOrFile(args...)
+		if err != nil {
+			return nil, err
+		}
+		for i := range toHosts {
+			if !strings.HasPrefix(toHosts[i], transport.GRPC+"://") {
+				return nil, fmt.Errorf("not a %s scheme: %s", transport.GRPC, toHosts[i])
+			}
+			// now cut the prefix off again, because the dialler needs to see normal address strings. All this
+			// grpc:// stuff is to enforce uniform across plugins and future proofing for other protocols.
+			toHosts[i] = toHosts[i][len(transport.GRPC+"://"):]
+		}
+		for c.NextBlock() {
+			switch c.Val() {
+			case "id":
+				args := c.RemainingArgs()
+				if len(args) != 1 {
+					return nil, c.ArgErr()
+				}
+				node = args[0]
+			case "tls":
+				args := c.RemainingArgs()
+				if len(args) > 3 {
+					return nil, c.ArgErr()
+				}
+
+				tlsConfig, err = pkgtls.NewTLSConfigFromArgs(args...)
+				if err != nil {
+					return nil, err
+				}
+			case "tls_servername":
+				if !c.NextArg() {
+					return nil, c.ArgErr()
+				}
+				tlsServerName = c.Val()
+			default:
+				return nil, c.Errf("unknown property '%s'", c.Val())
+			}
+		}
+	}
+
+	// toHosts[0] is indexed below; fail cleanly instead of panicking when no endpoint was parsed.
+	if len(toHosts) == 0 {
+		return nil, c.ArgErr()
+	}
+
+	opts := []grpc.DialOption{grpc.WithInsecure()}
+	if tlsConfig != nil {
+		if tlsServerName != "" {
+			tlsConfig.ServerName = tlsServerName
+		}
+		opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}
+	}
+
+	// TODO: only the first host is used, need to figure out how to reconcile multiple upstream providers.
+	if t.c, err = xds.New(toHosts[0], node, opts...); err != nil {
+		return nil, err
+	}
+
+	return t, nil
+}
diff --git a/plugin/traffic/setup_test.go b/plugin/traffic/setup_test.go
new file mode 100644
index 000000000..99403fbcd
--- /dev/null
+++ b/plugin/traffic/setup_test.go
@@ -0,0 +1,53 @@
+package traffic
+
+import (
+	"testing"
+
+	"github.com/caddyserver/caddy"
+)
+
+func TestSetup(t *testing.T) {
+	/*
+		c := caddy.NewTestController("dns", `traffic grpc://bla`)
+		if err := setup(c); err != nil {
+			t.Fatalf("Test 1, expected no errors, but got: %q", err)
+		}
+	*/
+}
+
+func TestParseTraffic(t *testing.T) {
+	tests := []struct {
+		input     string
+		shouldErr bool
+	}{
+		// ok
+		{`traffic grpc://127.0.0.1:18000 {
+			id test-id
+		}`, false},
+
+		// fail
+		{`traffic`, true},
+		{`traffic tls://1.1.1.1`, true},
+		{`traffic {
+			id bla bla
+		}`, true},
+		{`traffic {
+			node
+		}`, true},
+	}
+	for i, test := range tests {
+		c := caddy.NewTestController("dns", test.input)
+		_, err := parseTraffic(c)
+		if test.shouldErr && err == nil {
+			t.Errorf("Test %v: Expected error but found nil", i)
+			continue
+		} else if !test.shouldErr && err != nil {
+			t.Errorf("Test %v: Expected no error but found error: %v", i, err)
+			continue
+		}
+
+		if test.shouldErr {
+			continue
+		}
+	}
+}
diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go
new file mode 100644
index 000000000..0090bd780
--- /dev/null
+++ b/plugin/traffic/traffic.go
@@ -0,0 +1,93 @@
+package traffic
+
+import (
+	"context"
+	"strings"
+	"time"
+
+	"github.com/coredns/coredns/plugin"
+	"github.com/coredns/coredns/plugin/pkg/dnsutil"
+	"github.com/coredns/coredns/plugin/traffic/xds"
+	"github.com/coredns/coredns/request"
+
+	"github.com/miekg/dns"
+)
+
+// Traffic is a plugin that load balances according to assignments.
+type Traffic struct {
+	c       *xds.Client
+	id      string
+	origins []string
+
+	Next plugin.Handler
+}
+
+// ServeDNS implements the plugin.Handler interface.
+func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
+	state := request.Request{Req: r, W: w}
+
+	// Find the origin the query name falls under; what remains of the name is the cluster.
+	cluster := ""
+	for _, o := range t.origins {
+		if strings.HasSuffix(state.Name(), o) {
+			cluster, _ = dnsutil.TrimZone(state.Name(), o)
+			state.Zone = o
+			break
+		}
+	}
+	m := new(dns.Msg)
+	m.SetReply(r)
+	m.Authoritative = true
+
+	addr, ok := t.c.Select(cluster)
+	if !ok {
+		// unknown cluster: NXDOMAIN.
+		m.Ns = soa(state.Zone)
+		m.Rcode = dns.RcodeNameError
+		w.WriteMsg(m)
+		return 0, nil
+	}
+
+	if addr == nil {
+		// known cluster without a usable address: NODATA.
+		log.Debugf("No (healthy) endpoints found for %q", cluster)
+		m.Ns = soa(state.Zone)
+		w.WriteMsg(m)
+		return 0, nil
+	}
+
+	switch state.QType() {
+	case dns.TypeA:
+		if addr.To4() == nil { // it's an IPv6 address, return nodata in that case.
+			m.Ns = soa(state.Zone)
+			break
+		}
+		m.Answer = []dns.RR{&dns.A{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, A: addr}}
+
+	case dns.TypeAAAA:
+		if addr.To4() != nil { // it's an IPv4 address, return nodata in that case.
+			m.Ns = soa(state.Zone)
+			break
+		}
+		// The header type must be AAAA to match the record body; it used to say TypeA.
+		m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeAAAA, Class: dns.ClassINET, Ttl: 5}, AAAA: addr}}
+	default:
+		m.Ns = soa(state.Zone)
+	}
+
+	w.WriteMsg(m)
+	return 0, nil
+}
+
+// soa returns a synthetic SOA record for zone z, used in the authority section of NXDOMAIN and
+// NODATA responses. The serial is the current Unix time.
+func soa(z string) []dns.RR {
+	return []dns.RR{&dns.SOA{
+		Hdr:     dns.RR_Header{Name: z, Rrtype: dns.TypeSOA, Class: dns.ClassINET, Ttl: 5},
+		Ns:      dnsutil.Join("ns", z),
+		Mbox:    dnsutil.Join("coredns", z),
+		Serial:  uint32(time.Now().UTC().Unix()),
+		Refresh: 14400,
+		Retry:   3600,
+		Expire:  604800,
+		Minttl:  5,
+	}}
+}
+
+// Name implements the plugin.Handler interface.
+func (t *Traffic) Name() string { return "traffic" }
diff --git a/plugin/traffic/traffic_test.go b/plugin/traffic/traffic_test.go
new file mode 100644
index 000000000..5db7cc451
--- /dev/null
+++ b/plugin/traffic/traffic_test.go
@@ -0,0 +1,130 @@
+package traffic
+
+import (
+	"context"
+	"testing"
+
+	"github.com/coredns/coredns/plugin/pkg/dnstest"
+	"github.com/coredns/coredns/plugin/pkg/dnsutil"
+	"github.com/coredns/coredns/plugin/test"
+	"github.com/coredns/coredns/plugin/traffic/xds"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+	"github.com/miekg/dns"
+	"google.golang.org/grpc"
+)
+
+func TestTraffic(t *testing.T) {
+	c, err := xds.New("127.0.0.1:0", "test-id", grpc.WithInsecure())
+	if err != nil {
+		t.Fatal(err)
+	}
+	tr := &Traffic{c: c, origins: []string{"lb.example.org."}}
+
+	tests := []struct {
+		cla     *xdspb.ClusterLoadAssignment
+		cluster string
+		qtype   uint16
+		rcode   int
+		answer  string // address value of the A/AAAA record.
+		ns      bool   // should there be a ns section.
+	}{
+		{
+			cla:     &xdspb.ClusterLoadAssignment{},
+			cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true,
+		},
+		{
+			cla:     &xdspb.ClusterLoadAssignment{},
+			cluster: "web", qtype: dns.TypeSRV, rcode: dns.RcodeSuccess, ns: true,
+		},
+		{
+			cla:     &xdspb.ClusterLoadAssignment{},
+			cluster: "does-not-exist", qtype: dns.TypeA, rcode: dns.RcodeNameError, ns: true},
+		{
+			cla: &xdspb.ClusterLoadAssignment{
+				ClusterName: "web",
+				Endpoints:   endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_HEALTHY}}),
+			},
+			cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.1",
+		},
+		{
+			cla: &xdspb.ClusterLoadAssignment{
+				ClusterName: "web",
+				Endpoints:   endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_UNKNOWN}}),
+			},
+			cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true,
+		},
+	}
+
+	ctx := context.TODO()
+
+	for i, tc := range tests {
+		a := xds.NewAssignment()
+		a.SetClusterLoadAssignment("web", tc.cla) // web is our cluster
+		c.SetAssignments(a)
+
+		m := new(dns.Msg)
+		cl := dnsutil.Join(tc.cluster, tr.origins[0])
+		m.SetQuestion(cl, tc.qtype)
+
+		rec := dnstest.NewRecorder(&test.ResponseWriter{})
+		_, err := tr.ServeDNS(ctx, rec, m)
+		if err != nil {
+			t.Errorf("Test %d: Expected no error, but got %q", i, err)
+		}
+		if rec.Msg.Rcode != tc.rcode {
+			t.Errorf("Test %d: Expected rcode %d, but got %d", i, tc.rcode, rec.Msg.Rcode)
+		}
+		if tc.ns && len(rec.Msg.Ns) == 0 {
+			t.Errorf("Test %d: Expected authority section, but got none", i)
+		}
+		if tc.answer != "" && len(rec.Msg.Answer) == 0 {
+			t.Fatalf("Test %d: Expected answer section, but got none", i)
+		}
+		if tc.answer != "" {
+			record := rec.Msg.Answer[0]
+			addr := ""
+			switch x := record.(type) {
+			case *dns.A:
+				addr = x.A.String()
+			case *dns.AAAA:
+				addr = x.AAAA.String()
+			}
+			if tc.answer != addr {
+				t.Errorf("Test %d: Expected answer %s, but got %s", i, tc.answer, addr)
+			}
+
+		}
+
+	}
+}
+
+type EndpointHealth struct {
+	Address string
+	Health  corepb.HealthStatus
+}
+
+func endpoints(e []EndpointHealth) []*endpointpb.LocalityLbEndpoints {
+	ep := make([]*endpointpb.LocalityLbEndpoints, len(e))
+	for i := range e {
+		ep[i] = &endpointpb.LocalityLbEndpoints{
+			LbEndpoints: []*endpointpb.LbEndpoint{{
+				HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
+					Endpoint: &endpointpb.Endpoint{
+						Address: &corepb.Address{
+							Address: &corepb.Address_SocketAddress{
+								SocketAddress: &corepb.SocketAddress{
+									Address: e[i].Address,
+								},
+							},
+						},
+					},
+				},
+				HealthStatus: e[i].Health,
+			}},
+		}
+	}
+	return ep
+}
diff --git a/plugin/traffic/xds/assignment.go b/plugin/traffic/xds/assignment.go
new file mode 100644
index 000000000..c8d1a5798
--- /dev/null
+++ b/plugin/traffic/xds/assignment.go
@@ -0,0 +1,116 @@
+package xds
+
+import (
+	"math/rand"
+	"net"
+	"sync"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+)
+
+// assignment maps cluster names to their load assignment; mu guards cla. A nil value means the
+// cluster name is known but no endpoints have been stored for it.
+type assignment struct {
+	mu  sync.RWMutex
+	cla map[string]*xdspb.ClusterLoadAssignment
+}
+
+// NewAssignment returns a pointer to an assignment.
+func NewAssignment() *assignment {
+	return &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
+}
+
+// SetClusterLoadAssignment sets the assignment for the cluster to cla. A nil cla only registers
+// the cluster name: it creates the entry when it is missing, but never overwrites an existing one.
+func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) {
+	a.mu.Lock()
+	defer a.mu.Unlock()
+	if _, known := a.cla[cluster]; known && cla == nil {
+		return
+	}
+	a.cla[cluster] = cla
+}
+
+// ClusterLoadAssignment returns the assignment for the cluster or nil if there is none.
+func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment {
+	a.mu.RLock()
+	defer a.mu.RUnlock()
+	// a missing key yields the zero value, which is the nil pointer callers expect.
+	return a.cla[cluster]
+}
+
+// clusters returns the names of all clusters in the assignment, in map iteration order.
+func (a *assignment) clusters() []string {
+	a.mu.RLock()
+	defer a.mu.RUnlock()
+	names := make([]string, 0, len(a.cla))
+	for name := range a.cla {
+		names = append(names, name)
+	}
+	return names
+}
+
+// Select picks a backend for cluster using weighted random selection among the endpoints that
+// report healthy. The boolean is false when there is no load assignment for the cluster. The
+// address is nil when no endpoint is healthy or when the selected address does not parse as an IP.
+func (a *assignment) Select(cluster string) (net.IP, bool) {
+	cla := a.ClusterLoadAssignment(cluster)
+	if cla == nil {
+		return nil, false
+	}
+
+	// Collect the healthy endpoints once, in their original order, and sum their weights.
+	type candidate struct {
+		addr   string
+		weight int
+	}
+	var healthy []candidate
+	total := 0
+	for _, locality := range cla.Endpoints {
+		for _, lb := range locality.GetLbEndpoints() {
+			if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
+				continue
+			}
+			w := int(lb.GetLoadBalancingWeight().GetValue())
+			healthy = append(healthy, candidate{
+				addr:   lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress(),
+				weight: w,
+			})
+			total += w
+		}
+	}
+	if len(healthy) == 0 {
+		return nil, true
+	}
+
+	if total == 0 {
+		// all weights are 0, randomly select one of the endpoints.
+		pick := healthy[rand.Intn(len(healthy))]
+		return net.ParseIP(pick.addr), true
+	}
+
+	// Walk the endpoints, subtracting weights from a random point in [1, total]; the endpoint
+	// that brings it to zero or below is selected.
+	r := rand.Intn(total) + 1
+	for _, h := range healthy {
+		r -= h.weight
+		if r <= 0 {
+			return net.ParseIP(h.addr), true
+		}
+	}
+	return nil, true
+}
diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go
new file mode 100644
index 000000000..5b3d736a5
--- /dev/null
+++ b/plugin/traffic/xds/client.go
@@ -0,0 +1,227 @@
+/*
+This package contains code copied from github.com/grpc/grpc-go. The license for that code is:
+
+Copyright 2019 gRPC authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package xds implements a bidirectional stream to an envoy ADS management endpoint. It will stream
+// updates (CDS and EDS) from there to help load balance responses to DNS clients.
+package xds
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"os"
+	"sync"
+	"time"
+
+	"github.com/coredns/coredns/coremain"
+	clog "github.com/coredns/coredns/plugin/pkg/log"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+	"github.com/golang/protobuf/ptypes"
+	structpb "github.com/golang/protobuf/ptypes/struct"
+	"google.golang.org/grpc"
+)
+
+var log = clog.NewWithPlugin("traffic: xds")
+
+// Type URLs of the two resource types this client requests and handles.
+const (
+	cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
+	edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
+)
+
+type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
+
+// Client talks to the grpc manager's endpoint to get load assignments.
+type Client struct {
+	cc          *grpc.ClientConn
+	ctx         context.Context
+	assignments *assignment // assignments contains the current clusters and endpoints.
+	node        *corepb.Node
+	cancel      context.CancelFunc
+	stop        chan struct{}
+	mu          sync.RWMutex
+
+	version map[string]string
+	nonce   map[string]string
+}
+
+// New returns a new client that's dialed to addr using node as the local identifier. The node
+// sent to the manager also carries the local hostname as metadata and the CoreDNS version.
+func New(addr, node string, opts ...grpc.DialOption) (*Client, error) {
+	cc, err := grpc.Dial(addr, opts...)
+	if err != nil {
+		return nil, err
+	}
+
+	// The hostname is informational metadata only, so an empty value on error is acceptable.
+	hostname, _ := os.Hostname()
+	self := &corepb.Node{
+		Id: node,
+		Metadata: &structpb.Struct{
+			Fields: map[string]*structpb.Value{
+				"HOSTNAME": {Kind: &structpb.Value_StringValue{StringValue: hostname}},
+			},
+		},
+		BuildVersion: coremain.CoreVersion,
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	return &Client{
+		cc:          cc,
+		ctx:         ctx,
+		cancel:      cancel,
+		node:        self,
+		assignments: NewAssignment(),
+		version:     make(map[string]string),
+		nonce:       make(map[string]string),
+	}, nil
+}
+
+// Stop stops all goroutines and closes the connection to the upstream manager.
+func (c *Client) Stop() error { c.cancel(); return c.cc.Close() }
+
+// sendMu serializes Send calls on the ADS stream. Requests are sent from both the ticker
+// goroutine in Run and the receive loop, and gRPC does not allow concurrent sends on one stream.
+var sendMu sync.Mutex
+
+// reconnectDelay is how long Run waits before it tries to open a new stream.
+const reconnectDelay = 2 * time.Second
+
+// Run opens the ADS stream and processes responses until the client is stopped. While a stream is
+// up, a helper goroutine sends a cluster discovery request immediately and then every 10 seconds.
+// When the stream cannot be opened, or breaks, Run waits reconnectDelay and tries again.
+func (c *Client) Run() {
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+		}
+
+		cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc)
+		stream, err := cli.StreamAggregatedResources(c.ctx)
+		if err != nil {
+			log.Debug(err)
+			if !c.wait(reconnectDelay) { // grpc's client.go does more spiffy exp. backoff, do we really need that?
+				return
+			}
+			continue
+		}
+
+		done := make(chan struct{})
+		go func() {
+			if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
+				log.Debug(err)
+			}
+			tick := time.NewTicker(10 * time.Second)
+			defer tick.Stop()
+			for {
+				select {
+				case <-tick.C:
+					// send empty list for cluster discovery every 10 seconds
+					if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
+						log.Debug(err)
+					}
+
+				case <-done:
+					return
+				}
+			}
+		}()
+
+		// A receive error caused by our own shutdown is expected and not worth a warning.
+		if err := c.Receive(stream); err != nil && c.ctx.Err() == nil {
+			log.Warning(err)
+		}
+		close(done)
+
+		// The stream broke; pause before redialing so a failing manager is not hammered.
+		if !c.wait(reconnectDelay) {
+			return
+		}
+	}
+}
+
+// wait blocks for d and returns true, or returns false as soon as the client is stopped.
+func (c *Client) wait(d time.Duration) bool {
+	t := time.NewTimer(d)
+	defer t.Stop()
+	select {
+	case <-c.ctx.Done():
+		return false
+	case <-t.C:
+		return true
+	}
+}
+
+// send writes req to the stream while holding sendMu.
+func (c *Client) send(stream adsStream, req *xdspb.DiscoveryRequest) error {
+	sendMu.Lock()
+	defer sendMu.Unlock()
+	return stream.Send(req)
+}
+
+// clusterDiscovery sends a cluster DiscoveryRequest on the stream.
+func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clusters []string) error {
+	req := &xdspb.DiscoveryRequest{
+		Node:          c.node,
+		TypeUrl:       cdsURL,
+		ResourceNames: clusters, // empty for all
+		VersionInfo:   version,
+		ResponseNonce: nonce,
+	}
+	return c.send(stream, req)
+}
+
+// endpointDiscovery sends an endpoint DiscoveryRequest on the stream.
+func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clusters []string) error {
+	req := &xdspb.DiscoveryRequest{
+		Node:          c.node,
+		TypeUrl:       edsURL,
+		ResourceNames: clusters,
+		VersionInfo:   version,
+		ResponseNonce: nonce,
+	}
+	return c.send(stream, req)
+}
+
+// Receive receives from the stream, it handles both cluster and endpoint DiscoveryResponses.
+func (c *Client) Receive(stream adsStream) error {
+	for {
+		resp, err := stream.Recv()
+		if err != nil {
+			return err
+		}
+
+		switch resp.GetTypeUrl() {
+		case cdsURL:
+			known := c.Assignments()
+			a := NewAssignment()
+			for _, r := range resp.GetResources() {
+				var msg ptypes.DynamicAny
+				if err := ptypes.UnmarshalAny(r, &msg); err != nil {
+					log.Debugf("Failed to unmarshal cluster discovery: %s", err)
+					continue
+				}
+				cluster, ok := msg.Message.(*xdspb.Cluster)
+				if !ok {
+					continue
+				}
+				// Keep the endpoints already known for this cluster, so a cluster update does not
+				// leave it without addresses until the next endpoint response arrives.
+				a.SetClusterLoadAssignment(cluster.GetName(), known.ClusterLoadAssignment(cluster.GetName()))
+			}
+			log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), resp.GetVersionInfo(), resp.GetNonce(), a.clusters())
+			// set our local administration and ack the reply. Empty version would signal NACK.
+			c.SetNonce(cdsURL, resp.GetNonce())
+			c.SetVersion(cdsURL, resp.GetVersionInfo())
+			c.SetAssignments(a)
+			if err := c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters()); err != nil {
+				log.Debug(err)
+			}
+
+			// now kick off discovery for endpoints
+			if err := c.endpointDiscovery(stream, c.Version(edsURL), c.Nonce(edsURL), a.clusters()); err != nil {
+				log.Debug(err)
+			}
+		case edsURL:
+			a := c.Assignments()
+			for _, r := range resp.GetResources() {
+				var msg ptypes.DynamicAny
+				if err := ptypes.UnmarshalAny(r, &msg); err != nil {
+					log.Debugf("Failed to unmarshal endpoint discovery: %s", err)
+					continue
+				}
+				cla, ok := msg.Message.(*xdspb.ClusterLoadAssignment)
+				if !ok {
+					continue
+				}
+				a.SetClusterLoadAssignment(cla.GetClusterName(), cla)
+			}
+			log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), resp.GetVersionInfo(), resp.GetNonce(), a.clusters())
+			// set our local administration and ack the reply. Empty version would signal NACK.
+			c.SetNonce(edsURL, resp.GetNonce())
+			c.SetVersion(edsURL, resp.GetVersionInfo())
+			// NOTE(review): assumes the manager, like go-control-plane, only pushes further
+			// endpoint updates once this response has been acknowledged - confirm against the xDS spec.
+			if err := c.endpointDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters()); err != nil {
+				log.Debug(err)
+			}
+
+		default:
+			return fmt.Errorf("unknown response URL for discovery: %q", resp.GetTypeUrl())
+		}
+	}
+}
+
+// Select returns an address that is deemed to be the correct one for this cluster. The returned
+// boolean indicates if the cluster exists.
+func (c *Client) Select(cluster string) (net.IP, bool) {
+	if cluster == "" {
+		return nil, false
+	}
+	// Go through the accessor: Receive swaps c.assignments under c.mu while queries are served.
+	return c.Assignments().Select(cluster)
+}
diff --git a/plugin/traffic/xds/fields.go b/plugin/traffic/xds/fields.go
new file mode 100644
index 000000000..9dac60cbe
--- /dev/null
+++ b/plugin/traffic/xds/fields.go
@@ -0,0 +1,43 @@
+package xds
+
+// Assignments returns the current assignments.
+func (c *Client) Assignments() *assignment {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.assignments
+}
+
+// SetAssignments replaces the current assignments with a.
+func (c *Client) SetAssignments(a *assignment) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.assignments = a
+}
+
+// Version returns the version stored for typeURL, or the empty string if there is none.
+func (c *Client) Version(typeURL string) string {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.version[typeURL]
+}
+
+// SetVersion stores a as the version for typeURL.
+func (c *Client) SetVersion(typeURL, a string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.version[typeURL] = a
+}
+
+// Nonce returns the nonce stored for typeURL, or the empty string if there is none.
+func (c *Client) Nonce(typeURL string) string {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	return c.nonce[typeURL]
+}
+
+// SetNonce stores n as the nonce for typeURL.
+func (c *Client) SetNonce(typeURL, n string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.nonce[typeURL] = n
+}