Add new plugin: traffic

Traffic is a plugin that communicates via the xDS protocol to an Envoy
control plane. Using the data from this control plane it hands out IP
addresses. This allows you (via controlling the data in the control
plane) to drain or send more traffic to specific endpoints.

The plugin itself only acts upon this data; it doesn't do anything fancy
by itself.

Code used here is copied from grpc-go and other places, this is clearly
marked in the source files.

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2019-10-05 11:45:45 +01:00
parent 5f159ca464
commit 9433da1a67
14 changed files with 972 additions and 0 deletions

View File

@@ -30,6 +30,7 @@ var Directives = []string{
"acl",
"any",
"chaos",
"traffic",
"loadbalance",
"cache",
"rewrite",

View File

@@ -46,6 +46,7 @@ import (
_ "github.com/coredns/coredns/plugin/template"
_ "github.com/coredns/coredns/plugin/tls"
_ "github.com/coredns/coredns/plugin/trace"
_ "github.com/coredns/coredns/plugin/traffic"
_ "github.com/coredns/coredns/plugin/transfer"
_ "github.com/coredns/coredns/plugin/whoami"
_ "github.com/coredns/federation"

2
go.mod
View File

@@ -16,9 +16,11 @@ require (
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dnstap/golang-dnstap v0.0.0-20170829151710-2cf77a2b5e11
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473
github.com/farsightsec/golang-framestream v0.0.0-20181102145529-8a0cb8ba8710
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/protobuf v1.3.2
github.com/google/go-cmp v0.3.1
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/imdario/mergo v0.3.7 // indirect

2
go.sum
View File

@@ -124,7 +124,9 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 h1:4cmBvAEBNJaGARUEs3/suWRyfyBfhf7I60WBZq+bv2w=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=

View File

@@ -39,6 +39,7 @@ dnstap:dnstap
acl:acl
any:any
chaos:chaos
traffic:traffic
loadbalance:loadbalance
cache:cache
rewrite:rewrite

58
plugin/traffic/HACKING.md Normal file
View File

@@ -0,0 +1,58 @@
# Hacking on *traffic*
Repos used:
<https://github.com/envoyproxy/go-control-plane>
: implements control plane, has testing stuff in pkg/test/main (iirc).
<https://github.com/grpc/grpc-go/tree/master/xds/internal/client>
: implements client for xDS - much of this code has been reused here.
I found these website useful while working on this.
* https://github.com/envoyproxy/envoy/blob/master/api/API_OVERVIEW.md
* https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery.md
* This was *really* helpful: https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol to
show the flow of the protocol.
# Testing
Assuming you have envoyproxy/go-control-plane checked out somewhere, then:
~~~ sh
% cd ~/src/github.com/envoyproxy/go-control-plane/pkg/test/main
% go build
% ./main --xds=ads --runtimes=2 -debug
~~~
This runs a binary from pkg/test/main. Now we're testing aDS. Everything is using gRPC with TLS
disabled: `grpc.WithInsecure()`. The test binary runs on port 18000 on localhost; all these things
are currently hardcoded in the *traffic* plugin. This will be factored out into config as some
point. Another thing that is hardcoded is the use of the "example.org" domain.
Then for CoreDNS, check out the `traffic` branch, create a Corefile:
~~~ Corefile
example.org {
traffic grpc://127.0.0.1:18000 {
id test-id
}
debug
}
~~~
Start CoreDNS (`coredns -conf Corefile -dns.port=1053`), and see logging/debugging flow by; the
test binary should also spew out a bunch of things. CoreDNS willl build up a list of cluster and
endpoints. Next you can query it:
~~~ sh
% dig @localhost -p 1053 cluster-v0-0.example.org A
;; QUESTION SECTION:
;cluster-v0-0.example.org. IN A
;; ANSWER SECTION:
cluster-v0-0.example.org. 5 IN A 127.0.0.1
~~~
Note: the xds/test binary is a go-control-plane binary with added debugging that I'm using for
testing.

129
plugin/traffic/README.md Normal file
View File

@@ -0,0 +1,129 @@
# traffic
## Name
*traffic* - handout addresses according to assignments from Envoy's xDS.
## Description
The *traffic* plugin is a balancer that allows traffic steering, weighted responses
and draining of clusters. The cluster information is retrieved from a service
discovery manager that implements the service discovery protocols that Envoy
[implements](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol).
A Cluster is defined as: "A group of logically similar endpoints that Envoy connects to." Each
cluster has a name, which *traffic* extends to be a domain name. See "Naming Clusters" below.
The use case for this plugin is when a cluster has endpoints running in multiple (Kubernetes?)
clusters and you need to steer traffic to (or away) from these endpoints, i.e. endpoint A needs to
be upgraded, so all traffic to it is drained. Or the entire Kubernetes needs to upgraded, and *all*
endpoints need to be drained from it.
*Traffic* discovers the endpoints via Envoy's xDS protocol. Endpoints and clusters are discovered
every 10 seconds. The plugin hands out responses that adhere to these assignments. Only endpoints
that are *healthy* are handed out.
Each DNS response contains a single IP address that's considered the best one. *Traffic* will load
balance A and AAAA queries. The TTL on these answer is set to 5s. It will only return successful
responses either with an answer or otherwise a NODATA response. Queries for non-existent clusters
get a NXDOMAIN.
The *traffic* plugin has no notion of draining, drop overload and anything that advanced, *it just
acts upon assignments*. This is means that if a endpoint goes down and *traffic* has not seen a new
assignment yet, it will still include this endpoint address in responses.
## Syntax
~~~
traffic TO...
~~~
This enabled the *traffic* plugin, with a default node id of `coredns` and no TLS.
* **TO...** are the Envoy control plane endpoint to connect to. This must start with `grpc://`.
The extended syntax is available is you want more control.
~~~
traffic TO... {
server SERVER [SERVER]...
node ID
tls CERT KEY CA
tls_servername NAME
}
~~~
* node **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`.
* `tls` **CERT** **KEY** **CA** define the TLS properties for gRPC connection. If this is omitted an
insecure connection is attempted. From 0 to 3 arguments can be provided with the meaning as described below
* `tls` - no client authentication is used, and the system CAs are used to verify the server certificate
* `tls` **CA** - no client authentication is used, and the file CA is used to verify the server certificate
* `tls` **CERT** **KEY** - client authentication is used with the specified cert/key pair.
The server certificate is verified with the system CAs.
* `tls` **CERT** **KEY** **CA** - client authentication is used with the specified cert/key pair.
The server certificate is verified using the specified CA file.
* `tls_servername` **NAME** allows you to set a server name in the TLS configuration. This is needed
because *traffic* connects to an IP address, so it can't infer the server name from it.
## Naming Clusters
When a cluster is named this usually consists out of a single word, i.e. "cluster-v0", or "web".
The *traffic* plugins uses the name(s) specified in the Server Block to create fully qualified
domain names. For example if the Server Block specifies `lb.example.org` as one of the names,
and "cluster-v0" is one of the load balanced cluster, *traffic* will respond to query asking for
`cluster-v0.lb.example.org.` and the same goes for `web`; `web.lb.example.org`.
## Metrics
What metrics should we do? If any? Number of clusters? Number of endpoints and health?
## Ready
Should this plugin implement readiness?
## Examples
~~~
lb.example.org {
traffic grpc://127.0.0.1:18000 {
node test-id
}
debug
log
}
~~~
This will load balance any names under `lb.example.org` using the data from the manager running on
localhost on port 18000. The node ID will be `test-id` and no TLS will be used.
## Also See
The following documents provide some background on Envoy's control plane.
* <https://github.com/envoyproxy/go-control-plane>
* <https://blog.christianposta.com/envoy/guidance-for-building-a-control-plane-to-manage-envoy-proxy-based-infrastructure/>
* <https://github.com/envoyproxy/envoy/blob/442f9fcf21a5f091cec3fe9913ff309e02288659/api/envoy/api/v2/discovery.proto#L63>
## Bugs
Priority and locality information from ClusterLoadAssignments is not used.
Load reporting via xDS is not supported; this can be implemented, but there are some things that
make this difficult. A single (DNS) query is done by a resolver. Behind this resolver there may be
many clients that will use this reply, the responding server (CoreDNS) has no idea how many clients
use this resolver. So reporting a load of +1 on the CoreDNS side can be anything from 1 to 1000+,
making the load reporting highly inaccurate.
Multiple **TO** addresses is not implemented.
## TODO
* metrics?
* more and better testing
* credentials (other than TLS) - how/what?
* is the protocol correctly implemented? Should we not have a 10s tick, but wait for responses from
the control plane?

122
plugin/traffic/setup.go Normal file
View File

@@ -0,0 +1,122 @@
package traffic
import (
"crypto/tls"
"fmt"
"math/rand"
"strings"
"time"
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/plugin/pkg/parse"
pkgtls "github.com/coredns/coredns/plugin/pkg/tls"
"github.com/coredns/coredns/plugin/pkg/transport"
"github.com/coredns/coredns/plugin/traffic/xds"
"github.com/caddyserver/caddy"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
var log = clog.NewWithPlugin("traffic")
func init() { plugin.Register("traffic", setup) }
func setup(c *caddy.Controller) error {
rand.Seed(int64(time.Now().Nanosecond()))
t, err := parseTraffic(c)
if err != nil {
return plugin.Error("traffic", err)
}
dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
t.Next = next
return t
})
c.OnStartup(func() error {
go t.c.Run()
return nil
})
c.OnShutdown(func() error { return t.c.Stop() })
return nil
}
func parseTraffic(c *caddy.Controller) (*Traffic, error) {
node := "coredns"
toHosts := []string{}
t := &Traffic{}
var (
err error
tlsConfig *tls.Config
tlsServerName string
)
t.origins = make([]string, len(c.ServerBlockKeys))
for i := range c.ServerBlockKeys {
t.origins[i] = plugin.Host(c.ServerBlockKeys[i]).Normalize()
}
for c.Next() {
args := c.RemainingArgs()
if len(args) < 1 {
return nil, c.ArgErr()
}
toHosts, err = parse.HostPortOrFile(args...)
if err != nil {
return nil, err
}
for i := range toHosts {
if !strings.HasPrefix(toHosts[i], transport.GRPC+"://") {
return nil, fmt.Errorf("not a %s scheme: %s", transport.GRPC, toHosts[i])
}
// now cut the prefix off again, because the dialler needs to see normal address strings. All this
// grpc:// stuff is to enforce uniform across plugins and future proofing for other protocols.
toHosts[i] = toHosts[i][len(transport.GRPC+"://"):]
}
for c.NextBlock() {
switch c.Val() {
case "id":
args := c.RemainingArgs()
if len(args) != 1 {
return nil, c.ArgErr()
}
node = args[0]
case "tls":
args := c.RemainingArgs()
if len(args) > 3 {
return nil, c.ArgErr()
}
tlsConfig, err = pkgtls.NewTLSConfigFromArgs(args...)
if err != nil {
return nil, err
}
case "tls_servername":
if !c.NextArg() {
return nil, c.ArgErr()
}
tlsServerName = c.Val()
default:
return nil, c.Errf("unknown property '%s'", c.Val())
}
}
}
opts := []grpc.DialOption{grpc.WithInsecure()}
if tlsConfig != nil {
if tlsServerName != "" {
tlsConfig.ServerName = tlsServerName
}
opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}
}
// TODO: only the first host is used, need to figure out how to reconcile multiple upstream providers.
if t.c, err = xds.New(toHosts[0], node, opts...); err != nil {
return nil, err
}
return t, nil
}

View File

@@ -0,0 +1,53 @@
package traffic
import (
"testing"
"github.com/caddyserver/caddy"
)
func TestSetup(t *testing.T) {
/*
c := caddy.NewTestController("dns", `traffic grpc://bla`)
if err := setup(c); err != nil {
t.Fatalf("Test 1, expected no errors, but got: %q", err)
}
*/
}
func TestParseTraffic(t *testing.T) {
tests := []struct {
input string
shouldErr bool
}{
// ok
{`traffic grpc://127.0.0.1:18000 {
id test-id
}`, false},
// fail
{`traffic`, true},
{`traffic tls://1.1.1.1`, true},
{`traffic {
id bla bla
}`, true},
{`traffic {
node
}`, true},
}
for i, test := range tests {
c := caddy.NewTestController("dns", test.input)
_, err := parseTraffic(c)
if test.shouldErr && err == nil {
t.Errorf("Test %v: Expected error but found nil", i)
continue
} else if !test.shouldErr && err != nil {
t.Errorf("Test %v: Expected no error but found error: %v", i, err)
continue
}
if test.shouldErr {
continue
}
}
}

93
plugin/traffic/traffic.go Normal file
View File

@@ -0,0 +1,93 @@
package traffic
import (
"context"
"strings"
"time"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/coredns/coredns/plugin/traffic/xds"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
)
// Traffic is a plugin that load balances according to assignments.
type Traffic struct {
c *xds.Client
id string
origins []string
Next plugin.Handler
}
// ServeDNS implements the plugin.Handler interface.
func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
state := request.Request{Req: r, W: w}
cluster := ""
for _, o := range t.origins {
if strings.HasSuffix(state.Name(), o) {
cluster, _ = dnsutil.TrimZone(state.Name(), o)
state.Zone = o
break
}
}
m := new(dns.Msg)
m.SetReply(r)
m.Authoritative = true
addr, ok := t.c.Select(cluster)
if !ok {
m.Ns = soa(state.Zone)
m.Rcode = dns.RcodeNameError
w.WriteMsg(m)
return 0, nil
}
if addr == nil {
log.Debugf("No (healthy) endpoints found for %q", cluster)
m.Ns = soa(state.Zone)
w.WriteMsg(m)
return 0, nil
}
switch state.QType() {
case dns.TypeA:
if addr.To4() == nil { // it's an IPv6 address, return nodata in that case.
m.Ns = soa(state.Zone)
break
}
m.Answer = []dns.RR{&dns.A{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, A: addr}}
case dns.TypeAAAA:
if addr.To4() != nil { // it's an IPv4 address, return nodata in that case.
m.Ns = soa(state.Zone)
break
}
m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, AAAA: addr}}
default:
m.Ns = soa(state.Zone)
}
w.WriteMsg(m)
return 0, nil
}
// soa returns a synthetic so for this zone.
func soa(z string) []dns.RR {
return []dns.RR{&dns.SOA{
Hdr: dns.RR_Header{Name: z, Rrtype: dns.TypeSOA, Class: dns.ClassINET, Ttl: 5},
Ns: dnsutil.Join("ns", z),
Mbox: dnsutil.Join("coredns", z),
Serial: uint32(time.Now().UTC().Unix()),
Refresh: 14400,
Retry: 3600,
Expire: 604800,
Minttl: 5,
}}
}
// Name implements the plugin.Handler interface.
func (t *Traffic) Name() string { return "traffic" }

