Add traffic plugin

This allows for advanced loadbalancing and maybe geoIP loadbalancing.

Signed-off-by: Miek Gieben <miek@miek.nl>
Miek Gieben
2019-10-05 11:45:45 +01:00
parent d6669dee80
commit d5c5ba010c
7 changed files with 462 additions and 0 deletions

115
plugin/traffic/README.md Normal file

@@ -0,0 +1,115 @@
# traffic
## Name
*traffic* - hands out addresses according to assignments.
## Description
The *traffic* plugin is a load balancer that allows traffic steering, weighted responses and
draining of endpoints. Endpoints are IP:port pairs. *Traffic* works as an overlay on top of other
plugins; it does not mandate any storage by itself.
*Traffic* receives (via gRPC?) *assignments* that define the weight of the endpoints in services.
The plugin takes care of handing out responses that adhere to these assignments. Assignments
need to be updated frequently; without new updates *traffic* will hand out responses according to
the last received assignment. When there are no assignments for a service name (yet), the responses
will also be modified (see below).
An assignment covers a "service name", which is a domain name. For each service a number of backends
are expected. A backend is defined as an IP:port pair. Each backend comes with an integer indicating
its relative weight. A zero means the backend exists, but should not be handed out (drain it).
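As a rough illustration of the paragraph above, this sketch turns an IP:port string and a weight
into a backend value. `parseBackend` is a hypothetical helper that is not part of this commit; the
struct only mirrors the idea of an address, a port and a relative weight.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// backend holds an IP:port pair and its relative weight (0 means: drain it).
type backend struct {
	addr   net.IP
	port   int
	weight int
}

// parseBackend is a hypothetical helper: it splits "IP:port", validates both
// halves and attaches the weight. Host names are rejected, only IPs are allowed.
func parseBackend(hostport string, weight int) (*backend, error) {
	host, p, err := net.SplitHostPort(hostport)
	if err != nil {
		return nil, err
	}
	ip := net.ParseIP(host)
	if ip == nil {
		return nil, fmt.Errorf("not an IP address: %q", host)
	}
	port, err := strconv.Atoi(p)
	if err != nil || port < 1 || port > 65535 {
		return nil, fmt.Errorf("invalid port: %q", p)
	}
	if weight < 0 {
		return nil, fmt.Errorf("negative weight: %d", weight)
	}
	return &backend{addr: ip, port: port, weight: weight}, nil
}

func main() {
	b, err := parseBackend("192.168.1.3:443", 0)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("%s port %d weight %d drained=%t\n", b.addr, b.port, b.weight, b.weight == 0)
}
```

IPv6 backends would be written in the bracketed form that `net.SplitHostPort` expects, for example
`[2001:db8::1]:443`.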
*Traffic* will load balance A and AAAA queries for service names known to the plugin. It will return
precisely one record in a response, which is the optimal record according to the assignments and
previously handed out responses. If a service should be load balanced, but no assignment can be
found, a random record from the *answer section* will be chosen.
Every message that is handled by the *traffic* plugin will have all its TTLs set to 5 seconds,
any authority section is removed and all RRSIGs are removed from it.
The *traffic* plugin itself has no notion of draining, dropping overload or anything that advanced;
*it just acts upon assignments*. This means that if a backend goes down and *traffic* has not seen
a new assignment yet, it will still include this backend in responses.
## Syntax
~~~
traffic
~~~
This enables traffic load balancing for all (sub-)domains named in the server block.
## Examples
~~~ corefile
example.org {
traffic
forward . 10.12.13.14
}
~~~
This will add load balancing for domains under example.org; the upstream information comes from
10.12.13.14. Depending on received assignments, replies are either let through as-is or load balanced.
## Assignments
Assignments are given in protobuf format, but here is an example in YAML conveying the same
information. This is an example assignment for the service "www.example.org".
~~~ yaml
assignments:
- service: www.example.org
- backend: 192.168.1.1:443
assign: 4
backend: 192.168.1.2:443
assign: 6
backend: 192.168.1.3:443
assign: 0
~~~
This particular one has 3 backends, one of which is to be drained (192.168.1.3). The two remaining
ones have a non-zero weighted assignment. We use "Weighted Random Selection" to select a backend:
* Add up all the weights for all the items in the list (here 10).
* Pick a number at random between 1 and the sum of the weights.
* Iterate over the items:
  * For the current item, subtract the item's weight from the random number.
  * If the result is zero or less, pick this item; otherwise continue with the next item.
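The steps above can be sketched as follows, using the three backends from the example assignment.
This is a minimal, self-contained illustration: the `item` type and the `pick` and `selectWeighted`
functions are hypothetical names, and the subtraction walk is split from the random draw only so
that it can be checked by hand.

```go
package main

import (
	"fmt"
	"math/rand"
)

// item is a hypothetical stand-in for a backend: an address and its weight.
type item struct {
	addr   string
	weight int
}

// pick walks the list, subtracting each weight from r (expected to be in
// [1, sum of weights]) and returns the first item that brings r to zero or below.
// Items with weight 0 leave r unchanged and are therefore never picked.
func pick(items []item, r int) (item, bool) {
	for _, it := range items {
		r -= it.weight
		if r <= 0 {
			return it, true
		}
	}
	return item{}, false
}

// selectWeighted sums the weights, draws r in [1, total] and walks the list.
func selectWeighted(items []item) (item, bool) {
	total := 0
	for _, it := range items {
		total += it.weight
	}
	if total == 0 {
		return item{}, false // every backend is drained
	}
	return pick(items, rand.Intn(total)+1)
}

func main() {
	items := []item{
		{"192.168.1.1:443", 4},
		{"192.168.1.2:443", 6},
		{"192.168.1.3:443", 0},
	}
	if it, ok := selectWeighted(items); ok {
		fmt.Println(it.addr)
	}
}
```

With these weights a draw of 1 through 4 lands on the first backend and a draw of 5 through 10 on
the second, so the drained backend is never returned.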
On seeing a query for a service, *traffic* will track the reply. When it returns with an answer
*traffic* will rewrite it (and discard any RRSIGs). Using the assignments the answer section will
be rewritten as follows:
* A backend will be picked using the algorithm from above.
* The TTL on the response will be 5s for all included records.
* According to previous responses for this service and the relative weights of each backend the
best backend will be put in the response.
* If after the selection *no* backends are available a NODATA response will be sent. An SOA
record will be synthesised, and a low TTL (and negative TTL) of 5 seconds will be set.
TTL rewriting always? TODO.
Authority section will be removed.
If there is no assignment, randomly pick an address.
For types other than A and AAAA, like SRV, do the same selection.
## Bugs
This plugin does not play nice with DNSSEC: if the backend returns signatures with the answer, they
will be stripped. You can optionally sign responses on the fly by using the *dnssec* plugin.
## Also See
This is a [post on weighted random
selection](https://medium.com/@peterkellyonline/weighted-random-selection-3ff222917eb6).
## TODO
Should we add source address information (geographical load balancing) to the assignment? This can
be handled by having each backend specify an optional source range where this record should be used.
For IPv4 this must be a /24, for IPv6 a /64.
Other points that require more attention:
* deleting assignments?
* last known good assignment (esp with deleting assignments)?


@@ -0,0 +1,39 @@
package traffic

import (
	"math/rand"
	"net"
)

// assignment is an assignment for a single service. It contains multiple backends.
type assignment struct {
	service  string
	backends []*backend
}

// backend is a backend specified by an address, port and a weight.
type backend struct {
	addr   net.IP
	port   int
	weight int
}

// Select selects a backend from a, using weighted random selection.
func (a assignment) Select() *backend {
	total := 0
	for _, b := range a.backends {
		total += b.weight
	}
	if total == 0 {
		return nil
	}

	r := rand.Intn(total) + 1
	for _, b := range a.backends {
		r -= b.weight
		if r <= 0 {
			return b
		}
	}
	return nil
}


@@ -0,0 +1,38 @@
package traffic

import (
	"math/rand"
	"net"
	"testing"
	"time"
)

func TestAssignment(t *testing.T) {
	rand.Seed(int64(time.Now().Nanosecond()))

	backends := []*backend{
		{net.IPv4zero, 0, 6},
		{net.IPv4allrouter, 0, 4},
		{net.IPv4allsys, 0, 0},
	}
	a := assignment{"www.example.org", backends}

	// should never get 0 weight, could be improved to check the difference between 4 and 6.
	for i := 0; i < 100; i++ {
		// Guard against nil first so a failing Select reports an error instead of panicking.
		if x := a.Select(); x == nil || x.weight == 0 {
			t.Errorf("Expected backend with non-zero weight for Select, got %v", x)
		}
	}
}

func TestAssignmentZero(t *testing.T) {
	rand.Seed(int64(time.Now().Nanosecond()))

	backends := []*backend{
		{net.IPv4zero, 0, 0},
	}
	a := assignment{"www.example.org", backends}
	if x := a.Select(); x != nil {
		t.Errorf("Expected nil for Select, got %v", x)
	}
}

40
plugin/traffic/setup.go Normal file

@@ -0,0 +1,40 @@
package traffic

import (
	"math/rand"
	"time"

	"github.com/coredns/coredns/core/dnsserver"
	"github.com/coredns/coredns/plugin"
	clog "github.com/coredns/coredns/plugin/pkg/log"

	"github.com/caddyserver/caddy"
)

var log = clog.NewWithPlugin("traffic")

func init() { plugin.Register("traffic", setup) }

func setup(c *caddy.Controller) error {
	rand.Seed(int64(time.Now().Nanosecond()))
	if err := parse(c); err != nil {
		return plugin.Error("traffic", err)
	}

	dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
		return &Traffic{Next: next, assignments: make(map[string]assignment)}
	})

	return nil
}

func parse(c *caddy.Controller) error {
	for c.Next() {
		args := c.RemainingArgs()
		if len(args) != 0 {
			return c.ArgErr()
		}
	}
	return nil
}


@@ -0,0 +1,34 @@
package traffic

import (
	"testing"

	"github.com/caddyserver/caddy"
)

func TestSetup(t *testing.T) {
	tests := []struct {
		input     string
		shouldErr bool
	}{
		// positive
		{`traffic`, false},
		// negative
		{`traffic fleeb`, true},
	}

	for i, test := range tests {
		c := caddy.NewTestController("dns", test.input)
		err := parse(c)

		// err is nil in this branch, so it is not included in the message.
		if test.shouldErr && err == nil {
			t.Errorf("Test %d: Expected error but found none for input %s", i, test.input)
		}

		if err != nil {
			if !test.shouldErr {
				t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err)
			}
		}
	}
}

73
plugin/traffic/traffic.go Normal file

@@ -0,0 +1,73 @@
package traffic

import (
	"context"
	"math/rand"
	"sync"
	"time"

	"github.com/coredns/coredns/plugin"
	"github.com/coredns/coredns/plugin/pkg/response"
	"github.com/coredns/coredns/request"

	"github.com/miekg/dns"
)

// Traffic is a plugin that load balances according to assignments.
type Traffic struct {
	assignments map[string]assignment // service name -> assignment
	mu          sync.RWMutex          // protects assignments
	Next        plugin.Handler
}

// ServeDNS implements the plugin.Handler interface.
func (t *Traffic) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
	state := request.Request{W: w, Req: r}

	tw := &ResponseWriter{ResponseWriter: w}
	t.mu.RLock()
	a, ok := t.assignments[state.Name()]
	t.mu.RUnlock()
	if ok {
		tw.a = &a
	}
	return plugin.NextOrFailure(t.Name(), t.Next, ctx, tw, r)
}

// Name implements the plugin.Handler interface.
func (t *Traffic) Name() string { return "traffic" }

// ResponseWriter writes a traffic load balanced response.
type ResponseWriter struct {
	dns.ResponseWriter
	a *assignment
}

// WriteMsg implements the dns.ResponseWriter interface.
func (r *ResponseWriter) WriteMsg(res *dns.Msg) error {
	// set all TTLs to 5, also negative TTL?
	// Anything that is not a successful A or AAAA answer is passed through untouched.
	if res.Rcode != dns.RcodeSuccess || len(res.Question) == 0 {
		return r.ResponseWriter.WriteMsg(res)
	}
	if res.Question[0].Qtype != dns.TypeA && res.Question[0].Qtype != dns.TypeAAAA {
		return r.ResponseWriter.WriteMsg(res)
	}
	typ, _ := response.Typify(res, time.Now().UTC())
	if typ != response.NoError {
		return r.ResponseWriter.WriteMsg(res)
	}

	// ok, traffic-lb
	if r.a != nil {
		// TODO: select a backend according to the assignment.
	}

	if len(res.Answer) > 1 {
		// Until assignments are used, hand out one random record from the answer section.
		res.Answer = []dns.RR{res.Answer[rand.Intn(len(res.Answer))]}
	}
	// Set the TTL on whatever is left, also when the answer held a single record.
	for _, rr := range res.Answer {
		rr.Header().Ttl = 5
	}
	res.Ns = []dns.RR{} // remove auth section, we don't care

	return r.ResponseWriter.WriteMsg(res)
}


@@ -0,0 +1,123 @@
package traffic
/*
func TestTraffic(t *testing.T) {
rm := Traffic{Next: handler()}
// the first X records must be cnames after this test
tests := []struct {
answer []dns.RR
extra []dns.RR
cnameAnswer int
cnameExtra int
addressAnswer int
addressExtra int
mxAnswer int
mxExtra int
}{
{
answer: []dns.RR{
test.CNAME("cname1.region2.skydns.test. 300 IN CNAME cname2.region2.skydns.test."),
test.CNAME("cname2.region2.skydns.test. 300 IN CNAME cname3.region2.skydns.test."),
test.CNAME("cname5.region2.skydns.test. 300 IN CNAME cname6.region2.skydns.test."),
test.CNAME("cname6.region2.skydns.test. 300 IN CNAME endpoint.region2.skydns.test."),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.1"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx1.region2.skydns.test."),
test.MX("mx.region2.skydns.test. 300 IN MX 2 mx2.region2.skydns.test."),
test.MX("mx.region2.skydns.test. 300 IN MX 3 mx3.region2.skydns.test."),
},
cnameAnswer: 4,
addressAnswer: 1,
mxAnswer: 3,
},
{
answer: []dns.RR{
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.1"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx1.region2.skydns.test."),
test.CNAME("cname.region2.skydns.test. 300 IN CNAME endpoint.region2.skydns.test."),
},
cnameAnswer: 1,
addressAnswer: 1,
mxAnswer: 1,
},
{
answer: []dns.RR{
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx1.region2.skydns.test."),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.1"),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.2"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx2.region2.skydns.test."),
test.CNAME("cname2.region2.skydns.test. 300 IN CNAME cname3.region2.skydns.test."),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.3"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx3.region2.skydns.test."),
},
extra: []dns.RR{
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.1"),
test.AAAA("endpoint.region2.skydns.test. 300 IN AAAA ::1"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx1.region2.skydns.test."),
test.CNAME("cname2.region2.skydns.test. 300 IN CNAME cname3.region2.skydns.test."),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx2.region2.skydns.test."),
test.A("endpoint.region2.skydns.test. 300 IN A 10.240.0.3"),
test.AAAA("endpoint.region2.skydns.test. 300 IN AAAA ::2"),
test.MX("mx.region2.skydns.test. 300 IN MX 1 mx3.region2.skydns.test."),
},
cnameAnswer: 1,
cnameExtra: 1,
addressAnswer: 3,
addressExtra: 4,
mxAnswer: 3,
mxExtra: 3,
},
}
rec := dnstest.NewRecorder(&test.ResponseWriter{})
for i, test := range tests {
req := new(dns.Msg)
req.SetQuestion("region2.skydns.test.", dns.TypeSRV)
req.Answer = test.answer
req.Extra = test.extra
_, err := rm.ServeDNS(context.TODO(), rec, req)
if err != nil {
t.Errorf("Test %d: Expected no error, but got %s", i, err)
continue
}
cname, address, mx, sorted := countRecords(rec.Msg.Answer)
if !sorted {
t.Errorf("Test %d: Expected CNAMEs, then AAAAs, then MX in Answer, but got mixed", i)
}
if cname != test.cnameAnswer {
t.Errorf("Test %d: Expected %d CNAMEs in Answer, but got %d", i, test.cnameAnswer, cname)
}
if address != test.addressAnswer {
t.Errorf("Test %d: Expected %d A/AAAAs in Answer, but got %d", i, test.addressAnswer, address)
}
if mx != test.mxAnswer {
t.Errorf("Test %d: Expected %d MXs in Answer, but got %d", i, test.mxAnswer, mx)
}
cname, address, mx, sorted = countRecords(rec.Msg.Extra)
if !sorted {
t.Errorf("Test %d: Expected CNAMEs, then AAAAs, then MX in Extra, but got mixed", i)
}
if cname != test.cnameExtra {
t.Errorf("Test %d: Expected %d CNAMEs in Extra, but got %d", i, test.cnameExtra, cname)
}
if address != test.addressExtra {
t.Errorf("Test %d: Expected %d A/AAAAs in Extra, but got %d", i, test.addressExtra, address)
}
if mx != test.mxExtra {
t.Errorf("Test %d: Expected %d MXs in Extra, but got %d", i, test.mxExtra, mx)
}
}
}
func handler() plugin.Handler {
return plugin.HandlerFunc(func(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
w.WriteMsg(r)
return dns.RcodeSuccess, nil
})
}
*/