implement readiness

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-01-18 07:54:32 +01:00
parent 6fd06df9da
commit 4e38000ec2
7 changed files with 51 additions and 19 deletions

View File

@@ -40,7 +40,8 @@ traffic TO...
This enabled the *traffic* plugin, with a default node id of `coredns` and no TLS. This enabled the *traffic* plugin, with a default node id of `coredns` and no TLS.
* **TO...** are the Envoy control plane endpoint to connect to. This must start with `grpc://`. * **TO...** are the Envoy control plane endpoint to connect to. This must start with `grpc://`. The
port number defaults to 443.
The extended syntax is available is you want more control. The extended syntax is available is you want more control.
@@ -81,7 +82,8 @@ What metrics should we do? If any? Number of clusters? Number of endpoints and h
## Ready ## Ready
Should this plugin implement readiness? This plugin report readiness to the ready plugin. This will happen after a gRPC stream has been
established to an upstream.
## Examples ## Examples
@@ -123,7 +125,6 @@ Multiple **TO** addresses is not implemented.
## TODO ## TODO
* metrics? * metrics?
* more and better testing
* credentials (other than TLS) - how/what? * credentials (other than TLS) - how/what?
* is the protocol correctly implemented? Should we not have a 10s tick, but wait for responses from * is the protocol correctly implemented? Should we not have a 10s tick, but wait for responses from
the control plane? the control plane?

4
plugin/traffic/ready.go Normal file
View File

@@ -0,0 +1,4 @@
package traffic
// Ready implements the ready.Readiness interface.
func (t *Traffic) Ready() bool { return t.c.HasSynced() }

View File

@@ -117,6 +117,7 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) {
if t.c, err = xds.New(toHosts[0], node, opts...); err != nil { if t.c, err = xds.New(toHosts[0], node, opts...); err != nil {
return nil, err return nil, err
} }
t.to = toHosts[0]
return t, nil return t, nil
} }

View File

@@ -7,12 +7,10 @@ import (
) )
func TestSetup(t *testing.T) { func TestSetup(t *testing.T) {
/* c := caddy.NewTestController("dns", `traffic grpc://127.0.0.1`)
c := caddy.NewTestController("dns", `traffic grpc://bla`) if err := setup(c); err != nil {
if err := setup(c); err != nil { t.Fatalf("Test 1, expected no errors, but got: %q", err)
t.Fatalf("Test 1, expected no errors, but got: %q", err) }
}
*/
} }
func TestParseTraffic(t *testing.T) { func TestParseTraffic(t *testing.T) {

View File

@@ -52,14 +52,15 @@ type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClien
type Client struct { type Client struct {
cc *grpc.ClientConn cc *grpc.ClientConn
ctx context.Context ctx context.Context
assignments *assignment // assignments contains the current clusters and endpoints.
node *corepb.Node node *corepb.Node
cancel context.CancelFunc cancel context.CancelFunc
stop chan struct{} stop chan struct{}
mu sync.RWMutex to string // upstream hosts, mostly here for logging purposes
mu sync.RWMutex // protects everything below
version map[string]string assignments *assignment // assignments contains the current clusters and endpoints
nonce map[string]string version map[string]string
nonce map[string]string
synced bool // true when we first successfully got a stream
} }
// New returns a new client that's dialed to addr using node as the local identifier. // New returns a new client that's dialed to addr using node as the local identifier.
@@ -92,6 +93,7 @@ func (c *Client) Stop() error { c.cancel(); return c.cc.Close() }
// Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager. // Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager.
func (c *Client) Run() { func (c *Client) Run() {
first := true
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
@@ -107,6 +109,12 @@ func (c *Client) Run() {
continue continue
} }
if first {
log.Info("gRPC stream established to %q", c.to)
c.setSynced()
first = false
}
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil { if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
@@ -128,7 +136,7 @@ func (c *Client) Run() {
} }
}() }()
if err := c.Receive(stream); err != nil { if err := c.receive(stream); err != nil {
log.Warning(err) log.Warning(err)
} }
close(done) close(done)
@@ -159,8 +167,8 @@ func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clus
return stream.Send(req) return stream.Send(req)
} }
// Receive receives from the stream, it handled both cluster and endpoint DiscoveryResponses. // receive receives from the stream, it handled both cluster and endpoint DiscoveryResponses.
func (c *Client) Receive(stream adsStream) error { func (c *Client) receive(stream adsStream) error {
for { for {
resp, err := stream.Recv() resp, err := stream.Recv()
if err != nil { if err != nil {
@@ -182,7 +190,7 @@ func (c *Client) Receive(stream adsStream) error {
} }
a.SetClusterLoadAssignment(cluster.GetName(), nil) a.SetClusterLoadAssignment(cluster.GetName(), nil)
} }
log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL), a.clusters()) log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL))
// set our local administration and ack the reply. Empty version would signal NACK. // set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(cdsURL, resp.GetNonce()) c.SetNonce(cdsURL, resp.GetNonce())
c.SetVersion(cdsURL, resp.GetVersionInfo()) c.SetVersion(cdsURL, resp.GetVersionInfo())
@@ -206,7 +214,7 @@ func (c *Client) Receive(stream adsStream) error {
} }
c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla) c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla)
} }
log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q, clusters: %v", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL), c.assignments.clusters()) log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL))
// set our local administration and ack the reply. Empty version would signal NACK. // set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(edsURL, resp.GetNonce()) c.SetNonce(edsURL, resp.GetNonce())
c.SetVersion(edsURL, resp.GetVersionInfo()) c.SetVersion(edsURL, resp.GetVersionInfo())

View File

@@ -1,37 +1,57 @@
package xds package xds
// Assignment returns the current assignment map.
func (c *Client) Assignments() *assignment { func (c *Client) Assignments() *assignment {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
return c.assignments return c.assignments
} }
// SetAssignment sets the assignment map.
func (c *Client) SetAssignments(a *assignment) { func (c *Client) SetAssignments(a *assignment) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.assignments = a c.assignments = a
} }
// Version returns the last version seen from the API for this typeURL.
func (c *Client) Version(typeURL string) string { func (c *Client) Version(typeURL string) string {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
return c.version[typeURL] return c.version[typeURL]
} }
// SetVersion sets the version for this typeURL.
func (c *Client) SetVersion(typeURL, a string) { func (c *Client) SetVersion(typeURL, a string) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.version[typeURL] = a c.version[typeURL] = a
} }
// Nonce returns the last nonce seen from the API for this typeURL.
func (c *Client) Nonce(typeURL string) string { func (c *Client) Nonce(typeURL string) string {
c.mu.RLock() c.mu.RLock()
defer c.mu.RUnlock() defer c.mu.RUnlock()
return c.nonce[typeURL] return c.nonce[typeURL]
} }
// SetNonce sets the nonce. for this typeURL.
func (c *Client) SetNonce(typeURL, n string) { func (c *Client) SetNonce(typeURL, n string) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.nonce[typeURL] = n c.nonce[typeURL] = n
} }
// SetSynced sets the synced boolean to true.
func (c *Client) setSynced() {
c.mu.Lock()
defer c.mu.Unlock()
c.synced = true
}
// Synced return true if the clients has synced.
func (c *Client) HasSynced() bool {
c.mu.RLock()
defer c.mu.RUnlock()
return c.synced
}

Binary file not shown.