diff --git a/core/dnsserver/zdirectives.go b/core/dnsserver/zdirectives.go index 61d96c633..08b36475c 100644 --- a/core/dnsserver/zdirectives.go +++ b/core/dnsserver/zdirectives.go @@ -30,6 +30,7 @@ var Directives = []string{ "acl", "any", "chaos", + "traffic", "loadbalance", "cache", "rewrite", diff --git a/core/plugin/zplugin.go b/core/plugin/zplugin.go index 90267f29b..374d0b8fe 100644 --- a/core/plugin/zplugin.go +++ b/core/plugin/zplugin.go @@ -46,6 +46,7 @@ import ( _ "github.com/coredns/coredns/plugin/template" _ "github.com/coredns/coredns/plugin/tls" _ "github.com/coredns/coredns/plugin/trace" + _ "github.com/coredns/coredns/plugin/traffic" _ "github.com/coredns/coredns/plugin/transfer" _ "github.com/coredns/coredns/plugin/whoami" _ "github.com/coredns/federation" diff --git a/plugin.cfg b/plugin.cfg index 3f3dd85fd..f8e00a4e2 100644 --- a/plugin.cfg +++ b/plugin.cfg @@ -39,6 +39,7 @@ dnstap:dnstap acl:acl any:any chaos:chaos +traffic:traffic loadbalance:loadbalance cache:cache rewrite:rewrite diff --git a/plugin/traffic/HACKING.md b/plugin/traffic/HACKING.md index 0a483fec6..0c425c0bf 100644 --- a/plugin/traffic/HACKING.md +++ b/plugin/traffic/HACKING.md @@ -9,3 +9,12 @@ Repos used: : implements client for xDS - can probably list all code out from there. To see if things are working start the testing control plane from go-control-plane: + +https://github.com/envoyproxy/envoy/blob/master/api/API_OVERVIEW.md + +https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery.md + + +Cluster: A cluster is a group of logically similar endpoints that Envoy connects to. In v2, RDS +routes points to clusters, CDS provides cluster configuration and Envoy discovers the cluster +members via EDS. diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index 5ec2a3dd9..befca9fc4 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -21,8 +21,14 @@ func setup(c *caddy.Controller) error { return plugin.Error("traffic", err) } + t, err := New() + if err != nil { + return plugin.Error("traffic", err) + } + dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { - return &Traffic{Next: next, assignments: make(map[string]assignment)} + t.Next = next + return t }) return nil diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index 890b705ec..b320df070 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -3,37 +3,43 @@ package traffic import ( "context" "math/rand" - "sync" "time" - clog "github.com/coredns/coredns/pkg/log" "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/pkg/response" - "github.com/coredns/coredns/request" + "github.com/coredns/coredns/plugin/traffic/xds" + "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" "github.com/miekg/dns" ) -var log = clog.NewWithPlugin("traffic") - // Traffic is a plugin that load balances according to assignments. type Traffic struct { - assignments map[string]assignment // zone -> assignment - mu sync.RWMutex // protects assignments - Next plugin.Handler + c *xds.Client + Next plugin.Handler +} + +// New returns a pointer to a new and initialized Traffic. +func New() (*Traffic, error) { + config, err := bootstrap.NewConfig() + if err != nil { + return nil, err + } + c, err := xds.New(xds.Options{Config: *config}) + if err != nil { + return nil, err + } + + return &Traffic{c: c}, nil +} + +func (t *Traffic) Close() { + t.c.Close() } // ServeDNS implements the plugin.Handler interface. func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { - state := request.Request{W: w, Req: r} - tw := &ResponseWriter{ResponseWriter: w} - t.mu.RLock() - a, ok := t.assignments[state.Name()] - t.mu.RUnlock() - if ok { - tw.a = &a - } return plugin.NextOrFailure(t.Name(), t.Next, ctx, tw, r) } @@ -43,7 +49,6 @@ func (t *Traffic) Name() string { return "traffic" } // ResponseWriter writes a traffic load balanced response. type ResponseWriter struct { dns.ResponseWriter - a *assignment } // WriteMsg implements the dns.ResponseWriter interface. @@ -62,10 +67,6 @@ func (r *ResponseWriter) WriteMsg(res *dns.Msg) error { return r.ResponseWriter.WriteMsg(res) } - // ok, traffic-lb - if r.a != nil { - - } if len(res.Answer) > 1 { res.Answer = []dns.RR{res.Answer[rand.Intn(len(res.Answer))]} res.Answer[0].Header().Ttl = 5 diff --git a/plugin/traffic/xds/bootstrap/bootstrap.go b/plugin/traffic/xds/bootstrap/bootstrap.go index cc2150ccc..910120c0c 100644 --- a/plugin/traffic/xds/bootstrap/bootstrap.go +++ b/plugin/traffic/xds/bootstrap/bootstrap.go @@ -27,12 +27,13 @@ import ( "io/ioutil" "os" + "github.com/coredns/coredns/plugin/pkg/log" + + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/golang/protobuf/jsonpb" "google.golang.org/grpc" "google.golang.org/grpc/credentials/google" "google.golang.org/grpc/grpclog" - - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" ) const ( @@ -98,26 +99,23 @@ type xdsServer struct { // the presence of the errors) and may return a Config object with certain // fields left unspecified, in which case the caller should use some sane // defaults. -func NewConfig() *Config { +func NewConfig() (*Config, error) { config := &Config{} fName, ok := os.LookupEnv(fileEnv) if !ok { - grpclog.Errorf("xds: %s environment variable not set", fileEnv) - return config + return config, fmt.Errorf("xds: %s environment variable not set", fileEnv) } grpclog.Infof("xds: Reading bootstrap file from %s", fName) data, err := fileReadFunc(fName) if err != nil { - grpclog.Errorf("xds: bootstrap file {%v} read failed: %v", fName, err) - return config + return config, fmt.Errorf("xds: bootstrap file {%v} read failed: %v", fName, err) } var jsonData map[string]json.RawMessage if err := json.Unmarshal(data, &jsonData); err != nil { - grpclog.Errorf("xds: json.Unmarshal(%v) failed during bootstrap: %v", string(data), err) - return config + return config, fmt.Errorf("xds: json.Unmarshal(%v) failed during bootstrap: %v", string(data), err) } m := jsonpb.Unmarshaler{AllowUnknownFields: true} @@ -126,18 +124,18 @@ func NewConfig() *Config { case "node": n := &corepb.Node{} if err := m.Unmarshal(bytes.NewReader(v), n); err != nil { - grpclog.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) + log.Errorf("xds: jsonpb.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) break } config.NodeProto = n case "xds_servers": var servers []*xdsServer if err := json.Unmarshal(v, &servers); err != nil { - grpclog.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) + log.Errorf("xds: json.Unmarshal(%v) for field %q failed during bootstrap: %v", string(v), k, err) break } if len(servers) < 1 { - grpclog.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to") + log.Errorf("xds: bootstrap file parsing failed during bootstrap: file doesn't contain any xds server to connect to") break } xs := servers[0] @@ -151,7 +149,7 @@ func NewConfig() *Config { } default: // Do not fail the xDS bootstrap when an unknown field is seen. - grpclog.Warningf("xds: unexpected data in bootstrap file: {%v, %v}", k, string(v)) + log.Warningf("xds: unexpected data in bootstrap file: {%v, %v}", k, string(v)) } } @@ -163,6 +161,5 @@ func NewConfig() *Config { } config.NodeProto.BuildVersion = gRPCVersion - grpclog.Infof("xds: bootstrap.NewConfig returning: %+v", config) - return config + return config, nil } diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 98e5d2b4a..7104bb7c6 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -23,7 +23,6 @@ package xds import ( "errors" "fmt" - "sync" "github.com/coredns/coredns/plugin/traffic/xds/bootstrap" @@ -49,10 +48,7 @@ type Client struct { cc *grpc.ClientConn // Connection to the xDS server v2c *v2Client // Actual xDS client implementation using the v2 API - mu sync.Mutex serviceCallback func(ServiceUpdate, error) - ldsCancel func() - rdsCancel func() } // New returns a new xdsClient configured with opts. @@ -99,7 +95,7 @@ func (c *Client) WatchCluster(clusterName string, cdsCb func(CDSUpdate, error)) return c.v2c.watchCDS(clusterName, cdsCb) } -// WatchEDS watches the ghost. -func (c *Client) WatchEDS(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) { +// WatchEndpoints uses EDS to discover information about the endpoints in a cluster. +func (c *Client) WatchEndpoints(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) { return c.v2c.watchEDS(clusterName, edsCb) } diff --git a/plugin/traffic/xds_bootstrap.json b/plugin/traffic/xds_bootstrap.json new file mode 100644 index 000000000..c27cf394c --- /dev/null +++ b/plugin/traffic/xds_bootstrap.json @@ -0,0 +1,14 @@ +{ + "node": { + "id": "ENVOY_NODE_ID", + "metadata": { + "TRAFFICDIRECTOR_GRPC_HOSTNAME": "trafficdirector" + } + }, + "xds_servers" : [{ + "server_uri": "trafficdirector.googleapis.com:443", + "channel_creds": [ + { "type": "google_default" } + ] + }] +}