Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-01-19 09:14:09 +01:00
parent 894ec68487
commit 64d0cfba0c
6 changed files with 20 additions and 14 deletions

View File

@@ -36,6 +36,7 @@ Then for CoreDNS, check out the `traffic` branch, create a Corefile:
example.org { example.org {
traffic grpc://127.0.0.1:18000 { traffic grpc://127.0.0.1:18000 {
id test-id id test-id
ignore_health
} }
debug debug
} }

View File

@@ -62,6 +62,7 @@ traffic TO... {
node ID node ID
tls CERT KEY CA tls CERT KEY CA
tls_servername NAME tls_servername NAME
ignore_health
} }
~~~ ~~~
@@ -78,6 +79,7 @@ traffic TO... {
* `tls_servername` **NAME** allows you to set a server name in the TLS configuration. This is needed * `tls_servername` **NAME** allows you to set a server name in the TLS configuration. This is needed
because *traffic* connects to an IP address, so it can't infer the server name from it. because *traffic* connects to an IP address, so it can't infer the server name from it.
* `ignore_health` can be enabled to ignore endpoint health status, this can aid when debugging.
## Naming Clusters ## Naming Clusters

View File

@@ -101,6 +101,8 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) {
return nil, c.ArgErr() return nil, c.ArgErr()
} }
tlsServerName = c.Val() tlsServerName = c.Val()
case "ignore_health":
t.health = true
default: default:
return nil, c.Errf("unknown property '%s'", c.Val()) return nil, c.Errf("unknown property '%s'", c.Val())
} }

View File

