diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index 94f0a99bc..4d9af7bf4 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -40,7 +40,8 @@ traffic TO... This enabled the *traffic* plugin, with a default node id of `coredns` and no TLS. -* **TO...** are the Envoy control plane endpoint to connect to. This must start with `grpc://`. +* **TO...** are the Envoy control plane endpoint to connect to. This must start with `grpc://`. The + port number defaults to 443. The extended syntax is available is you want more control. @@ -81,7 +82,8 @@ What metrics should we do? If any? Number of clusters? Number of endpoints and h ## Ready -Should this plugin implement readiness? +This plugin report readiness to the ready plugin. This will happen after a gRPC stream has been +established to an upstream. ## Examples @@ -123,7 +125,6 @@ Multiple **TO** addresses is not implemented. ## TODO * metrics? -* more and better testing * credentials (other than TLS) - how/what? * is the protocol correctly implemented? Should we not have a 10s tick, but wait for responses from the control plane? diff --git a/plugin/traffic/ready.go b/plugin/traffic/ready.go new file mode 100644 index 000000000..f430f82cc --- /dev/null +++ b/plugin/traffic/ready.go @@ -0,0 +1,4 @@ +package traffic + +// Ready implements the ready.Readiness interface. +func (t *Traffic) Ready() bool { return t.c.HasSynced() } diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index bfd9683a4..83f68bbf3 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -117,6 +117,7 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) { if t.c, err = xds.New(toHosts[0], node, opts...); err != nil { return nil, err } + t.to = toHosts[0] return t, nil } diff --git a/plugin/traffic/setup_test.go b/plugin/traffic/setup_test.go index 99403fbcd..de7056a32 100644 --- a/plugin/traffic/setup_test.go +++ b/plugin/traffic/setup_test.go @@ -7,12 +7,10 @@ import ( ) func TestSetup(t *testing.T) { - /* - c := caddy.NewTestController("dns", `traffic grpc://bla`) - if err := setup(c); err != nil { - t.Fatalf("Test 1, expected no errors, but got: %q", err) - } - */ + c := caddy.NewTestController("dns", `traffic grpc://127.0.0.1`) + if err := setup(c); err != nil { + t.Fatalf("Test 1, expected no errors, but got: %q", err) + } } func TestParseTraffic(t *testing.T) { diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 5b3d736a5..45810a383 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -52,14 +52,15 @@ type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClien type Client struct { cc *grpc.ClientConn ctx context.Context - assignments *assignment // assignments contains the current clusters and endpoints. node *corepb.Node cancel context.CancelFunc stop chan struct{} - mu sync.RWMutex - - version map[string]string - nonce map[string]string + to string // upstream hosts, mostly here for logging purposes + mu sync.RWMutex // protects everything below + assignments *assignment // assignments contains the current clusters and endpoints + version map[string]string + nonce map[string]string + synced bool // true when we first successfully got a stream } // New returns a new client that's dialed to addr using node as the local identifier. @@ -92,6 +93,7 @@ func (c *Client) Stop() error { c.cancel(); return c.cc.Close() } // Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager. func (c *Client) Run() { + first := true for { select { case <-c.ctx.Done(): @@ -107,6 +109,12 @@ func (c *Client) Run() { continue } + if first { + log.Info("gRPC stream established to %q", c.to) + c.setSynced() + first = false + } + done := make(chan struct{}) go func() { if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil { @@ -128,7 +136,7 @@ func (c *Client) Run() { } }() - if err := c.Receive(stream); err != nil { + if err := c.receive(stream); err != nil { log.Warning(err) } close(done) @@ -159,8 +167,8 @@ func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clus return stream.Send(req) } -// Receive receives from the stream, it handled both cluster and endpoint DiscoveryResponses. -func (c *Client) Receive(stream adsStream) error { +// receive receives from the stream, it handled both cluster and endpoint DiscoveryResponses. +func (c *Client) receive(stream adsStream) error { for { resp, err := stream.Recv() if err != nil { @@ -182,7 +190,7 @@ func (c *Client) Receive(stream adsStream) error { } a.SetClusterLoadAssignment(cluster.GetName(), nil) } - log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL), a.clusters()) + log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL)) // set our local administration and ack the reply. Empty version would signal NACK. c.SetNonce(cdsURL, resp.GetNonce()) c.SetVersion(cdsURL, resp.GetVersionInfo()) @@ -206,7 +214,7 @@ func (c *Client) Receive(stream adsStream) error { } c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla) } - log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL), c.assignments.clusters()) + log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL)) // set our local administration and ack the reply. Empty version would signal NACK. c.SetNonce(edsURL, resp.GetNonce()) c.SetVersion(edsURL, resp.GetVersionInfo()) diff --git a/plugin/traffic/xds/fields.go b/plugin/traffic/xds/fields.go index 9dac60cbe..af4411a8b 100644 --- a/plugin/traffic/xds/fields.go +++ b/plugin/traffic/xds/fields.go @@ -1,37 +1,57 @@ package xds +// Assignment returns the current assignment map. func (c *Client) Assignments() *assignment { c.mu.RLock() defer c.mu.RUnlock() return c.assignments } +// SetAssignment sets the assignment map. func (c *Client) SetAssignments(a *assignment) { c.mu.Lock() defer c.mu.Unlock() c.assignments = a } +// Version returns the last version seen from the API for this typeURL. func (c *Client) Version(typeURL string) string { c.mu.RLock() defer c.mu.RUnlock() return c.version[typeURL] } +// SetVersion sets the version for this typeURL. func (c *Client) SetVersion(typeURL, a string) { c.mu.Lock() defer c.mu.Unlock() c.version[typeURL] = a } +// Nonce returns the last nonce seen from the API for this typeURL. func (c *Client) Nonce(typeURL string) string { c.mu.RLock() defer c.mu.RUnlock() return c.nonce[typeURL] } +// SetNonce sets the nonce. for this typeURL. func (c *Client) SetNonce(typeURL, n string) { c.mu.Lock() defer c.mu.Unlock() c.nonce[typeURL] = n } + +// SetSynced sets the synced boolean to true. +func (c *Client) setSynced() { + c.mu.Lock() + defer c.mu.Unlock() + c.synced = true +} + +// Synced return true if the clients has synced. +func (c *Client) HasSynced() bool { + c.mu.RLock() + defer c.mu.RUnlock() + return c.synced +} diff --git a/plugin/traffic/xds/test b/plugin/traffic/xds/test deleted file mode 100755 index 424ebaf7a..000000000 Binary files a/plugin/traffic/xds/test and /dev/null differ