diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index b830f5ed8..1a20bc04f 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -6,8 +6,15 @@ ## Description -The *traffic* plugin is a balancer that allows traffic steering, weighted responses and draining of -clusters. +The *traffic* plugin is a balancer that allows traffic steering, weighted responses and draining +of clusters. A cluster in Envoy is defined as: "A group of logically similar endpoints that Envoy +connects to." Each cluster has a name, which *traffic* extends to be a domain name. See "Naming +Clusters" below. + +The use case for this plugin is when a cluster has endpoints running in multiple (Kubernetes?) +clusters and you need to steer traffic to (or away) from these endpoints, i.e. endpoint A needs to +be upgraded, so all traffic to it is drained. Or the entire Kubernetes needs to be upgraded, and *all* +endpoints need to be drained from it. The cluster information is retrieved from a service discovery manager that implements the service discovery [protocols from Envoy implements](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol). @@ -15,15 +22,12 @@ It connects to the manager using the Aggregated Discovery Service (ADS) protocol clusters are discovered every 10 seconds. The plugin hands out responses that adhere to these assignments. Only endpoints that are *healthy* are handed out. -If *traffic*'s `locality` has been set the answers can be localized. - -A cluster in Envoy is defined as: "A group of logically similar endpoints that Envoy connects to." -Each cluster has a name, which *traffic* extends to be a domain name. See "Naming Clusters" below. - -The use case for this plugin is when a cluster has endpoints running in multiple (Kubernetes?) -clusters and you need to steer traffic to (or away) from these endpoints, i.e. endpoint A needs to -be upgraded, so all traffic to it is drained. 
Or the entire Kubernetes needs to upgraded, and *all* -endpoints need to be drained from it. +Note that the manager *itself* is also a cluster that is managed *by the management server*. This is +the *management cluster* (see `cluster` below in "Syntax"). By default the name for this cluster is `xds`. +When bootstrapping *traffic* tries to retrieve the cluster endpoints for the management cluster. +This continues in the background and *traffic* is smart enough to reconnect on failures or +cluster configuration updates. If the `xds` management cluster can't be found on startup, *traffic* returns a +fatal error. For A and AAAA queries each DNS response contains a single IP address that's considered the best one. The TTL on these answer is set to 5s. It will only return successful responses either with an @@ -36,15 +40,14 @@ enough to select the best one. When SRV records are returned, the endpoint DNS n works as well. [gRPC LB SRV records](https://github.com/grpc/proposal/blob/master/A5-grpclb-in-dns.md) are -supported and returned by the *traffic* plugin. These are SRV records for -`_grpclb._tcp..` and point to the xDS management servers as used in the configuration. -The target name used for these SRV records is `grpclb-.`. This means a cluster names -of `grpclb-N` are illegal, because it used by *traffic* itself. See "Naming Clusters" below for -details. +supported and returned by the *traffic* plugin for all clusters. The returned endpoints are, +however, the ones from the management cluster as these must implement gRPC LB. *Traffic* implements version 3 of the xDS API. It works with the management server as written in . +If *traffic*'s `locality` has been set the answers can be localized. + ## Syntax ~~~ traffic TO... @@ -53,13 +56,15 @@ traffic TO... This enabled the *traffic* plugin, with a default node ID of `coredns` and no TLS. - * **TO...** are the control plane endpoints to connect to. These must start with `grpc://`. The - port number defaults to 443, if not specified. 
+ * **TO...** are the control plane endpoints to bootstrap from. These must start with `grpc://`. The + port number defaults to 443, if not specified. These endpoints will be tried in the order given. + First successful connection will be used to resolve the management cluster `xds`. The extended syntax is available if you want more control. ~~~ traffic TO... { + cluster CLUSTER id ID locality REGION[,ZONE[,SUBZONE]] [REGION[,ZONE[,SUBZONE]]]... tls CERT KEY CA @@ -68,6 +73,8 @@ traffic TO... { } ~~~ + * `cluster` **CLUSTER** defines the name of the management cluster. By default this is `xds`. + * `id` **ID** is how *traffic* identifies itself to the control plane. This defaults to `coredns`. @@ -105,14 +112,15 @@ traffic TO... { When a cluster is named this usually consists out of a single word, i.e. "cluster-v0", or "web". The *traffic* plugins uses the name(s) specified in the Server Block to create fully qualified domain names. For example if the Server Block specifies `lb.example.org` as one of the names, -and "cluster-v0" is one of the load balanced cluster, *traffic* will respond to query asking for +and "cluster-v0" is one of the load balanced clusters, *traffic* will respond to queries asking for `cluster-v0.lb.example.org.` and the same goes for `web`; `web.lb.example.org`. For SRV queries all endpoints are returned, the SRV target names are synthesized: `endpoint-.web.lb.example.org` to take the example from above. *N* is an integer starting with 0. -gRPC LB integration is also done by returning the correct SRV records. A gRPC client will ask for -`_grpclb._tcp.web.lb.example.org` and expect to get the SRV (and address records) to tell it where +For the management cluster `_grpclb._tcp.<cluster>.<zone>` will also be resolved in the same way as +normal SRV queries. This special case is done because gRPC clients need to be told where + the gRPC LBs are. For each **TO** in the configuration *traffic* will return a SRV record. 
The target name in the SRV are synthesized as well, using `grpclb-N` to prefix the zone from the Corefile, i.e. `grpclb-0.lb.example.org` will be the gRPC name when using `lb.example.org` in the configuration. diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index ca764ce6d..bddc76ea7 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -67,7 +67,7 @@ func setup(c *caddy.Controller) error { func parseTraffic(c *caddy.Controller) (*Traffic, error) { toHosts := []string{} - t := &Traffic{node: "coredns"} + t := &Traffic{node: "coredns", mgmt: "xds"} var ( err error tlsConfig *tls.Config @@ -98,6 +98,12 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) { } for c.NextBlock() { switch c.Val() { + case "cluster": + args := c.RemainingArgs() + if len(args) != 1 { + return nil, c.ArgErr() + } + t.mgmt = args[0] case "id": args := c.RemainingArgs() if len(args) != 1 { diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index fff563ddb..3cee43519 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -4,7 +4,6 @@ import ( "context" "crypto/tls" "fmt" - "net" "strconv" "strings" "time" @@ -21,6 +20,7 @@ import ( type Traffic struct { c *xds.Client node string + mgmt string tlsConfig *tls.Config hosts []string @@ -29,9 +29,6 @@ type Traffic struct { origins []string loc []xds.Locality - grpcSRV []dns.RR // SRV records for grpc LB - grpcAddr []dns.RR // Address records for each LB (taken from **TOO**) - Next plugin.Handler } @@ -54,26 +51,51 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg sockaddr, ok := t.c.Select(cluster, t.loc, t.health) if !ok { - // ok the cluster (which has potentially extra labels), doesn't exist, but we may have a query for endpoint-0.. - // check if we have 2 labels and that the first equals endpoint-0. 
- if dns.CountLabel(cluster) != 2 { - m.Ns = soa(state.Zone) - m.Rcode = dns.RcodeNameError - w.WriteMsg(m) - return 0, nil - } + // ok this cluster doesn't exist, potentially due to extra labels, which may be garbage or legit queries: + // legit is: + // endpoint-N.cluster + // _grpclb._tcp.cluster + // _tcp.cluster labels := dns.SplitDomainName(cluster) - if strings.HasPrefix(labels[0], "endpoint-") { - // recheck if the cluster exist. - cluster = labels[1] - sockaddr, ok = t.c.Select(cluster, t.loc, t.health) - if !ok { + switch len(labels) { + case 2: + // endpoint or _tcp + if strings.ToLower(labels[0]) == "_tcp" { + // nodata, because empty non-terminal + m.Ns = soa(state.Zone) + m.Rcode = dns.RcodeSuccess + w.WriteMsg(m) + return 0, nil + } + if strings.HasPrefix(strings.ToLower(labels[0]), "endpoint-") { + // recheck if the cluster exists. + cluster = labels[1] + sockaddr, ok = t.c.Select(cluster, t.loc, t.health) + if !ok { + m.Ns = soa(state.Zone) + m.Rcode = dns.RcodeNameError + w.WriteMsg(m) + return 0, nil + } + return t.serveEndpoint(ctx, state, labels[0], cluster) + } + case 3: + if strings.ToLower(labels[0]) != "_grpclb" || strings.ToLower(labels[1]) != "_tcp" { m.Ns = soa(state.Zone) m.Rcode = dns.RcodeNameError w.WriteMsg(m) return 0, nil } - return t.serveEndpoint(ctx, state, labels[0], cluster) + // OK, _grpclb._tcp query; we need to return the endpoint for the mgmt cluster *NOT* the cluster + // we got the query for. This should exist, but we'll check later anyway + cluster = t.mgmt + sockaddr, _ = t.c.Select(cluster, t.loc, t.health) + break + default: + m.Ns = soa(state.Zone) + m.Rcode = dns.RcodeNameError + w.WriteMsg(m) + return 0, nil } } @@ -193,46 +215,3 @@ func soa(z string) []dns.RR { Minttl: 5, }} } - -// srv record for grpclb endpoint. 
-func srv(i int, host, zone string) *dns.SRV { - target := fmt.Sprintf("grpclb-%d.%s", i, zone) - hdr := dns.RR_Header{Name: dnsutil.Join("_grpclb._tcp", zone), Class: dns.ClassINET, Rrtype: dns.TypeSRV} - _, p, _ := net.SplitHostPort(host) // err ignored already checked in setup - port, _ := strconv.Atoi(p) - return &dns.SRV{ - Hdr: hdr, - // prio, weight -> 0 - Port: uint16(port), - Target: target, - } -} - -func a(i int, host, zone string) *dns.A { - owner := fmt.Sprintf("grpclb-%d.%s", i, zone) - hdr := dns.RR_Header{Name: owner, Class: dns.ClassINET, Rrtype: dns.TypeA} - h, _, _ := net.SplitHostPort(host) - ip := net.ParseIP(h) - if ip == nil { - return nil - } - if ip.To4() == nil { - return nil - } - return &dns.A{Hdr: hdr, A: ip.To4()} -} - -func aaaa(i int, host, zone string) *dns.AAAA { - owner := fmt.Sprintf("grpclb-%d.%s", i, zone) - hdr := dns.RR_Header{Name: owner, Class: dns.ClassINET, Rrtype: dns.TypeAAAA} - h, _, _ := net.SplitHostPort(host) - ip := net.ParseIP(h) - if ip == nil { - return nil - } - if ip.To4() != nil { - return nil - } - return &dns.AAAA{Hdr: hdr, AAAA: ip.To16()} - -} diff --git a/plugin/traffic/traffic_test.go b/plugin/traffic/traffic_test.go index 20a39b18b..f43d9568f 100644 --- a/plugin/traffic/traffic_test.go +++ b/plugin/traffic/traffic_test.go @@ -256,6 +256,46 @@ func TestTrafficSRV(t *testing.T) { } } +func TestTrafficManagement(t *testing.T) { + c, err := xds.New("127.0.0.1:0", "test-id", grpc.WithInsecure()) + if err != nil { + t.Fatal(err) + } + tr := &Traffic{c: c, origins: []string{"lb.example.org."}, mgmt: "xds"} + + for _, cla := range []*endpointpb.ClusterLoadAssignment{ + &endpointpb.ClusterLoadAssignment{ + ClusterName: "web", + Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb.HealthStatus_HEALTHY}}), + }, + &endpointpb.ClusterLoadAssignment{ + ClusterName: "xds", + Endpoints: endpoints([]EndpointHealth{{"::1", 18008, corepb.HealthStatus_HEALTHY}}), + }, + } { + a := xds.NewAssignment() + 
a.SetClusterLoadAssignment(cla.ClusterName, cla) + c.SetAssignments(a) + } + ctx := context.TODO() + + // Now we ask for the grpclb endpoint in the web cluster, this should give us the endpoint of the xds (mgmt) cluster. + // ; ANSWER SECTION: + // _grpclb._tcp.web.lb.example.org. 5 IN SRV 100 100 18008 endpoint-0.xds.lb.example.org. + // ;; ADDITIONAL SECTION: + // endpoint-0.xds.lb.example.org. 5 IN AAAA ::1 + + m := new(dns.Msg) + m.SetQuestion("_grpclb._tcp.web.lb.example.org.", dns.TypeSRV) + rec := dnstest.NewRecorder(&test.ResponseWriter{}) + if _, err := tr.ServeDNS(ctx, rec, m); err != nil { + t.Errorf("Expected no error, but got %q", err) + } + if x := rec.Msg.Answer[0].(*dns.SRV).Target; x != "endpoint-0.xds.lb.example.org." { + t.Errorf("Expected %s, got %s", "endpoint-0.xds.lb.example.org.", x) + } +} + type EndpointHealth struct { Address string Port uint16