diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index 6c07418aa..ec053d5e9 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -114,3 +114,13 @@ will be stripped. You can optionally sign responses on the fly by using the *dns * wording: cluster, endpoints, assignments, service_name are all used and roughly mean the same thing; unify this. +const ( + HealthStatus_UNKNOWN HealthStatus = 0 + HealthStatus_HEALTHY HealthStatus = 1 + HealthStatus_UNHEALTHY HealthStatus = 2 + HealthStatus_DRAINING HealthStatus = 3 + HealthStatus_TIMEOUT HealthStatus = 4 + HealthStatus_DEGRADED HealthStatus = 5 +) + +https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index d4f8509de..d091a9882 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -1,14 +1,12 @@ package traffic import ( - "fmt" "math/rand" "time" "github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/plugin" clog "github.com/coredns/coredns/plugin/pkg/log" - "github.com/coredns/coredns/plugin/traffic/xds" "github.com/caddyserver/caddy" ) @@ -33,8 +31,19 @@ func setup(c *caddy.Controller) error { return t }) - t.c.WatchCluster("", func(x xds.CDSUpdate, _ error) { fmt.Printf("CDSUpdate: %+v\n", x) }) - t.c.WatchEndpoints("", func(x *xds.EDSUpdate, _ error) { fmt.Printf("EDSUpdate: %+v\n", x) }) + stream, err := t.c.Run() + if err != nil { + return plugin.Error("traffic", err) + } + + if err := t.c.ClusterDiscovery(stream, "", "", []string{}); err != nil { + log.Error(err) + } + + err = t.c.Receive(stream) + if err != nil { + return plugin.Error("traffic", err) + } return nil } diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index b320df070..b8a41dfc8 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -8,7 +8,6 @@ import ( "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/pkg/response" "github.com/coredns/coredns/plugin/traffic/xds" - "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" "github.com/miekg/dns" ) @@ -21,11 +20,7 @@ type Traffic struct { // New returns a pointer to a new and initialized Traffic. func New() (*Traffic, error) { - config, err := bootstrap.NewConfig() - if err != nil { - return nil, err - } - c, err := xds.New(xds.Options{Config: *config}) + c, err := xds.New(":18000", "mycoredns") if err != nil { return nil, err } diff --git a/plugin/traffic/xds/README_coredns.md b/plugin/traffic/xds/README_coredns.md deleted file mode 100644 index 6c5ece8f8..000000000 --- a/plugin/traffic/xds/README_coredns.md +++ /dev/null @@ -1,10 +0,0 @@ -This code is copied from -[https://github.com/grpc/grpc-go/tree/master/xds](https://github.com/grpc/grpc-go/tree/master/xds). -Grpc-go is also a consumer of the Envoy xDS data and acts upon it. - -The *traffic* plugin only cares about clusters and endpoints, the following bits are deleted: - -* lDS; listener discovery is not used here. -* rDS: routes have no use for DNS responses. - -Load reporting is also not implemented, although this can be done on the DNS level. diff --git a/plugin/traffic/xds/bootstrap/bootstrap.go b/plugin/traffic/xds/bootstrap/bootstrap.go deleted file mode 100644 index 910120c0c..000000000 --- a/plugin/traffic/xds/bootstrap/bootstrap.go +++ /dev/null @@ -1,165 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package bootstrap provides the functionality to initialize certain aspects -// of an xDS client by reading a bootstrap file. -package bootstrap - -import ( - "bytes" - "encoding/json" - "fmt" - "io/ioutil" - "os" - - "github.com/coredns/coredns/plugin/pkg/log" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "github.com/golang/protobuf/jsonpb" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/google" - "google.golang.org/grpc/grpclog" -) - -const ( - // Environment variable which holds the name of the xDS bootstrap file. - fileEnv = "GRPC_XDS_BOOTSTRAP" - // Type name for Google default credentials. - googleDefaultCreds = "google_default" -) - -var gRPCVersion = fmt.Sprintf("gRPC-Go %s", grpc.Version) - -// For overriding in unit tests. -var fileReadFunc = ioutil.ReadFile - -// Config provides the xDS client with several key bits of information that it -// requires in its interaction with an xDS server. The Config is initialized -// from the bootstrap file. -type Config struct { - // BalancerName is the name of the xDS server to connect to. - // - // The bootstrap file contains a list of servers (with name+creds), but we - // pick the first one. - BalancerName string - // Creds contains the credentials to be used while talking to the xDS - // server, as a grpc.DialOption. - Creds grpc.DialOption - // NodeProto contains the node proto to be used in xDS requests. - NodeProto *corepb.Node -} - -type channelCreds struct { - Type string `json:"type"` - Config json.RawMessage `json:"config"` -} - -type xdsServer struct { - ServerURI string `json:"server_uri"` - ChannelCreds []channelCreds `json:"channel_creds"` -} - -// NewConfig returns a new instance of Config initialized by reading the -// bootstrap file found at ${GRPC_XDS_BOOTSTRAP}. -// -// The format of the bootstrap file will be as follows: -// { -// "xds_server": { -// "server_uri": , -// "channel_creds": [ -// { -// "type": , -// "config": -// } -// ] -// }, -// "node": -// } -// -// Currently, we support exactly one type of credential, which is -// "google_default", where we use the host's default certs for transport -// credentials and a Google oauth token for call credentials. -// -// This function tries to process as much of the bootstrap file as possible (in -// the presence of the errors) and may return a Config object with certain -// fields left unspecified, in which case the caller should use some sane -// defaults. -func NewConfig() (*Config, error) { - config := &Config{} - - fName, ok := os.LookupEnv(fileEnv) - if !ok { - return config, fmt.Errorf("xds: %s environment variable not set", fileEnv) - } - - grpclog.Infof("xds: Reading bootstrap file from %s", fName) - data, err := fileReadFunc(fName) - if err != nil { - return config, fmt.Errorf("xds: bootstrap file {%v} read failed: %v", fName, err) - } - - var jsonData map[string]json.RawMessage - if err := json.Unmarshal(data, &jsonData); err != nil { - return config, fmt.Errorf("xds: json.Unmarshal(%v) failed during bootstrap: %v", string(data), err) - } - - m := jsonpb.Unmarshaler{AllowUnknownFields: true} - for k, v := range jsonData { - switch k { - case "node": - n := &corepb.Node{} - if err := m.Unmarshal(bytes.NewReader(v), n); err != nil { - log.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) - break - } - config.NodeProto = n - case "xds_servers": - var servers []*xdsServer - if err := json.Unmarshal(v, &servers); err != nil { - log.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) - break - } - if len(servers) < 1 { - log.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to") - break - } - xs := servers[0] - config.BalancerName = xs.ServerURI - for _, cc := range xs.ChannelCreds { - if cc.Type == googleDefaultCreds { - config.Creds = grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()) - // We stop at the first credential type that we support. - break - } - } - default: - // Do not fail the xDS bootstrap when an unknown field is seen. - log.Warningf("xds: unexpected data in bootstrap file: {%v, %v}", k, string(v)) - } - } - - // If we don't find a nodeProto in the bootstrap file, we just create an - // empty one here. That way, callers of this function can always expect - // that the NodeProto field is non-nil. - if config.NodeProto == nil { - config.NodeProto = &corepb.Node{} - } - config.NodeProto.BuildVersion = gRPCVersion - - return config, nil -} diff --git a/plugin/traffic/xds/bootstrap/bootstrap_test.go b/plugin/traffic/xds/bootstrap/bootstrap_test.go deleted file mode 100644 index a55e1d0f7..000000000 --- a/plugin/traffic/xds/bootstrap/bootstrap_test.go +++ /dev/null @@ -1,260 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package bootstrap - -import ( - "os" - "testing" - - "github.com/golang/protobuf/proto" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/google" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - structpb "github.com/golang/protobuf/ptypes/struct" -) - -var ( - nodeProto = &corepb.Node{ - Id: "ENVOY_NODE_ID", - Metadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "TRAFFICDIRECTOR_GRPC_HOSTNAME": { - Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"}, - }, - }, - }, - BuildVersion: gRPCVersion, - } - nilCredsConfig = &Config{ - BalancerName: "trafficdirector.googleapis.com:443", - Creds: nil, - NodeProto: nodeProto, - } - nonNilCredsConfig = &Config{ - BalancerName: "trafficdirector.googleapis.com:443", - Creds: grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()), - NodeProto: nodeProto, - } -) - -// TestNewConfig exercises the functionality in NewConfig with different -// bootstrap file contents. It overrides the fileReadFunc by returning -// bootstrap file contents defined in this test, instead of reading from a -// file. -func TestNewConfig(t *testing.T) { - bootstrapFileMap := map[string]string{ - "empty": "", - "badJSON": `["test": 123]`, - "emptyNodeProto": ` - { - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443" - }] - }`, - "emptyXdsServer": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - } - }`, - "unknownTopLevelFieldInFile": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "not-google-default" } - ] - }], - "unknownField": "foobar" - }`, - "unknownFieldInNodeProto": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "unknownField": "foobar", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - } - }`, - "unknownFieldInXdsServer": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "not-google-default" } - ], - "unknownField": "foobar" - }] - }`, - "emptyChannelCreds": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443" - }] - }`, - "nonGoogleDefaultCreds": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "not-google-default" } - ] - }] - }`, - "multipleChannelCreds": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "not-google-default" }, - { "type": "google_default" } - ] - }] - }`, - "goodBootstrap": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [ - { "type": "google_default" } - ] - }] - }`, - "multipleXDSServers": ` - { - "node": { - "id": "ENVOY_NODE_ID", - "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" - } - }, - "xds_servers" : [ - { - "server_uri": "trafficdirector.googleapis.com:443", - "channel_creds": [{ "type": "google_default" }] - }, - { - "server_uri": "backup.never.use.com:1234", - "channel_creds": [{ "type": "not-google-default" }] - } - ] - }`, - } - - oldFileReadFunc := fileReadFunc - fileReadFunc = func(name string) ([]byte, error) { - if b, ok := bootstrapFileMap[name]; ok { - return []byte(b), nil - } - return nil, os.ErrNotExist - } - defer func() { - fileReadFunc = oldFileReadFunc - os.Unsetenv(fileEnv) - }() - - tests := []struct { - name string - wantConfig *Config - }{ - {"nonExistentBootstrapFile", &Config{}}, - {"empty", &Config{}}, - {"badJSON", &Config{}}, - {"emptyNodeProto", &Config{ - BalancerName: "trafficdirector.googleapis.com:443", - NodeProto: &corepb.Node{BuildVersion: gRPCVersion}, - }}, - {"emptyXdsServer", &Config{NodeProto: nodeProto}}, - {"unknownTopLevelFieldInFile", nilCredsConfig}, - {"unknownFieldInNodeProto", &Config{NodeProto: nodeProto}}, - {"unknownFieldInXdsServer", nilCredsConfig}, - {"emptyChannelCreds", nilCredsConfig}, - {"nonGoogleDefaultCreds", nilCredsConfig}, - {"multipleChannelCreds", nonNilCredsConfig}, - {"goodBootstrap", nonNilCredsConfig}, - {"multipleXDSServers", nonNilCredsConfig}, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if err := os.Setenv(fileEnv, test.name); err != nil { - t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err) - } - config := NewConfig() - if config.BalancerName != test.wantConfig.BalancerName { - t.Errorf("config.BalancerName is %s, want %s", config.BalancerName, test.wantConfig.BalancerName) - } - if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) { - t.Errorf("config.NodeProto is %#v, want %#v", config.NodeProto, test.wantConfig.NodeProto) - } - if (config.Creds != nil) != (test.wantConfig.Creds != nil) { - t.Errorf("config.Creds is %#v, want %#v", config.Creds, test.wantConfig.Creds) - } - }) - } -} - -func TestNewConfigEnvNotSet(t *testing.T) { - os.Unsetenv(fileEnv) - wantConfig := Config{} - if config := NewConfig(); *config != wantConfig { - t.Errorf("NewConfig() returned : %#v, wanted an empty Config object", config) - } -} diff --git a/plugin/traffic/xds/buffer/unbounded.go b/plugin/traffic/xds/buffer/unbounded.go deleted file mode 100644 index 9f6a0c120..000000000 --- a/plugin/traffic/xds/buffer/unbounded.go +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -// Package buffer provides an implementation of an unbounded buffer. -package buffer - -import "sync" - -// Unbounded is an implementation of an unbounded buffer which does not use -// extra goroutines. This is typically used for passing updates from one entity -// to another within gRPC. -// -// All methods on this type are thread-safe and don't block on anything except -// the underlying mutex used for synchronization. -// -// Unbounded supports values of any type to be stored in it by using a channel -// of `interface{}`. This means that a call to Put() incurs an extra memory -// allocation, and also that users need a type assertion while reading. For -// performance critical code paths, using Unbounded is strongly discouraged and -// defining a new type specific implementation of this buffer is preferred. See -// internal/transport/transport.go for an example of this. -type Unbounded struct { - c chan interface{} - mu sync.Mutex - backlog []interface{} -} - -// NewUnbounded returns a new instance of Unbounded. -func NewUnbounded() *Unbounded { - return &Unbounded{c: make(chan interface{}, 1)} -} - -// Put adds t to the unbounded buffer. -func (b *Unbounded) Put(t interface{}) { - b.mu.Lock() - if len(b.backlog) == 0 { - select { - case b.c <- t: - b.mu.Unlock() - return - default: - } - } - b.backlog = append(b.backlog, t) - b.mu.Unlock() -} - -// Load sends the earliest buffered data, if any, onto the read channel -// returned by Get(). Users are expected to call this every time they read a -// value from the read channel. -func (b *Unbounded) Load() { - b.mu.Lock() - if len(b.backlog) > 0 { - select { - case b.c <- b.backlog[0]: - b.backlog[0] = nil - b.backlog = b.backlog[1:] - default: - } - } - b.mu.Unlock() -} - -// Get returns a read channel on which values added to the buffer, via Put(), -// are sent on. -// -// Upon reading a value from this channel, users are expected to call Load() to -// send the next buffered value onto the channel if there is any. -func (b *Unbounded) Get() <-chan interface{} { - return b.c -} diff --git a/plugin/traffic/xds/buffer/unbounded_test.go b/plugin/traffic/xds/buffer/unbounded_test.go deleted file mode 100644 index c8067019b..000000000 --- a/plugin/traffic/xds/buffer/unbounded_test.go +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package buffer - -import ( - "reflect" - "sort" - "sync" - "testing" -) - -const ( - numWriters = 10 - numWrites = 10 -) - -// wantReads contains the set of values expected to be read by the reader -// goroutine in the tests. -var wantReads []int - -func init() { - for i := 0; i < numWriters; i++ { - for j := 0; j < numWrites; j++ { - wantReads = append(wantReads, i) - } - } -} - -// TestSingleWriter starts one reader and one writer goroutine and makes sure -// that the reader gets all the value added to the buffer by the writer. -func TestSingleWriter(t *testing.T) { - ub := NewUnbounded() - reads := []int{} - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - ch := ub.Get() - for i := 0; i < numWriters*numWrites; i++ { - r := <-ch - reads = append(reads, r.(int)) - ub.Load() - } - }() - - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < numWriters; i++ { - for j := 0; j < numWrites; j++ { - ub.Put(i) - } - } - }() - - wg.Wait() - if !reflect.DeepEqual(reads, wantReads) { - t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads) - } -} - -// TestMultipleWriters starts multiple writers and one reader goroutine and -// makes sure that the reader gets all the data written by all writers. -func TestMultipleWriters(t *testing.T) { - ub := NewUnbounded() - reads := []int{} - - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - ch := ub.Get() - for i := 0; i < numWriters*numWrites; i++ { - r := <-ch - reads = append(reads, r.(int)) - ub.Load() - } - }() - - wg.Add(numWriters) - for i := 0; i < numWriters; i++ { - go func(index int) { - defer wg.Done() - for j := 0; j < numWrites; j++ { - ub.Put(index) - } - }(i) - } - - wg.Wait() - sort.Ints(reads) - if !reflect.DeepEqual(reads, wantReads) { - t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads) - } -} diff --git a/plugin/traffic/xds/cds.go b/plugin/traffic/xds/cds.go deleted file mode 100644 index c51d0a46a..000000000 --- a/plugin/traffic/xds/cds.go +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "fmt" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/golang/protobuf/ptypes" -) - -// handleCDSResponse processes an CDS response received from the xDS server. On -// receipt of a good response, it also invokes the registered watcher callback. -func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { - println("handlCDSResponse") - v2c.mu.Lock() - defer v2c.mu.Unlock() - - wi := v2c.watchMap[cdsURL] - if wi == nil { - return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp) - } - - var returnUpdate CDSUpdate - localCache := make(map[string]CDSUpdate) - for _, r := range resp.GetResources() { - var resource ptypes.DynamicAny - if err := ptypes.UnmarshalAny(r, &resource); err != nil { - return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err) - } - cluster, ok := resource.Message.(*xdspb.Cluster) - if !ok { - return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message) - } - fmt.Printf("CLUSTER %+v\n", cluster) - update, err := validateCluster(cluster) - if err != nil { - return err - } - - // If the Cluster message in the CDS response did not contain a - // serviceName, we will just use the clusterName for EDS. - if update.ServiceName == "" { - update.ServiceName = cluster.GetName() - } - localCache[cluster.GetName()] = update - if cluster.GetName() == wi.target[0] { - returnUpdate = update - } - } - v2c.cdsCache = localCache - - var err error - if returnUpdate.ServiceName == "" { - err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp) - } - wi.stopTimer() - wi.callback.(cdsCallback)(returnUpdate, err) - return nil -} - -func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) { - emptyUpdate := CDSUpdate{ServiceName: ""} - switch { - case cluster.GetType() != xdspb.Cluster_EDS: - return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster) - case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil: - return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster) - case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN: - return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) - } - - return CDSUpdate{ServiceName: cluster.GetEdsClusterConfig().GetServiceName()}, nil -} diff --git a/plugin/traffic/xds/cds_test.go b/plugin/traffic/xds/cds_test.go deleted file mode 100644 index c8412f52a..000000000 --- a/plugin/traffic/xds/cds_test.go +++ /dev/null @@ -1,487 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "errors" - "fmt" - "reflect" - "testing" - "time" - - discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "github.com/golang/protobuf/proto" - anypb "github.com/golang/protobuf/ptypes/any" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" -) - -const ( - clusterName1 = "foo-cluster" - clusterName2 = "bar-cluster" - serviceName1 = "foo-service" - serviceName2 = "bar-service" -) - -func (v2c *v2Client) cloneCDSCacheForTesting() map[string]CDSUpdate { - v2c.mu.Lock() - defer v2c.mu.Unlock() - - cloneCache := make(map[string]CDSUpdate) - for k, v := range v2c.cdsCache { - cloneCache[k] = v - } - return cloneCache -} - -func TestValidateCluster(t *testing.T) { - emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false} - tests := []struct { - name string - cluster *xdspb.Cluster - wantUpdate CDSUpdate - wantErr bool - }{ - { - name: "non-eds-cluster-type", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_STATIC}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - }, - LbPolicy: xdspb.Cluster_LEAST_REQUEST, - }, - wantUpdate: emptyUpdate, - wantErr: true, - }, - { - name: "no-eds-config", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - }, - wantUpdate: emptyUpdate, - wantErr: true, - }, - { - name: "no-ads-config-source", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{}, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - }, - wantUpdate: emptyUpdate, - wantErr: true, - }, - { - name: "non-round-robin-lb-policy", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - }, - LbPolicy: xdspb.Cluster_LEAST_REQUEST, - }, - wantUpdate: emptyUpdate, - wantErr: true, - }, - { - name: "happy-case-no-service-name-no-lrs", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - }, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - }, - wantUpdate: emptyUpdate, - }, - { - name: "happy-case-no-lrs", - cluster: &xdspb.Cluster{ - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - ServiceName: serviceName1, - }, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - }, - wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: false}, - }, - { - name: "happiest-case", - cluster: goodCluster1, - wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: true}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - gotUpdate, gotErr := validateCluster(test.cluster) - if (gotErr != nil) != test.wantErr { - t.Errorf("validateCluster(%+v) returned error: %v, wantErr: %v", test.cluster, gotErr, test.wantErr) - } - if !reflect.DeepEqual(gotUpdate, test.wantUpdate) { - t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, gotUpdate, test.wantUpdate) - } - }) - } -} - -// TestCDSHandleResponse starts a fake xDS server, makes a ClientConn to it, -// and creates a v2Client using it. Then, it registers a CDS watcher and tests -// different CDS responses. -func TestCDSHandleResponse(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - - tests := []struct { - name string - cdsResponse *xdspb.DiscoveryResponse - wantErr bool - wantUpdate *CDSUpdate - wantUpdateErr bool - }{ - // Badly marshaled CDS response. - { - name: "badly-marshaled-response", - cdsResponse: badlyMarshaledCDSResponse, - wantErr: true, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response does not contain Cluster proto. - { - name: "no-cluster-proto-in-response", - cdsResponse: badResourceTypeInLDSResponse, - wantErr: true, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response contains no clusters. - { - name: "no-cluster", - cdsResponse: &xdspb.DiscoveryResponse{}, - wantErr: false, - wantUpdate: &CDSUpdate{}, - wantUpdateErr: true, - }, - // Response contains one good cluster we are not interested in. - { - name: "one-uninteresting-cluster", - cdsResponse: goodCDSResponse2, - wantErr: false, - wantUpdate: &CDSUpdate{}, - wantUpdateErr: true, - }, - // Response contains one cluster and it is good. - { - name: "one-good-cluster", - cdsResponse: goodCDSResponse1, - wantErr: false, - wantUpdate: &CDSUpdate{ServiceName: serviceName1, EnableLRS: true}, - wantUpdateErr: false, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testWatchHandle(t, &watchHandleTestcase{ - responseToHandle: test.cdsResponse, - wantHandleErr: test.wantErr, - wantUpdate: test.wantUpdate, - wantUpdateErr: test.wantUpdateErr, - - cdsWatch: v2c.watchCDS, - watchReqChan: fakeServer.XDSRequestChan, - handleXDSResp: v2c.handleCDSResponse, - }) - }) - } -} - -// TestCDSHandleResponseWithoutWatch tests the case where the v2Client receives -// a CDS response without a registered watcher. -func TestCDSHandleResponseWithoutWatch(t *testing.T) { - _, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - - if v2c.handleCDSResponse(goodCDSResponse1) == nil { - t.Fatal("v2c.handleCDSResponse() succeeded, should have failed") - } -} - -// cdsTestOp contains all data related to one particular test operation. Not -// all fields make sense for all tests. -type cdsTestOp struct { - // target is the resource name to watch for. - target string - // responseToSend is the xDS response sent to the client - responseToSend *fakeserver.Response - // wantOpErr specfies whether the main operation should return an error. - wantOpErr bool - // wantCDSCache is the expected rdsCache at the end of an operation. - wantCDSCache map[string]CDSUpdate - // wantWatchCallback specifies if the watch callback should be invoked. - wantWatchCallback bool -} - -// testCDSCaching is a helper function which starts a fake xDS server, makes a -// ClientConn to it, creates a v2Client using it. It then reads a bunch of -// test operations to be performed from cdsTestOps and returns error, if any, -// on the provided error channel. This is executed in a separate goroutine. -func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh *testutils.Channel) { - t.Helper() - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := make(chan struct{}, 1) - for _, cdsTestOp := range cdsTestOps { - // Register a watcher if required, and use a channel to signal the - // successful invocation of the callback. - if cdsTestOp.target != "" { - v2c.watchCDS(cdsTestOp.target, func(u CDSUpdate, err error) { - t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) - callbackCh <- struct{}{} - }) - t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target) - - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - errCh.Send(fmt.Errorf("Timeout waiting for CDS request: %v", err)) - return - } - t.Log("FakeServer received request...") - } - - // Directly push the response through a call to handleCDSResponse, - // thereby bypassing the fakeServer. - if cdsTestOp.responseToSend != nil { - resp := cdsTestOp.responseToSend.Resp.(*discoverypb.DiscoveryResponse) - if err := v2c.handleCDSResponse(resp); (err != nil) != cdsTestOp.wantOpErr { - errCh.Send(fmt.Errorf("v2c.handleRDSResponse(%+v) returned err: %v", resp, err)) - return - } - } - - // If the test needs the callback to be invoked, just verify that - // it was invoked. Since we verify the contents of the cache, it's - // ok not to verify the contents of the callback. - if cdsTestOp.wantWatchCallback { - <-callbackCh - } - - if !reflect.DeepEqual(v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache) { - errCh.Send(fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.rdsCache, cdsTestOp.wantCDSCache)) - return - } - } - t.Log("Completed all test ops successfully...") - errCh.Send(nil) -} - -// TestCDSCaching tests some end-to-end CDS flows using a fake xDS server, and -// verifies the CDS data cached at the v2Client. -func TestCDSCaching(t *testing.T) { - ops := []cdsTestOp{ - // Add an CDS watch for a cluster name (clusterName1), which returns one - // matching resource in the response. - { - target: clusterName1, - responseToSend: &fakeserver.Response{Resp: goodCDSResponse1}, - wantCDSCache: map[string]CDSUpdate{ - clusterName1: {serviceName1, true}, - }, - wantWatchCallback: true, - }, - // Push an CDS response which contains a new resource (apart from the - // one received in the previous response). This should be cached. - { - responseToSend: &fakeserver.Response{Resp: cdsResponseWithMultipleResources}, - wantCDSCache: map[string]CDSUpdate{ - clusterName1: {serviceName1, true}, - clusterName2: {serviceName2, false}, - }, - wantWatchCallback: true, - }, - // Switch the watch target to clusterName2, which was already cached. No - // response is received from the server (as expected), but we want the - // callback to be invoked with the new serviceName. - { - target: clusterName2, - wantCDSCache: map[string]CDSUpdate{ - clusterName1: {serviceName1, true}, - clusterName2: {serviceName2, false}, - }, - wantWatchCallback: true, - }, - // Push an empty CDS response. This should clear the cache. - { - responseToSend: &fakeserver.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}}, - wantOpErr: false, - wantCDSCache: map[string]CDSUpdate{}, - wantWatchCallback: true, - }, - } - errCh := testutils.NewChannel() - go testCDSCaching(t, ops, errCh) - waitForNilErr(t, errCh) -} - -// TestCDSWatchExpiryTimer tests the case where the client does not receive an -// CDS response for the request that it sends out. We want the watch callback -// to be invoked with an error once the watchExpiryTimer fires. -func TestCDSWatchExpiryTimer(t *testing.T) { - oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 500 * time.Millisecond - defer func() { - defaultWatchExpiryTimeout = oldWatchExpiryTimeout - }() - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := testutils.NewChannel() - v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) { - t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) - if u.ServiceName != "" { - callbackCh.Send(fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName)) - } - if err == nil { - callbackCh.Send(errors.New("received nil error in cdsCallback")) - } - callbackCh.Send(nil) - }) - - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an CDS request") - } - waitForNilErr(t, callbackCh) -} - -var ( - badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: cdsURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - TypeUrl: cdsURL, - } - goodCluster1 = &xdspb.Cluster{ - Name: clusterName1, - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - ServiceName: serviceName1, - }, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - LrsServer: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Self{ - Self: &corepb.SelfConfigSource{}, - }, - }, - } - marshaledCluster1, _ = proto.Marshal(goodCluster1) - goodCluster2 = &xdspb.Cluster{ - Name: clusterName2, - ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS}, - EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{ - EdsConfig: &corepb.ConfigSource{ - ConfigSourceSpecifier: &corepb.ConfigSource_Ads{ - Ads: &corepb.AggregatedConfigSource{}, - }, - }, - ServiceName: serviceName2, - }, - LbPolicy: xdspb.Cluster_ROUND_ROBIN, - } - marshaledCluster2, _ = proto.Marshal(goodCluster2) - goodCDSResponse1 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: cdsURL, - Value: marshaledCluster1, - }, - }, - TypeUrl: cdsURL, - } - goodCDSResponse2 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: cdsURL, - Value: marshaledCluster2, - }, - }, - TypeUrl: cdsURL, - } - cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: cdsURL, - Value: marshaledCluster1, - }, - { - TypeUrl: cdsURL, - Value: marshaledCluster2, - }, - }, - TypeUrl: cdsURL, - } -) diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 152b2ac58..27c7c77c6 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -1,113 +1,181 @@ /* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ +This package contains code copied from github.com/grpc/grpc-co. The license for that code is: -// Package client implementation a full fledged gRPC client for the xDS API -// used by the xds resolver and balancer implementations. +Copyright 2019 gRPC authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package xds implements a bidirectional stream to an envoy ADS management endpoint. It will stream +// updates (CDS and EDS) from there to help load balance responses to DNS clients. package xds import ( - "errors" - "fmt" - "time" + "context" + "sync" - "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" + clog "github.com/coredns/coredns/plugin/pkg/log" + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + "github.com/golang/protobuf/ptypes" "google.golang.org/grpc" ) -// Options provides all parameters required for the creation of an xDS client. -type Options struct { - // Config contains a fully populated bootstrap config. It is the - // responsibility of the caller to use some sane defaults here if the - // bootstrap process returned with certain fields left unspecified. - Config bootstrap.Config - // DialOpts contains dial options to be used when dialing the xDS server. - DialOpts []grpc.DialOption -} +var log = clog.NewWithPlugin("traffic") + +const ( + cdsURL = "type.googleapis.com/envoy.api.v2.Cluster" + edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" +) + +type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient -// Client is a full fledged gRPC client which queries a set of discovery APIs -// (collectively termed as xDS) on a remote management server, to discover -// various dynamic resources. A single client object will be shared by the xds -// resolver and balancer implementations. type Client struct { - opts Options - cc *grpc.ClientConn // Connection to the xDS server - v2c *v2Client // Actual xDS client implementation using the v2 API - - serviceCallback func(ServiceUpdate, error) + cc *grpc.ClientConn + ctx context.Context + assignments assignment + node *corepb.Node + cancel context.CancelFunc } -// New returns a new xdsClient configured with opts. -func New(opts Options) (*Client, error) { - switch { - case opts.Config.BalancerName == "": - return nil, errors.New("xds: no xds_server name provided in options") - case opts.Config.Creds == nil: - fmt.Printf("%s\n", errors.New("xds: no credentials provided in options")) - case opts.Config.NodeProto == nil: - return nil, errors.New("xds: no node_proto provided in options") - } +type assignment struct { + mu sync.RWMutex + cla map[string]*xdspb.ClusterLoadAssignment + version int // not sure what do with and if we should discard all clusters. +} - var dopts []grpc.DialOption - if opts.Config.Creds == nil { - dopts = append([]grpc.DialOption{grpc.WithInsecure()}, opts.DialOpts...) - } else { - dopts = append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...) +func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) { + // if cla is nil we just found a cluster, check if we already know about it, or if we need to make + // a new entry + a.mu.Lock() + defer a.mu.Unlock() + _, ok := a.cla[cluster] + if !ok { + a.cla[cluster] = cla + return } - cc, err := grpc.Dial(opts.Config.BalancerName, dopts...) + if cla == nil { + return + } + a.cla[cluster] = cla + +} + +func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment { + return nil +} + +func (a assignment) Clusters() []string { + a.mu.RLock() + defer a.mu.RUnlock() + clusters := make([]string, len(a.cla)) + i := 0 + for k := range a.cla { + clusters[i] = k + i++ + } + return clusters +} + +// New returns a new client that's dialed to addr using node as the local identifier. +func New(addr, node string) (*Client, error) { + // todo credentials + opts := []grpc.DialOption{grpc.WithInsecure()} + cc, err := grpc.Dial(addr, opts...) if err != nil { - // An error from a non-blocking dial indicates something serious. - return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err) + return nil, err } + c := &Client{cc: cc, node: &corepb.Node{Id: "test-id"}} // do more with this node data? Hostname port?? + c.assignments = assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)} + c.ctx, c.cancel = context.WithCancel(context.Background()) - println("dialed balancer at", opts.Config.BalancerName) - - c := &Client{ - opts: opts, - cc: cc, - v2c: newV2Client(cc, opts.Config.NodeProto, func(int) time.Duration { return 0 }), - } return c, nil } -// Close closes the gRPC connection to the xDS server. -func (c *Client) Close() { - // TODO: Should we invoke the registered callbacks here with an error that - // the client is closed? - c.v2c.close() - c.cc.Close() +func (c *Client) Close() { c.cancel(); c.cc.Close() } + +func (c *Client) Run() (adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) { + cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc) + stream, err := cli.StreamAggregatedResources(c.ctx) + if err != nil { + return nil, err + } + return stream, nil } -func (c *Client) Run() { - c.v2c.run() +func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clusters []string) error { + req := &xdspb.DiscoveryRequest{ + Node: c.node, + TypeUrl: cdsURL, + ResourceNames: clusters, // empty for all + VersionInfo: version, + ResponseNonce: nonce, + } + return stream.Send(req) } -// ServiceUpdate contains update about the service. -type ServiceUpdate struct { - Cluster string +func (c *Client) EndpointDiscovery(stream adsStream, version, nonce string, clusters []string) error { + req := &xdspb.DiscoveryRequest{ + Node: c.node, + TypeUrl: edsURL, + ResourceNames: clusters, + VersionInfo: version, + ResponseNonce: nonce, + } + return stream.Send(req) } -// WatchCluster uses CDS to discover information about the provided clusterName. -func (c *Client) WatchCluster(clusterName string, cdsCb func(CDSUpdate, error)) (cancel func()) { - return c.v2c.watchCDS(clusterName, cdsCb) -} +func (c *Client) Receive(stream adsStream) error { + for { + resp, err := stream.Recv() + if err != nil { + return err + } -// WatchEndpoints uses EDS to discover information about the endpoints in a cluster. -func (c *Client) WatchEndpoints(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) { - return c.v2c.watchEDS(clusterName, edsCb) + switch resp.GetTypeUrl() { + case cdsURL: + for _, r := range resp.GetResources() { + var any ptypes.DynamicAny + if err := ptypes.UnmarshalAny(r, &any); err != nil { + continue + } + cluster, ok := any.Message.(*xdspb.Cluster) + if !ok { + continue + } + c.assignments.SetClusterLoadAssignment(cluster.GetName(), nil) + } + println("HERER", len(resp.GetResources())) + log.Debug("Cluster discovery processed with %d resources", len(resp.GetResources())) + // ack the CDS proto, with we we've got. (empty version would be NACK) + if err := c.ClusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.Clusters()); err != nil { + log.Warningf("Failed to acknowledge cluster discovery: %s", err) + } + // need to figure out how to handle the version exactly. + + // now kick off discovery for endpoints + if err := c.EndpointDiscovery(stream, "", "", c.assignments.Clusters()); err != nil { + log.Warningf("Failed to perform endpoint discovery: %s", err) + } + + case edsURL: + println("EDS") + default: + log.Warningf("Unknown response URL for discovery: %q", resp.GetTypeUrl()) + continue + } + } + return nil } diff --git a/plugin/traffic/xds/client_test.go b/plugin/traffic/xds/client_test.go deleted file mode 100644 index 92724046b..000000000 --- a/plugin/traffic/xds/client_test.go +++ /dev/null @@ -1,292 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "errors" - "fmt" - "testing" - "time" - - "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "google.golang.org/grpc" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" -) - -func clientOpts(balancerName string) Options { - return Options{ - Config: bootstrap.Config{ - BalancerName: balancerName, - Creds: grpc.WithInsecure(), - NodeProto: &corepb.Node{}, - }, - // WithTimeout is deprecated. But we are OK to call it here from the - // test, so we clearly know that the dial failed. - DialOpts: []grpc.DialOption{grpc.WithTimeout(5 * time.Second), grpc.WithBlock()}, - } -} - -func TestNew(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - tests := []struct { - name string - opts Options - wantErr bool - }{ - {name: "empty-opts", opts: Options{}, wantErr: true}, - { - name: "empty-balancer-name", - opts: Options{ - Config: bootstrap.Config{ - Creds: grpc.WithInsecure(), - NodeProto: &corepb.Node{}, - }, - }, - wantErr: true, - }, - { - name: "empty-dial-creds", - opts: Options{ - Config: bootstrap.Config{ - BalancerName: "dummy", - NodeProto: &corepb.Node{}, - }, - }, - wantErr: true, - }, - { - name: "empty-node-proto", - opts: Options{ - Config: bootstrap.Config{ - BalancerName: "dummy", - Creds: grpc.WithInsecure(), - }, - }, - wantErr: true, - }, - { - name: "happy-case", - opts: clientOpts(fakeServer.Address), - wantErr: false, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - c, err := New(test.opts) - if err == nil { - defer c.Close() - } - if (err != nil) != test.wantErr { - t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr) - } - }) - } -} - -// TestWatchService tests the happy case of registering a watcher for -// service updates and receiving a good update. -func TestWatchService(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - xdsClient, err := New(clientOpts(fakeServer.Address)) - if err != nil { - t.Fatalf("New returned error: %v", err) - } - defer xdsClient.Close() - t.Log("Created an xdsClient...") - - callbackCh := testutils.NewChannel() - cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - if err != nil { - callbackCh.Send(fmt.Errorf("xdsClient.WatchService returned error: %v", err)) - return - } - if su.Cluster != goodClusterName1 { - callbackCh.Send(fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1)) - return - } - callbackCh.Send(nil) - }) - defer cancelWatch() - t.Log("Registered a watcher for service updates...") - - // Make the fakeServer send LDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - - // Make the fakeServer send RDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an RDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1} - waitForNilErr(t, callbackCh) -} - -// TestWatchServiceWithNoResponseFromServer tests the case where the -// xDS server does not respond to the requests being sent out as part of -// registering a service update watcher. The underlying v2Client will timeout -// and will send us an error. -func TestWatchServiceWithNoResponseFromServer(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - xdsClient, err := New(clientOpts(fakeServer.Address)) - if err != nil { - t.Fatalf("New returned error: %v", err) - } - defer xdsClient.Close() - t.Log("Created an xdsClient...") - - oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 500 * time.Millisecond - defer func() { - defaultWatchExpiryTimeout = oldWatchExpiryTimeout - }() - - callbackCh := testutils.NewChannel() - cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - if su.Cluster != "" { - callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)) - return - } - if err == nil { - callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error")) - return - } - callbackCh.Send(nil) - }) - defer cancelWatch() - t.Log("Registered a watcher for service updates...") - - // Wait for one request from the client, but send no reponses. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - waitForNilErr(t, callbackCh) -} - -// TestWatchServiceEmptyRDS tests the case where the underlying -// v2Client receives an empty RDS response. -func TestWatchServiceEmptyRDS(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - xdsClient, err := New(clientOpts(fakeServer.Address)) - if err != nil { - t.Fatalf("New returned error: %v", err) - } - defer xdsClient.Close() - t.Log("Created an xdsClient...") - - oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 500 * time.Millisecond - defer func() { - defaultWatchExpiryTimeout = oldWatchExpiryTimeout - }() - - callbackCh := testutils.NewChannel() - cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - if su.Cluster != "" { - callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster)) - return - } - if err == nil { - callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error")) - return - } - callbackCh.Send(nil) - }) - defer cancelWatch() - t.Log("Registered a watcher for service updates...") - - // Make the fakeServer send LDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - - // Make the fakeServer send an empty RDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an RDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: noVirtualHostsInRDSResponse} - waitForNilErr(t, callbackCh) -} - -// TestWatchServiceWithClientClose tests the case where xDS responses are -// received after the client is closed, and we make sure that the registered -// watcher callback is not invoked. -func TestWatchServiceWithClientClose(t *testing.T) { - fakeServer, cleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - defer cleanup() - - xdsClient, err := New(clientOpts(fakeServer.Address)) - if err != nil { - t.Fatalf("New returned error: %v", err) - } - defer xdsClient.Close() - t.Log("Created an xdsClient...") - - callbackCh := testutils.NewChannel() - cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) { - callbackCh.Send(errors.New("watcher callback invoked after client close")) - }) - defer cancelWatch() - t.Log("Registered a watcher for service updates...") - - // Make the fakeServer send LDS response. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - - xdsClient.Close() - t.Log("Closing the xdsClient...") - - // Push an RDS response from the fakeserver - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1} - if cbErr, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout { - t.Fatal(cbErr) - } - -} diff --git a/plugin/traffic/xds/eds.go b/plugin/traffic/xds/eds.go deleted file mode 100644 index e97297f97..000000000 --- a/plugin/traffic/xds/eds.go +++ /dev/null @@ -1,207 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "fmt" - "net" - "strconv" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - typepb "github.com/envoyproxy/go-control-plane/envoy/type" - "github.com/golang/protobuf/ptypes" -) - -// OverloadDropConfig contains the config to drop overloads. -type OverloadDropConfig struct { - Category string - Numerator uint32 - Denominator uint32 -} - -// EndpointHealthStatus represents the health status of an endpoint. -type EndpointHealthStatus int32 - -const ( - // EndpointHealthStatusUnknown represents HealthStatus UNKNOWN. - EndpointHealthStatusUnknown EndpointHealthStatus = iota - // EndpointHealthStatusHealthy represents HealthStatus HEALTHY. - EndpointHealthStatusHealthy - // EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY. - EndpointHealthStatusUnhealthy - // EndpointHealthStatusDraining represents HealthStatus DRAINING. - EndpointHealthStatusDraining - // EndpointHealthStatusTimeout represents HealthStatus TIMEOUT. - EndpointHealthStatusTimeout - // EndpointHealthStatusDegraded represents HealthStatus DEGRADED. - EndpointHealthStatusDegraded -) - -// Endpoint contains information of an endpoint. -type Endpoint struct { - Address string - HealthStatus EndpointHealthStatus - Weight uint32 -} - -// Locality contains information of a locality. -type Locality struct { - Endpoints []Endpoint - ID LocalityID - Priority uint32 - Weight uint32 -} - -// EDSUpdate contains an EDS update. -type EDSUpdate struct { - Drops []OverloadDropConfig - Localities []Locality -} - -func parseAddress(socketAddress *corepb.SocketAddress) string { - return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))) -} - -func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig { - percentage := dropPolicy.GetDropPercentage() - var ( - numerator = percentage.GetNumerator() - denominator uint32 - ) - switch percentage.GetDenominator() { - case typepb.FractionalPercent_HUNDRED: - denominator = 100 - case typepb.FractionalPercent_TEN_THOUSAND: - denominator = 10000 - case typepb.FractionalPercent_MILLION: - denominator = 1000000 - } - return OverloadDropConfig{ - Category: dropPolicy.GetCategory(), - Numerator: numerator, - Denominator: denominator, - } -} - -func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint { - endpoints := make([]Endpoint, 0, len(lbEndpoints)) - for _, lbEndpoint := range lbEndpoints { - endpoints = append(endpoints, Endpoint{ - HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), - Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()), - Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(), - }) - } - return endpoints -} - -// ParseEDSRespProto turns EDS response proto message to EDSUpdate. -// -// This is temporarily exported to be used in eds balancer, before it switches -// to use xds client. TODO: unexport. -func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) { - ret := &EDSUpdate{} - for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { - ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) - } - priorities := make(map[uint32]struct{}) - for _, locality := range m.Endpoints { - l := locality.GetLocality() - if l == nil { - return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) - } - lid := LocalityID{Region: l.Region, Zone: l.Zone, SubZone: l.SubZone} - priority := locality.GetPriority() - priorities[priority] = struct{}{} - ret.Localities = append(ret.Localities, Locality{ - ID: lid, - Endpoints: parseEndpoints(locality.GetLbEndpoints()), - Weight: locality.GetLoadBalancingWeight().GetValue(), - Priority: priority, - }) - } - for i := 0; i < len(priorities); i++ { - if _, ok := priorities[uint32(i)]; !ok { - return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities) - } - } - return ret, nil -} - -// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails. -// This is used by EDS balancer tests. -// -// TODO: delete this. The EDS balancer should build an EDSUpdate directly, -// instead of building and parsing a proto message. -func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate { - u, err := ParseEDSRespProto(m) - if err != nil { - panic(err.Error()) - } - return u -} - -func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error { - v2c.mu.Lock() - defer v2c.mu.Unlock() - - wi := v2c.watchMap[edsURL] - if wi == nil { - return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp) - } - - var returnUpdate *EDSUpdate - for _, r := range resp.GetResources() { - var resource ptypes.DynamicAny - if err := ptypes.UnmarshalAny(r, &resource); err != nil { - return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err) - } - cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment) - if !ok { - return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message) - } - - if cla.GetClusterName() != wi.target[0] { - log.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0]) - // We won't validate the remaining resources. If one of the - // uninteresting ones is invalid, we will still ACK the response. - continue - } - - u, err := ParseEDSRespProto(cla) - if err != nil { - return err - } - - returnUpdate = u - // Break from the loop because the request resource is found. But - // this also means we won't validate the remaining resources. If one - // of the uninteresting ones is invalid, we will still ACK the - // response. - break - } - - if returnUpdate != nil { - wi.stopTimer() - wi.callback.(edsCallback)(returnUpdate, nil) - } - - return nil -} diff --git a/plugin/traffic/xds/eds_test.go b/plugin/traffic/xds/eds_test.go deleted file mode 100644 index 874fa1f30..000000000 --- a/plugin/traffic/xds/eds_test.go +++ /dev/null @@ -1,287 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "errors" - "fmt" - "testing" - "time" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - "github.com/golang/protobuf/ptypes" - anypb "github.com/golang/protobuf/ptypes/any" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/xds/internal/testutils" -) - -func TestEDSParseRespProto(t *testing.T) { - tests := []struct { - name string - m *xdspb.ClusterLoadAssignment - want *EDSUpdate - wantErr bool - }{ - { - name: "missing-priority", - m: func() *xdspb.ClusterLoadAssignment { - clab0 := NewClusterLoadAssignmentBuilder("test", nil) - clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil) - clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil) - return clab0.Build() - }(), - want: nil, - wantErr: true, - }, - { - name: "missing-locality-ID", - m: func() *xdspb.ClusterLoadAssignment { - clab0 := NewClusterLoadAssignmentBuilder("test", nil) - clab0.AddLocality("", 1, 0, []string{"addr1:314"}, nil) - return clab0.Build() - }(), - want: nil, - wantErr: true, - }, - { - name: "good", - m: func() *xdspb.ClusterLoadAssignment { - clab0 := NewClusterLoadAssignmentBuilder("test", nil) - clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, &AddLocalityOptions{ - Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY}, - Weight: []uint32{271}, - }) - clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, &AddLocalityOptions{ - Health: []corepb.HealthStatus{corepb.HealthStatus_DRAINING}, - Weight: []uint32{828}, - }) - return clab0.Build() - }(), - want: &EDSUpdate{ - Drops: nil, - Localities: []Locality{ - { - Endpoints: []Endpoint{{ - Address: "addr1:314", - HealthStatus: EndpointHealthStatusUnhealthy, - Weight: 271, - }}, - ID: Locality{SubZone: "locality-1"}, - Priority: 1, - Weight: 1, - }, - { - Endpoints: []Endpoint{{ - Address: "addr2:159", - HealthStatus: EndpointHealthStatusDraining, - Weight: 828, - }}, - ID: Locality{SubZone: "locality-2"}, - Priority: 0, - Weight: 1, - }, - }, - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := ParseEDSRespProto(tt.m) - if (err != nil) != tt.wantErr { - t.Errorf("ParseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr) - return - } - if d := cmp.Diff(got, tt.want); d != "" { - t.Errorf("ParseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d) - } - }) - } -} - -var ( - badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: edsURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - TypeUrl: edsURL, - } - badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgr1, - }, - }, - TypeUrl: edsURL, - } - goodEDSResponse1 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - func() *anypb.Any { - clab0 := NewClusterLoadAssignmentBuilder(goodEDSName, nil) - clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil) - clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil) - a, _ := ptypes.MarshalAny(clab0.Build()) - return a - }(), - }, - TypeUrl: edsURL, - } - goodEDSResponse2 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - func() *anypb.Any { - clab0 := NewClusterLoadAssignmentBuilder("not-goodEDSName", nil) - clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil) - clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil) - a, _ := ptypes.MarshalAny(clab0.Build()) - return a - }(), - }, - TypeUrl: edsURL, - } -) - -func TestEDSHandleResponse(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - - tests := []struct { - name string - edsResponse *xdspb.DiscoveryResponse - wantErr bool - wantUpdate *EDSUpdate - wantUpdateErr bool - }{ - // Any in resource is badly marshaled. - { - name: "badly-marshaled_response", - edsResponse: badlyMarshaledEDSResponse, - wantErr: true, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response doesn't contain resource with the right type. - { - name: "no-config-in-response", - edsResponse: badResourceTypeInEDSResponse, - wantErr: true, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response contains one uninteresting ClusterLoadAssignment. - { - name: "one-uninterestring-assignment", - edsResponse: goodEDSResponse2, - wantErr: false, - wantUpdate: nil, - wantUpdateErr: false, - }, - // Response contains one good ClusterLoadAssignment. - { - name: "one-good-assignment", - edsResponse: goodEDSResponse1, - wantErr: false, - wantUpdate: &EDSUpdate{ - Localities: []Locality{ - { - Endpoints: []Endpoint{{Address: "addr1:314"}}, - ID: Locality{SubZone: "locality-1"}, - Priority: 1, - Weight: 1, - }, - { - Endpoints: []Endpoint{{Address: "addr2:159"}}, - ID: Locality{SubZone: "locality-2"}, - Priority: 0, - Weight: 1, - }, - }, - }, - wantUpdateErr: false, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testWatchHandle(t, &watchHandleTestcase{ - responseToHandle: test.edsResponse, - wantHandleErr: test.wantErr, - wantUpdate: test.wantUpdate, - wantUpdateErr: test.wantUpdateErr, - - edsWatch: v2c.watchEDS, - watchReqChan: fakeServer.XDSRequestChan, - handleXDSResp: v2c.handleEDSResponse, - }) - }) - } -} - -// TestEDSHandleResponseWithoutWatch tests the case where the v2Client -// receives an EDS response without a registered EDS watcher. -func TestEDSHandleResponseWithoutWatch(t *testing.T) { - _, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - - if v2c.handleEDSResponse(goodEDSResponse1) == nil { - t.Fatal("v2c.handleEDSResponse() succeeded, should have failed") - } -} - -func TestEDSWatchExpiryTimer(t *testing.T) { - oldWatchExpiryTimeout := defaultWatchExpiryTimeout - defaultWatchExpiryTimeout = 500 * time.Millisecond - defer func() { - defaultWatchExpiryTimeout = oldWatchExpiryTimeout - }() - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := testutils.NewChannel() - v2c.watchEDS(goodRouteName1, func(u *EDSUpdate, err error) { - t.Logf("Received callback with edsUpdate {%+v} and error {%v}", u, err) - if u != nil { - callbackCh.Send(fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u)) - } - if err == nil { - callbackCh.Send(errors.New("received nil error in edsCallback")) - } - callbackCh.Send(nil) - }) - - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an CDS request") - } - waitForNilErr(t, callbackCh) -} diff --git a/plugin/traffic/xds/eds_testutil.go b/plugin/traffic/xds/eds_testutil.go deleted file mode 100644 index 7ae03fcfa..000000000 --- a/plugin/traffic/xds/eds_testutil.go +++ /dev/null @@ -1,128 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// All structs/functions in this file should be unexported. They are used in EDS -// balancer tests now, to generate test inputs. Eventually, EDS balancer tests -// should generate EDSUpdate directly, instead of generating and parsing the -// proto message. -// TODO: unexported everything in this file. - -package xds - -import ( - "fmt" - "net" - "strconv" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - typepb "github.com/envoyproxy/go-control-plane/envoy/type" - wrapperspb "github.com/golang/protobuf/ptypes/wrappers" -) - -// ClusterLoadAssignmentBuilder builds a ClusterLoadAssignment, aka EDS -// response. -type ClusterLoadAssignmentBuilder struct { - v *xdspb.ClusterLoadAssignment -} - -// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder. -func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder { - var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload - for i, d := range dropPercents { - drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{ - Category: fmt.Sprintf("test-drop-%d", i), - DropPercentage: &typepb.FractionalPercent{ - Numerator: d, - Denominator: typepb.FractionalPercent_HUNDRED, - }, - }) - } - - return &ClusterLoadAssignmentBuilder{ - v: &xdspb.ClusterLoadAssignment{ - ClusterName: clusterName, - Policy: &xdspb.ClusterLoadAssignment_Policy{ - DropOverloads: drops, - }, - }, - } -} - -// AddLocalityOptions contains options when adding locality to the builder. -type AddLocalityOptions struct { - Health []corepb.HealthStatus - Weight []uint32 -} - -// AddLocality adds a locality to the builder. -func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *AddLocalityOptions) { - var lbEndPoints []*endpointpb.LbEndpoint - for i, a := range addrsWithPort { - host, portStr, err := net.SplitHostPort(a) - if err != nil { - panic("failed to split " + a) - } - port, err := strconv.Atoi(portStr) - if err != nil { - panic("failed to atoi " + portStr) - } - - lbe := &endpointpb.LbEndpoint{ - HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ - Endpoint: &endpointpb.Endpoint{ - Address: &corepb.Address{ - Address: &corepb.Address_SocketAddress{ - SocketAddress: &corepb.SocketAddress{ - Protocol: corepb.SocketAddress_TCP, - Address: host, - PortSpecifier: &corepb.SocketAddress_PortValue{ - PortValue: uint32(port)}}}}}}, - } - if opts != nil { - if i < len(opts.Health) { - lbe.HealthStatus = opts.Health[i] - } - if i < len(opts.Weight) { - lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]} - } - } - lbEndPoints = append(lbEndPoints, lbe) - } - - var localityID *corepb.Locality - if subzone != "" { - localityID = &corepb.Locality{ - Region: "", - Zone: "", - SubZone: subzone, - } - } - - clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{ - Locality: localityID, - LbEndpoints: lbEndPoints, - LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight}, - Priority: priority, - }) -} - -// Build builds ClusterLoadAssignment. -func (clab *ClusterLoadAssignmentBuilder) Build() *xdspb.ClusterLoadAssignment { - return clab.v -} diff --git a/plugin/traffic/xds/locality.go b/plugin/traffic/xds/locality.go deleted file mode 100644 index 8a8324228..000000000 --- a/plugin/traffic/xds/locality.go +++ /dev/null @@ -1,50 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "fmt" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" -) - -// Locality is xds.Locality without XXX fields, so it can be used as map -// keys. -// -// xds.Locality cannot be map keys because one of the XXX fields is a slice. -// -// This struct should only be used as map keys. Use the proto message directly -// in all other places. -type LocalityID struct { - Region string - Zone string - SubZone string -} - -func (l LocalityID) String() string { - return fmt.Sprintf("%s-%s-%s", l.Region, l.Zone, l.SubZone) -} - -// ToProto convert Locality to the proto representation. -func (l LocalityID) ToProto() *corepb.Locality { - return &corepb.Locality{ - Region: l.Region, - Zone: l.Zone, - SubZone: l.SubZone, - } -} diff --git a/plugin/traffic/xds/log.go b/plugin/traffic/xds/log.go deleted file mode 100644 index d11cfa9fe..000000000 --- a/plugin/traffic/xds/log.go +++ /dev/null @@ -1,7 +0,0 @@ -package xds - -import ( - clog "github.com/coredns/coredns/plugin/pkg/log" -) - -var log = clog.NewWithPlugin("traffic") diff --git a/plugin/traffic/xds/testutil_test.go b/plugin/traffic/xds/testutil_test.go deleted file mode 100644 index b05acbd35..000000000 --- a/plugin/traffic/xds/testutil_test.go +++ /dev/null @@ -1,169 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "reflect" - "testing" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" -) - -type watchHandleTestcase struct { - responseToHandle *xdspb.DiscoveryResponse - wantHandleErr bool - wantUpdate interface{} - wantUpdateErr bool - - // Only one of the following should be non-nil. The one corresponding with - // typeURL will be called. - ldsWatch func(target string, ldsCb ldsCallback) (cancel func()) - rdsWatch func(routeName string, rdsCb rdsCallback) (cancel func()) - cdsWatch func(clusterName string, cdsCb cdsCallback) (cancel func()) - edsWatch func(clusterName string, edsCb edsCallback) (cancel func()) - watchReqChan *testutils.Channel // The request sent for watch will be sent to this channel. - handleXDSResp func(response *xdspb.DiscoveryResponse) error -} - -// testWatchHandle is called to test response handling for each xDS. -// -// It starts the xDS watch as configured in test, waits for the fake xds server -// to receive the request (so watch callback is installed), and calls -// handleXDSResp with responseToHandle (if it's set). It then compares the -// update received by watch callback with the expected results. -func testWatchHandle(t *testing.T, test *watchHandleTestcase) { - type updateErr struct { - u interface{} - err error - } - gotUpdateCh := testutils.NewChannel() - - var cancelWatch func() - // Register the watcher, this will also trigger the v2Client to send the xDS - // request. - switch { - case test.ldsWatch != nil: - cancelWatch = test.ldsWatch(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err) - gotUpdateCh.Send(updateErr{u, err}) - }) - case test.rdsWatch != nil: - cancelWatch = test.rdsWatch(goodRouteName1, func(u rdsUpdate, err error) { - t.Logf("in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", u, err) - gotUpdateCh.Send(updateErr{u, err}) - }) - case test.cdsWatch != nil: - cancelWatch = test.cdsWatch(clusterName1, func(u CDSUpdate, err error) { - t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err) - gotUpdateCh.Send(updateErr{u, err}) - }) - case test.edsWatch != nil: - cancelWatch = test.edsWatch(goodEDSName, func(u *EDSUpdate, err error) { - t.Logf("in v2c.watchEDS callback, edsUpdate: %+v, err: %v", u, err) - gotUpdateCh.Send(updateErr{*u, err}) - }) - default: - t.Fatalf("no watch() is set") - } - defer cancelWatch() - - // Wait till the request makes it to the fakeServer. This ensures that - // the watch request has been processed by the v2Client. - if _, err := test.watchReqChan.Receive(); err != nil { - t.Fatalf("Timeout waiting for an xDS request: %v", err) - } - - // Directly push the response through a call to handleXDSResp. This bypasses - // the fakeServer, so it's only testing the handle logic. Client response - // processing is covered elsewhere. - // - // Also note that this won't trigger ACK, so there's no need to clear the - // request channel afterwards. - if err := test.handleXDSResp(test.responseToHandle); (err != nil) != test.wantHandleErr { - t.Fatalf("v2c.handleRDSResponse() returned err: %v, wantErr: %v", err, test.wantHandleErr) - } - - // If the test doesn't expect the callback to be invoked, verify that no - // update or error is pushed to the callback. - // - // Cannot directly compare test.wantUpdate with nil (typed vs non-typed nil: - // https://golang.org/doc/faq#nil_error). - if c := test.wantUpdate; c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) { - update, err := gotUpdateCh.Receive() - if err == testutils.ErrRecvTimeout { - return - } - t.Fatalf("Unexpected update: +%v", update) - } - - wantUpdate := reflect.ValueOf(test.wantUpdate).Elem().Interface() - uErr, err := gotUpdateCh.Receive() - if err == testutils.ErrRecvTimeout { - t.Fatal("Timeout expecting xDS update") - } - gotUpdate := uErr.(updateErr).u - opt := cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{}) - if diff := cmp.Diff(gotUpdate, wantUpdate, opt); diff != "" { - t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff) - } - gotUpdateErr := uErr.(updateErr).err - if (gotUpdateErr != nil) != test.wantUpdateErr { - t.Fatalf("got xDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr) - } -} - -// startServerAndGetCC starts a fake XDS server and also returns a ClientConn -// connected to it. -func startServerAndGetCC(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) { - t.Helper() - - fs, sCleanup, err := fakeserver.StartServer() - if err != nil { - t.Fatalf("Failed to start fake xDS server: %v", err) - } - - cc, ccCleanup, err := fs.XDSClientConn() - if err != nil { - sCleanup() - t.Fatalf("Failed to get a clientConn to the fake xDS server: %v", err) - } - return fs, cc, func() { - sCleanup() - ccCleanup() - } -} - -// waitForNilErr waits for a nil error value to be received on the -// provided channel. -func waitForNilErr(t *testing.T, ch *testutils.Channel) { - t.Helper() - - val, err := ch.Receive() - if err == testutils.ErrRecvTimeout { - t.Fatalf("Timeout expired when expecting update") - } - if val != nil { - if cbErr := val.(error); cbErr != nil { - t.Fatal(cbErr) - } - } -} diff --git a/plugin/traffic/xds/types.go b/plugin/traffic/xds/types.go deleted file mode 100644 index c063233bf..000000000 --- a/plugin/traffic/xds/types.go +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "time" - - adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" -) - -type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient - -const ( - cdsURL = "type.googleapis.com/envoy.api.v2.Cluster" - edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment" -) - -// watchState is an enum to represent the state of a watch call. -type watchState int - -const ( - watchEnqueued watchState = iota - watchCancelled - watchStarted -) - -// watchInfo holds all the information about a watch call. -type watchInfo struct { - typeURL string - target []string - state watchState - - callback interface{} - expiryTimer *time.Timer -} - -// cancel marks the state as cancelled, and also stops the expiry timer. -func (wi *watchInfo) cancel() { - wi.state = watchCancelled - if wi.expiryTimer != nil { - wi.expiryTimer.Stop() - } -} - -// stopTimer stops the expiry timer without cancelling the watch. -func (wi *watchInfo) stopTimer() { - if wi.expiryTimer != nil { - wi.expiryTimer.Stop() - } -} - -type ackInfo struct { - typeURL string - version string // Nack if version is an empty string. - nonce string -} - -// CDSUpdate contains information from a received CDS response, which is of -// interest to the registered CDS watcher. -type CDSUpdate struct { - // ServiceName is the service name corresponding to the clusterName which - // is being watched for through CDS. - ServiceName string -} -type cdsCallback func(CDSUpdate, error) - -type edsCallback func(*EDSUpdate, error) diff --git a/plugin/traffic/xds/v2client.go b/plugin/traffic/xds/v2client.go deleted file mode 100644 index 9dfe83eb1..000000000 --- a/plugin/traffic/xds/v2client.go +++ /dev/null @@ -1,440 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/coredns/coredns/plugin/traffic/xds/buffer" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - "google.golang.org/grpc" -) - -// The value chosen here is based on the default value of the -// initial_fetch_timeout field in corepb.ConfigSource proto. -var defaultWatchExpiryTimeout = 15 * time.Second - -// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a -// single ADS stream on which the different types of xDS requests and responses -// are multiplexed. -// The reason for splitting this out from the top level xdsClient object is -// because there is already an xDS v3Aplha API in development. If and when we -// want to switch to that, this separation will ease that process. -type v2Client struct { - ctx context.Context - cancelCtx context.CancelFunc - - // ClientConn to the xDS gRPC server. Owned by the parent xdsClient. - cc *grpc.ClientConn - nodeProto *corepb.Node - backoff func(int) time.Duration - - // sendCh in the channel onto which watchInfo objects are pushed by the - // watch API, and it is read and acted upon by the send() goroutine. - sendCh *buffer.Unbounded - - mu sync.Mutex - // Message specific watch infos, protected by the above mutex. These are - // written to, after successfully reading from the update channel, and are - // read from when recovering from a broken stream to resend the xDS - // messages. When the user of this client object cancels a watch call, - // these are set to nil. All accesses to the map protected and any value - // inside the map should be protected with the above mutex. - watchMap map[string]*watchInfo - // ackMap contains the version that was acked (the version in the ack - // request that was sent on wire). The key is typeURL, the value is the - // version string, becaues the versions for different resource types - // should be independent. - ackMap map[string]string - // rdsCache maintains a mapping of {clusterName --> CDSUpdate} from - // validated cluster configurations received in CDS responses. We cache all - // valid cluster configurations, whether or not we are interested in them - // when we received them (because we could become interested in them in the - // future and the server wont send us those resources again). This is only - // to support legacy management servers that do not honor the - // resource_names field. As per the latest spec, the server should resend - // the response when the request changes, even if it had sent the same - // resource earlier (when not asked for). Protected by the above mutex. - cdsCache map[string]CDSUpdate -} - -// newV2Client creates a new v2Client initialized with the passed arguments. -func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration) *v2Client { - v2c := &v2Client{ - cc: cc, - nodeProto: nodeProto, - backoff: backoff, - sendCh: buffer.NewUnbounded(), - watchMap: make(map[string]*watchInfo), - ackMap: make(map[string]string), - cdsCache: make(map[string]CDSUpdate), - } - v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) - - go v2c.run() - return v2c -} - -// close cleans up resources and goroutines allocated by this client. -func (v2c *v2Client) close() { - v2c.cancelCtx() -} - -// run starts an ADS stream (and backs off exponentially, if the previous -// stream failed without receiving a single reply) and runs the sender and -// receiver routines to send and receive data from the stream respectively. -func (v2c *v2Client) run() { - retries := 0 - for { - select { - case <-v2c.ctx.Done(): - return - default: - } - - if retries != 0 { - t := time.NewTimer(v2c.backoff(retries)) - select { - case <-t.C: - case <-v2c.ctx.Done(): - if !t.Stop() { - <-t.C - } - return - } - } - - retries++ - cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc) - stream, err := cli.StreamAggregatedResources(v2c.ctx) //, grpc.WaitForReady(true)) - if err != nil { - log.Infof("xds: ADS stream creation failed: %v", err) - continue - } - - // send() could be blocked on reading updates from the different update - // channels when it is not actually sending out messages. So, we need a - // way to break out of send() when recv() returns. This done channel is - // used to for that purpose. - done := make(chan struct{}) - go v2c.send(stream, done) - if v2c.recv(stream) { - retries = 0 - } - close(done) - } -} - -// endRequest sends a request for provided typeURL and resource on the provided -// stream. -// -// version is the ack version to be sent with the request -// - If this is the new request (not an ack/nack), version will be an empty -// string -// - If this is an ack, version will be the version from the response -// - If this is a nack, version will be the previous acked version (from -// ackMap). If there was no ack before, it will be an empty string -func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool { - req := &xdspb.DiscoveryRequest{ - Node: v2c.nodeProto, - TypeUrl: typeURL, - ResourceNames: resourceNames, - VersionInfo: version, - ResponseNonce: nonce, - // TODO: populate ErrorDetails for nack. - } - println("v2: sendrequest", typeURL) - if err := stream.Send(req); err != nil { - log.Warningf("xds: request (type %s) for resource %v failed: %v", typeURL, resourceNames, err) - return false - } - return true -} - -// sendExisting sends out xDS requests for registered watchers when recovering -// from a broken stream. -// -// We call stream.Send() here with the lock being held. It should be OK to do -// that here because the stream has just started and Send() usually returns -// quickly (once it pushes the message onto the transport layer) and is only -// ever blocked if we don't have enough flow control quota. -func (v2c *v2Client) sendExisting(stream adsStream) bool { - println("v2: sendexisting") - v2c.mu.Lock() - defer v2c.mu.Unlock() - - // Reset the ack versions when the stream restarts. - v2c.ackMap = make(map[string]string) - - for typeURL, wi := range v2c.watchMap { - if !v2c.sendRequest(stream, wi.target, typeURL, "", "") { - return false - } - } - - return true -} - -// processWatchInfo pulls the fields needed by the request from a watchInfo. -// -// It also calls callback with cached response, and updates the watch map in -// v2c. -// -// If the watch was already canceled, it returns false for send -func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) { - v2c.mu.Lock() - defer v2c.mu.Unlock() - if t.state == watchCancelled { - return // This returns all zero values, and false for send. - } - t.state = watchStarted - send = true - - typeURL = t.typeURL - target = t.target - v2c.checkCacheAndUpdateWatchMap(t) - // TODO: if watch is called again with the same resource names, - // there's no need to send another request. - // - // TODO: should we reset version (for ack) when a new watch is - // started? Or do this only if the resource names are different - // (so we send a new request)? - return -} - -// processAckInfo pulls the fields needed by the ack request from a ackInfo. -// -// If no active watch is found for this ack, it returns false for send. -func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) { - typeURL = t.typeURL - - v2c.mu.Lock() - defer v2c.mu.Unlock() - wi, ok := v2c.watchMap[typeURL] - if !ok { - // We don't send the request ack if there's no active watch (this can be - // either the server sends responses before any request, or the watch is - // canceled while the ackInfo is in queue), because there's no resource - // name. And if we send a request with empty resource name list, the - // server may treat it as a wild card and send us everything. - log.Warningf("xds: ack (type %s) not sent because there's no active watch for the type", typeURL) - return // This returns all zero values, and false for send. - } - send = true - - version = t.version - nonce = t.nonce - target = wi.target - if version == "" { - // This is a nack, get the previous acked version. - version = v2c.ackMap[typeURL] - // version will still be an empty string if typeURL isn't - // found in ackMap, this can happen if there wasn't any ack - // before. - } else { - v2c.ackMap[typeURL] = version - } - return -} - -// send reads watch infos from update channel and sends out actual xDS requests -// on the provided ADS stream. -func (v2c *v2Client) send(stream adsStream, done chan struct{}) { - if !v2c.sendExisting(stream) { - println("not existing stream") - return - } - - println("in send") - - for { - select { - case <-v2c.ctx.Done(): - return - case u := <-v2c.sendCh.Get(): - v2c.sendCh.Load() - - var ( - target []string - typeURL, version, nonce string - send bool - ) - switch t := u.(type) { - case *watchInfo: - println("watchInfo") - target, typeURL, version, nonce, send = v2c.processWatchInfo(t) - println(target, typeURL, version, nonce, send) - fmt.Printf("%+v\n", target) - case *ackInfo: - println("ackInfo") - target, typeURL, version, nonce, send = v2c.processAckInfo(t) - } - if !send { - continue - } - if !v2c.sendRequest(stream, target, typeURL, version, nonce) { - return - } - case <-done: - return - } - } -} - -// recv receives xDS responses on the provided ADS stream and branches out to -// message specific handlers. -func (v2c *v2Client) recv(stream adsStream) bool { - println("v2 recv") - success := false - for { - println("WATIIGNM") - resp, err := stream.Recv() - // TODO: call watch callbacks with error when stream is broken. - println("DONE") - if err != nil { - log.Warningf("xds: ADS stream recv failed: %v", err) - return success - } - println("RECEIVING") - var respHandleErr error - switch resp.GetTypeUrl() { - case cdsURL: - println("CDS") - respHandleErr = v2c.handleCDSResponse(resp) - case edsURL: - println("EDS") - respHandleErr = v2c.handleEDSResponse(resp) - default: - log.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl()) - continue - } - - typeURL := resp.GetTypeUrl() - if respHandleErr != nil { - log.Warningf("xds: response (type %s) handler failed: %v", typeURL, respHandleErr) - v2c.sendCh.Put(&ackInfo{ - typeURL: typeURL, - version: "", - nonce: resp.GetNonce(), - }) - continue - } - v2c.sendCh.Put(&ackInfo{ - typeURL: typeURL, - version: resp.GetVersionInfo(), - nonce: resp.GetNonce(), - }) - success = true - } -} - -// watchCDS registers an CDS watcher for the provided clusterName. Updates -// corresponding to received CDS responses will be pushed to the provided -// callback. The caller can cancel the watch by invoking the returned cancel -// function. -// The provided callback should not block or perform any expensive operations -// or call other methods of the v2Client object. -func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) { - return v2c.watch(&watchInfo{ - typeURL: cdsURL, - target: []string{clusterName}, - callback: cdsCb, - }) -} - -// watchEDS registers an EDS watcher for the provided clusterName. Updates -// corresponding to received EDS responses will be pushed to the provided -// callback. The caller can cancel the watch by invoking the returned cancel -// function. -// The provided callback should not block or perform any expensive operations -// or call other methods of the v2Client object. -func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) { - return v2c.watch(&watchInfo{ - typeURL: edsURL, - target: []string{clusterName}, - callback: edsCb, - }) - // TODO: Once a registered EDS watch is cancelled, we should send an EDS - // request with no resources. This will let the server know that we are no - // longer interested in this resource. -} - -func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) { - v2c.sendCh.Put(wi) - return func() { - v2c.mu.Lock() - defer v2c.mu.Unlock() - if wi.state == watchEnqueued { - wi.state = watchCancelled - return - } - v2c.watchMap[wi.typeURL].cancel() - delete(v2c.watchMap, wi.typeURL) - // TODO: should we reset ack version string when cancelling the watch? - } -} - -// checkCacheAndUpdateWatchMap is called when a new watch call is handled in -// send(). If an existing watcher is found, its expiry timer is stopped. If the -// watchInfo to be added to the watchMap is found in the cache, the watcher -// callback is immediately invoked. -// -// Caller should hold v2c.mu -func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { - if existing := v2c.watchMap[wi.typeURL]; existing != nil { - println("cancel") - existing.cancel() - } - - v2c.watchMap[wi.typeURL] = wi - switch wi.typeURL { - // We need to grab the lock inside of the expiryTimer's afterFunc because - // we need to access the watchInfo, which is stored in the watchMap. - case cdsURL: - clusterName := wi.target[0] - println("CDS URLS", clusterName) - if update, ok := v2c.cdsCache[clusterName]; ok { - println("UPDATE SEEN, ok") - - var err error - if v2c.watchMap[cdsURL] == nil { - err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName) - } - wi.callback.(cdsCallback)(update, err) - return - } - wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - v2c.mu.Lock() - wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target)) - v2c.mu.Unlock() - }) - case edsURL: - wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { - v2c.mu.Lock() - wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target)) - v2c.mu.Unlock() - }) - } -} diff --git a/plugin/traffic/xds/v2client_ack_test.go b/plugin/traffic/xds/v2client_ack_test.go deleted file mode 100644 index 942943f07..000000000 --- a/plugin/traffic/xds/v2client_ack_test.go +++ /dev/null @@ -1,263 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package xds - -import ( - "fmt" - "strconv" - "testing" - "time" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/golang/protobuf/proto" - anypb "github.com/golang/protobuf/ptypes/any" - "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" -) - -// compareXDSRequest reads requests from channel, compare it with want. -func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, version, nonce string) error { - val, err := ch.Receive() - if err != nil { - return err - } - req := val.(*fakeserver.Request) - if req.Err != nil { - return fmt.Errorf("unexpected error from request: %v", req.Err) - } - wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest) - wantClone.VersionInfo = version - wantClone.ResponseNonce = nonce - if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) { - return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone)) - } - return nil -} - -func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) { - respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse) - respToSend.VersionInfo = strconv.Itoa(version) - nonce = strconv.Itoa(int(time.Now().UnixNano())) - respToSend.Nonce = nonce - ch <- &fakeserver.Response{Resp: respToSend} - return -} - -// startXDS calls watch to send the first request. It then sends a good response -// and checks for ack. -func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest) *testutils.Channel { - callbackCh := testutils.NewChannel() - switch xdsname { - case "LDS": - v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh.Send(struct{}{}) - }) - case "RDS": - v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh.Send(struct{}{}) - }) - case "CDS": - v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh.Send(struct{}{}) - }) - case "EDS": - v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) { - t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err) - callbackCh.Send(struct{}{}) - }) - } - - if err := compareXDSRequest(reqChan, req, "", ""); err != nil { - t.Fatalf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("FakeServer received %s request...", xdsname) - return callbackCh -} - -// sendGoodResp sends the good response, with the given version, and a random -// nonce. -// -// It also waits and checks that the ack request contains the given version, and -// the generated nonce. -func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) { - nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version) - t.Logf("Good %s response pushed to fakeServer...", xdsname) - - if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version), nonce); err != nil { - t.Errorf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("Good %s response acked", xdsname) - - if _, err := callbackCh.Receive(); err != nil { - t.Errorf("Timeout when expecting %s update", xdsname) - } - t.Logf("Good %s response callback executed", xdsname) -} - -// sendBadResp sends a bad response with the given version. This response will -// be nacked, so we expect a request with the previous version (version-1). -// -// But the nonce in request should be the new nonce. -func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, wantReq *xdspb.DiscoveryRequest) { - var typeURL string - switch xdsname { - case "LDS": - typeURL = ldsURL - case "RDS": - typeURL = rdsURL - case "CDS": - typeURL = cdsURL - case "EDS": - typeURL = edsURL - } - nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{{}}, - TypeUrl: typeURL, - }, version) - t.Logf("Bad %s response pushed to fakeServer...", xdsname) - if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version-1), nonce); err != nil { - t.Errorf("Failed to receive %s request: %v", xdsname, err) - } - t.Logf("Bad %s response nacked", xdsname) -} - -// TestV2ClientAck verifies that valid responses are acked, and invalid ones -// are nacked. -// -// This test also verifies the version for different types are independent. -func TestV2ClientAck(t *testing.T) { - var ( - versionLDS = 1000 - versionRDS = 2000 - versionCDS = 3000 - versionEDS = 4000 - ) - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ - cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest) - sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) - versionRDS++ - cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest) - sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) - versionCDS++ - cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest) - sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) - versionEDS++ - - // Send a bad response, and check for nack. - sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest) - versionLDS++ - sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest) - versionRDS++ - sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest) - versionCDS++ - sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest) - versionEDS++ - - // send another good response, and check for ack, with the new version. - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ - sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS) - versionRDS++ - sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS) - versionCDS++ - sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS) - versionEDS++ -} - -// Test when the first response is invalid, and is nacked, the nack requests -// should have an empty version string. -func TestV2ClientAckFirstIsNack(t *testing.T) { - var versionLDS = 1000 - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - - nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{{}}, - TypeUrl: ldsURL, - }, versionLDS) - t.Logf("Bad response pushed to fakeServer...") - - // The expected version string is an empty string, because this is the first - // response, and it's nacked (so there's no previous ack version). - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil { - t.Errorf("Failed to receive request: %v", err) - } - t.Logf("Bad response nacked") - versionLDS++ - - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ -} - -// Test when a nack is sent after a new watch, we nack with the previous acked -// version (instead of resetting to empty string). -func TestV2ClientAckNackAfterNewWatch(t *testing.T) { - var versionLDS = 1000 - - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - // Start the watch, send a good response, and check for ack. - cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ - - // Start a new watch. - cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest) - - // This is an invalid response after the new watch. - nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{{}}, - TypeUrl: ldsURL, - }, versionLDS) - t.Logf("Bad response pushed to fakeServer...") - - // The expected version string is the previous acked version. - if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil { - t.Errorf("Failed to receive request: %v", err) - } - t.Logf("Bad response nacked") - versionLDS++ - - sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) - versionLDS++ -} diff --git a/plugin/traffic/xds/v2client_test.go b/plugin/traffic/xds/v2client_test.go deleted file mode 100644 index ff2773dba..000000000 --- a/plugin/traffic/xds/v2client_test.go +++ /dev/null @@ -1,444 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package xds - -import ( - "errors" - "testing" - "time" - - "github.com/golang/protobuf/proto" - "google.golang.org/grpc/xds/internal/testutils" - "google.golang.org/grpc/xds/internal/testutils/fakeserver" - - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" - httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" - listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v2" - anypb "github.com/golang/protobuf/ptypes/any" - structpb "github.com/golang/protobuf/ptypes/struct" -) - -const ( - defaultTestTimeout = 1 * time.Second - goodLDSTarget1 = "lds.target.good:1111" - goodLDSTarget2 = "lds.target.good:2222" - goodRouteName1 = "GoodRouteConfig1" - goodRouteName2 = "GoodRouteConfig2" - goodEDSName = "GoodClusterAssignment1" - uninterestingRouteName = "UninterestingRouteName" - goodMatchingDomain = "lds.target.good" - uninterestingDomain = "uninteresting.domain" - goodClusterName1 = "GoodClusterName1" - goodClusterName2 = "GoodClusterName2" - uninterestingClusterName = "UninterestingClusterName" - httpConnManagerURL = "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager" -) - -var ( - goodNodeProto = &basepb.Node{ - Id: "ENVOY_NODE_ID", - Metadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "TRAFFICDIRECTOR_GRPC_HOSTNAME": { - Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"}, - }, - }, - }, - } - goodLDSRequest = &xdspb.DiscoveryRequest{ - Node: goodNodeProto, - TypeUrl: ldsURL, - ResourceNames: []string{goodLDSTarget1}, - } - goodCDSRequest = &xdspb.DiscoveryRequest{ - Node: goodNodeProto, - TypeUrl: cdsURL, - ResourceNames: []string{goodClusterName1}, - } - goodEDSRequest = &xdspb.DiscoveryRequest{ - Node: goodNodeProto, - TypeUrl: edsURL, - ResourceNames: []string{goodEDSName}, - } - goodHTTPConnManager1 = &httppb.HttpConnectionManager{} - marshaledConnMgr1, _ = proto.Marshal(goodHTTPConnManager1) - emptyHTTPConnManager = &httppb.HttpConnectionManager{ - RouteSpecifier: &httppb.HttpConnectionManager_Rds{ - Rds: &httppb.Rds{}, - }, - } - emptyMarshaledConnMgr, _ = proto.Marshal(emptyHTTPConnManager) - connMgrWithScopedRoutes = &httppb.HttpConnectionManager{ - RouteSpecifier: &httppb.HttpConnectionManager_ScopedRoutes{}, - } - marshaledConnMgrWithScopedRoutes, _ = proto.Marshal(connMgrWithScopedRoutes) - goodListener1 = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgr1, - }, - }, - } - marshaledListener1, _ = proto.Marshal(goodListener1) - goodListener2 = &xdspb.Listener{ - Name: goodLDSTarget2, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgr1, - }, - }, - } - marshaledListener2, _ = proto.Marshal(goodListener2) - noAPIListener = &xdspb.Listener{Name: goodLDSTarget1} - marshaledNoAPIListener, _ = proto.Marshal(noAPIListener) - badAPIListener1 = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - } - badAPIListener2 = &xdspb.Listener{ - Name: goodLDSTarget2, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - } - badlyMarshaledAPIListener2, _ = proto.Marshal(badAPIListener2) - badResourceListener = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: ldsURL, - Value: marshaledListener1, - }, - }, - } - listenerWithEmptyHTTPConnMgr = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: emptyMarshaledConnMgr, - }, - }, - } - listenerWithScopedRoutesRouteConfig = &xdspb.Listener{ - Name: goodLDSTarget1, - ApiListener: &listenerpb.ApiListener{ - ApiListener: &anypb.Any{ - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgrWithScopedRoutes, - }, - }, - } - goodLDSResponse1 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledListener1, - }, - }, - TypeUrl: ldsURL, - } - goodLDSResponse2 = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledListener2, - }, - }, - TypeUrl: ldsURL, - } - emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: ldsURL} - badlyMarshaledLDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: []byte{1, 2, 3, 4}, - }, - }, - TypeUrl: ldsURL, - } - badResourceTypeInLDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: httpConnManagerURL, - Value: marshaledConnMgr1, - }, - }, - TypeUrl: ldsURL, - } - ldsResponseWithMultipleResources = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledListener2, - }, - { - TypeUrl: ldsURL, - Value: marshaledListener1, - }, - }, - TypeUrl: ldsURL, - } - noAPIListenerLDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledNoAPIListener, - }, - }, - TypeUrl: ldsURL, - } - goodBadUglyLDSResponse = &xdspb.DiscoveryResponse{ - Resources: []*anypb.Any{ - { - TypeUrl: ldsURL, - Value: marshaledListener2, - }, - { - TypeUrl: ldsURL, - Value: marshaledListener1, - }, - { - TypeUrl: ldsURL, - Value: badlyMarshaledAPIListener2, - }, - }, - TypeUrl: ldsURL, - } - goodRouteConfig1 = &xdspb.RouteConfiguration{ - Name: goodRouteName1, - VirtualHosts: []*routepb.VirtualHost{ - { - Domains: []string{uninterestingDomain}, - Routes: []*routepb.Route{ - { - Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName}, - }, - }, - }, - }, - }, - { - Domains: []string{goodMatchingDomain}, - Routes: []*routepb.Route{ - { - Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName1}, - }, - }, - }, - }, - }, - }, - } - marshaledGoodRouteConfig1, _ = proto.Marshal(goodRouteConfig1) - goodRouteConfig2 = &xdspb.RouteConfiguration{ - Name: goodRouteName2, - VirtualHosts: []*routepb.VirtualHost{ - { - Domains: []string{uninterestingDomain}, - Routes: []*routepb.Route{ - { - Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName}, - }, - }, - }, - }, - }, - { - Domains: []string{goodMatchingDomain}, - Routes: []*routepb.Route{ - { - Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}}, - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName2}, - }, - }, - }, - }, - }, - }, - } - marshaledGoodRouteConfig2, _ = proto.Marshal(goodRouteConfig2) - uninterestingRouteConfig = &xdspb.RouteConfiguration{ - Name: uninterestingRouteName, - VirtualHosts: []*routepb.VirtualHost{ - { - Domains: []string{uninterestingDomain}, - Routes: []*routepb.Route{ - { - Action: &routepb.Route_Route{ - Route: &routepb.RouteAction{ - ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName}, - }, - }, - }, - }, - }, - }, - } - marshaledUninterestingRouteConfig, _ = proto.Marshal(uninterestingRouteConfig) -) - -// TestV2ClientBackoffAfterRecvError verifies if the v2Client backoffs when it -// encounters a Recv error while receiving an LDS response. -func TestV2ClientBackoffAfterRecvError(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - // Override the v2Client backoff function with this, so that we can verify - // that a backoff actually was triggerred. - boCh := make(chan int, 1) - clientBackoff := func(v int) time.Duration { - boCh <- v - return 0 - } - - v2c := newV2Client(cc, goodNodeProto, clientBackoff) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := make(chan struct{}) - v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - close(callbackCh) - }) - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - t.Log("FakeServer received request...") - - fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} - t.Log("Bad LDS response pushed to fakeServer...") - - timer := time.NewTimer(defaultTestTimeout) - select { - case <-timer.C: - t.Fatal("Timeout when expecting LDS update") - case <-boCh: - timer.Stop() - t.Log("v2Client backed off before retrying...") - case <-callbackCh: - t.Fatal("Received unexpected LDS callback") - } -} - -// TestV2ClientRetriesAfterBrokenStream verifies the case where a stream -// encountered a Recv() error, and is expected to send out xDS requests for -// registered watchers once it comes back up again. -func TestV2ClientRetriesAfterBrokenStream(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := testutils.NewChannel() - v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err) - callbackCh.Send(struct{}{}) - }) - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - t.Log("FakeServer received request...") - - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - t.Log("Good LDS response pushed to fakeServer...") - - if _, err := callbackCh.Receive(); err != nil { - t.Fatal("Timeout when expecting LDS update") - } - - // Read the ack, so the next request is sent after stream re-creation. - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS ACK") - } - - fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")} - t.Log("Bad LDS response pushed to fakeServer...") - - val, err := fakeServer.XDSRequestChan.Receive() - if err == testutils.ErrRecvTimeout { - t.Fatalf("Timeout expired when expecting LDS update") - } - gotRequest := val.(*fakeserver.Request) - if !proto.Equal(gotRequest.Req, goodLDSRequest) { - t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest) - } -} - -// TestV2ClientCancelWatch verifies that the registered watch callback is not -// invoked if a response is received after the watcher is cancelled. -func TestV2ClientCancelWatch(t *testing.T) { - fakeServer, cc, cleanup := startServerAndGetCC(t) - defer cleanup() - - v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }) - defer v2c.close() - t.Log("Started xds v2Client...") - - callbackCh := testutils.NewChannel() - cancelFunc := v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) { - t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err) - callbackCh.Send(struct{}{}) - }) - if _, err := fakeServer.XDSRequestChan.Receive(); err != nil { - t.Fatalf("Timeout expired when expecting an LDS request") - } - t.Log("FakeServer received request...") - - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - t.Log("Good LDS response pushed to fakeServer...") - - if _, err := callbackCh.Receive(); err != nil { - t.Fatal("Timeout when expecting LDS update") - } - - cancelFunc() - - fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1} - t.Log("Another good LDS response pushed to fakeServer...") - - if _, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout { - t.Fatalf("Watch callback invoked after the watcher was cancelled") - } -} diff --git a/plugin/traffic/xds_bootstrap_insecure.json b/plugin/traffic/xds_bootstrap_insecure.json index 1ec652a37..724de2409 100644 --- a/plugin/traffic/xds_bootstrap_insecure.json +++ b/plugin/traffic/xds_bootstrap_insecure.json @@ -1,8 +1,8 @@ { "node": { - "id": "ENVOY_NODE_ID", + "id": "COREDNS_NODE_ID", "metadata": { - "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "xds_cluster" } }, "xds_servers" : [{