@@ -19,6 +19,7 @@ import (
type Traffic struct { type Traffic struct {
c *xds.Client c *xds.Client
id string id string
health bool
origins []string origins []string
Next plugin.Handler Next plugin.Handler
@@ -41,7 +42,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
m.SetReply(r) m.SetReply(r)
m.Authoritative = true m.Authoritative = true
sockaddr, ok := t.c.Select(cluster) sockaddr, ok := t.c.Select(cluster, t.health)
if !ok { if !ok {
// ok the cluster (which has potentially extra labels), doesn't exist, but we may have a query for endpoint-0.<cluster>. // ok the cluster (which has potentially extra labels), doesn't exist, but we may have a query for endpoint-0.<cluster>.
// check if we have 2 labels and that the first equals endpoint-0. // check if we have 2 labels and that the first equals endpoint-0.
@@ -55,7 +56,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
if strings.HasPrefix(labels[0], "endpoint-") { if strings.HasPrefix(labels[0], "endpoint-") {
// recheck if the cluster exist. // recheck if the cluster exist.
cluster = labels[1] cluster = labels[1]
sockaddr, ok = t.c.Select(cluster) sockaddr, ok = t.c.Select(cluster, t.health)
if !ok { if !ok {
m.Ns = soa(state.Zone) m.Ns = soa(state.Zone)
m.Rcode = dns.RcodeNameError m.Rcode = dns.RcodeNameError
@@ -88,7 +89,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
} }
m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeAAAA, Class: dns.ClassINET, Ttl: 5}, AAAA: sockaddr.Address()}} m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeAAAA, Class: dns.ClassINET, Ttl: 5}, AAAA: sockaddr.Address()}}
case dns.TypeSRV: case dns.TypeSRV:
sockaddrs, _ := t.c.All(cluster) sockaddrs, _ := t.c.All(cluster, t.health)
for i, sa := range sockaddrs { for i, sa := range sockaddrs {
target := fmt.Sprintf("endpoint-%d.%s.%s", i, cluster, state.Zone) target := fmt.Sprintf("endpoint-%d.%s.%s", i, cluster, state.Zone)
@@ -133,7 +134,7 @@ func (t *Traffic) serveEndpoint(ctx context.Context, state request.Request, endp
return 0, nil return 0, nil
} }
sockaddrs, _ := t.c.All(cluster) sockaddrs, _ := t.c.All(cluster, t.health)
if len(sockaddrs) < nr { if len(sockaddrs) < nr {
m.Ns = soa(state.Zone) m.Ns = soa(state.Zone)
m.Rcode = dns.RcodeNameError m.Rcode = dns.RcodeNameError

View File

@@ -67,7 +67,7 @@ func (a *assignment) clusters() []string {
} }
// Select selects a endpoint from cluster load assignments, using weighted random selection. It only selects endpoints that are reporting healthy. // Select selects a endpoint from cluster load assignments, using weighted random selection. It only selects endpoints that are reporting healthy.
func (a *assignment) Select(cluster string) (*SocketAddress, bool) { func (a *assignment) Select(cluster string, ignore bool) (*SocketAddress, bool) {
cla := a.ClusterLoadAssignment(cluster) cla := a.ClusterLoadAssignment(cluster)
if cla == nil { if cla == nil {
return nil, false return nil, false
@@ -77,7 +77,7 @@ func (a *assignment) Select(cluster string) (*SocketAddress, bool) {
healthy := 0 healthy := 0
for _, ep := range cla.Endpoints { for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() { for _, lb := range ep.GetLbEndpoints() {
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { if !ignore && lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue continue
} }
total += int(lb.GetLoadBalancingWeight().GetValue()) total += int(lb.GetLoadBalancingWeight().GetValue())
@@ -94,7 +94,7 @@ func (a *assignment) Select(cluster string) (*SocketAddress, bool) {
i := 0 i := 0
for _, ep := range cla.Endpoints { for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() { for _, lb := range ep.GetLbEndpoints() {
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { if !ignore && lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue continue
} }
if r == i { if r == i {
@@ -109,7 +109,7 @@ func (a *assignment) Select(cluster string) (*SocketAddress, bool) {
r := rand.Intn(total) + 1 r := rand.Intn(total) + 1
for _, ep := range cla.Endpoints { for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() { for _, lb := range ep.GetLbEndpoints() {
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { if !ignore && lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue continue
} }
r -= int(lb.GetLoadBalancingWeight().GetValue()) r -= int(lb.GetLoadBalancingWeight().GetValue())
@@ -122,7 +122,7 @@ func (a *assignment) Select(cluster string) (*SocketAddress, bool) {
} }
// All returns all healthy endpoints. // All returns all healthy endpoints.
func (a *assignment) All(cluster string) ([]*SocketAddress, bool) { func (a *assignment) All(cluster string, ignore bool) ([]*SocketAddress, bool) {
cla := a.ClusterLoadAssignment(cluster) cla := a.ClusterLoadAssignment(cluster)
if cla == nil { if cla == nil {
return nil, false return nil, false
@@ -131,7 +131,7 @@ func (a *assignment) All(cluster string) ([]*SocketAddress, bool) {
sa := []*SocketAddress{} sa := []*SocketAddress{}
for _, ep := range cla.Endpoints { for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() { for _, lb := range ep.GetLbEndpoints() {
if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY { if !ignore && lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
continue continue
} }
sa = append(sa, &SocketAddress{lb.GetEndpoint().GetAddress().GetSocketAddress()}) sa = append(sa, &SocketAddress{lb.GetEndpoint().GetAddress().GetSocketAddress()})

View File

@@ -227,17 +227,17 @@ func (c *Client) receive(stream adsStream) error {
// Select returns an address that is deemed to be the correct one for this cluster. The returned // Select returns an address that is deemed to be the correct one for this cluster. The returned
// boolean indicates if the cluster exists. // boolean indicates if the cluster exists.
func (c *Client) Select(cluster string) (*SocketAddress, bool) { func (c *Client) Select(cluster string, ignore bool) (*SocketAddress, bool) {
if cluster == "" { if cluster == "" {
return nil, false return nil, false
} }
return c.assignments.Select(cluster) return c.assignments.Select(cluster, ignore)
} }
// All returns all endpoints. // All returns all endpoints.
func (c *Client) All(cluster string) ([]*SocketAddress, bool) { func (c *Client) All(cluster string, ignore bool) ([]*SocketAddress, bool) {
if cluster == "" { if cluster == "" {
return nil, false return nil, false
} }
return c.assignments.All(cluster) return c.assignments.All(cluster, ignore)
} }