mirror of
https://github.com/coredns/coredns.git
synced 2025-10-31 18:23:13 -04:00
mw/federation: add federation back as separate mw for k8s (#929)
* mw/federaration This PR add the federation back as a middleware to keep it more contained from the main kubernetes code. It also makes parseRequest less import and pushes this functionlity down in the k.Entries. This minimizes (or tries to) the importance for the qtype in the query. In the end the qtype checking should only happen in ServeDNS - but for k8s this might proof difficult. Numerous other cleanup in code and kubernetes tests. * up test coverage
This commit is contained in:
43
middleware/kubernetes/federation.go
Normal file
43
middleware/kubernetes/federation.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/coredns/coredns/middleware/etcd/msg"
|
||||
"github.com/coredns/coredns/request"
|
||||
)
|
||||
|
||||
// The federation node.Labels keys used.
|
||||
const (
|
||||
// TODO: Do not hardcode these labels. Pull them out of the API instead.
|
||||
//
|
||||
// We can get them via ....
|
||||
// import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
// metav1.LabelZoneFailureDomain
|
||||
// metav1.LabelZoneRegion
|
||||
//
|
||||
// But importing above breaks coredns with flag collision of 'log_dir'
|
||||
|
||||
LabelZone = "failure-domain.beta.kubernetes.io/zone"
|
||||
LabelRegion = "failure-domain.beta.kubernetes.io/region"
|
||||
)
|
||||
|
||||
// Federations is used from the federations middleware to return the service that should be
|
||||
// returned as a CNAME for federation(s) to work.
|
||||
func (k *Kubernetes) Federations(state request.Request, fname, fzone string) (msg.Service, error) {
|
||||
nodeName := k.localNodeName()
|
||||
node, err := k.APIConn.GetNodeByName(nodeName)
|
||||
if err != nil {
|
||||
return msg.Service{}, err
|
||||
}
|
||||
r, err := k.parseRequest(state)
|
||||
|
||||
lz := node.Labels[LabelZone]
|
||||
lr := node.Labels[LabelRegion]
|
||||
|
||||
if r.endpoint == "" {
|
||||
return msg.Service{Host: strings.Join([]string{r.service, r.namespace, fname, r.podOrSvc, lz, lr, fzone}, ".")}, nil
|
||||
}
|
||||
|
||||
return msg.Service{Host: strings.Join([]string{r.endpoint, r.service, r.namespace, fname, r.podOrSvc, lz, lr, fzone}, ".")}, nil
|
||||
}
|
||||
@@ -146,10 +146,8 @@ var podModeVerifiedCases = map[string](test.Case){
|
||||
|
||||
func TestServeDNS(t *testing.T) {
|
||||
|
||||
k := Kubernetes{Zones: []string{"cluster.local."}}
|
||||
|
||||
k := New([]string{"cluster.local."})
|
||||
k.APIConn = &APIConnServeTest{}
|
||||
k.interfaceAddrsFunc = localPodIP
|
||||
k.Next = test.NextHandler(dns.RcodeSuccess, nil)
|
||||
|
||||
ctx := context.TODO()
|
||||
@@ -166,7 +164,7 @@ func TestServeDNS(t *testing.T) {
|
||||
runServeDNSTests(ctx, t, podModeVerifiedCases, k)
|
||||
}
|
||||
|
||||
func runServeDNSTests(ctx context.Context, t *testing.T, dnsTestCases map[string](test.Case), k Kubernetes) {
|
||||
func runServeDNSTests(ctx context.Context, t *testing.T, dnsTestCases map[string](test.Case), k *Kubernetes) {
|
||||
for testname, tc := range dnsTestCases {
|
||||
r := tc.Msg()
|
||||
|
||||
|
||||
@@ -51,6 +51,16 @@ type Kubernetes struct {
|
||||
autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath.
|
||||
}
|
||||
|
||||
// New returns a intialized Kubernetes. It default interfaceAddrFunc to return 127.0.0.1. All other
|
||||
// values default to their zero value, primaryZoneIndex will thus point to the first zone.
|
||||
func New(zones []string) *Kubernetes {
|
||||
k := new(Kubernetes)
|
||||
k.Zones = zones
|
||||
k.interfaceAddrsFunc = func() net.IP { return net.ParseIP("127.0.0.1") }
|
||||
|
||||
return k
|
||||
}
|
||||
|
||||
const (
|
||||
// PodModeDisabled is the default value where pod requests are ignored
|
||||
PodModeDisabled = "disabled"
|
||||
@@ -96,21 +106,21 @@ func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware.
|
||||
|
||||
// We're looking again at types, which we've already done in ServeDNS, but there are some types k8s just can't answer.
|
||||
switch state.QType() {
|
||||
|
||||
case dns.TypeTXT:
|
||||
// 1 label + zone, label must be "dns-version".
|
||||
t, err := dnsutil.TrimZone(state.Name(), state.Zone)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
t, _ := dnsutil.TrimZone(state.Name(), state.Zone)
|
||||
|
||||
segs := dns.SplitDomainName(t)
|
||||
if len(segs) != 1 {
|
||||
return nil, nil, errors.New("servfail")
|
||||
return nil, nil, fmt.Errorf("kubernetes: TXT query can only be for dns-version: %s", state.QName())
|
||||
}
|
||||
if segs[0] != "dns-version" {
|
||||
return nil, nil, errInvalidRequest
|
||||
return nil, nil, nil
|
||||
}
|
||||
svc := msg.Service{Text: DNSSchemaVersion, TTL: 28800, Key: msg.Path(state.QName(), "coredns")}
|
||||
return []msg.Service{svc}, nil, nil
|
||||
|
||||
case dns.TypeNS:
|
||||
// We can only get here if the qname equal the zone, see ServeDNS in handler.go.
|
||||
ns := k.nsAddr()
|
||||
@@ -118,38 +128,30 @@ func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware.
|
||||
return []msg.Service{svc}, nil, nil
|
||||
}
|
||||
|
||||
r, e := k.parseRequest(state)
|
||||
if e != nil {
|
||||
return nil, nil, e
|
||||
if state.QType() == dns.TypeA && isDefaultNS(state.Name(), state.Zone) {
|
||||
// If this is an A request for "ns.dns", respond with a "fake" record for coredns.
|
||||
// SOA records always use this hardcoded name
|
||||
ns := k.nsAddr()
|
||||
svc := msg.Service{Host: ns.A.String(), Key: msg.Path(state.QName(), "coredns")}
|
||||
return []msg.Service{svc}, nil, nil
|
||||
}
|
||||
|
||||
switch state.QType() {
|
||||
case dns.TypeA, dns.TypeAAAA, dns.TypeCNAME:
|
||||
if state.Type() == "A" && isDefaultNS(state.Name(), r) {
|
||||
// If this is an A request for "ns.dns", respond with a "fake" record for coredns.
|
||||
// SOA records always use this hardcoded name
|
||||
ns := k.nsAddr()
|
||||
svc := msg.Service{Host: ns.A.String(), Key: msg.Path(state.QName(), "coredns")}
|
||||
return []msg.Service{svc}, nil, nil
|
||||
}
|
||||
s, e := k.Entries(r)
|
||||
if state.QType() == dns.TypeAAAA {
|
||||
// AAAA not implemented
|
||||
return nil, nil, e
|
||||
}
|
||||
return s, nil, e // Haven't implemented debug queries yet.
|
||||
case dns.TypeSRV:
|
||||
s, e := k.Entries(r)
|
||||
// SRV for external services is not yet implemented, so remove those records
|
||||
noext := []msg.Service{}
|
||||
for _, svc := range s {
|
||||
if t, _ := svc.HostType(); t != dns.TypeCNAME {
|
||||
noext = append(noext, svc)
|
||||
}
|
||||
}
|
||||
return noext, nil, e
|
||||
s, e := k.Entries(state)
|
||||
|
||||
// SRV for external services is not yet implemented, so remove those records.
|
||||
|
||||
if state.QType() != dns.TypeSRV {
|
||||
return s, nil, e
|
||||
}
|
||||
return nil, nil, nil
|
||||
|
||||
internal := []msg.Service{}
|
||||
for _, svc := range s {
|
||||
if t, _ := svc.HostType(); t != dns.TypeCNAME {
|
||||
internal = append(internal, svc)
|
||||
}
|
||||
}
|
||||
|
||||
return internal, nil, e
|
||||
}
|
||||
|
||||
// primaryZone will return the first non-reverse zone being handled by this middleware
|
||||
@@ -247,9 +249,11 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
|
||||
if len(k.APIClientKey) > 0 {
|
||||
authinfo.ClientKey = k.APIClientKey
|
||||
}
|
||||
|
||||
overrides.ClusterInfo = clusterinfo
|
||||
overrides.AuthInfo = authinfo
|
||||
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
|
||||
|
||||
return clientConfig.ClientConfig()
|
||||
}
|
||||
|
||||
@@ -263,7 +267,7 @@ func (k *Kubernetes) InitKubeCache() (err error) {
|
||||
|
||||
kubeClient, err := kubernetes.NewForConfig(config)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create kubernetes notification controller: %v", err)
|
||||
return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
|
||||
}
|
||||
|
||||
if k.LabelSelector != nil {
|
||||
@@ -271,12 +275,12 @@ func (k *Kubernetes) InitKubeCache() (err error) {
|
||||
selector, err = unversionedapi.LabelSelectorAsSelector(k.LabelSelector)
|
||||
k.Selector = &selector
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create Selector for LabelSelector '%s'.Error was: %s", k.LabelSelector, err)
|
||||
return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.LabelSelector, err)
|
||||
}
|
||||
}
|
||||
|
||||
if k.LabelSelector != nil {
|
||||
log.Printf("[INFO] Kubernetes middleware configured with the label selector '%s'. Only kubernetes objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector))
|
||||
log.Printf("[INFO] Kubernetes has label selector '%s'. Only objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector))
|
||||
}
|
||||
|
||||
opts := dnsControlOpts{
|
||||
@@ -287,20 +291,22 @@ func (k *Kubernetes) InitKubeCache() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// Records not implemented, see Entries().
|
||||
// Records is not implemented.
|
||||
func (k *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
|
||||
return nil, fmt.Errorf("NOOP")
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
// Entries looks up services in kubernetes. If exact is true, it will lookup
|
||||
// just this name. This is used when find matches when completing SRV lookups
|
||||
// for instance.
|
||||
func (k *Kubernetes) Entries(r recordRequest) ([]msg.Service, error) {
|
||||
// Abort if the namespace does not contain a wildcard, and namespace is not published per CoreFile
|
||||
// Case where namespace contains a wildcard is handled in Get(...) method.
|
||||
if (!wildcard(r.namespace)) && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(r.namespace, k.Namespaces)) {
|
||||
// Entries looks up services in kubernetes.
|
||||
func (k *Kubernetes) Entries(state request.Request) ([]msg.Service, error) {
|
||||
r, e := k.parseRequest(state)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
|
||||
if !k.namespaceExposed(r.namespace) {
|
||||
return nil, errNsNotExposed
|
||||
}
|
||||
|
||||
services, pods, err := k.get(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -310,7 +316,6 @@ func (k *Kubernetes) Entries(r recordRequest) ([]msg.Service, error) {
|
||||
}
|
||||
|
||||
records := k.getRecordsForK8sItems(services, pods, r)
|
||||
|
||||
return records, nil
|
||||
}
|
||||
|
||||
@@ -432,6 +437,7 @@ func (k *Kubernetes) findServices(r recordRequest) ([]kService, error) {
|
||||
if !(match(r.namespace, svc.Namespace, nsWildcard) && match(r.service, svc.Name, serviceWildcard)) {
|
||||
continue
|
||||
}
|
||||
|
||||
// If namespace has a wildcard, filter results against Corefile namespace list.
|
||||
// (Namespaces without a wildcard were filtered before the call to this function.)
|
||||
if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(svc.Namespace, k.Namespaces)) {
|
||||
@@ -529,28 +535,22 @@ func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service {
|
||||
return nil
|
||||
}
|
||||
|
||||
// namespaceExposed returns true when the namespace is exposed.
|
||||
func (k *Kubernetes) namespaceExposed(namespace string) bool {
|
||||
// Abort if the namespace does not contain a wildcard, and namespace is
|
||||
// not published per CoreFile Case where namespace contains a wildcard
|
||||
// is handled in k.get(...) method.
|
||||
if (!wildcard(namespace)) && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(namespace, k.Namespaces)) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// wildcard checks whether s contains a wildcard value
|
||||
func wildcard(s string) bool {
|
||||
return (s == "*" || s == "any")
|
||||
}
|
||||
|
||||
func localPodIP() net.IP {
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, addr := range addrs {
|
||||
ip, _, _ := net.ParseCIDR(addr.String())
|
||||
ip = ip.To4()
|
||||
if ip == nil || ip.IsLoopback() {
|
||||
continue
|
||||
}
|
||||
return ip
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
// Svc is the DNS schema for kubernetes services
|
||||
Svc = "svc"
|
||||
|
||||
@@ -195,8 +195,7 @@ func (APIConnServiceTest) GetNodeByName(name string) (api.Node, error) {
|
||||
|
||||
func TestServices(t *testing.T) {
|
||||
|
||||
k := Kubernetes{Zones: []string{"interwebs.test."}}
|
||||
k.interfaceAddrsFunc = localPodIP
|
||||
k := New([]string{"interwebs.test."})
|
||||
k.APIConn = &APIConnServiceTest{}
|
||||
|
||||
type svcAns struct {
|
||||
|
||||
40
middleware/kubernetes/local.go
Normal file
40
middleware/kubernetes/local.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package kubernetes
|
||||
|
||||
import "net"
|
||||
|
||||
func localPodIP() net.IP {
|
||||
addrs, err := net.InterfaceAddrs()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, addr := range addrs {
|
||||
ip, _, _ := net.ParseCIDR(addr.String())
|
||||
ip = ip.To4()
|
||||
if ip == nil || ip.IsLoopback() {
|
||||
continue
|
||||
}
|
||||
return ip
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *Kubernetes) localNodeName() string {
|
||||
localIP := k.interfaceAddrsFunc()
|
||||
if localIP == nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Find endpoint matching localIP
|
||||
endpointsList := k.APIConn.EndpointsList()
|
||||
for _, ep := range endpointsList.Items {
|
||||
for _, eps := range ep.Subsets {
|
||||
for _, addr := range eps.Addresses {
|
||||
if localIP.Equal(net.ParseIP(addr.IP)) {
|
||||
return *addr.NodeName
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -8,8 +8,8 @@ import (
|
||||
"k8s.io/client-go/1.5/pkg/api"
|
||||
)
|
||||
|
||||
func isDefaultNS(name string, r recordRequest) bool {
|
||||
return strings.Index(name, defaultNSName) == 0 && strings.Index(name, r.zone) == len(defaultNSName)
|
||||
func isDefaultNS(name, zone string) bool {
|
||||
return strings.Index(name, defaultNSName) == 0 && strings.Index(name, zone) == len(defaultNSName)
|
||||
}
|
||||
|
||||
func (k *Kubernetes) nsAddr() *dns.A {
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"k8s.io/client-go/1.5/pkg/api"
|
||||
@@ -36,7 +35,7 @@ func (APIConnTest) EndpointsList() api.EndpointsList {
|
||||
{
|
||||
Addresses: []api.EndpointAddress{
|
||||
{
|
||||
IP: "172.0.40.10",
|
||||
IP: "127.0.0.1",
|
||||
},
|
||||
},
|
||||
},
|
||||
@@ -54,8 +53,7 @@ func (APIConnTest) GetNodeByName(name string) (api.Node, error) { return api.Nod
|
||||
|
||||
func TestNsAddr(t *testing.T) {
|
||||
|
||||
k := Kubernetes{Zones: []string{"inter.webs.test"}}
|
||||
k.interfaceAddrsFunc = func() net.IP { return net.ParseIP("172.0.40.10") }
|
||||
k := New([]string{"inter.webs.test."})
|
||||
k.APIConn = &APIConnTest{}
|
||||
|
||||
cdr := k.nsAddr()
|
||||
|
||||
@@ -25,24 +25,17 @@ type recordRequest struct {
|
||||
|
||||
// parseRequest parses the qname to find all the elements we need for querying k8s.
|
||||
func (k *Kubernetes) parseRequest(state request.Request) (r recordRequest, err error) {
|
||||
// 3 Possible cases: TODO(chris): remove federations comments here.
|
||||
// SRV Request: _port._protocol.service.namespace.[federation.]type.zone
|
||||
// A Request (endpoint): endpoint.service.namespace.[federation.]type.zone
|
||||
// A Request (service): service.namespace.[federation.]type.zone
|
||||
// 3 Possible cases:
|
||||
// o SRV Request: _port._protocol.service.namespace.type.zone
|
||||
// o A Request (endpoint): endpoint.service.namespace.type.zone
|
||||
// o A Request (service): service.namespace.type.zone
|
||||
// Federations are handled in the federation middleware.
|
||||
|
||||
base, _ := dnsutil.TrimZone(state.Name(), state.Zone)
|
||||
segs := dns.SplitDomainName(base)
|
||||
|
||||
r.zone = state.Zone
|
||||
|
||||
if state.QType() == dns.TypeNS {
|
||||
return r, nil
|
||||
}
|
||||
|
||||
if state.QType() == dns.TypeA && isDefaultNS(state.Name(), r) {
|
||||
return r, nil
|
||||
}
|
||||
|
||||
offset := 0
|
||||
if state.QType() == dns.TypeSRV {
|
||||
// The kubernetes peer-finder expects queries with empty port and service to resolve
|
||||
@@ -99,8 +92,7 @@ func (k *Kubernetes) parseRequest(state request.Request) (r recordRequest, err e
|
||||
return r, errInvalidRequest
|
||||
}
|
||||
|
||||
// String return a string representation of r, it just returns all
|
||||
// fields concatenated with dots.
|
||||
// String return a string representation of r, it just returns all fields concatenated with dots.
|
||||
// This is mostly used in tests.
|
||||
func (r recordRequest) String() string {
|
||||
s := r.port
|
||||
|
||||
@@ -4,11 +4,12 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/coredns/coredns/request"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
)
|
||||
|
||||
func TestParseRequest(t *testing.T) {
|
||||
k := Kubernetes{Zones: []string{zone}}
|
||||
k := New([]string{zone})
|
||||
|
||||
tests := []struct {
|
||||
query string
|
||||
@@ -30,10 +31,6 @@ func TestParseRequest(t *testing.T) {
|
||||
"1-2-3-4.webs.mynamespace.svc.inter.webs.test.", dns.TypeA,
|
||||
"..1-2-3-4.webs.mynamespace.svc.intern.webs.tests.",
|
||||
},
|
||||
{
|
||||
"inter.webs.test.", dns.TypeNS,
|
||||
"......intern.webs.tests.",
|
||||
},
|
||||
}
|
||||
for i, tc := range tests {
|
||||
m := new(dns.Msg)
|
||||
@@ -52,7 +49,7 @@ func TestParseRequest(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestParseInvalidRequest(t *testing.T) {
|
||||
k := Kubernetes{Zones: []string{zone}}
|
||||
k := New([]string{zone})
|
||||
|
||||
invalid := map[string]uint16{
|
||||
"_http._tcp.webs.mynamespace.svc.inter.webs.test.": dns.TypeA, // A requests cannot have port or protocol
|
||||
|
||||
@@ -77,8 +77,7 @@ func (APIConnReverseTest) GetNodeByName(name string) (api.Node, error) {
|
||||
|
||||
func TestReverse(t *testing.T) {
|
||||
|
||||
k := Kubernetes{Zones: []string{"cluster.local.", "0.10.in-addr.arpa."}}
|
||||
k.interfaceAddrsFunc = localPodIP
|
||||
k := New([]string{"cluster.local.", "0.10.in-addr.arpa."})
|
||||
k.APIConn = &APIConnReverseTest{}
|
||||
|
||||
tests := []test.Case{
|
||||
|
||||
@@ -67,10 +67,9 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
|
||||
interfaceAddrsFunc: localPodIP,
|
||||
PodMode: PodModeDisabled,
|
||||
Proxy: proxy.Proxy{},
|
||||
autoPathSearch: searchFromResolvConf(),
|
||||
}
|
||||
|
||||
k8s.autoPathSearch = searchFromResolvConf()
|
||||
|
||||
for c.Next() {
|
||||
zones := c.RemainingArgs()
|
||||
|
||||
|
||||
@@ -19,12 +19,12 @@ func TestKubernetesParseReverseZone(t *testing.T) {
|
||||
c := caddy.NewTestController("dns", tc.input)
|
||||
k, err := kubernetesParse(c)
|
||||
if err != nil {
|
||||
t.Fatalf("Test %d: Expected no error, got %q", err)
|
||||
t.Fatalf("Test %d: Expected no error, got %q", i, err)
|
||||
}
|
||||
|
||||
zl := len(k.Zones)
|
||||
if zl != len(tc.expectedZones) {
|
||||
t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d zones, found %d zones: '%v'", i, len(tc.expectedZones), zl)
|
||||
t.Errorf("Test %d: Expected kubernetes to be initialized with %d zones, found %d zones", i, len(tc.expectedZones), zl)
|
||||
}
|
||||
for i, z := range tc.expectedZones {
|
||||
if k.Zones[i] != z {
|
||||
|
||||
Reference in New Issue
Block a user