diff --git a/plugin/traffic/HACKING.md b/plugin/traffic/HACKING.md index aa811e259..990c1c353 100644 --- a/plugin/traffic/HACKING.md +++ b/plugin/traffic/HACKING.md @@ -8,27 +8,26 @@ Repos used: : implements client for xDS - much of this code has been reused here. -To see if things are working start the testing control plane from go-control-plane: +I found these website useful while working on this. * https://github.com/envoyproxy/envoy/blob/master/api/API_OVERVIEW.md * https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery.md -* This was really helpful: https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol - -Cluster: A cluster is a group of logically similar endpoints that Envoy connects to. In v2, RDS -routes points to clusters, CDS provides cluster configuration and Envoy discovers the cluster -members via EDS. +* This was *really* helpful: https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol # Testing +Assuming you have envoyproxy/go-control-plane checked out somewhere, then: + ~~~ sh % cd ~/src/github.com/envoyproxy/go-control-plane/pkg/test/main % go build % ./main --xds=ads --runtimes=2 -debug ~~~ -This runs a binary from pkg/test/main. Now we're testing aDS. Everything is using gRPC with TLS, -`grpc.WithInsecure()`. The binary runs on port 18000 on localhost; all these things are currently -hardcoded in the *traffic* plugin. This will be factored out into config as some point. +This runs a binary from pkg/test/main. Now we're testing aDS. Everything is using gRPC with TLS +disabled: `grpc.WithInsecure()`. The test binary runs on port 18000 on localhost; all these things +are currently hardcoded in the *traffic* plugin. This will be factored out into config as some +point. Another thing that is hardcoded is the use of the "example.org" domain. Then for CoreDNS, check out the `traffic` branch, create a Corefile: @@ -39,7 +38,18 @@ example.org { } ~~~ -Start CoreDNS, and see logging/debugging flow by; the test binary should also spew out a bunch of -things. CoreDNS willl build up a list of cluster and endpoints. Next you can query it. +Start CoreDNS (`coredns -conf Corefile -dns.port=1053`), and see logging/debugging flow by; the +test binary should also spew out a bunch of things. CoreDNS willl build up a list of cluster and +endpoints. Next you can query it: -TODO +~~~ sh +% dig @localhost -p 1053 cluster-v0-0.example.org A +;; QUESTION SECTION: +;cluster-v0-0.example.org. IN A + +;; ANSWER SECTION: +cluster-v0-0.example.org. 5 IN A 127.0.0.1 +~~~ + +Note: the xds/test binary is a go-control-plane binary with added debugging that I'm using for +testing. diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index ec053d5e9..c15bda016 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -2,47 +2,39 @@ ## Name -*traffic* - handout addresses according to assignments. +*traffic* - handout addresses according to assignments from Envoy's xDS. ## Description -The *traffic* plugin is a load balancer that allows traffic steering, weighted responses and -draining of endpoints. The use case for this plugin is when a service is running in multiple -(Kubernetes?) clusters and need to steer traffic to (or away) from these, i.e. cluster A needs to be -upgraded, so all traffic to it is drained, while cluster B now takes on all the extra load. After -the maintenance cluster A is simply undrained. +The *traffic* plugin is a balancer that allows traffic steering, weighted responses +and draining of clusters. The cluster information is retrieved from a service +discovery manager that implements the service discovery protocols that Envoy +[implements](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol). -*Traffic* discovers the endpoints via the Envoy xDS protocol, specifically messages of the type -"envoy.api.v2.ClusterLoadAssignment", these contain endpoints and an (optional) weight for each. -The `cluster_name` or `service_name` for a service must be a domain name. (TODO: check is this is -already the case). The plugin hands out responses that adhere to these assignments. -Assignments will need to be updated frequently, as discussed the [Envoy xDS -protocol](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol) documentation. +A Cluster is defined as: "A cluster is a group of logically similar endpoints that Envoy connects +to. Each cluster has a name, which *traffic* extends to be a domain name. -Multiple endpoints for a service may exist; for every query *traffic* will hand out exactly one -address. When there are no assignments for a service name (yet), the responses will also be modified -(see below). +The use case for this plugin is when a cluster has endpoints running in multiple +(Kubernetes?) clusters and you need to steer traffic to (or away) from these endpoints, i.e. +endpoint A needs to be upgraded, so all traffic to it is drained. Or the entire Kubernetes needs to +upgraded, and *all* endpoints need to be drained from it. -*Traffic* will load balance A and AAAA queries. As said, it will return precisely one record in a -response. If a service should be load balanced, but no assignment can be found a random record from -the *answer section* will be choosen. - -Every message that is handled by the *traffic* plugin will have its TTLs set to 5 seconds, the -authority section, and all RRSIGs are removed from it. +*Traffic* discovers the endpoints via Envoy's xDS protocol. Endpoints and clusters are discovered +every 10 seconds. The plugin hands out responses that adhere to these assignments. Each DNS response +contains a single IP address that's considered the best one. *Traffic* will load balance A and AAAA +queries. The TTL on these answer is set to 5s. The *traffic* plugin has no notion of draining, drop overload and anything that advanced, *it just acts upon assignments*. This is means that if a endpoint goes down and *traffic* has not seen a new assignment yet, it will still include this endpoint address in responses. -Findign the xDS endpoint. - ## Syntax ~~~ traffic ~~~ -The extended syntax: +The extended syntax (not implemented; everything is hard-coded at the moment): ~~~ traffic { @@ -53,74 +45,35 @@ traffic { * id **ID** is how *traffic* identifies itself to the control plane. -This enables traffic load balancing for all (sub-)domains named in the server block. ## Examples ~~~ corefile example.org { traffic - forward . 10.12.13.14 + debug + log } ~~~ This will add load balancing for domains under example.org; the upstream information comes from 10.12.13.14; depending on received assignments, replies will be let through as-is or are load balanced. -## Assignments - -Assignments are streamed for a service that implements the xDS protocol, *traffic* will bla bla. -TODO. - -Picking an endpoint is done as follows: (still true for xDs - check afer implementing things) - -* include spiffy algorithm. - -On seeing a query for a service, *traffic* will track the reply. When it returns with an answer -*traffic* will rewrite it (and discard of any RRSIGs). Using the assignments the answer section will -be rewritten as such: - -* A endpoint will be picked using the algorithm from above. -* The TTL on the response will be 5s for all included records. -* According to previous responses for this service and the relative weights of each endpoints the - best endpoint will be put in the response. -* If after the selection *no* endpoints are available an NODATA response will be sent. An SOA - record will be synthesised, and a low TTL (and negative TTL) of 5 seconds will be set. - -Authority section will be removed. -If no assignment, randomly pick an address -other types then A and AAAA, like SRV - do the same selection. - -## Limitations - -Loadreporting via xDS is not supported; this can be implemented, but there are some things that make -this difficult. A single (DNS) query is done by a resolver. Behind this resolver there may be many -clients that will use this assignment. - - ## Bugs -This plugin does not play nice with DNSSEC - if the endpoint returns signatures with the answer; they -will be stripped. You can optionally sign responses on the fly by using the *dnssec* plugin. +Priority from ClusterLoadAssignments is not used. Locality is also not used. Health status of the +endpoints is ignore (for now). + +Load reporting via xDS is not supported; this can be implemented, but there are some things that make +this difficult. A single (DNS) query is done by a resolver. Behind this resolver there may be many +clients that will use this reply, the responding server (CoreDNS) has no idea how many clients use +this resolver. So reporting a load of +1 on the CoreDNS side can be anything from 1 to 1000+, making +the load reporting highly inaccurate. ## Also See +The following documents provide some background on Envoy's control plane. + * https://github.com/envoyproxy/go-control-plane * https://blog.christianposta.com/envoy/guidance-for-building-a-control-plane-to-manage-envoy-proxy-based-infrastructure/ * https://github.com/envoyproxy/envoy/blob/442f9fcf21a5f091cec3fe9913ff309e02288659/api/envoy/api/v2/discovery.proto#L63 -* This is a [post on weighted random selection](https://medium.com/@peterkellyonline/weighted-random-selection-3ff222917eb6). - -# TODO - -* wording: cluster, endpoints, assignments, service_name are all used and roughly mean the same - thing; unify this. -const ( - HealthStatus_UNKNOWN HealthStatus = 0 - HealthStatus_HEALTHY HealthStatus = 1 - HealthStatus_UNHEALTHY HealthStatus = 2 - HealthStatus_DRAINING HealthStatus = 3 - HealthStatus_TIMEOUT HealthStatus = 4 - HealthStatus_DEGRADED HealthStatus = 5 -) - -https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol diff --git a/plugin/traffic/TODO b/plugin/traffic/TODO deleted file mode 100644 index c42225848..000000000 --- a/plugin/traffic/TODO +++ /dev/null @@ -1,4 +0,0 @@ -* unpack grpc-go's resolver and copy the bits we need here (we need *way* less) -* hook this up, potentially look at the tls plugin for creds -* hook up callbacks and test things out -* are unit tests possible? diff --git a/plugin/traffic/assignment.go b/plugin/traffic/assignment.go deleted file mode 100644 index e98779cfb..000000000 --- a/plugin/traffic/assignment.go +++ /dev/null @@ -1,39 +0,0 @@ -package traffic - -import ( - "math/rand" - "net" -) - -// assignment is an assignment for a single service. It contains multiple backends. -type assignment struct { - service string - backends []*backend -} - -// backend is a backend specified by an address, port and a weight. -type backend struct { - addr net.IP - port int - weight int -} - -// Select selects a backend from a, using weighted random selection -func (a assignment) Select() *backend { - total := 0 - for _, b := range a.backends { - total += b.weight - } - if total == 0 { - return nil - } - r := rand.Intn(total) + 1 - - for _, b := range a.backends { - r -= b.weight - if r <= 0 { - return b - } - } - return nil -} diff --git a/plugin/traffic/assignment_test.go b/plugin/traffic/assignment_test.go deleted file mode 100644 index fb9a74bcb..000000000 --- a/plugin/traffic/assignment_test.go +++ /dev/null @@ -1,38 +0,0 @@ -package traffic - -import ( - "math/rand" - "net" - "testing" - "time" -) - -func TestAssignment(t *testing.T) { - rand.Seed(int64(time.Now().Nanosecond())) - - backends := []*backend{ - {net.IPv4zero, 0, 6}, - {net.IPv4allrouter, 0, 4}, - {net.IPv4allsys, 0, 0}, - } - a := assignment{"www.example.org", backends} - - // should never get 0 weight, could be improved to check the difference between 4 and 6. - for i := 0; i < 100; i++ { - if x := a.Select(); x.weight == 0 { - t.Errorf("Expected non-nil weight for Select, got %v", x) - } - } -} - -func TestAssignmentZero(t *testing.T) { - rand.Seed(int64(time.Now().Nanosecond())) - - backends := []*backend{ - {net.IPv4zero, 0, 0}, - } - a := assignment{"www.example.org", backends} - if x := a.Select(); x != nil { - t.Errorf("Expected nil for Select, got %v", x) - } -} diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index d091a9882..eb203a3e0 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -40,10 +40,13 @@ func setup(c *caddy.Controller) error { log.Error(err) } - err = t.c.Receive(stream) - if err != nil { - return plugin.Error("traffic", err) - } + go func() { + err = t.c.Receive(stream) + if err != nil { + // can't do log debug in setup functions + log.Debug(err) + } + }() return nil } diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index b8a41dfc8..e1ff156fe 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -2,12 +2,11 @@ package traffic import ( "context" - "math/rand" - "time" "github.com/coredns/coredns/plugin" - "github.com/coredns/coredns/plugin/pkg/response" + "github.com/coredns/coredns/plugin/pkg/dnsutil" "github.com/coredns/coredns/plugin/traffic/xds" + "github.com/coredns/coredns/request" "github.com/miekg/dns" ) @@ -34,39 +33,28 @@ func (t *Traffic) Close() { // ServeDNS implements the plugin.Handler interface. func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { - tw := &ResponseWriter{ResponseWriter: w} - return plugin.NextOrFailure(t.Name(), t.Next, ctx, tw, r) + state := request.Request{Req: r, W: w} + + cluster, _ := dnsutil.TrimZone(state.Name(), "example.org") + addr := t.c.Select(cluster) + if addr == nil { + return plugin.NextOrFailure(t.Name(), t.Next, ctx, w, r) + } + + log.Debugf("Found address %q for %q", addr, cluster) + + // assemble reply + m := new(dns.Msg) + m.SetReply(r) + + m.Answer = []dns.RR{&dns.A{ + dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, + addr, + }} + + w.WriteMsg(m) + return 0, nil } // Name implements the plugin.Handler interface. func (t *Traffic) Name() string { return "traffic" } - -// ResponseWriter writes a traffic load balanced response. -type ResponseWriter struct { - dns.ResponseWriter -} - -// WriteMsg implements the dns.ResponseWriter interface. -func (r *ResponseWriter) WriteMsg(res *dns.Msg) error { - // set all TTLs to 5, also negative TTL? - if res.Rcode != dns.RcodeSuccess { - return r.ResponseWriter.WriteMsg(res) - } - - if res.Question[0].Qtype != dns.TypeA && res.Question[0].Qtype != dns.TypeAAAA { - return r.ResponseWriter.WriteMsg(res) - } - - typ, _ := response.Typify(res, time.Now().UTC()) - if typ != response.NoError { - return r.ResponseWriter.WriteMsg(res) - } - - if len(res.Answer) > 1 { - res.Answer = []dns.RR{res.Answer[rand.Intn(len(res.Answer))]} - res.Answer[0].Header().Ttl = 5 - } - res.Ns = []dns.RR{} // remove auth section, we don't care - - return r.ResponseWriter.WriteMsg(res) -} diff --git a/plugin/traffic/xds/assignment.go b/plugin/traffic/xds/assignment.go new file mode 100644 index 000000000..97412ef31 --- /dev/null +++ b/plugin/traffic/xds/assignment.go @@ -0,0 +1,112 @@ +package xds + +import ( + "fmt" + "math/rand" + "net" + "sync" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" +) + +type assignment struct { + mu sync.RWMutex + cla map[string]*xdspb.ClusterLoadAssignment + version int // not sure what do with and if we should discard all clusters. +} + +func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { + // If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry. + a.mu.Lock() + defer a.mu.Unlock() + _, ok := a.cla[cluster] + if !ok { + log.Debugf("Adding cluster %q", cluster) + a.cla[cluster] = cla + return + } + if cla == nil { + return + } + fmt.Printf("%+v\n", cla) + log.Debugf("Updating cluster %q", cluster) + a.cla[cluster] = cla + +} + +// ClusterLoadAssignment returns the healthy endpoints and their weight. +func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { + a.mu.RLock() + cla, ok := a.cla[cluster] + a.mu.RUnlock() + if !ok { + return nil + } + return cla +} + +func (a assignment) Clusters() []string { + a.mu.RLock() + defer a.mu.RUnlock() + clusters := make([]string, len(a.cla)) + i := 0 + for k := range a.cla { + clusters[i] = k + i++ + } + return clusters +} + +// Select selects a backend from cla, using weighted random selection. It only selects +// backends that are reporting healthy. +func (a assignment) Select(cluster string) net.IP { + cla := a.ClusterLoadAssignment(cluster) + if cla == nil { + return nil + } + + total := 0 + i := 0 + for _, ep := range cla.Endpoints { + for _, lb := range ep.GetLbEndpoints() { + // if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + // continue + // } + total += int(lb.GetLoadBalancingWeight().GetValue()) + i++ + } + } + if total == 0 { + // all weights are 0, randomly select one of the endpoints. + r := rand.Intn(i) + i := 0 + for _, ep := range cla.Endpoints { + for _, lb := range ep.GetLbEndpoints() { + // if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + // continue + // } + if r == i { + return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()) + } + i++ + } + } + return nil + } + + r := rand.Intn(total) + 1 + + for _, ep := range cla.Endpoints { + for _, lb := range ep.GetLbEndpoints() { + // if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { + // continue + // } + r -= int(lb.GetLoadBalancingWeight().GetValue()) + if r <= 0 { + return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()) + } + } + } + + return nil +} diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 03c203d7a..d8dae31fc 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -22,7 +22,7 @@ package xds import ( "context" - "sync" + "net" "time" clog "github.com/coredns/coredns/plugin/pkg/log" @@ -34,7 +34,7 @@ import ( "google.golang.org/grpc" ) -var log = clog.NewWithPlugin("traffic xds:") +var log = clog.NewWithPlugin("traffic: xds") const ( cdsURL = "type.googleapis.com/envoy.api.v2.Cluster" @@ -52,45 +52,6 @@ type Client struct { stop chan struct{} } -type assignment struct { - mu sync.RWMutex - cla map[string]*xdspb.ClusterLoadAssignment - version int // not sure what do with and if we should discard all clusters. -} - -func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { - // if cla is nil we just found a cluster, check if we already know about it, or if we need to make - // a new entry - a.mu.Lock() - defer a.mu.Unlock() - _, ok := a.cla[cluster] - if !ok { - a.cla[cluster] = cla - return - } - if cla == nil { - return - } - a.cla[cluster] = cla - -} - -func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { - return nil -} - -func (a assignment) Clusters() []string { - a.mu.RLock() - defer a.mu.RUnlock() - clusters := make([]string, len(a.cla)) - i := 0 - for k := range a.cla { - clusters[i] = k - i++ - } - return clusters -} - // New returns a new client that's dialed to addr using node as the local identifier. func New(addr, node string) (*Client, error) { // todo credentials @@ -144,7 +105,7 @@ func (c *Client) Receive(stream adsStream) error { resp, err := stream.Recv() if err != nil { log.Warningf("Trouble receiving from the gRPC connection: %s", err) - time.Sleep(1 * time.Second) // better. + time.Sleep(10 * time.Second) // better. } switch resp.GetTypeUrl() { @@ -160,8 +121,7 @@ func (c *Client) Receive(stream adsStream) error { } c.assignments.SetClusterLoadAssignment(cluster.GetName(), nil) } - println("CDS", len(resp.GetResources()), "processed") - log.Debug("Cluster discovery processed with %d resources", len(resp.GetResources())) + log.Debugf("Cluster discovery processed with %d resources", len(resp.GetResources())) // ack the CDS proto, with we we've got. (empty version would be NACK) if err := c.ClusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.Clusters()); err != nil { log.Warningf("Failed to acknowledge cluster discovery: %s", err) @@ -188,13 +148,14 @@ func (c *Client) Receive(stream adsStream) error { c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla) // ack the bloody thing } - println("EDS", len(resp.GetResources()), "processed") - log.Debug("Endpoint discovery processed with %d resources", len(resp.GetResources())) + log.Debugf("Endpoint discovery processed with %d resources", len(resp.GetResources())) default: log.Warningf("Unknown response URL for discovery: %q", resp.GetTypeUrl()) continue } } - return nil } + +// Select is a small wrapper. bla bla, keeps assigmens private. +func (c *Client) Select(cluster string) net.IP { return c.assignments.Select(cluster) } diff --git a/plugin/traffic/xds_bootstrap.json b/plugin/traffic/xds_bootstrap.json deleted file mode 100644 index e6df9bbbb..000000000 --- a/plugin/traffic/xds_bootstrap.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "localhost:18000", - "channel_creds": [ - { "type": "google_default" } - ] - }] -} diff --git a/plugin/traffic/xds_bootstrap_insecure.json b/plugin/traffic/xds_bootstrap_insecure.json deleted file mode 100644 index 724de2409..000000000 --- a/plugin/traffic/xds_bootstrap_insecure.json +++ /dev/null @@ -1,11 +0,0 @@ -{ - "node": { - "id": "COREDNS_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "xds_cluster" - } - }, - "xds_servers" : [{ - "server_uri": "localhost:18000" - }] -} diff --git a/plugin/traffic/xds_old/README_coredns.md b/plugin/traffic/xds_old/README_coredns.md deleted file mode 100644 index 6c5ece8f8..000000000 --- a/plugin/traffic/xds_old/README_coredns.md +++ /dev/null @@ -1,10 +0,0 @@ -This code is copied from -[https://github.com/grpc/grpc-go/tree/master/xds](https://github.com/grpc/grpc-go/tree/master/xds). -Grpc-go is also a consumer of the Envoy xDS data and acts upon it. - -The *traffic* plugin only cares about clusters and endpoints, the following bits are deleted: - -* lDS; listener discovery is not used here. -* rDS: routes have no use for DNS responses. - -Load reporting is also not implemented, although this can be done on the DNS level. diff --git a/plugin/traffic/xds_old/bootstrap/bootstrap.go b/plugin/traffic/xds_old/bootstrap/bootstrap.go deleted file mode 100644 index 910120c0c..000000000 --- a/plugin/traffic/xds_old/bootstrap/bootstrap.go +++ /dev/null @@ -1,165 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package bootstrap provides the functionality to initialize certain aspects -// of an xDS client by reading a bootstrap file. -package bootstrap - -import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" - "os" - - "github.com/coredns/coredns/plugin/pkg/log" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "github.com/golang/protobuf/jsonpb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/google" - "google.golang.org/grpc/grpclog" -) - -const ( - // Environment variable which holds the name of the xDS bootstrap file. - fileEnv = "GRPC_XDS_BOOTSTRAP" - // Type name for Google default credentials. - googleDefaultCreds = "google_default" -) - -var gRPCVersion = fmt.Sprintf("gRPC-Go %s", grpc.Version) - -// For overriding in unit tests. -var fileReadFunc = ioutil.ReadFile - -// Config provides the xDS client with several key bits of information that it -// requires in its interaction with an xDS server. The Config is initialized -// from the bootstrap file. -type Config struct { - // BalancerName is the name of the xDS server to connect to. - // - // The bootstrap file contains a list of servers (with name+creds), but we - // pick the first one. - BalancerName string - // Creds contains the credentials to be used while talking to the xDS - // server, as a grpc.DialOption. - Creds grpc.DialOption - // NodeProto contains the node proto to be used in xDS requests. - NodeProto *corepb.Node -} - -type channelCreds struct { - Type string `json:"type"` - Config json.RawMessage `json:"config"` -} - -type xdsServer struct { - ServerURI string `json:"server_uri"` - ChannelCreds []channelCreds `json:"channel_creds"` -} - -// NewConfig returns a new instance of Config initialized by reading the -// bootstrap file found at ${GRPC_XDS_BOOTSTRAP}. -// -// The format of the bootstrap file will be as follows: -// { -// "xds_server": { -// "server_uri": , -// "channel_creds": [ -// { -// "type": , -// "config": -// } -// ] -// }, -// "node": -// } -// -// Currently, we support exactly one type of credential, which is -// "google_default", where we use the host's default certs for transport -// credentials and a Google oauth token for call credentials. -// -// This function tries to process as much of the bootstrap file as possible (in -// the presence of the errors) and may return a Config object with certain -// fields left unspecified, in which case the caller should use some sane -// defaults. -func NewConfig() (*Config, error) { - config := &Config{} - - fName, ok := os.LookupEnv(fileEnv) - if !ok { - return config, fmt.Errorf("xds: %s environment variable not set", fileEnv) - } - - grpclog.Infof("xds: Reading bootstrap file from %s", fName) - data, err := fileReadFunc(fName) - if err != nil { - return config, fmt.Errorf("xds: bootstrap file {%v} read failed: %v", fName, err) - } - - var jsonData map[string]json.RawMessage - if err := json.Unmarshal(data, &jsonData); err != nil { - return config, fmt.Errorf("xds: json.Unmarshal(%v) failed during bootstrap: %v", string(data), err) - } - - m := jsonpb.Unmarshaler{AllowUnknownFields: true} - for k, v := range jsonData { - switch k { - case "node": - n := &corepb.Node{} - if err := m.Unmarshal(bytes.NewReader(v), n); err != nil { - log.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) - break - } - config.NodeProto = n - case "xds_servers": - var servers []*xdsServer - if err := json.Unmarshal(v, &servers); err != nil { - log.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) - break - } - if len(servers) < 1 { - log.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to") - break - } - xs := servers[0] - config.BalancerName = xs.ServerURI - for _, cc := range xs.ChannelCreds { - if cc.Type == googleDefaultCreds { - config.Creds = grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()) - // We stop at the first credential type that we support. - break - } - } - default: - // Do not fail the xDS bootstrap when an unknown field is seen. - log.Warningf("xds: unexpected data in bootstrap file: {%v, %v}", k, string(v)) - } - } - - // If we don't find a nodeProto in the bootstrap file, we just create an - // empty one here. That way, callers of this function can always expect - // that the NodeProto field is non-nil. - if config.NodeProto == nil { - config.NodeProto = &corepb.Node{} - } - config.NodeProto.BuildVersion = gRPCVersion - - return config, nil -} diff --git a/plugin/traffic/xds_old/bootstrap/bootstrap_test.go b/plugin/traffic/xds_old/bootstrap/bootstrap_test.go deleted file mode 100644 index a55e1d0f7..000000000 --- a/plugin/traffic/xds_old/bootstrap/bootstrap_test.go +++ /dev/null @@ -1,260 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package bootstrap - -import ( - "os" - "testing" - - "github.com/golang/protobuf/proto" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/google" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - structpb "github.com/golang/protobuf/ptypes/struct" -) - -var ( - nodeProto = &corepb.Node{ - Id: "ENVOY_NODE_ID", - Metadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "TRAFFICDIRECTOR_GRPC_HOSTNAME": { - Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"}, - }, - }, - }, - BuildVersion: gRPCVersion, - } - nilCredsConfig = &Config{ - BalancerName: "trafficdirector.googleapis.com:443", - Creds: nil, - NodeProto: nodeProto, - } - nonNilCredsConfig = &Config{ - BalancerName: "trafficdirector.googleapis.com:443", - Creds: grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()), - NodeProto: nodeProto, - } -) - -// TestNewConfig exercises the functionality in NewConfig with different -// bootstrap file contents. It overrides the fileReadFunc by returning -// bootstrap file contents defined in this test, instead of reading from a -// file. -func TestNewConfig(t *testing.T) { - bootstrapFileMap := map[string]string{ - "empty": "", - "badJSON": `["test": 123]`, - "emptyNodeProto": ` - { - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443" - }] - }`, - "emptyXdsServer": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - } - }`, - "unknownTopLevelFieldInFile": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "not-google-default" } - ] - }], - "unknownField": "foobar" - }`, - "unknownFieldInNodeProto": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "unknownField": "foobar", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - } - }`, - "unknownFieldInXdsServer": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "not-google-default" } - ], - "unknownField": "foobar" - }] - }`, - "emptyChannelCreds": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443" - }] - }`, - "nonGoogleDefaultCreds": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "not-google-default" } - ] - }] - }`, - "multipleChannelCreds": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "not-google-default" }, - { "type": "google_default" } - ] - }] - }`, - "goodBootstrap": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "google_default" } - ] - }] - }`, - "multipleXDSServers": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [ - { - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [{ "type": "google_default" }] - }, - { - "server_uri": "backup.never.use.com:1234", - "channel_creds": [{ "type": "not-google-default" }] - } - ] - }`, - } - - oldFileReadFunc := fileReadFunc - fileReadFunc = func(name string) ([]byte, error) { - if b, ok := bootstrapFileMap[name]; ok { - return []byte(b), nil - } - return nil, os.ErrNotExist - } - defer func() { - fileReadFunc = oldFileReadFunc - os.Unsetenv(fileEnv) - }() - - tests := []struct { - name string - wantConfig *Config - }{ - {"nonExistentBootstrapFile", &Config{}}, - {"empty", &Config{}}, - {"badJSON", &Config{}}, - {"emptyNodeProto", &Config{ - BalancerName: "trafficdirector.googleapis.com:443", - NodeProto: &corepb.Node{BuildVersion: gRPCVersion}, - }}, - {"emptyXdsServer", &Config{NodeProto: nodeProto}}, - {"unknownTopLevelFieldInFile", nilCredsConfig}, - {"unknownFieldInNodeProto", &Config{NodeProto: nodeProto}}, - {"unknownFieldInXdsServer", nilCredsConfig}, - {"emptyChannelCreds", nilCredsConfig}, - {"nonGoogleDefaultCreds", nilCredsConfig}, - {"multipleChannelCreds", nonNilCredsConfig}, - {"goodBootstrap", nonNilCredsConfig}, - {"multipleXDSServers", nonNilCredsConfig}, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if err := os.Setenv(fileEnv, test.name); err != nil { - t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err) - } - config := NewConfig() - if config.BalancerName != test.wantConfig.BalancerName { - t.Errorf("config.BalancerName is %s, want %s", config.BalancerName, test.wantConfig.BalancerName) - } - if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) { - t.Errorf("config.NodeProto is %#v, want %#v", config.NodeProto, test.wantConfig.NodeProto) - } - if (config.Creds != nil) != (test.wantConfig.Creds != nil) { - t.Errorf("config.Creds is %#v, want %#v", config.Creds, test.wantConfig.Creds) - } - }) - } -} - -func TestNewConfigEnvNotSet(t *testing.T) { - os.Unsetenv(fileEnv) - wantConfig := Config{} - if config := NewConfig(); *config != wantConfig { - t.Errorf("NewConfig() returned : %#v, wanted an empty Config object", config) - } -} diff --git a/plugin/traffic/xds_old/buffer/unbounded.go b/plugin/traffic/xds_old/buffer/unbounded.go deleted file mode 100644 index 9f6a0c120..000000000 --- a/plugin/traffic/xds_old/buffer/unbounded.go +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package buffer provides an implementation of an unbounded buffer. -package buffer - -import "sync" - -// Unbounded is an implementation of an unbounded buffer which does not use -// extra goroutines. This is typically used for passing updates from one entity -// to another within gRPC. -// -// All methods on this type are thread-safe and don't block on anything except -// the underlying mutex used for synchronization. -// -// Unbounded supports values of any type to be stored in it by using a channel -// of `interface{}`. This means that a call to Put() incurs an extra memory -// allocation, and also that users need a type assertion while reading. For -// performance critical code paths, using Unbounded is strongly discouraged and -// defining a new type specific implementation of this buffer is preferred. See -// internal/transport/transport.go for an example of this. -type Unbounded struct { - c chan interface{} - mu sync.Mutex - backlog []interface{} -} - -// NewUnbounded returns a new instance of Unbounded. -func NewUnbounded() *Unbounded { - return &Unbounded{c: make(chan interface{}, 1)} -} - -// Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t interface{}) { - b.mu.Lock() - if len(b.backlog) == 0 { - select { - case b.c <- t: - b.mu.Unlock() - return - default: - } - } - b.backlog = append(b.backlog, t) - b.mu.Unlock() -} - -// Load sends the earliest buffered data, if any, onto the read channel -// returned by Get(). Users are expected to call this every time they read a -// value from the read channel. -func (b *Unbounded) Load() { - b.mu.Lock() - if len(b.backlog) > 0 { - select { - case b.c <- b.backlog[0]: - b.backlog[0] = nil - b.backlog = b.backlog[1:] - default: - } - } - b.mu.Unlock() -} - -// Get returns a read channel on which values added to the buffer, via Put(), -// are sent on. -// -// Upon reading a value from this channel, users are expected to call Load() to -// send the next buffered value onto the channel if there is any. -func (b *Unbounded) Get() <-chan interface{} { - return b.c -} diff --git a/plugin/traffic/xds_old/buffer/unbounded_test.go b/plugin/traffic/xds_old/buffer/unbounded_test.go deleted file mode 100644 index c8067019b..000000000 --- a/plugin/traffic/xds_old/buffer/unbounded_test.go +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package buffer - -import ( - "reflect" - "sort" - "sync" - "testing" -) - -const ( - numWriters = 10 - numWrites = 10 -) - -// wantReads contains the set of values expected to be read by the reader -// goroutine in the tests. -var wantReads []int - -func init() { - for i := 0; i < numWriters; i++ { - for j := 0; j < numWrites; j++ { - wantReads = append(wantReads, i) - } - } -} - -// TestSingleWriter starts one reader and one writer goroutine and makes sure -// that the reader gets all the value added to the buffer by the writer. -func TestSingleWriter(t *testing.T) { - ub := NewUnbounded() - reads := []int{} - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - ch := ub.Get() - for i := 0; i < numWriters*numWrites; i++ { - r := <-ch - reads = append(reads, r.(int)) - ub.Load() - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < numWriters; i++ { - for j := 0; j < numWrites; j++ { - ub.Put(i) - } - } - }() - - wg.Wait() - if !reflect.DeepEqual(reads, wantReads) { - t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads) - } -} - -// TestMultipleWriters starts multiple writers and one reader goroutine and -// makes sure that the reader gets all the data written by all writers. -func TestMultipleWriters(t *testing.T) { - ub := NewUnbounded() - reads := []int{} - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - ch := ub.Get() - for i := 0; i < numWriters*numWrites; i++ { - r := <-ch - reads = append(reads, r.(int)) - ub.Load() - } - }() - - wg.Add(numWriters) - for i := 0; i < numWriters; i++ { - go func(index int) { - defer wg.Done() - for j := 0; j < numWrites; j++ { - ub.Put(index) - } - }(i) - } - - wg.Wait() - sort.Ints(reads) - if !reflect.DeepEqual(reads, wantReads) { - t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads) - } -} diff --git a/plugin/traffic/xds_old/cds.go b/plugin/traffic/xds_old/cds.go deleted file mode 100644 index c51d0a46a..000000000 --- a/plugin/traffic/xds_old/cds.go +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "fmt" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/golang/protobuf/ptypes" -) - -// handleCDSResponse processes an CDS response received from the xDS server. On -// receipt of a good response, it also invokes the registered watcher callback. -func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { - println("handlCDSResponse") - v2c.mu.Lock() - defer v2c.mu.Unlock() - - wi := v2c.watchMap[cdsURL] - if wi == nil { - return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp) - } - - var returnUpdate CDSUpdate - localCache := make(map[string]CDSUpdate) - for _, r := range resp.GetResources() { - var resource ptypes.DynamicAny - if err := ptypes.UnmarshalAny(r, &resource); err != nil { - return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err) - } - cluster, ok := resource.Message.(*xdspb.Cluster) - if !ok { - return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message) - } - fmt.Printf("CLUSTER %+v\n", cluster) - update, err := validateCluster(cluster) - if err != nil { - return err - } - - // If the Cluster message in the CDS response did not contain a - // serviceName, we will just use the clusterName for EDS. - if update.ServiceName == "" { - update.ServiceName = cluster.GetName() - } - localCache[cluster.GetName()] = update - if cluster.GetName() == wi.target[0] { - returnUpdate = update - } - } - v2c.cdsCache = localCache - - var err error - if returnUpdate.ServiceName == "" { - err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp) - } - wi.stopTimer() - wi.callback.(cdsCallback)(returnUpdate, err) - return nil -} - -func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) { - emptyUpdate := CDSUpdate{ServiceName: ""} - switch { - case cluster.GetType() != xdspb.Cluster_EDS: - return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster) - case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil: - return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster) - case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN: - return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) - } - - return CDSUpdate{ServiceName: cluster.GetEdsClusterConfig().GetServiceName()}, nil -} diff --git a/plugin/traffic/xds_old/cds_test.go b/plugin/traffic/xds_old/cds_test.go deleted file mode 100644 index c8412f52a..000000000 --- a/plugin/traffic/xds_old/cds_test.go +++ /dev/null @@ -1,487 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "errors" - "fmt" - "reflect" - "testing" - "time" - - discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "github.com/golang/protobuf/proto" - anypb "github.com/golang/protobuf/ptypes/any" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" -) - -const ( - clusterName1 = "foo-cluster" - clusterName2 = "bar-cluster" - serviceName1 = "foo-service" - serviceName2 = "bar-service" -) - -func (v2c *v2Client) cloneCDSCacheForTesting() map[string]CDSUpdate { - v2c.mu.Lock() - defer v2c.mu.Unlock() - - cloneCache := make(map[string]CDSUpdate) - for k, v := range v2c.cdsCache { - cloneCache[k] = v - } - return cloneCache -} - -func TestValidateCluster(t *testing.T) { - emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false} - tests := []struct { - name string - cluster *xdspb.Cluster - wantUpdate CDSUpdate - wantErr bool - }{ - { - name: "non-eds-cluster-type", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_STATIC}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - }, - LbPolicy: xdspb.Cluster_LEAST_REQUEST, - }, - wantUpdate: emptyUpdate, - wantErr: true, - }, - { - name: "no-eds-config", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - }, - wantUpdate: emptyUpdate, - wantErr: true, - }, - { - name: "no-ads-config-source", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{}, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - }, - wantUpdate: emptyUpdate, - wantErr: true, - }, - { - name: "non-round-robin-lb-policy", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - }, - LbPolicy: xdspb.Cluster_LEAST_REQUEST, - }, - wantUpdate: emptyUpdate, - wantErr: true, - }, - { - name: "happy-case-no-service-name-no-lrs", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - }, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - }, - wantUpdate: emptyUpdate, - }, - { - name: "happy-case-no-lrs", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - ServiceName: serviceName1, - }, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - }, - wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: false}, - }, - { - name: "happiest-case", - cluster: goodCluster1, - wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: true}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - gotUpdate, gotErr := validateCluster(test.cluster) - if (gotErr != nil) != test.wantErr { - t.Errorf("validateCluster(%+v) returned error: %v, wantErr: %v", test.cluster, gotErr, test.wantErr) - } - if !reflect.DeepEqual(gotUpdate, test.wantUpdate) { - t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, gotUpdate, test.wantUpdate) - } - }) - } -} - -// TestCDSHandleResponse starts a fake xDS server, makes a ClientConn to it, -// and creates a v2Client using it. Then, it registers a CDS watcher and tests -// different CDS responses. -func TestCDSHandleResponse(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - - tests := []struct { - name string - cdsResponse *xdspb.DiscoveryResponse - wantErr bool - wantUpdate *CDSUpdate - wantUpdateErr bool - }{ - // Badly marshaled CDS response. - { - name: "badly-marshaled-response", - cdsResponse: badlyMarshaledCDSResponse, - wantErr: true, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response does not contain Cluster proto. - { - name: "no-cluster-proto-in-response", - cdsResponse: badResourceTypeInLDSResponse, - wantErr: true, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response contains no clusters. - { - name: "no-cluster", - cdsResponse: &xdspb.DiscoveryResponse{}, - wantErr: false, - wantUpdate: &CDSUpdate{}, - wantUpdateErr: true, - }, - // Response contains one good cluster we are not interested in. - { - name: "one-uninteresting-cluster", - cdsResponse: goodCDSResponse2, - wantErr: false, - wantUpdate: &CDSUpdate{}, - wantUpdateErr: true, - }, - // Response contains one cluster and it is good. - { - name: "one-good-cluster", - cdsResponse: goodCDSResponse1, - wantErr: false, - wantUpdate: &CDSUpdate{ServiceName: serviceName1, EnableLRS: true}, - wantUpdateErr: false, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testWatchHandle(t, &watchHandleTestcase{ - responseToHandle: test.cdsResponse, - wantHandleErr: test.wantErr, - wantUpdate: test.wantUpdate, - wantUpdateErr: test.wantUpdateErr, - - cdsWatch: v2c.watchCDS, - watchReqChan: fakeServer.XDSRequestChan, - handleXDSResp: v2c.handleCDSResponse, - }) - }) - } -} - -// TestCDSHandleResponseWithoutWatch tests the case where the v2Client receives -// a CDS response without a registered watcher. -func TestCDSHandleResponseWithoutWatch(t *testing.T) { - _, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - - if v2c.handleCDSResponse(goodCDSResponse1) == nil { - t.Fatal("v2c.handleCDSResponse() succeeded, should have failed") - } -} - -// cdsTestOp contains all data related to one particular test operation. Not -// all fields make sense for all tests. -type cdsTestOp struct { - // target is the resource name to watch for. - target string - // responseToSend is the xDS response sent to the client - responseToSend *fakeserver.Response - // wantOpErr specfies whether the main operation should return an error. - wantOpErr bool - // wantCDSCache is the expected rdsCache at the end of an operation. - wantCDSCache map[string]CDSUpdate - // wantWatchCallback specifies if the watch callback should be invoked. - wantWatchCallback bool -} - -// testCDSCaching is a helper function which starts a fake xDS server, makes a -// ClientConn to it, creates a v2Client using it. It then reads a bunch of -// test operations to be performed from cdsTestOps and returns error, if any, -// on the provided error channel. This is executed in a separate goroutine. -func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh *testutils.Channel) { - t.Helper() - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := make(chan struct{}, 1) - for _, cdsTestOp := range cdsTestOps { - // Register a watcher if required, and use a channel to signal the - // successful invocation of the callback. - if cdsTestOp.target != "" { - v2c.watchCDS(cdsTestOp.target, func(u CDSUpdate, err error) { - t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) - callbackCh <- struct{}{} - }) - t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target) - - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - errCh.Send(fmt.Errorf("Timeout waiting for CDS request: %v", err)) - return - } - t.Log("FakeServer received request...") - } - - // Directly push the response through a call to handleCDSResponse, - // thereby bypassing the fakeServer. - if cdsTestOp.responseToSend != nil { - resp := cdsTestOp.responseToSend.Resp.(*discoverypb.DiscoveryResponse) - if err := v2c.handleCDSResponse(resp); (err != nil) != cdsTestOp.wantOpErr { - errCh.Send(fmt.Errorf("v2c.handleRDSResponse(%+v) returned err: %v", resp, err)) - return - } - } - - // If the test needs the callback to be invoked, just verify that - // it was invoked. Since we verify the contents of the cache, it's - // ok not to verify the contents of the callback. - if cdsTestOp.wantWatchCallback { - <-callbackCh - } - - if !reflect.DeepEqual(v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache) { - errCh.Send(fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.rdsCache, cdsTestOp.wantCDSCache)) - return - } - } - t.Log("Completed all test ops successfully...") - errCh.Send(nil) -} - -// TestCDSCaching tests some end-to-end CDS flows using a fake xDS server, and -// verifies the CDS data cached at the v2Client. -func TestCDSCaching(t *testing.T) { - ops := []cdsTestOp{ - // Add an CDS watch for a cluster name (clusterName1), which returns one - // matching resource in the response. - { - target: clusterName1, - responseToSend: &fakeserver.Response{Resp: goodCDSResponse1}, - wantCDSCache: map[string]CDSUpdate{ - clusterName1: {serviceName1, true}, - }, - wantWatchCallback: true, - }, - // Push an CDS response which contains a new resource (apart from the - // one received in the previous response). This should be cached. - { - responseToSend: &fakeserver.Response{Resp: cdsResponseWithMultipleResources}, - wantCDSCache: map[string]CDSUpdate{ - clusterName1: {serviceName1, true}, - clusterName2: {serviceName2, false}, - }, - wantWatchCallback: true, - }, - // Switch the watch target to clusterName2, which was already cached. No - // response is received from the server (as expected), but we want the - // callback to be invoked with the new serviceName. - { - target: clusterName2, - wantCDSCache: map[string]CDSUpdate{ - clusterName1: {serviceName1, true}, - clusterName2: {serviceName2, false}, - }, - wantWatchCallback: true, - }, - // Push an empty CDS response. This should clear the cache. - { - responseToSend: &fakeserver.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}}, - wantOpErr: false, - wantCDSCache: map[string]CDSUpdate{}, - wantWatchCallback: true, - }, - } - errCh := testutils.NewChannel() - go testCDSCaching(t, ops, errCh) - waitForNilErr(t, errCh) -} - -// TestCDSWatchExpiryTimer tests the case where the client does not receive an -// CDS response for the request that it sends out. We want the watch callback -// to be invoked with an error once the watchExpiryTimer fires. -func TestCDSWatchExpiryTimer(t *testing.T) { - oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 500 * time.Millisecond - defer func() { - defaultWatchExpiryTimeout = oldWatchExpiryTimeout - }() - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := testutils.NewChannel() - v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) { - t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) - if u.ServiceName != "" { - callbackCh.Send(fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName)) - } - if err == nil { - callbackCh.Send(errors.New("received nil error in cdsCallback")) - } - callbackCh.Send(nil) - }) - - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an CDS request") - } - waitForNilErr(t, callbackCh) -} - -var ( - badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: cdsURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - TypeUrl: cdsURL, - } - goodCluster1 = &xdspb.Cluster{ - Name: clusterName1, - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - ServiceName: serviceName1, - }, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - LrsServer: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Self{ - Self: &corepb.SelfConfigSource{}, - }, - }, - } - marshaledCluster1, _ = proto.Marshal(goodCluster1) - goodCluster2 = &xdspb.Cluster{ - Name: clusterName2, - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - ServiceName: serviceName2, - }, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - } - marshaledCluster2, _ = proto.Marshal(goodCluster2) - goodCDSResponse1 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: cdsURL, - Value: marshaledCluster1, - }, - }, - TypeUrl: cdsURL, - } - goodCDSResponse2 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: cdsURL, - Value: marshaledCluster2, - }, - }, - TypeUrl: cdsURL, - } - cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: cdsURL, - Value: marshaledCluster1, - }, - { - TypeUrl: cdsURL, - Value: marshaledCluster2, - }, - }, - TypeUrl: cdsURL, - } -) diff --git a/plugin/traffic/xds_old/client.go b/plugin/traffic/xds_old/client.go deleted file mode 100644 index 152b2ac58..000000000 --- a/plugin/traffic/xds_old/client.go +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package client implementation a full fledged gRPC client for the xDS API -// used by the xds resolver and balancer implementations. -package xds - -import ( - "errors" - "fmt" - "time" - - "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" - - "google.golang.org/grpc" -) - -// Options provides all parameters required for the creation of an xDS client. -type Options struct { - // Config contains a fully populated bootstrap config. It is the - // responsibility of the caller to use some sane defaults here if the - // bootstrap process returned with certain fields left unspecified. - Config bootstrap.Config - // DialOpts contains dial options to be used when dialing the xDS server. - DialOpts []grpc.DialOption -} - -// Client is a full fledged gRPC client which queries a set of discovery APIs -// (collectively termed as xDS) on a remote management server, to discover -// various dynamic resources. A single client object will be shared by the xds -// resolver and balancer implementations. -type Client struct { - opts Options - cc *grpc.ClientConn // Connection to the xDS server - v2c *v2Client // Actual xDS client implementation using the v2 API - - serviceCallback func(ServiceUpdate, error) -} - -// New returns a new xdsClient configured with opts. -func New(opts Options) (*Client, error) { - switch { - case opts.Config.BalancerName == "": - return nil, errors.New("xds: no xds_server name provided in options") - case opts.Config.Creds == nil: - fmt.Printf("%s\n", errors.New("xds: no credentials provided in options")) - case opts.Config.NodeProto == nil: - return nil, errors.New("xds: no node_proto provided in options") - } - - var dopts []grpc.DialOption - if opts.Config.Creds == nil { - dopts = append([]grpc.DialOption{grpc.WithInsecure()}, opts.DialOpts...) - } else { - dopts = append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...) - } - cc, err := grpc.Dial(opts.Config.BalancerName, dopts...) - if err != nil { - // An error from a non-blocking dial indicates something serious. - return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err) - } - - println("dialed balancer at", opts.Config.BalancerName) - - c := &Client{ - opts: opts, - cc: cc, - v2c: newV2Client(cc, opts.Config.NodeProto, func(int) time.Duration { return 0 }), - } - return c, nil -} - -// Close closes the gRPC connection to the xDS server. -func (c *Client) Close() { - // TODO: Should we invoke the registered callbacks here with an error that - // the client is closed? - c.v2c.close() - c.cc.Close() -} - -func (c *Client) Run() { - c.v2c.run() -} - -// ServiceUpdate contains update about the service. -type ServiceUpdate struct { - Cluster string -} - -// WatchCluster uses CDS to discover information about the provided clusterName. -func (c *Client) WatchCluster(clusterName string, cdsCb func(CDSUpdate, error)) (cancel func()) { - return c.v2c.watchCDS(clusterName, cdsCb) -} - -// WatchEndpoints uses EDS to discover information about the endpoints in a cluster. -func (c *Client) WatchEndpoints(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) { - return c.v2c.watchEDS(clusterName, edsCb) -} diff --git a/plugin/traffic/xds_old/client_test.go b/plugin/traffic/xds_old/client_test.go deleted file mode 100644 index 92724046b..000000000 --- a/plugin/traffic/xds_old/client_test.go +++ /dev/null @@ -1,292 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "errors" - "fmt" - "testing" - "time" - - "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "google.golang.org/grpc" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" -) - -func clientOpts(balancerName string) Options { - return Options{ - Config: bootstrap.Config{ - BalancerName: balancerName, - Creds: grpc.WithInsecure(), - NodeProto: &corepb.Node{}, - }, - // WithTimeout is deprecated. But we are OK to call it here from the - // test, so we clearly know that the dial failed. - DialOpts: []grpc.DialOption{grpc.WithTimeout(5 * time.Second), grpc.WithBlock()}, - } -} - -func TestNew(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - tests := []struct { - name string - opts Options - wantErr bool - }{ - {name: "empty-opts", opts: Options{}, wantErr: true}, - { - name: "empty-balancer-name", - opts: Options{ - Config: bootstrap.Config{ - Creds: grpc.WithInsecure(), - NodeProto: &corepb.Node{}, - }, - }, - wantErr: true, - }, - { - name: "empty-dial-creds", - opts: Options{ - Config: bootstrap.Config{ - BalancerName: "dummy", - NodeProto: &corepb.Node{}, - }, - }, - wantErr: true, - }, - { - name: "empty-node-proto", - opts: Options{ - Config: bootstrap.Config{ - BalancerName: "dummy", - Creds: grpc.WithInsecure(), - }, - }, - wantErr: true, - }, - { - name: "happy-case", - opts: clientOpts(fakeServer.Address), - wantErr: false, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - c, err := New(test.opts) - if err == nil { - defer c.Close() - } - if (err != nil) != test.wantErr { - t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr) - } - }) - } -} - -// TestWatchService tests the happy case of registering a watcher for -// service updates and receiving a good update. -func TestWatchService(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - xdsClient, err := New(clientOpts(fakeServer.Address)) - if err != nil { - t.Fatalf("New returned error: %v", err) - } - defer xdsClient.Close() - t.Log("Created an xdsClient...") - - callbackCh := testutils.NewChannel() - cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - if err != nil { - callbackCh.Send(fmt.Errorf("xdsClient.WatchService returned error: %v", err)) - return - } - if su.Cluster != goodClusterName1 { - callbackCh.Send(fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1)) - return - } - callbackCh.Send(nil) - }) - defer cancelWatch() - t.Log("Registered a watcher for service updates...") - - // Make the fakeServer send LDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - - // Make the fakeServer send RDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an RDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1} - waitForNilErr(t, callbackCh) -} - -// TestWatchServiceWithNoResponseFromServer tests the case where the -// xDS server does not respond to the requests being sent out as part of -// registering a service update watcher. The underlying v2Client will timeout -// and will send us an error. -func TestWatchServiceWithNoResponseFromServer(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - xdsClient, err := New(clientOpts(fakeServer.Address)) - if err != nil { - t.Fatalf("New returned error: %v", err) - } - defer xdsClient.Close() - t.Log("Created an xdsClient...") - - oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 500 * time.Millisecond - defer func() { - defaultWatchExpiryTimeout = oldWatchExpiryTimeout - }() - - callbackCh := testutils.NewChannel() - cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - if su.Cluster != "" { - callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)) - return - } - if err == nil { - callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error")) - return - } - callbackCh.Send(nil) - }) - defer cancelWatch() - t.Log("Registered a watcher for service updates...") - - // Wait for one request from the client, but send no reponses. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - waitForNilErr(t, callbackCh) -} - -// TestWatchServiceEmptyRDS tests the case where the underlying -// v2Client receives an empty RDS response. -func TestWatchServiceEmptyRDS(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - xdsClient, err := New(clientOpts(fakeServer.Address)) - if err != nil { - t.Fatalf("New returned error: %v", err) - } - defer xdsClient.Close() - t.Log("Created an xdsClient...") - - oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 500 * time.Millisecond - defer func() { - defaultWatchExpiryTimeout = oldWatchExpiryTimeout - }() - - callbackCh := testutils.NewChannel() - cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - if su.Cluster != "" { - callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)) - return - } - if err == nil { - callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error")) - return - } - callbackCh.Send(nil) - }) - defer cancelWatch() - t.Log("Registered a watcher for service updates...") - - // Make the fakeServer send LDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - - // Make the fakeServer send an empty RDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an RDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: noVirtualHostsInRDSResponse} - waitForNilErr(t, callbackCh) -} - -// TestWatchServiceWithClientClose tests the case where xDS responses are -// received after the client is closed, and we make sure that the registered -// watcher callback is not invoked. -func TestWatchServiceWithClientClose(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - xdsClient, err := New(clientOpts(fakeServer.Address)) - if err != nil { - t.Fatalf("New returned error: %v", err) - } - defer xdsClient.Close() - t.Log("Created an xdsClient...") - - callbackCh := testutils.NewChannel() - cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - callbackCh.Send(errors.New("watcher callback invoked after client close")) - }) - defer cancelWatch() - t.Log("Registered a watcher for service updates...") - - // Make the fakeServer send LDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - - xdsClient.Close() - t.Log("Closing the xdsClient...") - - // Push an RDS response from the fakeserver - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1} - if cbErr, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout { - t.Fatal(cbErr) - } - -} diff --git a/plugin/traffic/xds_old/eds.go b/plugin/traffic/xds_old/eds.go deleted file mode 100644 index e97297f97..000000000 --- a/plugin/traffic/xds_old/eds.go +++ /dev/null @@ -1,207 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "fmt" - "net" - "strconv" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - typepb "github.com/envoyproxy/go-control-plane/envoy/type" - "github.com/golang/protobuf/ptypes" -) - -// OverloadDropConfig contains the config to drop overloads. -type OverloadDropConfig struct { - Category string - Numerator uint32 - Denominator uint32 -} - -// EndpointHealthStatus represents the health status of an endpoint. -type EndpointHealthStatus int32 - -const ( - // EndpointHealthStatusUnknown represents HealthStatus UNKNOWN. - EndpointHealthStatusUnknown EndpointHealthStatus = iota - // EndpointHealthStatusHealthy represents HealthStatus HEALTHY. - EndpointHealthStatusHealthy - // EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY. - EndpointHealthStatusUnhealthy - // EndpointHealthStatusDraining represents HealthStatus DRAINING. - EndpointHealthStatusDraining - // EndpointHealthStatusTimeout represents HealthStatus TIMEOUT. - EndpointHealthStatusTimeout - // EndpointHealthStatusDegraded represents HealthStatus DEGRADED. - EndpointHealthStatusDegraded -) - -// Endpoint contains information of an endpoint. -type Endpoint struct { - Address string - HealthStatus EndpointHealthStatus - Weight uint32 -} - -// Locality contains information of a locality. -type Locality struct { - Endpoints []Endpoint - ID LocalityID - Priority uint32 - Weight uint32 -} - -// EDSUpdate contains an EDS update. -type EDSUpdate struct { - Drops []OverloadDropConfig - Localities []Locality -} - -func parseAddress(socketAddress *corepb.SocketAddress) string { - return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))) -} - -func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig { - percentage := dropPolicy.GetDropPercentage() - var ( - numerator = percentage.GetNumerator() - denominator uint32 - ) - switch percentage.GetDenominator() { - case typepb.FractionalPercent_HUNDRED: - denominator = 100 - case typepb.FractionalPercent_TEN_THOUSAND: - denominator = 10000 - case typepb.FractionalPercent_MILLION: - denominator = 1000000 - } - return OverloadDropConfig{ - Category: dropPolicy.GetCategory(), - Numerator: numerator, - Denominator: denominator, - } -} - -func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint { - endpoints := make([]Endpoint, 0, len(lbEndpoints)) - for _, lbEndpoint := range lbEndpoints { - endpoints = append(endpoints, Endpoint{ - HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), - Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()), - Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(), - }) - } - return endpoints -} - -// ParseEDSRespProto turns EDS response proto message to EDSUpdate. -// -// This is temporarily exported to be used in eds balancer, before it switches -// to use xds client. TODO: unexport. -func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) { - ret := &EDSUpdate{} - for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { - ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) - } - priorities := make(map[uint32]struct{}) - for _, locality := range m.Endpoints { - l := locality.GetLocality() - if l == nil { - return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) - } - lid := LocalityID{Region: l.Region, Zone: l.Zone, SubZone: l.SubZone} - priority := locality.GetPriority() - priorities[priority] = struct{}{} - ret.Localities = append(ret.Localities, Locality{ - ID: lid, - Endpoints: parseEndpoints(locality.GetLbEndpoints()), - Weight: locality.GetLoadBalancingWeight().GetValue(), - Priority: priority, - }) - } - for i := 0; i < len(priorities); i++ { - if _, ok := priorities[uint32(i)]; !ok { - return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities) - } - } - return ret, nil -} - -// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails. -// This is used by EDS balancer tests. -// -// TODO: delete this. The EDS balancer should build an EDSUpdate directly, -// instead of building and parsing a proto message. -func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate { - u, err := ParseEDSRespProto(m) - if err != nil { - panic(err.Error()) - } - return u -} - -func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { - v2c.mu.Lock() - defer v2c.mu.Unlock() - - wi := v2c.watchMap[edsURL] - if wi == nil { - return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp) - } - - var returnUpdate *EDSUpdate - for _, r := range resp.GetResources() { - var resource ptypes.DynamicAny - if err := ptypes.UnmarshalAny(r, &resource); err != nil { - return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err) - } - cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment) - if !ok { - return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message) - } - - if cla.GetClusterName() != wi.target[0] { - log.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0]) - // We won't validate the remaining resources. If one of the - // uninteresting ones is invalid, we will still ACK the response. - continue - } - - u, err := ParseEDSRespProto(cla) - if err != nil { - return err - } - - returnUpdate = u - // Break from the loop because the request resource is found. But - // this also means we won't validate the remaining resources. If one - // of the uninteresting ones is invalid, we will still ACK the - // response. - break - } - - if returnUpdate != nil { - wi.stopTimer() - wi.callback.(edsCallback)(returnUpdate, nil) - } - - return nil -} diff --git a/plugin/traffic/xds_old/eds_test.go b/plugin/traffic/xds_old/eds_test.go deleted file mode 100644 index 874fa1f30..000000000 --- a/plugin/traffic/xds_old/eds_test.go +++ /dev/null @@ -1,287 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "errors" - "fmt" - "testing" - "time" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "github.com/golang/protobuf/ptypes" - anypb "github.com/golang/protobuf/ptypes/any" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/xds/internal/testutils" -) - -func TestEDSParseRespProto(t *testing.T) { - tests := []struct { - name string - m *xdspb.ClusterLoadAssignment - want *EDSUpdate - wantErr bool - }{ - { - name: "missing-priority", - m: func() *xdspb.ClusterLoadAssignment { - clab0 := NewClusterLoadAssignmentBuilder("test", nil) - clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil) - clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil) - return clab0.Build() - }(), - want: nil, - wantErr: true, - }, - { - name: "missing-locality-ID", - m: func() *xdspb.ClusterLoadAssignment { - clab0 := NewClusterLoadAssignmentBuilder("test", nil) - clab0.AddLocality("", 1, 0, []string{"addr1:314"}, nil) - return clab0.Build() - }(), - want: nil, - wantErr: true, - }, - { - name: "good", - m: func() *xdspb.ClusterLoadAssignment { - clab0 := NewClusterLoadAssignmentBuilder("test", nil) - clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, &AddLocalityOptions{ - Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY}, - Weight: []uint32{271}, - }) - clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, &AddLocalityOptions{ - Health: []corepb.HealthStatus{corepb.HealthStatus_DRAINING}, - Weight: []uint32{828}, - }) - return clab0.Build() - }(), - want: &EDSUpdate{ - Drops: nil, - Localities: []Locality{ - { - Endpoints: []Endpoint{{ - Address: "addr1:314", - HealthStatus: EndpointHealthStatusUnhealthy, - Weight: 271, - }}, - ID: Locality{SubZone: "locality-1"}, - Priority: 1, - Weight: 1, - }, - { - Endpoints: []Endpoint{{ - Address: "addr2:159", - HealthStatus: EndpointHealthStatusDraining, - Weight: 828, - }}, - ID: Locality{SubZone: "locality-2"}, - Priority: 0, - Weight: 1, - }, - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := ParseEDSRespProto(tt.m) - if (err != nil) != tt.wantErr { - t.Errorf("ParseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr) - return - } - if d := cmp.Diff(got, tt.want); d != "" { - t.Errorf("ParseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d) - } - }) - } -} - -var ( - badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: edsURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - TypeUrl: edsURL, - } - badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgr1, - }, - }, - TypeUrl: edsURL, - } - goodEDSResponse1 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - func() *anypb.Any { - clab0 := NewClusterLoadAssignmentBuilder(goodEDSName, nil) - clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil) - clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil) - a, _ := ptypes.MarshalAny(clab0.Build()) - return a - }(), - }, - TypeUrl: edsURL, - } - goodEDSResponse2 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - func() *anypb.Any { - clab0 := NewClusterLoadAssignmentBuilder("not-goodEDSName", nil) - clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil) - clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil) - a, _ := ptypes.MarshalAny(clab0.Build()) - return a - }(), - }, - TypeUrl: edsURL, - } -) - -func TestEDSHandleResponse(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - - tests := []struct { - name string - edsResponse *xdspb.DiscoveryResponse - wantErr bool - wantUpdate *EDSUpdate - wantUpdateErr bool - }{ - // Any in resource is badly marshaled. - { - name: "badly-marshaled_response", - edsResponse: badlyMarshaledEDSResponse, - wantErr: true, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response doesn't contain resource with the right type. - { - name: "no-config-in-response", - edsResponse: badResourceTypeInEDSResponse, - wantErr: true, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response contains one uninteresting ClusterLoadAssignment. - { - name: "one-uninterestring-assignment", - edsResponse: goodEDSResponse2, - wantErr: false, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response contains one good ClusterLoadAssignment. - { - name: "one-good-assignment", - edsResponse: goodEDSResponse1, - wantErr: false, - wantUpdate: &EDSUpdate{ - Localities: []Locality{ - { - Endpoints: []Endpoint{{Address: "addr1:314"}}, - ID: Locality{SubZone: "locality-1"}, - Priority: 1, - Weight: 1, - }, - { - Endpoints: []Endpoint{{Address: "addr2:159"}}, - ID: Locality{SubZone: "locality-2"}, - Priority: 0, - Weight: 1, - }, - }, - }, - wantUpdateErr: false, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testWatchHandle(t, &watchHandleTestcase{ - responseToHandle: test.edsResponse, - wantHandleErr: test.wantErr, - wantUpdate: test.wantUpdate, - wantUpdateErr: test.wantUpdateErr, - - edsWatch: v2c.watchEDS, - watchReqChan: fakeServer.XDSRequestChan, - handleXDSResp: v2c.handleEDSResponse, - }) - }) - } -} - -// TestEDSHandleResponseWithoutWatch tests the case where the v2Client -// receives an EDS response without a registered EDS watcher. -func TestEDSHandleResponseWithoutWatch(t *testing.T) { - _, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - - if v2c.handleEDSResponse(goodEDSResponse1) == nil { - t.Fatal("v2c.handleEDSResponse() succeeded, should have failed") - } -} - -func TestEDSWatchExpiryTimer(t *testing.T) { - oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 500 * time.Millisecond - defer func() { - defaultWatchExpiryTimeout = oldWatchExpiryTimeout - }() - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := testutils.NewChannel() - v2c.watchEDS(goodRouteName1, func(u *EDSUpdate, err error) { - t.Logf("Received callback with edsUpdate {%+v} and error {%v}", u, err) - if u != nil { - callbackCh.Send(fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u)) - } - if err == nil { - callbackCh.Send(errors.New("received nil error in edsCallback")) - } - callbackCh.Send(nil) - }) - - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an CDS request") - } - waitForNilErr(t, callbackCh) -} diff --git a/plugin/traffic/xds_old/eds_testutil.go b/plugin/traffic/xds_old/eds_testutil.go deleted file mode 100644 index 7ae03fcfa..000000000 --- a/plugin/traffic/xds_old/eds_testutil.go +++ /dev/null @@ -1,128 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// All structs/functions in this file should be unexported. They are used in EDS -// balancer tests now, to generate test inputs. Eventually, EDS balancer tests -// should generate EDSUpdate directly, instead of generating and parsing the -// proto message. -// TODO: unexported everything in this file. - -package xds - -import ( - "fmt" - "net" - "strconv" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - typepb "github.com/envoyproxy/go-control-plane/envoy/type" - wrapperspb "github.com/golang/protobuf/ptypes/wrappers" -) - -// ClusterLoadAssignmentBuilder builds a ClusterLoadAssignment, aka EDS -// response. -type ClusterLoadAssignmentBuilder struct { - v *xdspb.ClusterLoadAssignment -} - -// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder. -func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder { - var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload - for i, d := range dropPercents { - drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{ - Category: fmt.Sprintf("test-drop-%d", i), - DropPercentage: &typepb.FractionalPercent{ - Numerator: d, - Denominator: typepb.FractionalPercent_HUNDRED, - }, - }) - } - - return &ClusterLoadAssignmentBuilder{ - v: &xdspb.ClusterLoadAssignment{ - ClusterName: clusterName, - Policy: &xdspb.ClusterLoadAssignment_Policy{ - DropOverloads: drops, - }, - }, - } -} - -// AddLocalityOptions contains options when adding locality to the builder. -type AddLocalityOptions struct { - Health []corepb.HealthStatus - Weight []uint32 -} - -// AddLocality adds a locality to the builder. -func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *AddLocalityOptions) { - var lbEndPoints []*endpointpb.LbEndpoint - for i, a := range addrsWithPort { - host, portStr, err := net.SplitHostPort(a) - if err != nil { - panic("failed to split " + a) - } - port, err := strconv.Atoi(portStr) - if err != nil { - panic("failed to atoi " + portStr) - } - - lbe := &endpointpb.LbEndpoint{ - HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ - Endpoint: &endpointpb.Endpoint{ - Address: &corepb.Address{ - Address: &corepb.Address_SocketAddress{ - SocketAddress: &corepb.SocketAddress{ - Protocol: corepb.SocketAddress_TCP, - Address: host, - PortSpecifier: &corepb.SocketAddress_PortValue{ - PortValue: uint32(port)}}}}}}, - } - if opts != nil { - if i < len(opts.Health) { - lbe.HealthStatus = opts.Health[i] - } - if i < len(opts.Weight) { - lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]} - } - } - lbEndPoints = append(lbEndPoints, lbe) - } - - var localityID *corepb.Locality - if subzone != "" { - localityID = &corepb.Locality{ - Region: "", - Zone: "", - SubZone: subzone, - } - } - - clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{ - Locality: localityID, - LbEndpoints: lbEndPoints, - LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight}, - Priority: priority, - }) -} - -// Build builds ClusterLoadAssignment. -func (clab *ClusterLoadAssignmentBuilder) Build() *xdspb.ClusterLoadAssignment { - return clab.v -} diff --git a/plugin/traffic/xds_old/locality.go b/plugin/traffic/xds_old/locality.go deleted file mode 100644 index 8a8324228..000000000 --- a/plugin/traffic/xds_old/locality.go +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "fmt" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" -) - -// Locality is xds.Locality without XXX fields, so it can be used as map -// keys. -// -// xds.Locality cannot be map keys because one of the XXX fields is a slice. -// -// This struct should only be used as map keys. Use the proto message directly -// in all other places. -type LocalityID struct { - Region string - Zone string - SubZone string -} - -func (l LocalityID) String() string { - return fmt.Sprintf("%s-%s-%s", l.Region, l.Zone, l.SubZone) -} - -// ToProto convert Locality to the proto representation. -func (l LocalityID) ToProto() *corepb.Locality { - return &corepb.Locality{ - Region: l.Region, - Zone: l.Zone, - SubZone: l.SubZone, - } -} diff --git a/plugin/traffic/xds_old/log.go b/plugin/traffic/xds_old/log.go deleted file mode 100644 index d11cfa9fe..000000000 --- a/plugin/traffic/xds_old/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package xds - -import ( - clog "github.com/coredns/coredns/plugin/pkg/log" -) - -var log = clog.NewWithPlugin("traffic") diff --git a/plugin/traffic/xds_old/testutil_test.go b/plugin/traffic/xds_old/testutil_test.go deleted file mode 100644 index b05acbd35..000000000 --- a/plugin/traffic/xds_old/testutil_test.go +++ /dev/null @@ -1,169 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "reflect" - "testing" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" -) - -type watchHandleTestcase struct { - responseToHandle *xdspb.DiscoveryResponse - wantHandleErr bool - wantUpdate interface{} - wantUpdateErr bool - - // Only one of the following should be non-nil. The one corresponding with - // typeURL will be called. - ldsWatch func(target string, ldsCb ldsCallback) (cancel func()) - rdsWatch func(routeName string, rdsCb rdsCallback) (cancel func()) - cdsWatch func(clusterName string, cdsCb cdsCallback) (cancel func()) - edsWatch func(clusterName string, edsCb edsCallback) (cancel func()) - watchReqChan *testutils.Channel // The request sent for watch will be sent to this channel. - handleXDSResp func(response *xdspb.DiscoveryResponse) error -} - -// testWatchHandle is called to test response handling for each xDS. -// -// It starts the xDS watch as configured in test, waits for the fake xds server -// to receive the request (so watch callback is installed), and calls -// handleXDSResp with responseToHandle (if it's set). It then compares the -// update received by watch callback with the expected results. -func testWatchHandle(t *testing.T, test *watchHandleTestcase) { - type updateErr struct { - u interface{} - err error - } - gotUpdateCh := testutils.NewChannel() - - var cancelWatch func() - // Register the watcher, this will also trigger the v2Client to send the xDS - // request. - switch { - case test.ldsWatch != nil: - cancelWatch = test.ldsWatch(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err) - gotUpdateCh.Send(updateErr{u, err}) - }) - case test.rdsWatch != nil: - cancelWatch = test.rdsWatch(goodRouteName1, func(u rdsUpdate, err error) { - t.Logf("in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", u, err) - gotUpdateCh.Send(updateErr{u, err}) - }) - case test.cdsWatch != nil: - cancelWatch = test.cdsWatch(clusterName1, func(u CDSUpdate, err error) { - t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err) - gotUpdateCh.Send(updateErr{u, err}) - }) - case test.edsWatch != nil: - cancelWatch = test.edsWatch(goodEDSName, func(u *EDSUpdate, err error) { - t.Logf("in v2c.watchEDS callback, edsUpdate: %+v, err: %v", u, err) - gotUpdateCh.Send(updateErr{*u, err}) - }) - default: - t.Fatalf("no watch() is set") - } - defer cancelWatch() - - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - if _, err := test.watchReqChan.Receive(); err != nil { - t.Fatalf("Timeout waiting for an xDS request: %v", err) - } - - // Directly push the response through a call to handleXDSResp. This bypasses - // the fakeServer, so it's only testing the handle logic. Client response - // processing is covered elsewhere. - // - // Also note that this won't trigger ACK, so there's no need to clear the - // request channel afterwards. - if err := test.handleXDSResp(test.responseToHandle); (err != nil) != test.wantHandleErr { - t.Fatalf("v2c.handleRDSResponse() returned err: %v, wantErr: %v", err, test.wantHandleErr) - } - - // If the test doesn't expect the callback to be invoked, verify that no - // update or error is pushed to the callback. - // - // Cannot directly compare test.wantUpdate with nil (typed vs non-typed nil: - // https://golang.org/doc/faq#nil_error). - if c := test.wantUpdate; c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) { - update, err := gotUpdateCh.Receive() - if err == testutils.ErrRecvTimeout { - return - } - t.Fatalf("Unexpected update: +%v", update) - } - - wantUpdate := reflect.ValueOf(test.wantUpdate).Elem().Interface() - uErr, err := gotUpdateCh.Receive() - if err == testutils.ErrRecvTimeout { - t.Fatal("Timeout expecting xDS update") - } - gotUpdate := uErr.(updateErr).u - opt := cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{}) - if diff := cmp.Diff(gotUpdate, wantUpdate, opt); diff != "" { - t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff) - } - gotUpdateErr := uErr.(updateErr).err - if (gotUpdateErr != nil) != test.wantUpdateErr { - t.Fatalf("got xDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr) - } -} - -// startServerAndGetCC starts a fake XDS server and also returns a ClientConn -// connected to it. -func startServerAndGetCC(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) { - t.Helper() - - fs, sCleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - - cc, ccCleanup, err := fs.XDSClientConn() - if err != nil { - sCleanup() - t.Fatalf("Failed to get a clientConn to the fake xDS server: %v", err) - } - return fs, cc, func() { - sCleanup() - ccCleanup() - } -} - -// waitForNilErr waits for a nil error value to be received on the -// provided channel. -func waitForNilErr(t *testing.T, ch *testutils.Channel) { - t.Helper() - - val, err := ch.Receive() - if err == testutils.ErrRecvTimeout { - t.Fatalf("Timeout expired when expecting update") - } - if val != nil { - if cbErr := val.(error); cbErr != nil { - t.Fatal(cbErr) - } - } -} diff --git a/plugin/traffic/xds_old/types.go b/plugin/traffic/xds_old/types.go deleted file mode 100644 index c063233bf..000000000 --- a/plugin/traffic/xds_old/types.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "time" - - adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" -) - -type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient - -const ( - cdsURL = "type.googleapis.com/envoy.api.v2.Cluster" - edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" -) - -// watchState is an enum to represent the state of a watch call. -type watchState int - -const ( - watchEnqueued watchState = iota - watchCancelled - watchStarted -) - -// watchInfo holds all the information about a watch call. -type watchInfo struct { - typeURL string - target []string - state watchState - - callback interface{} - expiryTimer *time.Timer -} - -// cancel marks the state as cancelled, and also stops the expiry timer. -func (wi *watchInfo) cancel() { - wi.state = watchCancelled - if wi.expiryTimer != nil { - wi.expiryTimer.Stop() - } -} - -// stopTimer stops the expiry timer without cancelling the watch. -func (wi *watchInfo) stopTimer() { - if wi.expiryTimer != nil { - wi.expiryTimer.Stop() - } -} - -type ackInfo struct { - typeURL string - version string // Nack if version is an empty string. - nonce string -} - -// CDSUpdate contains information from a received CDS response, which is of -// interest to the registered CDS watcher. -type CDSUpdate struct { - // ServiceName is the service name corresponding to the clusterName which - // is being watched for through CDS. - ServiceName string -} -type cdsCallback func(CDSUpdate, error) - -type edsCallback func(*EDSUpdate, error) diff --git a/plugin/traffic/xds_old/v2client.go b/plugin/traffic/xds_old/v2client.go deleted file mode 100644 index 3e2c1c95e..000000000 --- a/plugin/traffic/xds_old/v2client.go +++ /dev/null @@ -1,441 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "context" - "fmt" - "os" - "sync" - "time" - - "github.com/coredns/coredns/plugin/traffic/xds/buffer" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - "google.golang.org/grpc" -) - -// The value chosen here is based on the default value of the -// initial_fetch_timeout field in corepb.ConfigSource proto. -var defaultWatchExpiryTimeout = 15 * time.Second - -// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a -// single ADS stream on which the different types of xDS requests and responses -// are multiplexed. -// The reason for splitting this out from the top level xdsClient object is -// because there is already an xDS v3Aplha API in development. If and when we -// want to switch to that, this separation will ease that process. -type v2Client struct { - ctx context.Context - cancelCtx context.CancelFunc - - // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. - cc *grpc.ClientConn - nodeProto *corepb.Node - backoff func(int) time.Duration - - // sendCh in the channel onto which watchInfo objects are pushed by the - // watch API, and it is read and acted upon by the send() goroutine. - sendCh *buffer.Unbounded - - mu sync.Mutex - // Message specific watch infos, protected by the above mutex. These are - // written to, after successfully reading from the update channel, and are - // read from when recovering from a broken stream to resend the xDS - // messages. When the user of this client object cancels a watch call, - // these are set to nil. All accesses to the map protected and any value - // inside the map should be protected with the above mutex. - watchMap map[string]*watchInfo - // ackMap contains the version that was acked (the version in the ack - // request that was sent on wire). The key is typeURL, the value is the - // version string, becaues the versions for different resource types - // should be independent. - ackMap map[string]string - // rdsCache maintains a mapping of {clusterName --> CDSUpdate} from - // validated cluster configurations received in CDS responses. We cache all - // valid cluster configurations, whether or not we are interested in them - // when we received them (because we could become interested in them in the - // future and the server wont send us those resources again). This is only - // to support legacy management servers that do not honor the - // resource_names field. As per the latest spec, the server should resend - // the response when the request changes, even if it had sent the same - // resource earlier (when not asked for). Protected by the above mutex. - cdsCache map[string]CDSUpdate -} - -// newV2Client creates a new v2Client initialized with the passed arguments. -func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration) *v2Client { - v2c := &v2Client{ - cc: cc, - nodeProto: nodeProto, - backoff: backoff, - sendCh: buffer.NewUnbounded(), - watchMap: make(map[string]*watchInfo), - ackMap: make(map[string]string), - cdsCache: make(map[string]CDSUpdate), - } - v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) - - go v2c.run() - return v2c -} - -// close cleans up resources and goroutines allocated by this client. -func (v2c *v2Client) close() { - v2c.cancelCtx() -} - -// run starts an ADS stream (and backs off exponentially, if the previous -// stream failed without receiving a single reply) and runs the sender and -// receiver routines to send and receive data from the stream respectively. -func (v2c *v2Client) run() { - retries := 0 - for { - select { - case <-v2c.ctx.Done(): - return - default: - } - - if retries != 0 { - t := time.NewTimer(v2c.backoff(retries)) - select { - case <-t.C: - case <-v2c.ctx.Done(): - if !t.Stop() { - <-t.C - } - return - } - } - - retries++ - cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc) - stream, err := cli.StreamAggregatedResources(v2c.ctx) //, grpc.WaitForReady(true)) - if err != nil { - log.Infof("xds: ADS stream creation failed: %v", err) - os.Exit(1) - } - - // send() could be blocked on reading updates from the different update - // channels when it is not actually sending out messages. So, we need a - // way to break out of send() when recv() returns. This done channel is - // used to for that purpose. - done := make(chan struct{}) - go v2c.send(stream, done) - if v2c.recv(stream) { - retries = 0 - } - close(done) - } -} - -// sendRequest sends a request for provided typeURL and resource on the provided -// stream. -// -// version is the ack version to be sent with the request -// - If this is the new request (not an ack/nack), version will be an empty -// string -// - If this is an ack, version will be the version from the response -// - If this is a nack, version will be the previous acked version (from -// ackMap). If there was no ack before, it will be an empty string -func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool { - req := &xdspb.DiscoveryRequest{ - Node: v2c.nodeProto, - TypeUrl: typeURL, - ResourceNames: resourceNames, - VersionInfo: version, - ResponseNonce: nonce, - // TODO: populate ErrorDetails for nack. - } - println("v2: sendrequest", typeURL) - if err := stream.Send(req); err != nil { - log.Warningf("xds: request (type %s) for resource %v failed: %v", typeURL, resourceNames, err) - return false - } - return true -} - -// sendExisting sends out xDS requests for registered watchers when recovering -// from a broken stream. -// -// We call stream.Send() here with the lock being held. It should be OK to do -// that here because the stream has just started and Send() usually returns -// quickly (once it pushes the message onto the transport layer) and is only -// ever blocked if we don't have enough flow control quota. -func (v2c *v2Client) sendExisting(stream adsStream) bool { - println("v2: sendexisting") - v2c.mu.Lock() - defer v2c.mu.Unlock() - - // Reset the ack versions when the stream restarts. - v2c.ackMap = make(map[string]string) - - for typeURL, wi := range v2c.watchMap { - if !v2c.sendRequest(stream, wi.target, typeURL, "", "") { - return false - } - } - - return true -} - -// processWatchInfo pulls the fields needed by the request from a watchInfo. -// -// It also calls callback with cached response, and updates the watch map in -// v2c. -// -// If the watch was already canceled, it returns false for send -func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) { - v2c.mu.Lock() - defer v2c.mu.Unlock() - if t.state == watchCancelled { - return // This returns all zero values, and false for send. - } - t.state = watchStarted - send = true - - typeURL = t.typeURL - target = t.target - v2c.checkCacheAndUpdateWatchMap(t) - // TODO: if watch is called again with the same resource names, - // there's no need to send another request. - // - // TODO: should we reset version (for ack) when a new watch is - // started? Or do this only if the resource names are different - // (so we send a new request)? - return -} - -// processAckInfo pulls the fields needed by the ack request from a ackInfo. -// -// If no active watch is found for this ack, it returns false for send. -func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) { - typeURL = t.typeURL - - v2c.mu.Lock() - defer v2c.mu.Unlock() - wi, ok := v2c.watchMap[typeURL] - if !ok { - // We don't send the request ack if there's no active watch (this can be - // either the server sends responses before any request, or the watch is - // canceled while the ackInfo is in queue), because there's no resource - // name. And if we send a request with empty resource name list, the - // server may treat it as a wild card and send us everything. - log.Warningf("xds: ack (type %s) not sent because there's no active watch for the type", typeURL) - return // This returns all zero values, and false for send. - } - send = true - - version = t.version - nonce = t.nonce - target = wi.target - if version == "" { - // This is a nack, get the previous acked version. - version = v2c.ackMap[typeURL] - // version will still be an empty string if typeURL isn't - // found in ackMap, this can happen if there wasn't any ack - // before. - } else { - v2c.ackMap[typeURL] = version - } - return -} - -// send reads watch infos from update channel and sends out actual xDS requests -// on the provided ADS stream. -func (v2c *v2Client) send(stream adsStream, done chan struct{}) { - if !v2c.sendExisting(stream) { - println("not existing stream") - return - } - - println("in send") - - for { - select { - case <-v2c.ctx.Done(): - return - case u := <-v2c.sendCh.Get(): - v2c.sendCh.Load() - - var ( - target []string - typeURL, version, nonce string - send bool - ) - switch t := u.(type) { - case *watchInfo: - println("watchInfo") - target, typeURL, version, nonce, send = v2c.processWatchInfo(t) - println(target, typeURL, version, nonce, send) - fmt.Printf("%+v\n", target) - case *ackInfo: - println("ackInfo") - target, typeURL, version, nonce, send = v2c.processAckInfo(t) - } - if !send { - continue - } - if !v2c.sendRequest(stream, target, typeURL, version, nonce) { - return - } - case <-done: - return - } - } -} - -// recv receives xDS responses on the provided ADS stream and branches out to -// message specific handlers. -func (v2c *v2Client) recv(stream adsStream) bool { - println("v2 recv") - success := false - for { - println("WATIIGNM") - resp, err := stream.Recv() - // TODO: call watch callbacks with error when stream is broken. - println("DONE") - if err != nil { - log.Warningf("xds: ADS stream recv failed: %v", err) - return success - } - println("RECEIVING") - var respHandleErr error - switch resp.GetTypeUrl() { - case cdsURL: - println("CDS") - respHandleErr = v2c.handleCDSResponse(resp) - case edsURL: - println("EDS") - respHandleErr = v2c.handleEDSResponse(resp) - default: - log.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl()) - continue - } - - typeURL := resp.GetTypeUrl() - if respHandleErr != nil { - log.Warningf("xds: response (type %s) handler failed: %v", typeURL, respHandleErr) - v2c.sendCh.Put(&ackInfo{ - typeURL: typeURL, - version: "", - nonce: resp.GetNonce(), - }) - continue - } - v2c.sendCh.Put(&ackInfo{ - typeURL: typeURL, - version: resp.GetVersionInfo(), - nonce: resp.GetNonce(), - }) - success = true - } -} - -// watchCDS registers an CDS watcher for the provided clusterName. Updates -// corresponding to received CDS responses will be pushed to the provided -// callback. The caller can cancel the watch by invoking the returned cancel -// function. -// The provided callback should not block or perform any expensive operations -// or call other methods of the v2Client object. -func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) { - return v2c.watch(&watchInfo{ - typeURL: cdsURL, - target: []string{clusterName}, - callback: cdsCb, - }) -} - -// watchEDS registers an EDS watcher for the provided clusterName. Updates -// corresponding to received EDS responses will be pushed to the provided -// callback. The caller can cancel the watch by invoking the returned cancel -// function. -// The provided callback should not block or perform any expensive operations -// or call other methods of the v2Client object. -func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) { - return v2c.watch(&watchInfo{ - typeURL: edsURL, - target: []string{clusterName}, - callback: edsCb, - }) - // TODO: Once a registered EDS watch is cancelled, we should send an EDS - // request with no resources. This will let the server know that we are no - // longer interested in this resource. -} - -func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) { - v2c.sendCh.Put(wi) - return func() { - v2c.mu.Lock() - defer v2c.mu.Unlock() - if wi.state == watchEnqueued { - wi.state = watchCancelled - return - } - v2c.watchMap[wi.typeURL].cancel() - delete(v2c.watchMap, wi.typeURL) - // TODO: should we reset ack version string when cancelling the watch? - } -} - -// checkCacheAndUpdateWatchMap is called when a new watch call is handled in -// send(). If an existing watcher is found, its expiry timer is stopped. If the -// watchInfo to be added to the watchMap is found in the cache, the watcher -// callback is immediately invoked. -// -// Caller should hold v2c.mu -func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { - if existing := v2c.watchMap[wi.typeURL]; existing != nil { - println("cancel") - existing.cancel() - } - - v2c.watchMap[wi.typeURL] = wi - switch wi.typeURL { - // We need to grab the lock inside of the expiryTimer's afterFunc because - // we need to access the watchInfo, which is stored in the watchMap. - case cdsURL: - clusterName := wi.target[0] - println("CDS URLS", clusterName) - if update, ok := v2c.cdsCache[clusterName]; ok { - println("UPDATE SEEN, ok") - - var err error - if v2c.watchMap[cdsURL] == nil { - err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName) - } - wi.callback.(cdsCallback)(update, err) - return - } - wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - v2c.mu.Lock() - wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target)) - v2c.mu.Unlock() - }) - case edsURL: - wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - v2c.mu.Lock() - wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target)) - v2c.mu.Unlock() - }) - } -} diff --git a/plugin/traffic/xds_old/v2client_ack_test.go b/plugin/traffic/xds_old/v2client_ack_test.go deleted file mode 100644 index 942943f07..000000000 --- a/plugin/traffic/xds_old/v2client_ack_test.go +++ /dev/null @@ -1,263 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "fmt" - "strconv" - "testing" - "time" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/golang/protobuf/proto" - anypb "github.com/golang/protobuf/ptypes/any" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" -) - -// compareXDSRequest reads requests from channel, compare it with want. -func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, version, nonce string) error { - val, err := ch.Receive() - if err != nil { - return err - } - req := val.(*fakeserver.Request) - if req.Err != nil { - return fmt.Errorf("unexpected error from request: %v", req.Err) - } - wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest) - wantClone.VersionInfo = version - wantClone.ResponseNonce = nonce - if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) { - return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone)) - } - return nil -} - -func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) { - respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse) - respToSend.VersionInfo = strconv.Itoa(version) - nonce = strconv.Itoa(int(time.Now().UnixNano())) - respToSend.Nonce = nonce - ch <- &fakeserver.Response{Resp: respToSend} - return -} - -// startXDS calls watch to send the first request. It then sends a good response -// and checks for ack. -func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest) *testutils.Channel { - callbackCh := testutils.NewChannel() - switch xdsname { - case "LDS": - v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh.Send(struct{}{}) - }) - case "RDS": - v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh.Send(struct{}{}) - }) - case "CDS": - v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh.Send(struct{}{}) - }) - case "EDS": - v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh.Send(struct{}{}) - }) - } - - if err := compareXDSRequest(reqChan, req, "", ""); err != nil { - t.Fatalf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("FakeServer received %s request...", xdsname) - return callbackCh -} - -// sendGoodResp sends the good response, with the given version, and a random -// nonce. -// -// It also waits and checks that the ack request contains the given version, and -// the generated nonce. -func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) { - nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version) - t.Logf("Good %s response pushed to fakeServer...", xdsname) - - if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version), nonce); err != nil { - t.Errorf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("Good %s response acked", xdsname) - - if _, err := callbackCh.Receive(); err != nil { - t.Errorf("Timeout when expecting %s update", xdsname) - } - t.Logf("Good %s response callback executed", xdsname) -} - -// sendBadResp sends a bad response with the given version. This response will -// be nacked, so we expect a request with the previous version (version-1). -// -// But the nonce in request should be the new nonce. -func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, wantReq *xdspb.DiscoveryRequest) { - var typeURL string - switch xdsname { - case "LDS": - typeURL = ldsURL - case "RDS": - typeURL = rdsURL - case "CDS": - typeURL = cdsURL - case "EDS": - typeURL = edsURL - } - nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{{}}, - TypeUrl: typeURL, - }, version) - t.Logf("Bad %s response pushed to fakeServer...", xdsname) - if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version-1), nonce); err != nil { - t.Errorf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("Bad %s response nacked", xdsname) -} - -// TestV2ClientAck verifies that valid responses are acked, and invalid ones -// are nacked. -// -// This test also verifies the version for different types are independent. -func TestV2ClientAck(t *testing.T) { - var ( - versionLDS = 1000 - versionRDS = 2000 - versionCDS = 3000 - versionEDS = 4000 - ) - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ - cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest) - sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) - versionRDS++ - cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest) - sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) - versionCDS++ - cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest) - sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) - versionEDS++ - - // Send a bad response, and check for nack. - sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest) - versionLDS++ - sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest) - versionRDS++ - sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest) - versionCDS++ - sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest) - versionEDS++ - - // send another good response, and check for ack, with the new version. - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ - sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) - versionRDS++ - sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) - versionCDS++ - sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) - versionEDS++ -} - -// Test when the first response is invalid, and is nacked, the nack requests -// should have an empty version string. -func TestV2ClientAckFirstIsNack(t *testing.T) { - var versionLDS = 1000 - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - - nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{{}}, - TypeUrl: ldsURL, - }, versionLDS) - t.Logf("Bad response pushed to fakeServer...") - - // The expected version string is an empty string, because this is the first - // response, and it's nacked (so there's no previous ack version). - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil { - t.Errorf("Failed to receive request: %v", err) - } - t.Logf("Bad response nacked") - versionLDS++ - - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ -} - -// Test when a nack is sent after a new watch, we nack with the previous acked -// version (instead of resetting to empty string). -func TestV2ClientAckNackAfterNewWatch(t *testing.T) { - var versionLDS = 1000 - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ - - // Start a new watch. - cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - - // This is an invalid response after the new watch. - nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{{}}, - TypeUrl: ldsURL, - }, versionLDS) - t.Logf("Bad response pushed to fakeServer...") - - // The expected version string is the previous acked version. - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil { - t.Errorf("Failed to receive request: %v", err) - } - t.Logf("Bad response nacked") - versionLDS++ - - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ -} diff --git a/plugin/traffic/xds_old/v2client_test.go b/plugin/traffic/xds_old/v2client_test.go deleted file mode 100644 index ff2773dba..000000000 --- a/plugin/traffic/xds_old/v2client_test.go +++ /dev/null @@ -1,444 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "errors" - "testing" - "time" - - "github.com/golang/protobuf/proto" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" - httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" - listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v2" - anypb "github.com/golang/protobuf/ptypes/any" - structpb "github.com/golang/protobuf/ptypes/struct" -) - -const ( - defaultTestTimeout = 1 * time.Second - goodLDSTarget1 = "lds.target.good:1111" - goodLDSTarget2 = "lds.target.good:2222" - goodRouteName1 = "GoodRouteConfig1" - goodRouteName2 = "GoodRouteConfig2" - goodEDSName = "GoodClusterAssignment1" - uninterestingRouteName = "UninterestingRouteName" - goodMatchingDomain = "lds.target.good" - uninterestingDomain = "uninteresting.domain" - goodClusterName1 = "GoodClusterName1" - goodClusterName2 = "GoodClusterName2" - uninterestingClusterName = "UninterestingClusterName" - httpConnManagerURL = "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager" -) - -var ( - goodNodeProto = &basepb.Node{ - Id: "ENVOY_NODE_ID", - Metadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "TRAFFICDIRECTOR_GRPC_HOSTNAME": { - Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"}, - }, - }, - }, - } - goodLDSRequest = &xdspb.DiscoveryRequest{ - Node: goodNodeProto, - TypeUrl: ldsURL, - ResourceNames: []string{goodLDSTarget1}, - } - goodCDSRequest = &xdspb.DiscoveryRequest{ - Node: goodNodeProto, - TypeUrl: cdsURL, - ResourceNames: []string{goodClusterName1}, - } - goodEDSRequest = &xdspb.DiscoveryRequest{ - Node: goodNodeProto, - TypeUrl: edsURL, - ResourceNames: []string{goodEDSName}, - } - goodHTTPConnManager1 = &httppb.HttpConnectionManager{} - marshaledConnMgr1, _ = proto.Marshal(goodHTTPConnManager1) - emptyHTTPConnManager = &httppb.HttpConnectionManager{ - RouteSpecifier: &httppb.HttpConnectionManager_Rds{ - Rds: &httppb.Rds{}, - }, - } - emptyMarshaledConnMgr, _ = proto.Marshal(emptyHTTPConnManager) - connMgrWithScopedRoutes = &httppb.HttpConnectionManager{ - RouteSpecifier: &httppb.HttpConnectionManager_ScopedRoutes{}, - } - marshaledConnMgrWithScopedRoutes, _ = proto.Marshal(connMgrWithScopedRoutes) - goodListener1 = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgr1, - }, - }, - } - marshaledListener1, _ = proto.Marshal(goodListener1) - goodListener2 = &xdspb.Listener{ - Name: goodLDSTarget2, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgr1, - }, - }, - } - marshaledListener2, _ = proto.Marshal(goodListener2) - noAPIListener = &xdspb.Listener{Name: goodLDSTarget1} - marshaledNoAPIListener, _ = proto.Marshal(noAPIListener) - badAPIListener1 = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - } - badAPIListener2 = &xdspb.Listener{ - Name: goodLDSTarget2, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - } - badlyMarshaledAPIListener2, _ = proto.Marshal(badAPIListener2) - badResourceListener = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: ldsURL, - Value: marshaledListener1, - }, - }, - } - listenerWithEmptyHTTPConnMgr = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: emptyMarshaledConnMgr, - }, - }, - } - listenerWithScopedRoutesRouteConfig = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgrWithScopedRoutes, - }, - }, - } - goodLDSResponse1 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledListener1, - }, - }, - TypeUrl: ldsURL, - } - goodLDSResponse2 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledListener2, - }, - }, - TypeUrl: ldsURL, - } - emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: ldsURL} - badlyMarshaledLDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - TypeUrl: ldsURL, - } - badResourceTypeInLDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgr1, - }, - }, - TypeUrl: ldsURL, - } - ldsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledListener2, - }, - { - TypeUrl: ldsURL, - Value: marshaledListener1, - }, - }, - TypeUrl: ldsURL, - } - noAPIListenerLDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledNoAPIListener, - }, - }, - TypeUrl: ldsURL, - } - goodBadUglyLDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledListener2, - }, - { - TypeUrl: ldsURL, - Value: marshaledListener1, - }, - { - TypeUrl: ldsURL, - Value: badlyMarshaledAPIListener2, - }, - }, - TypeUrl: ldsURL, - } - goodRouteConfig1 = &xdspb.RouteConfiguration{ - Name: goodRouteName1, - VirtualHosts: []*routepb.VirtualHost{ - { - Domains: []string{uninterestingDomain}, - Routes: []*routepb.Route{ - { - Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName}, - }, - }, - }, - }, - }, - { - Domains: []string{goodMatchingDomain}, - Routes: []*routepb.Route{ - { - Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName1}, - }, - }, - }, - }, - }, - }, - } - marshaledGoodRouteConfig1, _ = proto.Marshal(goodRouteConfig1) - goodRouteConfig2 = &xdspb.RouteConfiguration{ - Name: goodRouteName2, - VirtualHosts: []*routepb.VirtualHost{ - { - Domains: []string{uninterestingDomain}, - Routes: []*routepb.Route{ - { - Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName}, - }, - }, - }, - }, - }, - { - Domains: []string{goodMatchingDomain}, - Routes: []*routepb.Route{ - { - Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName2}, - }, - }, - }, - }, - }, - }, - } - marshaledGoodRouteConfig2, _ = proto.Marshal(goodRouteConfig2) - uninterestingRouteConfig = &xdspb.RouteConfiguration{ - Name: uninterestingRouteName, - VirtualHosts: []*routepb.VirtualHost{ - { - Domains: []string{uninterestingDomain}, - Routes: []*routepb.Route{ - { - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName}, - }, - }, - }, - }, - }, - }, - } - marshaledUninterestingRouteConfig, _ = proto.Marshal(uninterestingRouteConfig) -) - -// TestV2ClientBackoffAfterRecvError verifies if the v2Client backoffs when it -// encounters a Recv error while receiving an LDS response. -func TestV2ClientBackoffAfterRecvError(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - // Override the v2Client backoff function with this, so that we can verify - // that a backoff actually was triggerred. - boCh := make(chan int, 1) - clientBackoff := func(v int) time.Duration { - boCh <- v - return 0 - } - - v2c := newV2Client(cc, goodNodeProto, clientBackoff) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := make(chan struct{}) - v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - close(callbackCh) - }) - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - t.Log("FakeServer received request...") - - fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} - t.Log("Bad LDS response pushed to fakeServer...") - - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Timeout when expecting LDS update") - case <-boCh: - timer.Stop() - t.Log("v2Client backed off before retrying...") - case <-callbackCh: - t.Fatal("Received unexpected LDS callback") - } -} - -// TestV2ClientRetriesAfterBrokenStream verifies the case where a stream -// encountered a Recv() error, and is expected to send out xDS requests for -// registered watchers once it comes back up again. -func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := testutils.NewChannel() - v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err) - callbackCh.Send(struct{}{}) - }) - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - t.Log("FakeServer received request...") - - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - t.Log("Good LDS response pushed to fakeServer...") - - if _, err := callbackCh.Receive(); err != nil { - t.Fatal("Timeout when expecting LDS update") - } - - // Read the ack, so the next request is sent after stream re-creation. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS ACK") - } - - fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} - t.Log("Bad LDS response pushed to fakeServer...") - - val, err := fakeServer.XDSRequestChan.Receive() - if err == testutils.ErrRecvTimeout { - t.Fatalf("Timeout expired when expecting LDS update") - } - gotRequest := val.(*fakeserver.Request) - if !proto.Equal(gotRequest.Req, goodLDSRequest) { - t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest) - } -} - -// TestV2ClientCancelWatch verifies that the registered watch callback is not -// invoked if a response is received after the watcher is cancelled. -func TestV2ClientCancelWatch(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := testutils.NewChannel() - cancelFunc := v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err) - callbackCh.Send(struct{}{}) - }) - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - t.Log("FakeServer received request...") - - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - t.Log("Good LDS response pushed to fakeServer...") - - if _, err := callbackCh.Receive(); err != nil { - t.Fatal("Timeout when expecting LDS update") - } - - cancelFunc() - - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - t.Log("Another good LDS response pushed to fakeServer...") - - if _, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout { - t.Fatalf("Watch callback invoked after the watcher was cancelled") - } -}