Its working

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-01-15 16:37:18 +01:00
parent cf478b0aed
commit 5f2d5788b0
23 changed files with 177 additions and 3673 deletions

View File

@@ -114,3 +114,13 @@ will be stripped. You can optionally sign responses on the fly by using the *dns
* wording: cluster, endpoints, assignments, service_name are all used and roughly mean the same
thing; unify this.
const (
HealthStatus_UNKNOWN HealthStatus = 0
HealthStatus_HEALTHY HealthStatus = 1
HealthStatus_UNHEALTHY HealthStatus = 2
HealthStatus_DRAINING HealthStatus = 3
HealthStatus_TIMEOUT HealthStatus = 4
HealthStatus_DEGRADED HealthStatus = 5
)
https://www.envoyproxy.io/docs/envoy/v1.11.2/api-docs/xds_protocol

View File

@@ -1,14 +1,12 @@
package traffic
import (
"fmt"
"math/rand"
"time"
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin"
clog "github.com/coredns/coredns/plugin/pkg/log"
"github.com/coredns/coredns/plugin/traffic/xds"
"github.com/caddyserver/caddy"
)
@@ -33,8 +31,19 @@ func setup(c *caddy.Controller) error {
return t
})
t.c.WatchCluster("", func(x xds.CDSUpdate, _ error) { fmt.Printf("CDSUpdate: %+v\n", x) })
t.c.WatchEndpoints("", func(x *xds.EDSUpdate, _ error) { fmt.Printf("EDSUpdate: %+v\n", x) })
stream, err := t.c.Run()
if err != nil {
return plugin.Error("traffic", err)
}
if err := t.c.ClusterDiscovery(stream, "", "", []string{}); err != nil {
log.Error(err)
}
err = t.c.Receive(stream)
if err != nil {
return plugin.Error("traffic", err)
}
return nil
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/pkg/response"
"github.com/coredns/coredns/plugin/traffic/xds"
"github.com/coredns/coredns/plugin/traffic/xds/bootstrap"
"github.com/miekg/dns"
)
@@ -21,11 +20,7 @@ type Traffic struct {
// New returns a pointer to a new and initialized Traffic.
func New() (*Traffic, error) {
config, err := bootstrap.NewConfig()
if err != nil {
return nil, err
}
c, err := xds.New(xds.Options{Config: *config})
c, err := xds.New(":18000", "mycoredns")
if err != nil {
return nil, err
}

View File

@@ -1,10 +0,0 @@
This code is copied from
[https://github.com/grpc/grpc-go/tree/master/xds](https://github.com/grpc/grpc-go/tree/master/xds).
Grpc-go is also a consumer of the Envoy xDS data and acts upon it.
The *traffic* plugin only cares about clusters and endpoints, the following bits are deleted:
* lDS; listener discovery is not used here.
* rDS: routes have no use for DNS responses.
Load reporting is also not implemented, although this can be done on the DNS level.

View File

@@ -1,165 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package bootstrap provides the functionality to initialize certain aspects
// of an xDS client by reading a bootstrap file.
package bootstrap
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"os"
"github.com/coredns/coredns/plugin/pkg/log"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/jsonpb"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"
"google.golang.org/grpc/grpclog"
)
const (
// Environment variable which holds the name of the xDS bootstrap file.
fileEnv = "GRPC_XDS_BOOTSTRAP"
// Type name for Google default credentials.
googleDefaultCreds = "google_default"
)
var gRPCVersion = fmt.Sprintf("gRPC-Go %s", grpc.Version)
// For overriding in unit tests.
var fileReadFunc = ioutil.ReadFile
// Config provides the xDS client with several key bits of information that it
// requires in its interaction with an xDS server. The Config is initialized
// from the bootstrap file.
type Config struct {
// BalancerName is the name of the xDS server to connect to.
//
// The bootstrap file contains a list of servers (with name+creds), but we
// pick the first one.
BalancerName string
// Creds contains the credentials to be used while talking to the xDS
// server, as a grpc.DialOption.
Creds grpc.DialOption
// NodeProto contains the node proto to be used in xDS requests.
NodeProto *corepb.Node
}
type channelCreds struct {
Type string `json:"type"`
Config json.RawMessage `json:"config"`
}
type xdsServer struct {
ServerURI string `json:"server_uri"`
ChannelCreds []channelCreds `json:"channel_creds"`
}
// NewConfig returns a new instance of Config initialized by reading the
// bootstrap file found at ${GRPC_XDS_BOOTSTRAP}.
//
// The format of the bootstrap file will be as follows:
// {
// "xds_server": {
// "server_uri": <string containing URI of xds server>,
// "channel_creds": [
// {
// "type": <string containing channel cred type>,
// "config": <JSON object containing config for the type>
// }
// ]
// },
// "node": <JSON form of corepb.Node proto>
// }
//
// Currently, we support exactly one type of credential, which is
// "google_default", where we use the host's default certs for transport
// credentials and a Google oauth token for call credentials.
//
// This function tries to process as much of the bootstrap file as possible (in
// the presence of the errors) and may return a Config object with certain
// fields left unspecified, in which case the caller should use some sane
// defaults.
func NewConfig() (*Config, error) {
config := &Config{}
fName, ok := os.LookupEnv(fileEnv)
if !ok {
return config, fmt.Errorf("xds: %s environment variable not set", fileEnv)
}
grpclog.Infof("xds: Reading bootstrap file from %s", fName)
data, err := fileReadFunc(fName)
if err != nil {
return config, fmt.Errorf("xds: bootstrap file {%v} read failed: %v", fName, err)
}
var jsonData map[string]json.RawMessage
if err := json.Unmarshal(data, &jsonData); err != nil {
return config, fmt.Errorf("xds: json.Unmarshal(%v) failed during bootstrap: %v", string(data), err)
}
m := jsonpb.Unmarshaler{AllowUnknownFields: true}
for k, v := range jsonData {
switch k {
case "node":
n := &corepb.Node{}
if err := m.Unmarshal(bytes.NewReader(v), n); err != nil {
log.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
break
}
config.NodeProto = n
case "xds_servers":
var servers []*xdsServer
if err := json.Unmarshal(v, &servers); err != nil {
log.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err)
break
}
if len(servers) < 1 {
log.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to")
break
}
xs := servers[0]
config.BalancerName = xs.ServerURI
for _, cc := range xs.ChannelCreds {
if cc.Type == googleDefaultCreds {
config.Creds = grpc.WithCredentialsBundle(google.NewComputeEngineCredentials())
// We stop at the first credential type that we support.
break
}
}
default:
// Do not fail the xDS bootstrap when an unknown field is seen.
log.Warningf("xds: unexpected data in bootstrap file: {%v, %v}", k, string(v))
}
}
// If we don't find a nodeProto in the bootstrap file, we just create an
// empty one here. That way, callers of this function can always expect
// that the NodeProto field is non-nil.
if config.NodeProto == nil {
config.NodeProto = &corepb.Node{}
}
config.NodeProto.BuildVersion = gRPCVersion
return config, nil
}

View File

@@ -1,260 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package bootstrap
import (
"os"
"testing"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/google"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
structpb "github.com/golang/protobuf/ptypes/struct"
)
var (
nodeProto = &corepb.Node{
Id: "ENVOY_NODE_ID",
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"TRAFFICDIRECTOR_GRPC_HOSTNAME": {
Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"},
},
},
},
BuildVersion: gRPCVersion,
}
nilCredsConfig = &Config{
BalancerName: "trafficdirector.googleapis.com:443",
Creds: nil,
NodeProto: nodeProto,
}
nonNilCredsConfig = &Config{
BalancerName: "trafficdirector.googleapis.com:443",
Creds: grpc.WithCredentialsBundle(google.NewComputeEngineCredentials()),
NodeProto: nodeProto,
}
)
// TestNewConfig exercises the functionality in NewConfig with different
// bootstrap file contents. It overrides the fileReadFunc by returning
// bootstrap file contents defined in this test, instead of reading from a
// file.
func TestNewConfig(t *testing.T) {
bootstrapFileMap := map[string]string{
"empty": "",
"badJSON": `["test": 123]`,
"emptyNodeProto": `
{
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443"
}]
}`,
"emptyXdsServer": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
}
}`,
"unknownTopLevelFieldInFile": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "not-google-default" }
]
}],
"unknownField": "foobar"
}`,
"unknownFieldInNodeProto": `
{
"node": {
"id": "ENVOY_NODE_ID",
"unknownField": "foobar",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
}
}`,
"unknownFieldInXdsServer": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "not-google-default" }
],
"unknownField": "foobar"
}]
}`,
"emptyChannelCreds": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443"
}]
}`,
"nonGoogleDefaultCreds": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "not-google-default" }
]
}]
}`,
"multipleChannelCreds": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "not-google-default" },
{ "type": "google_default" }
]
}]
}`,
"goodBootstrap": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [
{ "type": "google_default" }
]
}]
}`,
"multipleXDSServers": `
{
"node": {
"id": "ENVOY_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
}
},
"xds_servers" : [
{
"server_uri": "trafficdirector.googleapis.com:443",
"channel_creds": [{ "type": "google_default" }]
},
{
"server_uri": "backup.never.use.com:1234",
"channel_creds": [{ "type": "not-google-default" }]
}
]
}`,
}
oldFileReadFunc := fileReadFunc
fileReadFunc = func(name string) ([]byte, error) {
if b, ok := bootstrapFileMap[name]; ok {
return []byte(b), nil
}
return nil, os.ErrNotExist
}
defer func() {
fileReadFunc = oldFileReadFunc
os.Unsetenv(fileEnv)
}()
tests := []struct {
name string
wantConfig *Config
}{
{"nonExistentBootstrapFile", &Config{}},
{"empty", &Config{}},
{"badJSON", &Config{}},
{"emptyNodeProto", &Config{
BalancerName: "trafficdirector.googleapis.com:443",
NodeProto: &corepb.Node{BuildVersion: gRPCVersion},
}},
{"emptyXdsServer", &Config{NodeProto: nodeProto}},
{"unknownTopLevelFieldInFile", nilCredsConfig},
{"unknownFieldInNodeProto", &Config{NodeProto: nodeProto}},
{"unknownFieldInXdsServer", nilCredsConfig},
{"emptyChannelCreds", nilCredsConfig},
{"nonGoogleDefaultCreds", nilCredsConfig},
{"multipleChannelCreds", nonNilCredsConfig},
{"goodBootstrap", nonNilCredsConfig},
{"multipleXDSServers", nonNilCredsConfig},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if err := os.Setenv(fileEnv, test.name); err != nil {
t.Fatalf("os.Setenv(%s, %s) failed with error: %v", fileEnv, test.name, err)
}
config := NewConfig()
if config.BalancerName != test.wantConfig.BalancerName {
t.Errorf("config.BalancerName is %s, want %s", config.BalancerName, test.wantConfig.BalancerName)
}
if !proto.Equal(config.NodeProto, test.wantConfig.NodeProto) {
t.Errorf("config.NodeProto is %#v, want %#v", config.NodeProto, test.wantConfig.NodeProto)
}
if (config.Creds != nil) != (test.wantConfig.Creds != nil) {
t.Errorf("config.Creds is %#v, want %#v", config.Creds, test.wantConfig.Creds)
}
})
}
}
func TestNewConfigEnvNotSet(t *testing.T) {
os.Unsetenv(fileEnv)
wantConfig := Config{}
if config := NewConfig(); *config != wantConfig {
t.Errorf("NewConfig() returned : %#v, wanted an empty Config object", config)
}
}

View File

@@ -1,85 +0,0 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package buffer provides an implementation of an unbounded buffer.
package buffer
import "sync"
// Unbounded is an implementation of an unbounded buffer which does not use
// extra goroutines. This is typically used for passing updates from one entity
// to another within gRPC.
//
// All methods on this type are thread-safe and don't block on anything except
// the underlying mutex used for synchronization.
//
// Unbounded supports values of any type to be stored in it by using a channel
// of `interface{}`. This means that a call to Put() incurs an extra memory
// allocation, and also that users need a type assertion while reading. For
// performance critical code paths, using Unbounded is strongly discouraged and
// defining a new type specific implementation of this buffer is preferred. See
// internal/transport/transport.go for an example of this.
type Unbounded struct {
c chan interface{}
mu sync.Mutex
backlog []interface{}
}
// NewUnbounded returns a new instance of Unbounded.
func NewUnbounded() *Unbounded {
return &Unbounded{c: make(chan interface{}, 1)}
}
// Put adds t to the unbounded buffer.
func (b *Unbounded) Put(t interface{}) {
b.mu.Lock()
if len(b.backlog) == 0 {
select {
case b.c <- t:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, t)
b.mu.Unlock()
}
// Load sends the earliest buffered data, if any, onto the read channel
// returned by Get(). Users are expected to call this every time they read a
// value from the read channel.
func (b *Unbounded) Load() {
b.mu.Lock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog = b.backlog[1:]
default:
}
}
b.mu.Unlock()
}
// Get returns a read channel on which values added to the buffer, via Put(),
// are sent on.
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
func (b *Unbounded) Get() <-chan interface{} {
return b.c
}

View File

@@ -1,111 +0,0 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package buffer
import (
"reflect"
"sort"
"sync"
"testing"
)
const (
numWriters = 10
numWrites = 10
)
// wantReads contains the set of values expected to be read by the reader
// goroutine in the tests.
var wantReads []int
func init() {
for i := 0; i < numWriters; i++ {
for j := 0; j < numWrites; j++ {
wantReads = append(wantReads, i)
}
}
}
// TestSingleWriter starts one reader and one writer goroutine and makes sure
// that the reader gets all the value added to the buffer by the writer.
func TestSingleWriter(t *testing.T) {
ub := NewUnbounded()
reads := []int{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := ub.Get()
for i := 0; i < numWriters*numWrites; i++ {
r := <-ch
reads = append(reads, r.(int))
ub.Load()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numWriters; i++ {
for j := 0; j < numWrites; j++ {
ub.Put(i)
}
}
}()
wg.Wait()
if !reflect.DeepEqual(reads, wantReads) {
t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads)
}
}
// TestMultipleWriters starts multiple writers and one reader goroutine and
// makes sure that the reader gets all the data written by all writers.
func TestMultipleWriters(t *testing.T) {
ub := NewUnbounded()
reads := []int{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := ub.Get()
for i := 0; i < numWriters*numWrites; i++ {
r := <-ch
reads = append(reads, r.(int))
ub.Load()
}
}()
wg.Add(numWriters)
for i := 0; i < numWriters; i++ {
go func(index int) {
defer wg.Done()
for j := 0; j < numWrites; j++ {
ub.Put(index)
}
}(i)
}
wg.Wait()
sort.Ints(reads)
if !reflect.DeepEqual(reads, wantReads) {
t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads)
}
}

View File

@@ -1,90 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"fmt"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes"
)
// handleCDSResponse processes an CDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
println("handlCDSResponse")
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi := v2c.watchMap[cdsURL]
if wi == nil {
return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp)
}
var returnUpdate CDSUpdate
localCache := make(map[string]CDSUpdate)
for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in CDS response: %v", err)
}
cluster, ok := resource.Message.(*xdspb.Cluster)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message)
}
fmt.Printf("CLUSTER %+v\n", cluster)
update, err := validateCluster(cluster)
if err != nil {
return err
}
// If the Cluster message in the CDS response did not contain a
// serviceName, we will just use the clusterName for EDS.
if update.ServiceName == "" {
update.ServiceName = cluster.GetName()
}
localCache[cluster.GetName()] = update
if cluster.GetName() == wi.target[0] {
returnUpdate = update
}
}
v2c.cdsCache = localCache
var err error
if returnUpdate.ServiceName == "" {
err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp)
}
wi.stopTimer()
wi.callback.(cdsCallback)(returnUpdate, err)
return nil
}
func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) {
emptyUpdate := CDSUpdate{ServiceName: ""}
switch {
case cluster.GetType() != xdspb.Cluster_EDS:
return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster)
case cluster.GetEdsClusterConfig().GetEdsConfig().GetAds() == nil:
return emptyUpdate, fmt.Errorf("xds: unexpected edsConfig in response: %+v", cluster)
case cluster.GetLbPolicy() != xdspb.Cluster_ROUND_ROBIN:
return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
}
return CDSUpdate{ServiceName: cluster.GetEdsClusterConfig().GetServiceName()}, nil
}

View File

@@ -1,487 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"errors"
"fmt"
"reflect"
"testing"
"time"
discoverypb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
const (
clusterName1 = "foo-cluster"
clusterName2 = "bar-cluster"
serviceName1 = "foo-service"
serviceName2 = "bar-service"
)
func (v2c *v2Client) cloneCDSCacheForTesting() map[string]CDSUpdate {
v2c.mu.Lock()
defer v2c.mu.Unlock()
cloneCache := make(map[string]CDSUpdate)
for k, v := range v2c.cdsCache {
cloneCache[k] = v
}
return cloneCache
}
func TestValidateCluster(t *testing.T) {
emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false}
tests := []struct {
name string
cluster *xdspb.Cluster
wantUpdate CDSUpdate
wantErr bool
}{
{
name: "non-eds-cluster-type",
cluster: &xdspb.Cluster{
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_STATIC},
EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
EdsConfig: &corepb.ConfigSource{
ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
Ads: &corepb.AggregatedConfigSource{},
},
},
},
LbPolicy: xdspb.Cluster_LEAST_REQUEST,
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "no-eds-config",
cluster: &xdspb.Cluster{
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "no-ads-config-source",
cluster: &xdspb.Cluster{
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "non-round-robin-lb-policy",
cluster: &xdspb.Cluster{
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
EdsConfig: &corepb.ConfigSource{
ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
Ads: &corepb.AggregatedConfigSource{},
},
},
},
LbPolicy: xdspb.Cluster_LEAST_REQUEST,
},
wantUpdate: emptyUpdate,
wantErr: true,
},
{
name: "happy-case-no-service-name-no-lrs",
cluster: &xdspb.Cluster{
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
EdsConfig: &corepb.ConfigSource{
ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
Ads: &corepb.AggregatedConfigSource{},
},
},
},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
},
wantUpdate: emptyUpdate,
},
{
name: "happy-case-no-lrs",
cluster: &xdspb.Cluster{
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
EdsConfig: &corepb.ConfigSource{
ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
Ads: &corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName1,
},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
},
wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: false},
},
{
name: "happiest-case",
cluster: goodCluster1,
wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: true},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
gotUpdate, gotErr := validateCluster(test.cluster)
if (gotErr != nil) != test.wantErr {
t.Errorf("validateCluster(%+v) returned error: %v, wantErr: %v", test.cluster, gotErr, test.wantErr)
}
if !reflect.DeepEqual(gotUpdate, test.wantUpdate) {
t.Errorf("validateCluster(%+v) = %v, want: %v", test.cluster, gotUpdate, test.wantUpdate)
}
})
}
}
// TestCDSHandleResponse starts a fake xDS server, makes a ClientConn to it,
// and creates a v2Client using it. Then, it registers a CDS watcher and tests
// different CDS responses.
func TestCDSHandleResponse(t *testing.T) {
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
tests := []struct {
name string
cdsResponse *xdspb.DiscoveryResponse
wantErr bool
wantUpdate *CDSUpdate
wantUpdateErr bool
}{
// Badly marshaled CDS response.
{
name: "badly-marshaled-response",
cdsResponse: badlyMarshaledCDSResponse,
wantErr: true,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response does not contain Cluster proto.
{
name: "no-cluster-proto-in-response",
cdsResponse: badResourceTypeInLDSResponse,
wantErr: true,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response contains no clusters.
{
name: "no-cluster",
cdsResponse: &xdspb.DiscoveryResponse{},
wantErr: false,
wantUpdate: &CDSUpdate{},
wantUpdateErr: true,
},
// Response contains one good cluster we are not interested in.
{
name: "one-uninteresting-cluster",
cdsResponse: goodCDSResponse2,
wantErr: false,
wantUpdate: &CDSUpdate{},
wantUpdateErr: true,
},
// Response contains one cluster and it is good.
{
name: "one-good-cluster",
cdsResponse: goodCDSResponse1,
wantErr: false,
wantUpdate: &CDSUpdate{ServiceName: serviceName1, EnableLRS: true},
wantUpdateErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testWatchHandle(t, &watchHandleTestcase{
responseToHandle: test.cdsResponse,
wantHandleErr: test.wantErr,
wantUpdate: test.wantUpdate,
wantUpdateErr: test.wantUpdateErr,
cdsWatch: v2c.watchCDS,
watchReqChan: fakeServer.XDSRequestChan,
handleXDSResp: v2c.handleCDSResponse,
})
})
}
}
// TestCDSHandleResponseWithoutWatch tests the case where the v2Client receives
// a CDS response without a registered watcher.
func TestCDSHandleResponseWithoutWatch(t *testing.T) {
_, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
if v2c.handleCDSResponse(goodCDSResponse1) == nil {
t.Fatal("v2c.handleCDSResponse() succeeded, should have failed")
}
}
// cdsTestOp contains all data related to one particular test operation. Not
// all fields make sense for all tests.
type cdsTestOp struct {
// target is the resource name to watch for.
target string
// responseToSend is the xDS response sent to the client
responseToSend *fakeserver.Response
// wantOpErr specfies whether the main operation should return an error.
wantOpErr bool
// wantCDSCache is the expected rdsCache at the end of an operation.
wantCDSCache map[string]CDSUpdate
// wantWatchCallback specifies if the watch callback should be invoked.
wantWatchCallback bool
}
// testCDSCaching is a helper function which starts a fake xDS server, makes a
// ClientConn to it, creates a v2Client using it. It then reads a bunch of
// test operations to be performed from cdsTestOps and returns error, if any,
// on the provided error channel. This is executed in a separate goroutine.
func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh *testutils.Channel) {
t.Helper()
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := make(chan struct{}, 1)
for _, cdsTestOp := range cdsTestOps {
// Register a watcher if required, and use a channel to signal the
// successful invocation of the callback.
if cdsTestOp.target != "" {
v2c.watchCDS(cdsTestOp.target, func(u CDSUpdate, err error) {
t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err)
callbackCh <- struct{}{}
})
t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target)
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
errCh.Send(fmt.Errorf("Timeout waiting for CDS request: %v", err))
return
}
t.Log("FakeServer received request...")
}
// Directly push the response through a call to handleCDSResponse,
// thereby bypassing the fakeServer.
if cdsTestOp.responseToSend != nil {
resp := cdsTestOp.responseToSend.Resp.(*discoverypb.DiscoveryResponse)
if err := v2c.handleCDSResponse(resp); (err != nil) != cdsTestOp.wantOpErr {
errCh.Send(fmt.Errorf("v2c.handleRDSResponse(%+v) returned err: %v", resp, err))
return
}
}
// If the test needs the callback to be invoked, just verify that
// it was invoked. Since we verify the contents of the cache, it's
// ok not to verify the contents of the callback.
if cdsTestOp.wantWatchCallback {
<-callbackCh
}
if !reflect.DeepEqual(v2c.cloneCDSCacheForTesting(), cdsTestOp.wantCDSCache) {
errCh.Send(fmt.Errorf("gotCDSCache: %v, wantCDSCache: %v", v2c.rdsCache, cdsTestOp.wantCDSCache))
return
}
}
t.Log("Completed all test ops successfully...")
errCh.Send(nil)
}
// TestCDSCaching tests some end-to-end CDS flows using a fake xDS server, and
// verifies the CDS data cached at the v2Client.
func TestCDSCaching(t *testing.T) {
ops := []cdsTestOp{
// Add an CDS watch for a cluster name (clusterName1), which returns one
// matching resource in the response.
{
target: clusterName1,
responseToSend: &fakeserver.Response{Resp: goodCDSResponse1},
wantCDSCache: map[string]CDSUpdate{
clusterName1: {serviceName1, true},
},
wantWatchCallback: true,
},
// Push an CDS response which contains a new resource (apart from the
// one received in the previous response). This should be cached.
{
responseToSend: &fakeserver.Response{Resp: cdsResponseWithMultipleResources},
wantCDSCache: map[string]CDSUpdate{
clusterName1: {serviceName1, true},
clusterName2: {serviceName2, false},
},
wantWatchCallback: true,
},
// Switch the watch target to clusterName2, which was already cached. No
// response is received from the server (as expected), but we want the
// callback to be invoked with the new serviceName.
{
target: clusterName2,
wantCDSCache: map[string]CDSUpdate{
clusterName1: {serviceName1, true},
clusterName2: {serviceName2, false},
},
wantWatchCallback: true,
},
// Push an empty CDS response. This should clear the cache.
{
responseToSend: &fakeserver.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}},
wantOpErr: false,
wantCDSCache: map[string]CDSUpdate{},
wantWatchCallback: true,
},
}
errCh := testutils.NewChannel()
go testCDSCaching(t, ops, errCh)
waitForNilErr(t, errCh)
}
// TestCDSWatchExpiryTimer tests the case where the client does not receive an
// CDS response for the request that it sends out. We want the watch callback
// to be invoked with an error once the watchExpiryTimer fires.
func TestCDSWatchExpiryTimer(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := testutils.NewChannel()
v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) {
t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err)
if u.ServiceName != "" {
callbackCh.Send(fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName))
}
if err == nil {
callbackCh.Send(errors.New("received nil error in cdsCallback"))
}
callbackCh.Send(nil)
})
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an CDS request")
}
waitForNilErr(t, callbackCh)
}
var (
badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: cdsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: cdsURL,
}
goodCluster1 = &xdspb.Cluster{
Name: clusterName1,
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
EdsConfig: &corepb.ConfigSource{
ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
Ads: &corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName1,
},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
LrsServer: &corepb.ConfigSource{
ConfigSourceSpecifier: &corepb.ConfigSource_Self{
Self: &corepb.SelfConfigSource{},
},
},
}
marshaledCluster1, _ = proto.Marshal(goodCluster1)
goodCluster2 = &xdspb.Cluster{
Name: clusterName2,
ClusterDiscoveryType: &xdspb.Cluster_Type{Type: xdspb.Cluster_EDS},
EdsClusterConfig: &xdspb.Cluster_EdsClusterConfig{
EdsConfig: &corepb.ConfigSource{
ConfigSourceSpecifier: &corepb.ConfigSource_Ads{
Ads: &corepb.AggregatedConfigSource{},
},
},
ServiceName: serviceName2,
},
LbPolicy: xdspb.Cluster_ROUND_ROBIN,
}
marshaledCluster2, _ = proto.Marshal(goodCluster2)
goodCDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: cdsURL,
Value: marshaledCluster1,
},
},
TypeUrl: cdsURL,
}
goodCDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: cdsURL,
Value: marshaledCluster2,
},
},
TypeUrl: cdsURL,
}
cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: cdsURL,
Value: marshaledCluster1,
},
{
TypeUrl: cdsURL,
Value: marshaledCluster2,
},
},
TypeUrl: cdsURL,
}
)

View File

@@ -1,113 +1,181 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
This package contains code copied from github.com/grpc/grpc-co. The license for that code is:
// Package client implementation a full fledged gRPC client for the xDS API
// used by the xds resolver and balancer implementations.
Copyright 2019 gRPC authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package xds implements a bidirectional stream to an envoy ADS management endpoint. It will stream
// updates (CDS and EDS) from there to help load balance responses to DNS clients.
package xds
import (
"errors"
"fmt"
"time"
"context"
"sync"
"github.com/coredns/coredns/plugin/traffic/xds/bootstrap"
clog "github.com/coredns/coredns/plugin/pkg/log"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc"
)
// Options provides all parameters required for the creation of an xDS client.
type Options struct {
// Config contains a fully populated bootstrap config. It is the
// responsibility of the caller to use some sane defaults here if the
// bootstrap process returned with certain fields left unspecified.
Config bootstrap.Config
// DialOpts contains dial options to be used when dialing the xDS server.
DialOpts []grpc.DialOption
}
var log = clog.NewWithPlugin("traffic")
const (
cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// Client is a full fledged gRPC client which queries a set of discovery APIs
// (collectively termed as xDS) on a remote management server, to discover
// various dynamic resources. A single client object will be shared by the xds
// resolver and balancer implementations.
type Client struct {
opts Options
cc *grpc.ClientConn // Connection to the xDS server
v2c *v2Client // Actual xDS client implementation using the v2 API
serviceCallback func(ServiceUpdate, error)
cc *grpc.ClientConn
ctx context.Context
assignments assignment
node *corepb.Node
cancel context.CancelFunc
}
// New returns a new xdsClient configured with opts.
func New(opts Options) (*Client, error) {
switch {
case opts.Config.BalancerName == "":
return nil, errors.New("xds: no xds_server name provided in options")
case opts.Config.Creds == nil:
fmt.Printf("%s\n", errors.New("xds: no credentials provided in options"))
case opts.Config.NodeProto == nil:
return nil, errors.New("xds: no node_proto provided in options")
}
type assignment struct {
mu sync.RWMutex
cla map[string]*xdspb.ClusterLoadAssignment
version int // not sure what do with and if we should discard all clusters.
}
var dopts []grpc.DialOption
if opts.Config.Creds == nil {
dopts = append([]grpc.DialOption{grpc.WithInsecure()}, opts.DialOpts...)
} else {
dopts = append([]grpc.DialOption{opts.Config.Creds}, opts.DialOpts...)
func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) {
// if cla is nil we just found a cluster, check if we already know about it, or if we need to make
// a new entry
a.mu.Lock()
defer a.mu.Unlock()
_, ok := a.cla[cluster]
if !ok {
a.cla[cluster] = cla
return
}
cc, err := grpc.Dial(opts.Config.BalancerName, dopts...)
if cla == nil {
return
}
a.cla[cluster] = cla
}
func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment {
return nil
}
func (a assignment) Clusters() []string {
a.mu.RLock()
defer a.mu.RUnlock()
clusters := make([]string, len(a.cla))
i := 0
for k := range a.cla {
clusters[i] = k
i++
}
return clusters
}
// New returns a new client that's dialed to addr using node as the local identifier.
func New(addr, node string) (*Client, error) {
// todo credentials
opts := []grpc.DialOption{grpc.WithInsecure()}
cc, err := grpc.Dial(addr, opts...)
if err != nil {
// An error from a non-blocking dial indicates something serious.
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err)
return nil, err
}
c := &Client{cc: cc, node: &corepb.Node{Id: "test-id"}} // do more with this node data? Hostname port??
c.assignments = assignment{cla: make(map[string]*xdspb.ClusterLoadAssignment)}
c.ctx, c.cancel = context.WithCancel(context.Background())
println("dialed balancer at", opts.Config.BalancerName)
c := &Client{
opts: opts,
cc: cc,
v2c: newV2Client(cc, opts.Config.NodeProto, func(int) time.Duration { return 0 }),
}
return c, nil
}
// Close closes the gRPC connection to the xDS server.
func (c *Client) Close() {
// TODO: Should we invoke the registered callbacks here with an error that
// the client is closed?
c.v2c.close()
c.cc.Close()
func (c *Client) Close() { c.cancel(); c.cc.Close() }
func (c *Client) Run() (adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient, error) {
cli := adsgrpc.NewAggregatedDiscoveryServiceClient(c.cc)
stream, err := cli.StreamAggregatedResources(c.ctx)
if err != nil {
return nil, err
}
return stream, nil
}
func (c *Client) Run() {
c.v2c.run()
func (c *Client) ClusterDiscovery(stream adsStream, version, nonce string, clusters []string) error {
req := &xdspb.DiscoveryRequest{
Node: c.node,
TypeUrl: cdsURL,
ResourceNames: clusters, // empty for all
VersionInfo: version,
ResponseNonce: nonce,
}
return stream.Send(req)
}
// ServiceUpdate contains update about the service.
type ServiceUpdate struct {
Cluster string
func (c *Client) EndpointDiscovery(stream adsStream, version, nonce string, clusters []string) error {
req := &xdspb.DiscoveryRequest{
Node: c.node,
TypeUrl: edsURL,
ResourceNames: clusters,
VersionInfo: version,
ResponseNonce: nonce,
}
return stream.Send(req)
}
// WatchCluster uses CDS to discover information about the provided clusterName.
func (c *Client) WatchCluster(clusterName string, cdsCb func(CDSUpdate, error)) (cancel func()) {
return c.v2c.watchCDS(clusterName, cdsCb)
}
func (c *Client) Receive(stream adsStream) error {
for {
resp, err := stream.Recv()
if err != nil {
return err
}
// WatchEndpoints uses EDS to discover information about the endpoints in a cluster.
func (c *Client) WatchEndpoints(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) {
return c.v2c.watchEDS(clusterName, edsCb)
switch resp.GetTypeUrl() {
case cdsURL:
for _, r := range resp.GetResources() {
var any ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &any); err != nil {
continue
}
cluster, ok := any.Message.(*xdspb.Cluster)
if !ok {
continue
}
c.assignments.SetClusterLoadAssignment(cluster.GetName(), nil)
}
println("HERER", len(resp.GetResources()))
log.Debug("Cluster discovery processed with %d resources", len(resp.GetResources()))
// ack the CDS proto, with we we've got. (empty version would be NACK)
if err := c.ClusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.Clusters()); err != nil {
log.Warningf("Failed to acknowledge cluster discovery: %s", err)
}
// need to figure out how to handle the version exactly.
// now kick off discovery for endpoints
if err := c.EndpointDiscovery(stream, "", "", c.assignments.Clusters()); err != nil {
log.Warningf("Failed to perform endpoint discovery: %s", err)
}
case edsURL:
println("EDS")
default:
log.Warningf("Unknown response URL for discovery: %q", resp.GetTypeUrl())
continue
}
}
return nil
}

View File

@@ -1,292 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"errors"
"fmt"
"testing"
"time"
"github.com/coredns/coredns/plugin/traffic/xds/bootstrap"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
func clientOpts(balancerName string) Options {
return Options{
Config: bootstrap.Config{
BalancerName: balancerName,
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
},
// WithTimeout is deprecated. But we are OK to call it here from the
// test, so we clearly know that the dial failed.
DialOpts: []grpc.DialOption{grpc.WithTimeout(5 * time.Second), grpc.WithBlock()},
}
}
func TestNew(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
tests := []struct {
name string
opts Options
wantErr bool
}{
{name: "empty-opts", opts: Options{}, wantErr: true},
{
name: "empty-balancer-name",
opts: Options{
Config: bootstrap.Config{
Creds: grpc.WithInsecure(),
NodeProto: &corepb.Node{},
},
},
wantErr: true,
},
{
name: "empty-dial-creds",
opts: Options{
Config: bootstrap.Config{
BalancerName: "dummy",
NodeProto: &corepb.Node{},
},
},
wantErr: true,
},
{
name: "empty-node-proto",
opts: Options{
Config: bootstrap.Config{
BalancerName: "dummy",
Creds: grpc.WithInsecure(),
},
},
wantErr: true,
},
{
name: "happy-case",
opts: clientOpts(fakeServer.Address),
wantErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
c, err := New(test.opts)
if err == nil {
defer c.Close()
}
if (err != nil) != test.wantErr {
t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr)
}
})
}
}
// TestWatchService tests the happy case of registering a watcher for
// service updates and receiving a good update.
func TestWatchService(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if err != nil {
callbackCh.Send(fmt.Errorf("xdsClient.WatchService returned error: %v", err))
return
}
if su.Cluster != goodClusterName1 {
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want clusterName: %+v", su.Cluster, goodClusterName1))
return
}
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Make the fakeServer send LDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
// Make the fakeServer send RDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an RDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceWithNoResponseFromServer tests the case where the
// xDS server does not respond to the requests being sent out as part of
// registering a service update watcher. The underlying v2Client will timeout
// and will send us an error.
func TestWatchServiceWithNoResponseFromServer(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if su.Cluster != "" {
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster))
return
}
if err == nil {
callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error"))
return
}
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Wait for one request from the client, but send no reponses.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceEmptyRDS tests the case where the underlying
// v2Client receives an empty RDS response.
func TestWatchServiceEmptyRDS(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
if su.Cluster != "" {
callbackCh.Send(fmt.Errorf("got clusterName: %+v, want empty clusterName", su.Cluster))
return
}
if err == nil {
callbackCh.Send(errors.New("xdsClient.WatchService returned error non-nil error"))
return
}
callbackCh.Send(nil)
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Make the fakeServer send LDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
// Make the fakeServer send an empty RDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an RDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: noVirtualHostsInRDSResponse}
waitForNilErr(t, callbackCh)
}
// TestWatchServiceWithClientClose tests the case where xDS responses are
// received after the client is closed, and we make sure that the registered
// watcher callback is not invoked.
func TestWatchServiceWithClientClose(t *testing.T) {
fakeServer, cleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
xdsClient, err := New(clientOpts(fakeServer.Address))
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
callbackCh := testutils.NewChannel()
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
callbackCh.Send(errors.New("watcher callback invoked after client close"))
})
defer cancelWatch()
t.Log("Registered a watcher for service updates...")
// Make the fakeServer send LDS response.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
xdsClient.Close()
t.Log("Closing the xdsClient...")
// Push an RDS response from the fakeserver
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodRDSResponse1}
if cbErr, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout {
t.Fatal(cbErr)
}
}

View File

@@ -1,207 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"fmt"
"net"
"strconv"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/golang/protobuf/ptypes"
)
// OverloadDropConfig contains the config to drop overloads.
type OverloadDropConfig struct {
Category string
Numerator uint32
Denominator uint32
}
// EndpointHealthStatus represents the health status of an endpoint.
type EndpointHealthStatus int32
const (
// EndpointHealthStatusUnknown represents HealthStatus UNKNOWN.
EndpointHealthStatusUnknown EndpointHealthStatus = iota
// EndpointHealthStatusHealthy represents HealthStatus HEALTHY.
EndpointHealthStatusHealthy
// EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY.
EndpointHealthStatusUnhealthy
// EndpointHealthStatusDraining represents HealthStatus DRAINING.
EndpointHealthStatusDraining
// EndpointHealthStatusTimeout represents HealthStatus TIMEOUT.
EndpointHealthStatusTimeout
// EndpointHealthStatusDegraded represents HealthStatus DEGRADED.
EndpointHealthStatusDegraded
)
// Endpoint contains information of an endpoint.
type Endpoint struct {
Address string
HealthStatus EndpointHealthStatus
Weight uint32
}
// Locality contains information of a locality.
type Locality struct {
Endpoints []Endpoint
ID LocalityID
Priority uint32
Weight uint32
}
// EDSUpdate contains an EDS update.
type EDSUpdate struct {
Drops []OverloadDropConfig
Localities []Locality
}
func parseAddress(socketAddress *corepb.SocketAddress) string {
return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue())))
}
func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig {
percentage := dropPolicy.GetDropPercentage()
var (
numerator = percentage.GetNumerator()
denominator uint32
)
switch percentage.GetDenominator() {
case typepb.FractionalPercent_HUNDRED:
denominator = 100
case typepb.FractionalPercent_TEN_THOUSAND:
denominator = 10000
case typepb.FractionalPercent_MILLION:
denominator = 1000000
}
return OverloadDropConfig{
Category: dropPolicy.GetCategory(),
Numerator: numerator,
Denominator: denominator,
}
}
func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint {
endpoints := make([]Endpoint, 0, len(lbEndpoints))
for _, lbEndpoint := range lbEndpoints {
endpoints = append(endpoints, Endpoint{
HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()),
Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(),
})
}
return endpoints
}
// ParseEDSRespProto turns EDS response proto message to EDSUpdate.
//
// This is temporarily exported to be used in eds balancer, before it switches
// to use xds client. TODO: unexport.
func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) {
ret := &EDSUpdate{}
for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
}
priorities := make(map[uint32]struct{})
for _, locality := range m.Endpoints {
l := locality.GetLocality()
if l == nil {
return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
}
lid := LocalityID{Region: l.Region, Zone: l.Zone, SubZone: l.SubZone}
priority := locality.GetPriority()
priorities[priority] = struct{}{}
ret.Localities = append(ret.Localities, Locality{
ID: lid,
Endpoints: parseEndpoints(locality.GetLbEndpoints()),
Weight: locality.GetLoadBalancingWeight().GetValue(),
Priority: priority,
})
}
for i := 0; i < len(priorities); i++ {
if _, ok := priorities[uint32(i)]; !ok {
return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities)
}
}
return ret, nil
}
// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails.
// This is used by EDS balancer tests.
//
// TODO: delete this. The EDS balancer should build an EDSUpdate directly,
// instead of building and parsing a proto message.
func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate {
u, err := ParseEDSRespProto(m)
if err != nil {
panic(err.Error())
}
return u
}
func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi := v2c.watchMap[edsURL]
if wi == nil {
return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp)
}
var returnUpdate *EDSUpdate
for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil {
return fmt.Errorf("xds: failed to unmarshal resource in EDS response: %v", err)
}
cla, ok := resource.Message.(*xdspb.ClusterLoadAssignment)
if !ok {
return fmt.Errorf("xds: unexpected resource type: %T in EDS response", resource.Message)
}
if cla.GetClusterName() != wi.target[0] {
log.Warningf("xds: got uninteresting EDS resource, got %s, want %s", cla.GetClusterName(), wi.target[0])
// We won't validate the remaining resources. If one of the
// uninteresting ones is invalid, we will still ACK the response.
continue
}
u, err := ParseEDSRespProto(cla)
if err != nil {
return err
}
returnUpdate = u
// Break from the loop because the request resource is found. But
// this also means we won't validate the remaining resources. If one
// of the uninteresting ones is invalid, we will still ACK the
// response.
break
}
if returnUpdate != nil {
wi.stopTimer()
wi.callback.(edsCallback)(returnUpdate, nil)
}
return nil
}

View File

@@ -1,287 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"errors"
"fmt"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/golang/protobuf/ptypes"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/testutils"
)
func TestEDSParseRespProto(t *testing.T) {
tests := []struct {
name string
m *xdspb.ClusterLoadAssignment
want *EDSUpdate
wantErr bool
}{
{
name: "missing-priority",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
return clab0.Build()
}(),
want: nil,
wantErr: true,
},
{
name: "missing-locality-ID",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("", 1, 0, []string{"addr1:314"}, nil)
return clab0.Build()
}(),
want: nil,
wantErr: true,
},
{
name: "good",
m: func() *xdspb.ClusterLoadAssignment {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, &AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_UNHEALTHY},
Weight: []uint32{271},
})
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, &AddLocalityOptions{
Health: []corepb.HealthStatus{corepb.HealthStatus_DRAINING},
Weight: []uint32{828},
})
return clab0.Build()
}(),
want: &EDSUpdate{
Drops: nil,
Localities: []Locality{
{
Endpoints: []Endpoint{{
Address: "addr1:314",
HealthStatus: EndpointHealthStatusUnhealthy,
Weight: 271,
}},
ID: Locality{SubZone: "locality-1"},
Priority: 1,
Weight: 1,
},
{
Endpoints: []Endpoint{{
Address: "addr2:159",
HealthStatus: EndpointHealthStatusDraining,
Weight: 828,
}},
ID: Locality{SubZone: "locality-2"},
Priority: 0,
Weight: 1,
},
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseEDSRespProto(tt.m)
if (err != nil) != tt.wantErr {
t.Errorf("ParseEDSRespProto() error = %v, wantErr %v", err, tt.wantErr)
return
}
if d := cmp.Diff(got, tt.want); d != "" {
t.Errorf("ParseEDSRespProto() got = %v, want %v, diff: %v", got, tt.want, d)
}
})
}
}
var (
badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: edsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: edsURL,
}
badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgr1,
},
},
TypeUrl: edsURL,
}
goodEDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
func() *anypb.Any {
clab0 := NewClusterLoadAssignmentBuilder(goodEDSName, nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil)
a, _ := ptypes.MarshalAny(clab0.Build())
return a
}(),
},
TypeUrl: edsURL,
}
goodEDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
func() *anypb.Any {
clab0 := NewClusterLoadAssignmentBuilder("not-goodEDSName", nil)
clab0.AddLocality("locality-1", 1, 1, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 0, []string{"addr2:159"}, nil)
a, _ := ptypes.MarshalAny(clab0.Build())
return a
}(),
},
TypeUrl: edsURL,
}
)
func TestEDSHandleResponse(t *testing.T) {
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
tests := []struct {
name string
edsResponse *xdspb.DiscoveryResponse
wantErr bool
wantUpdate *EDSUpdate
wantUpdateErr bool
}{
// Any in resource is badly marshaled.
{
name: "badly-marshaled_response",
edsResponse: badlyMarshaledEDSResponse,
wantErr: true,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response doesn't contain resource with the right type.
{
name: "no-config-in-response",
edsResponse: badResourceTypeInEDSResponse,
wantErr: true,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response contains one uninteresting ClusterLoadAssignment.
{
name: "one-uninterestring-assignment",
edsResponse: goodEDSResponse2,
wantErr: false,
wantUpdate: nil,
wantUpdateErr: false,
},
// Response contains one good ClusterLoadAssignment.
{
name: "one-good-assignment",
edsResponse: goodEDSResponse1,
wantErr: false,
wantUpdate: &EDSUpdate{
Localities: []Locality{
{
Endpoints: []Endpoint{{Address: "addr1:314"}},
ID: Locality{SubZone: "locality-1"},
Priority: 1,
Weight: 1,
},
{
Endpoints: []Endpoint{{Address: "addr2:159"}},
ID: Locality{SubZone: "locality-2"},
Priority: 0,
Weight: 1,
},
},
},
wantUpdateErr: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testWatchHandle(t, &watchHandleTestcase{
responseToHandle: test.edsResponse,
wantHandleErr: test.wantErr,
wantUpdate: test.wantUpdate,
wantUpdateErr: test.wantUpdateErr,
edsWatch: v2c.watchEDS,
watchReqChan: fakeServer.XDSRequestChan,
handleXDSResp: v2c.handleEDSResponse,
})
})
}
}
// TestEDSHandleResponseWithoutWatch tests the case where the v2Client
// receives an EDS response without a registered EDS watcher.
func TestEDSHandleResponseWithoutWatch(t *testing.T) {
_, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
if v2c.handleEDSResponse(goodEDSResponse1) == nil {
t.Fatal("v2c.handleEDSResponse() succeeded, should have failed")
}
}
func TestEDSWatchExpiryTimer(t *testing.T) {
oldWatchExpiryTimeout := defaultWatchExpiryTimeout
defaultWatchExpiryTimeout = 500 * time.Millisecond
defer func() {
defaultWatchExpiryTimeout = oldWatchExpiryTimeout
}()
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := testutils.NewChannel()
v2c.watchEDS(goodRouteName1, func(u *EDSUpdate, err error) {
t.Logf("Received callback with edsUpdate {%+v} and error {%v}", u, err)
if u != nil {
callbackCh.Send(fmt.Errorf("received EDSUpdate %v in edsCallback, wanted nil", u))
}
if err == nil {
callbackCh.Send(errors.New("received nil error in edsCallback"))
}
callbackCh.Send(nil)
})
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an CDS request")
}
waitForNilErr(t, callbackCh)
}

View File

@@ -1,128 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// All structs/functions in this file should be unexported. They are used in EDS
// balancer tests now, to generate test inputs. Eventually, EDS balancer tests
// should generate EDSUpdate directly, instead of generating and parsing the
// proto message.
// TODO: unexported everything in this file.
package xds
import (
"fmt"
"net"
"strconv"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
)
// ClusterLoadAssignmentBuilder builds a ClusterLoadAssignment, aka EDS
// response.
type ClusterLoadAssignmentBuilder struct {
v *xdspb.ClusterLoadAssignment
}
// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder.
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder {
var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload
for i, d := range dropPercents {
drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{
Category: fmt.Sprintf("test-drop-%d", i),
DropPercentage: &typepb.FractionalPercent{
Numerator: d,
Denominator: typepb.FractionalPercent_HUNDRED,
},
})
}
return &ClusterLoadAssignmentBuilder{
v: &xdspb.ClusterLoadAssignment{
ClusterName: clusterName,
Policy: &xdspb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
},
},
}
}
// AddLocalityOptions contains options when adding locality to the builder.
type AddLocalityOptions struct {
Health []corepb.HealthStatus
Weight []uint32
}
// AddLocality adds a locality to the builder.
func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *AddLocalityOptions) {
var lbEndPoints []*endpointpb.LbEndpoint
for i, a := range addrsWithPort {
host, portStr, err := net.SplitHostPort(a)
if err != nil {
panic("failed to split " + a)
}
port, err := strconv.Atoi(portStr)
if err != nil {
panic("failed to atoi " + portStr)
}
lbe := &endpointpb.LbEndpoint{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Address: &corepb.Address_SocketAddress{
SocketAddress: &corepb.SocketAddress{
Protocol: corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}},
}
if opts != nil {
if i < len(opts.Health) {
lbe.HealthStatus = opts.Health[i]
}
if i < len(opts.Weight) {
lbe.LoadBalancingWeight = &wrapperspb.UInt32Value{Value: opts.Weight[i]}
}
}
lbEndPoints = append(lbEndPoints, lbe)
}
var localityID *corepb.Locality
if subzone != "" {
localityID = &corepb.Locality{
Region: "",
Zone: "",
SubZone: subzone,
}
}
clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
Locality: localityID,
LbEndpoints: lbEndPoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight},
Priority: priority,
})
}
// Build builds ClusterLoadAssignment.
func (clab *ClusterLoadAssignmentBuilder) Build() *xdspb.ClusterLoadAssignment {
return clab.v
}

View File

@@ -1,50 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"fmt"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
// Locality is xds.Locality without XXX fields, so it can be used as map
// keys.
//
// xds.Locality cannot be map keys because one of the XXX fields is a slice.
//
// This struct should only be used as map keys. Use the proto message directly
// in all other places.
type LocalityID struct {
Region string
Zone string
SubZone string
}
func (l LocalityID) String() string {
return fmt.Sprintf("%s-%s-%s", l.Region, l.Zone, l.SubZone)
}
// ToProto convert Locality to the proto representation.
func (l LocalityID) ToProto() *corepb.Locality {
return &corepb.Locality{
Region: l.Region,
Zone: l.Zone,
SubZone: l.SubZone,
}
}

View File

@@ -1,7 +0,0 @@
package xds
import (
clog "github.com/coredns/coredns/plugin/pkg/log"
)
var log = clog.NewWithPlugin("traffic")

View File

@@ -1,169 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"reflect"
"testing"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
type watchHandleTestcase struct {
responseToHandle *xdspb.DiscoveryResponse
wantHandleErr bool
wantUpdate interface{}
wantUpdateErr bool
// Only one of the following should be non-nil. The one corresponding with
// typeURL will be called.
ldsWatch func(target string, ldsCb ldsCallback) (cancel func())
rdsWatch func(routeName string, rdsCb rdsCallback) (cancel func())
cdsWatch func(clusterName string, cdsCb cdsCallback) (cancel func())
edsWatch func(clusterName string, edsCb edsCallback) (cancel func())
watchReqChan *testutils.Channel // The request sent for watch will be sent to this channel.
handleXDSResp func(response *xdspb.DiscoveryResponse) error
}
// testWatchHandle is called to test response handling for each xDS.
//
// It starts the xDS watch as configured in test, waits for the fake xds server
// to receive the request (so watch callback is installed), and calls
// handleXDSResp with responseToHandle (if it's set). It then compares the
// update received by watch callback with the expected results.
func testWatchHandle(t *testing.T, test *watchHandleTestcase) {
type updateErr struct {
u interface{}
err error
}
gotUpdateCh := testutils.NewChannel()
var cancelWatch func()
// Register the watcher, this will also trigger the v2Client to send the xDS
// request.
switch {
case test.ldsWatch != nil:
cancelWatch = test.ldsWatch(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("in v2c.watchLDS callback, ldsUpdate: %+v, err: %v", u, err)
gotUpdateCh.Send(updateErr{u, err})
})
case test.rdsWatch != nil:
cancelWatch = test.rdsWatch(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("in v2c.watchRDS callback, rdsUpdate: %+v, err: %v", u, err)
gotUpdateCh.Send(updateErr{u, err})
})
case test.cdsWatch != nil:
cancelWatch = test.cdsWatch(clusterName1, func(u CDSUpdate, err error) {
t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err)
gotUpdateCh.Send(updateErr{u, err})
})
case test.edsWatch != nil:
cancelWatch = test.edsWatch(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("in v2c.watchEDS callback, edsUpdate: %+v, err: %v", u, err)
gotUpdateCh.Send(updateErr{*u, err})
})
default:
t.Fatalf("no watch() is set")
}
defer cancelWatch()
// Wait till the request makes it to the fakeServer. This ensures that
// the watch request has been processed by the v2Client.
if _, err := test.watchReqChan.Receive(); err != nil {
t.Fatalf("Timeout waiting for an xDS request: %v", err)
}
// Directly push the response through a call to handleXDSResp. This bypasses
// the fakeServer, so it's only testing the handle logic. Client response
// processing is covered elsewhere.
//
// Also note that this won't trigger ACK, so there's no need to clear the
// request channel afterwards.
if err := test.handleXDSResp(test.responseToHandle); (err != nil) != test.wantHandleErr {
t.Fatalf("v2c.handleRDSResponse() returned err: %v, wantErr: %v", err, test.wantHandleErr)
}
// If the test doesn't expect the callback to be invoked, verify that no
// update or error is pushed to the callback.
//
// Cannot directly compare test.wantUpdate with nil (typed vs non-typed nil:
// https://golang.org/doc/faq#nil_error).
if c := test.wantUpdate; c == nil || (reflect.ValueOf(c).Kind() == reflect.Ptr && reflect.ValueOf(c).IsNil()) {
update, err := gotUpdateCh.Receive()
if err == testutils.ErrRecvTimeout {
return
}
t.Fatalf("Unexpected update: +%v", update)
}
wantUpdate := reflect.ValueOf(test.wantUpdate).Elem().Interface()
uErr, err := gotUpdateCh.Receive()
if err == testutils.ErrRecvTimeout {
t.Fatal("Timeout expecting xDS update")
}
gotUpdate := uErr.(updateErr).u
opt := cmp.AllowUnexported(rdsUpdate{}, ldsUpdate{}, CDSUpdate{}, EDSUpdate{})
if diff := cmp.Diff(gotUpdate, wantUpdate, opt); diff != "" {
t.Fatalf("got update : %+v, want %+v, diff: %s", gotUpdate, wantUpdate, diff)
}
gotUpdateErr := uErr.(updateErr).err
if (gotUpdateErr != nil) != test.wantUpdateErr {
t.Fatalf("got xDS update error {%v}, wantErr: %v", gotUpdateErr, test.wantUpdateErr)
}
}
// startServerAndGetCC starts a fake XDS server and also returns a ClientConn
// connected to it.
func startServerAndGetCC(t *testing.T) (*fakeserver.Server, *grpc.ClientConn, func()) {
t.Helper()
fs, sCleanup, err := fakeserver.StartServer()
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
cc, ccCleanup, err := fs.XDSClientConn()
if err != nil {
sCleanup()
t.Fatalf("Failed to get a clientConn to the fake xDS server: %v", err)
}
return fs, cc, func() {
sCleanup()
ccCleanup()
}
}
// waitForNilErr waits for a nil error value to be received on the
// provided channel.
func waitForNilErr(t *testing.T, ch *testutils.Channel) {
t.Helper()
val, err := ch.Receive()
if err == testutils.ErrRecvTimeout {
t.Fatalf("Timeout expired when expecting update")
}
if val != nil {
if cbErr := val.(error); cbErr != nil {
t.Fatal(cbErr)
}
}
}

View File

@@ -1,83 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"time"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
)
type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
const (
cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
// watchState is an enum to represent the state of a watch call.
type watchState int
const (
watchEnqueued watchState = iota
watchCancelled
watchStarted
)
// watchInfo holds all the information about a watch call.
type watchInfo struct {
typeURL string
target []string
state watchState
callback interface{}
expiryTimer *time.Timer
}
// cancel marks the state as cancelled, and also stops the expiry timer.
func (wi *watchInfo) cancel() {
wi.state = watchCancelled
if wi.expiryTimer != nil {
wi.expiryTimer.Stop()
}
}
// stopTimer stops the expiry timer without cancelling the watch.
func (wi *watchInfo) stopTimer() {
if wi.expiryTimer != nil {
wi.expiryTimer.Stop()
}
}
type ackInfo struct {
typeURL string
version string // Nack if version is an empty string.
nonce string
}
// CDSUpdate contains information from a received CDS response, which is of
// interest to the registered CDS watcher.
type CDSUpdate struct {
// ServiceName is the service name corresponding to the clusterName which
// is being watched for through CDS.
ServiceName string
}
type cdsCallback func(CDSUpdate, error)
type edsCallback func(*EDSUpdate, error)

View File

@@ -1,440 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"context"
"fmt"
"sync"
"time"
"github.com/coredns/coredns/plugin/traffic/xds/buffer"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"google.golang.org/grpc"
)
// The value chosen here is based on the default value of the
// initial_fetch_timeout field in corepb.ConfigSource proto.
var defaultWatchExpiryTimeout = 15 * time.Second
// v2Client performs the actual xDS RPCs using the xDS v2 API. It creates a
// single ADS stream on which the different types of xDS requests and responses
// are multiplexed.
// The reason for splitting this out from the top level xdsClient object is
// because there is already an xDS v3Aplha API in development. If and when we
// want to switch to that, this separation will ease that process.
type v2Client struct {
ctx context.Context
cancelCtx context.CancelFunc
// ClientConn to the xDS gRPC server. Owned by the parent xdsClient.
cc *grpc.ClientConn
nodeProto *corepb.Node
backoff func(int) time.Duration
// sendCh in the channel onto which watchInfo objects are pushed by the
// watch API, and it is read and acted upon by the send() goroutine.
sendCh *buffer.Unbounded
mu sync.Mutex
// Message specific watch infos, protected by the above mutex. These are
// written to, after successfully reading from the update channel, and are
// read from when recovering from a broken stream to resend the xDS
// messages. When the user of this client object cancels a watch call,
// these are set to nil. All accesses to the map protected and any value
// inside the map should be protected with the above mutex.
watchMap map[string]*watchInfo
// ackMap contains the version that was acked (the version in the ack
// request that was sent on wire). The key is typeURL, the value is the
// version string, becaues the versions for different resource types
// should be independent.
ackMap map[string]string
// rdsCache maintains a mapping of {clusterName --> CDSUpdate} from
// validated cluster configurations received in CDS responses. We cache all
// valid cluster configurations, whether or not we are interested in them
// when we received them (because we could become interested in them in the
// future and the server wont send us those resources again). This is only
// to support legacy management servers that do not honor the
// resource_names field. As per the latest spec, the server should resend
// the response when the request changes, even if it had sent the same
// resource earlier (when not asked for). Protected by the above mutex.
cdsCache map[string]CDSUpdate
}
// newV2Client creates a new v2Client initialized with the passed arguments.
func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) time.Duration) *v2Client {
v2c := &v2Client{
cc: cc,
nodeProto: nodeProto,
backoff: backoff,
sendCh: buffer.NewUnbounded(),
watchMap: make(map[string]*watchInfo),
ackMap: make(map[string]string),
cdsCache: make(map[string]CDSUpdate),
}
v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
go v2c.run()
return v2c
}
// close cleans up resources and goroutines allocated by this client.
func (v2c *v2Client) close() {
v2c.cancelCtx()
}
// run starts an ADS stream (and backs off exponentially, if the previous
// stream failed without receiving a single reply) and runs the sender and
// receiver routines to send and receive data from the stream respectively.
func (v2c *v2Client) run() {
retries := 0
for {
select {
case <-v2c.ctx.Done():
return
default:
}
if retries != 0 {
t := time.NewTimer(v2c.backoff(retries))
select {
case <-t.C:
case <-v2c.ctx.Done():
if !t.Stop() {
<-t.C
}
return
}
}
retries++
cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc)
stream, err := cli.StreamAggregatedResources(v2c.ctx) //, grpc.WaitForReady(true))
if err != nil {
log.Infof("xds: ADS stream creation failed: %v", err)
continue
}
// send() could be blocked on reading updates from the different update
// channels when it is not actually sending out messages. So, we need a
// way to break out of send() when recv() returns. This done channel is
// used to for that purpose.
done := make(chan struct{})
go v2c.send(stream, done)
if v2c.recv(stream) {
retries = 0
}
close(done)
}
}
// endRequest sends a request for provided typeURL and resource on the provided
// stream.
//
// version is the ack version to be sent with the request
// - If this is the new request (not an ack/nack), version will be an empty
// string
// - If this is an ack, version will be the version from the response
// - If this is a nack, version will be the previous acked version (from
// ackMap). If there was no ack before, it will be an empty string
func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool {
req := &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: typeURL,
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
// TODO: populate ErrorDetails for nack.
}
println("v2: sendrequest", typeURL)
if err := stream.Send(req); err != nil {
log.Warningf("xds: request (type %s) for resource %v failed: %v", typeURL, resourceNames, err)
return false
}
return true
}
// sendExisting sends out xDS requests for registered watchers when recovering
// from a broken stream.
//
// We call stream.Send() here with the lock being held. It should be OK to do
// that here because the stream has just started and Send() usually returns
// quickly (once it pushes the message onto the transport layer) and is only
// ever blocked if we don't have enough flow control quota.
func (v2c *v2Client) sendExisting(stream adsStream) bool {
println("v2: sendexisting")
v2c.mu.Lock()
defer v2c.mu.Unlock()
// Reset the ack versions when the stream restarts.
v2c.ackMap = make(map[string]string)
for typeURL, wi := range v2c.watchMap {
if !v2c.sendRequest(stream, wi.target, typeURL, "", "") {
return false
}
}
return true
}
// processWatchInfo pulls the fields needed by the request from a watchInfo.
//
// It also calls callback with cached response, and updates the watch map in
// v2c.
//
// If the watch was already canceled, it returns false for send
func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) {
v2c.mu.Lock()
defer v2c.mu.Unlock()
if t.state == watchCancelled {
return // This returns all zero values, and false for send.
}
t.state = watchStarted
send = true
typeURL = t.typeURL
target = t.target
v2c.checkCacheAndUpdateWatchMap(t)
// TODO: if watch is called again with the same resource names,
// there's no need to send another request.
//
// TODO: should we reset version (for ack) when a new watch is
// started? Or do this only if the resource names are different
// (so we send a new request)?
return
}
// processAckInfo pulls the fields needed by the ack request from a ackInfo.
//
// If no active watch is found for this ack, it returns false for send.
func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) {
typeURL = t.typeURL
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi, ok := v2c.watchMap[typeURL]
if !ok {
// We don't send the request ack if there's no active watch (this can be
// either the server sends responses before any request, or the watch is
// canceled while the ackInfo is in queue), because there's no resource
// name. And if we send a request with empty resource name list, the
// server may treat it as a wild card and send us everything.
log.Warningf("xds: ack (type %s) not sent because there's no active watch for the type", typeURL)
return // This returns all zero values, and false for send.
}
send = true
version = t.version
nonce = t.nonce
target = wi.target
if version == "" {
// This is a nack, get the previous acked version.
version = v2c.ackMap[typeURL]
// version will still be an empty string if typeURL isn't
// found in ackMap, this can happen if there wasn't any ack
// before.
} else {
v2c.ackMap[typeURL] = version
}
return
}
// send reads watch infos from update channel and sends out actual xDS requests
// on the provided ADS stream.
func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
if !v2c.sendExisting(stream) {
println("not existing stream")
return
}
println("in send")
for {
select {
case <-v2c.ctx.Done():
return
case u := <-v2c.sendCh.Get():
v2c.sendCh.Load()
var (
target []string
typeURL, version, nonce string
send bool
)
switch t := u.(type) {
case *watchInfo:
println("watchInfo")
target, typeURL, version, nonce, send = v2c.processWatchInfo(t)
println(target, typeURL, version, nonce, send)
fmt.Printf("%+v\n", target)
case *ackInfo:
println("ackInfo")
target, typeURL, version, nonce, send = v2c.processAckInfo(t)
}
if !send {
continue
}
if !v2c.sendRequest(stream, target, typeURL, version, nonce) {
return
}
case <-done:
return
}
}
}
// recv receives xDS responses on the provided ADS stream and branches out to
// message specific handlers.
func (v2c *v2Client) recv(stream adsStream) bool {
println("v2 recv")
success := false
for {
println("WATIIGNM")
resp, err := stream.Recv()
// TODO: call watch callbacks with error when stream is broken.
println("DONE")
if err != nil {
log.Warningf("xds: ADS stream recv failed: %v", err)
return success
}
println("RECEIVING")
var respHandleErr error
switch resp.GetTypeUrl() {
case cdsURL:
println("CDS")
respHandleErr = v2c.handleCDSResponse(resp)
case edsURL:
println("EDS")
respHandleErr = v2c.handleEDSResponse(resp)
default:
log.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl())
continue
}
typeURL := resp.GetTypeUrl()
if respHandleErr != nil {
log.Warningf("xds: response (type %s) handler failed: %v", typeURL, respHandleErr)
v2c.sendCh.Put(&ackInfo{
typeURL: typeURL,
version: "",
nonce: resp.GetNonce(),
})
continue
}
v2c.sendCh.Put(&ackInfo{
typeURL: typeURL,
version: resp.GetVersionInfo(),
nonce: resp.GetNonce(),
})
success = true
}
}
// watchCDS registers an CDS watcher for the provided clusterName. Updates
// corresponding to received CDS responses will be pushed to the provided
// callback. The caller can cancel the watch by invoking the returned cancel
// function.
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) {
return v2c.watch(&watchInfo{
typeURL: cdsURL,
target: []string{clusterName},
callback: cdsCb,
})
}
// watchEDS registers an EDS watcher for the provided clusterName. Updates
// corresponding to received EDS responses will be pushed to the provided
// callback. The caller can cancel the watch by invoking the returned cancel
// function.
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) {
return v2c.watch(&watchInfo{
typeURL: edsURL,
target: []string{clusterName},
callback: edsCb,
})
// TODO: Once a registered EDS watch is cancelled, we should send an EDS
// request with no resources. This will let the server know that we are no
// longer interested in this resource.
}
func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) {
v2c.sendCh.Put(wi)
return func() {
v2c.mu.Lock()
defer v2c.mu.Unlock()
if wi.state == watchEnqueued {
wi.state = watchCancelled
return
}
v2c.watchMap[wi.typeURL].cancel()
delete(v2c.watchMap, wi.typeURL)
// TODO: should we reset ack version string when cancelling the watch?
}
}
// checkCacheAndUpdateWatchMap is called when a new watch call is handled in
// send(). If an existing watcher is found, its expiry timer is stopped. If the
// watchInfo to be added to the watchMap is found in the cache, the watcher
// callback is immediately invoked.
//
// Caller should hold v2c.mu
func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
if existing := v2c.watchMap[wi.typeURL]; existing != nil {
println("cancel")
existing.cancel()
}
v2c.watchMap[wi.typeURL] = wi
switch wi.typeURL {
// We need to grab the lock inside of the expiryTimer's afterFunc because
// we need to access the watchInfo, which is stored in the watchMap.
case cdsURL:
clusterName := wi.target[0]
println("CDS URLS", clusterName)
if update, ok := v2c.cdsCache[clusterName]; ok {
println("UPDATE SEEN, ok")
var err error
if v2c.watchMap[cdsURL] == nil {
err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)
}
wi.callback.(cdsCallback)(update, err)
return
}
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
v2c.mu.Lock()
wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target))
v2c.mu.Unlock()
})
case edsURL:
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
v2c.mu.Lock()
wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target))
v2c.mu.Unlock()
})
}
}

View File

@@ -1,263 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xds
import (
"fmt"
"strconv"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
)
// compareXDSRequest reads requests from channel, compare it with want.
func compareXDSRequest(ch *testutils.Channel, want *xdspb.DiscoveryRequest, version, nonce string) error {
val, err := ch.Receive()
if err != nil {
return err
}
req := val.(*fakeserver.Request)
if req.Err != nil {
return fmt.Errorf("unexpected error from request: %v", req.Err)
}
wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
wantClone.VersionInfo = version
wantClone.ResponseNonce = nonce
if !cmp.Equal(req.Req, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(req.Req, wantClone))
}
return nil
}
func sendXDSRespWithVersion(ch chan<- *fakeserver.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) {
respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse)
respToSend.VersionInfo = strconv.Itoa(version)
nonce = strconv.Itoa(int(time.Now().UnixNano()))
respToSend.Nonce = nonce
ch <- &fakeserver.Response{Resp: respToSend}
return
}
// startXDS calls watch to send the first request. It then sends a good response
// and checks for ack.
func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Channel, req *xdspb.DiscoveryRequest) *testutils.Channel {
callbackCh := testutils.NewChannel()
switch xdsname {
case "LDS":
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "RDS":
v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "CDS":
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
case "EDS":
v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh.Send(struct{}{})
})
}
if err := compareXDSRequest(reqChan, req, "", ""); err != nil {
t.Fatalf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("FakeServer received %s request...", xdsname)
return callbackCh
}
// sendGoodResp sends the good response, with the given version, and a random
// nonce.
//
// It also waits and checks that the ack request contains the given version, and
// the generated nonce.
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) {
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version)
t.Logf("Good %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Good %s response acked", xdsname)
if _, err := callbackCh.Receive(); err != nil {
t.Errorf("Timeout when expecting %s update", xdsname)
}
t.Logf("Good %s response callback executed", xdsname)
}
// sendBadResp sends a bad response with the given version. This response will
// be nacked, so we expect a request with the previous version (version-1).
//
// But the nonce in request should be the new nonce.
func sendBadResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, wantReq *xdspb.DiscoveryRequest) {
var typeURL string
switch xdsname {
case "LDS":
typeURL = ldsURL
case "RDS":
typeURL = rdsURL
case "CDS":
typeURL = cdsURL
case "EDS":
typeURL = edsURL
}
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: typeURL,
}, version)
t.Logf("Bad %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.XDSRequestChan, wantReq, strconv.Itoa(version-1), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Bad %s response nacked", xdsname)
}
// TestV2ClientAck verifies that valid responses are acked, and invalid ones
// are nacked.
//
// This test also verifies the version for different types are independent.
func TestV2ClientAck(t *testing.T) {
var (
versionLDS = 1000
versionRDS = 2000
versionCDS = 3000
versionEDS = 4000
)
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
cbRDS := startXDS(t, "RDS", v2c, fakeServer.XDSRequestChan, goodRDSRequest)
sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS)
versionRDS++
cbCDS := startXDS(t, "CDS", v2c, fakeServer.XDSRequestChan, goodCDSRequest)
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
versionCDS++
cbEDS := startXDS(t, "EDS", v2c, fakeServer.XDSRequestChan, goodEDSRequest)
sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS)
versionEDS++
// Send a bad response, and check for nack.
sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest)
versionLDS++
sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest)
versionRDS++
sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest)
versionCDS++
sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest)
versionEDS++
// send another good response, and check for ack, with the new version.
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS)
versionRDS++
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
versionCDS++
sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS)
versionEDS++
}
// Test when the first response is invalid, and is nacked, the nack requests
// should have an empty version string.
func TestV2ClientAckFirstIsNack(t *testing.T) {
var versionLDS = 1000
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: ldsURL,
}, versionLDS)
t.Logf("Bad response pushed to fakeServer...")
// The expected version string is an empty string, because this is the first
// response, and it's nacked (so there's no previous ack version).
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, "", nonce); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
versionLDS++
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
}
// Test when a nack is sent after a new watch, we nack with the previous acked
// version (instead of resetting to empty string).
func TestV2ClientAckNackAfterNewWatch(t *testing.T) {
var versionLDS = 1000
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
// Start a new watch.
cbLDS = startXDS(t, "LDS", v2c, fakeServer.XDSRequestChan, goodLDSRequest)
// This is an invalid response after the new watch.
nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: ldsURL,
}, versionLDS)
t.Logf("Bad response pushed to fakeServer...")
// The expected version string is the previous acked version.
if err := compareXDSRequest(fakeServer.XDSRequestChan, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
versionLDS++
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
}

View File

@@ -1,444 +0,0 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"errors"
"testing"
"time"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
basepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v2"
anypb "github.com/golang/protobuf/ptypes/any"
structpb "github.com/golang/protobuf/ptypes/struct"
)
const (
defaultTestTimeout = 1 * time.Second
goodLDSTarget1 = "lds.target.good:1111"
goodLDSTarget2 = "lds.target.good:2222"
goodRouteName1 = "GoodRouteConfig1"
goodRouteName2 = "GoodRouteConfig2"
goodEDSName = "GoodClusterAssignment1"
uninterestingRouteName = "UninterestingRouteName"
goodMatchingDomain = "lds.target.good"
uninterestingDomain = "uninteresting.domain"
goodClusterName1 = "GoodClusterName1"
goodClusterName2 = "GoodClusterName2"
uninterestingClusterName = "UninterestingClusterName"
httpConnManagerURL = "type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager"
)
var (
goodNodeProto = &basepb.Node{
Id: "ENVOY_NODE_ID",
Metadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
"TRAFFICDIRECTOR_GRPC_HOSTNAME": {
Kind: &structpb.Value_StringValue{StringValue: "trafficdirector"},
},
},
},
}
goodLDSRequest = &xdspb.DiscoveryRequest{
Node: goodNodeProto,
TypeUrl: ldsURL,
ResourceNames: []string{goodLDSTarget1},
}
goodCDSRequest = &xdspb.DiscoveryRequest{
Node: goodNodeProto,
TypeUrl: cdsURL,
ResourceNames: []string{goodClusterName1},
}
goodEDSRequest = &xdspb.DiscoveryRequest{
Node: goodNodeProto,
TypeUrl: edsURL,
ResourceNames: []string{goodEDSName},
}
goodHTTPConnManager1 = &httppb.HttpConnectionManager{}
marshaledConnMgr1, _ = proto.Marshal(goodHTTPConnManager1)
emptyHTTPConnManager = &httppb.HttpConnectionManager{
RouteSpecifier: &httppb.HttpConnectionManager_Rds{
Rds: &httppb.Rds{},
},
}
emptyMarshaledConnMgr, _ = proto.Marshal(emptyHTTPConnManager)
connMgrWithScopedRoutes = &httppb.HttpConnectionManager{
RouteSpecifier: &httppb.HttpConnectionManager_ScopedRoutes{},
}
marshaledConnMgrWithScopedRoutes, _ = proto.Marshal(connMgrWithScopedRoutes)
goodListener1 = &xdspb.Listener{
Name: goodLDSTarget1,
ApiListener: &listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgr1,
},
},
}
marshaledListener1, _ = proto.Marshal(goodListener1)
goodListener2 = &xdspb.Listener{
Name: goodLDSTarget2,
ApiListener: &listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgr1,
},
},
}
marshaledListener2, _ = proto.Marshal(goodListener2)
noAPIListener = &xdspb.Listener{Name: goodLDSTarget1}
marshaledNoAPIListener, _ = proto.Marshal(noAPIListener)
badAPIListener1 = &xdspb.Listener{
Name: goodLDSTarget1,
ApiListener: &listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: httpConnManagerURL,
Value: []byte{1, 2, 3, 4},
},
},
}
badAPIListener2 = &xdspb.Listener{
Name: goodLDSTarget2,
ApiListener: &listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: httpConnManagerURL,
Value: []byte{1, 2, 3, 4},
},
},
}
badlyMarshaledAPIListener2, _ = proto.Marshal(badAPIListener2)
badResourceListener = &xdspb.Listener{
Name: goodLDSTarget1,
ApiListener: &listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: ldsURL,
Value: marshaledListener1,
},
},
}
listenerWithEmptyHTTPConnMgr = &xdspb.Listener{
Name: goodLDSTarget1,
ApiListener: &listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: httpConnManagerURL,
Value: emptyMarshaledConnMgr,
},
},
}
listenerWithScopedRoutesRouteConfig = &xdspb.Listener{
Name: goodLDSTarget1,
ApiListener: &listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgrWithScopedRoutes,
},
},
}
goodLDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: ldsURL,
Value: marshaledListener1,
},
},
TypeUrl: ldsURL,
}
goodLDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: ldsURL,
Value: marshaledListener2,
},
},
TypeUrl: ldsURL,
}
emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: ldsURL}
badlyMarshaledLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: ldsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: ldsURL,
}
badResourceTypeInLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: httpConnManagerURL,
Value: marshaledConnMgr1,
},
},
TypeUrl: ldsURL,
}
ldsResponseWithMultipleResources = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: ldsURL,
Value: marshaledListener2,
},
{
TypeUrl: ldsURL,
Value: marshaledListener1,
},
},
TypeUrl: ldsURL,
}
noAPIListenerLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: ldsURL,
Value: marshaledNoAPIListener,
},
},
TypeUrl: ldsURL,
}
goodBadUglyLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: ldsURL,
Value: marshaledListener2,
},
{
TypeUrl: ldsURL,
Value: marshaledListener1,
},
{
TypeUrl: ldsURL,
Value: badlyMarshaledAPIListener2,
},
},
TypeUrl: ldsURL,
}
goodRouteConfig1 = &xdspb.RouteConfiguration{
Name: goodRouteName1,
VirtualHosts: []*routepb.VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*routepb.Route{
{
Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}},
Action: &routepb.Route_Route{
Route: &routepb.RouteAction{
ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName},
},
},
},
},
},
{
Domains: []string{goodMatchingDomain},
Routes: []*routepb.Route{
{
Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}},
Action: &routepb.Route_Route{
Route: &routepb.RouteAction{
ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName1},
},
},
},
},
},
},
}
marshaledGoodRouteConfig1, _ = proto.Marshal(goodRouteConfig1)
goodRouteConfig2 = &xdspb.RouteConfiguration{
Name: goodRouteName2,
VirtualHosts: []*routepb.VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*routepb.Route{
{
Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}},
Action: &routepb.Route_Route{
Route: &routepb.RouteAction{
ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName},
},
},
},
},
},
{
Domains: []string{goodMatchingDomain},
Routes: []*routepb.Route{
{
Match: &routepb.RouteMatch{PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: ""}},
Action: &routepb.Route_Route{
Route: &routepb.RouteAction{
ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName2},
},
},
},
},
},
},
}
marshaledGoodRouteConfig2, _ = proto.Marshal(goodRouteConfig2)
uninterestingRouteConfig = &xdspb.RouteConfiguration{
Name: uninterestingRouteName,
VirtualHosts: []*routepb.VirtualHost{
{
Domains: []string{uninterestingDomain},
Routes: []*routepb.Route{
{
Action: &routepb.Route_Route{
Route: &routepb.RouteAction{
ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: uninterestingClusterName},
},
},
},
},
},
},
}
marshaledUninterestingRouteConfig, _ = proto.Marshal(uninterestingRouteConfig)
)
// TestV2ClientBackoffAfterRecvError verifies if the v2Client backoffs when it
// encounters a Recv error while receiving an LDS response.
func TestV2ClientBackoffAfterRecvError(t *testing.T) {
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
// Override the v2Client backoff function with this, so that we can verify
// that a backoff actually was triggerred.
boCh := make(chan int, 1)
clientBackoff := func(v int) time.Duration {
boCh <- v
return 0
}
v2c := newV2Client(cc, goodNodeProto, clientBackoff)
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := make(chan struct{})
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
close(callbackCh)
})
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
t.Log("FakeServer received request...")
fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")}
t.Log("Bad LDS response pushed to fakeServer...")
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Timeout when expecting LDS update")
case <-boCh:
timer.Stop()
t.Log("v2Client backed off before retrying...")
case <-callbackCh:
t.Fatal("Received unexpected LDS callback")
}
}
// TestV2ClientRetriesAfterBrokenStream verifies the case where a stream
// encountered a Recv() error, and is expected to send out xDS requests for
// registered watchers once it comes back up again.
func TestV2ClientRetriesAfterBrokenStream(t *testing.T) {
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := testutils.NewChannel()
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err)
callbackCh.Send(struct{}{})
})
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
t.Log("FakeServer received request...")
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
t.Log("Good LDS response pushed to fakeServer...")
if _, err := callbackCh.Receive(); err != nil {
t.Fatal("Timeout when expecting LDS update")
}
// Read the ack, so the next request is sent after stream re-creation.
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS ACK")
}
fakeServer.XDSResponseChan <- &fakeserver.Response{Err: errors.New("RPC error")}
t.Log("Bad LDS response pushed to fakeServer...")
val, err := fakeServer.XDSRequestChan.Receive()
if err == testutils.ErrRecvTimeout {
t.Fatalf("Timeout expired when expecting LDS update")
}
gotRequest := val.(*fakeserver.Request)
if !proto.Equal(gotRequest.Req, goodLDSRequest) {
t.Fatalf("gotRequest: %+v, wantRequest: %+v", gotRequest.Req, goodLDSRequest)
}
}
// TestV2ClientCancelWatch verifies that the registered watch callback is not
// invoked if a response is received after the watcher is cancelled.
func TestV2ClientCancelWatch(t *testing.T) {
fakeServer, cc, cleanup := startServerAndGetCC(t)
defer cleanup()
v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
callbackCh := testutils.NewChannel()
cancelFunc := v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received LDS callback with ldsUpdate {%+v} and error {%v}", u, err)
callbackCh.Send(struct{}{})
})
if _, err := fakeServer.XDSRequestChan.Receive(); err != nil {
t.Fatalf("Timeout expired when expecting an LDS request")
}
t.Log("FakeServer received request...")
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
t.Log("Good LDS response pushed to fakeServer...")
if _, err := callbackCh.Receive(); err != nil {
t.Fatal("Timeout when expecting LDS update")
}
cancelFunc()
fakeServer.XDSResponseChan <- &fakeserver.Response{Resp: goodLDSResponse1}
t.Log("Another good LDS response pushed to fakeServer...")
if _, err := callbackCh.Receive(); err != testutils.ErrRecvTimeout {
t.Fatalf("Watch callback invoked after the watcher was cancelled")
}
}

View File

@@ -1,8 +1,8 @@
{
"node": {
"id": "ENVOY_NODE_ID",
"id": "COREDNS_NODE_ID",
"metadata": {
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector"
"TRAFFICDIRECTOR_GRPC_HOSTNAME": "xds_cluster"
}
},
"xds_servers" : [{