mirror of
https://github.com/coredns/coredns.git
synced 2025-10-27 08:14:18 -04:00
224 lines
7.3 KiB
Go
224 lines
7.3 KiB
Go
/*
|
|
This package contains code copied from github.com/grpc/grpc-co. The license for that code is:
|
|
|
|
Copyright 2019 gRPC authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
// Package xds implements a bidirectional stream to an envoy ADS management endpoint. It will stream
|
|
// updates (CDS and EDS) from there to help load balance responses to DNS clients.
|
|
package xds
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/coredns/coredns/coremain"
|
|
clog "github.com/coredns/coredns/plugin/pkg/log"
|
|
|
|
clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
|
corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
|
endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
|
|
xdspb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
|
"github.com/golang/protobuf/ptypes"
|
|
"google.golang.org/grpc"
|
|
)
|
|
|
|
var log = clog.NewWithPlugin("traffic: xds")
|
|
|
|
const (
|
|
cdsURL = "type.googleapis.com/envoy.config.cluster.v3.Cluster"
|
|
edsURL = "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
|
|
)
|
|
|
|
type adsStream xdspb.AggregatedDiscoveryService_StreamAggregatedResourcesClient
|
|
|
|
// Client talks to the grpc manager's endpoint to get load assignments.
|
|
type Client struct {
|
|
cc *grpc.ClientConn
|
|
ctx context.Context
|
|
node *corepb.Node
|
|
cancel context.CancelFunc
|
|
stop chan struct{}
|
|
to string // upstream hosts, mostly here for logging purposes
|
|
mu sync.RWMutex // protects everything below
|
|
assignments *assignment // assignments contains the current clusters and endpoints
|
|
version map[string]string
|
|
nonce map[string]string
|
|
synced bool // true when we first successfully got a stream
|
|
}
|
|
|
|
// New returns a new client that's dialed to addr using node as the local identifier.
|
|
func New(addr, node string, opts ...grpc.DialOption) (*Client, error) {
|
|
cc, err := grpc.Dial(addr, opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c := &Client{cc: cc, to: addr, node: &corepb.Node{Id: node, UserAgentName: "CoreDNS", UserAgentVersionType: &corepb.Node_UserAgentVersion{UserAgentVersion: coremain.CoreVersion}}}
|
|
c.assignments = &assignment{cla: make(map[string]*endpointpb.ClusterLoadAssignment)}
|
|
c.version, c.nonce = make(map[string]string), make(map[string]string)
|
|
c.ctx, c.cancel = context.WithCancel(context.Background())
|
|
|
|
return c, nil
|
|
}
|
|
|
|
// Stop stops all goroutines and closes the connection to the upstream manager.
|
|
func (c *Client) Stop() error { c.cancel(); return c.cc.Close() }
|
|
|
|
// Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager.
|
|
func (c *Client) Run() error {
|
|
first := true
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
cli := xdspb.NewAggregatedDiscoveryServiceClient(c.cc)
|
|
stream, err := cli.StreamAggregatedResources(c.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if first {
|
|
// send first request, to create stream, then wait for ADS to send us updates.
|
|
if err := c.clusterDiscovery(stream, c.Version(cdsURL), c.Nonce(cdsURL), []string{}); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("gRPC stream established to %q", c.to) // might fail??
|
|
c.setSynced()
|
|
first = false
|
|
}
|
|
|
|
if err := c.receive(stream); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// clusterDiscovery sends a cluster DiscoveryRequest on the stream.
|
|
func (c *Client) clusterDiscovery(stream adsStream, version, nonce string, clusters []string) error {
|
|
req := &xdspb.DiscoveryRequest{
|
|
Node: c.node,
|
|
TypeUrl: cdsURL,
|
|
ResourceNames: clusters, // empty for all
|
|
VersionInfo: version,
|
|
ResponseNonce: nonce,
|
|
}
|
|
return stream.Send(req)
|
|
}
|
|
|
|
// endpointDiscovery sends a endpoint DiscoveryRequest on the stream.
|
|
func (c *Client) endpointDiscovery(stream adsStream, version, nonce string, clusters []string) error {
|
|
req := &xdspb.DiscoveryRequest{
|
|
Node: c.node,
|
|
TypeUrl: edsURL,
|
|
ResourceNames: clusters,
|
|
VersionInfo: version,
|
|
ResponseNonce: nonce,
|
|
}
|
|
return stream.Send(req)
|
|
}
|
|
|
|
// receive receives from the stream, it handles both cluster and endpoint DiscoveryResponses.
|
|
func (c *Client) receive(stream adsStream) error {
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch resp.GetTypeUrl() {
|
|
case cdsURL:
|
|
a := NewAssignment()
|
|
for _, r := range resp.GetResources() {
|
|
var any ptypes.DynamicAny
|
|
if err := ptypes.UnmarshalAny(r, &any); err != nil {
|
|
log.Debugf("Failed to unmarshal cluster discovery: %s", err)
|
|
continue
|
|
}
|
|
cluster, ok := any.Message.(*clusterpb.Cluster)
|
|
if !ok {
|
|
continue
|
|
}
|
|
a.SetClusterLoadAssignment(cluster.GetName(), nil)
|
|
}
|
|
// set our local administration and ack the reply. Empty version would signal NACK.
|
|
c.SetNonce(cdsURL, resp.GetNonce())
|
|
c.SetVersion(cdsURL, resp.GetVersionInfo())
|
|
c.SetAssignments(a)
|
|
c.clusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), a.clusters())
|
|
|
|
log.Debugf("Cluster discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(cdsURL), c.Nonce(cdsURL))
|
|
ClusterGauge.Set(float64(len(resp.GetResources())))
|
|
|
|
// now kick off discovery for endpoints
|
|
if err := c.endpointDiscovery(stream, c.Version(edsURL), c.Nonce(edsURL), a.clusters()); err != nil {
|
|
log.Debug(err)
|
|
}
|
|
case edsURL:
|
|
for _, r := range resp.GetResources() {
|
|
var any ptypes.DynamicAny
|
|
if err := ptypes.UnmarshalAny(r, &any); err != nil {
|
|
log.Debugf("Failed to unmarshal endpoint discovery: %s", err)
|
|
continue
|
|
}
|
|
cla, ok := any.Message.(*endpointpb.ClusterLoadAssignment)
|
|
if !ok {
|
|
// TODO warn/err here?
|
|
continue
|
|
}
|
|
c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla)
|
|
|
|
}
|
|
// set our local administration and ack the reply. Empty version would signal NACK.
|
|
c.SetNonce(edsURL, resp.GetNonce())
|
|
c.SetVersion(edsURL, resp.GetVersionInfo())
|
|
|
|
log.Debugf("Endpoint discovery processed with %d resources, version %q and nonce %q", len(resp.GetResources()), c.Version(edsURL), c.Nonce(edsURL))
|
|
EndpointGauge.Set(float64(len(resp.GetResources())))
|
|
|
|
default:
|
|
return fmt.Errorf("unknown response URL for discovery: %q", resp.GetTypeUrl())
|
|
}
|
|
}
|
|
}
|
|
|
|
// Select returns an address that is deemed to be the correct one for this cluster. The returned
|
|
// boolean indicates if the cluster exists.
|
|
func (c *Client) Select(cluster string, ignore bool) (*SocketAddress, bool) {
|
|
if cluster == "" {
|
|
return nil, false
|
|
}
|
|
return c.assignments.Select(cluster, ignore)
|
|
}
|
|
|
|
// All returns all endpoints.
|
|
func (c *Client) All(cluster string, ignore bool) ([]*SocketAddress, bool) {
|
|
if cluster == "" {
|
|
return nil, false
|
|
}
|
|
return c.assignments.All(cluster, ignore)
|
|
}
|
|
|
|
// Locality holds the locality for this server. It contains a Region, Zone and SubZone.
|
|
// Currently this is not used.
|
|
type Locality struct {
|
|
Region string
|
|
Zone string
|
|
SubZone string
|
|
}
|