View File

@@ -0,0 +1,130 @@
package traffic
import (
"context"
"testing"
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/coredns/coredns/plugin/test"
"github.com/coredns/coredns/plugin/traffic/xds"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
"github.com/miekg/dns"
"google.golang.org/grpc"
)
func TestTraffic(t *testing.T) {
c, err := xds.New("127.0.0.1:0", "test-id", grpc.WithInsecure())
if err != nil {
t.Fatal(err)
}
tr := &Traffic{c: c, origins: []string{"lb.example.org."}}
tests := []struct {
cla *xdspb.ClusterLoadAssignment
cluster string
qtype uint16
rcode int
answer string // address value of the A/AAAA record.
ns bool // should there be a ns section.
}{
{
cla: &xdspb.ClusterLoadAssignment{},
cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true,
},
{
cla: &xdspb.ClusterLoadAssignment{},
cluster: "web", qtype: dns.TypeSRV, rcode: dns.RcodeSuccess, ns: true,
},
{
cla: &xdspb.ClusterLoadAssignment{},
cluster: "does-not-exist", qtype: dns.TypeA, rcode: dns.RcodeNameError, ns: true},
{
cla: &xdspb.ClusterLoadAssignment{
ClusterName: "web",
Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_HEALTHY}}),
},
cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.1",
},
{
cla: &xdspb.ClusterLoadAssignment{
ClusterName: "web",
Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_UNKNOWN}}),
},
cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true,
},
}
ctx := context.TODO()
for i, tc := range tests {
a := xds.NewAssignment()
a.SetClusterLoadAssignment("web", tc.cla) // web is our cluster
c.SetAssignments(a)
m := new(dns.Msg)
cl := dnsutil.Join(tc.cluster, tr.origins[0])
m.SetQuestion(cl, tc.qtype)
rec := dnstest.NewRecorder(&test.ResponseWriter{})
_, err := tr.ServeDNS(ctx, rec, m)
if err != nil {
t.Errorf("Test %d: Expected no error, but got %q", i, err)
}
if rec.Msg.Rcode != tc.rcode {
t.Errorf("Test %d: Expected no rcode %d, but got %d", i, tc.rcode, rec.Msg.Rcode)
}
if tc.ns && len(rec.Msg.Ns) == 0 {
t.Errorf("Test %d: Expected authority section, but got none", i)
}
if tc.answer != "" && len(rec.Msg.Answer) == 0 {
t.Fatalf("Test %d: Expected answer section, but got none", i)
}
if tc.answer != "" {
record := rec.Msg.Answer[0]
addr := ""
switch x := record.(type) {
case *dns.A:
addr = x.A.String()
case *dns.AAAA:
addr = x.AAAA.String()
}
if tc.answer != addr {
t.Errorf("Test %d: Expected answer %s, but got %s", i, tc.answer, addr)
}
}
}
}
type EndpointHealth struct {
Address string
Health corepb.HealthStatus
}
func endpoints(e []EndpointHealth) []*endpointpb.LocalityLbEndpoints {
ep := make([]*endpointpb.LocalityLbEndpoints, len(e))
for i := range e {
ep[i] = &endpointpb.LocalityLbEndpoints{
LbEndpoints: []*endpointpb.LbEndpoint{{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Address: &corepb.Address_SocketAddress{
SocketAddress: &corepb.SocketAddress{
Address: e[i].Address,
},
},
},
},
},
HealthStatus: e[i].Health,
}},
}
}
return ep
}

