diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 1baeba408..aacbf61be 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -106,7 +106,7 @@ func (c *Client) Run() { done := make(chan struct{}) go func() { - tick := time.NewTicker(10 * time.Second) + tick := time.NewTicker(1 * time.Second) for { select { case <-tick.C: @@ -164,6 +164,7 @@ func (c *Client) Receive(stream adsStream) error { switch resp.GetTypeUrl() { case cdsURL: + a := &assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)} for _, r := range resp.GetResources() { var any ptypes.DynamicAny if err := ptypes.UnmarshalAny(r, &any); err != nil { @@ -174,24 +175,25 @@ func (c *Client) Receive(stream adsStream) error { if !ok { continue } - c.assignments.setClusterLoadAssignment(cluster.GetName(), nil) + a.setClusterLoadAssignment(cluster.GetName(), nil) } log.Debugf("Cluster discovery processed with %d resources", len(resp.GetResources())) // ack the CDS proto, with we we've got. (empty version would be NACK) - if err := c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.clusters()); err != nil { + if err := c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters()); err != nil { log.Debug(err) continue } // need to figure out how to handle the versions and nounces exactly. + c.SetNonce(resp.GetNonce()) + c.SetAssignments(a) + // now kick off discovery for endpoints - if err := c.endpointDiscovery(stream, "", resp.GetNonce(), c.assignments.clusters()); err != nil { + if err := c.endpointDiscovery(stream, "", resp.GetNonce(), a.clusters()); err != nil { log.Debug(err) continue } - c.SetNonce(resp.GetNonce()) - case edsURL: for _, r := range resp.GetResources() { var any ptypes.DynamicAny diff --git a/plugin/traffic/xds/fields.go b/plugin/traffic/xds/fields.go new file mode 100644 index 000000000..3169254c0 --- /dev/null +++ b/plugin/traffic/xds/fields.go @@ -0,0 +1,25 @@ +package xds + +func (c *Client) Nonce() string { + c.mu.RLock() + defer c.mu.RUnlock() + return c.nonce +} + +func (c *Client) SetNonce(n string) { + c.mu.Lock() + defer c.mu.Unlock() + c.nonce = n +} + +func (c *Client) Assignments() *assignment { + c.mu.RLock() + defer c.mu.RUnlock() + return c.assignments +} + +func (c *Client) SetAssignments(a *assignment) { + c.mu.Lock() + defer c.mu.Unlock() + c.assignments = a +} diff --git a/plugin/traffic/xds/nonce.go b/plugin/traffic/xds/nonce.go deleted file mode 100644 index 1d2d18f4d..000000000 --- a/plugin/traffic/xds/nonce.go +++ /dev/null @@ -1,13 +0,0 @@ -package xds - -func (c *Client) Nonce() string { - c.mu.RLock() - defer c.mu.RUnlock() - return c.nonce -} - -func (c *Client) SetNonce(n string) { - c.mu.Lock() - defer c.mu.Unlock() - c.nonce = n -}