diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index f22263ed1..ba2320773 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -24,10 +24,13 @@ endpoints need to be drained from it. discovered every 10 seconds. The plugin hands out responses that adhere to these assignments. Only endpoints that are *healthy* are handed out. -Each DNS response contains a single IP address that's considered the best one. *Traffic* will load -balance A and AAAA queries. The TTL on these answer is set to 5s. It will only return successful -responses either with an answer or otherwise a NODATA response. Queries for non-existent clusters -get a NXDOMAIN, where the minimal TTL is also set to 5s. +Each DNS response contains a single IP address (or SRV record) that's considered the best one. +*Traffic* will load balance A, AAAA and SRV queries. The TTL on these answer is set to 5s. It will +only return successful responses either with an answer or otherwise a NODATA response. Queries for +non-existent clusters get a NXDOMAIN, where the minimal TTL is also set to 5s. + +When an SRV record is returned an endpoint DNS name is synthesized `endpoint-0..` that +carries the IP address. Querying for these synthesized names works as well. The *traffic* plugin has no notion of draining, drop overload and anything that advanced, *it just acts upon assignments*. This is means that if a endpoint goes down and *traffic* has not seen a new @@ -37,7 +40,7 @@ Load reporting is not supported for the following reason. A DNS query is done by Behind this resolver (which can also cache) there may be many clients that will use this reply. The responding server (CoreDNS) has no idea how many clients use this resolver. So reporting a load of +1 on the CoreDNS side can results in anything from 1 to 1000+ of queries on the endpoint, making -the load reporting from *trafifc* highly inaccurate. +the load reporting from *traffic* highly inaccurate. ## Syntax @@ -97,7 +100,6 @@ If monitoring is enabled (via the *prometheus* plugin) then the following metric * `coredns_traffic_clusters_tracked{}` the number of tracked clusters. - ## Examples ~~~ diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index 0090bd780..2377e4c56 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -38,12 +38,27 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg m.SetReply(r) m.Authoritative = true - addr, ok := t.c.Select(cluster) + addr, port, ok := t.c.Select(cluster) if !ok { - m.Ns = soa(state.Zone) - m.Rcode = dns.RcodeNameError - w.WriteMsg(m) - return 0, nil + // ok the cluster (which has potentially extra labels), doesn't exist, but we may have a query for endpoint-0.. + // check if we have 2 labels and that the first equals endpoint-0. + if dns.CountLabel(cluster) != 2 { + m.Ns = soa(state.Zone) + m.Rcode = dns.RcodeNameError + w.WriteMsg(m) + return 0, nil + } + labels := dns.SplitDomainName(cluster) + if strings.Compare(labels[0], "endpoint-0") == 0 { + // recheck if the cluster exist. + addr, port, ok = t.c.Select(labels[1]) + if !ok { + m.Ns = soa(state.Zone) + m.Rcode = dns.RcodeNameError + w.WriteMsg(m) + return 0, nil + } + } } if addr == nil { @@ -66,7 +81,16 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg m.Ns = soa(state.Zone) break } - m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, AAAA: addr}} + m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeAAAA, Class: dns.ClassINET, Ttl: 5}, AAAA: addr}} + case dns.TypeSRV: + target := dnsutil.Join("endpoint-0", cluster) + state.Zone + m.Answer = []dns.RR{&dns.SRV{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, + Priority: 100, Weight: 100, Port: port, Target: target}} + if addr.To4() == nil { + m.Extra = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: target, Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, AAAA: addr}} + } else { + m.Extra = []dns.RR{&dns.A{Hdr: dns.RR_Header{Name: target, Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: 5}, A: addr}} + } default: m.Ns = soa(state.Zone) } diff --git a/plugin/traffic/traffic_test.go b/plugin/traffic/traffic_test.go index 7f1fb2648..8a2ec73c9 100644 --- a/plugin/traffic/traffic_test.go +++ b/plugin/traffic/traffic_test.go @@ -47,15 +47,22 @@ func TestTraffic(t *testing.T) { { cla: &xdspb.ClusterLoadAssignment{ ClusterName: "web", - Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_HEALTHY}}), + Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb.HealthStatus_HEALTHY}}), }, cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.1", }, + { + cla: &xdspb.ClusterLoadAssignment{ + ClusterName: "web", + Endpoints: endpoints([]EndpointHealth{{"::1", 18008, corepb.HealthStatus_HEALTHY}}), + }, + cluster: "web", qtype: dns.TypeAAAA, rcode: dns.RcodeSuccess, answer: "::1", + }, // unknown backend { cla: &xdspb.ClusterLoadAssignment{ ClusterName: "web", - Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", corepb.HealthStatus_UNKNOWN}}), + Endpoints: endpoints([]EndpointHealth{{"127.0.0.1", 18008, corepb.HealthStatus_UNKNOWN}}), }, cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, ns: true, }, @@ -64,12 +71,32 @@ func TestTraffic(t *testing.T) { cla: &xdspb.ClusterLoadAssignment{ ClusterName: "web", Endpoints: endpoints([]EndpointHealth{ - {"127.0.0.1", corepb.HealthStatus_UNKNOWN}, - {"127.0.0.2", corepb.HealthStatus_HEALTHY}, + {"127.0.0.1", 18008, corepb.HealthStatus_UNKNOWN}, + {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, }), }, cluster: "web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.2", }, + // SRV query healthy backend + { + cla: &xdspb.ClusterLoadAssignment{ + ClusterName: "web", + Endpoints: endpoints([]EndpointHealth{ + {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, + }), + }, + cluster: "web", qtype: dns.TypeSRV, rcode: dns.RcodeSuccess, answer: "endpoint-0.web.lb.example.org.", + }, + // A query for endpoint-0. + { + cla: &xdspb.ClusterLoadAssignment{ + ClusterName: "web", + Endpoints: endpoints([]EndpointHealth{ + {"127.0.0.2", 18008, corepb.HealthStatus_HEALTHY}, + }), + }, + cluster: "endpoint-0.web", qtype: dns.TypeA, rcode: dns.RcodeSuccess, answer: "127.0.0.2", + }, } ctx := context.TODO() @@ -105,18 +132,19 @@ func TestTraffic(t *testing.T) { addr = x.A.String() case *dns.AAAA: addr = x.AAAA.String() + case *dns.SRV: + addr = x.Target } if tc.answer != addr { t.Errorf("Test %d: Expected answer %s, but got %s", i, tc.answer, addr) } - } - } } type EndpointHealth struct { Address string + Port uint16 Health corepb.HealthStatus } @@ -131,6 +159,9 @@ func endpoints(e []EndpointHealth) []*endpointpb.LocalityLbEndpoints { Address: &corepb.Address_SocketAddress{ SocketAddress: &corepb.SocketAddress{ Address: e[i].Address, + PortSpecifier: &corepb.SocketAddress_PortValue{ + PortValue: uint32(e[i].Port), + }, }, }, }, diff --git a/plugin/traffic/xds/assignment.go b/plugin/traffic/xds/assignment.go index c8d1a5798..4bea5cb0f 100644 --- a/plugin/traffic/xds/assignment.go +++ b/plugin/traffic/xds/assignment.go @@ -59,12 +59,11 @@ func (a *assignment) clusters() []string { return clusters } -// Select selects a backend from cluster load assignments, using weighted random selection. It only selects -// backends that are reporting healthy. -func (a *assignment) Select(cluster string) (net.IP, bool) { +// Select selects a backend from cluster load assignments, using weighted random selection. It only selects backends that are reporting healthy. +func (a *assignment) Select(cluster string) (ip net.IP, port uint16, exists bool) { cla := a.ClusterLoadAssignment(cluster) if cla == nil { - return nil, false + return nil, 0, false } total := 0 @@ -79,7 +78,7 @@ func (a *assignment) Select(cluster string) (net.IP, bool) { } } if healthy == 0 { - return nil, true + return nil, 0, true } if total == 0 { @@ -92,12 +91,14 @@ func (a *assignment) Select(cluster string) (net.IP, bool) { continue } if r == i { - return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()), true + addr := net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()) + port := uint16(lb.GetEndpoint().GetAddress().GetSocketAddress().GetPortValue()) + return addr, port, true } i++ } } - return nil, true + return nil, 0, true } r := rand.Intn(total) + 1 @@ -108,9 +109,11 @@ func (a *assignment) Select(cluster string) (net.IP, bool) { } r -= int(lb.GetLoadBalancingWeight().GetValue()) if r <= 0 { - return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()), true + addr := net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress()) + port := uint16(lb.GetEndpoint().GetAddress().GetSocketAddress().GetPortValue()) + return addr, port, true } } } - return nil, true + return nil, 0, true } diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index acb395a12..315304b47 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -228,9 +228,9 @@ func (c *Client) receive(stream adsStream) error { // Select returns an address that is deemed to be the correct one for this cluster. The returned // boolean indicates if the cluster exists. -func (c *Client) Select(cluster string) (net.IP, bool) { +func (c *Client) Select(cluster string) (net.IP, uint16, bool) { if cluster == "" { - return nil, false + return nil, 0, false } return c.assignments.Select(cluster) }