View File

@@ -0,0 +1,116 @@
package xds
import (
"math/rand"
"net"
"sync"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
type assignment struct {
mu sync.RWMutex
cla map[string]*xdspb.ClusterLoadAssignment
}
// NewAssignment returns a pointer to an assignment.
func NewAssignment() *assignment {
return &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
}
// SetClusterLoadAssignment sets the assignment for the cluster to cla.
func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) {
// If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry.
a.mu.Lock()
defer a.mu.Unlock()
_, ok := a.cla[cluster]
if !ok {
a.cla[cluster] = cla
return
}
if cla == nil {
return
}
a.cla[cluster] = cla
}
// ClusterLoadAssignment returns the assignment for the cluster or nil if there is none.
func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment {
a.mu.RLock()
cla, ok := a.cla[cluster]
a.mu.RUnlock()
if !ok {
return nil
}
return cla
}
func (a *assignment) clusters() []string {
a.mu.RLock()
defer a.mu.RUnlock()
clusters := make([]string, len(a.cla))
i := 0
for k := range a.cla {
clusters[i] = k
i++
}
return clusters
}
// Select selects a backend from cluster load assignments, using weighted random selection. It only selects
// backends that are reporting healthy.
func (a *assignment) Select(cluster string) (net.IP, bool) {
cla := a.ClusterLoadAssignment(cluster)
if cla == nil {
return nil, false
}
total := 0
healthy := 0
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue
}
total += int(lb.GetLoadBalancingWeight().GetValue())
healthy++
}
}
if healthy == 0 {
return nil, true
}
if total == 0 {
// all weights are 0, randomly select one of the endpoints.
r := rand.Intn(healthy)
i := 0
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue
}
if r == i {
return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()), true
}
i++
}
}
return nil, true
}
r := rand.Intn(total) + 1
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue
}
r -= int(lb.GetLoadBalancingWeight().GetValue())
if r <= 0 {
return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()), true
}
}
}
return nil, true
}

