Some cleanups

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-02-05 14:58:14 +01:00
parent b7c85feb05
commit a47aea02f8
2 changed files with 10 additions and 42 deletions

View File

@@ -135,8 +135,8 @@ we continue with step 4 above, ignoring any locality.
If monitoring is enabled (via the *prometheus* plugin) then the following metric are exported:
* `coredns_traffic_cluster_tracked{}` the number of tracked clusters.
* `coredns_traffic_endpoint_tracked{}` the number of tracked clusters.
* `coredns_traffic_clusters_tracked{}` the number of tracked clusters.
* `coredns_traffic_endpoints_tracked{}` the number of tracked clusters.
## Ready
@@ -162,7 +162,3 @@ localhost on port 18000. The node ID will be `test-id` and no TLS will be used.
Priority and locality information from ClusterLoadAssignments is not used. Multiple **TO** addresses
is not implemented. Credentials are not implemented.
## TODO
Node may only be set on the first request.

View File

@@ -23,7 +23,6 @@ package xds
import (
"context"
"fmt"
"os"
"sync"
"time"
@@ -35,7 +34,6 @@ import (
endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
xdspb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"google.golang.org/grpc"
)
@@ -69,17 +67,7 @@ func New(addr, node string, opts ...grpc.DialOption) (*Client, error) {
if err != nil {
return nil, err
}
hostname, _ := os.Hostname()
c := &Client{cc: cc, to: addr, node: &corepb.Node{Id: node,
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"HOSTNAME": {Kind: &structpb.Value_StringValue{StringValue: hostname}},
"BUILDV": {Kind: &structpb.Value_StringValue{StringValue: "CoreDNS"}},
"BUILDVERSION": {Kind: &structpb.Value_StringValue{StringValue: coremain.CoreVersion}},
},
},
},
}
c := &Client{cc: cc, to: addr, node: &corepb.Node{Id: node, UserAgentName: "CoreDNS", UserAgentVersionType: &corepb.Node_UserAgentVersion{UserAgentVersion: coremain.CoreVersion}}}
c.assignments = &assignment{cla: make(map[string]*endpointpb.ClusterLoadAssignment)}
c.version, c.nonce = make(map[string]string), make(map[string]string)
c.ctx, c.cancel = context.WithCancel(context.Background())
@@ -109,36 +97,18 @@ func (c *Client) Run() {
}
if first {
log.Infof("gRPC stream established to %q", c.to)
// send first request, to create stream, then wait for ADS to send us updates.
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
log.Debug(err)
}
log.Infof("gRPC stream established to %q", c.to) // might fail??
c.setSynced()
first = false
}
done := make(chan struct{})
go func() {
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
log.Debug(err)
}
tick := time.NewTicker(10 * time.Second)
for {
select {
case <-tick.C:
// send empty list for cluster discovery every 10 seconds
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
log.Debug(err)
}
case <-done:
tick.Stop()
return
}
}
}()
if err := c.receive(stream); err != nil {
log.Warning(err)
}
close(done)
}
}
@@ -211,9 +181,11 @@ func (c *Client) receive(stream adsStream) error {
}
cla, ok := any.Message.(*endpointpb.ClusterLoadAssignment)
if !ok {
// TODO warn/err here?
continue
}
c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla)
}
// set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(edsURL, resp.GetNonce())