Make node id a property

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-01-16 08:47:17 +01:00
parent acd0b73a49
commit 6da97627a7
7 changed files with 70 additions and 46 deletions

View File

@@ -33,7 +33,9 @@ Then for CoreDNS, check out the `traffic` branch, create a Corefile:
~~~ Corefile ~~~ Corefile
example.org { example.org {
traffic traffic {
id test-id
}
debug debug
} }
~~~ ~~~

View File

@@ -31,21 +31,24 @@ assignment yet, it will still include this endpoint address in responses.
## Syntax ## Syntax
~~~ ~~~
traffic traffic TO...
~~~ ~~~
The extended syntax (not implemented; everything is hard-coded at the moment): * **TO...** are the Envoy control plane endpoint to connect to. The syntax mimics the *forward*
plugin and must start with `grpc://`.
The extended syntax is available is you want more control.
~~~ ~~~
traffic { traffic {
server grpc://dsdsd <creds> server SERVER [SERVER]...
node ID node ID
} }
~~~ ~~~
* node **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`. * node **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`.
## Examples ## Examples
~~~ corefile ~~~ corefile

View File

@@ -7,6 +7,7 @@ import (
"github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin"
clog "github.com/coredns/coredns/plugin/pkg/log" clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/plugin/traffic/xds"
"github.com/caddyserver/caddy" "github.com/caddyserver/caddy"
) )
@@ -17,11 +18,7 @@ func init() { plugin.Register("traffic", setup) }
func setup(c *caddy.Controller) error { func setup(c *caddy.Controller) error {
rand.Seed(int64(time.Now().Nanosecond())) rand.Seed(int64(time.Now().Nanosecond()))
if err := parse(c); err != nil { t, err := parse(c)
return plugin.Error("traffic", err)
}
t, err := New()
if err != nil { if err != nil {
return plugin.Error("traffic", err) return plugin.Error("traffic", err)
} }
@@ -52,6 +49,8 @@ func setup(c *caddy.Controller) error {
} }
func parse(c *caddy.Controller) (*Traffic, error) { func parse(c *caddy.Controller) (*Traffic, error) {
node := "coredns"
for c.Next() { for c.Next() {
args := c.RemainingArgs() args := c.RemainingArgs()
if len(args) != 0 { if len(args) != 0 {
@@ -61,8 +60,22 @@ func parse(c *caddy.Controller) (*Traffic, error) {
for c.NextBlock() { for c.NextBlock() {
switch c.Val() { switch c.Val() {
case "id": case "id":
args := c.RemainingArgs()
if len(args) != 1 {
return nil, c.ArgErr()
}
node = args[0]
default:
return nil, c.Errf("unknown property '%s'", c.Val())
} }
} }
} }
return nil, nil
x, err := xds.New(":18000", node)
if err != nil {
return nil, err
}
t := &Traffic{c: x}
return t, nil
} }

View File

@@ -7,28 +7,40 @@ import (
) )
func TestSetup(t *testing.T) { func TestSetup(t *testing.T) {
/*
c := caddy.NewTestController("dns", `traffic grpc://bla`)
if err := setup(c); err != nil {
t.Fatalf("Test 1, expected no errors, but got: %q", err)
}
*/
}
func TestParse(t *testing.T) {
tests := []struct { tests := []struct {
input string input string
shouldErr bool shouldErr bool
}{ }{
// positive // fail
{`traffic`, false}, {`traffic {
// negative id bla bla
{`traffic fleeb`, true}, }`, true},
{`traffic {
node bla bla
}`, true},
} }
for i, test := range tests { for i, test := range tests {
c := caddy.NewTestController("dns", test.input) c := caddy.NewTestController("dns", test.input)
err := parse(c) _, err := parse(c)
if test.shouldErr && err == nil { if test.shouldErr && err == nil {
t.Errorf("Test %d: Expected error but found %s for input %s", i, err, test.input) t.Errorf("Test %v: Expected error but found nil", i)
continue
} else if !test.shouldErr && err != nil {
t.Errorf("Test %v: Expected no error but found error: %v", i, err)
continue
} }
if err != nil { if test.shouldErr {
if !test.shouldErr { continue
t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err)
}
} }
} }
} }

View File

@@ -18,19 +18,8 @@ type Traffic struct {
Next plugin.Handler Next plugin.Handler
} }
// New returns a pointer to a new and initialized Traffic. // shutdown closes the connection to the managment endpoints and stops any running goroutines.
func New(addr, node string) (*Traffic, error) { func (t *Traffic) shutdown() { t.c.Close() }
c, err := xds.New(":18000", "mycoredns")
if err != nil {
return nil, err
}
return &Traffic{c: c}, nil
}
func (t *Traffic) Close() {
t.c.Close()
}
// ServeDNS implements the plugin.Handler interface. // ServeDNS implements the plugin.Handler interface.
func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
@@ -49,8 +38,8 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
m.SetReply(r) m.SetReply(r)
m.Answer = []dns.RR{&dns.A{ m.Answer = []dns.RR{&dns.A{
dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5},
addr, A: addr,
}} }}
w.WriteMsg(m) w.WriteMsg(m)