View File

@@ -0,0 +1,227 @@
/*
This package contains code copied from github.com/grpc/grpc-co. The license for that code is:
Copyright 2019 gRPC authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package xds implements a bidirectional stream to an envoy ADS management endpoint. It will stream
// updates (CDS and EDS) from there to help load balance responses to DNS clients.
package xds
import (
"context"
"fmt"
"net"
"os"
"sync"
"time"
"github.com/coredns/coredns/coremain"
clog "github.com/coredns/coredns/plugin/pkg/log"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
)
var log = clog.NewWithPlugin("traffic: xds")
const (
cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// Client talks to the grpc manager's endpoint to get load assignments.
type Client struct {
cc *grpc.ClientConn
ctx context.Context
assignments *assignment // assignments contains the current clusters and endpoints.
node *corepb.Node
cancel context.CancelFunc
stop chan struct{}
mu sync.RWMutex
version map[string]string
nonce map[string]string
}
// New returns a new client that's dialed to addr using node as the local identifier.
func New(addr, node string, opts ...grpc.DialOption) (*Client, error) {
cc, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
hostname, _ := os.Hostname()
c := &Client{cc: cc, node: &corepb.Node{Id: node,
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"HOSTNAME": {
Kind: &structpb.Value_StringValue{StringValue: hostname},
},
},
},
BuildVersion: coremain.CoreVersion,
},
}
c.assignments = &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
c.version, c.nonce = make(map[string]string), make(map[string]string)
c.ctx, c.cancel = context.WithCancel(context.Background())
return c, nil
}
// Stop stops all goroutines and closes the connection to the upstream manager.
func (c *Client) Stop() error { c.cancel(); return c.cc.Close() }
// Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager.
func (c *Client) Run() {
for {
select {
case <-c.ctx.Done():
return
default:
}
cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc)
stream, err := cli.StreamAggregatedResources(c.ctx)
if err != nil {
log.Debug(err)
time.Sleep(2 * time.Second) // grpc's client.go does more spiffy exp. backoff, do we really need that?
continue
}
done := make(chan struct{})
go func() {
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
log.Debug(err)
}
tick := time.NewTicker(10 * time.Second)
for {
select {
case <-tick.C:
// send empty list for cluster discovery every 10 seconds
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
log.Debug(err)
}
case <-done:
tick.Stop()
return
}
}
}()
if err := c.Receive(stream); err != nil {
log.Warning(err)
}
close(done)
}
}
// clusterDiscovery sends a cluster DiscoveryRequest on the stream.
func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clusters []string) error {
req := &xdspb.DiscoveryRequest{
Node: c.node,
TypeUrl: cdsURL,
ResourceNames: clusters, // empty for all
VersionInfo: version,
ResponseNonce: nonce,
}
return stream.Send(req)
}
// endpointDiscovery sends a endpoint DiscoveryRequest on the stream.
func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clusters []string) error {
req := &xdspb.DiscoveryRequest{
Node: c.node,
TypeUrl: edsURL,
ResourceNames: clusters,
VersionInfo: version,
ResponseNonce: nonce,
}
return stream.Send(req)
}
// Receive receives from the stream, it handled both cluster and endpoint DiscoveryResponses.
func (c *Client) Receive(stream adsStream) error {
for {
resp, err := stream.Recv()
if err != nil {
return err
}
switch resp.GetTypeUrl() {
case cdsURL:
a := NewAssignment()
for _, r := range resp.GetResources() {
var any ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &any); err != nil {
log.Debugf("Failed to unmarshal cluster discovery: %s", err)
continue
}
cluster, ok := any.Message.(*xdspb.Cluster)
if !ok {
continue
}
a.SetClusterLoadAssignment(cluster.GetName(), nil)
}
log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL), a.clusters())
// set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(cdsURL, resp.GetNonce())
c.SetVersion(cdsURL, resp.GetVersionInfo())
c.SetAssignments(a)
c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters())
// now kick off discovery for endpoints
if err := c.endpointDiscovery(stream, c.Version(edsURL), c.Nonce(edsURL), a.clusters()); err != nil {
log.Debug(err)
}
case edsURL:
for _, r := range resp.GetResources() {
var any ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &any); err != nil {
log.Debugf("Failed to unmarshal endpoint discovery: %s", err)
continue
}
cla, ok := any.Message.(*xdspb.ClusterLoadAssignment)
if !ok {
continue
}
c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla)
}
log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL), c.assignments.clusters())
// set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(edsURL, resp.GetNonce())
c.SetVersion(edsURL, resp.GetVersionInfo())
default:
return fmt.Errorf("unknown response URL for discovery: %q", resp.GetTypeUrl())
}
}
}
// Select returns an address that is deemed to be the correct one for this cluster. The returned
// boolean indicates if the cluster exists.
func (c *Client) Select(cluster string) (net.IP, bool) {
if cluster == "" {
return nil, false
}
return c.assignments.Select(cluster)
}

View File

@@ -0,0 +1,37 @@
package xds
func (c *Client) Assignments() *assignment {
c.mu.RLock()
defer c.mu.RUnlock()
return c.assignments
}
func (c *Client) SetAssignments(a *assignment) {
c.mu.Lock()
defer c.mu.Unlock()
c.assignments = a
}
func (c *Client) Version(typeURL string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.version[typeURL]
}
func (c *Client) SetVersion(typeURL, a string) {
c.mu.Lock()
defer c.mu.Unlock()
c.version[typeURL] = a
}
func (c *Client) Nonce(typeURL string) string {
c.mu.RLock()
defer c.mu.RUnlock()
return c.nonce[typeURL]
}
func (c *Client) SetNonce(typeURL, n string) {
c.mu.Lock()
defer c.mu.Unlock()
c.nonce[typeURL] = n
}