From a231daf17f6a7d16c8efd554f7039e2c9fc15551 Mon Sep 17 00:00:00 2001 From: Miek Gieben Date: Thu, 16 Jan 2020 16:39:28 +0100 Subject: [PATCH] more updates Signed-off-by: Miek Gieben --- plugin/traffic/README.md | 36 ++++++++++++- plugin/traffic/setup.go | 57 +++++++++++++------- plugin/traffic/traffic.go | 7 +-- plugin/traffic/xds/assignment.go | 9 ++-- plugin/traffic/xds/client.go | 91 ++++++++++++++++++++++---------- 5 files changed, 142 insertions(+), 58 deletions(-) diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index 94e32e8ba..391f0cc5b 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -44,10 +44,24 @@ The extended syntax is available is you want more control. traffic TO... { server SERVER [SERVER]... node ID + tls CERT KEY CA + tls_servername NAME } ~~~ - * node **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`. +* node **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`. +* `tls` **CERT** **KEY** **CA** define the TLS properties for gRPC connection. If this is omitted an + insecure connection is attempted. From 0 to 3 arguments can be provided with the meaning as described below + + * `tls` - no client authentication is used, and the system CAs are used to verify the server certificate + * `tls` **CA** - no client authentication is used, and the file CA is used to verify the server certificate + * `tls` **CERT** **KEY** - client authentication is used with the specified cert/key pair. + The server certificate is verified with the system CAs. + * `tls` **CERT** **KEY** **CA** - client authentication is used with the specified cert/key pair. + The server certificate is verified using the specified CA file. + +* `tls_servername` **NAME** allows you to set a server name in the TLS configuration. This is needed + because *traffic* connects to an IP address, so it can't infer the server name from it. ## Naming Clusters @@ -57,6 +71,15 @@ domain names. For example if the Server Block specifies `lb.example.org` as one and "cluster-v0" is one of the load balanced cluster, *traffic* will respond to query asking for `cluster-v0.lb.example.org.` and the same goes for `web`; `web.lb.example.org`. +## Metrics + +What metrics should we do? + +## Ready + +Should this plugin implement readyness? + + ## Examples ~~~ @@ -70,7 +93,7 @@ lb.example.org { ~~~ This will load balance any names under `lb.example.org` using the data from the manager running on -localhost on port 18000. The node ID will default to `coredns`. +localhost on port 18000. The node ID will be `test-id` and no TLS will be used. ## Also See @@ -94,3 +117,12 @@ use this resolver. So reporting a load of +1 on the CoreDNS side can be anything making the load reporting highly inaccurate. Multiple **TO** addresses is not implemented. + +## TODO + +* reconnecting the stream +* acking responses +* correctly tracking versions and pruning old clusters. +* metrics? +* testing +* credentials (other than TLS) diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index bdd61a381..bfd9683a4 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -1,6 +1,7 @@ package traffic import ( + "crypto/tls" "fmt" "math/rand" "strings" @@ -10,10 +11,13 @@ import ( "github.com/coredns/coredns/plugin" clog "github.com/coredns/coredns/plugin/pkg/log" "github.com/coredns/coredns/plugin/pkg/parse" + pkgtls "github.com/coredns/coredns/plugin/pkg/tls" "github.com/coredns/coredns/plugin/pkg/transport" "github.com/coredns/coredns/plugin/traffic/xds" "github.com/caddyserver/caddy" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) var log = clog.NewWithPlugin("traffic") @@ -32,23 +36,11 @@ func setup(c *caddy.Controller) error { return t }) - stream, err := t.c.Run() - if err != nil { - return plugin.Error("traffic", err) - } - - if err := t.c.ClusterDiscovery(stream, "", "", []string{}); err != nil { - log.Error(err) - } - - go func() { - err = t.c.Receive(stream) - if err != nil { - // can't do log debug in setup functions - log.Debug(err) - } - }() - + c.OnStartup(func() error { + go t.c.Run() + return nil + }) + c.OnShutdown(func() error { return t.c.Stop() }) return nil } @@ -56,7 +48,11 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) { node := "coredns" toHosts := []string{} t := &Traffic{} - var err error + var ( + err error + tlsConfig *tls.Config + tlsServerName string + ) t.origins = make([]string, len(c.ServerBlockKeys)) for i := range c.ServerBlockKeys { @@ -88,14 +84,37 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) { return nil, c.ArgErr() } node = args[0] + case "tls": + args := c.RemainingArgs() + if len(args) > 3 { + return nil, c.ArgErr() + } + + tlsConfig, err = pkgtls.NewTLSConfigFromArgs(args...) + if err != nil { + return nil, err + } + case "tls_servername": + if !c.NextArg() { + return nil, c.ArgErr() + } + tlsServerName = c.Val() default: return nil, c.Errf("unknown property '%s'", c.Val()) } } } + opts := []grpc.DialOption{grpc.WithInsecure()} + if tlsConfig != nil { + if tlsServerName != "" { + tlsConfig.ServerName = tlsServerName + } + opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))} + } + // TODO: only the first host is used, need to figure out how to reconcile multiple upstream providers. - if t.c, err = xds.New(toHosts[0], node); err != nil { + if t.c, err = xds.New(toHosts[0], node, opts...); err != nil { return nil, err } diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index c278051e7..3360be033 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -18,11 +18,9 @@ type Traffic struct { c *xds.Client id string origins []string - Next plugin.Handler -} -// shutdown closes the connection to the managment endpoints and stops any running goroutines. -func (t *Traffic) shutdown() { t.c.Close() } + Next plugin.Handler +} // ServeDNS implements the plugin.Handler interface. func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { @@ -38,7 +36,6 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg } } if cluster == "" { - // TODO(miek): can this actually happen? return plugin.NextOrFailure(t.Name(), t.Next, ctx, w, r) } diff --git a/plugin/traffic/xds/assignment.go b/plugin/traffic/xds/assignment.go index bca1697f9..b4fce3406 100644 --- a/plugin/traffic/xds/assignment.go +++ b/plugin/traffic/xds/assignment.go @@ -14,7 +14,7 @@ type assignment struct { version int // not sure what do with and if we should discard all clusters. } -func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { +func (a *assignment) setClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { // If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry. a.mu.Lock() defer a.mu.Unlock() @@ -30,8 +30,7 @@ func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.Cluster } -// ClusterLoadAssignment returns the healthy endpoints and their weight. -func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { +func (a *assignment) clusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { a.mu.RLock() cla, ok := a.cla[cluster] a.mu.RUnlock() @@ -41,7 +40,7 @@ func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAss return cla } -func (a *assignment) Clusters() []string { +func (a *assignment) clusters() []string { a.mu.RLock() defer a.mu.RUnlock() clusters := make([]string, len(a.cla)) @@ -56,7 +55,7 @@ func (a *assignment) Clusters() []string { // Select selects a backend from cla, using weighted random selection. It only selects // backends that are reporting healthy. func (a *assignment) Select(cluster string) net.IP { - cla := a.ClusterLoadAssignment(cluster) + cla := a.clusterLoadAssignment(cluster) if cla == nil { return nil } diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 2b57c61ff..1baeba408 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -22,8 +22,10 @@ package xds import ( "context" + "fmt" "net" "os" + "sync" "time" "github.com/coredns/coredns/coremain" @@ -54,12 +56,12 @@ type Client struct { node *corepb.Node cancel context.CancelFunc stop chan struct{} + mu sync.RWMutex + nonce string } // New returns a new client that's dialed to addr using node as the local identifier. -func New(addr, node string) (*Client, error) { - // todo credentials! - opts := []grpc.DialOption{grpc.WithInsecure()} +func New(addr, node string, opts ...grpc.DialOption) (*Client, error) { cc, err := grpc.Dial(addr, opts...) if err != nil { return nil, err @@ -82,21 +84,54 @@ func New(addr, node string) (*Client, error) { return c, nil } -// Close closes a client performs cleanups. -func (c *Client) Close() { c.cancel(); c.cc.Close() } +// Stop stops all goroutines and closes the connection to the upstream manager. +func (c *Client) Stop() error { c.cancel(); return c.cc.Close() } -// Run runs the gRPC stream to the manager. -func (c *Client) Run() (adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) { - cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc) - stream, err := cli.StreamAggregatedResources(c.ctx) - if err != nil { - return nil, err +// Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager. +func (c *Client) Run() { + for { + select { + case <-c.ctx.Done(): + return + default: + } + + cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc) + stream, err := cli.StreamAggregatedResources(c.ctx) + if err != nil { + log.Debug(err) + time.Sleep(2 * time.Second) // grpc's client.go does more spiffy exp. backoff, do we really need that? + continue + } + + done := make(chan struct{}) + go func() { + tick := time.NewTicker(10 * time.Second) + for { + select { + case <-tick.C: + // send empty list for cluster discovery again and again + log.Debugf("Requesting cluster list, nonce %q:", c.Nonce()) + if err := c.clusterDiscovery(stream, "", c.Nonce(), []string{}); err != nil { + log.Debug(err) + } + + case <-done: + tick.Stop() + return + } + } + }() + + if err := c.Receive(stream); err != nil { + log.Debug(err) + } + close(done) } - return stream, nil } -// ClusterDiscovery sends a cluster DiscoveryRequest on the stream. -func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clusters []string) error { +// clusterDiscovery sends a cluster DiscoveryRequest on the stream. +func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clusters []string) error { req := &xdspb.DiscoveryRequest{ Node: c.node, TypeUrl: cdsURL, @@ -107,8 +142,8 @@ func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clust return stream.Send(req) } -// EndpointDiscovery sends a endpoint DiscoveryRequest on the stream. -func (c *Client) EndpointDiscovery(stream adsStream, version, nonce string, clusters []string) error { +// endpointDiscovery sends a endpoint DiscoveryRequest on the stream. +func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clusters []string) error { req := &xdspb.DiscoveryRequest{ Node: c.node, TypeUrl: edsURL, @@ -124,8 +159,7 @@ func (c *Client) Receive(stream adsStream) error { for { resp, err := stream.Recv() if err != nil { - log.Warningf("Trouble receiving from the gRPC connection: %s", err) - time.Sleep(10 * time.Second) // better. + return err } switch resp.GetTypeUrl() { @@ -133,25 +167,30 @@ func (c *Client) Receive(stream adsStream) error { for _, r := range resp.GetResources() { var any ptypes.DynamicAny if err := ptypes.UnmarshalAny(r, &any); err != nil { + log.Debugf("Failed to unmarshal cluster discovery: %s", err) continue } cluster, ok := any.Message.(*xdspb.Cluster) if !ok { continue } - c.assignments.SetClusterLoadAssignment(cluster.GetName(), nil) + c.assignments.setClusterLoadAssignment(cluster.GetName(), nil) } log.Debugf("Cluster discovery processed with %d resources", len(resp.GetResources())) + // ack the CDS proto, with we we've got. (empty version would be NACK) - if err := c.ClusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.Clusters()); err != nil { - log.Warningf("Failed to acknowledge cluster discovery: %s", err) + if err := c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.clusters()); err != nil { + log.Debug(err) + continue } // need to figure out how to handle the versions and nounces exactly. // now kick off discovery for endpoints - if err := c.EndpointDiscovery(stream, "", "", c.assignments.Clusters()); err != nil { - log.Warningf("Failed to perform endpoint discovery: %s", err) + if err := c.endpointDiscovery(stream, "", resp.GetNonce(), c.assignments.clusters()); err != nil { + log.Debug(err) + continue } + c.SetNonce(resp.GetNonce()) case edsURL: for _, r := range resp.GetResources() { @@ -162,17 +201,15 @@ func (c *Client) Receive(stream adsStream) error { } cla, ok := any.Message.(*xdspb.ClusterLoadAssignment) if !ok { - log.Debugf("Unexpected resource type: %T in endpoint discovery", any.Message) continue } - c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla) + c.assignments.setClusterLoadAssignment(cla.GetClusterName(), cla) // ack the bloody thing } log.Debugf("Endpoint discovery processed with %d resources", len(resp.GetResources())) default: - log.Warningf("Unknown response URL for discovery: %q", resp.GetTypeUrl()) - continue + return fmt.Errorf("unknown response URL for discovery: %q", resp.GetTypeUrl()) } } }