diff --git a/plugin/traffic/xds/bootstrap/bootstrap.go b/plugin/traffic/xds/bootstrap/bootstrap.go new file mode 100644 index 000000000..cc2150ccc --- /dev/null +++ b/plugin/traffic/xds/bootstrap/bootstrap.go @@ -0,0 +1,168 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package bootstrap provides the functionality to initialize certain aspects +// of an xDS client by reading a bootstrap file. +package bootstrap + +import ( + "bytes" + "encoding/json" + "fmt" + "io/ioutil" + "os" + + "github.com/golang/protobuf/jsonpb" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/google" + "google.golang.org/grpc/grpclog" + + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" +) + +const ( + // Environment variable which holds the name of the xDS bootstrap file. + fileEnv = "GRPC_XDS_BOOTSTRAP" + // Type name for Google default credentials. + googleDefaultCreds = "google_default" +) + +var gRPCVersion = fmt.Sprintf("gRPC-Go %s", grpc.Version) + +// For overriding in unit tests. +var fileReadFunc = ioutil.ReadFile + +// Config provides the xDS client with several key bits of information that it +// requires in its interaction with an xDS server. The Config is initialized +// from the bootstrap file. +type Config struct { + // BalancerName is the name of the xDS server to connect to. 
+ // + // The bootstrap file contains a list of servers (with name+creds), but we + // pick the first one. + BalancerName string + // Creds contains the credentials to be used while talking to the xDS + // server, as a grpc.DialOption. + Creds grpc.DialOption + // NodeProto contains the node proto to be used in xDS requests. + NodeProto *corepb.Node +} + +type channelCreds struct { + Type string `json:"type"` + Config json.RawMessage `json:"config"` +} + +type xdsServer struct { + ServerURI string `json:"server_uri"` + ChannelCreds []channelCreds `json:"channel_creds"` +} + +// NewConfig returns a new instance of Config initialized by reading the +// bootstrap file found at ${GRPC_XDS_BOOTSTRAP}. +// +// The format of the bootstrap file will be as follows: +// { +// "xds_server": { +// "server_uri": , +// "channel_creds": [ +// { +// "type": , +// "config": +// } +// ] +// }, +// "node": +// } +// +// Currently, we support exactly one type of credential, which is +// "google_default", where we use the host's default certs for transport +// credentials and a Google oauth token for call credentials. +// +// This function tries to process as much of the bootstrap file as possible (in +// the presence of the errors) and may return a Config object with certain +// fields left unspecified, in which case the caller should use some sane +// defaults. 
+func NewConfig() *Config { + config := &Config{} + + fName, ok := os.LookupEnv(fileEnv) + if !ok { + grpclog.Errorf("xds: %s environment variable not set", fileEnv) + return config + } + + grpclog.Infof("xds: Reading bootstrap file from %s", fName) + data, err := fileReadFunc(fName) + if err != nil { + grpclog.Errorf("xds: bootstrap file {%v} read failed: %v", fName, err) + return config + } + + var jsonData map[string]json.RawMessage + if err := json.Unmarshal(data, &jsonData); err != nil { + grpclog.Errorf("xds: json.Unmarshal(%v) failed during bootstrap: %v", string(data), err) + return config + } + + m := jsonpb.Unmarshaler{AllowUnknownFields: true} + for k, v := range jsonData { + switch k { + case "node": + n := &corepb.Node{} + if err := m.Unmarshal(bytes.NewReader(v), n); err != nil { + grpclog.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) + break + } + config.NodeProto = n + case "xds_servers": + var servers []*xdsServer + if err := json.Unmarshal(v, &servers); err != nil { + grpclog.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) + break + } + if len(servers) < 1 { + grpclog.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to") + break + } + xs := servers[0] + config.BalancerName = xs.ServerURI + for _, cc := range xs.ChannelCreds { + if cc.Type == googleDefaultCreds { + config.Creds = grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()) + // We stop at the first credential type that we support. + break + } + } + default: + // Do not fail the xDS bootstrap when an unknown field is seen. + grpclog.Warningf("xds: unexpected data in bootstrap file: {%v, %v}", k, string(v)) + } + } + + // If we don't find a nodeProto in the bootstrap file, we just create an + // empty one here. That way, callers of this function can always expect + // that the NodeProto field is non-nil. 
+ if config.NodeProto == nil { + config.NodeProto = &corepb.Node{} + } + config.NodeProto.BuildVersion = gRPCVersion + + grpclog.Infof("xds: bootstrap.NewConfig returning: %+v", config) + return config +} diff --git a/plugin/traffic/xds/bootstrap/bootstrap_test.go b/plugin/traffic/xds/bootstrap/bootstrap_test.go new file mode 100644 index 000000000..a55e1d0f7 --- /dev/null +++ b/plugin/traffic/xds/bootstrap/bootstrap_test.go @@ -0,0 +1,260 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package bootstrap + +import ( + "os" + "testing" + + "github.com/golang/protobuf/proto" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/google" + + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + structpb "github.com/golang/protobuf/ptypes/struct" +) + +var ( + nodeProto = &corepb.Node{ + Id: "ENVOY_NODE_ID", + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "TRAFFICDIRECTOR_GRPC_HOSTNAME": { + Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"}, + }, + }, + }, + BuildVersion: gRPCVersion, + } + nilCredsConfig = &Config{ + BalancerName: "trafficdirector.googleapis.com:443", + Creds: nil, + NodeProto: nodeProto, + } + nonNilCredsConfig = &Config{ + BalancerName: "trafficdirector.googleapis.com:443", + Creds: grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()), + NodeProto: nodeProto, + } +) + +// TestNewConfig exercises the functionality in NewConfig with different +// bootstrap file contents. It overrides the fileReadFunc by returning +// bootstrap file contents defined in this test, instead of reading from a +// file. 
+func TestNewConfig(t *testing.T) { + bootstrapFileMap := map[string]string{ + "empty": "", + "badJSON": `["test": 123]`, + "emptyNodeProto": ` + { + "xds_servers" : [{ + "server_uri": "trafficdirector.googleapis.com:443" + }] + }`, + "emptyXdsServer": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + } + }`, + "unknownTopLevelFieldInFile": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [{ + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ + { "type": "not-google-default" } + ] + }], + "unknownField": "foobar" + }`, + "unknownFieldInNodeProto": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "unknownField": "foobar", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + } + }`, + "unknownFieldInXdsServer": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [{ + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ + { "type": "not-google-default" } + ], + "unknownField": "foobar" + }] + }`, + "emptyChannelCreds": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [{ + "server_uri": "trafficdirector.googleapis.com:443" + }] + }`, + "nonGoogleDefaultCreds": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [{ + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ + { "type": "not-google-default" } + ] + }] + }`, + "multipleChannelCreds": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [{ + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ + { "type": 
"not-google-default" }, + { "type": "google_default" } + ] + }] + }`, + "goodBootstrap": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [{ + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ + { "type": "google_default" } + ] + }] + }`, + "multipleXDSServers": ` + { + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [ + { + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [{ "type": "google_default" }] + }, + { + "server_uri": "backup.never.use.com:1234", + "channel_creds": [{ "type": "not-google-default" }] + } + ] + }`, + } + + oldFileReadFunc := fileReadFunc + fileReadFunc = func(name string) ([]byte, error) { + if b, ok := bootstrapFileMap[name]; ok { + return []byte(b), nil + } + return nil, os.ErrNotExist + } + defer func() { + fileReadFunc = oldFileReadFunc + os.Unsetenv(fileEnv) + }() + + tests := []struct { + name string + wantConfig *Config + }{ + {"nonExistentBootstrapFile", &Config{}}, + {"empty", &Config{}}, + {"badJSON", &Config{}}, + {"emptyNodeProto", &Config{ + BalancerName: "trafficdirector.googleapis.com:443", + NodeProto: &corepb.Node{BuildVersion: gRPCVersion}, + }}, + {"emptyXdsServer", &Config{NodeProto: nodeProto}}, + {"unknownTopLevelFieldInFile", nilCredsConfig}, + {"unknownFieldInNodeProto", &Config{NodeProto: nodeProto}}, + {"unknownFieldInXdsServer", nilCredsConfig}, + {"emptyChannelCreds", nilCredsConfig}, + {"nonGoogleDefaultCreds", nilCredsConfig}, + {"multipleChannelCreds", nonNilCredsConfig}, + {"goodBootstrap", nonNilCredsConfig}, + {"multipleXDSServers", nonNilCredsConfig}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if err := os.Setenv(fileEnv, test.name); err != nil { + t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err) + } + config := NewConfig() + if 
config.BalancerName != test.wantConfig.BalancerName { + t.Errorf("config.BalancerName is %s, want %s", config.BalancerName, test.wantConfig.BalancerName) + } + if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) { + t.Errorf("config.NodeProto is %#v, want %#v", config.NodeProto, test.wantConfig.NodeProto) + } + if (config.Creds != nil) != (test.wantConfig.Creds != nil) { + t.Errorf("config.Creds is %#v, want %#v", config.Creds, test.wantConfig.Creds) + } + }) + } +} + +func TestNewConfigEnvNotSet(t *testing.T) { + os.Unsetenv(fileEnv) + wantConfig := Config{} + if config := NewConfig(); *config != wantConfig { + t.Errorf("NewConfig() returned : %#v, wanted an empty Config object", config) + } +} diff --git a/plugin/traffic/xds/buffer/unbounded.go b/plugin/traffic/xds/buffer/unbounded.go new file mode 100644 index 000000000..9f6a0c120 --- /dev/null +++ b/plugin/traffic/xds/buffer/unbounded.go @@ -0,0 +1,85 @@ +/* + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package buffer provides an implementation of an unbounded buffer. +package buffer + +import "sync" + +// Unbounded is an implementation of an unbounded buffer which does not use +// extra goroutines. This is typically used for passing updates from one entity +// to another within gRPC. +// +// All methods on this type are thread-safe and don't block on anything except +// the underlying mutex used for synchronization. 
//
// Unbounded supports values of any type by carrying them over a channel of
// `interface{}`. As a consequence a call to Put() incurs an extra memory
// allocation, and readers need a type assertion on every value they receive.
// For performance critical code paths, using Unbounded is strongly discouraged
// and defining a new type specific implementation of this buffer is preferred.
// See internal/transport/transport.go for an example of this.
type Unbounded struct {
	// c is the single-slot read channel handed out by Get().
	c chan interface{}
	// mu guards backlog and serializes sends onto c.
	mu sync.Mutex
	// backlog holds values that could not be placed on c yet, oldest first.
	backlog []interface{}
}

// NewUnbounded returns a new instance of Unbounded.
func NewUnbounded() *Unbounded {
	ub := &Unbounded{}
	ub.c = make(chan interface{}, 1)
	return ub
}

// Put adds t to the unbounded buffer.
//
// The value goes straight onto the read channel when nothing is queued ahead
// of it and the channel has room; otherwise it is appended to the backlog.
func (b *Unbounded) Put(t interface{}) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// Older values are still waiting: queue behind them to preserve order.
	if len(b.backlog) > 0 {
		b.backlog = append(b.backlog, t)
		return
	}
	select {
	case b.c <- t:
	default:
		// Channel slot is occupied; park the value.
		b.backlog = append(b.backlog, t)
	}
}

// Load sends the earliest buffered data, if any, onto the read channel
// returned by Get(). Users are expected to call this every time they read a
// value from the read channel.
func (b *Unbounded) Load() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if len(b.backlog) == 0 {
		return
	}
	select {
	case b.c <- b.backlog[0]:
		// Drop the reference held by the backing array before advancing.
		b.backlog[0] = nil
		b.backlog = b.backlog[1:]
	default:
		// Previous value has not been read yet; try again on the next Load.
	}
}

// Get returns a read channel on which values added to the buffer, via Put(),
// are sent on.
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
func (b *Unbounded) Get() <-chan interface{} {
	return b.c
}
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package buffer + +import ( + "reflect" + "sort" + "sync" + "testing" +) + +const ( + numWriters = 10 + numWrites = 10 +) + +// wantReads contains the set of values expected to be read by the reader +// goroutine in the tests. +var wantReads []int + +func init() { + for i := 0; i < numWriters; i++ { + for j := 0; j < numWrites; j++ { + wantReads = append(wantReads, i) + } + } +} + +// TestSingleWriter starts one reader and one writer goroutine and makes sure +// that the reader gets all the value added to the buffer by the writer. +func TestSingleWriter(t *testing.T) { + ub := NewUnbounded() + reads := []int{} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ch := ub.Get() + for i := 0; i < numWriters*numWrites; i++ { + r := <-ch + reads = append(reads, r.(int)) + ub.Load() + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < numWriters; i++ { + for j := 0; j < numWrites; j++ { + ub.Put(i) + } + } + }() + + wg.Wait() + if !reflect.DeepEqual(reads, wantReads) { + t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads) + } +} + +// TestMultipleWriters starts multiple writers and one reader goroutine and +// makes sure that the reader gets all the data written by all writers. 
+func TestMultipleWriters(t *testing.T) { + ub := NewUnbounded() + reads := []int{} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ch := ub.Get() + for i := 0; i < numWriters*numWrites; i++ { + r := <-ch + reads = append(reads, r.(int)) + ub.Load() + } + }() + + wg.Add(numWriters) + for i := 0; i < numWriters; i++ { + go func(index int) { + defer wg.Done() + for j := 0; j < numWrites; j++ { + ub.Put(index) + } + }(i) + } + + wg.Wait() + sort.Ints(reads) + if !reflect.DeepEqual(reads, wantReads) { + t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads) + } +} diff --git a/plugin/traffic/xds/cds.go b/plugin/traffic/xds/cds.go new file mode 100644 index 000000000..381b0d0c9 --- /dev/null +++ b/plugin/traffic/xds/cds.go @@ -0,0 +1,88 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "fmt" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/golang/protobuf/ptypes" +) + +// handleCDSResponse processes an CDS response received from the xDS server. On +// receipt of a good response, it also invokes the registered watcher callback. 
+func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { + v2c.mu.Lock() + defer v2c.mu.Unlock() + + wi := v2c.watchMap[cdsURL] + if wi == nil { + return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp) + } + + var returnUpdate CDSUpdate + localCache := make(map[string]CDSUpdate) + for _, r := range resp.GetResources() { + var resource ptypes.DynamicAny + if err := ptypes.UnmarshalAny(r, &resource); err != nil { + return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err) + } + cluster, ok := resource.Message.(*xdspb.Cluster) + if !ok { + return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message) + } + update, err := validateCluster(cluster) + if err != nil { + return err + } + + // If the Cluster message in the CDS response did not contain a + // serviceName, we will just use the clusterName for EDS. + if update.ServiceName == "" { + update.ServiceName = cluster.GetName() + } + localCache[cluster.GetName()] = update + if cluster.GetName() == wi.target[0] { + returnUpdate = update + } + } + v2c.cdsCache = localCache + + var err error + if returnUpdate.ServiceName == "" { + err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp) + } + wi.stopTimer() + wi.callback.(cdsCallback)(returnUpdate, err) + return nil +} + +func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) { + emptyUpdate := CDSUpdate{ServiceName: ""} + switch { + case cluster.GetType() != xdspb.Cluster_EDS: + return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster) + case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil: + return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster) + case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN: + return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) + } + + return 
CDSUpdate{ServiceName: cluster.GetEdsClusterConfig().GetServiceName()}, nil +} diff --git a/plugin/traffic/xds/cds_test.go b/plugin/traffic/xds/cds_test.go new file mode 100644 index 000000000..c8412f52a --- /dev/null +++ b/plugin/traffic/xds/cds_test.go @@ -0,0 +1,487 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "errors" + "fmt" + "reflect" + "testing" + "time" + + discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + "github.com/golang/protobuf/proto" + anypb "github.com/golang/protobuf/ptypes/any" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" +) + +const ( + clusterName1 = "foo-cluster" + clusterName2 = "bar-cluster" + serviceName1 = "foo-service" + serviceName2 = "bar-service" +) + +func (v2c *v2Client) cloneCDSCacheForTesting() map[string]CDSUpdate { + v2c.mu.Lock() + defer v2c.mu.Unlock() + + cloneCache := make(map[string]CDSUpdate) + for k, v := range v2c.cdsCache { + cloneCache[k] = v + } + return cloneCache +} + +func TestValidateCluster(t *testing.T) { + emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false} + tests := []struct { + name string + cluster *xdspb.Cluster + wantUpdate CDSUpdate + wantErr bool + }{ + { + name: "non-eds-cluster-type", + 
cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_STATIC}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + }, + LbPolicy: xdspb.Cluster_LEAST_REQUEST, + }, + wantUpdate: emptyUpdate, + wantErr: true, + }, + { + name: "no-eds-config", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + }, + wantUpdate: emptyUpdate, + wantErr: true, + }, + { + name: "no-ads-config-source", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{}, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + }, + wantUpdate: emptyUpdate, + wantErr: true, + }, + { + name: "non-round-robin-lb-policy", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + }, + LbPolicy: xdspb.Cluster_LEAST_REQUEST, + }, + wantUpdate: emptyUpdate, + wantErr: true, + }, + { + name: "happy-case-no-service-name-no-lrs", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + }, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + }, + wantUpdate: emptyUpdate, + }, + { + name: "happy-case-no-lrs", + cluster: &xdspb.Cluster{ + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: 
&corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: serviceName1, + }, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + }, + wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: false}, + }, + { + name: "happiest-case", + cluster: goodCluster1, + wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: true}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + gotUpdate, gotErr := validateCluster(test.cluster) + if (gotErr != nil) != test.wantErr { + t.Errorf("validateCluster(%+v) returned error: %v, wantErr: %v", test.cluster, gotErr, test.wantErr) + } + if !reflect.DeepEqual(gotUpdate, test.wantUpdate) { + t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, gotUpdate, test.wantUpdate) + } + }) + } +} + +// TestCDSHandleResponse starts a fake xDS server, makes a ClientConn to it, +// and creates a v2Client using it. Then, it registers a CDS watcher and tests +// different CDS responses. +func TestCDSHandleResponse(t *testing.T) { + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + + tests := []struct { + name string + cdsResponse *xdspb.DiscoveryResponse + wantErr bool + wantUpdate *CDSUpdate + wantUpdateErr bool + }{ + // Badly marshaled CDS response. + { + name: "badly-marshaled-response", + cdsResponse: badlyMarshaledCDSResponse, + wantErr: true, + wantUpdate: nil, + wantUpdateErr: false, + }, + // Response does not contain Cluster proto. + { + name: "no-cluster-proto-in-response", + cdsResponse: badResourceTypeInLDSResponse, + wantErr: true, + wantUpdate: nil, + wantUpdateErr: false, + }, + // Response contains no clusters. + { + name: "no-cluster", + cdsResponse: &xdspb.DiscoveryResponse{}, + wantErr: false, + wantUpdate: &CDSUpdate{}, + wantUpdateErr: true, + }, + // Response contains one good cluster we are not interested in. 
+ { + name: "one-uninteresting-cluster", + cdsResponse: goodCDSResponse2, + wantErr: false, + wantUpdate: &CDSUpdate{}, + wantUpdateErr: true, + }, + // Response contains one cluster and it is good. + { + name: "one-good-cluster", + cdsResponse: goodCDSResponse1, + wantErr: false, + wantUpdate: &CDSUpdate{ServiceName: serviceName1, EnableLRS: true}, + wantUpdateErr: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testWatchHandle(t, &watchHandleTestcase{ + responseToHandle: test.cdsResponse, + wantHandleErr: test.wantErr, + wantUpdate: test.wantUpdate, + wantUpdateErr: test.wantUpdateErr, + + cdsWatch: v2c.watchCDS, + watchReqChan: fakeServer.XDSRequestChan, + handleXDSResp: v2c.handleCDSResponse, + }) + }) + } +} + +// TestCDSHandleResponseWithoutWatch tests the case where the v2Client receives +// a CDS response without a registered watcher. +func TestCDSHandleResponseWithoutWatch(t *testing.T) { + _, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + + if v2c.handleCDSResponse(goodCDSResponse1) == nil { + t.Fatal("v2c.handleCDSResponse() succeeded, should have failed") + } +} + +// cdsTestOp contains all data related to one particular test operation. Not +// all fields make sense for all tests. +type cdsTestOp struct { + // target is the resource name to watch for. + target string + // responseToSend is the xDS response sent to the client + responseToSend *fakeserver.Response + // wantOpErr specfies whether the main operation should return an error. + wantOpErr bool + // wantCDSCache is the expected rdsCache at the end of an operation. + wantCDSCache map[string]CDSUpdate + // wantWatchCallback specifies if the watch callback should be invoked. + wantWatchCallback bool +} + +// testCDSCaching is a helper function which starts a fake xDS server, makes a +// ClientConn to it, creates a v2Client using it. 
It then reads a bunch of +// test operations to be performed from cdsTestOps and returns error, if any, +// on the provided error channel. This is executed in a separate goroutine. +func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh *testutils.Channel) { + t.Helper() + + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + callbackCh := make(chan struct{}, 1) + for _, cdsTestOp := range cdsTestOps { + // Register a watcher if required, and use a channel to signal the + // successful invocation of the callback. + if cdsTestOp.target != "" { + v2c.watchCDS(cdsTestOp.target, func(u CDSUpdate, err error) { + t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) + callbackCh <- struct{}{} + }) + t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target) + + // Wait till the request makes it to the fakeServer. This ensures that + // the watch request has been processed by the v2Client. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + errCh.Send(fmt.Errorf("Timeout waiting for CDS request: %v", err)) + return + } + t.Log("FakeServer received request...") + } + + // Directly push the response through a call to handleCDSResponse, + // thereby bypassing the fakeServer. + if cdsTestOp.responseToSend != nil { + resp := cdsTestOp.responseToSend.Resp.(*discoverypb.DiscoveryResponse) + if err := v2c.handleCDSResponse(resp); (err != nil) != cdsTestOp.wantOpErr { + errCh.Send(fmt.Errorf("v2c.handleRDSResponse(%+v) returned err: %v", resp, err)) + return + } + } + + // If the test needs the callback to be invoked, just verify that + // it was invoked. Since we verify the contents of the cache, it's + // ok not to verify the contents of the callback. 
+ if cdsTestOp.wantWatchCallback { + <-callbackCh + } + + if !reflect.DeepEqual(v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache) { + errCh.Send(fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.rdsCache, cdsTestOp.wantCDSCache)) + return + } + } + t.Log("Completed all test ops successfully...") + errCh.Send(nil) +} + +// TestCDSCaching tests some end-to-end CDS flows using a fake xDS server, and +// verifies the CDS data cached at the v2Client. +func TestCDSCaching(t *testing.T) { + ops := []cdsTestOp{ + // Add an CDS watch for a cluster name (clusterName1), which returns one + // matching resource in the response. + { + target: clusterName1, + responseToSend: &fakeserver.Response{Resp: goodCDSResponse1}, + wantCDSCache: map[string]CDSUpdate{ + clusterName1: {serviceName1, true}, + }, + wantWatchCallback: true, + }, + // Push an CDS response which contains a new resource (apart from the + // one received in the previous response). This should be cached. + { + responseToSend: &fakeserver.Response{Resp: cdsResponseWithMultipleResources}, + wantCDSCache: map[string]CDSUpdate{ + clusterName1: {serviceName1, true}, + clusterName2: {serviceName2, false}, + }, + wantWatchCallback: true, + }, + // Switch the watch target to clusterName2, which was already cached. No + // response is received from the server (as expected), but we want the + // callback to be invoked with the new serviceName. + { + target: clusterName2, + wantCDSCache: map[string]CDSUpdate{ + clusterName1: {serviceName1, true}, + clusterName2: {serviceName2, false}, + }, + wantWatchCallback: true, + }, + // Push an empty CDS response. This should clear the cache. 
+ { + responseToSend: &fakeserver.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}}, + wantOpErr: false, + wantCDSCache: map[string]CDSUpdate{}, + wantWatchCallback: true, + }, + } + errCh := testutils.NewChannel() + go testCDSCaching(t, ops, errCh) + waitForNilErr(t, errCh) +} + +// TestCDSWatchExpiryTimer tests the case where the client does not receive an +// CDS response for the request that it sends out. We want the watch callback +// to be invoked with an error once the watchExpiryTimer fires. +func TestCDSWatchExpiryTimer(t *testing.T) { + oldWatchExpiryTimeout := defaultWatchExpiryTimeout + defaultWatchExpiryTimeout = 500 * time.Millisecond + defer func() { + defaultWatchExpiryTimeout = oldWatchExpiryTimeout + }() + + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + callbackCh := testutils.NewChannel() + v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) { + t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) + if u.ServiceName != "" { + callbackCh.Send(fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName)) + } + if err == nil { + callbackCh.Send(errors.New("received nil error in cdsCallback")) + } + callbackCh.Send(nil) + }) + + // Wait till the request makes it to the fakeServer. This ensures that + // the watch request has been processed by the v2Client. 
+ if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an CDS request") + } + waitForNilErr(t, callbackCh) +} + +var ( + badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: cdsURL, + Value: []byte{1, 2, 3, 4}, + }, + }, + TypeUrl: cdsURL, + } + goodCluster1 = &xdspb.Cluster{ + Name: clusterName1, + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: serviceName1, + }, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + LrsServer: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Self{ + Self: &corepb.SelfConfigSource{}, + }, + }, + } + marshaledCluster1, _ = proto.Marshal(goodCluster1) + goodCluster2 = &xdspb.Cluster{ + Name: clusterName2, + ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, + EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ + EdsConfig: &corepb.ConfigSource{ + ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ + Ads: &corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: serviceName2, + }, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + } + marshaledCluster2, _ = proto.Marshal(goodCluster2) + goodCDSResponse1 = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: cdsURL, + Value: marshaledCluster1, + }, + }, + TypeUrl: cdsURL, + } + goodCDSResponse2 = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: cdsURL, + Value: marshaledCluster2, + }, + }, + TypeUrl: cdsURL, + } + cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: cdsURL, + Value: marshaledCluster1, + }, + { + TypeUrl: cdsURL, + Value: marshaledCluster2, + }, + }, + TypeUrl: cdsURL, + } +) diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go new file mode 
100644 index 000000000..98e5d2b4a --- /dev/null +++ b/plugin/traffic/xds/client.go @@ -0,0 +1,105 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package client implementation a full fledged gRPC client for the xDS API +// used by the xds resolver and balancer implementations. +package xds + +import ( + "errors" + "fmt" + "sync" + + "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" + + "google.golang.org/grpc" +) + +// Options provides all parameters required for the creation of an xDS client. +type Options struct { + // Config contains a fully populated bootstrap config. It is the + // responsibility of the caller to use some sane defaults here if the + // bootstrap process returned with certain fields left unspecified. + Config bootstrap.Config + // DialOpts contains dial options to be used when dialing the xDS server. + DialOpts []grpc.DialOption +} + +// Client is a full fledged gRPC client which queries a set of discovery APIs +// (collectively termed as xDS) on a remote management server, to discover +// various dynamic resources. A single client object will be shared by the xds +// resolver and balancer implementations. 
+type Client struct { + opts Options + cc *grpc.ClientConn // Connection to the xDS server + v2c *v2Client // Actual xDS client implementation using the v2 API + + mu sync.Mutex + serviceCallback func(ServiceUpdate, error) + ldsCancel func() + rdsCancel func() +} + +// New returns a new xdsClient configured with opts. +func New(opts Options) (*Client, error) { + switch { + case opts.Config.BalancerName == "": + return nil, errors.New("xds: no xds_server name provided in options") + case opts.Config.Creds == nil: + return nil, errors.New("xds: no credentials provided in options") + case opts.Config.NodeProto == nil: + return nil, errors.New("xds: no node_proto provided in options") + } + + dopts := append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...) + cc, err := grpc.Dial(opts.Config.BalancerName, dopts...) + if err != nil { + // An error from a non-blocking dial indicates something serious. + return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err) + } + + c := &Client{ + opts: opts, + cc: cc, + v2c: newV2Client(cc, opts.Config.NodeProto, nil), // todo re-add backoff (exponential) + } + return c, nil +} + +// Close closes the gRPC connection to the xDS server. +func (c *Client) Close() { + // TODO: Should we invoke the registered callbacks here with an error that + // the client is closed? + c.v2c.close() + c.cc.Close() +} + +// ServiceUpdate contains update about the service. +type ServiceUpdate struct { + Cluster string +} + +// WatchCluster uses CDS to discover information about the provided clusterName. +func (c *Client) WatchCluster(clusterName string, cdsCb func(CDSUpdate, error)) (cancel func()) { + return c.v2c.watchCDS(clusterName, cdsCb) +} + +// WatchEDS watches the ghost. 
+func (c *Client) WatchEDS(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) { + return c.v2c.watchEDS(clusterName, edsCb) +} diff --git a/plugin/traffic/xds/client_test.go b/plugin/traffic/xds/client_test.go new file mode 100644 index 000000000..92724046b --- /dev/null +++ b/plugin/traffic/xds/client_test.go @@ -0,0 +1,292 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "errors" + "fmt" + "testing" + "time" + + "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" + + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + "google.golang.org/grpc" + "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" +) + +func clientOpts(balancerName string) Options { + return Options{ + Config: bootstrap.Config{ + BalancerName: balancerName, + Creds: grpc.WithInsecure(), + NodeProto: &corepb.Node{}, + }, + // WithTimeout is deprecated. But we are OK to call it here from the + // test, so we clearly know that the dial failed. 
+ DialOpts: []grpc.DialOption{grpc.WithTimeout(5 * time.Second), grpc.WithBlock()}, + } +} + +func TestNew(t *testing.T) { + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } + defer cleanup() + + tests := []struct { + name string + opts Options + wantErr bool + }{ + {name: "empty-opts", opts: Options{}, wantErr: true}, + { + name: "empty-balancer-name", + opts: Options{ + Config: bootstrap.Config{ + Creds: grpc.WithInsecure(), + NodeProto: &corepb.Node{}, + }, + }, + wantErr: true, + }, + { + name: "empty-dial-creds", + opts: Options{ + Config: bootstrap.Config{ + BalancerName: "dummy", + NodeProto: &corepb.Node{}, + }, + }, + wantErr: true, + }, + { + name: "empty-node-proto", + opts: Options{ + Config: bootstrap.Config{ + BalancerName: "dummy", + Creds: grpc.WithInsecure(), + }, + }, + wantErr: true, + }, + { + name: "happy-case", + opts: clientOpts(fakeServer.Address), + wantErr: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c, err := New(test.opts) + if err == nil { + defer c.Close() + } + if (err != nil) != test.wantErr { + t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr) + } + }) + } +} + +// TestWatchService tests the happy case of registering a watcher for +// service updates and receiving a good update. 
+func TestWatchService(t *testing.T) { + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } + defer cleanup() + + xdsClient, err := New(clientOpts(fakeServer.Address)) + if err != nil { + t.Fatalf("New returned error: %v", err) + } + defer xdsClient.Close() + t.Log("Created an xdsClient...") + + callbackCh := testutils.NewChannel() + cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { + if err != nil { + callbackCh.Send(fmt.Errorf("xdsClient.WatchService returned error: %v", err)) + return + } + if su.Cluster != goodClusterName1 { + callbackCh.Send(fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1)) + return + } + callbackCh.Send(nil) + }) + defer cancelWatch() + t.Log("Registered a watcher for service updates...") + + // Make the fakeServer send LDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + + // Make the fakeServer send RDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an RDS request") + } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1} + waitForNilErr(t, callbackCh) +} + +// TestWatchServiceWithNoResponseFromServer tests the case where the +// xDS server does not respond to the requests being sent out as part of +// registering a service update watcher. The underlying v2Client will timeout +// and will send us an error. 
+func TestWatchServiceWithNoResponseFromServer(t *testing.T) { + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } + defer cleanup() + + xdsClient, err := New(clientOpts(fakeServer.Address)) + if err != nil { + t.Fatalf("New returned error: %v", err) + } + defer xdsClient.Close() + t.Log("Created an xdsClient...") + + oldWatchExpiryTimeout := defaultWatchExpiryTimeout + defaultWatchExpiryTimeout = 500 * time.Millisecond + defer func() { + defaultWatchExpiryTimeout = oldWatchExpiryTimeout + }() + + callbackCh := testutils.NewChannel() + cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { + if su.Cluster != "" { + callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)) + return + } + if err == nil { + callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error")) + return + } + callbackCh.Send(nil) + }) + defer cancelWatch() + t.Log("Registered a watcher for service updates...") + + // Wait for one request from the client, but send no reponses. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } + waitForNilErr(t, callbackCh) +} + +// TestWatchServiceEmptyRDS tests the case where the underlying +// v2Client receives an empty RDS response. 
+func TestWatchServiceEmptyRDS(t *testing.T) { + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } + defer cleanup() + + xdsClient, err := New(clientOpts(fakeServer.Address)) + if err != nil { + t.Fatalf("New returned error: %v", err) + } + defer xdsClient.Close() + t.Log("Created an xdsClient...") + + oldWatchExpiryTimeout := defaultWatchExpiryTimeout + defaultWatchExpiryTimeout = 500 * time.Millisecond + defer func() { + defaultWatchExpiryTimeout = oldWatchExpiryTimeout + }() + + callbackCh := testutils.NewChannel() + cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { + if su.Cluster != "" { + callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)) + return + } + if err == nil { + callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error")) + return + } + callbackCh.Send(nil) + }) + defer cancelWatch() + t.Log("Registered a watcher for service updates...") + + // Make the fakeServer send LDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + + // Make the fakeServer send an empty RDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an RDS request") + } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: noVirtualHostsInRDSResponse} + waitForNilErr(t, callbackCh) +} + +// TestWatchServiceWithClientClose tests the case where xDS responses are +// received after the client is closed, and we make sure that the registered +// watcher callback is not invoked. 
+func TestWatchServiceWithClientClose(t *testing.T) { + fakeServer, cleanup, err := fakeserver.StartServer() + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } + defer cleanup() + + xdsClient, err := New(clientOpts(fakeServer.Address)) + if err != nil { + t.Fatalf("New returned error: %v", err) + } + defer xdsClient.Close() + t.Log("Created an xdsClient...") + + callbackCh := testutils.NewChannel() + cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { + callbackCh.Send(errors.New("watcher callback invoked after client close")) + }) + defer cancelWatch() + t.Log("Registered a watcher for service updates...") + + // Make the fakeServer send LDS response. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + + xdsClient.Close() + t.Log("Closing the xdsClient...") + + // Push an RDS response from the fakeserver + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1} + if cbErr, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout { + t.Fatal(cbErr) + } + +} diff --git a/plugin/traffic/xds/eds.go b/plugin/traffic/xds/eds.go new file mode 100644 index 000000000..e97297f97 --- /dev/null +++ b/plugin/traffic/xds/eds.go @@ -0,0 +1,207 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xds + +import ( + "fmt" + "net" + "strconv" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + typepb "github.com/envoyproxy/go-control-plane/envoy/type" + "github.com/golang/protobuf/ptypes" +) + +// OverloadDropConfig contains the config to drop overloads. +type OverloadDropConfig struct { + Category string + Numerator uint32 + Denominator uint32 +} + +// EndpointHealthStatus represents the health status of an endpoint. +type EndpointHealthStatus int32 + +const ( + // EndpointHealthStatusUnknown represents HealthStatus UNKNOWN. + EndpointHealthStatusUnknown EndpointHealthStatus = iota + // EndpointHealthStatusHealthy represents HealthStatus HEALTHY. + EndpointHealthStatusHealthy + // EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY. + EndpointHealthStatusUnhealthy + // EndpointHealthStatusDraining represents HealthStatus DRAINING. + EndpointHealthStatusDraining + // EndpointHealthStatusTimeout represents HealthStatus TIMEOUT. + EndpointHealthStatusTimeout + // EndpointHealthStatusDegraded represents HealthStatus DEGRADED. + EndpointHealthStatusDegraded +) + +// Endpoint contains information of an endpoint. +type Endpoint struct { + Address string + HealthStatus EndpointHealthStatus + Weight uint32 +} + +// Locality contains information of a locality. +type Locality struct { + Endpoints []Endpoint + ID LocalityID + Priority uint32 + Weight uint32 +} + +// EDSUpdate contains an EDS update. 
+type EDSUpdate struct { + Drops []OverloadDropConfig + Localities []Locality +} + +func parseAddress(socketAddress *corepb.SocketAddress) string { + return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))) +} + +func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig { + percentage := dropPolicy.GetDropPercentage() + var ( + numerator = percentage.GetNumerator() + denominator uint32 + ) + switch percentage.GetDenominator() { + case typepb.FractionalPercent_HUNDRED: + denominator = 100 + case typepb.FractionalPercent_TEN_THOUSAND: + denominator = 10000 + case typepb.FractionalPercent_MILLION: + denominator = 1000000 + } + return OverloadDropConfig{ + Category: dropPolicy.GetCategory(), + Numerator: numerator, + Denominator: denominator, + } +} + +func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint { + endpoints := make([]Endpoint, 0, len(lbEndpoints)) + for _, lbEndpoint := range lbEndpoints { + endpoints = append(endpoints, Endpoint{ + HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), + Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()), + Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(), + }) + } + return endpoints +} + +// ParseEDSRespProto turns EDS response proto message to EDSUpdate. +// +// This is temporarily exported to be used in eds balancer, before it switches +// to use xds client. TODO: unexport. 
+func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) { + ret := &EDSUpdate{} + for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { + ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) + } + priorities := make(map[uint32]struct{}) + for _, locality := range m.Endpoints { + l := locality.GetLocality() + if l == nil { + return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) + } + lid := LocalityID{Region: l.Region, Zone: l.Zone, SubZone: l.SubZone} + priority := locality.GetPriority() + priorities[priority] = struct{}{} + ret.Localities = append(ret.Localities, Locality{ + ID: lid, + Endpoints: parseEndpoints(locality.GetLbEndpoints()), + Weight: locality.GetLoadBalancingWeight().GetValue(), + Priority: priority, + }) + } + for i := 0; i < len(priorities); i++ { + if _, ok := priorities[uint32(i)]; !ok { + return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities) + } + } + return ret, nil +} + +// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails. +// This is used by EDS balancer tests. +// +// TODO: delete this. The EDS balancer should build an EDSUpdate directly, +// instead of building and parsing a proto message. 
+func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate { + u, err := ParseEDSRespProto(m) + if err != nil { + panic(err.Error()) + } + return u +} + +func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { + v2c.mu.Lock() + defer v2c.mu.Unlock() + + wi := v2c.watchMap[edsURL] + if wi == nil { + return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp) + } + + var returnUpdate *EDSUpdate + for _, r := range resp.GetResources() { + var resource ptypes.DynamicAny + if err := ptypes.UnmarshalAny(r, &resource); err != nil { + return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err) + } + cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment) + if !ok { + return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message) + } + + if cla.GetClusterName() != wi.target[0] { + log.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0]) + // We won't validate the remaining resources. If one of the + // uninteresting ones is invalid, we will still ACK the response. + continue + } + + u, err := ParseEDSRespProto(cla) + if err != nil { + return err + } + + returnUpdate = u + // Break from the loop because the request resource is found. But + // this also means we won't validate the remaining resources. If one + // of the uninteresting ones is invalid, we will still ACK the + // response. + break + } + + if returnUpdate != nil { + wi.stopTimer() + wi.callback.(edsCallback)(returnUpdate, nil) + } + + return nil +} diff --git a/plugin/traffic/xds/eds_test.go b/plugin/traffic/xds/eds_test.go new file mode 100644 index 000000000..874fa1f30 --- /dev/null +++ b/plugin/traffic/xds/eds_test.go @@ -0,0 +1,287 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xds + +import ( + "errors" + "fmt" + "testing" + "time" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + "github.com/golang/protobuf/ptypes" + anypb "github.com/golang/protobuf/ptypes/any" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/xds/internal/testutils" +) + +func TestEDSParseRespProto(t *testing.T) { + tests := []struct { + name string + m *xdspb.ClusterLoadAssignment + want *EDSUpdate + wantErr bool + }{ + { + name: "missing-priority", + m: func() *xdspb.ClusterLoadAssignment { + clab0 := NewClusterLoadAssignmentBuilder("test", nil) + clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil) + clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil) + return clab0.Build() + }(), + want: nil, + wantErr: true, + }, + { + name: "missing-locality-ID", + m: func() *xdspb.ClusterLoadAssignment { + clab0 := NewClusterLoadAssignmentBuilder("test", nil) + clab0.AddLocality("", 1, 0, []string{"addr1:314"}, nil) + return clab0.Build() + }(), + want: nil, + wantErr: true, + }, + { + name: "good", + m: func() *xdspb.ClusterLoadAssignment { + clab0 := NewClusterLoadAssignmentBuilder("test", nil) + clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, &AddLocalityOptions{ + Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY}, + Weight: []uint32{271}, + }) + clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, &AddLocalityOptions{ + Health: []corepb.HealthStatus{corepb.HealthStatus_DRAINING}, + Weight: 
[]uint32{828}, + }) + return clab0.Build() + }(), + want: &EDSUpdate{ + Drops: nil, + Localities: []Locality{ + { + Endpoints: []Endpoint{{ + Address: "addr1:314", + HealthStatus: EndpointHealthStatusUnhealthy, + Weight: 271, + }}, + ID: Locality{SubZone: "locality-1"}, + Priority: 1, + Weight: 1, + }, + { + Endpoints: []Endpoint{{ + Address: "addr2:159", + HealthStatus: EndpointHealthStatusDraining, + Weight: 828, + }}, + ID: Locality{SubZone: "locality-2"}, + Priority: 0, + Weight: 1, + }, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseEDSRespProto(tt.m) + if (err != nil) != tt.wantErr { + t.Errorf("ParseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr) + return + } + if d := cmp.Diff(got, tt.want); d != "" { + t.Errorf("ParseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d) + } + }) + } +} + +var ( + badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: edsURL, + Value: []byte{1, 2, 3, 4}, + }, + }, + TypeUrl: edsURL, + } + badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + { + TypeUrl: httpConnManagerURL, + Value: marshaledConnMgr1, + }, + }, + TypeUrl: edsURL, + } + goodEDSResponse1 = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + func() *anypb.Any { + clab0 := NewClusterLoadAssignmentBuilder(goodEDSName, nil) + clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil) + clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil) + a, _ := ptypes.MarshalAny(clab0.Build()) + return a + }(), + }, + TypeUrl: edsURL, + } + goodEDSResponse2 = &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{ + func() *anypb.Any { + clab0 := NewClusterLoadAssignmentBuilder("not-goodEDSName", nil) + clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil) + clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil) + a, _ := ptypes.MarshalAny(clab0.Build()) + return a + }(), 
+ }, + TypeUrl: edsURL, + } +) + +func TestEDSHandleResponse(t *testing.T) { + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + + tests := []struct { + name string + edsResponse *xdspb.DiscoveryResponse + wantErr bool + wantUpdate *EDSUpdate + wantUpdateErr bool + }{ + // Any in resource is badly marshaled. + { + name: "badly-marshaled_response", + edsResponse: badlyMarshaledEDSResponse, + wantErr: true, + wantUpdate: nil, + wantUpdateErr: false, + }, + // Response doesn't contain resource with the right type. + { + name: "no-config-in-response", + edsResponse: badResourceTypeInEDSResponse, + wantErr: true, + wantUpdate: nil, + wantUpdateErr: false, + }, + // Response contains one uninteresting ClusterLoadAssignment. + { + name: "one-uninterestring-assignment", + edsResponse: goodEDSResponse2, + wantErr: false, + wantUpdate: nil, + wantUpdateErr: false, + }, + // Response contains one good ClusterLoadAssignment. + { + name: "one-good-assignment", + edsResponse: goodEDSResponse1, + wantErr: false, + wantUpdate: &EDSUpdate{ + Localities: []Locality{ + { + Endpoints: []Endpoint{{Address: "addr1:314"}}, + ID: Locality{SubZone: "locality-1"}, + Priority: 1, + Weight: 1, + }, + { + Endpoints: []Endpoint{{Address: "addr2:159"}}, + ID: Locality{SubZone: "locality-2"}, + Priority: 0, + Weight: 1, + }, + }, + }, + wantUpdateErr: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testWatchHandle(t, &watchHandleTestcase{ + responseToHandle: test.edsResponse, + wantHandleErr: test.wantErr, + wantUpdate: test.wantUpdate, + wantUpdateErr: test.wantUpdateErr, + + edsWatch: v2c.watchEDS, + watchReqChan: fakeServer.XDSRequestChan, + handleXDSResp: v2c.handleEDSResponse, + }) + }) + } +} + +// TestEDSHandleResponseWithoutWatch tests the case where the v2Client +// receives an EDS response without a registered EDS watcher. 
+func TestEDSHandleResponseWithoutWatch(t *testing.T) { + _, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + + if v2c.handleEDSResponse(goodEDSResponse1) == nil { + t.Fatal("v2c.handleEDSResponse() succeeded, should have failed") + } +} + +func TestEDSWatchExpiryTimer(t *testing.T) { + oldWatchExpiryTimeout := defaultWatchExpiryTimeout + defaultWatchExpiryTimeout = 500 * time.Millisecond + defer func() { + defaultWatchExpiryTimeout = oldWatchExpiryTimeout + }() + + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + callbackCh := testutils.NewChannel() + v2c.watchEDS(goodRouteName1, func(u *EDSUpdate, err error) { + t.Logf("Received callback with edsUpdate {%+v} and error {%v}", u, err) + if u != nil { + callbackCh.Send(fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u)) + } + if err == nil { + callbackCh.Send(errors.New("received nil error in edsCallback")) + } + callbackCh.Send(nil) + }) + + // Wait till the request makes it to the fakeServer. This ensures that + // the watch request has been processed by the v2Client. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an CDS request") + } + waitForNilErr(t, callbackCh) +} diff --git a/plugin/traffic/xds/eds_testutil.go b/plugin/traffic/xds/eds_testutil.go new file mode 100644 index 000000000..7ae03fcfa --- /dev/null +++ b/plugin/traffic/xds/eds_testutil.go @@ -0,0 +1,128 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// All structs/functions in this file should be unexported. They are used in EDS +// balancer tests now, to generate test inputs. Eventually, EDS balancer tests +// should generate EDSUpdate directly, instead of generating and parsing the +// proto message. +// TODO: unexported everything in this file. + +package xds + +import ( + "fmt" + "net" + "strconv" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + typepb "github.com/envoyproxy/go-control-plane/envoy/type" + wrapperspb "github.com/golang/protobuf/ptypes/wrappers" +) + +// ClusterLoadAssignmentBuilder builds a ClusterLoadAssignment, aka EDS +// response. +type ClusterLoadAssignmentBuilder struct { + v *xdspb.ClusterLoadAssignment +} + +// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder. 
+func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder {
+	// One drop category per entry, named after its index; every percentage is
+	// expressed out of one hundred.
+	var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload
+	for i, d := range dropPercents {
+		drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{
+			Category: fmt.Sprintf("test-drop-%d", i),
+			DropPercentage: &typepb.FractionalPercent{
+				Numerator:   d,
+				Denominator: typepb.FractionalPercent_HUNDRED,
+			},
+		})
+	}
+
+	return &ClusterLoadAssignmentBuilder{
+		v: &xdspb.ClusterLoadAssignment{
+			ClusterName: clusterName,
+			Policy: &xdspb.ClusterLoadAssignment_Policy{
+				DropOverloads: drops,
+			},
+		},
+	}
+}
+
+// AddLocalityOptions contains options when adding locality to the builder.
+//
+// Health[i] and Weight[i] apply to the i-th address passed to AddLocality;
+// addresses beyond the length of either slice keep the proto default.
+type AddLocalityOptions struct {
+	Health []corepb.HealthStatus
+	Weight []uint32
+}
+
+// AddLocality adds a locality to the builder.
+//
+// Every entry of addrsWithPort must be a "host:port" string with a port in the
+// range 0-65535; the function panics otherwise. When subzone is empty the
+// locality is added without a Locality identifier. opts may be nil.
+func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *AddLocalityOptions) {
+	var lbEndPoints []*endpointpb.LbEndpoint
+	for i, a := range addrsWithPort {
+		host, portStr, err := net.SplitHostPort(a)
+		if err != nil {
+			panic("failed to split " + a)
+		}
+		// Parse as an unsigned 16-bit value so that negative or oversized
+		// ports are rejected instead of silently wrapping in the uint32
+		// conversion below.
+		port, err := strconv.ParseUint(portStr, 10, 16)
+		if err != nil {
+			panic("failed to atoi " + portStr)
+		}
+
+		lbe := &endpointpb.LbEndpoint{
+			HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
+				Endpoint: &endpointpb.Endpoint{
+					Address: &corepb.Address{
+						Address: &corepb.Address_SocketAddress{
+							SocketAddress: &corepb.SocketAddress{
+								Protocol: corepb.SocketAddress_TCP,
+								Address:  host,
+								PortSpecifier: &corepb.SocketAddress_PortValue{
+									PortValue: uint32(port),
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+		if opts != nil {
+			if i < len(opts.Health) {
+				lbe.HealthStatus = opts.Health[i]
+			}
+			if i < len(opts.Weight) {
+				lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]}
+			}
+		}
+		lbEndPoints = append(lbEndPoints, lbe)
+	}
+
+	var localityID *corepb.Locality
+	if subzone != "" {
+		localityID = &corepb.Locality{
+			Region:  "",
+			Zone:    "",
+			SubZone: subzone,
+		}
+	}
+
+	clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
+		Locality:            localityID,
+		LbEndpoints:         lbEndPoints,
+		LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight},
+		Priority:            priority,
+	})
+}
+
+// Build builds ClusterLoadAssignment.
+//
+// The returned message is the builder's own instance, not a copy.
+func (clab *ClusterLoadAssignmentBuilder) Build() *xdspb.ClusterLoadAssignment {
+	return clab.v
+}
diff --git a/plugin/traffic/xds/locality.go b/plugin/traffic/xds/locality.go
new file mode 100644
index 000000000..8a8324228
--- /dev/null
+++ b/plugin/traffic/xds/locality.go
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xds
+
+import (
+	"fmt"
+
+	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+)
+
+// LocalityID is xds.Locality without XXX fields, so it can be used as a map
+// key.
+//
+// xds.Locality cannot be a map key because one of the XXX fields is a slice.
+//
+// This struct should only be used as a map key. Use the proto message directly
+// in all other places.
+type LocalityID struct {
+	Region  string
+	Zone    string
+	SubZone string
+}
+
+// String returns the three fields joined with "-", in the order region, zone,
+// sub-zone.
+func (l LocalityID) String() string {
+	return fmt.Sprintf("%s-%s-%s", l.Region, l.Zone, l.SubZone)
+}
+
+// ToProto converts the LocalityID to its proto representation.
+func (l LocalityID) ToProto() *corepb.Locality {
+	return &corepb.Locality{
+		Region:  l.Region,
+		Zone:    l.Zone,
+		SubZone: l.SubZone,
+	}
+}
diff --git a/plugin/traffic/xds/testutil_test.go b/plugin/traffic/xds/testutil_test.go
new file mode 100644
index 000000000..b05acbd35
--- /dev/null
+++ b/plugin/traffic/xds/testutil_test.go
@@ -0,0 +1,169 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xds
+
+import (
+	"reflect"
+	"testing"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/xds/internal/testutils"
+	"google.golang.org/grpc/xds/internal/testutils/fakeserver"
+)
+
+// watchHandleTestcase describes one response-handling scenario for
+// testWatchHandle: the response to feed in, and the handler error and watch
+// callback outcome that are expected.
+type watchHandleTestcase struct {
+	responseToHandle *xdspb.DiscoveryResponse
+	wantHandleErr    bool
+	// wantUpdate is a pointer to the expected update value; a nil value (typed
+	// or untyped) means the watch callback must not be invoked.
+	wantUpdate    interface{}
+	wantUpdateErr bool
+
+	// Only one of the following should be non-nil. The one corresponding with
+	// typeURL will be called.
+	ldsWatch      func(target string, ldsCb ldsCallback) (cancel func())
+	rdsWatch      func(routeName string, rdsCb rdsCallback) (cancel func())
+	cdsWatch      func(clusterName string, cdsCb cdsCallback) (cancel func())
+	edsWatch      func(clusterName string, edsCb edsCallback) (cancel func())
+	watchReqChan  *testutils.Channel // The request sent for watch will be sent to this channel.
+	handleXDSResp func(response *xdspb.DiscoveryResponse) error
+}
+
+// testWatchHandle is called to test response handling for each xDS.
+//
+// It starts the xDS watch as configured in test, waits for the fake xds server
+// to receive the request (so watch callback is installed), and calls
+// handleXDSResp with responseToHandle (if it's set). It then compares the
+// update received by watch callback with the expected results.
+func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
+	t.Helper()
+
+	type updateErr struct {
+		u   interface{}
+		err error
+	}
+	gotUpdateCh := testutils.NewChannel()
+
+	var cancelWatch func()
+	// Register the watcher, this will also trigger the v2Client to send the xDS
+	// request.
+	switch {
+	case test.ldsWatch != nil:
+		cancelWatch = test.ldsWatch(goodLDSTarget1, func(u ldsUpdate, err error) {
+			t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
+			gotUpdateCh.Send(updateErr{u, err})
+		})
+	case test.rdsWatch != nil:
+		cancelWatch = test.rdsWatch(goodRouteName1, func(u rdsUpdate, err error) {
+			t.Logf("in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", u, err)
+			gotUpdateCh.Send(updateErr{u, err})
+		})
+	case test.cdsWatch != nil:
+		cancelWatch = test.cdsWatch(clusterName1, func(u CDSUpdate, err error) {
+			t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err)
+			gotUpdateCh.Send(updateErr{u, err})
+		})
+	case test.edsWatch != nil:
+		cancelWatch = test.edsWatch(goodEDSName, func(u *EDSUpdate, err error) {
+			t.Logf("in v2c.watchEDS callback, edsUpdate: %+v, err: %v", u, err)
+			// The EDS callback receives a pointer, which is nil when the
+			// watch reports an error. Only dereference a non-nil update so an
+			// error callback fails the comparison below instead of panicking.
+			var got interface{}
+			if u != nil {
+				got = *u
+			}
+			gotUpdateCh.Send(updateErr{got, err})
+		})
+	default:
+		t.Fatalf("no watch() is set")
+	}
+	defer cancelWatch()
+
+	// Wait till the request makes it to the fakeServer. This ensures that
+	// the watch request has been processed by the v2Client.
+	if _, err := test.watchReqChan.Receive(); err != nil {
+		t.Fatalf("Timeout waiting for an xDS request: %v", err)
+	}
+
+	// Directly push the response through a call to handleXDSResp. This bypasses
+	// the fakeServer, so it's only testing the handle logic. Client response
+	// processing is covered elsewhere.
+	//
+	// Also note that this won't trigger ACK, so there's no need to clear the
+	// request channel afterwards.
+	if err := test.handleXDSResp(test.responseToHandle); (err != nil) != test.wantHandleErr {
+		t.Fatalf("handleXDSResp() returned err: %v, wantErr: %v", err, test.wantHandleErr)
+	}
+
+	// If the test doesn't expect the callback to be invoked, verify that no
+	// update or error is pushed to the callback.
+	//
+	// Cannot directly compare test.wantUpdate with nil (typed vs non-typed nil:
+	// https://golang.org/doc/faq#nil_error).
+	if c := test.wantUpdate; c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
+		update, err := gotUpdateCh.Receive()
+		if err == testutils.ErrRecvTimeout {
+			return
+		}
+		t.Fatalf("Unexpected update: %+v", update)
+	}
+
+	// wantUpdate is a non-nil pointer here; compare against the pointed-to
+	// value because the callbacks above send update values, not pointers.
+	wantUpdate := reflect.ValueOf(test.wantUpdate).Elem().Interface()
+	uErr, err := gotUpdateCh.Receive()
+	if err != nil {
+		// Any receive error leaves uErr unusable for the assertions below.
+		t.Fatalf("Error waiting for xDS update: %v", err)
+	}
+	gotUpdate := uErr.(updateErr).u
+	opt := cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{})
+	if diff := cmp.Diff(gotUpdate, wantUpdate, opt); diff != "" {
+		t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff)
+	}
+	gotUpdateErr := uErr.(updateErr).err
+	if (gotUpdateErr != nil) != test.wantUpdateErr {
+		t.Fatalf("got xDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
+	}
+}
+
+// startServerAndGetCC starts a fake XDS server and also returns a ClientConn
+// connected to it.
+func startServerAndGetCC(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) {
+	t.Helper()
+
+	server, stopServer, err := fakeserver.StartServer()
+	if err != nil {
+		t.Fatalf("Failed to start fake xDS server: %v", err)
+	}
+
+	conn, closeConn, err := server.XDSClientConn()
+	if err != nil {
+		// The server is already running at this point; stop it before bailing.
+		stopServer()
+		t.Fatalf("Failed to get a clientConn to the fake xDS server: %v", err)
+	}
+
+	// The returned cleanup stops the server first and then closes the conn.
+	cleanup := func() {
+		stopServer()
+		closeConn()
+	}
+	return server, conn, cleanup
+}
+
+// waitForNilErr performs one receive on ch and fails the test if the receive
+// times out or if the received value is a non-nil error.
+func waitForNilErr(t *testing.T, ch *testutils.Channel) {
+	t.Helper()
+
+	got, err := ch.Receive()
+	if err == testutils.ErrRecvTimeout {
+		t.Fatalf("Timeout expired when expecting update")
+	}
+	if got == nil {
+		return
+	}
+	// Anything non-nil on this channel is expected to be an error value.
+	if cbErr := got.(error); cbErr != nil {
+		t.Fatal(cbErr)
+	}
+}
diff --git a/plugin/traffic/xds/types.go b/plugin/traffic/xds/types.go
new file mode 100644
index 000000000..c063233bf
--- /dev/null
+++ b/plugin/traffic/xds/types.go
@@ -0,0 +1,83 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xds
+
+import (
+	"time"
+
+	adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+)
+
+// adsStream is the client side of the aggregated discovery service stream.
+type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
+
+// Type URLs of the resource types handled by this package.
+const (
+	cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
+	edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
+)
+
+// watchState enumerates the states a watch call can be in.
+type watchState int
+
+const (
+	watchEnqueued watchState = iota
+	watchCancelled
+	watchStarted
+)
+
+// watchInfo bundles everything known about a single watch call.
+type watchInfo struct {
+	typeURL string
+	target  []string
+	state   watchState
+
+	callback    interface{}
+	expiryTimer *time.Timer
+}
+
+// cancel flags the watch as cancelled and stops its expiry timer, if any.
+func (wi *watchInfo) cancel() {
+	wi.state = watchCancelled
+	wi.stopTimer()
+}
+
+// stopTimer stops the expiry timer, if one was started, and leaves the watch
+// state untouched.
+func (wi *watchInfo) stopTimer() {
+	if wi.expiryTimer == nil {
+		return
+	}
+	wi.expiryTimer.Stop()
+}
+
+// ackInfo carries the fields needed to ack or nack one response.
+type ackInfo struct {
+	typeURL string
+	version string // Nack if version is an empty string.
+	nonce   string
+}
+
+// CDSUpdate contains information from a received CDS response, which is of
+// interest to the registered CDS watcher.
+type CDSUpdate struct {
+	// ServiceName is the service name corresponding to the clusterName which
+	// is being watched for through CDS.
+	ServiceName string
+}
+
+// cdsCallback is the signature of the callback registered by a CDS watcher.
+type cdsCallback func(CDSUpdate, error)
+
+// edsCallback is the signature of the callback registered by an EDS watcher.
+type edsCallback func(*EDSUpdate, error)
diff --git a/plugin/traffic/xds/v2client.go b/plugin/traffic/xds/v2client.go
new file mode 100644
index 000000000..dc0db0883
--- /dev/null
+++ b/plugin/traffic/xds/v2client.go
@@ -0,0 +1,421 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xds
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/coredns/coredns/plugin/traffic/xds/buffer"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+	"google.golang.org/grpc"
+)
+
+// The value chosen here is based on the default value of the
+// initial_fetch_timeout field in corepb.ConfigSource proto.
+// It is a variable rather than a constant so that tests can shorten it.
+var defaultWatchExpiryTimeout = 15 * time.Second
+
+// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a
+// single ADS stream on which the different types of xDS requests and responses
+// are multiplexed.
+// The reason for splitting this out from the top level xdsClient object is
+// because there is already an xDS v3Alpha API in development. If and when we
+// want to switch to that, this separation will ease that process.
+type v2Client struct {
+	// ctx is cancelled by close(); it bounds the lifetime of the ADS stream
+	// and of the goroutines started by this client.
+	ctx       context.Context
+	cancelCtx context.CancelFunc
+
+	// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
+	cc        *grpc.ClientConn
+	nodeProto *corepb.Node
+	// backoff returns how long to wait before the given stream retry attempt.
+	backoff func(int) time.Duration
+
+	// sendCh is the channel onto which watchInfo and ackInfo objects are
+	// pushed (by the watch API and by recv() respectively); it is read and
+	// acted upon by the send() goroutine.
+	sendCh *buffer.Unbounded
+
+	mu sync.Mutex
+	// Message specific watch infos, keyed by typeURL and protected by the
+	// above mutex. These are written to, after successfully reading from the
+	// update channel, and are read from when recovering from a broken stream
+	// to resend the xDS messages. When the user of this client object cancels
+	// a watch call, the corresponding entry is removed from the map. All
+	// accesses to the map, and to any value inside the map, should be
+	// protected with the above mutex.
+	watchMap map[string]*watchInfo
+	// ackMap contains the version that was acked (the version in the ack
+	// request that was sent on wire). The key is typeURL, the value is the
+	// version string, because the versions for different resource types
+	// should be independent.
+	ackMap map[string]string
+	// cdsCache maintains a mapping of {clusterName --> CDSUpdate} from
+	// validated cluster configurations received in CDS responses. We cache all
+	// valid cluster configurations, whether or not we are interested in them
+	// when we received them (because we could become interested in them in the
+	// future and the server won't send us those resources again). This is only
+	// to support legacy management servers that do not honor the
+	// resource_names field. As per the latest spec, the server should resend
+	// the response when the request changes, even if it had sent the same
+	// resource earlier (when not asked for). Protected by the above mutex.
+	cdsCache map[string]CDSUpdate
+}
+
+// newV2Client creates a new v2Client initialized with the passed arguments.
+//
+// It also starts the run() goroutine, which keeps running until close() is
+// called.
+func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration) *v2Client {
+	v2c := &v2Client{
+		cc:        cc,
+		nodeProto: nodeProto,
+		backoff:   backoff,
+		sendCh:    buffer.NewUnbounded(),
+		watchMap:  make(map[string]*watchInfo),
+		ackMap:    make(map[string]string),
+		cdsCache:  make(map[string]CDSUpdate),
+	}
+	v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
+
+	go v2c.run()
+	return v2c
+}
+
+// close cleans up resources and goroutines allocated by this client.
+func (v2c *v2Client) close() {
+	// Cancelling the context makes run() and send() return.
+	v2c.cancelCtx()
+}
+
+// run starts an ADS stream (and backs off exponentially, if the previous
+// stream failed without receiving a single reply) and runs the sender and
+// receiver routines to send and receive data from the stream respectively.
+//
+// It loops until the client's context is cancelled.
+func (v2c *v2Client) run() {
+	retries := 0
+	for {
+		// Bail out promptly if the client was closed between iterations.
+		select {
+		case <-v2c.ctx.Done():
+			return
+		default:
+		}
+
+		// Wait out the backoff for every attempt but the first, while staying
+		// responsive to close().
+		if retries != 0 {
+			t := time.NewTimer(v2c.backoff(retries))
+			select {
+			case <-t.C:
+			case <-v2c.ctx.Done():
+				// Drain the timer channel if the timer fired concurrently.
+				if !t.Stop() {
+					<-t.C
+				}
+				return
+			}
+		}
+
+		retries++
+		cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc)
+		stream, err := cli.StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true))
+		if err != nil {
+			log.Infof("xds: ADS stream creation failed: %v", err)
+			continue
+		}
+
+		// send() could be blocked on reading updates from the different update
+		// channels when it is not actually sending out messages. So, we need a
+		// way to break out of send() when recv() returns. This done channel is
+		// used for that purpose.
+		done := make(chan struct{})
+		go v2c.send(stream, done)
+		// recv() blocks until the stream breaks; it returns true if at least
+		// one response was handled successfully, in which case the backoff
+		// counter starts over.
+		if v2c.recv(stream) {
+			retries = 0
+		}
+		close(done)
+	}
+}
+
+// sendRequest sends a request for provided typeURL and resource on the provided
+// stream.
+//
+// version is the ack version to be sent with the request
+// - If this is the new request (not an ack/nack), version will be an empty
+// string
+// - If this is an ack, version will be the version from the response
+// - If this is a nack, version will be the previous acked version (from
+// ackMap). If there was no ack before, it will be an empty string
+//
+// It returns false if the request could not be sent on the stream.
+func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool {
+	req := &xdspb.DiscoveryRequest{
+		Node:          v2c.nodeProto,
+		TypeUrl:       typeURL,
+		ResourceNames: resourceNames,
+		VersionInfo:   version,
+		ResponseNonce: nonce,
+		// TODO: populate ErrorDetails for nack.
+	}
+	if err := stream.Send(req); err != nil {
+		log.Warningf("xds: request (type %s) for resource %v failed: %v", typeURL, resourceNames, err)
+		return false
+	}
+	return true
+}
+
+// sendExisting sends out xDS requests for registered watchers when recovering
+// from a broken stream.
+//
+// We call stream.Send() here with the lock being held. It should be OK to do
+// that here because the stream has just started and Send() usually returns
+// quickly (once it pushes the message onto the transport layer) and is only
+// ever blocked if we don't have enough flow control quota.
+//
+// It returns false as soon as one request fails to be sent.
+func (v2c *v2Client) sendExisting(stream adsStream) bool {
+	v2c.mu.Lock()
+	defer v2c.mu.Unlock()
+
+	// Reset the ack versions when the stream restarts.
+	v2c.ackMap = make(map[string]string)
+
+	// Requests on a fresh stream carry neither a version nor a nonce.
+	for typeURL, wi := range v2c.watchMap {
+		if !v2c.sendRequest(stream, wi.target, typeURL, "", "") {
+			return false
+		}
+	}
+
+	return true
+}
+
+// processWatchInfo pulls the fields needed by the request from a watchInfo.
+//
+// It also calls callback with cached response, and updates the watch map in
+// v2c.
+//
+// If the watch was already canceled, it returns false for send.
+func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) {
+	v2c.mu.Lock()
+	defer v2c.mu.Unlock()
+	if t.state == watchCancelled {
+		return // This returns all zero values, and false for send.
+	}
+	t.state = watchStarted
+	send = true
+
+	// version and nonce are left empty: this is a new request, not an ack.
+	typeURL = t.typeURL
+	target = t.target
+	v2c.checkCacheAndUpdateWatchMap(t)
+	// TODO: if watch is called again with the same resource names,
+	// there's no need to send another request.
+	//
+	// TODO: should we reset version (for ack) when a new watch is
+	// started? Or do this only if the resource names are different
+	// (so we send a new request)?
+	return
+}
+
+// processAckInfo pulls the fields needed by the ack request from a ackInfo.
+//
+// If no active watch is found for this ack, it returns false for send.
+func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) {
+	ackType := t.typeURL
+
+	v2c.mu.Lock()
+	defer v2c.mu.Unlock()
+
+	watcher, found := v2c.watchMap[ackType]
+	if !found {
+		// We don't send the request ack if there's no active watch (this can be
+		// either the server sends responses before any request, or the watch is
+		// canceled while the ackInfo is in queue), because there's no resource
+		// name. And if we send a request with empty resource name list, the
+		// server may treat it as a wild card and send us everything.
+		log.Warningf("xds: ack (type %s) not sent because there's no active watch for the type", ackType)
+		// Only typeURL is populated here; send stays false.
+		return nil, ackType, "", "", false
+	}
+
+	ackVersion := t.version
+	if ackVersion != "" {
+		// A real ack: remember the version as the last acked one for the type.
+		v2c.ackMap[ackType] = ackVersion
+	} else {
+		// This is a nack, get the previous acked version. It will still be an
+		// empty string if typeURL isn't found in ackMap, which can happen if
+		// there wasn't any ack before.
+		ackVersion = v2c.ackMap[ackType]
+	}
+	return watcher.target, ackType, ackVersion, t.nonce, true
+}
+
+// send drains watchInfo and ackInfo items from sendCh and turns them into xDS
+// requests on the provided ADS stream. It returns when the client is closed,
+// when done is closed, or when a request cannot be sent.
+func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
+	// Re-issue the requests of watchers registered before this stream started.
+	if !v2c.sendExisting(stream) {
+		return
+	}
+
+	for {
+		select {
+		case <-v2c.ctx.Done():
+			return
+		case <-done:
+			return
+		case item := <-v2c.sendCh.Get():
+			// NOTE(review): presumably Load makes the next buffered item
+			// available on the Get channel; confirm against the buffer package.
+			v2c.sendCh.Load()
+
+			var (
+				names                     []string
+				url, ackVersion, ackNonce string
+				shouldSend                bool
+			)
+			switch msg := item.(type) {
+			case *watchInfo:
+				names, url, ackVersion, ackNonce, shouldSend = v2c.processWatchInfo(msg)
+			case *ackInfo:
+				names, url, ackVersion, ackNonce, shouldSend = v2c.processAckInfo(msg)
+			}
+			// Items that must not be sent are skipped; a failed send ends the
+			// goroutine.
+			if shouldSend && !v2c.sendRequest(stream, names, url, ackVersion, ackNonce) {
+				return
+			}
+		}
+	}
+}
+
+// recv receives xDS responses on the provided ADS stream and branches out to
+// message specific handlers.
+func (v2c *v2Client) recv(stream adsStream) bool {
+	// handledOne reports whether at least one response was handled without
+	// error on this stream; it is the function's result.
+	handledOne := false
+	for {
+		resp, err := stream.Recv()
+		// TODO: call watch callbacks with error when stream is broken.
+		if err != nil {
+			log.Warningf("xds: ADS stream recv failed: %v", err)
+			return handledOne
+		}
+
+		typeURL := resp.GetTypeUrl()
+		var handleErr error
+		switch typeURL {
+		case cdsURL:
+			handleErr = v2c.handleCDSResponse(resp)
+		case edsURL:
+			handleErr = v2c.handleEDSResponse(resp)
+		default:
+			// Responses of unknown type are neither acked nor nacked.
+			log.Warningf("xds: unknown response URL type: %v", typeURL)
+			continue
+		}
+
+		// An ackInfo with an empty version is a nack; the version is only
+		// filled in when the handler accepted the response.
+		ack := &ackInfo{
+			typeURL: typeURL,
+			nonce:   resp.GetNonce(),
+		}
+		if handleErr == nil {
+			ack.version = resp.GetVersionInfo()
+			handledOne = true
+		} else {
+			log.Warningf("xds: response (type %s) handler failed: %v", typeURL, handleErr)
+		}
+		v2c.sendCh.Put(ack)
+	}
+}
+
+// watchCDS registers a CDS watcher for the provided clusterName. Updates
+// corresponding to received CDS responses will be pushed to the provided
+// callback. The caller can cancel the watch by invoking the returned cancel
+// function.
+// The provided callback should not block or perform any expensive operations
+// or call other methods of the v2Client object.
+func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) {
+	wi := &watchInfo{
+		typeURL:  cdsURL,
+		target:   []string{clusterName},
+		callback: cdsCb,
+	}
+	return v2c.watch(wi)
+}
+
+// watchEDS registers an EDS watcher for the provided clusterName. Updates
+// corresponding to received EDS responses will be pushed to the provided
+// callback. The caller can cancel the watch by invoking the returned cancel
+// function.
+// The provided callback should not block or perform any expensive operations
+// or call other methods of the v2Client object.
+func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) {
+	// TODO: Once a registered EDS watch is cancelled, we should send an EDS
+	// request with no resources. This will let the server know that we are no
+	// longer interested in this resource.
+	return v2c.watch(&watchInfo{
+		typeURL:  edsURL,
+		target:   []string{clusterName},
+		callback: edsCb,
+	})
+}
+
+// watch enqueues wi for the send() goroutine and returns the function that
+// cancels it.
+//
+// The returned cancel function is safe to call more than once, and only ever
+// affects wi itself: if wi has meanwhile been replaced in the watch map by a
+// newer watch of the same type, that newer watch is left untouched.
+func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) {
+	v2c.sendCh.Put(wi)
+	return func() {
+		v2c.mu.Lock()
+		defer v2c.mu.Unlock()
+		if wi.state == watchEnqueued {
+			// Not yet picked up by send(); processWatchInfo will drop it.
+			wi.state = watchCancelled
+			return
+		}
+		// Cancel this watch directly rather than whatever the map currently
+		// holds: the map entry may be missing (repeated cancel) or may belong
+		// to a newer watch that replaced this one.
+		wi.cancel()
+		if v2c.watchMap[wi.typeURL] == wi {
+			delete(v2c.watchMap, wi.typeURL)
+		}
+		// TODO: should we reset ack version string when cancelling the watch?
+	}
+}
+
+// checkCacheAndUpdateWatchMap is called when a new watch call is handled in
+// send(). If an existing watcher is found, it is cancelled (which also stops
+// its expiry timer). If the watchInfo to be added to the watchMap is found in
+// the cache, the watcher callback is immediately invoked; otherwise an expiry
+// timer is started which reports an error to the callback when it fires.
+//
+// Caller should hold v2c.mu
+func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
+	if existing := v2c.watchMap[wi.typeURL]; existing != nil {
+		existing.cancel()
+	}
+
+	v2c.watchMap[wi.typeURL] = wi
+	switch wi.typeURL {
+	case cdsURL:
+		clusterName := wi.target[0]
+		if update, ok := v2c.cdsCache[clusterName]; ok {
+			wi.callback.(cdsCallback)(update, nil)
+			return
+		}
+		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
+			// The lock is needed inside the timer function because it reads
+			// the watchInfo, which is shared with the watch map.
+			v2c.mu.Lock()
+			defer v2c.mu.Unlock()
+			// The timer may fire while a cancel is in progress; a cancelled
+			// watcher must not be called back.
+			if wi.state == watchCancelled {
+				return
+			}
+			wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target))
+		})
+	case edsURL:
+		wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
+			// Same locking and cancellation rules as for the CDS timer above.
+			v2c.mu.Lock()
+			defer v2c.mu.Unlock()
+			if wi.state == watchCancelled {
+				return
+			}
+			wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target))
+		})
+	}
+}
diff --git a/plugin/traffic/xds/v2client_ack_test.go b/plugin/traffic/xds/v2client_ack_test.go
new file mode 100644
index 000000000..942943f07
--- /dev/null
+++ b/plugin/traffic/xds/v2client_ack_test.go
@@ -0,0 +1,263 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package xds
+
+import (
+	"fmt"
+	"strconv"
+	"testing"
+	"time"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	"github.com/golang/protobuf/proto"
+	anypb "github.com/golang/protobuf/ptypes/any"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc/xds/internal/testutils"
+	"google.golang.org/grpc/xds/internal/testutils/fakeserver"
+)
+
+// compareXDSRequest reads one request from ch and compares it with want, after
+// overriding want's version and nonce with the given values (want itself is
+// not modified). It returns an error if nothing could be received, if the
+// received value is not a request, if the request carries an error, or if it
+// differs from the expectation.
+func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, version, nonce string) error {
+	val, err := ch.Receive()
+	if err != nil {
+		return err
+	}
+	req, ok := val.(*fakeserver.Request)
+	if !ok {
+		// Report unexpected values as an error instead of panicking.
+		return fmt.Errorf("unexpected value of type %T received on request channel", val)
+	}
+	if req.Err != nil {
+		return fmt.Errorf("unexpected error from request: %v", req.Err)
+	}
+	wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
+	wantClone.VersionInfo = version
+	wantClone.ResponseNonce = nonce
+	if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
+		return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
+	}
+	return nil
+}
+
+// sendXDSRespWithVersion pushes a copy of respWithoutVersion onto ch, stamped
+// with the given version and a freshly generated nonce, and returns that
+// nonce.
+func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) {
+	respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse)
+	respToSend.VersionInfo = strconv.Itoa(version)
+	// Format the 64-bit timestamp directly; converting it to int first would
+	// truncate it on platforms where int is 32 bits wide.
+	nonce = strconv.FormatInt(time.Now().UnixNano(), 10)
+	respToSend.Nonce = nonce
+	ch <- &fakeserver.Response{Resp: respToSend}
+	return
+}
+
+// startXDS registers a watch of the given xDS type, which makes the client
+// send its first request, and verifies that this request arrives on reqChan.
+// It returns the channel on which watch callbacks are signalled.
+func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest) *testutils.Channel {
+	t.Helper()
+
+	// Every watch callback only logs what it received and signals callbackCh.
+	callbackCh := testutils.NewChannel()
+	switch xdsname {
+	case "LDS":
+		v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
+			t.Logf("Received %s callback with update {%+v} and error {%v}", xdsname, u, err)
+			callbackCh.Send(struct{}{})
+		})
+	case "RDS":
+		v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
+			t.Logf("Received %s callback with update {%+v} and error {%v}", xdsname, u, err)
+			callbackCh.Send(struct{}{})
+		})
+	case "CDS":
+		v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
+			t.Logf("Received %s callback with update {%+v} and error {%v}", xdsname, u, err)
+			callbackCh.Send(struct{}{})
+		})
+	case "EDS":
+		v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) {
+			t.Logf("Received %s callback with update {%+v} and error {%v}", xdsname, u, err)
+			callbackCh.Send(struct{}{})
+		})
+	default:
+		// Without a watch no request is ever sent; fail with a clear message
+		// instead of a confusing receive failure below.
+		t.Fatalf("unknown xDS type %q", xdsname)
+	}
+
+	// The first request of a watch carries neither a version nor a nonce.
+	if err := compareXDSRequest(reqChan, req, "", ""); err != nil {
+		t.Fatalf("Failed to receive %s request: %v", xdsname, err)
+	}
+	t.Logf("FakeServer received %s request...", xdsname)
+	return callbackCh
+}
+
+// sendGoodResp sends the good response, with the given version, and a random
+// nonce.
+//
+// It also waits and checks that the ack request contains the given version, and
+// the generated nonce.
+func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) {
+	nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version)
+	t.Logf("Good %s response pushed to fakeServer...", xdsname)
+
+	// The ack must echo the version that was just sent, together with the nonce.
+	ackedVersion := strconv.Itoa(version)
+	err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, ackedVersion, nonce)
+	if err != nil {
+		t.Errorf("Failed to receive %s request: %v", xdsname, err)
+	}
+	t.Logf("Good %s response acked", xdsname)
+
+	// A good response must also reach the watch callback.
+	_, err = callbackCh.Receive()
+	if err != nil {
+		t.Errorf("Timeout when expecting %s update", xdsname)
+	}
+	t.Logf("Good %s response callback executed", xdsname)
+}
+
+// sendBadResp sends a bad response with the given version. This response will
+// be nacked, so we expect a request with the previous version (version-1).
+//
+// But the nonce in request should be the new nonce.
+func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, wantReq *xdspb.DiscoveryRequest) {
+	// An unknown xdsname maps to an empty type URL.
+	typeURLs := map[string]string{
+		"LDS": ldsURL,
+		"RDS": rdsURL,
+		"CDS": cdsURL,
+		"EDS": edsURL,
+	}
+	// The response carries a single empty resource under the selected type URL.
+	badResp := &xdspb.DiscoveryResponse{
+		Resources: []*anypb.Any{{}},
+		TypeUrl:   typeURLs[xdsname],
+	}
+	nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, badResp, version)
+	t.Logf("Bad %s response pushed to fakeServer...", xdsname)
+
+	previousVersion := strconv.Itoa(version - 1)
+	if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, previousVersion, nonce); err != nil {
+		t.Errorf("Failed to receive %s request: %v", xdsname, err)
+	}
+	t.Logf("Bad %s response nacked", xdsname)
+}
+
+// TestV2ClientAck verifies that valid responses are acked, and invalid ones
+// are nacked.
+//
+// This test also verifies the version for different types are independent.
+func TestV2ClientAck(t *testing.T) { + var ( + versionLDS = 1000 + versionRDS = 2000 + versionCDS = 3000 + versionEDS = 4000 + ) + + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + // Start the watch, send a good response, and check for ack. + cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ + cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest) + sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) + versionRDS++ + cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest) + sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) + versionCDS++ + cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest) + sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) + versionEDS++ + + // Send a bad response, and check for nack. + sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest) + versionLDS++ + sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest) + versionRDS++ + sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest) + versionCDS++ + sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest) + versionEDS++ + + // send another good response, and check for ack, with the new version. 
+ sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ + sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) + versionRDS++ + sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) + versionCDS++ + sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) + versionEDS++ +} + +// Test when the first response is invalid, and is nacked, the nack requests +// should have an empty version string. +func TestV2ClientAckFirstIsNack(t *testing.T) { + var versionLDS = 1000 + + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + // Start the watch, send a good response, and check for ack. + cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) + + nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{{}}, + TypeUrl: ldsURL, + }, versionLDS) + t.Logf("Bad response pushed to fakeServer...") + + // The expected version string is an empty string, because this is the first + // response, and it's nacked (so there's no previous ack version). + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil { + t.Errorf("Failed to receive request: %v", err) + } + t.Logf("Bad response nacked") + versionLDS++ + + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ +} + +// Test when a nack is sent after a new watch, we nack with the previous acked +// version (instead of resetting to empty string). 
+func TestV2ClientAckNackAfterNewWatch(t *testing.T) { + var versionLDS = 1000 + + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + // Start the watch, send a good response, and check for ack. + cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ + + // Start a new watch. + cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) + + // This is an invalid response after the new watch. + nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ + Resources: []*anypb.Any{{}}, + TypeUrl: ldsURL, + }, versionLDS) + t.Logf("Bad response pushed to fakeServer...") + + // The expected version string is the previous acked version. + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil { + t.Errorf("Failed to receive request: %v", err) + } + t.Logf("Bad response nacked") + versionLDS++ + + sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) + versionLDS++ +} diff --git a/plugin/traffic/xds/v2client_test.go b/plugin/traffic/xds/v2client_test.go new file mode 100644 index 000000000..ff2773dba --- /dev/null +++ b/plugin/traffic/xds/v2client_test.go @@ -0,0 +1,444 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package xds

import (
	"errors"
	"testing"
	"time"

	"github.com/golang/protobuf/proto"
	"google.golang.org/grpc/xds/internal/testutils"
	"google.golang.org/grpc/xds/internal/testutils/fakeserver"

	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
	basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
	routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
	httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
	listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v2"
	anypb "github.com/golang/protobuf/ptypes/any"
	structpb "github.com/golang/protobuf/ptypes/struct"
)

// Names, domains and timeouts shared by the xDS client test fixtures below.
const (
	// defaultTestTimeout bounds how long a test waits for an expected event.
	defaultTestTimeout = 1 * time.Second
	// Listener (LDS) resource names.
	goodLDSTarget1 = "lds.target.good:1111"
	goodLDSTarget2 = "lds.target.good:2222"
	// Route configuration (RDS) and cluster assignment (EDS) resource names.
	goodRouteName1         = "GoodRouteConfig1"
	goodRouteName2         = "GoodRouteConfig2"
	goodEDSName            = "GoodClusterAssignment1"
	uninterestingRouteName = "UninterestingRouteName"
	// Virtual host domains used in the route configuration fixtures.
	goodMatchingDomain  = "lds.target.good"
	uninterestingDomain = "uninteresting.domain"
	// Cluster names used as route targets and in the CDS request fixture.
	goodClusterName1         = "GoodClusterName1"
	goodClusterName2         = "GoodClusterName2"
	uninterestingClusterName = "UninterestingClusterName"
	// httpConnManagerURL is the type URL set on the Any that wraps a marshaled
	// HttpConnectionManager inside an ApiListener.
	httpConnManagerURL = "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager"
)

// Package-level fixtures. Errors returned by proto.Marshal are discarded here;
// a failed marshal would leave the corresponding byte slice nil.
var (
	// goodNodeProto is the node proto handed to newV2Client by the tests and
	// embedded in every request fixture below.
	goodNodeProto = &basepb.Node{
		Id: "ENVOY_NODE_ID",
		Metadata: &structpb.Struct{
			Fields: map[string]*structpb.Value{
				"TRAFFICDIRECTOR_GRPC_HOSTNAME": {
					Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"},
				},
			},
		},
	}
	// Request fixtures: one per resource type, each naming a single resource.
	goodLDSRequest = &xdspb.DiscoveryRequest{
		Node:          goodNodeProto,
		TypeUrl:       ldsURL,
		ResourceNames: []string{goodLDSTarget1},
	}
	goodCDSRequest = &xdspb.DiscoveryRequest{
		Node:          goodNodeProto,
		TypeUrl:       cdsURL,
		ResourceNames: []string{goodClusterName1},
	}
	goodEDSRequest = &xdspb.DiscoveryRequest{
		Node:          goodNodeProto,
		TypeUrl:       edsURL,
		ResourceNames: []string{goodEDSName},
	}
	// HttpConnectionManager fixtures and their marshaled forms.
	goodHTTPConnManager1 = &httppb.HttpConnectionManager{}
	marshaledConnMgr1, _ = proto.Marshal(goodHTTPConnManager1)
	// emptyHTTPConnManager selects the Rds route specifier but leaves the Rds
	// message itself empty.
	emptyHTTPConnManager = &httppb.HttpConnectionManager{
		RouteSpecifier: &httppb.HttpConnectionManager_Rds{
			Rds: &httppb.Rds{},
		},
	}
	emptyMarshaledConnMgr, _ = proto.Marshal(emptyHTTPConnManager)
	// connMgrWithScopedRoutes uses the ScopedRoutes route specifier.
	connMgrWithScopedRoutes = &httppb.HttpConnectionManager{
		RouteSpecifier: &httppb.HttpConnectionManager_ScopedRoutes{},
	}
	marshaledConnMgrWithScopedRoutes, _ = proto.Marshal(connMgrWithScopedRoutes)
	// Listener fixtures. goodListener1 and goodListener2 differ only in Name;
	// both wrap marshaledConnMgr1 in their ApiListener.
	goodListener1 = &xdspb.Listener{
		Name: goodLDSTarget1,
		ApiListener: &listenerpb.ApiListener{
			ApiListener: &anypb.Any{
				TypeUrl: httpConnManagerURL,
				Value:   marshaledConnMgr1,
			},
		},
	}
	marshaledListener1, _ = proto.Marshal(goodListener1)
	goodListener2         = &xdspb.Listener{
		Name: goodLDSTarget2,
		ApiListener: &listenerpb.ApiListener{
			ApiListener: &anypb.Any{
				TypeUrl: httpConnManagerURL,
				Value:   marshaledConnMgr1,
			},
		},
	}
	marshaledListener2, _ = proto.Marshal(goodListener2)
	// noAPIListener has a name but no ApiListener field.
	noAPIListener             = &xdspb.Listener{Name: goodLDSTarget1}
	marshaledNoAPIListener, _ = proto.Marshal(noAPIListener)
	// badAPIListener1 and badAPIListener2 carry four arbitrary bytes where the
	// good listeners carry a marshaled HttpConnectionManager.
	badAPIListener1 = &xdspb.Listener{
		Name: goodLDSTarget1,
		ApiListener: &listenerpb.ApiListener{
			ApiListener: &anypb.Any{
				TypeUrl: httpConnManagerURL,
				Value:   []byte{1, 2, 3, 4},
			},
		},
	}
	badAPIListener2 = &xdspb.Listener{
		Name: goodLDSTarget2,
		ApiListener: &listenerpb.ApiListener{
			ApiListener: &anypb.Any{
				TypeUrl: httpConnManagerURL,
				Value:   []byte{1, 2, 3, 4},
			},
		},
	}
	badlyMarshaledAPIListener2, _ = proto.Marshal(badAPIListener2)
	// badResourceListener wraps a marshaled Listener, tagged with ldsURL, in
	// its ApiListener instead of an HttpConnectionManager.
	badResourceListener = &xdspb.Listener{
		Name: goodLDSTarget1,
		ApiListener: &listenerpb.ApiListener{
			ApiListener: &anypb.Any{
				TypeUrl: ldsURL,
				Value:   marshaledListener1,
			},
		},
	}
	listenerWithEmptyHTTPConnMgr = &xdspb.Listener{
		Name: goodLDSTarget1,
		ApiListener: &listenerpb.ApiListener{
			ApiListener: &anypb.Any{
				TypeUrl: httpConnManagerURL,
				Value:   emptyMarshaledConnMgr,
			},
		},
	}
	listenerWithScopedRoutesRouteConfig = &xdspb.Listener{
		Name: goodLDSTarget1,
		ApiListener: &listenerpb.ApiListener{
			ApiListener: &anypb.Any{
				TypeUrl: httpConnManagerURL,
				Value:   marshaledConnMgrWithScopedRoutes,
			},
		},
	}
	// LDS response fixtures. All of them set the response TypeUrl to ldsURL.
	goodLDSResponse1 = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: ldsURL,
				Value:   marshaledListener1,
			},
		},
		TypeUrl: ldsURL,
	}
	goodLDSResponse2 = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: ldsURL,
				Value:   marshaledListener2,
			},
		},
		TypeUrl: ldsURL,
	}
	// emptyLDSResponse contains no resources at all.
	emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: ldsURL}
	// badlyMarshaledLDSResponse carries four arbitrary bytes as its only
	// resource value.
	badlyMarshaledLDSResponse = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: ldsURL,
				Value:   []byte{1, 2, 3, 4},
			},
		},
		TypeUrl: ldsURL,
	}
	// badResourceTypeInLDSResponse carries an HttpConnectionManager resource
	// (by type URL) inside a response whose TypeUrl is ldsURL.
	badResourceTypeInLDSResponse = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: httpConnManagerURL,
				Value:   marshaledConnMgr1,
			},
		},
		TypeUrl: ldsURL,
	}
	// ldsResponseWithMultipleResources lists listener 2 followed by listener 1.
	ldsResponseWithMultipleResources = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: ldsURL,
				Value:   marshaledListener2,
			},
			{
				TypeUrl: ldsURL,
				Value:   marshaledListener1,
			},
		},
		TypeUrl: ldsURL,
	}
	noAPIListenerLDSResponse = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: ldsURL,
				Value:   marshaledNoAPIListener,
			},
		},
		TypeUrl: ldsURL,
	}
	// goodBadUglyLDSResponse mixes the two good listeners with
	// badlyMarshaledAPIListener2.
	goodBadUglyLDSResponse = &xdspb.DiscoveryResponse{
		Resources: []*anypb.Any{
			{
				TypeUrl: ldsURL,
				Value:   marshaledListener2,
			},
			{
				TypeUrl: ldsURL,
				Value:   marshaledListener1,
			},
			{
				TypeUrl: ldsURL,
				Value:   badlyMarshaledAPIListener2,
			},
		},
		TypeUrl: ldsURL,
	}
	// Route configuration fixtures. goodRouteConfig1 and goodRouteConfig2 each
	// hold two virtual hosts: one for uninterestingDomain routed to
	// uninterestingClusterName, and one for goodMatchingDomain routed to
	// goodClusterName1 or goodClusterName2 respectively.
	goodRouteConfig1 = &xdspb.RouteConfiguration{
		Name: goodRouteName1,
		VirtualHosts: []*routepb.VirtualHost{
			{
				Domains: []string{uninterestingDomain},
				Routes: []*routepb.Route{
					{
						Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}},
						Action: &routepb.Route_Route{
							Route: &routepb.RouteAction{
								ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName},
							},
						},
					},
				},
			},
			{
				Domains: []string{goodMatchingDomain},
				Routes: []*routepb.Route{
					{
						Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}},
						Action: &routepb.Route_Route{
							Route: &routepb.RouteAction{
								ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName1},
							},
						},
					},
				},
			},
		},
	}
	marshaledGoodRouteConfig1, _ = proto.Marshal(goodRouteConfig1)
	goodRouteConfig2             = &xdspb.RouteConfiguration{
		Name: goodRouteName2,
		VirtualHosts: []*routepb.VirtualHost{
			{
				Domains: []string{uninterestingDomain},
				Routes: []*routepb.Route{
					{
						Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}},
						Action: &routepb.Route_Route{
							Route: &routepb.RouteAction{
								ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName},
							},
						},
					},
				},
			},
			{
				Domains: []string{goodMatchingDomain},
				Routes: []*routepb.Route{
					{
						Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}},
						Action: &routepb.Route_Route{
							Route: &routepb.RouteAction{
								ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName2},
							},
						},
					},
				},
			},
		},
	}
	marshaledGoodRouteConfig2, _ = proto.Marshal(goodRouteConfig2)
	// uninterestingRouteConfig has a single virtual host for
	// uninterestingDomain whose only route sets no Match.
	uninterestingRouteConfig = &xdspb.RouteConfiguration{
		Name: uninterestingRouteName,
		VirtualHosts: []*routepb.VirtualHost{
			{
				Domains: []string{uninterestingDomain},
				Routes: []*routepb.Route{
					{
						Action: &routepb.Route_Route{
							Route: &routepb.RouteAction{
								ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName},
							},
						},
					},
				},
			},
		},
	}
	marshaledUninterestingRouteConfig, _ = proto.Marshal(uninterestingRouteConfig)
)

// TestV2ClientBackoffAfterRecvError verifies that the v2Client backs off when
// it encounters a Recv error while receiving an LDS response.
func TestV2ClientBackoffAfterRecvError(t *testing.T) {
	fakeServer, cc, cleanup := startServerAndGetCC(t)
	defer cleanup()

	// Override the v2Client backoff function with this, so that we can verify
	// that a backoff actually was triggered.
	boCh := make(chan int, 1)
	clientBackoff := func(v int) time.Duration {
		// Signal the test that the backoff function was called, then return a
		// zero duration.
		boCh <- v
		return 0
	}

	v2c := newV2Client(cc, goodNodeProto, clientBackoff)
	defer v2c.close()
	t.Log("Started xds v2Client...")

	// callbackCh is closed if the LDS watch callback is ever invoked, which
	// this test treats as a failure.
	callbackCh := make(chan struct{})
	v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
		close(callbackCh)
	})
	if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
		t.Fatalf("Timeout expired when expecting an LDS request")
	}
	t.Log("FakeServer received request...")

	// Push an error instead of a response onto the fake server's response
	// channel.
	fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")}
	t.Log("Bad LDS response pushed to fakeServer...")

	// Exactly one of three things is expected next: the backoff function is
	// called (success), the watch callback fires (failure), or the timeout
	// elapses (failure).
	timer := time.NewTimer(defaultTestTimeout)
	select {
	case <-timer.C:
		t.Fatal("Timeout when expecting LDS update")
	case <-boCh:
		timer.Stop()
		t.Log("v2Client backed off before retrying...")
	case <-callbackCh:
		t.Fatal("Received unexpected LDS callback")
	}
}

// TestV2ClientRetriesAfterBrokenStream verifies the case where a stream
// encountered a Recv() error, and is expected to send out xDS requests for
// registered watchers once it comes back up again.
+func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + callbackCh := testutils.NewChannel() + v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { + t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err) + callbackCh.Send(struct{}{}) + }) + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } + t.Log("FakeServer received request...") + + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + t.Log("Good LDS response pushed to fakeServer...") + + if _, err := callbackCh.Receive(); err != nil { + t.Fatal("Timeout when expecting LDS update") + } + + // Read the ack, so the next request is sent after stream re-creation. + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS ACK") + } + + fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} + t.Log("Bad LDS response pushed to fakeServer...") + + val, err := fakeServer.XDSRequestChan.Receive() + if err == testutils.ErrRecvTimeout { + t.Fatalf("Timeout expired when expecting LDS update") + } + gotRequest := val.(*fakeserver.Request) + if !proto.Equal(gotRequest.Req, goodLDSRequest) { + t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest) + } +} + +// TestV2ClientCancelWatch verifies that the registered watch callback is not +// invoked if a response is received after the watcher is cancelled. 
+func TestV2ClientCancelWatch(t *testing.T) { + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) + defer v2c.close() + t.Log("Started xds v2Client...") + + callbackCh := testutils.NewChannel() + cancelFunc := v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { + t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err) + callbackCh.Send(struct{}{}) + }) + if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { + t.Fatalf("Timeout expired when expecting an LDS request") + } + t.Log("FakeServer received request...") + + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + t.Log("Good LDS response pushed to fakeServer...") + + if _, err := callbackCh.Receive(); err != nil { + t.Fatal("Timeout when expecting LDS update") + } + + cancelFunc() + + fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} + t.Log("Another good LDS response pushed to fakeServer...") + + if _, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout { + t.Fatalf("Watch callback invoked after the watcher was cancelled") + } +}