Fix coredns, implement v2

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-06-04 13:36:27 +02:00
parent fafb347966
commit 29acaf739e
7 changed files with 85 additions and 200 deletions

View File

@@ -22,16 +22,14 @@ package xds
import (
"context"
"fmt"
"sync"
"github.com/coredns/coredns/coremain"
clog "github.com/coredns/coredns/plugin/pkg/log"
clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
xdspb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
xdspb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb2 "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
adspb2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc"
)
@@ -39,17 +37,17 @@ import (
var log = clog.NewWithPlugin("traffic: xds")
const (
cdsURL = "type.googleapis.com/envoy.config.cluster.v3.Cluster"
edsURL = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
clusterType = "type.googleapis.com/envoy.api.v2.Cluster"
endpointType = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
type adsStream xdspb.AggregatedDiscoveryService_StreamAggregatedResourcesClient
type adsStream adspb2.AggregatedDiscoveryService_StreamAggregatedResourcesClient
// Client talks to the grpc manager's endpoint to get load assignments.
type Client struct {
cc *grpc.ClientConn
ctx context.Context
node *corepb.Node
node *corepb2.Node
cancel context.CancelFunc
stop chan struct{}
to string // upstream hosts, mostly here for logging purposes
@@ -66,8 +64,8 @@ func New(addr, node string, opts ...grpc.DialOption) (*Client, error) {
if err != nil {
return nil, err
}
c := &Client{cc: cc, to: addr, node: &corepb.Node{Id: node, UserAgentName: "CoreDNS", UserAgentVersionType: &corepb.Node_UserAgentVersion{UserAgentVersion: coremain.CoreVersion}}}
c.assignments = &assignment{cla: make(map[string]*endpointpb.ClusterLoadAssignment)}
c := &Client{cc: cc, to: addr, node: &corepb2.Node{Id: node, UserAgentName: "CoreDNS", UserAgentVersionType: &corepb2.Node_UserAgentVersion{UserAgentVersion: coremain.CoreVersion}}}
c.assignments = &assignment{cla: make(map[string]*xdspb2.ClusterLoadAssignment)}
c.version, c.nonce = make(map[string]string), make(map[string]string)
c.ctx, c.cancel = context.WithCancel(context.Background())
@@ -87,7 +85,7 @@ func (c *Client) Run() error {
default:
}
cli := xdspb.NewAggregatedDiscoveryServiceClient(c.cc)
cli := adspb2.NewAggregatedDiscoveryServiceClient(c.cc)
stream, err := cli.StreamAggregatedResources(c.ctx)
if err != nil {
return err
@@ -95,7 +93,7 @@ func (c *Client) Run() error {
if first {
// send first request, to create stream, then wait for ADS to send us updates.
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
if err := c.clusterDiscovery(stream, c.Version(clusterType), c.Nonce(clusterType), []string{}); err != nil {
return err
}
log.Infof("gRPC stream established to %q", c.to) // might fail??
@@ -111,9 +109,9 @@ func (c *Client) Run() error {
// clusterDiscovery sends a cluster DiscoveryRequest on the stream.
func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clusters []string) error {
req := &xdspb.DiscoveryRequest{
req := &xdspb2.DiscoveryRequest{
Node: c.node,
TypeUrl: cdsURL,
TypeUrl: clusterType,
ResourceNames: clusters, // empty for all
VersionInfo: version,
ResponseNonce: nonce,
@@ -123,9 +121,9 @@ func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clust
// endpointDiscovery sends a endpoint DiscoveryRequest on the stream.
func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clusters []string) error {
req := &xdspb.DiscoveryRequest{
req := &xdspb2.DiscoveryRequest{
Node: c.node,
TypeUrl: edsURL,
TypeUrl: endpointType,
ResourceNames: clusters,
VersionInfo: version,
ResponseNonce: nonce,
@@ -142,7 +140,7 @@ func (c *Client) receive(stream adsStream) error {
}
switch resp.GetTypeUrl() {
case cdsURL:
case clusterType:
a := NewAssignment()
for _, r := range resp.GetResources() {
var any ptypes.DynamicAny
@@ -150,33 +148,33 @@ func (c *Client) receive(stream adsStream) error {
log.Debugf("Failed to unmarshal cluster discovery: %s", err)
continue
}
cluster, ok := any.Message.(*clusterpb.Cluster)
cluster, ok := any.Message.(*xdspb2.Cluster)
if !ok {
continue
}
a.SetClusterLoadAssignment(cluster.GetName(), nil)
}
// set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(cdsURL, resp.GetNonce())
c.SetVersion(cdsURL, resp.GetVersionInfo())
c.SetNonce(clusterType, resp.GetNonce())
c.SetVersion(clusterType, resp.GetVersionInfo())
c.SetAssignments(a)
c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters())
log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL))
log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(clusterType), c.Nonce(clusterType))
ClusterGauge.Set(float64(len(resp.GetResources())))
// now kick off discovery for endpoints
if err := c.endpointDiscovery(stream, c.Version(edsURL), c.Nonce(edsURL), a.clusters()); err != nil {
if err := c.endpointDiscovery(stream, c.Version(endpointType), c.Nonce(endpointType), a.clusters()); err != nil {
log.Debug(err)
}
case edsURL:
case endpointType:
for _, r := range resp.GetResources() {
var any ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &any); err != nil {
log.Debugf("Failed to unmarshal endpoint discovery: %s", err)
continue
}
cla, ok := any.Message.(*endpointpb.ClusterLoadAssignment)
cla, ok := any.Message.(*xdspb2.ClusterLoadAssignment)
if !ok {
// TODO warn/err here?
continue
@@ -185,14 +183,14 @@ func (c *Client) receive(stream adsStream) error {
}
// set our local administration and ack the reply. Empty version would signal NACK.
c.SetNonce(edsURL, resp.GetNonce())
c.SetVersion(edsURL, resp.GetVersionInfo())
c.SetNonce(endpointType, resp.GetNonce())
c.SetVersion(endpointType, resp.GetVersionInfo())
log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL))
log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(endpointType), c.Nonce(endpointType))
EndpointGauge.Set(float64(len(resp.GetResources())))
default:
return fmt.Errorf("unknown response URL for discovery: %q", resp.GetTypeUrl())
// ignore anything we don't know how to process. Probably should NACK these properly.
}
}
}