From 2d14fa270bbf5d02e65b896cbc27857f3d4f8489 Mon Sep 17 00:00:00 2001 From: Miek Gieben Date: Fri, 24 Jan 2020 13:34:59 +0100 Subject: [PATCH] Adds some locality stuff Signed-off-by: Miek Gieben --- plugin/traffic/setup.go | 29 ++++++++++++++++++++++++++--- plugin/traffic/setup_test.go | 29 +++++++++++++++++++++++++++++ plugin/traffic/traffic.go | 25 +++++++++---------------- plugin/traffic/traffic_test.go | 9 +++++++++ plugin/traffic/xds/assignment.go | 4 ++-- plugin/traffic/xds/client.go | 15 +++++++++++---- 6 files changed, 86 insertions(+), 25 deletions(-) diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index 51b690a44..9b0f70039 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -125,9 +125,32 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) { return t, nil } -// parseLoc parses string s into loc's. Each loc must be space separated from the other, inside +// parseLocality parses string s into loc's. Each loc must be space separated from the other, inside // a loc we have region,zone,subzone, where subzone or subzone and zone maybe empty. If specified // they must be comma separate (not spaces or anything). -func parseLoc(s string) ([]loc, error) { - return nil, nil +func parseLocality(s string) ([]xds.Locality, error) { + sets := strings.Fields(s) + if len(sets) == 0 { + return nil, nil + } + + locs := []xds.Locality{} + for _, s := range sets { + l := strings.Split(s, ",") + switch len(l) { + default: + return nil, fmt.Errorf("too many location specifiers: %q", s) + case 1: + locs = append(locs, xds.Locality{Region: l[0]}) + continue + case 2: + locs = append(locs, xds.Locality{Region: l[0], Zone: l[1]}) + continue + case 3: + locs = append(locs, xds.Locality{Region: l[0], Zone: l[1], SubZone: l[2]}) + continue + } + + } + return locs, nil } diff --git a/plugin/traffic/setup_test.go b/plugin/traffic/setup_test.go index de7056a32..14b0f5037 100644 --- a/plugin/traffic/setup_test.go +++ b/plugin/traffic/setup_test.go @@ -49,3 +49,32 @@ func TestParseTraffic(t *testing.T) { } } } + +func testParseLocality(t *testing.T) { + s := "region" + locs, err := parseLocality(s) + if err != nil { + t.Fatal(err) + } + if locs[0].Region != "region" { + t.Errorf("Expected %s, but got %s", "region", locs[0].Region) + } + + s = "region1,zone,sub region2" + locs, err = parseLocality(s) + if err != nil { + t.Fatal(err) + } + if locs[0].Zone != "zone" { + t.Errorf("Expected %s, but got %s", "zone", locs[1].Zone) + } + if locs[0].SubZone != "sub" { + t.Errorf("Expected %s, but got %s", "sub", locs[1].SubZone) + } + if locs[1].Region != "region2" { + t.Errorf("Expected %s, but got %s", "region2", locs[1].Region) + } + if locs[1].Zone != "" { + t.Errorf("Expected %s, but got %s", "", locs[1].Zone) + } +} diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index bbc7598e6..be5f48b0d 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -17,11 +17,11 @@ import ( // Traffic is a plugin that load balances according to assignments. type Traffic struct { - c *xds.Client - id string - health bool - origins []string - locality []loc + c *xds.Client + id string + health bool + origins []string + loc []xds.Locality Next plugin.Handler } @@ -43,7 +43,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg m.SetReply(r) m.Authoritative = true - sockaddr, ok := t.c.Select(cluster, t.health) + sockaddr, ok := t.c.Select(cluster, t.loc, t.health) if !ok { // ok the cluster (which has potentially extra labels), doesn't exist, but we may have a query for endpoint-0.. // check if we have 2 labels and that the first equals endpoint-0. @@ -57,7 +57,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg if strings.HasPrefix(labels[0], "endpoint-") { // recheck if the cluster exist. cluster = labels[1] - sockaddr, ok = t.c.Select(cluster, t.health) + sockaddr, ok = t.c.Select(cluster, t.loc, t.health) if !ok { m.Ns = soa(state.Zone) m.Rcode = dns.RcodeNameError @@ -90,7 +90,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg } m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeAAAA, Class: dns.ClassINET, Ttl: 5}, AAAA: sockaddr.Address()}} case dns.TypeSRV: - sockaddrs, _ := t.c.All(cluster, t.health) + sockaddrs, _ := t.c.All(cluster, t.loc, t.health) for i, sa := range sockaddrs { target := fmt.Sprintf("endpoint-%d.%s.%s", i, cluster, state.Zone) @@ -135,7 +135,7 @@ func (t *Traffic) serveEndpoint(ctx context.Context, state request.Request, endp return 0, nil } - sockaddrs, _ := t.c.All(cluster, t.health) + sockaddrs, _ := t.c.All(cluster, t.loc, t.health) if len(sockaddrs) < nr { m.Ns = soa(state.Zone) m.Rcode = dns.RcodeNameError @@ -182,10 +182,3 @@ func soa(z string) []dns.RR { // Name implements the plugin.Handler interface. func (t *Traffic) Name() string { return "traffic" } - -// loc holds the locality for this server. It a list of the set region, zone, subzone. -type loc struct { - region string - zone string - subzone string -} diff --git a/plugin/traffic/traffic_test.go b/plugin/traffic/traffic_test.go index 43b5ad605..75e8c1e53 100644 --- a/plugin/traffic/traffic_test.go +++ b/plugin/traffic/traffic_test.go @@ -212,9 +212,18 @@ type EndpointHealth struct { } func endpoints(e []EndpointHealth) []*endpointpb.LocalityLbEndpoints { + return endpointsWithLocality(e, xds.Locality{}) +} + +func endpointsWithLocality(e []EndpointHealth, loc xds.Locality) []*endpointpb.LocalityLbEndpoints { ep := make([]*endpointpb.LocalityLbEndpoints, len(e)) for i := range e { ep[i] = &endpointpb.LocalityLbEndpoints{ + Locality: &corepb.Locality{ + Region: loc.Region, + Zone: loc.Zone, + SubZone: loc.SubZone, + }, LbEndpoints: []*endpointpb.LbEndpoint{{ HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ Endpoint: &endpointpb.Endpoint{ diff --git a/plugin/traffic/xds/assignment.go b/plugin/traffic/xds/assignment.go index 8c2b0334f..3608f9fbb 100644 --- a/plugin/traffic/xds/assignment.go +++ b/plugin/traffic/xds/assignment.go @@ -71,7 +71,7 @@ func (a *assignment) clusters() []string { } // Select selects a endpoint from cluster load assignments, using weighted random selection. It only selects endpoints that are reporting healthy. -func (a *assignment) Select(cluster string, ignore bool) (*SocketAddress, bool) { +func (a *assignment) Select(cluster string, locality []Locality, ignore bool) (*SocketAddress, bool) { cla := a.ClusterLoadAssignment(cluster) if cla == nil { return nil, false @@ -126,7 +126,7 @@ func (a *assignment) Select(cluster string, ignore bool) (*SocketAddress, bool) } // All returns all healthy endpoints. -func (a *assignment) All(cluster string, ignore bool) ([]*SocketAddress, bool) { +func (a *assignment) All(cluster string, locality []Locality, ignore bool) ([]*SocketAddress, bool) { cla := a.ClusterLoadAssignment(cluster) if cla == nil { return nil, false diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index e54514f51..80db96682 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -227,17 +227,24 @@ func (c *Client) receive(stream adsStream) error { // Select returns an address that is deemed to be the correct one for this cluster. The returned // boolean indicates if the cluster exists. -func (c *Client) Select(cluster string, ignore bool) (*SocketAddress, bool) { +func (c *Client) Select(cluster string, locality []Locality, ignore bool) (*SocketAddress, bool) { if cluster == "" { return nil, false } - return c.assignments.Select(cluster, ignore) + return c.assignments.Select(cluster, locality, ignore) } // All returns all endpoints. -func (c *Client) All(cluster string, ignore bool) ([]*SocketAddress, bool) { +func (c *Client) All(cluster string, locality []Locality, ignore bool) ([]*SocketAddress, bool) { if cluster == "" { return nil, false } - return c.assignments.All(cluster, ignore) + return c.assignments.All(cluster, locality, ignore) +} + +// Locality holds the locality for this server. It contains a Region, Zone and SubZone. +type Locality struct { + Region string + Zone string + SubZone string }