Adds some locality stuff

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-01-24 13:34:59 +01:00
parent eaa7f0d6eb
commit 2d14fa270b
6 changed files with 86 additions and 25 deletions

View File

@@ -125,9 +125,32 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) {
return t, nil
}
// parseLoc parses string s into loc's. Each loc must be space separated from the other, inside
// parseLocality parses string s into loc's. Each loc must be space separated from the other, inside
// a loc we have region,zone,subzone, where subzone or subzone and zone maybe empty. If specified
// they must be comma separate (not spaces or anything).
func parseLoc(s string) ([]loc, error) {
return nil, nil
func parseLocality(s string) ([]xds.Locality, error) {
sets := strings.Fields(s)
if len(sets) == 0 {
return nil, nil
}
locs := []xds.Locality{}
for _, s := range sets {
l := strings.Split(s, ",")
switch len(l) {
default:
return nil, fmt.Errorf("too many location specifiers: %q", s)
case 1:
locs = append(locs, xds.Locality{Region: l[0]})
continue
case 2:
locs = append(locs, xds.Locality{Region: l[0], Zone: l[1]})
continue
case 3:
locs = append(locs, xds.Locality{Region: l[0], Zone: l[1], SubZone: l[2]})
continue
}
}
return locs, nil
}

View File

@@ -49,3 +49,32 @@ func TestParseTraffic(t *testing.T) {
}
}
}
func testParseLocality(t *testing.T) {
s := "region"
locs, err := parseLocality(s)
if err != nil {
t.Fatal(err)
}
if locs[0].Region != "region" {
t.Errorf("Expected %s, but got %s", "region", locs[0].Region)
}
s = "region1,zone,sub region2"
locs, err = parseLocality(s)
if err != nil {
t.Fatal(err)
}
if locs[0].Zone != "zone" {
t.Errorf("Expected %s, but got %s", "zone", locs[1].Zone)
}
if locs[0].SubZone != "sub" {
t.Errorf("Expected %s, but got %s", "sub", locs[1].SubZone)
}
if locs[1].Region != "region2" {
t.Errorf("Expected %s, but got %s", "region2", locs[1].Region)
}
if locs[1].Zone != "" {
t.Errorf("Expected %s, but got %s", "", locs[1].Zone)
}
}

View File

@@ -17,11 +17,11 @@ import (
// Traffic is a plugin that load balances according to assignments.
type Traffic struct {
c *xds.Client
id string
health bool
origins []string
locality []loc
c *xds.Client
id string
health bool
origins []string
loc []xds.Locality
Next plugin.Handler
}
@@ -43,7 +43,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
m.SetReply(r)
m.Authoritative = true
sockaddr, ok := t.c.Select(cluster, t.health)
sockaddr, ok := t.c.Select(cluster, t.loc, t.health)
if !ok {
// ok the cluster (which has potentially extra labels), doesn't exist, but we may have a query for endpoint-0.<cluster>.
// check if we have 2 labels and that the first equals endpoint-0.
@@ -57,7 +57,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
if strings.HasPrefix(labels[0], "endpoint-") {
// recheck if the cluster exist.
cluster = labels[1]
sockaddr, ok = t.c.Select(cluster, t.health)
sockaddr, ok = t.c.Select(cluster, t.loc, t.health)
if !ok {
m.Ns = soa(state.Zone)
m.Rcode = dns.RcodeNameError
@@ -90,7 +90,7 @@ func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
}
m.Answer = []dns.RR{&dns.AAAA{Hdr: dns.RR_Header{Name: state.QName(), Rrtype: dns.TypeAAAA, Class: dns.ClassINET, Ttl: 5}, AAAA: sockaddr.Address()}}
case dns.TypeSRV:
sockaddrs, _ := t.c.All(cluster, t.health)
sockaddrs, _ := t.c.All(cluster, t.loc, t.health)
for i, sa := range sockaddrs {
target := fmt.Sprintf("endpoint-%d.%s.%s", i, cluster, state.Zone)
@@ -135,7 +135,7 @@ func (t *Traffic) serveEndpoint(ctx context.Context, state request.Request, endp
return 0, nil
}
sockaddrs, _ := t.c.All(cluster, t.health)
sockaddrs, _ := t.c.All(cluster, t.loc, t.health)
if len(sockaddrs) < nr {
m.Ns = soa(state.Zone)
m.Rcode = dns.RcodeNameError
@@ -182,10 +182,3 @@ func soa(z string) []dns.RR {
// Name implements the plugin.Handler interface.
func (t *Traffic) Name() string { return "traffic" }
// loc holds the locality for this server. It a list of the set region, zone, subzone.
type loc struct {
region string
zone string
subzone string
}

View File

@@ -212,9 +212,18 @@ type EndpointHealth struct {
}
func endpoints(e []EndpointHealth) []*endpointpb.LocalityLbEndpoints {
return endpointsWithLocality(e, xds.Locality{})
}
func endpointsWithLocality(e []EndpointHealth, loc xds.Locality) []*endpointpb.LocalityLbEndpoints {
ep := make([]*endpointpb.LocalityLbEndpoints, len(e))
for i := range e {
ep[i] = &endpointpb.LocalityLbEndpoints{
Locality: &corepb.Locality{
Region: loc.Region,
Zone: loc.Zone,
SubZone: loc.SubZone,
},
LbEndpoints: []*endpointpb.LbEndpoint{{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{

View File

@@ -71,7 +71,7 @@ func (a *assignment) clusters() []string {
}
// Select selects a endpoint from cluster load assignments, using weighted random selection. It only selects endpoints that are reporting healthy.
func (a *assignment) Select(cluster string, ignore bool) (*SocketAddress, bool) {
func (a *assignment) Select(cluster string, locality []Locality, ignore bool) (*SocketAddress, bool) {
cla := a.ClusterLoadAssignment(cluster)
if cla == nil {
return nil, false
@@ -126,7 +126,7 @@ func (a *assignment) Select(cluster string, ignore bool) (*SocketAddress, bool)
}
// All returns all healthy endpoints.
func (a *assignment) All(cluster string, ignore bool) ([]*SocketAddress, bool) {
func (a *assignment) All(cluster string, locality []Locality, ignore bool) ([]*SocketAddress, bool) {
cla := a.ClusterLoadAssignment(cluster)
if cla == nil {
return nil, false

View File

@@ -227,17 +227,24 @@ func (c *Client) receive(stream adsStream) error {
// Select returns an address that is deemed to be the correct one for this cluster. The returned
// boolean indicates if the cluster exists.
func (c *Client) Select(cluster string, ignore bool) (*SocketAddress, bool) {
func (c *Client) Select(cluster string, locality []Locality, ignore bool) (*SocketAddress, bool) {
if cluster == "" {
return nil, false
}
return c.assignments.Select(cluster, ignore)
return c.assignments.Select(cluster, locality, ignore)
}
// All returns all endpoints.
func (c *Client) All(cluster string, ignore bool) ([]*SocketAddress, bool) {
func (c *Client) All(cluster string, locality []Locality, ignore bool) ([]*SocketAddress, bool) {
if cluster == "" {
return nil, false
}
return c.assignments.All(cluster, ignore)
return c.assignments.All(cluster, locality, ignore)
}
// Locality holds the locality for this server. It contains a Region, Zone and SubZone.
type Locality struct {
Region string
Zone string
SubZone string
}