From 6da97627a77beab18171e30d6a4f87eaf2949fdd Mon Sep 17 00:00:00 2001 From: Miek Gieben Date: Thu, 16 Jan 2020 08:47:17 +0100 Subject: [PATCH] Make node id a property Signed-off-by: Miek Gieben --- plugin/traffic/HACKING.md | 4 +++- plugin/traffic/README.md | 11 ++++++---- plugin/traffic/setup.go | 25 ++++++++++++++++------ plugin/traffic/setup_test.go | 36 +++++++++++++++++++++----------- plugin/traffic/traffic.go | 19 ++++------------- plugin/traffic/xds/assignment.go | 9 ++++---- plugin/traffic/xds/client.go | 12 ++++++++--- 7 files changed, 70 insertions(+), 46 deletions(-) diff --git a/plugin/traffic/HACKING.md b/plugin/traffic/HACKING.md index 990c1c353..37d352ef8 100644 --- a/plugin/traffic/HACKING.md +++ b/plugin/traffic/HACKING.md @@ -33,7 +33,9 @@ Then for CoreDNS, check out the `traffic` branch, create a Corefile: ~~~ Corefile example.org { - traffic + traffic { + id test-id + } debug } ~~~ diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index 18aeb5705..2db909da2 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -31,21 +31,24 @@ assignment yet, it will still include this endpoint address in responses. ## Syntax ~~~ -traffic +traffic TO... ~~~ -The extended syntax (not implemented; everything is hard-coded at the moment): +* **TO...** are the Envoy control plane endpoint to connect to. The syntax mimics the *forward* + plugin and must start with `grpc://`. + + +The extended syntax is available is you want more control. ~~~ traffic { - server grpc://dsdsd + server SERVER [SERVER]... node ID } ~~~ * node **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`. - ## Examples ~~~ corefile diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index 946101a7d..010a9858c 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -7,6 +7,7 @@ import ( "github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/plugin" clog "github.com/coredns/coredns/plugin/pkg/log" + "github.com/coredns/coredns/plugin/traffic/xds" "github.com/caddyserver/caddy" ) @@ -17,11 +18,7 @@ func init() { plugin.Register("traffic", setup) } func setup(c *caddy.Controller) error { rand.Seed(int64(time.Now().Nanosecond())) - if err := parse(c); err != nil { - return plugin.Error("traffic", err) - } - - t, err := New() + t, err := parse(c) if err != nil { return plugin.Error("traffic", err) } @@ -52,6 +49,8 @@ func setup(c *caddy.Controller) error { } func parse(c *caddy.Controller) (*Traffic, error) { + node := "coredns" + for c.Next() { args := c.RemainingArgs() if len(args) != 0 { @@ -61,8 +60,22 @@ func parse(c *caddy.Controller) (*Traffic, error) { for c.NextBlock() { switch c.Val() { case "id": + args := c.RemainingArgs() + if len(args) != 1 { + return nil, c.ArgErr() + } + node = args[0] + default: + return nil, c.Errf("unknown property '%s'", c.Val()) } } } - return nil, nil + + x, err := xds.New(":18000", node) + if err != nil { + return nil, err + } + + t := &Traffic{c: x} + return t, nil } diff --git a/plugin/traffic/setup_test.go b/plugin/traffic/setup_test.go index 1ed8d1cce..21690df4f 100644 --- a/plugin/traffic/setup_test.go +++ b/plugin/traffic/setup_test.go @@ -7,28 +7,40 @@ import ( ) func TestSetup(t *testing.T) { + /* + c := caddy.NewTestController("dns", `traffic grpc://bla`) + if err := setup(c); err != nil { + t.Fatalf("Test 1, expected no errors, but got: %q", err) + } + */ +} + +func TestParse(t *testing.T) { tests := []struct { input string shouldErr bool }{ - // positive - {`traffic`, false}, - // negative - {`traffic fleeb`, true}, + // fail + {`traffic { + id bla bla + }`, true}, + {`traffic { + node bla bla + }`, true}, } - for i, test := range tests { c := caddy.NewTestController("dns", test.input) - err := parse(c) - + _, err := parse(c) if test.shouldErr && err == nil { - t.Errorf("Test %d: Expected error but found %s for input %s", i, err, test.input) + t.Errorf("Test %v: Expected error but found nil", i) + continue + } else if !test.shouldErr && err != nil { + t.Errorf("Test %v: Expected no error but found error: %v", i, err) + continue } - if err != nil { - if !test.shouldErr { - t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err) - } + if test.shouldErr { + continue } } } diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index 958fb219f..61384ab17 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -18,19 +18,8 @@ type Traffic struct { Next plugin.Handler } -// New returns a pointer to a new and initialized Traffic. -func New(addr, node string) (*Traffic, error) { - c, err := xds.New(":18000", "mycoredns") - if err != nil { - return nil, err - } - - return &Traffic{c: c}, nil -} - -func (t *Traffic) Close() { - t.c.Close() -} +// shutdown closes the connection to the managment endpoints and stops any running goroutines. +func (t *Traffic) shutdown() { t.c.Close() } // ServeDNS implements the plugin.Handler interface. func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { @@ -49,8 +38,8 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg m.SetReply(r) m.Answer = []dns.RR{&dns.A{ - dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, - addr, + Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, + A: addr, }} w.WriteMsg(m) diff --git a/plugin/traffic/xds/assignment.go b/plugin/traffic/xds/assignment.go index 60149c3f2..5cdd04104 100644 --- a/plugin/traffic/xds/assignment.go +++ b/plugin/traffic/xds/assignment.go @@ -14,7 +14,7 @@ type assignment struct { version int // not sure what do with and if we should discard all clusters. } -func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { +func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { // If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry. a.mu.Lock() defer a.mu.Unlock() @@ -33,7 +33,7 @@ func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterL } // ClusterLoadAssignment returns the healthy endpoints and their weight. -func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { +func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { a.mu.RLock() cla, ok := a.cla[cluster] a.mu.RUnlock() @@ -43,7 +43,7 @@ func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssi return cla } -func (a assignment) Clusters() []string { +func (a *assignment) Clusters() []string { a.mu.RLock() defer a.mu.RUnlock() clusters := make([]string, len(a.cla)) @@ -57,7 +57,7 @@ func (a assignment) Clusters() []string { // Select selects a backend from cla, using weighted random selection. It only selects // backends that are reporting healthy. -func (a assignment) Select(cluster string) net.IP { +func (a *assignment) Select(cluster string) net.IP { cla := a.ClusterLoadAssignment(cluster) if cla == nil { return nil @@ -105,6 +105,5 @@ func (a assignment) Select(cluster string) net.IP { } } } - return nil } diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index bbf92d586..75056782f 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -43,10 +43,11 @@ const ( type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient +// Client talks to the grpc manager's endpoint to get load assignments. type Client struct { cc *grpc.ClientConn ctx context.Context - assignments assignment + assignments *assignment node *corepb.Node cancel context.CancelFunc stop chan struct{} @@ -61,14 +62,16 @@ func New(addr, node string) (*Client, error) { return nil, err } c := &Client{cc: cc, node: &corepb.Node{Id: "test-id"}} // do more with this node data? Hostname port?? - c.assignments = assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)} + c.assignments = &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)} c.ctx, c.cancel = context.WithCancel(context.Background()) return c, nil } +// Close closes a client performs cleanups. func (c *Client) Close() { c.cancel(); c.cc.Close() } +// Run runs the gRPC stream to the manager. func (c *Client) Run() (adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) { cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc) stream, err := cli.StreamAggregatedResources(c.ctx) @@ -78,6 +81,7 @@ func (c *Client) Run() (adsgrpc.AggregatedDiscoveryService_StreamAggregatedResou return stream, nil } +// ClusterDiscovery sends a cluster DiscoveryRequest on the stream. func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clusters []string) error { req := &xdspb.DiscoveryRequest{ Node: c.node, @@ -89,6 +93,7 @@ func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clust return stream.Send(req) } +// EndpointDiscovery sends a endpoint DiscoveryRequest on the stream. func (c *Client) EndpointDiscovery(stream adsStream, version, nonce string, clusters []string) error { req := &xdspb.DiscoveryRequest{ Node: c.node, @@ -100,6 +105,7 @@ func (c *Client) EndpointDiscovery(stream adsStream, version, nonce string, clus return stream.Send(req) } +// Receive receives from the stream, it handled both cluster and endpoint DiscoveryResponses. func (c *Client) Receive(stream adsStream) error { for { resp, err := stream.Recv() @@ -157,5 +163,5 @@ func (c *Client) Receive(stream adsStream) error { } } -// Select is a small wrapper. bla bla, keeps assigmens private. +// Select returns an address that is deemed to be the correct one for this cluster. func (c *Client) Select(cluster string) net.IP { return c.assignments.Select(cluster) }