View File

@@ -14,7 +14,7 @@ type assignment struct {
version int // not sure what do with and if we should discard all clusters. version int // not sure what do with and if we should discard all clusters.
} }
func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) {
// If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry. // If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry.
a.mu.Lock() a.mu.Lock()
defer a.mu.Unlock() defer a.mu.Unlock()
@@ -33,7 +33,7 @@ func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterL
} }
// ClusterLoadAssignment returns the healthy endpoints and their weight. // ClusterLoadAssignment returns the healthy endpoints and their weight.
func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment {
a.mu.RLock() a.mu.RLock()
cla, ok := a.cla[cluster] cla, ok := a.cla[cluster]
a.mu.RUnlock() a.mu.RUnlock()
@@ -43,7 +43,7 @@ func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssi
return cla return cla
} }
func (a assignment) Clusters() []string { func (a *assignment) Clusters() []string {
a.mu.RLock() a.mu.RLock()
defer a.mu.RUnlock() defer a.mu.RUnlock()
clusters := make([]string, len(a.cla)) clusters := make([]string, len(a.cla))
@@ -57,7 +57,7 @@ func (a assignment) Clusters() []string {
// Select selects a backend from cla, using weighted random selection. It only selects // Select selects a backend from cla, using weighted random selection. It only selects
// backends that are reporting healthy. // backends that are reporting healthy.
func (a assignment) Select(cluster string) net.IP { func (a *assignment) Select(cluster string) net.IP {
cla := a.ClusterLoadAssignment(cluster) cla := a.ClusterLoadAssignment(cluster)
if cla == nil { if cla == nil {
return nil return nil
@@ -105,6 +105,5 @@ func (a assignment) Select(cluster string) net.IP {
} }
} }
} }
return nil return nil
} }

View File

@@ -43,10 +43,11 @@ const (
type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// Client talks to the grpc manager's endpoint to get load assignments.
type Client struct { type Client struct {
cc *grpc.ClientConn cc *grpc.ClientConn
ctx context.Context ctx context.Context
assignments assignment assignments *assignment
node *corepb.Node node *corepb.Node
cancel context.CancelFunc cancel context.CancelFunc
stop chan struct{} stop chan struct{}
@@ -61,14 +62,16 @@ func New(addr, node string) (*Client, error) {
return nil, err return nil, err
} }
c := &Client{cc: cc, node: &corepb.Node{Id: "test-id"}} // do more with this node data? Hostname port?? c := &Client{cc: cc, node: &corepb.Node{Id: "test-id"}} // do more with this node data? Hostname port??
c.assignments = assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)} c.assignments = &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
c.ctx, c.cancel = context.WithCancel(context.Background()) c.ctx, c.cancel = context.WithCancel(context.Background())
return c, nil return c, nil
} }
// Close closes a client performs cleanups.
func (c *Client) Close() { c.cancel(); c.cc.Close() } func (c *Client) Close() { c.cancel(); c.cc.Close() }
// Run runs the gRPC stream to the manager.
func (c *Client) Run() (adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) { func (c *Client) Run() (adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) {
cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc) cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc)
stream, err := cli.StreamAggregatedResources(c.ctx) stream, err := cli.StreamAggregatedResources(c.ctx)
@@ -78,6 +81,7 @@ func (c *Client) Run() (adsgrpc.AggregatedDiscoveryService_StreamAggregatedResou
return stream, nil return stream, nil
} }
// ClusterDiscovery sends a cluster DiscoveryRequest on the stream.
func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clusters []string) error { func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clusters []string) error {
req := &xdspb.DiscoveryRequest{ req := &xdspb.DiscoveryRequest{
Node: c.node, Node: c.node,
@@ -89,6 +93,7 @@ func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clust
return stream.Send(req) return stream.Send(req)
} }
// EndpointDiscovery sends a endpoint DiscoveryRequest on the stream.
func (c *Client) EndpointDiscovery(stream adsStream, version, nonce string, clusters []string) error { func (c *Client) EndpointDiscovery(stream adsStream, version, nonce string, clusters []string) error {
req := &xdspb.DiscoveryRequest{ req := &xdspb.DiscoveryRequest{
Node: c.node, Node: c.node,
@@ -100,6 +105,7 @@ func (c *Client) EndpointDiscovery(stream adsStream, version, nonce string, clus
return stream.Send(req) return stream.Send(req)
} }
// Receive receives from the stream, it handled both cluster and endpoint DiscoveryResponses.
func (c *Client) Receive(stream adsStream) error { func (c *Client) Receive(stream adsStream) error {
for { for {
resp, err := stream.Recv() resp, err := stream.Recv()
@@ -157,5 +163,5 @@ func (c *Client) Receive(stream adsStream) error {
} }
} }
// Select is a small wrapper. bla bla, keeps assigmens private. // Select returns an address that is deemed to be the correct one for this cluster.
func (c *Client) Select(cluster string) net.IP { return c.assignments.Select(cluster) } func (c *Client) Select(cluster string) net.IP { return c.assignments.Select(cluster) }