middleware/proxy: absorb httpproxy (#481)

* middleware/proxy: absorb httpproxy

Move the httproxy into proxy. This adds and Exchanger interface which
is used to exchange the messages with the upstream.

The https_google upstream will re-resolve itself and update the upstream
hosts used every 300s.

* Remove and add TODO
This commit is contained in:
Miek Gieben
2017-02-06 19:32:48 +00:00
committed by GitHub
parent 77f957d443
commit 123a76c91e
21 changed files with 466 additions and 827 deletions

View File

@@ -13,8 +13,8 @@ In its most basic form, a simple reverse proxy uses this syntax:
proxy FROM TO
~~~
* **FROM** is the base domain to match for the request to be proxied
* **TO** is the destination endpoint to proxy to
* **FROM** is the base domain to match for the request to be proxied.
* **TO** is the destination endpoint to proxy to.
However, advanced features including load balancing can be utilized with an expanded syntax:
@@ -26,7 +26,7 @@ proxy FROM TO... {
health_check PATH:PORT [DURATION]
except IGNORED_NAMES...
spray
protocol [dns|https_google]
protocol [dns|https_google [bootstrap ADDRESS...]]
}
~~~
@@ -39,7 +39,8 @@ proxy FROM TO... {
* `ignored_names...` is a space-separated list of paths to exclude from proxying. Requests that match any of these paths will be passed through.
* `spray` when all backends are unhealthy, randomly pick one to send the traffic to. (This is a failsafe.)
* `protocol` specifies what protocol to use to speak to an upstream, `dns` (the default) is plain old DNS, and
`https_google` uses `https://dns.google.com` and speaks a JSON DNS dialect.
`https_google` uses `https://dns.google.com` and speaks a JSON DNS dialect. Note when using this
**TO** must be `dns.google.com`.
## Policies
@@ -53,17 +54,43 @@ available. This is to preeempt the case where the healthchecking (as a mechanism
## Upstream Protocols
Currently supported are `dns` (i.e., standard DNS over UDP) and `https_google`. Note that with
`https_google` the entire transport is encrypted. Only *you* and *Google* can see your DNS activity.
Currently `protocol` supports `dns` (i.e., standard DNS over UDP/TCP) and `https_google` (JSON
payload over HTTPS). Note that with `https_google` the entire transport is encrypted. Only *you* and
*Google* can see your DNS activity.
* `dns`: no options can be given at the moment.
* `https_google`: bootstrap **ADDRESS...** is used to (re-)resolve `dns.google.com` to an address to
connect to. This happens every 300s. If not specified the default is used: 8.8.8.8:53/8.8.4.4:53.
Note that **TO** is *ignored* when `https_google` is used, as its upstream is defined as
`dns.google.com`.
Debug queries are enabled by default and currently there is no way to turn them off. When CoreDNS
receives a debug query (i.e. the name is prefixed with `o-o.debug.`) a TXT record with Comment
from `dns.google.com` is added. Note this is not always set, but sometimes you'll see:
`dig @localhost -p 1053 mx o-o.debug.example.org`:
~~~ txt
;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
;; QUESTION SECTION:
;o-o.debug.example.org. IN MX
;; AUTHORITY SECTION:
example.org. 1799 IN SOA sns.dns.icann.org. noc.dns.icann.org. 2016110711 7200 3600 1209600 3600
;; ADDITIONAL SECTION:
. 0 CH TXT "Response from 199.43.133.53"
~~~
## Metrics
If monitoring is enabled (via the *prometheus* directive) then the following metric is exported:
* coredns_proxy_request_duration_milliseconds{zone}
* coredns_proxy_request_count_total{proto, from}
The metric shows the duration for a proxied request, the `zone` label is the **FROM** as specified
in the configuration.
Where `proto` is the protocol used (`dns`, or `https_google`) and `from` is **FROM** specified in
the config.
## Examples
@@ -111,3 +138,19 @@ proxy . /etc/resolv.conf {
except miek.nl example.org
}
~~~
Proxy all requests within example.org to Google's dns.google.com.
~~~
proxy example.org 1.2.3.4:53 {
protocol https_google
}
~~~
Proxy everything, and re-lookup `dns.google.com` every 300 seconds using 8.8.8.8:53.
~~~
proxy . 1.2.3.4:53 {
protocol https_google bootstrap 8.8.8.8:53
}
~~~

View File

@@ -12,23 +12,20 @@ import (
type dnsEx struct {
Timeout time.Duration
Address string // address/name of this upstream
group *singleflight.Group
group *singleflight.Group
}
func newDNSEx(address string) *dnsEx {
return &dnsEx{Address: address, group: new(singleflight.Group), Timeout: defaultTimeout * time.Second}
func newDNSEx() *dnsEx {
return &dnsEx{group: new(singleflight.Group), Timeout: defaultTimeout * time.Second}
}
func (d *dnsEx) OnStartup() error { return nil }
func (d *dnsEx) OnShutdown() error { return nil }
func (d *dnsEx) SetUpstream(u Upstream) error { return nil }
func (d *dnsEx) Protocol() protocol { return dnsProto }
func (g *dnsEx) Protocol() string { return "dns" }
func (d *dnsEx) OnShutdown(p *Proxy) error { return nil }
func (d *dnsEx) OnStartup(p *Proxy) error { return nil }
// Exchange implements the Exchanger interface.
func (d *dnsEx) Exchange(state request.Request) (*dns.Msg, error) {
co, err := net.DialTimeout(state.Proto(), d.Address, d.Timeout)
func (d *dnsEx) Exchange(addr string, state request.Request) (*dns.Msg, error) {
co, err := net.DialTimeout(state.Proto(), addr, d.Timeout)
if err != nil {
return nil, err
}
@@ -101,5 +98,3 @@ func exchange(m *dns.Msg, co net.Conn) (dns.Msg, error) {
}
return *r, err
}
const dnsProto protocol = "dns"

View File

@@ -8,11 +8,9 @@ import (
// Exchanger is an interface that specifies a type implementing a DNS resolver that
// can use whatever transport it likes.
type Exchanger interface {
Exchange(request.Request) (*dns.Msg, error)
SetUpstream(Upstream) error // (Re)set the upstream
OnStartup() error
OnShutdown() error
Protocol() protocol
}
Exchange(addr string, state request.Request) (*dns.Msg, error)
Protocol() string
type protocol string
OnStartup(*Proxy) error
OnShutdown(*Proxy) error
}

241
middleware/proxy/google.go Normal file
View File

@@ -0,0 +1,241 @@
package proxy
import (
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"sync/atomic"
"time"
"github.com/miekg/coredns/middleware/pkg/debug"
"github.com/miekg/coredns/request"
"github.com/miekg/dns"
)
type google struct {
client *http.Client
endpoint string // Name to resolve via 'bootstrapProxy'
bootstrapProxy Proxy
quit chan bool
}
func newGoogle(endpoint string, bootstrap []string) *google {
if endpoint == "" {
endpoint = ghost
}
tls := &tls.Config{ServerName: endpoint}
client := &http.Client{
Timeout: time.Second * defaultTimeout,
Transport: &http.Transport{TLSClientConfig: tls},
}
boot := NewLookup(bootstrap)
return &google{client: client, endpoint: dns.Fqdn(endpoint), bootstrapProxy: boot, quit: make(chan bool)}
}
func (g *google) Exchange(addr string, state request.Request) (*dns.Msg, error) {
v := url.Values{}
v.Set("name", state.Name())
v.Set("type", fmt.Sprintf("%d", state.QType()))
optDebug := false
if bug := debug.IsDebug(state.Name()); bug != "" {
optDebug = true
v.Set("name", bug)
}
buf, backendErr := g.exchangeJSON(addr, v.Encode())
if backendErr == nil {
gm := new(googleMsg)
if err := json.Unmarshal(buf, gm); err != nil {
return nil, err
}
m, debug, err := toMsg(gm)
if err != nil {
return nil, err
}
if optDebug {
// reset question
m.Question[0].Name = state.QName()
// prepend debug RR to the additional section
m.Extra = append([]dns.RR{debug}, m.Extra...)
}
m.Id = state.Req.Id
return m, nil
}
log.Printf("[WARNING] Failed to connect to HTTPS backend %q: %s", g.endpoint, backendErr)
return nil, backendErr
}
func (g *google) exchangeJSON(addr, json string) ([]byte, error) {
url := "https://" + addr + "/resolve?" + json
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
req.Host = g.endpoint // TODO(miek): works with the extra dot at the end?
resp, err := g.client.Do(req)
if err != nil {
return nil, err
}
buf, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("failed to get 200 status code, got %d", resp.StatusCode)
}
return buf, nil
}
func (g *google) Protocol() string { return "https_google" }
func (g *google) OnShutdown(p *Proxy) error {
g.quit <- true
return nil
}
func (g *google) OnStartup(p *Proxy) error {
// We fake a state because normally the proxy is called after we already got a incoming query.
// This is a non-edns0, udp request to g.endpoint.
req := new(dns.Msg)
req.SetQuestion(g.endpoint, dns.TypeA)
state := request.Request{W: new(fakeBootWriter), Req: req}
new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA)
oldUpstream := *p.Upstreams
oldFrom := ""
var oldEx Exchanger
if len(oldUpstream) > 0 {
oldFrom = oldUpstream[0].From()
oldEx = oldUpstream[0].Exchanger()
}
// ignore errors here, as we want to keep on trying.
if err != nil {
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
} else {
addrs, err1 := extractAnswer(new)
if err1 != nil {
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
}
up := newUpstream(addrs, oldFrom, oldEx)
p.Upstreams = &[]Upstream{up}
}
go func() {
tick := time.NewTicker(300 * time.Second)
for {
select {
case <-tick.C:
new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA)
if err != nil {
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
} else {
addrs, err1 := extractAnswer(new)
if err1 != nil {
log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
continue
}
up := newUpstream(addrs, oldFrom, oldEx)
p.Upstreams = &[]Upstream{up}
}
case <-g.quit:
return
}
}
}()
return nil
}
func extractAnswer(m *dns.Msg) ([]string, error) {
if len(m.Answer) == 0 {
return nil, fmt.Errorf("no answer section in response")
}
ret := []string{}
for _, an := range m.Answer {
if a, ok := an.(*dns.A); ok {
ret = append(ret, net.JoinHostPort(a.A.String(), "443"))
}
}
if len(ret) > 0 {
return ret, nil
}
return nil, fmt.Errorf("no address records in answer section")
}
// newUpstream returns an upstream initialized with hosts.
func newUpstream(hosts []string, from string, ex Exchanger) Upstream {
upstream := &staticUpstream{
from: from,
Hosts: nil,
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second,
MaxFails: 3,
ex: ex,
}
upstream.Hosts = make([]*UpstreamHost, len(hosts))
for i, h := range hosts {
uh := &UpstreamHost{
Name: h,
Conns: 0,
Fails: 0,
FailTimeout: upstream.FailTimeout,
Unhealthy: false,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool {
if uh.Unhealthy {
return true
}
fails := atomic.LoadInt32(&uh.Fails)
if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
return true
}
return false
}
}(upstream),
WithoutPathPrefix: upstream.WithoutPathPrefix,
}
upstream.Hosts[i] = uh
}
return upstream
}
const (
// Default endpoint for this service.
ghost = "dns.google.com."
)

View File

@@ -0,0 +1,90 @@
package proxy
import (
"fmt"
"github.com/miekg/dns"
)
// toMsg converts a googleMsg into the dns message. The returned RR is the comment disquised as a TXT record.
func toMsg(g *googleMsg) (*dns.Msg, dns.RR, error) {
m := new(dns.Msg)
m.Response = true
m.Rcode = g.Status
m.Truncated = g.TC
m.RecursionDesired = g.RD
m.RecursionAvailable = g.RA
m.AuthenticatedData = g.AD
m.CheckingDisabled = g.CD
m.Question = make([]dns.Question, 1)
m.Answer = make([]dns.RR, len(g.Answer))
m.Ns = make([]dns.RR, len(g.Authority))
m.Extra = make([]dns.RR, len(g.Additional))
m.Question[0] = dns.Question{Name: g.Question[0].Name, Qtype: g.Question[0].Type, Qclass: dns.ClassINET}
var err error
for i := 0; i < len(m.Answer); i++ {
m.Answer[i], err = toRR(g.Answer[i])
if err != nil {
return nil, nil, err
}
}
for i := 0; i < len(m.Ns); i++ {
m.Ns[i], err = toRR(g.Authority[i])
if err != nil {
return nil, nil, err
}
}
for i := 0; i < len(m.Extra); i++ {
m.Extra[i], err = toRR(g.Additional[i])
if err != nil {
return nil, nil, err
}
}
txt, _ := dns.NewRR(". 0 CH TXT " + g.Comment)
return m, txt, nil
}
// toRR transforms a "google" RR to a dns.RR.
func toRR(g googleRR) (dns.RR, error) {
typ, ok := dns.TypeToString[g.Type]
if !ok {
return nil, fmt.Errorf("failed to convert type %q", g.Type)
}
str := fmt.Sprintf("%s %d %s %s", g.Name, g.TTL, typ, g.Data)
rr, err := dns.NewRR(str)
if err != nil {
return nil, fmt.Errorf("failed to parse %q: %s", str, err)
}
return rr, nil
}
// googleRR represents a dns.RR in another form.
type googleRR struct {
Name string
Type uint16
TTL uint32
Data string
}
// googleMsg is a JSON representation of the dns.Msg.
type googleMsg struct {
Status int
TC bool
RD bool
RA bool
AD bool
CD bool
Question []struct {
Name string
Type uint16
}
Answer []googleRR
Authority []googleRR
Additional []googleRR
Comment string
}

View File

@@ -0,0 +1,5 @@
package proxy
// TODO(miek):
// Test cert failures - put those in SERVFAIL messages, but attach error code in TXT
// Test connecting to a a bad host.

View File

@@ -13,7 +13,6 @@ import (
// NewLookup create a new proxy with the hosts in host and a Random policy.
func NewLookup(hosts []string) Proxy {
// TODO(miek): maybe add optional protocol parameter?
p := Proxy{Next: nil}
upstream := &staticUpstream{
@@ -22,7 +21,8 @@ func NewLookup(hosts []string) Proxy {
Policy: &Random{},
Spray: nil,
FailTimeout: 10 * time.Second,
MaxFails: 3,
MaxFails: 3, // TODO(miek): disable error checking for simple lookups?
ex: newDNSEx(),
}
for i, host := range hosts {
@@ -31,7 +31,6 @@ func NewLookup(hosts []string) Proxy {
Conns: 0,
Fails: 0,
FailTimeout: upstream.FailTimeout,
Exchanger: newDNSEx(host),
Unhealthy: false,
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
@@ -50,7 +49,7 @@ func NewLookup(hosts []string) Proxy {
}
upstream.Hosts[i] = uh
}
p.Upstreams = []Upstream{upstream}
p.Upstreams = &[]Upstream{upstream}
return p
}
@@ -72,7 +71,7 @@ func (p Proxy) Forward(state request.Request) (*dns.Msg, error) {
}
func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
for _, upstream := range p.Upstreams {
for _, upstream := range *p.Upstreams {
start := time.Now()
// Since Select() should give us "up" hosts, keep retrying
@@ -88,7 +87,7 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
atomic.AddInt64(&host.Conns, 1)
reply, backendErr := host.Exchange(state)
reply, backendErr := upstream.Exchanger().Exchange(host.Name, state)
atomic.AddInt64(&host.Conns, -1)

View File

@@ -16,11 +16,11 @@ var (
Name: "request_duration_milliseconds",
Buckets: append(prometheus.DefBuckets, []float64{50, 100, 200, 500, 1000, 2000, 3000, 4000, 5000, 10000}...),
Help: "Histogram of the time (in milliseconds) each request took.",
}, []string{"zone"})
}, []string{"proto", "from"})
)
// OnStartup sets up the metrics on startup. This is done for all proxy protocols.
func OnStartup() error {
// OnStartupMetrics sets up the metrics on startup. This is done for all proxy protocols.
func OnStartupMetrics() error {
metricsOnce.Do(func() {
prometheus.MustRegister(RequestDuration)
})

View File

@@ -21,8 +21,12 @@ var (
// Proxy represents a middleware instance that can proxy requests to another (DNS) server.
type Proxy struct {
Next middleware.Handler
Upstreams []Upstream
Next middleware.Handler
// Upstreams is a pointer to a slice, so we can update the upstream (used for Google)
// midway.
Upstreams *[]Upstream
}
// Upstream manages a pool of proxy upstream hosts. Select should return a
@@ -34,8 +38,8 @@ type Upstream interface {
Select() *UpstreamHost
// Checks if subpdomain is not an ignored.
IsAllowedPath(string) bool
// Options returns the options set for this upstream
Options() Options
// Exchanger returns the exchanger to be used for this upstream.
Exchanger() Exchanger
}
// UpstreamHostDownFunc can be used to customize how Down behaves.
@@ -50,7 +54,6 @@ type UpstreamHost struct {
Unhealthy bool
CheckDown UpstreamHostDownFunc
WithoutPathPrefix string
Exchanger
}
// Down checks whether the upstream host is down or not.
@@ -70,11 +73,12 @@ func (uh *UpstreamHost) Down() bool {
var tryDuration = 60 * time.Second
// ServeDNS satisfies the middleware.Handler interface.
func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
var span, child ot.Span
span = ot.SpanFromContext(ctx)
state := request.Request{W: w, Req: r}
for _, upstream := range p.Upstreams {
for _, upstream := range *p.Upstreams {
start := time.Now()
// Since Select() should give us "up" hosts, keep retrying
@@ -83,7 +87,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
host := upstream.Select()
if host == nil {
RequestDuration.WithLabelValues(state.Proto(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
RequestDuration.WithLabelValues(upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
return dns.RcodeServerFailure, errUnreachable
}
@@ -95,7 +99,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
atomic.AddInt64(&host.Conns, 1)
reply, backendErr := host.Exchange(state)
reply, backendErr := upstream.Exchanger().Exchange(host.Name, state)
atomic.AddInt64(&host.Conns, -1)
@@ -106,7 +110,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
if backendErr == nil {
w.WriteMsg(reply)
RequestDuration.WithLabelValues(state.Proto(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
RequestDuration.WithLabelValues(upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
return 0, nil
}
@@ -121,7 +125,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
}(host, timeout)
}
RequestDuration.WithLabelValues(state.Proto(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
RequestDuration.WithLabelValues(upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
return dns.RcodeServerFailure, errUnreachable
}

View File

@@ -0,0 +1,21 @@
package proxy
import (
"net"
"github.com/miekg/dns"
)
type fakeBootWriter struct {
dns.ResponseWriter
}
func (w *fakeBootWriter) LocalAddr() net.Addr {
local := net.ParseIP("127.0.0.1")
return &net.UDPAddr{IP: local, Port: 53} // Port is not used here
}
func (w *fakeBootWriter) RemoteAddr() net.Addr {
remote := net.ParseIP("8.8.8.8")
return &net.UDPAddr{IP: remote, Port: 53} // Port is not used here
}

View File

@@ -19,11 +19,24 @@ func setup(c *caddy.Controller) error {
if err != nil {
return middleware.Error("proxy", err)
}
P := &Proxy{}
dnsserver.GetConfig(c).AddMiddleware(func(next middleware.Handler) middleware.Handler {
return Proxy{Next: next, Upstreams: upstreams}
P.Next = next
P.Upstreams = &upstreams
return P
})
c.OnStartup(OnStartup)
c.OnStartup(OnStartupMetrics)
for _, u := range upstreams {
c.OnStartup(func() error {
return u.Exchanger().OnStartup(P)
})
c.OnShutdown(func() error {
return u.Exchanger().OnShutdown(P)
})
}
return nil
}

View File

@@ -37,13 +37,7 @@ type staticUpstream struct {
}
WithoutPathPrefix string
IgnoredSubDomains []string
options Options
Protocol protocol
}
// Options ...
type Options struct {
Ecs []*net.IPNet // EDNS0 CLIENT SUBNET address (v4/v6) to add in CIDR notaton.
ex Exchanger
}
// NewStaticUpstreams parses the configuration input and sets up
@@ -58,7 +52,7 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
Spray: nil,
FailTimeout: 10 * time.Second,
MaxFails: 1,
Protocol: dnsProto,
ex: newDNSEx(),
}
if !c.Args(&upstream.from) {
@@ -89,7 +83,6 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
Fails: 0,
FailTimeout: upstream.FailTimeout,
Unhealthy: false,
Exchanger: newDNSEx(host),
CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
return func(uh *UpstreamHost) bool {
@@ -106,14 +99,6 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
}(upstream),
WithoutPathPrefix: upstream.WithoutPathPrefix,
}
switch upstream.Protocol {
// case https_google:
case dnsProto:
fallthrough
default:
// Already done in the initialization above.
}
upstream.Hosts[i] = uh
}
@@ -135,10 +120,6 @@ func (u *staticUpstream) From() string {
return u.from
}
func (u *staticUpstream) Options() Options {
return u.options
}
func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
switch c.Val() {
case "policy":
@@ -208,9 +189,14 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
}
switch encArgs[0] {
case "dns":
u.Protocol = dnsProto
u.ex = newDNSEx()
case "https_google":
// Nothing yet.
boot := []string{"8.8.8.8:53", "8.8.4.4:53"}
if len(encArgs) > 2 && encArgs[1] == "bootstrap" {
boot = encArgs[2:]
}
u.ex = newGoogle("", boot) // "" for default in google.go
default:
return fmt.Errorf("%s: %s", errInvalidProtocol, encArgs[0])
}
@@ -305,3 +291,5 @@ func (u *staticUpstream) IsAllowedPath(name string) bool {
}
return true
}
func (u *staticUpstream) Exchanger() Exchanger { return u.ex }