diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index 98297be4d..da92e82e9 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -47,11 +47,7 @@ is smart enough to select the best one. When SRV records are returned, the endpo are synthesized `endpoint-..` that carries the IP address. Querying for these synthesized names works as well. -[gRPC LB SRV records](https://github.com/grpc/proposal/blob/master/A5-grpclb-in-dns.md) are -supported and returned by the *traffic* plugin for all clusters. The returned endpoints are, -however, the ones from the management cluster. - -*Traffic* implements version 3 of the xDS API. It works with the management server as written in +*Traffic* implements version 2 of the xDS API. It works with the management server as written in . ## Syntax diff --git a/plugin/traffic/grpc_lb.go b/plugin/traffic/grpc_lb.go deleted file mode 100644 index 2b27d20a3..000000000 --- a/plugin/traffic/grpc_lb.go +++ /dev/null @@ -1,19 +0,0 @@ -package traffic - -import ( - "fmt" - - "github.com/miekg/dns" -) - -// See https://github.com/grpc/grpc/blob/master/doc/service_config.md for the fields in this proto. -// We encode it as json and return it in a TXT field. -// TOOD(miek): balancer name should not be hardcoded -var lbTXT = `grpc_config=[{"serviceConfig":{"loadBalancingConfig":[{"eds_experimental":{"Cluster": "xds", "EDSServiceName":"%s", "BalancerName":"xds"}}]}}]` - -func txt(z, cluster string) []dns.RR { - return []dns.RR{&dns.TXT{ - Hdr: dns.RR_Header{Name: z, Rrtype: dns.TypeTXT, Class: dns.ClassINET, Ttl: 5}, - Txt: []string{fmt.Sprintf(lbTXT, cluster)}, - }} -} diff --git a/plugin/traffic/setup_test.go b/plugin/traffic/setup_test.go index b8a9b7042..8649801de 100644 --- a/plugin/traffic/setup_test.go +++ b/plugin/traffic/setup_test.go @@ -1,7 +1,6 @@ package traffic import ( - "encoding/json" "testing" "github.com/caddyserver/caddy" @@ -14,17 +13,6 @@ func TestSetup(t *testing.T) { } } -func TestLBTxt(t *testing.T) { - for _, txt := range []string{lbTXT} { - if _, err := json.Marshal(txt); err != nil { - t.Errorf("Failed to marshal grpc serverConfig: %s", err) - } - if len(txt) > 255 { - t.Errorf("Too long grpc serverConfig (>255): %d", len(txt)) - } - } -} - func TestParseTraffic(t *testing.T) { tests := []struct { input string diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index fbc57d101..1bf36006d 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -54,28 +54,10 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg // ok this cluster doesn't exist, potentially due to extra labels, which may be garbage or legit queries: // legit is: // endpoint-N.cluster - // _grpclb._tcp.cluster // _tcp.cluster - // _grpc_config.cluster (singled out here, but not handled) labels := dns.SplitDomainName(cluster) switch len(labels) { case 2: - // endpoint or _tcp or _grpc_config query - if strings.ToLower(labels[0]) == "_tcp" { - // nodata, because empty non-terminal - m.Ns = soa(state.Zone) - m.Rcode = dns.RcodeSuccess - w.WriteMsg(m) - return 0, nil - } - if strings.HasPrefix(strings.ToLower(labels[0]), "_grpc_config") { - // this is the grpc config blob encoded in a TXT record, see documentation for lbTXT. - m.Answer = txt(state.Zone, labels[1]) // 1 is the cluster - m.Rcode = dns.RcodeSuccess - w.WriteMsg(m) - return 0, nil - - } if strings.HasPrefix(strings.ToLower(labels[0]), "endpoint-") { // recheck if the cluster exist. cluster = labels[1] @@ -88,24 +70,6 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg } return t.serveEndpoint(ctx, state, labels[0], cluster, healthy) } - case 3: - if strings.ToLower(labels[0]) != "_grpclb" || strings.ToLower(labels[1]) != "_tcp" { - m.Ns = soa(state.Zone) - m.Rcode = dns.RcodeNameError - w.WriteMsg(m) - return 0, nil - } - // OK, _grcplb._tcp query; we need to return the endpoint for the cluster in this query - cluster = labels[2] - sockaddr, ok = t.c.Select(cluster, healthy) - if !ok { - // nodata error when this cluster doesn't exist. - m.Ns = soa(state.Zone) - m.Rcode = dns.RcodeSuccess - w.WriteMsg(m) - return 0, nil - } - break default: m.Ns = soa(state.Zone) m.Rcode = dns.RcodeNameError diff --git a/plugin/traffic/traffic_test.go b/plugin/traffic/traffic_test.go index d05eb68ea..81c418046 100644 --- a/plugin/traffic/traffic_test.go +++ b/plugin/traffic/traffic_test.go @@ -9,8 +9,9 @@ import ( "github.com/coredns/coredns/plugin/test" "github.com/coredns/coredns/plugin/traffic/xds" - corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + xdspb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" "github.com/miekg/dns" "google.golang.org/grpc" ) @@ -23,7 +24,7 @@ func TestTraffic(t *testing.T) { tr := &Traffic{c: c, origins: []string{"lb.example.org."}} tests := []struct { - cla *endpointpb.ClusterLoadAssignment + cla *xdspb2.ClusterLoadAssignment cluster string qtype uint16 rcode int @@ -31,98 +32,98 @@ func TestTraffic(t *testing.T) { ns bool // should there be a ns section. }{ { - cla: &endpointpb.ClusterLoadAssignment{}, + cla: &xdspb2.ClusterLoadAssignment{}, cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true, }, { - cla: &endpointpb.ClusterLoadAssignment{}, + cla: &xdspb2.ClusterLoadAssignment{}, cluster: "web", qtype: dns.TypeSRV, rcode: dns.RcodeSuccess, ns: true, }, { - cla: &endpointpb.ClusterLoadAssignment{}, + cla: &xdspb2.ClusterLoadAssignment{}, cluster: "does-not-exist", qtype: dns.TypeA, rcode: dns.RcodeNameError, ns: true, }, // healthy endpoint { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", - Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb.HealthStatus_HEALTHY}}), + Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb2.HealthStatus_HEALTHY}}), }, cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.1", }, { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", - Endpoints: endpoints([]EndpointHealth{{"::1", 18008, corepb.HealthStatus_HEALTHY}}), + Endpoints: endpoints([]EndpointHealth{{"::1", 18008, corepb2.HealthStatus_HEALTHY}}), }, cluster: "web", qtype: dns.TypeAAAA, rcode: dns.RcodeSuccess, answer: "::1", }, // unknown endpoint { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", - Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb.HealthStatus_UNKNOWN}}), + Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb2.HealthStatus_UNKNOWN}}), }, cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true, }, // unknown endpoint and healthy endpoint { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", Endpoints: endpoints([]EndpointHealth{ - {"127.0.0.1", 18008, corepb.HealthStatus_UNKNOWN}, - {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, + {"127.0.0.1", 18008, corepb2.HealthStatus_UNKNOWN}, + {"127.0.0.2", 18008, corepb2.HealthStatus_HEALTHY}, }), }, cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.2", }, // unknown endpoint and healthy endpoint, TXT query { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", Endpoints: endpoints([]EndpointHealth{ - {"127.0.0.1", 18008, corepb.HealthStatus_UNKNOWN}, + {"127.0.0.1", 18008, corepb2.HealthStatus_UNKNOWN}, }), }, cluster: "web", qtype: dns.TypeTXT, rcode: dns.RcodeSuccess, answer: "endpoint-0.web.lb.example.org.", }, // SRV query healthy endpoint { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", Endpoints: endpoints([]EndpointHealth{ - {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, + {"127.0.0.2", 18008, corepb2.HealthStatus_HEALTHY}, }), }, cluster: "web", qtype: dns.TypeSRV, rcode: dns.RcodeSuccess, answer: "endpoint-0.web.lb.example.org.", }, // A query for endpoint-0. { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", Endpoints: endpoints([]EndpointHealth{ - {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, + {"127.0.0.2", 18008, corepb2.HealthStatus_HEALTHY}, }), }, cluster: "endpoint-0.web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.2", }, // A query for endpoint-1. { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", Endpoints: endpoints([]EndpointHealth{ - {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, - {"127.0.0.3", 18008, corepb.HealthStatus_HEALTHY}, + {"127.0.0.2", 18008, corepb2.HealthStatus_HEALTHY}, + {"127.0.0.3", 18008, corepb2.HealthStatus_HEALTHY}, }), }, cluster: "endpoint-1.web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.3", }, // TXT query for _grpc_config { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", Endpoints: endpoints([]EndpointHealth{ - {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, + {"127.0.0.2", 18008, corepb2.HealthStatus_HEALTHY}, }), }, cluster: "_grpc_config.web", qtype: dns.TypeTXT, rcode: dns.RcodeSuccess, @@ -182,7 +183,7 @@ func TestTrafficSRV(t *testing.T) { tr := &Traffic{c: c, origins: []string{"lb.example.org."}} tests := []struct { - cla *endpointpb.ClusterLoadAssignment + cla *xdspb2.ClusterLoadAssignment cluster string qtype uint16 rcode int @@ -190,11 +191,11 @@ func TestTrafficSRV(t *testing.T) { }{ // SRV query healthy endpoint { - cla: &endpointpb.ClusterLoadAssignment{ + cla: &xdspb2.ClusterLoadAssignment{ ClusterName: "web", Endpoints: endpoints([]EndpointHealth{ - {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, - {"127.0.0.3", 18008, corepb.HealthStatus_HEALTHY}, + {"127.0.0.2", 18008, corepb2.HealthStatus_HEALTHY}, + {"127.0.0.3", 18008, corepb2.HealthStatus_HEALTHY}, }), }, cluster: "web", qtype: dns.TypeSRV, rcode: dns.RcodeSuccess, answer: 2, @@ -226,76 +227,33 @@ func TestTrafficSRV(t *testing.T) { } } -func TestTrafficManagement(t *testing.T) { - c, err := xds.New("127.0.0.1:0", "test-id", grpc.WithInsecure()) - if err != nil { - t.Fatal(err) - } - tr := &Traffic{c: c, origins: []string{"lb.example.org."}, mgmt: "xds"} - - for _, cla := range []*endpointpb.ClusterLoadAssignment{ - &endpointpb.ClusterLoadAssignment{ - ClusterName: "web", - Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb.HealthStatus_HEALTHY}}), - }, - &endpointpb.ClusterLoadAssignment{ - ClusterName: "xds", - Endpoints: endpoints([]EndpointHealth{{"::1", 18008, corepb.HealthStatus_HEALTHY}}), - }, - } { - a := xds.NewAssignment() - a.SetClusterLoadAssignment(cla.ClusterName, cla) - c.SetAssignments(a) - } - ctx := context.TODO() - - // Now we ask for the grpclb endpoint in the web cluster, this should give us the endpoint of the xds (mgmt) cluster. - // ; ANSWER SECTION: - // _grpclb._tcp.web.lb.example.org. 5 IN SRV 100 100 18008 endpoint-0.xds.lb.example.org. - // ;; ADDITIONAL SECTION: - // endpoint-0.xds.lb.example.org. 5 IN AAAA ::1 - - m := new(dns.Msg) - m.SetQuestion("_grpclb._tcp.web.lb.example.org.", dns.TypeSRV) - rec := dnstest.NewRecorder(&test.ResponseWriter{}) - if _, err := tr.ServeDNS(ctx, rec, m); err != nil { - t.Errorf("Expected no error, but got %q", err) - } - if len(rec.Msg.Answer) == 0 { - t.Fatalf("Expected answer section, got none") - } - if x := rec.Msg.Answer[0].(*dns.SRV).Target; x != "endpoint-0.xds.lb.example.org." { - t.Errorf("Expected %s, got %s", "endpoint-0.xds.lb.example.org.", x) - } -} - type EndpointHealth struct { Address string Port uint16 - Health corepb.HealthStatus + Health corepb2.HealthStatus } -func endpoints(e []EndpointHealth) []*endpointpb.LocalityLbEndpoints { +func endpoints(e []EndpointHealth) []*endpointpb2.LocalityLbEndpoints { return endpointsWithLocality(e, xds.Locality{}) } -func endpointsWithLocality(e []EndpointHealth, loc xds.Locality) []*endpointpb.LocalityLbEndpoints { - ep := make([]*endpointpb.LocalityLbEndpoints, len(e)) +func endpointsWithLocality(e []EndpointHealth, loc xds.Locality) []*endpointpb2.LocalityLbEndpoints { + ep := make([]*endpointpb2.LocalityLbEndpoints, len(e)) for i := range e { - ep[i] = &endpointpb.LocalityLbEndpoints{ - Locality: &corepb.Locality{ + ep[i] = &endpointpb2.LocalityLbEndpoints{ + Locality: &corepb2.Locality{ Region: loc.Region, Zone: loc.Zone, SubZone: loc.SubZone, }, - LbEndpoints: []*endpointpb.LbEndpoint{{ - HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ - Endpoint: &endpointpb.Endpoint{ - Address: &corepb.Address{ - Address: &corepb.Address_SocketAddress{ - SocketAddress: &corepb.SocketAddress{ + LbEndpoints: []*endpointpb2.LbEndpoint{{ + HostIdentifier: &endpointpb2.LbEndpoint_Endpoint{ + Endpoint: &endpointpb2.Endpoint{ + Address: &corepb2.Address{ + Address: &corepb2.Address_SocketAddress{ + SocketAddress: &corepb2.SocketAddress{ Address: e[i].Address, - PortSpecifier: &corepb.SocketAddress_PortValue{ + PortSpecifier: &corepb2.SocketAddress_PortValue{ PortValue: uint32(e[i].Port), }, }, diff --git a/plugin/traffic/xds/assignment.go b/plugin/traffic/xds/assignment.go index cb80d9df4..8852dcb96 100644 --- a/plugin/traffic/xds/assignment.go +++ b/plugin/traffic/xds/assignment.go @@ -5,14 +5,14 @@ import ( "net" "sync" - corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + xdspb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" ) -// SocketAddress holds a corepb.SocketAddress and a health status +// SocketAddress holds a corepb2.SocketAddress and a health status type SocketAddress struct { - *corepb.SocketAddress - Health corepb.HealthStatus + *corepb2.SocketAddress + Health corepb2.HealthStatus } // Address returns the address from s. @@ -23,16 +23,16 @@ func (s *SocketAddress) Port() uint16 { return uint16(s.GetPortValue()) } type assignment struct { mu sync.RWMutex - cla map[string]*endpointpb.ClusterLoadAssignment + cla map[string]*xdspb2.ClusterLoadAssignment } // NewAssignment returns a pointer to an assignment. func NewAssignment() *assignment { - return &assignment{cla: make(map[string]*endpointpb.ClusterLoadAssignment)} + return &assignment{cla: make(map[string]*xdspb2.ClusterLoadAssignment)} } // SetClusterLoadAssignment sets the assignment for the cluster to cla. -func (a *assignment) SetClusterLoadAssignment(cluster string, cla *endpointpb.ClusterLoadAssignment) { +func (a *assignment) SetClusterLoadAssignment(cluster string, cla *xdspb2.ClusterLoadAssignment) { // If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry. a.mu.Lock() defer a.mu.Unlock() @@ -49,7 +49,7 @@ func (a *assignment) SetClusterLoadAssignment(cluster string, cla *endpointpb.Cl } // ClusterLoadAssignment returns the assignment for the cluster or nil if there is none. -func (a *assignment) ClusterLoadAssignment(cluster string) *endpointpb.ClusterLoadAssignment { +func (a *assignment) ClusterLoadAssignment(cluster string) *xdspb2.ClusterLoadAssignment { a.mu.RLock() cla, ok := a.cla[cluster] a.mu.RUnlock() @@ -82,7 +82,7 @@ func (a *assignment) Select(cluster string, healthy bool) (*SocketAddress, bool) health := 0 for _, ep := range cla.Endpoints { for _, lb := range ep.GetLbEndpoints() { - if healthy && lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + if healthy && lb.GetHealthStatus() != corepb2.HealthStatus_HEALTHY { continue } weight += int(lb.GetLoadBalancingWeight().GetValue()) @@ -99,7 +99,7 @@ func (a *assignment) Select(cluster string, healthy bool) (*SocketAddress, bool) i := 0 for _, ep := range cla.Endpoints { for _, lb := range ep.GetLbEndpoints() { - if healthy && lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + if healthy && lb.GetHealthStatus() != corepb2.HealthStatus_HEALTHY { continue } if r == i { @@ -114,7 +114,7 @@ func (a *assignment) Select(cluster string, healthy bool) (*SocketAddress, bool) r := rand.Intn(health) + 1 for _, ep := range cla.Endpoints { for _, lb := range ep.GetLbEndpoints() { - if healthy && lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + if healthy && lb.GetHealthStatus() != corepb2.HealthStatus_HEALTHY { continue } r -= int(lb.GetLoadBalancingWeight().GetValue()) @@ -136,7 +136,7 @@ func (a *assignment) All(cluster string, healthy bool) ([]*SocketAddress, bool) sa := []*SocketAddress{} for _, ep := range cla.Endpoints { for _, lb := range ep.GetLbEndpoints() { - if healthy && lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + if healthy && lb.GetHealthStatus() != corepb2.HealthStatus_HEALTHY { continue } sa = append(sa, &SocketAddress{lb.GetEndpoint().GetAddress().GetSocketAddress(), lb.GetHealthStatus()}) diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 46e980504..c55a779e0 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -22,16 +22,14 @@ package xds import ( "context" - "fmt" "sync" "github.com/coredns/coredns/coremain" clog "github.com/coredns/coredns/plugin/pkg/log" - clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" - xdspb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + xdspb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + adspb2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" "github.com/golang/protobuf/ptypes" "google.golang.org/grpc" ) @@ -39,17 +37,17 @@ import ( var log = clog.NewWithPlugin("traffic: xds") const ( - cdsURL = "type.googleapis.com/envoy.config.cluster.v3.Cluster" - edsURL = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment" + clusterType = "type.googleapis.com/envoy.api.v2.Cluster" + endpointType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" ) -type adsStream xdspb.AggregatedDiscoveryService_StreamAggregatedResourcesClient +type adsStream adspb2.AggregatedDiscoveryService_StreamAggregatedResourcesClient // Client talks to the grpc manager's endpoint to get load assignments. type Client struct { cc *grpc.ClientConn ctx context.Context - node *corepb.Node + node *corepb2.Node cancel context.CancelFunc stop chan struct{} to string // upstream hosts, mostly here for logging purposes @@ -66,8 +64,8 @@ func New(addr, node string, opts ...grpc.DialOption) (*Client, error) { if err != nil { return nil, err } - c := &Client{cc: cc, to: addr, node: &corepb.Node{Id: node, UserAgentName: "CoreDNS", UserAgentVersionType: &corepb.Node_UserAgentVersion{UserAgentVersion: coremain.CoreVersion}}} - c.assignments = &assignment{cla: make(map[string]*endpointpb.ClusterLoadAssignment)} + c := &Client{cc: cc, to: addr, node: &corepb2.Node{Id: node, UserAgentName: "CoreDNS", UserAgentVersionType: &corepb2.Node_UserAgentVersion{UserAgentVersion: coremain.CoreVersion}}} + c.assignments = &assignment{cla: make(map[string]*xdspb2.ClusterLoadAssignment)} c.version, c.nonce = make(map[string]string), make(map[string]string) c.ctx, c.cancel = context.WithCancel(context.Background()) @@ -87,7 +85,7 @@ func (c *Client) Run() error { default: } - cli := xdspb.NewAggregatedDiscoveryServiceClient(c.cc) + cli := adspb2.NewAggregatedDiscoveryServiceClient(c.cc) stream, err := cli.StreamAggregatedResources(c.ctx) if err != nil { return err @@ -95,7 +93,7 @@ func (c *Client) Run() error { if first { // send first request, to create stream, then wait for ADS to send us updates. - if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil { + if err := c.clusterDiscovery(stream, c.Version(clusterType), c.Nonce(clusterType), []string{}); err != nil { return err } log.Infof("gRPC stream established to %q", c.to) // might fail?? @@ -111,9 +109,9 @@ func (c *Client) Run() error { // clusterDiscovery sends a cluster DiscoveryRequest on the stream. func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clusters []string) error { - req := &xdspb.DiscoveryRequest{ + req := &xdspb2.DiscoveryRequest{ Node: c.node, - TypeUrl: cdsURL, + TypeUrl: clusterType, ResourceNames: clusters, // empty for all VersionInfo: version, ResponseNonce: nonce, @@ -123,9 +121,9 @@ func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clust // endpointDiscovery sends a endpoint DiscoveryRequest on the stream. func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clusters []string) error { - req := &xdspb.DiscoveryRequest{ + req := &xdspb2.DiscoveryRequest{ Node: c.node, - TypeUrl: edsURL, + TypeUrl: endpointType, ResourceNames: clusters, VersionInfo: version, ResponseNonce: nonce, @@ -142,7 +140,7 @@ func (c *Client) receive(stream adsStream) error { } switch resp.GetTypeUrl() { - case cdsURL: + case clusterType: a := NewAssignment() for _, r := range resp.GetResources() { var any ptypes.DynamicAny @@ -150,33 +148,33 @@ func (c *Client) receive(stream adsStream) error { log.Debugf("Failed to unmarshal cluster discovery: %s", err) continue } - cluster, ok := any.Message.(*clusterpb.Cluster) + cluster, ok := any.Message.(*xdspb2.Cluster) if !ok { continue } a.SetClusterLoadAssignment(cluster.GetName(), nil) } // set our local administration and ack the reply. Empty version would signal NACK. - c.SetNonce(cdsURL, resp.GetNonce()) - c.SetVersion(cdsURL, resp.GetVersionInfo()) + c.SetNonce(clusterType, resp.GetNonce()) + c.SetVersion(clusterType, resp.GetVersionInfo()) c.SetAssignments(a) c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters()) - log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL)) + log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(clusterType), c.Nonce(clusterType)) ClusterGauge.Set(float64(len(resp.GetResources()))) // now kick off discovery for endpoints - if err := c.endpointDiscovery(stream, c.Version(edsURL), c.Nonce(edsURL), a.clusters()); err != nil { + if err := c.endpointDiscovery(stream, c.Version(endpointType), c.Nonce(endpointType), a.clusters()); err != nil { log.Debug(err) } - case edsURL: + case endpointType: for _, r := range resp.GetResources() { var any ptypes.DynamicAny if err := ptypes.UnmarshalAny(r, &any); err != nil { log.Debugf("Failed to unmarshal endpoint discovery: %s", err) continue } - cla, ok := any.Message.(*endpointpb.ClusterLoadAssignment) + cla, ok := any.Message.(*xdspb2.ClusterLoadAssignment) if !ok { // TODO warn/err here? continue @@ -185,14 +183,14 @@ func (c *Client) receive(stream adsStream) error { } // set our local administration and ack the reply. Empty version would signal NACK. - c.SetNonce(edsURL, resp.GetNonce()) - c.SetVersion(edsURL, resp.GetVersionInfo()) + c.SetNonce(endpointType, resp.GetNonce()) + c.SetVersion(endpointType, resp.GetVersionInfo()) - log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL)) + log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(endpointType), c.Nonce(endpointType)) EndpointGauge.Set(float64(len(resp.GetResources()))) default: - return fmt.Errorf("unknown response URL for discovery: %q", resp.GetTypeUrl()) + // ignore anything we don't know how to process. Probably should NACK these properly. } } }