Management server is xds management server as well

Signed-off-by: Miek Gieben <miek@miek.nl>
@@ -6,8 +6,15 @@

## Description

The *traffic* plugin is a balancer that allows traffic steering, weighted responses and draining of
clusters.
The *traffic* plugin is a balancer that allows traffic steering, weighted responses and draining
of clusters. A cluster in Envoy is defined as: "A group of logically similar endpoints that Envoy
connects to." Each cluster has a name, which *traffic* extends to be a domain name. See "Naming
Clusters" below.

The use case for this plugin is when a cluster has endpoints running in multiple (Kubernetes?)
clusters and you need to steer traffic to (or away from) these endpoints, e.g. endpoint A needs to
be upgraded, so all traffic to it is drained. Or the entire Kubernetes cluster needs to be upgraded,
and *all* endpoints need to be drained from it.

The cluster information is retrieved from a service discovery manager that implements the service
discovery [protocols that Envoy implements](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol).
@@ -15,15 +22,12 @@ It connects to the manager using the Aggregated Discovery Service (ADS) protocol
clusters are discovered every 10 seconds. The plugin hands out responses that adhere to these
assignments. Only endpoints that are *healthy* are handed out.

If *traffic*'s `locality` has been set the answers can be localized.

A cluster in Envoy is defined as: "A group of logically similar endpoints that Envoy connects to."
Each cluster has a name, which *traffic* extends to be a domain name. See "Naming Clusters" below.

The use case for this plugin is when a cluster has endpoints running in multiple (Kubernetes?)
clusters and you need to steer traffic to (or away) from these endpoints, i.e. endpoint A needs to
be upgraded, so all traffic to it is drained. Or the entire Kubernetes needs to upgraded, and *all*
endpoints need to be drained from it.
Note that the manager *itself* is also a cluster that is managed *by the management server*. This is
the *management cluster* (see `cluster` below in "Syntax"). By default the name of this cluster is `xds`.
When bootstrapping, *traffic* tries to retrieve the cluster endpoints for the management cluster.
This continues in the background, and *traffic* is smart enough to reconnect on failures or on
updated cluster configuration. If the `xds` management cluster can't be found on startup, *traffic*
returns a fatal error.

For A and AAAA queries each DNS response contains a single IP address that's considered the best
one. The TTL on these answers is set to 5s. It will only return successful responses either with an
@@ -36,15 +40,14 @@ enough to select the best one. When SRV records are returned, the endpoint DNS n
works as well.
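
As an illustration (the zone `lb.example.org` and cluster `web` are hypothetical names), an A query
for a cluster returns a single address with a 5 second TTL:

~~~
$ dig +noall +answer A web.lb.example.org
web.lb.example.org. 5 IN A 192.0.2.1
~~~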

[gRPC LB SRV records](https://github.com/grpc/proposal/blob/master/A5-grpclb-in-dns.md) are
supported and returned by the *traffic* plugin. These are SRV records for
`_grpclb._tcp.<name>.<domain>` and point to the xDS management servers as used in the configuration.
The target name used for these SRV records is `grpclb-<N>.<domain>`. This means a cluster names
of `grpclb-N` are illegal, because it used by *traffic* itself. See "Naming Clusters" below for
details.
supported and returned by the *traffic* plugin for all clusters. The returned endpoints are,
however, the ones from the management cluster as these must implement gRPC LB.
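
For example (names and addresses are illustrative; this mirrors the test case added in this
commit), a `_grpclb` query for the `web` cluster returns the management cluster's endpoint:

~~~
$ dig +noall +answer SRV _grpclb._tcp.web.lb.example.org
_grpclb._tcp.web.lb.example.org. 5 IN SRV 100 100 18008 endpoint-0.xds.lb.example.org.
~~~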

*Traffic* implements version 3 of the xDS API. It works with the management server implemented in
<https://github.com/miekg/xds>.

If *traffic*'s `locality` has been set, the answers can be localized.

## Syntax

~~~
@@ -53,13 +56,15 @@ traffic TO...

This enables the *traffic* plugin, with a default node ID of `coredns` and no TLS.

* **TO...** are the control plane endpoints to connect to. These must start with `grpc://`. The
  port number defaults to 443, if not specified.
* **TO...** are the control plane endpoints to bootstrap from. These must start with `grpc://`. The
  port number defaults to 443, if not specified. These endpoints will be tried in the order given.
  The first successful connection will be used to resolve the management cluster `xds`.
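
  A minimal example (the addresses are hypothetical):

  ~~~
  traffic grpc://203.0.113.10 grpc://203.0.113.11:18000
  ~~~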

The extended syntax is available if you want more control.

~~~
traffic TO... {
    cluster CLUSTER
    id ID
    locality REGION[,ZONE[,SUBZONE]] [REGION[,ZONE[,SUBZONE]]]...
    tls CERT KEY CA
@@ -68,6 +73,8 @@ traffic TO... {
}
~~~

* `cluster` **CLUSTER** defines the name of the management cluster. By default this is `xds`.

* `id` **ID** is how *traffic* identifies itself to the control plane. This defaults to
  `coredns`.
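
A sketch of the extended syntax in use (the address, ID, and locality values are hypothetical):

~~~
traffic grpc://203.0.113.10:18000 {
    cluster xds
    id coredns-eu-1
    locality europe-west1,zone-a
}
~~~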
@@ -105,14 +112,15 @@ traffic TO... {
When a cluster is named this usually consists of a single word, i.e. "cluster-v0", or "web".
The *traffic* plugin uses the name(s) specified in the Server Block to create fully qualified
domain names. For example if the Server Block specifies `lb.example.org` as one of the names,
and "cluster-v0" is one of the load balanced cluster, *traffic* will respond to query asking for
and "cluster-v0" is one of the load balanced clusters, *traffic* will respond to queries asking for
`cluster-v0.lb.example.org.` and the same goes for `web`; `web.lb.example.org`.

For SRV queries all endpoints are returned; the SRV target names are synthesized:
`endpoint-<N>.web.lb.example.org`, to take the example from above. *N* is an integer starting with 0.
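
Illustratively (the endpoints are hypothetical), an SRV query for the `web` cluster might return:

~~~
$ dig +noall +answer SRV web.lb.example.org
web.lb.example.org. 5 IN SRV 100 100 18008 endpoint-0.web.lb.example.org.
web.lb.example.org. 5 IN SRV 100 100 18009 endpoint-1.web.lb.example.org.
~~~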

gRPC LB integration is also done by returning the correct SRV records. A gRPC client will ask for
`_grpclb._tcp.web.lb.example.org` and expect to get the SRV (and address records) to tell it where
For the management cluster `_grpclb._tcp.<cluster>.<name>` will also be resolved in the same way as
normal SRV queries. This special case is done because gRPC lib

the gRPC LBs are. For each **TO** in the configuration *traffic* will return a SRV record. The
target names in the SRV records are synthesized as well, using `grpclb-N` to prefix the zone from
the Corefile, i.e. `grpclb-0.lb.example.org` will be the gRPC name when using `lb.example.org` in
the configuration.
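
With two **TO** endpoints the synthesized records would look like this (addresses hypothetical):

~~~
_grpclb._tcp.lb.example.org. IN SRV 0 0 443 grpclb-0.lb.example.org.
_grpclb._tcp.lb.example.org. IN SRV 0 0 443 grpclb-1.lb.example.org.
grpclb-0.lb.example.org.     IN A   203.0.113.10
grpclb-1.lb.example.org.     IN A   203.0.113.11
~~~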
@@ -67,7 +67,7 @@ func setup(c *caddy.Controller) error {

func parseTraffic(c *caddy.Controller) (*Traffic, error) {
	toHosts := []string{}
	t := &Traffic{node: "coredns"}
	t := &Traffic{node: "coredns", mgmt: "xds"}
	var (
		err       error
		tlsConfig *tls.Config
@@ -98,6 +98,12 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) {
	}
	for c.NextBlock() {
		switch c.Val() {
		case "cluster":
			args := c.RemainingArgs()
			if len(args) != 1 {
				return nil, c.ArgErr()
			}
			t.mgmt = args[0]
		case "id":
			args := c.RemainingArgs()
			if len(args) != 1 {

@@ -4,7 +4,6 @@ import (
	"context"
	"crypto/tls"
	"fmt"
	"net"
	"strconv"
	"strings"
	"time"
@@ -21,6 +20,7 @@ import (
type Traffic struct {
	c         *xds.Client
	node      string
	mgmt      string
	tlsConfig *tls.Config
	hosts     []string

@@ -29,9 +29,6 @@ type Traffic struct {
	origins []string
	loc     []xds.Locality

	grpcSRV  []dns.RR // SRV records for grpc LB
	grpcAddr []dns.RR // Address records for each LB (taken from **TOO**)

	Next plugin.Handler
}

@@ -54,26 +51,51 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg

	sockaddr, ok := t.c.Select(cluster, t.loc, t.health)
	if !ok {
		// ok the cluster (which has potentially extra labels), doesn't exist, but we may have a query for endpoint-0.<cluster>.
		// check if we have 2 labels and that the first equals endpoint-0.
		if dns.CountLabel(cluster) != 2 {
			m.Ns = soa(state.Zone)
			m.Rcode = dns.RcodeNameError
			w.WriteMsg(m)
			return 0, nil
		}
		// ok this cluster doesn't exist, potentially due to extra labels, which may be garbage or legit queries:
		// legit is:
		// endpoint-N.cluster
		// _grpclb._tcp.cluster
		// _tcp.cluster
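		// For example, with origin "lb.example.org." (hypothetical), the qname
		// "endpoint-0.web.lb.example.org." leaves cluster "endpoint-0.web" with
		// labels ["endpoint-0", "web"], and "_grpclb._tcp.web.lb.example.org."
		// leaves labels ["_grpclb", "_tcp", "web"].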
		labels := dns.SplitDomainName(cluster)
		if strings.HasPrefix(labels[0], "endpoint-") {
			// recheck if the cluster exist.
			cluster = labels[1]
			sockaddr, ok = t.c.Select(cluster, t.loc, t.health)
			if !ok {
		switch len(labels) {
		case 2:
			// endpoint or _tcp
			if strings.ToLower(labels[0]) == "_tcp" {
				// nodata, because empty non-terminal
				m.Ns = soa(state.Zone)
				m.Rcode = dns.RcodeSuccess
				w.WriteMsg(m)
				return 0, nil
			}
			if strings.HasPrefix(strings.ToLower(labels[0]), "endpoint-") {
				// recheck if the cluster exists.
				cluster = labels[1]
				sockaddr, ok = t.c.Select(cluster, t.loc, t.health)
				if !ok {
					m.Ns = soa(state.Zone)
					m.Rcode = dns.RcodeNameError
					w.WriteMsg(m)
					return 0, nil
				}
				return t.serveEndpoint(ctx, state, labels[0], cluster)
			}
		case 3:
			if strings.ToLower(labels[0]) != "_grpclb" || strings.ToLower(labels[1]) != "_tcp" {
				m.Ns = soa(state.Zone)
				m.Rcode = dns.RcodeNameError
				w.WriteMsg(m)
				return 0, nil
			}
			return t.serveEndpoint(ctx, state, labels[0], cluster)
			// OK, _grpclb._tcp query; we need to return the endpoint for the mgmt cluster *NOT* the cluster
			// we got the query for. This should exist, but we'll check later anyway
			cluster = t.mgmt
			sockaddr, _ = t.c.Select(cluster, t.loc, t.health)
			break
		default:
			m.Ns = soa(state.Zone)
			m.Rcode = dns.RcodeNameError
			w.WriteMsg(m)
			return 0, nil
		}
	}

@@ -193,46 +215,3 @@ func soa(z string) []dns.RR {
		Minttl: 5,
	}}
}

// srv record for grpclb endpoint.
func srv(i int, host, zone string) *dns.SRV {
	target := fmt.Sprintf("grpclb-%d.%s", i, zone)
	hdr := dns.RR_Header{Name: dnsutil.Join("_grpclb._tcp", zone), Class: dns.ClassINET, Rrtype: dns.TypeSRV}
	_, p, _ := net.SplitHostPort(host) // err ignored already checked in setup
	port, _ := strconv.Atoi(p)
	return &dns.SRV{
		Hdr: hdr,
		// prio, weight -> 0
		Port:   uint16(port),
		Target: target,
	}
}

func a(i int, host, zone string) *dns.A {
	owner := fmt.Sprintf("grpclb-%d.%s", i, zone)
	hdr := dns.RR_Header{Name: owner, Class: dns.ClassINET, Rrtype: dns.TypeA}
	h, _, _ := net.SplitHostPort(host)
	ip := net.ParseIP(h)
	if ip == nil {
		return nil
	}
	if ip.To4() == nil {
		return nil
	}
	return &dns.A{Hdr: hdr, A: ip.To4()}
}

func aaaa(i int, host, zone string) *dns.AAAA {
	owner := fmt.Sprintf("grpclb-%d.%s", i, zone)
	hdr := dns.RR_Header{Name: owner, Class: dns.ClassINET, Rrtype: dns.TypeAAAA}
	h, _, _ := net.SplitHostPort(host)
	ip := net.ParseIP(h)
	if ip == nil {
		return nil
	}
	if ip.To4() != nil {
		return nil
	}
	return &dns.AAAA{Hdr: hdr, AAAA: ip.To16()}
}

@@ -256,6 +256,46 @@ func TestTrafficSRV(t *testing.T) {
	}
}

func TestTrafficManagement(t *testing.T) {
	c, err := xds.New("127.0.0.1:0", "test-id", grpc.WithInsecure())
	if err != nil {
		t.Fatal(err)
	}
	tr := &Traffic{c: c, origins: []string{"lb.example.org."}, mgmt: "xds"}

	for _, cla := range []*endpointpb.ClusterLoadAssignment{
		&endpointpb.ClusterLoadAssignment{
			ClusterName: "web",
			Endpoints:   endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb.HealthStatus_HEALTHY}}),
		},
		&endpointpb.ClusterLoadAssignment{
			ClusterName: "xds",
			Endpoints:   endpoints([]EndpointHealth{{"::1", 18008, corepb.HealthStatus_HEALTHY}}),
		},
	} {
		a := xds.NewAssignment()
		a.SetClusterLoadAssignment(cla.ClusterName, cla)
		c.SetAssignments(a)
	}
	ctx := context.TODO()

	// Now we ask for the grpclb endpoint in the web cluster; this should give us the endpoint of the xds (mgmt) cluster.
	// ;; ANSWER SECTION:
	// _grpclb._tcp.web.lb.example.org. 5 IN SRV 100 100 18008 endpoint-0.xds.lb.example.org.
	// ;; ADDITIONAL SECTION:
	// endpoint-0.xds.lb.example.org. 5 IN AAAA ::1

	m := new(dns.Msg)
	m.SetQuestion("_grpclb._tcp.web.lb.example.org.", dns.TypeSRV)
	rec := dnstest.NewRecorder(&test.ResponseWriter{})
	if _, err := tr.ServeDNS(ctx, rec, m); err != nil {
		t.Errorf("Expected no error, but got %q", err)
	}
	if x := rec.Msg.Answer[0].(*dns.SRV).Target; x != "endpoint-0.xds.lb.example.org." {
		t.Errorf("Expected %s, got %s", "endpoint-0.xds.lb.example.org.", x)
	}
}

type EndpointHealth struct {
	Address string
	Port    uint16