From 5c71bd0b870af092486a201ce255b4b0a25ea529 Mon Sep 17 00:00:00 2001 From: Arthur Outhenin-Chalandre Date: Mon, 19 May 2025 07:58:16 +0200 Subject: [PATCH] kubernetes: add multicluster support (#7266) * kubernetes: add multicluster support Add multicluster support via Multi-Cluster Services API (MCS-API) via a new option `multiclusterZones` in the kubernetes plugin. When some multicluster zones are passed to the kubernetes plugin, it will start watching the ServiceImport objects and its associated EndpointSlices. Signed-off-by: Arthur Outhenin-Chalandre * kubernetes: implement xfr support for multicluster zones Signed-off-by: Arthur Outhenin-Chalandre --------- Signed-off-by: Arthur Outhenin-Chalandre --- go.mod | 1 + go.sum | 2 + plugin/k8s_external/external_test.go | 9 +- plugin/kubernetes/README.md | 13 + plugin/kubernetes/controller.go | 275 ++++++++- plugin/kubernetes/controller_test.go | 80 ++- plugin/kubernetes/external.go | 2 +- plugin/kubernetes/external_test.go | 18 +- plugin/kubernetes/handler_test.go | 581 ++++++++++++++---- plugin/kubernetes/kubernetes.go | 139 ++++- plugin/kubernetes/kubernetes_test.go | 134 +++- plugin/kubernetes/metadata.go | 12 +- plugin/kubernetes/metadata_test.go | 23 +- plugin/kubernetes/ns_test.go | 22 +- .../object/multicluster_endpoint.go | 64 ++ plugin/kubernetes/object/serviceimport.go | 95 +++ plugin/kubernetes/parse.go | 22 +- plugin/kubernetes/parse_test.go | 23 +- plugin/kubernetes/reverse_test.go | 21 +- plugin/kubernetes/setup.go | 10 + plugin/kubernetes/setup_test.go | 72 +++ plugin/kubernetes/xfr.go | 242 +++++--- plugin/kubernetes/xfr_test.go | 72 ++- 23 files changed, 1634 insertions(+), 298 deletions(-) create mode 100644 plugin/kubernetes/object/multicluster_endpoint.go create mode 100644 plugin/kubernetes/object/serviceimport.go diff --git a/go.mod b/go.mod index 57e87bb0b..a333cdf89 100644 --- a/go.mod +++ b/go.mod @@ -42,6 +42,7 @@ require ( k8s.io/apimachinery v0.32.3 k8s.io/client-go v0.32.3 
k8s.io/klog/v2 v2.130.1 + sigs.k8s.io/mcs-api v0.1.1-0.20250224121229-6c631f4730d0 ) require ( diff --git a/go.sum b/go.sum index 840abe5ac..b0f6b9387 100644 --- a/go.sum +++ b/go.sum @@ -595,6 +595,8 @@ modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= +sigs.k8s.io/mcs-api v0.1.1-0.20250224121229-6c631f4730d0 h1:LChl5QBr39XNzUjscGlfBJYjyclDru70cLujcC8Vn/M= +sigs.k8s.io/mcs-api v0.1.1-0.20250224121229-6c631f4730d0/go.mod h1:M1Zjh0Jn/Z5e/2JHsZyEeLMw0qGBBmkJqEOc+OceERY= sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/plugin/k8s_external/external_test.go b/plugin/k8s_external/external_test.go index 1e3630bdf..ae265e955 100644 --- a/plugin/k8s_external/external_test.go +++ b/plugin/k8s_external/external_test.go @@ -286,10 +286,17 @@ func (external) Run() {} func (external) Stop() error { return nil } func (external) EpIndexReverse(string) []*object.Endpoints { return nil } func (external) SvcIndexReverse(string) []*object.Service { return nil } -func (external) Modified(bool) int64 { return 0 } +func (external) Modified(kubernetes.ModifiedMode) int64 { return 0 } + +func (external) SvcImportIndex(s string) []*object.ServiceImport { return nil } +func (external) ServiceImportList() []*object.ServiceImport { return nil } +func (external) McEpIndex(s string) []*object.MultiClusterEndpoints { return nil } +func (external) MultiClusterEndpointsList(s string) []*object.MultiClusterEndpoints { return nil } + func (external) EpIndex(s string) 
[]*object.Endpoints { return epIndexExternal[s] } + func (external) EndpointsList() []*object.Endpoints { var eps []*object.Endpoints for _, ep := range epIndexExternal { diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md index eb5cac03e..5bb60b281 100644 --- a/plugin/kubernetes/README.md +++ b/plugin/kubernetes/README.md @@ -42,6 +42,7 @@ kubernetes [ZONES...] { noendpoints fallthrough [ZONES...] ignore empty_service + multicluster [ZONES...] } ``` @@ -101,6 +102,10 @@ kubernetes [ZONES...] { * `ignore empty_service` returns NXDOMAIN for services without any ready endpoint addresses (e.g., ready pods). This allows the querying pod to continue searching for the service in the search path. The search path could, for example, include another Kubernetes cluster. +* `multicluster` defines the multicluster zones as defined by Multi-Cluster + Services API (MCS-API). Specifying this option is generally paired with the + installation of an MCS-API implementation and the ServiceImport and ServiceExport + CRDs. The plugin MUST be authoritative for the zones listed here. Enabling zone transfer is done by using the *transfer* plugin. @@ -157,6 +162,14 @@ kubernetes cluster.local { } ~~~ +Configure multicluster + +~~~ txt +kubernetes cluster.local clusterset.local { + multicluster clusterset.local +} +~~~ + ## stubDomains and upstreamNameservers Here we use the *forward* plugin to implement a stubDomain that forwards `example.local` to the nameserver `10.100.0.10:53`. 
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index e7db294fc..f504037c5 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -15,29 +15,45 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" + mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + mcsClientset "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1" ) const ( - podIPIndex = "PodIP" - svcNameNamespaceIndex = "ServiceNameNamespace" - svcIPIndex = "ServiceIP" - svcExtIPIndex = "ServiceExternalIP" - epNameNamespaceIndex = "EndpointNameNamespace" - epIPIndex = "EndpointsIP" + podIPIndex = "PodIP" + svcNameNamespaceIndex = "ServiceNameNamespace" + svcIPIndex = "ServiceIP" + svcExtIPIndex = "ServiceExternalIP" + epNameNamespaceIndex = "EndpointNameNamespace" + epIPIndex = "EndpointsIP" + svcImportNameNamespaceIndex = "ServiceImportNameNamespace" + mcEpNameNamespaceIndex = "MultiClusterEndpointsImportNameNamespace" +) + +type ModifiedMode int + +const ( + ModifiedInternal ModifiedMode = iota + ModifiedExternal + ModifiedMultiCluster ) type dnsController interface { ServiceList() []*object.Service EndpointsList() []*object.Endpoints + ServiceImportList() []*object.ServiceImport SvcIndex(string) []*object.Service SvcIndexReverse(string) []*object.Service SvcExtIndexReverse(string) []*object.Service + SvcImportIndex(string) []*object.ServiceImport PodIndex(string) []*object.Pod EpIndex(string) []*object.Endpoints EpIndexReverse(string) []*object.Endpoints + McEpIndex(string) []*object.MultiClusterEndpoints GetNodeByName(context.Context, string) (*api.Node, error) GetNamespaceByName(string) (*object.Namespace, error) @@ -46,9 +62,8 @@ type dnsController interface { HasSynced() bool Stop() error - // Modified returns the timestamp of the 
most recent changes to services. If the passed bool is true, it should - // return the timestamp of the most recent changes to services with external facing IP addresses - Modified(bool) int64 + // Modified returns the timestamp of the most recent changes for the given mode: internal services, services with external facing IP addresses, or multi cluster services. + Modified(ModifiedMode) int64 } type dnsControl struct { @@ -56,24 +71,32 @@ type dnsControl struct { // It needs to be first because it is guaranteed to be 8-byte // aligned ( we use sync.LoadAtomic with this ) modified int64 + // multiClusterModified tracks timestamp of the most recent changes to + // multi cluster services + multiClusterModified int64 // extModified tracks timestamp of the most recent changes to // services with external facing IP addresses extModified int64 - client kubernetes.Interface + client kubernetes.Interface + mcsClient mcsClientset.MulticlusterV1alpha1Interface selector labels.Selector namespaceSelector labels.Selector - svcController cache.Controller - podController cache.Controller - epController cache.Controller - nsController cache.Controller + svcController cache.Controller + podController cache.Controller + epController cache.Controller + nsController cache.Controller + svcImportController cache.Controller + mcEpController cache.Controller - svcLister cache.Indexer - podLister cache.Indexer - epLister cache.Indexer - nsLister cache.Store + svcLister cache.Indexer + podLister cache.Indexer + epLister cache.Indexer + nsLister cache.Store + svcImportLister cache.Indexer + mcEpLister cache.Indexer // stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and @@ -82,8 +105,9 @@ type dnsControl struct { shutdown bool stopCh chan struct{} - zones []string - endpointNameMode bool + zones []string + endpointNameMode bool + multiclusterZones []string } type dnsControlOpts struct { @@ -97,19 +121,22 @@ type dnsControlOpts struct { namespaceLabelSelector *meta.LabelSelector namespaceSelector labels.Selector - zones []string - endpointNameMode bool + zones []string + endpointNameMode bool + multiclusterZones []string } // newdnsController creates a controller for CoreDNS. -func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl { +func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface, opts dnsControlOpts) *dnsControl { dns := dnsControl{ client: kubeClient, + mcsClient: mcsClient, selector: opts.selector, namespaceSelector: opts.namespaceSelector, stopCh: make(chan struct{}), zones: opts.zones, endpointNameMode: opts.endpointNameMode, + multiclusterZones: opts.multiclusterZones, } dns.svcLister, dns.svcController = object.NewIndexerInformer( @@ -164,6 +191,35 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts object.DefaultProcessor(object.ToNamespace, nil), ) + if len(opts.multiclusterZones) > 0 { + mcsEpReq, _ := labels.NewRequirement(mcs.LabelServiceName, selection.Exists, []string{}) + mcsEpSelector := dns.selector + if mcsEpSelector == nil { + mcsEpSelector = labels.NewSelector() + } + mcsEpSelector = mcsEpSelector.Add(*mcsEpReq) + dns.mcEpLister, dns.mcEpController = object.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, mcsEpSelector), + WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, mcsEpSelector), + }, + &discovery.EndpointSlice{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, 
DeleteFunc: dns.Delete}, + cache.Indexers{mcEpNameNamespaceIndex: mcEpNameNamespaceIndexFunc}, + object.DefaultProcessor(object.EndpointSliceToMultiClusterEndpoints, dns.EndpointSliceLatencyRecorder()), + ) + dns.svcImportLister, dns.svcImportController = object.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: serviceImportListFunc(ctx, dns.mcsClient, api.NamespaceAll, dns.namespaceSelector), + WatchFunc: serviceImportWatchFunc(ctx, dns.mcsClient, api.NamespaceAll, dns.namespaceSelector), + }, + &mcs.ServiceImport{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, + cache.Indexers{svcImportNameNamespaceIndex: svcImportNameNamespaceIndexFunc}, + object.DefaultProcessor(object.ToServiceImport, nil), + ) + } + return &dns } @@ -174,6 +230,7 @@ func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorde }, } } + func (dns *dnsControl) EndpointSliceLatencyRecorder() *object.EndpointLatencyRecorder { return &object.EndpointLatencyRecorder{ ServiceFunc: func(o meta.Object) []*object.Service { @@ -234,6 +291,22 @@ func epIPIndexFunc(obj interface{}) ([]string, error) { return ep.IndexIP, nil } +func svcImportNameNamespaceIndexFunc(obj interface{}) ([]string, error) { + s, ok := obj.(*object.ServiceImport) + if !ok { + return nil, errObj + } + return []string{s.Index}, nil +} + +func mcEpNameNamespaceIndexFunc(obj interface{}) ([]string, error) { + mcEp, ok := obj.(*object.MultiClusterEndpoints) + if !ok { + return nil, errObj + } + return []string{mcEp.Index}, nil +} + func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { @@ -274,6 +347,15 @@ func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Sel } } +func serviceImportListFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string, s 
labels.Selector) func(meta.ListOptions) (runtime.Object, error) { + return func(opts meta.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = s.String() + } + return c.ServiceImports(ns).List(ctx, opts) + } +} + func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { @@ -314,6 +396,15 @@ func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Se } } +func serviceImportWatchFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.ServiceImports(ns).Watch(ctx, options) + } +} + // Stop stops the controller. func (dns *dnsControl) Stop() error { dns.stopLock.Lock() @@ -342,6 +433,12 @@ func (dns *dnsControl) Run() { go dns.podController.Run(dns.stopCh) } go dns.nsController.Run(dns.stopCh) + if dns.svcImportController != nil { + go dns.svcImportController.Run(dns.stopCh) + } + if dns.mcEpController != nil { + go dns.mcEpController.Run(dns.stopCh) + } <-dns.stopCh } @@ -357,7 +454,15 @@ func (dns *dnsControl) HasSynced() bool { c = dns.podController.HasSynced() } d := dns.nsController.HasSynced() - return a && b && c && d + e := true + if dns.svcImportController != nil { + e = dns.svcImportController.HasSynced() + } + f := true + if dns.mcEpController != nil { + f = dns.mcEpController.HasSynced() + } + return a && b && c && d && e && f } func (dns *dnsControl) ServiceList() (svcs []*object.Service) { @@ -372,6 +477,18 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) { return svcs } +func (dns *dnsControl) ServiceImportList() (svcs []*object.ServiceImport) { + os := dns.svcImportLister.List() + for
_, o := range os { + s, ok := o.(*object.ServiceImport) + if !ok { + continue + } + svcs = append(svcs, s) + } + return svcs +} + func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) { os := dns.epLister.List() for _, o := range os { @@ -446,6 +563,21 @@ func (dns *dnsControl) SvcExtIndexReverse(ip string) (svcs []*object.Service) { return svcs } +func (dns *dnsControl) SvcImportIndex(idx string) (svcs []*object.ServiceImport) { + os, err := dns.svcImportLister.ByIndex(svcImportNameNamespaceIndex, idx) + if err != nil { + return nil + } + for _, o := range os { + s, ok := o.(*object.ServiceImport) + if !ok { + continue + } + svcs = append(svcs, s) + } + return svcs +} + func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) { os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx) if err != nil { @@ -476,6 +608,21 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) { return ep } +func (dns *dnsControl) McEpIndex(idx string) (ep []*object.MultiClusterEndpoints) { + os, err := dns.mcEpLister.ByIndex(mcEpNameNamespaceIndex, idx) + if err != nil { + return nil + } + for _, o := range os { + e, ok := o.(*object.MultiClusterEndpoints) + if !ok { + continue + } + ep = append(ep, e) + } + return ep +} + // GetNodeByName return the node by name. If nothing is found an error is // returned. This query causes a round trip to the k8s API server, so use // sparingly. Currently, this is only used for Federation. 
@@ -523,12 +670,20 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { if emod { dns.updateExtModified() } + case *object.ServiceImport: + if !serviceImportEquivalent(oldObj, newObj) { + dns.updateMultiClusterModified() + } case *object.Pod: dns.updateModified() case *object.Endpoints: if !endpointsEquivalent(oldObj.(*object.Endpoints), newObj.(*object.Endpoints)) { dns.updateModified() } + case *object.MultiClusterEndpoints: + if !multiclusterEndpointsEquivalent(oldObj.(*object.MultiClusterEndpoints), newObj.(*object.MultiClusterEndpoints)) { + dns.updateMultiClusterModified() + } default: log.Warningf("Updates for %T not supported.", ob) } @@ -596,6 +751,23 @@ func endpointsEquivalent(a, b *object.Endpoints) bool { return true } +// multiclusterEndpointsEquivalent checks if the update to an endpoint is something +// that matters to us or if they are effectively equivalent. +func multiclusterEndpointsEquivalent(a, b *object.MultiClusterEndpoints) bool { + if a == nil || b == nil { + return false + } + + if !endpointsEquivalent(&a.Endpoints, &b.Endpoints) { + return false + } + if a.ClusterId != b.ClusterId { + return false + } + + return true +} + // serviceModified checks the services passed for changes that result in changes // to internal and or external records. It returns two booleans, one for internal // record changes, and a second for external record changes @@ -651,11 +823,52 @@ func serviceModified(oldObj, newObj interface{}) (intSvc, extSvc bool) { return intSvc, extSvc } -func (dns *dnsControl) Modified(external bool) int64 { - if external { - return atomic.LoadInt64(&dns.extModified) +// serviceImportEquivalent checks if the update to a ServiceImport is something +// that matters to us or if they are effectively equivalent. 
+func serviceImportEquivalent(oldObj, newObj interface{}) bool { + if oldObj != nil && newObj == nil { + return false } - return atomic.LoadInt64(&dns.modified) + if oldObj == nil && newObj != nil { + return false + } + + newSvc := newObj.(*object.ServiceImport) + oldSvc := oldObj.(*object.ServiceImport) + + if oldSvc.Type != newSvc.Type { + return false + } + + // All Port fields are mutable, affecting the multicluster zone records + if len(oldSvc.Ports) != len(newSvc.Ports) { + return false + } + for i := range oldSvc.Ports { + if oldSvc.Ports[i].Name != newSvc.Ports[i].Name { + return false + } + if oldSvc.Ports[i].Port != newSvc.Ports[i].Port { + return false + } + if oldSvc.Ports[i].Protocol != newSvc.Ports[i].Protocol { + return false + } + } + + return true +} + +func (dns *dnsControl) Modified(mode ModifiedMode) int64 { + switch mode { + case ModifiedInternal: + return atomic.LoadInt64(&dns.modified) + case ModifiedExternal: + return atomic.LoadInt64(&dns.extModified) + case ModifiedMultiCluster: + return atomic.LoadInt64(&dns.multiClusterModified) + } + return -1 } // updateModified set dns.modified to the current time. @@ -664,6 +877,12 @@ func (dns *dnsControl) updateModified() { atomic.StoreInt64(&dns.modified, unix) } +// updateMultiClusterModified sets dns.multiClusterModified to the current time. +func (dns *dnsControl) updateMultiClusterModified() { + unix := time.Now().Unix() + atomic.StoreInt64(&dns.multiClusterModified, unix) +} + // updateExtModified set dns.extModified to the current time.
func (dns *dnsControl) updateExtModified() { unix := time.Now().Unix() diff --git a/plugin/kubernetes/controller_test.go b/plugin/kubernetes/controller_test.go index 1b51954fe..a4aeffc14 100644 --- a/plugin/kubernetes/controller_test.go +++ b/plugin/kubernetes/controller_test.go @@ -4,6 +4,7 @@ import ( "context" "net" "strconv" + "strings" "testing" "time" @@ -17,6 +18,9 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" + mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + mcsClientsetFake "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/fake" + mcsClientset "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1" ) func inc(ip net.IP) { @@ -28,29 +32,32 @@ func inc(ip net.IP) { } } -func kubernetesWithFakeClient(ctx context.Context, zone, cidr string, initEndpointsCache bool, svcType string) *Kubernetes { +func kubernetesWithFakeClient(ctx context.Context, cidr string, initEndpointsCache bool, svcType string) *Kubernetes { client := fake.NewSimpleClientset() + mcsClient := mcsClientsetFake.NewSimpleClientset() dco := dnsControlOpts{ - zones: []string{zone}, + zones: []string{"cluster.local.", "clusterset.local."}, + multiclusterZones: []string{"clusterset.local."}, initEndpointsCache: initEndpointsCache, } - controller := newdnsController(ctx, client, dco) + controller := newdnsController(ctx, client, mcsClient.MulticlusterV1alpha1(), dco) // Add resources _, err := client.CoreV1().Namespaces().Create(ctx, &api.Namespace{ObjectMeta: meta.ObjectMeta{Name: "testns"}}, meta.CreateOptions{}) if err != nil { log.Fatal(err) } - generateSvcs(cidr, svcType, client) - generateEndpointSlices(cidr, client) - k := New([]string{"cluster.local."}) + generateSvcs(cidr, svcType, client, mcsClient.MulticlusterV1alpha1()) + generateEndpointSlices(cidr, svcType, client) + k := New([]string{"cluster.local.", "clusterset.local."}) k.APIConn = controller + k.opts.multiclusterZones = 
[]string{"clusterset.local."} return k } func BenchmarkController(b *testing.B) { ctx := context.Background() - k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/24", true, "all") + k := kubernetesWithFakeClient(ctx, "10.0.0.0/24", true, "all") go k.APIConn.Run() defer k.APIConn.Stop() @@ -70,7 +77,7 @@ func BenchmarkController(b *testing.B) { func TestEndpointsDisabled(t *testing.T) { ctx := context.Background() - k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/30", false, "headless") + k := kubernetesWithFakeClient(ctx, "10.0.0.0/30", false, "headless") k.opts.initEndpointsCache = false go k.APIConn.Run() @@ -90,7 +97,7 @@ func TestEndpointsDisabled(t *testing.T) { func TestEndpointsEnabled(t *testing.T) { ctx := context.Background() - k := kubernetesWithFakeClient(ctx, "cluster.local.", "10.0.0.0/30", true, "headless") + k := kubernetesWithFakeClient(ctx, "10.0.0.0/30", true, "headless") k.opts.initEndpointsCache = true go k.APIConn.Run() @@ -108,7 +115,27 @@ func TestEndpointsEnabled(t *testing.T) { } } -func generateEndpointSlices(cidr string, client kubernetes.Interface) { +func TestMultiClusterHeadless(t *testing.T) { + ctx := context.Background() + k := kubernetesWithFakeClient(ctx, "10.0.0.0/30", true, "mcs-headless") + k.opts.initEndpointsCache = true + + go k.APIConn.Run() + defer k.APIConn.Stop() + for !k.APIConn.HasSynced() { + time.Sleep(time.Millisecond) + } + + rw := &dnstest.Recorder{ResponseWriter: &test.ResponseWriter{}} + m := new(dns.Msg) + m.SetQuestion("svc2.testns.svc.clusterset.local.", dns.TypeA) + k.ServeDNS(ctx, rw, m) + if rw.Msg.Rcode != dns.RcodeSuccess { + t.Errorf("Expected SUCCESS, got %v", dns.RcodeToString[rw.Msg.Rcode]) + } +} + +func generateEndpointSlices(cidr string, svcType string, client kubernetes.Interface) { // https://groups.google.com/d/msg/golang-nuts/zlcYA4qk-94/TWRFHeXJCcYJ ip, ipnet, err := net.ParseCIDR(cidr) if err != nil { @@ -141,7 +168,11 @@ func generateEndpointSlices(cidr string, 
client kubernetes.Interface) { }, } eps.Name = "svc" + strconv.Itoa(count) - eps.Labels = map[string]string{discovery.LabelServiceName: eps.Name} + if !strings.Contains(svcType, "mcs") { + eps.Labels = map[string]string{discovery.LabelServiceName: eps.Name} + } else { + eps.Labels = map[string]string{mcs.LabelServiceName: eps.Name} + } _, err := client.DiscoveryV1().EndpointSlices("testns").Create(ctx, eps, meta.CreateOptions{}) if err != nil { log.Fatal(err) @@ -150,7 +181,7 @@ func generateEndpointSlices(cidr string, client kubernetes.Interface) { } } -func generateSvcs(cidr string, svcType string, client kubernetes.Interface) { +func generateSvcs(cidr string, svcType string, client kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface) { ip, ipnet, err := net.ParseCIDR(cidr) if err != nil { log.Fatal(err) @@ -173,6 +204,11 @@ func generateSvcs(cidr string, svcType string, client kubernetes.Interface) { createExternalSvc(count, client, ip) count++ } + case "mcs-headless": + for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { + createMultiClusterHeadlessSvc(count, mcsClient, ip) + count++ + } default: for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { switch count % 3 { @@ -238,8 +274,26 @@ func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) { }, meta.CreateOptions{}) } +func createMultiClusterHeadlessSvc(suffix int, mcsClient mcsClientset.MulticlusterV1alpha1Interface, ip net.IP) { + ctx := context.TODO() + mcsClient.ServiceImports("testns").Create(ctx, &mcs.ServiceImport{ + ObjectMeta: meta.ObjectMeta{ + Name: "svc" + strconv.Itoa(suffix), + Namespace: "testns", + }, + Spec: mcs.ServiceImportSpec{ + Ports: []mcs.ServicePort{{ + Name: "http", + Protocol: "tcp", + Port: 80, + }}, + Type: mcs.Headless, + }, + }, meta.CreateOptions{}) +} + func TestServiceModified(t *testing.T) { - var tests = []struct { + tests := []struct { oldSvc interface{} newSvc interface{} ichanged bool diff --git 
a/plugin/kubernetes/external.go b/plugin/kubernetes/external.go index fa167782c..88c43be6b 100644 --- a/plugin/kubernetes/external.go +++ b/plugin/kubernetes/external.go @@ -213,7 +213,7 @@ func (k *Kubernetes) ExternalServices(zone string, headless bool) (services []ms // ExternalSerial returns the serial of the external zone func (k *Kubernetes) ExternalSerial(string) uint32 { - return uint32(k.APIConn.Modified(true)) + return uint32(k.APIConn.Modified(ModifiedExternal)) } // ExternalReverse does a reverse lookup for the external IPs diff --git a/plugin/kubernetes/external_test.go b/plugin/kubernetes/external_test.go index 474b7be44..b584cabef 100644 --- a/plugin/kubernetes/external_test.go +++ b/plugin/kubernetes/external_test.go @@ -90,16 +90,20 @@ func TestExternal(t *testing.T) { type external struct{} -func (external) HasSynced() bool { return true } -func (external) Run() {} -func (external) Stop() error { return nil } -func (external) EpIndexReverse(string) []*object.Endpoints { return nil } -func (external) SvcIndexReverse(string) []*object.Service { return nil } -func (external) SvcExtIndexReverse(string) []*object.Service { return nil } -func (external) Modified(bool) int64 { return 0 } +func (external) HasSynced() bool { return true } +func (external) Run() {} +func (external) Stop() error { return nil } +func (external) EpIndexReverse(string) []*object.Endpoints { return nil } +func (external) SvcIndexReverse(string) []*object.Service { return nil } +func (external) SvcExtIndexReverse(string) []*object.Service { return nil } +func (external) SvcImportIndex(string) []*object.ServiceImport { return nil } +func (external) ServiceImportList() []*object.ServiceImport { return nil } +func (external) McEpIndex(string) []*object.MultiClusterEndpoints { return nil } +func (external) Modified(ModifiedMode) int64 { return 0 } func (external) EpIndex(s string) []*object.Endpoints { return epIndexExternal[s] } + func (external) EndpointsList() []*object.Endpoints 
{ var eps []*object.Endpoints for _, ep := range epIndexExternal { diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go index 683e0de7a..28ba5af28 100644 --- a/plugin/kubernetes/handler_test.go +++ b/plugin/kubernetes/handler_test.go @@ -13,6 +13,7 @@ import ( "github.com/miekg/dns" api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) type kubeTestCase struct { @@ -21,7 +22,8 @@ type kubeTestCase struct { test.Case } -var dnsTestCases = []kubeTestCase{ +// test cases working for both regular and multicluster zone +var commonDnsTestCases = []kubeTestCase{ // A Service {Case: test.Case{ Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeA, @@ -57,7 +59,6 @@ var dnsTestCases = []kubeTestCase{ }}, // SRV Service {Case: test.Case{ - Qname: "_http._tcp.svc1.testns.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeSuccess, Answer: []dns.RR{ @@ -68,7 +69,6 @@ var dnsTestCases = []kubeTestCase{ }, }}, {Case: test.Case{ - Qname: "_http._tcp.svcempty.testns.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeSuccess, Answer: []dns.RR{ @@ -97,116 +97,6 @@ var dnsTestCases = []kubeTestCase{ test.A("hdlsprtls.testns.svc.cluster.local. 5 IN A 172.0.0.20"), }, }}, - // An Endpoint with no port - {Case: test.Case{ - Qname: "172-0-0-20.hdlsprtls.testns.svc.cluster.local.", Qtype: dns.TypeA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.A("172-0-0-20.hdlsprtls.testns.svc.cluster.local. 5 IN A 172.0.0.20"), - }, - }}, - // An Endpoint ip - {Case: test.Case{ - Qname: "172-0-0-2.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.A("172-0-0-2.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"), - }, - }}, - // A Endpoint ip - {Case: test.Case{ - Qname: "172-0-0-3.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.A("172-0-0-3.hdls1.testns.svc.cluster.local. 
5 IN A 172.0.0.3"), - }, - }}, - // An Endpoint by name - {Case: test.Case{ - Qname: "dup-name.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"), - test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), - }, - }}, - // SRV Service (Headless) - {Case: test.Case{ - Qname: "_http._tcp.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeSRV, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 172-0-0-2.hdls1.testns.svc.cluster.local."), - test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 172-0-0-3.hdls1.testns.svc.cluster.local."), - test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 5678-abcd--1.hdls1.testns.svc.cluster.local."), - test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 5678-abcd--2.hdls1.testns.svc.cluster.local."), - test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 dup-name.hdls1.testns.svc.cluster.local."), - }, - Extra: []dns.RR{ - test.A("172-0-0-2.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"), - test.A("172-0-0-3.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"), - test.AAAA("5678-abcd--1.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::1"), - test.AAAA("5678-abcd--2.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::2"), - test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"), - test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), - }, - }}, - {Case: test.Case{ // An A record query for an existing headless service should return a record for each of its ipv4 endpoints - Qname: "hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"), - test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"), - test.A("hdls1.testns.svc.cluster.local. 
5 IN A 172.0.0.4"), - test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), - }, - }}, - // AAAA - {Case: test.Case{ - Qname: "5678-abcd--2.hdls1.testns.svc.cluster.local", Qtype: dns.TypeAAAA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{test.AAAA("5678-abcd--2.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::2")}, - }}, - // CNAME External - {Case: test.Case{ - Qname: "external.testns.svc.cluster.local.", Qtype: dns.TypeCNAME, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.CNAME("external.testns.svc.cluster.local. 5 IN CNAME ext.interwebs.test."), - }, - }}, - // CNAME External Truncated Lookup - { - Case: test.Case{ - Qname: "external.testns.svc.cluster.local.", Qtype: dns.TypeA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.A("ext.interwebs.test. 5 IN A 1.2.3.4"), - test.CNAME("external.testns.svc.cluster.local. 5 IN CNAME ext.interwebs.test."), - }, - }, - Upstream: &Upstub{ - Truncated: true, - Qclass: dns.ClassINET, - Case: test.Case{ - Qname: "external.testns.svc.cluster.local.", - Qtype: dns.TypeA, - Answer: []dns.RR{ - test.A("ext.interwebs.test. 5 IN A 1.2.3.4"), - test.CNAME("external.testns.svc.cluster.local. 5 IN CNAME ext.interwebs.test."), - }, - }, - }, - Truncated: true, - }, - // CNAME External To Internal Service - {Case: test.Case{ - Qname: "external-to-service.testns.svc.cluster.local", Qtype: dns.TypeA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.CNAME("external-to-service.testns.svc.cluster.local. 5 IN CNAME svc1.testns.svc.cluster.local."), - test.A("svc1.testns.svc.cluster.local. 5 IN A 10.0.0.1"), - }, - }}, // AAAA Service (with an existing A record, but no AAAA record) {Case: test.Case{ Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeAAAA, @@ -307,14 +197,6 @@ var dnsTestCases = []kubeTestCase{ test.AAAA("hdls1.testns.svc.cluster.local. 
5 IN AAAA 5678:abcd::2"), }, }}, - // AAAA Endpoint - {Case: test.Case{ - Qname: "5678-abcd--1.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeAAAA, - Rcode: dns.RcodeSuccess, - Answer: []dns.RR{ - test.AAAA("5678-abcd--1.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::1"), - }, - }}, {Case: test.Case{ Qname: "svc.cluster.local.", Qtype: dns.TypeA, @@ -418,6 +300,127 @@ var dnsTestCases = []kubeTestCase{ }}, } +var dnsTestCases = []kubeTestCase{ + // SRV Service (Headless) + {Case: test.Case{ + Qname: "_http._tcp.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 172-0-0-2.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 172-0-0-3.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 5678-abcd--1.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 5678-abcd--2.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 dup-name.hdls1.testns.svc.cluster.local."), + }, + Extra: []dns.RR{ + test.A("172-0-0-2.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"), + test.A("172-0-0-3.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"), + test.AAAA("5678-abcd--1.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::1"), + test.AAAA("5678-abcd--2.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::2"), + test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"), + test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), + }, + }}, + {Case: test.Case{ // An A record query for an existing headless service should return a record for each of its ipv4 endpoints + Qname: "hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("hdls1.testns.svc.cluster.local. 
5 IN A 172.0.0.2"), + test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"), + test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"), + test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), + }, + }}, + // AAAA + {Case: test.Case{ + Qname: "5678-abcd--2.hdls1.testns.svc.cluster.local", Qtype: dns.TypeAAAA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{test.AAAA("5678-abcd--2.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::2")}, + }}, + // AAAA Endpoint + {Case: test.Case{ + Qname: "5678-abcd--1.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeAAAA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.AAAA("5678-abcd--1.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::1"), + }, + }}, + // An Endpoint with no port + {Case: test.Case{ + Qname: "172-0-0-20.hdlsprtls.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("172-0-0-20.hdlsprtls.testns.svc.cluster.local. 5 IN A 172.0.0.20"), + }, + }}, + // An Endpoint ip + {Case: test.Case{ + Qname: "172-0-0-2.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("172-0-0-2.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"), + }, + }}, + // A Endpoint ip + {Case: test.Case{ + Qname: "172-0-0-3.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("172-0-0-3.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"), + }, + }}, + // An Endpoint by name + {Case: test.Case{ + Qname: "dup-name.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"), + test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), + }, + }}, + // CNAME External + {Case: test.Case{ + Qname: "external.testns.svc.cluster.local.", Qtype: dns.TypeCNAME, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.CNAME("external.testns.svc.cluster.local. 
5 IN CNAME ext.interwebs.test."), + }, + }}, + // CNAME External Truncated Lookup + { + Case: test.Case{ + Qname: "external.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("ext.interwebs.test. 5 IN A 1.2.3.4"), + test.CNAME("external.testns.svc.cluster.local. 5 IN CNAME ext.interwebs.test."), + }, + }, + Upstream: &Upstub{ + Truncated: true, + Qclass: dns.ClassINET, + Case: test.Case{ + Qname: "external.testns.svc.cluster.local.", + Qtype: dns.TypeA, + Answer: []dns.RR{ + test.A("ext.interwebs.test. 5 IN A 1.2.3.4"), + test.CNAME("external.testns.svc.cluster.local. 5 IN CNAME ext.interwebs.test."), + }, + }, + }, + Truncated: true, + }, + // CNAME External To Internal Service + {Case: test.Case{ + Qname: "external-to-service.testns.svc.cluster.local", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.CNAME("external-to-service.testns.svc.cluster.local. 5 IN CNAME svc1.testns.svc.cluster.local."), + test.A("svc1.testns.svc.cluster.local. 5 IN A 10.0.0.1"), + }, + }}, +} + func TestServeDNS(t *testing.T) { k := New([]string{"cluster.local."}) k.APIConn = &APIConnServeTest{} @@ -425,7 +428,139 @@ func TestServeDNS(t *testing.T) { k.Namespaces = map[string]struct{}{"testns": {}} ctx := context.TODO() - for i, tc := range dnsTestCases { + for i, tc := range append(commonDnsTestCases, dnsTestCases...) 
{ + k.Upstream = tc.Upstream + + r := tc.Msg() + + w := dnstest.NewRecorder(&test.ResponseWriter{}) + + _, err := k.ServeDNS(ctx, w, r) + if err != tc.Error { + t.Errorf("Test %d expected no error, got %v", i, err) + return + } + if tc.Error != nil { + continue + } + + resp := w.Msg + if resp == nil { + t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name) + } + + if tc.Truncated != resp.Truncated { + t.Errorf("Expected truncation %t, got truncation %t", tc.Truncated, resp.Truncated) + } + + // Before sorting, make sure that CNAMES do not appear after their target records + if err := test.CNAMEOrder(resp); err != nil { + t.Errorf("Test %d, %v", i, err) + } + + if err := test.SortAndCheck(resp, tc.Case); err != nil { + t.Errorf("Test %d, %v", i, err) + } + } +} + +var multiclusterDnsTestCases = []kubeTestCase{ + // SRV Service (Headless) + {Case: test.Case{ + Qname: "_http._tcp.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 172-0-0-2.cluster1.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 172-0-0-3.cluster1.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 5678-abcd--1.cluster1.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 5678-abcd--2.cluster1.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 5 IN SRV 0 16 80 dup-name.cluster1.hdls1.testns.svc.cluster.local."), + }, + Extra: []dns.RR{ + test.A("172-0-0-2.cluster1.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"), + test.A("172-0-0-3.cluster1.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"), + test.AAAA("5678-abcd--1.cluster1.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::1"), + test.AAAA("5678-abcd--2.cluster1.hdls1.testns.svc.cluster.local. 
5 IN AAAA 5678:abcd::2"), + test.A("dup-name.cluster1.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"), + test.A("dup-name.cluster1.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), + }, + }}, + {Case: test.Case{ // An A record query for an existing headless service should return a record for each of its ipv4 endpoints + Qname: "hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"), + test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"), + test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"), + test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), + }, + }}, + // AAAA + {Case: test.Case{ + Qname: "5678-abcd--2.cluster1.hdls1.testns.svc.cluster.local", Qtype: dns.TypeAAAA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{test.AAAA("5678-abcd--2.cluster1.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::2")}, + }}, + // AAAA Endpoint + {Case: test.Case{ + Qname: "5678-abcd--1.cluster1.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeAAAA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.AAAA("5678-abcd--1.cluster1.hdls1.testns.svc.cluster.local. 5 IN AAAA 5678:abcd::1"), + }, + }}, + // An Endpoint with no port + {Case: test.Case{ + Qname: "172-0-0-20.cluster1.hdlsprtls.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("172-0-0-20.cluster1.hdlsprtls.testns.svc.cluster.local. 5 IN A 172.0.0.20"), + }, + }}, + // AAAA Endpoint without specifying the clusterid + {Case: test.Case{ + Qname: "5678-abcd--1.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeAAAA, + Rcode: dns.RcodeNameError, + Ns: []dns.RR{ + test.SOA("cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 
1499347823 7200 1800 86400 5"), + }, + }}, + // An Endpoint ip + {Case: test.Case{ + Qname: "172-0-0-2.cluster1.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("172-0-0-2.cluster1.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"), + }, + }}, + // A Endpoint ip + {Case: test.Case{ + Qname: "172-0-0-3.cluster1.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("172-0-0-3.cluster1.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"), + }, + }}, + // An Endpoint by name + {Case: test.Case{ + Qname: "dup-name.cluster1.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("dup-name.cluster1.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"), + test.A("dup-name.cluster1.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"), + }, + }}, +} + +func TestMultiClusterServeDNS(t *testing.T) { + k := New([]string{"cluster.local."}) + k.opts.multiclusterZones = []string{"cluster.local."} + k.APIConn = &APIConnServeTest{} + k.Next = test.NextHandler(dns.RcodeSuccess, nil) + k.Namespaces = map[string]struct{}{"testns": {}} + ctx := context.TODO() + + for i, tc := range append(commonDnsTestCases, multiclusterDnsTestCases...) 
{ k.Upstream = tc.Upstream r := tc.Msg() @@ -573,7 +708,7 @@ func (APIConnServeTest) Stop() error { return ni func (APIConnServeTest) EpIndexReverse(string) []*object.Endpoints { return nil } func (APIConnServeTest) SvcIndexReverse(string) []*object.Service { return nil } func (APIConnServeTest) SvcExtIndexReverse(string) []*object.Service { return nil } -func (APIConnServeTest) Modified(bool) int64 { return int64(3) } +func (APIConnServeTest) Modified(ModifiedMode) int64 { return int64(3) } func (APIConnServeTest) PodIndex(ip string) []*object.Pod { if ip != "10.240.0.1" { @@ -790,6 +925,206 @@ func (APIConnServeTest) EpIndex(s string) []*object.Endpoints { return epsIndex[s] } +var svcImportIndex = map[string][]*object.ServiceImport{ + "kubedns.kube-system": { + { + Name: "kubedns", + Namespace: "kube-system", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.10"}, + Ports: []mcs.ServicePort{ + {Name: "dns", Protocol: "udp", Port: 53}, + }, + }, + }, + "svc1.testns": { + { + Name: "svc1", + Namespace: "testns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.1"}, + Ports: []mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + }, + "svcempty.testns": { + { + Name: "svcempty", + Namespace: "testns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.1"}, + Ports: []mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + }, + "svc6.testns": { + { + Name: "svc6", + Namespace: "testns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"1234:abcd::1"}, + Ports: []mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + }, + "hdls1.testns": { + { + Name: "hdls1", + Namespace: "testns", + Type: mcs.Headless, + ClusterIPs: []string{}, + }, + }, + "hdlsprtls.testns": { + { + Name: "hdlsprtls", + Namespace: "testns", + Type: mcs.Headless, + }, + }, + "svc1.unexposedns": { + { + Name: "svc1", + Namespace: "unexposedns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.2"}, + Ports: 
[]mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + }, + "svc-dual-stack.testns": { + { + Name: "svc-dual-stack", + Namespace: "testns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.3", "10::3"}, + Ports: []mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + }, +} + +func (APIConnServeTest) SvcImportIndex(s string) []*object.ServiceImport { return svcImportIndex[s] } + +func (APIConnServeTest) ServiceImportList() []*object.ServiceImport { + var svcs []*object.ServiceImport + for _, svc := range svcImportIndex { + svcs = append(svcs, svc...) + } + return svcs +} + +var mcEpsIndex = map[string][]*object.MultiClusterEndpoints{ + "kubedns.kube-system": {{ + Endpoints: object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: []object.EndpointAddress{ + {IP: "172.0.0.100"}, + }, + Ports: []object.EndpointPort{ + {Port: 53, Protocol: "udp", Name: "dns"}, + }, + }, + }, + Name: "kubedns", + Namespace: "kube-system", + Index: object.EndpointsKey("kubedns", "kube-system"), + }, + ClusterId: "cluster1", + }}, + "svc1.testns": {{ + Endpoints: object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: []object.EndpointAddress{ + {IP: "172.0.0.1", Hostname: "ep1a"}, + }, + Ports: []object.EndpointPort{ + {Port: 80, Protocol: "tcp", Name: "http"}, + }, + }, + }, + Name: "svc1-slice1", + Namespace: "testns", + Index: object.EndpointsKey("svc1", "testns"), + }, + ClusterId: "cluster1", + }}, + "svcempty.testns": {{ + Endpoints: object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: nil, + Ports: []object.EndpointPort{ + {Port: 80, Protocol: "tcp", Name: "http"}, + }, + }, + }, + Name: "svcempty-slice1", + Namespace: "testns", + Index: object.EndpointsKey("svcempty", "testns"), + }, + ClusterId: "cluster1", + }}, + "hdls1.testns": {{ + Endpoints: object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: []object.EndpointAddress{ + {IP: "172.0.0.2"}, + {IP: "172.0.0.3"}, + 
{IP: "172.0.0.4", Hostname: "dup-name"}, + {IP: "172.0.0.5", Hostname: "dup-name"}, + {IP: "5678:abcd::1"}, + {IP: "5678:abcd::2"}, + }, + Ports: []object.EndpointPort{ + {Port: 80, Protocol: "tcp", Name: "http"}, + }, + }, + }, + Name: "hdls1-slice1", + Namespace: "testns", + Index: object.EndpointsKey("hdls1", "testns"), + }, + ClusterId: "cluster1", + }}, + "hdlsprtls.testns": {{ + Endpoints: object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: []object.EndpointAddress{ + {IP: "172.0.0.20"}, + }, + Ports: []object.EndpointPort{{Port: -1}}, + }, + }, + Name: "hdlsprtls-slice1", + Namespace: "testns", + Index: object.EndpointsKey("hdlsprtls", "testns"), + }, + ClusterId: "cluster1", + }}, +} + +func (APIConnServeTest) McEpIndex(s string) []*object.MultiClusterEndpoints { + return mcEpsIndex[s] +} + +func (APIConnServeTest) MultiClusterEndpointsList() []*object.MultiClusterEndpoints { + var eps []*object.MultiClusterEndpoints + for _, ep := range mcEpsIndex { + eps = append(eps, ep...) + } + return eps +} + func (APIConnServeTest) EndpointsList() []*object.Endpoints { var eps []*object.Endpoints for _, ep := range epsIndex { diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index ad9c05008..63a85e363 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -26,6 +26,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" + mcsClientset "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1" ) // Kubernetes implements a plugin that connects to a Kubernetes cluster. 
@@ -237,6 +238,14 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o return nil, nil, fmt.Errorf("failed to create kubernetes notification controller: %q", err) } + var mcsClient mcsClientset.MulticlusterV1alpha1Interface + if len(k.opts.multiclusterZones) > 0 { + mcsClient, err = mcsClientset.NewForConfig(config) + if err != nil { + return nil, nil, fmt.Errorf("failed to create kubernetes multicluster notification controller: %q", err) + } + } + if k.opts.labelSelector != nil { var selector labels.Selector selector, err = meta.LabelSelectorAsSelector(k.opts.labelSelector) @@ -260,7 +269,7 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o k.opts.zones = k.Zones k.opts.endpointNameMode = k.endpointNameMode - k.APIConn = newdnsController(ctx, kubeClient, k.opts) + k.APIConn = newdnsController(ctx, kubeClient, mcsClient, k.opts) onStart = func() error { go func() { @@ -299,7 +308,8 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o // Records looks up services in kubernetes. func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) { - r, e := parseRequest(state.Name(), state.Zone) + multicluster := k.isMultiClusterZone(state.Zone) + r, e := parseRequest(state.Name(), state.Zone, multicluster) if e != nil { return nil, e } @@ -320,7 +330,13 @@ func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact b return pods, err } - services, err := k.findServices(r, state.Zone) + var services []msg.Service + var err error + if !multicluster { + services, err = k.findServices(r, state.Zone) + } else { + services, err = k.findMultiClusterServices(r, state.Zone) + } return services, err } @@ -518,12 +534,127 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg. return services, err } +// findMultiClusterServices returns the multicluster services matching r from the cache. 
+func (k *Kubernetes) findMultiClusterServices(r recordRequest, zone string) (services []msg.Service, err error) { + if !k.namespaceExposed(r.namespace) { + return nil, errNoItems + } + + // handle empty service name + if r.service == "" { + if k.namespaceExposed(r.namespace) { + // NODATA + return nil, nil + } + // NXDOMAIN + return nil, errNoItems + } + + err = errNoItems + + var ( + endpointsListFunc func() []*object.MultiClusterEndpoints + endpointsList []*object.MultiClusterEndpoints + serviceList []*object.ServiceImport + ) + + idx := object.ServiceImportKey(r.service, r.namespace) + serviceList = k.APIConn.SvcImportIndex(idx) + endpointsListFunc = func() []*object.MultiClusterEndpoints { return k.APIConn.McEpIndex(idx) } + + zonePath := msg.Path(zone, coredns) + for _, svc := range serviceList { + if !match(r.namespace, svc.Namespace) || !match(r.service, svc.Name) { + continue + } + + // If "ignore empty_service" option is set and no endpoints exist, return NXDOMAIN unless + // it's a headless or externalName service (covered below). + if k.opts.ignoreEmptyService && !svc.Headless() { // serve NXDOMAIN if no endpoint is able to answer + podsCount := 0 + for _, ep := range endpointsListFunc() { + for _, eps := range ep.Subsets { + podsCount += len(eps.Addresses) + } + } + + if podsCount == 0 { + continue + } + } + + // Endpoint query or headless service + if svc.Headless() || r.endpoint != "" { + if endpointsList == nil { + endpointsList = endpointsListFunc() + } + + for _, ep := range endpointsList { + if object.MultiClusterEndpointsKey(svc.Name, svc.Namespace) != ep.Index { + continue + } + + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + // See comments in parse.go parseRequest about the endpoint handling. 
+ if r.endpoint != "" { + if !match(r.cluster, ep.ClusterId) || !match(r.endpoint, endpointHostname(addr, k.endpointNameMode)) { + continue + } + } + + for _, p := range eps.Ports { + if !(matchPortAndProtocol(r.port, p.Name, r.protocol, p.Protocol)) { + continue + } + s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, ep.ClusterId, endpointHostname(addr, k.endpointNameMode)}, "/") + + err = nil + + services = append(services, s) + } + } + } + } + continue + } + + // ClusterIP service + for _, p := range svc.Ports { + if !(matchPortAndProtocol(r.port, p.Name, r.protocol, string(p.Protocol))) { + continue + } + + err = nil + + for _, ip := range svc.ClusterIPs { + s := msg.Service{Host: ip, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/") + services = append(services, s) + } + } + } + return services, err +} + // Serial return the SOA serial. -func (k *Kubernetes) Serial(state request.Request) uint32 { return uint32(k.APIConn.Modified(false)) } +func (k *Kubernetes) Serial(state request.Request) uint32 { + if !k.isMultiClusterZone(state.Zone) { + return uint32(k.APIConn.Modified(ModifiedInternal)) + } else { + return uint32(k.APIConn.Modified(ModifiedMultiCluster)) + } +} // MinTTL returns the minimal TTL. func (k *Kubernetes) MinTTL(state request.Request) uint32 { return k.ttl } +func (k *Kubernetes) isMultiClusterZone(zone string) bool { + z := plugin.Zones(k.opts.multiclusterZones).Matches(zone) + return z != "" +} + // match checks if a and b are equal. 
func match(a, b string) bool { return strings.EqualFold(a, b) diff --git a/plugin/kubernetes/kubernetes_test.go b/plugin/kubernetes/kubernetes_test.go index acdfd4c64..6b5a3cb76 100644 --- a/plugin/kubernetes/kubernetes_test.go +++ b/plugin/kubernetes/kubernetes_test.go @@ -3,6 +3,7 @@ package kubernetes import ( "context" "net" + "strings" "testing" "github.com/coredns/coredns/plugin" @@ -12,10 +13,11 @@ import ( "github.com/miekg/dns" api "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" ) func TestEndpointHostname(t *testing.T) { - var tests = []struct { + tests := []struct { ip string hostname string expected string @@ -46,7 +48,7 @@ func (APIConnServiceTest) PodIndex(string) []*object.Pod { return func (APIConnServiceTest) SvcIndexReverse(string) []*object.Service { return nil } func (APIConnServiceTest) SvcExtIndexReverse(string) []*object.Service { return nil } func (APIConnServiceTest) EpIndexReverse(string) []*object.Endpoints { return nil } -func (APIConnServiceTest) Modified(bool) int64 { return 0 } +func (APIConnServiceTest) Modified(ModifiedMode) int64 { return 0 } func (APIConnServiceTest) SvcIndex(string) []*object.Service { svcs := []*object.Service{ @@ -225,6 +227,107 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints { return eps } +func (APIConnServiceTest) SvcImportIndex(string) []*object.ServiceImport { + svcs := []*object.ServiceImport{ + { + Name: "svc1", + Namespace: "testns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.1"}, + Ports: []mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + { + Name: "svc-dual-stack", + Namespace: "testns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.2", "10::2"}, + Ports: []mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + { + Name: "hdls1", + Namespace: "testns", + Type: mcs.Headless, + ClusterIPs: []string{}, + }, + } + return svcs +} + +func (APIConnServiceTest) 
ServiceImportList() []*object.ServiceImport { + svcs := []*object.ServiceImport{ + { + Name: "", + Namespace: "testns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.1"}, + Ports: []mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + { + Name: "svc-dual-stack", + Namespace: "testns", + Type: mcs.ClusterSetIP, + ClusterIPs: []string{"10.0.0.2", "10::2"}, + Ports: []mcs.ServicePort{ + {Name: "http", Protocol: "tcp", Port: 80}, + }, + }, + { + Name: "hdls1", + Namespace: "testns", + Type: mcs.Headless, + }, + } + return svcs +} + +func (APIConnServiceTest) McEpIndex(string) []*object.MultiClusterEndpoints { + eps := []*object.MultiClusterEndpoints{ + { + Endpoints: object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: []object.EndpointAddress{ + {IP: "172.0.0.1", Hostname: "ep1a"}, + }, + Ports: []object.EndpointPort{ + {Port: 80, Protocol: "tcp", Name: "http"}, + }, + }, + }, + Name: "svc1-slice1", + Namespace: "testns", + Index: object.EndpointsKey("svc1", "testns"), + }, + ClusterId: "cluster1", + }, + { + Endpoints: object.Endpoints{ + Subsets: []object.EndpointSubset{ + { + Addresses: []object.EndpointAddress{ + {IP: "172.0.0.2"}, + }, + Ports: []object.EndpointPort{ + {Port: 80, Protocol: "tcp", Name: "http"}, + }, + }, + }, + Name: "hdls1-slice1", + Namespace: "testns", + Index: object.EndpointsKey("hdls1", "testns"), + }, + ClusterId: "cluster1", + }, + } + return eps +} + func (APIConnServiceTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return &api.Node{ ObjectMeta: meta.ObjectMeta{ @@ -240,7 +343,8 @@ func (APIConnServiceTest) GetNamespaceByName(name string) (*object.Namespace, er } func TestServices(t *testing.T) { - k := New([]string{"interwebs.test."}) + k := New([]string{"interwebs.test.", "clusterset.test."}) + k.opts.multiclusterZones = []string{"clusterset.test."} k.APIConn = &APIConnServiceTest{} type svcAns struct { @@ -273,12 +377,34 @@ func TestServices(t *testing.T) { 
// Headless Services {qname: "hdls1.testns.svc.interwebs.test.", qtype: dns.TypeA, answer: []svcAns{{host: "172.0.0.2", key: "/" + coredns + "/test/interwebs/svc/testns/hdls1/172-0-0-2"}}}, + + // ClusterSet MultiCluster IP Services + {qname: "svc1.testns.svc.clusterset.test.", qtype: dns.TypeA, answer: []svcAns{{host: "10.0.0.1", key: "/" + coredns + "/test/clusterset/svc/testns/svc1"}}}, + {qname: "_http._tcp.svc1.testns.svc.clusterset.test.", qtype: dns.TypeSRV, answer: []svcAns{{host: "10.0.0.1", key: "/" + coredns + "/test/clusterset/svc/testns/svc1"}}}, + {qname: "ep1a.cluster1.svc1.testns.svc.clusterset.test.", qtype: dns.TypeA, answer: []svcAns{{host: "172.0.0.1", key: "/" + coredns + "/test/clusterset/svc/testns/svc1/cluster1/ep1a"}}}, + + // Dual-Stack ClusterSet MultiCluster IP Service + { + qname: "_http._tcp.svc-dual-stack.testns.svc.clusterset.test.", + qtype: dns.TypeSRV, + answer: []svcAns{ + {host: "10.0.0.2", key: "/" + coredns + "/test/clusterset/svc/testns/svc-dual-stack"}, + {host: "10::2", key: "/" + coredns + "/test/clusterset/svc/testns/svc-dual-stack"}, + }, + }, + + // Headless MultiCluster Services + {qname: "hdls1.testns.svc.clusterset.test.", qtype: dns.TypeA, answer: []svcAns{{host: "172.0.0.2", key: "/" + coredns + "/test/clusterset/svc/testns/hdls1/cluster1/172-0-0-2"}}}, } for i, test := range tests { + zone := "interwebs.test." + if strings.Contains(test.qname, "clusterset.test") { + zone = "clusterset.test." 
+ } state := request.Request{ Req: &dns.Msg{Question: []dns.Question{{Name: test.qname, Qtype: test.qtype}}}, - Zone: "interwebs.test.", // must match from k.Zones[0] + Zone: zone, } svcs, e := k.Services(context.TODO(), state, false, plugin.Options{}) if e != nil { diff --git a/plugin/kubernetes/metadata.go b/plugin/kubernetes/metadata.go index ed651e4ed..21858f465 100644 --- a/plugin/kubernetes/metadata.go +++ b/plugin/kubernetes/metadata.go @@ -32,8 +32,12 @@ func (k *Kubernetes) Metadata(ctx context.Context, state request.Request) contex if zone == "" { return ctx } + multicluster := false + if z := plugin.Zones(k.opts.multiclusterZones).Matches(state.Zone); z != "" { + multicluster = true + } // possible optimization: cache r so it doesn't need to be calculated again in ServeDNS - r, err := parseRequest(state.Name(), zone) + r, err := parseRequest(state.Name(), zone, multicluster) if err != nil { metadata.SetValueFunc(ctx, "kubernetes/parse-error", func() string { return err.Error() @@ -53,6 +57,12 @@ func (k *Kubernetes) Metadata(ctx context.Context, state request.Request) contex return r.endpoint }) + if multicluster { + metadata.SetValueFunc(ctx, "kubernetes/cluster", func() string { + return r.cluster + }) + } + metadata.SetValueFunc(ctx, "kubernetes/service", func() string { return r.service }) diff --git a/plugin/kubernetes/metadata_test.go b/plugin/kubernetes/metadata_test.go index 1f47bd542..f65602cfc 100644 --- a/plugin/kubernetes/metadata_test.go +++ b/plugin/kubernetes/metadata_test.go @@ -2,6 +2,7 @@ package kubernetes import ( "context" + "strings" "testing" "github.com/coredns/coredns/plugin/metadata" @@ -81,6 +82,19 @@ var metadataCases = []struct { "kubernetes/service": "s", }, }, + { + Qname: "ep.c1.s.ns.svc.clusterset.local.", Qtype: dns.TypeA, + RemoteIP: "10.10.10.10", + Md: map[string]string{ + "kubernetes/cluster": "c1", + "kubernetes/endpoint": "ep", + "kubernetes/kind": "svc", + "kubernetes/namespace": "ns", + "kubernetes/port-name": 
"", + "kubernetes/protocol": "", + "kubernetes/service": "s", + }, + }, { Qname: "example.com.", Qtype: dns.TypeA, RemoteIP: "10.10.10.10", @@ -103,14 +117,19 @@ func mapsDiffer(a, b map[string]string) bool { } func TestMetadata(t *testing.T) { - k := New([]string{"cluster.local."}) + k := New([]string{"cluster.local.", "clusterset.local."}) + k.opts.multiclusterZones = []string{"clusterset.local."} k.APIConn = &APIConnServeTest{} for i, tc := range metadataCases { ctx := metadata.ContextWithMetadata(context.Background()) + zone := "." + if strings.Contains(tc.Qname, "clusterset.local") { + zone = "clusterset.local." + } state := request.Request{ Req: &dns.Msg{Question: []dns.Question{{Name: tc.Qname, Qtype: tc.Qtype}}}, - Zone: ".", + Zone: zone, W: &test.ResponseWriter{RemoteIP: tc.RemoteIP}, } diff --git a/plugin/kubernetes/ns_test.go b/plugin/kubernetes/ns_test.go index bdf326e52..819301a62 100644 --- a/plugin/kubernetes/ns_test.go +++ b/plugin/kubernetes/ns_test.go @@ -14,15 +14,18 @@ import ( type APIConnTest struct{} -func (APIConnTest) HasSynced() bool { return true } -func (APIConnTest) Run() {} -func (APIConnTest) Stop() error { return nil } -func (APIConnTest) PodIndex(string) []*object.Pod { return nil } -func (APIConnTest) SvcIndexReverse(string) []*object.Service { return nil } -func (APIConnTest) SvcExtIndexReverse(string) []*object.Service { return nil } -func (APIConnTest) EpIndex(string) []*object.Endpoints { return nil } -func (APIConnTest) EndpointsList() []*object.Endpoints { return nil } -func (APIConnTest) Modified(bool) int64 { return 0 } +func (APIConnTest) HasSynced() bool { return true } +func (APIConnTest) Run() {} +func (APIConnTest) Stop() error { return nil } +func (APIConnTest) PodIndex(string) []*object.Pod { return nil } +func (APIConnTest) SvcIndexReverse(string) []*object.Service { return nil } +func (APIConnTest) SvcExtIndexReverse(string) []*object.Service { return nil } +func (APIConnTest) ServiceImportList() 
[]*object.ServiceImport { return nil } +func (APIConnTest) SvcImportIndex(string) []*object.ServiceImport { return nil } +func (APIConnTest) EpIndex(string) []*object.Endpoints { return nil } +func (APIConnTest) McEpIndex(string) []*object.MultiClusterEndpoints { return nil } +func (APIConnTest) EndpointsList() []*object.Endpoints { return nil } +func (APIConnTest) Modified(ModifiedMode) int64 { return 0 } func (a APIConnTest) SvcIndex(s string) []*object.Service { switch s { @@ -94,6 +97,7 @@ func (APIConnTest) EpIndexReverse(ip string) []*object.Endpoints { func (APIConnTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return &api.Node{}, nil } + func (APIConnTest) GetNamespaceByName(name string) (*object.Namespace, error) { return nil, fmt.Errorf("namespace not found") } diff --git a/plugin/kubernetes/object/multicluster_endpoint.go b/plugin/kubernetes/object/multicluster_endpoint.go new file mode 100644 index 000000000..c4a3d6d56 --- /dev/null +++ b/plugin/kubernetes/object/multicluster_endpoint.go @@ -0,0 +1,64 @@ +package object + +import ( + "maps" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +// Endpoints is a stripped down api.Endpoints with only the items we need for CoreDNS. +type MultiClusterEndpoints struct { + Endpoints + ClusterId string + *Empty +} + +// MultiClusterEndpointsKey returns a string using for the index. +func MultiClusterEndpointsKey(name, namespace string) string { return name + "." + namespace } + +// EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. 
+func EndpointSliceToMultiClusterEndpoints(obj meta.Object) (meta.Object, error) { + labels := maps.Clone(obj.GetLabels()) + ends, err := EndpointSliceToEndpoints(obj) + if err != nil { + return nil, err + } + e := &MultiClusterEndpoints{ + Endpoints: *ends.(*Endpoints), + ClusterId: labels[mcs.LabelSourceCluster], + } + e.Index = MultiClusterEndpointsKey(labels[mcs.LabelServiceName], ends.GetNamespace()) + + return e, nil +} + +var _ runtime.Object = &Endpoints{} + +// DeepCopyObject implements the ObjectKind interface. +func (e *MultiClusterEndpoints) DeepCopyObject() runtime.Object { + e1 := &MultiClusterEndpoints{ + ClusterId: e.ClusterId, + Endpoints: *e.Endpoints.DeepCopyObject().(*Endpoints), + } + return e1 +} + +// GetNamespace implements the metav1.Object interface. +func (e *MultiClusterEndpoints) GetNamespace() string { return e.Endpoints.GetNamespace() } + +// SetNamespace implements the metav1.Object interface. +func (e *MultiClusterEndpoints) SetNamespace(namespace string) {} + +// GetName implements the metav1.Object interface. +func (e *MultiClusterEndpoints) GetName() string { return e.Endpoints.GetName() } + +// SetName implements the metav1.Object interface. +func (e *MultiClusterEndpoints) SetName(name string) {} + +// GetResourceVersion implements the metav1.Object interface. +func (e *MultiClusterEndpoints) GetResourceVersion() string { return e.Endpoints.GetResourceVersion() } + +// SetResourceVersion implements the metav1.Object interface. 
+func (e *MultiClusterEndpoints) SetResourceVersion(version string) {} diff --git a/plugin/kubernetes/object/serviceimport.go b/plugin/kubernetes/object/serviceimport.go new file mode 100644 index 000000000..9fe276fe8 --- /dev/null +++ b/plugin/kubernetes/object/serviceimport.go @@ -0,0 +1,95 @@ +package object + +import ( + "fmt" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" +) + +// ServiceImport is a stripped down v1alpha1.ServiceImport with only the items we need for CoreDNS. +type ServiceImport struct { + Version string + Name string + Namespace string + Index string + ClusterIPs []string + Type mcs.ServiceImportType + Ports []mcs.ServicePort + + *Empty +} + +// ServiceImportKey returns a string used for the index. +func ServiceImportKey(name, namespace string) string { return name + "." + namespace } + +// ToServiceImport converts a v1alpha1.ServiceImport to a *ServiceImport. +func ToServiceImport(obj meta.Object) (meta.Object, error) { + svc, ok := obj.(*mcs.ServiceImport) + + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } + s := &ServiceImport{ + Version: svc.GetResourceVersion(), + Name: svc.GetName(), + Namespace: svc.GetNamespace(), + Index: ServiceImportKey(svc.GetName(), svc.GetNamespace()), + Type: svc.Spec.Type, + } + + if len(svc.Spec.IPs) > 0 { + s.ClusterIPs = make([]string, len(svc.Spec.IPs)) + copy(s.ClusterIPs, svc.Spec.IPs) + } + + if len(svc.Spec.Ports) > 0 { + s.Ports = make([]mcs.ServicePort, len(svc.Spec.Ports)) + copy(s.Ports, svc.Spec.Ports) + } + + *svc = mcs.ServiceImport{} + return s, nil +} + +var _ runtime.Object = &ServiceImport{} + +// Headless returns true if the service is headless. +func (s *ServiceImport) Headless() bool { + return s.Type == mcs.Headless +} + +// DeepCopyObject implements the ObjectKind interface.
+func (s *ServiceImport) DeepCopyObject() runtime.Object { + s1 := &ServiceImport{ + Version: s.Version, + Name: s.Name, + Namespace: s.Namespace, + Index: s.Index, + Type: s.Type, + ClusterIPs: make([]string, len(s.ClusterIPs)), + Ports: make([]mcs.ServicePort, len(s.Ports)), + } + copy(s1.ClusterIPs, s.ClusterIPs) + copy(s1.Ports, s.Ports) + return s1 +} + +// GetNamespace implements the metav1.Object interface. +func (s *ServiceImport) GetNamespace() string { return s.Namespace } + +// SetNamespace implements the metav1.Object interface. +func (s *ServiceImport) SetNamespace(namespace string) {} + +// GetName implements the metav1.Object interface. +func (s *ServiceImport) GetName() string { return s.Name } + +// SetName implements the metav1.Object interface. +func (s *ServiceImport) SetName(name string) {} + +// GetResourceVersion implements the metav1.Object interface. +func (s *ServiceImport) GetResourceVersion() string { return s.Version } + +// SetResourceVersion implements the metav1.Object interface. +func (s *ServiceImport) SetResourceVersion(version string) {} diff --git a/plugin/kubernetes/parse.go b/plugin/kubernetes/parse.go index 4690c814b..f87a64e03 100644 --- a/plugin/kubernetes/parse.go +++ b/plugin/kubernetes/parse.go @@ -1,6 +1,8 @@ package kubernetes import ( + "strings" + "github.com/coredns/coredns/plugin/pkg/dnsutil" "github.com/miekg/dns" @@ -14,6 +16,7 @@ type recordRequest struct { // SRV record. protocol string endpoint string + cluster string // The servicename used in Kubernetes. service string // The namespace used in Kubernetes. @@ -25,11 +28,12 @@ type recordRequest struct { // parseRequest parses the qname to find all the elements we need for querying k8s. Anything // that is not parsed will have the wildcard "*" value (except r.endpoint). // Potential underscores are stripped from _port and _protocol. 
-func parseRequest(name, zone string) (r recordRequest, err error) { - // 3 Possible cases: +func parseRequest(name, zone string, multicluster bool) (r recordRequest, err error) { + // 4 Possible cases: // 1. _port._protocol.service.namespace.pod|svc.zone // 2. (endpoint): endpoint.service.namespace.pod|svc.zone // 3. (service): service.namespace.pod|svc.zone + // 4. (endpoint multicluster): endpoint.cluster.service.namespace.pod|svc.zone base, _ := dnsutil.TrimZone(name, zone) // return NODATA for apex queries @@ -63,14 +67,19 @@ func parseRequest(name, zone string) (r recordRequest, err error) { return r, nil } - // Because of ambiguity we check the labels left: 1: an endpoint. 2: port and protocol. + // Because of ambiguity we check the labels left: 1: an endpoint. 2: port and protocol or endpoint and clusterid. // Anything else is a query that is too long to answer and can safely be delegated to return an nxdomain. switch last { case 0: // endpoint only r.endpoint = segs[last] - case 1: // service and port - r.protocol = stripUnderscore(segs[last]) - r.port = stripUnderscore(segs[last-1]) + case 1: // port and protocol, or endpoint and clusterid + if !multicluster || strings.HasPrefix(segs[last], "_") || strings.HasPrefix(segs[last-1], "_") { + r.protocol = stripUnderscore(segs[last]) + r.port = stripUnderscore(segs[last-1]) + } else { + r.cluster = segs[last] + r.endpoint = segs[last-1] + } default: // too long return r, errInvalidRequest @@ -96,6 +105,7 @@ func (r recordRequest) String() string { s := r.port s += "." + r.protocol s += "." + r.endpoint + s += "." + r.cluster s += "." + r.service s += "." + r.namespace s += "."
+ r.podOrSvc diff --git a/plugin/kubernetes/parse_test.go b/plugin/kubernetes/parse_test.go index 739a405ff..a19b1d4a1 100644 --- a/plugin/kubernetes/parse_test.go +++ b/plugin/kubernetes/parse_test.go @@ -10,28 +10,31 @@ import ( func TestParseRequest(t *testing.T) { tests := []struct { - query string - expected string // output from r.String() + query string + expected string // output from r.String() + multicluster bool }{ // valid SRV request - {"_http._tcp.webs.mynamespace.svc.inter.webs.tests.", "http.tcp..webs.mynamespace.svc"}, + {"_http._tcp.webs.mynamespace.svc.inter.webs.tests.", "http.tcp...webs.mynamespace.svc", false}, // A request of endpoint - {"1-2-3-4.webs.mynamespace.svc.inter.webs.tests.", "..1-2-3-4.webs.mynamespace.svc"}, + {"1-2-3-4.webs.mynamespace.svc.inter.webs.tests.", "..1-2-3-4..webs.mynamespace.svc", false}, // bare zone - {"inter.webs.tests.", "....."}, + {"inter.webs.tests.", "......", false}, // bare svc type - {"svc.inter.webs.tests.", "....."}, + {"svc.inter.webs.tests.", "......", false}, // bare pod type - {"pod.inter.webs.tests.", "....."}, + {"pod.inter.webs.tests.", "......", false}, // SRV request with empty segments - {"..webs.mynamespace.svc.inter.webs.tests.", "...webs.mynamespace.svc"}, + {"..webs.mynamespace.svc.inter.webs.tests.", "....webs.mynamespace.svc", false}, + // A multicluster request with a clusterid + {"1-2-3-4.cluster1.webs.mynamespace.svc.inter.webs.tests.", "..1-2-3-4.cluster1.webs.mynamespace.svc", true}, } for i, tc := range tests { m := new(dns.Msg) m.SetQuestion(tc.query, dns.TypeA) state := request.Request{Zone: zone, Req: m} - r, e := parseRequest(state.Name(), state.Zone) + r, e := parseRequest(state.Name(), state.Zone, tc.multicluster) if e != nil { t.Errorf("Test %d, expected no error, got '%v'.", i, e) } @@ -53,7 +56,7 @@ func TestParseInvalidRequest(t *testing.T) { m.SetQuestion(query, dns.TypeA) state := request.Request{Zone: zone, Req: m} - if _, e := parseRequest(state.Name(), state.Zone); e 
== nil { + if _, e := parseRequest(state.Name(), state.Zone, false); e == nil { t.Errorf("Test %d: expected error from %s, got none", i, query) } } diff --git a/plugin/kubernetes/reverse_test.go b/plugin/kubernetes/reverse_test.go index 370b9f9b5..9508aadd4 100644 --- a/plugin/kubernetes/reverse_test.go +++ b/plugin/kubernetes/reverse_test.go @@ -15,15 +15,18 @@ import ( type APIConnReverseTest struct{} -func (APIConnReverseTest) HasSynced() bool { return true } -func (APIConnReverseTest) Run() {} -func (APIConnReverseTest) Stop() error { return nil } -func (APIConnReverseTest) PodIndex(string) []*object.Pod { return nil } -func (APIConnReverseTest) EpIndex(string) []*object.Endpoints { return nil } -func (APIConnReverseTest) EndpointsList() []*object.Endpoints { return nil } -func (APIConnReverseTest) ServiceList() []*object.Service { return nil } -func (APIConnReverseTest) SvcExtIndexReverse(string) []*object.Service { return nil } -func (APIConnReverseTest) Modified(bool) int64 { return 0 } +func (APIConnReverseTest) HasSynced() bool { return true } +func (APIConnReverseTest) Run() {} +func (APIConnReverseTest) Stop() error { return nil } +func (APIConnReverseTest) PodIndex(string) []*object.Pod { return nil } +func (APIConnReverseTest) EpIndex(string) []*object.Endpoints { return nil } +func (APIConnReverseTest) McEpIndex(string) []*object.MultiClusterEndpoints { return nil } +func (APIConnReverseTest) EndpointsList() []*object.Endpoints { return nil } +func (APIConnReverseTest) ServiceList() []*object.Service { return nil } +func (APIConnReverseTest) ServiceImportList() []*object.ServiceImport { return nil } +func (APIConnReverseTest) SvcImportIndex(string) []*object.ServiceImport { return nil } +func (APIConnReverseTest) SvcExtIndexReverse(string) []*object.Service { return nil } +func (APIConnReverseTest) Modified(ModifiedMode) int64 { return 0 } func (APIConnReverseTest) SvcIndex(svc string) []*object.Service { if svc != "svc1.testns" { diff --git 
a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go index 0b988a9a0..36f65b795 100644 --- a/plugin/kubernetes/setup.go +++ b/plugin/kubernetes/setup.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "slices" "strconv" "strings" @@ -229,6 +230,8 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) { overrides, ) k8s.ClientConfig = config + case "multicluster": + k8s.opts.multiclusterZones = plugin.OriginsFromArgsOrServerBlock(c.RemainingArgs(), []string{}) default: return nil, c.Errf("unknown property '%s'", c.Val()) } @@ -238,6 +241,13 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) { return nil, c.Errf("namespaces and namespace_labels cannot both be set") } + for _, multiclusterZone := range k8s.opts.multiclusterZones { + if !slices.Contains(k8s.Zones, multiclusterZone) { + fmt.Println(k8s.Zones) + return nil, c.Errf("is not authoritative for the multicluster zone %s", multiclusterZone) + } + } + return k8s, nil } diff --git a/plugin/kubernetes/setup_test.go b/plugin/kubernetes/setup_test.go index 52b0d3fb2..ffcc4b912 100644 --- a/plugin/kubernetes/setup_test.go +++ b/plugin/kubernetes/setup_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "slices" "strings" "testing" @@ -610,3 +611,74 @@ func TestKubernetesParseIgnoreEmptyService(t *testing.T) { } } } + +func TestKubernetesParseMulticluster(t *testing.T) { + tests := []struct { + input string // Corefile data as string + shouldErr bool // true if test case is expected to produce an error. + expectedErrContent string // substring from the expected error. Empty for positive cases. 
+ expectedMulticlusterZones []string + }{ + // valid + { + `kubernetes coredns.local clusterset.local { + multicluster clusterset.local +}`, + false, + "", + []string{"clusterset.local."}, + }, + // invalid + { + `kubernetes coredns.local { + multicluster clusterset.local +}`, + true, + "Error during parsing: is not authoritative for the multicluster zone clusterset.local.", + []string{"clusterset.local."}, + }, + { + `kubernetes coredns.local clusterset.local { + multicluster clusterset.local test.local +}`, + true, + "Error during parsing: is not authoritative for the multicluster zone test.local.", + []string{"clusterset.local.", "test.local."}, + }, + // not set + { + `kubernetes coredns.local { + multicluster +}`, + false, + "", + []string{}, + }, + } + + for i, test := range tests { + c := caddy.NewTestController("dns", test.input) + k8sController, err := kubernetesParse(c) + + if test.shouldErr && err == nil { + t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err) + } + + if err != nil { + if !test.shouldErr { + t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err) + continue + } + + if !strings.Contains(err.Error(), test.expectedErrContent) { + t.Errorf("Test %d: Expected error to contain: %v, found error: %v, input: %s", i, test.expectedErrContent, err, test.input) + } + continue + } + + foundMulticlusterZones := k8sController.opts.multiclusterZones + if !slices.Equal(foundMulticlusterZones, test.expectedMulticlusterZones) { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with multicluster '%v'. 
Instead found multicluster '%v' for input '%s'", i, test.expectedMulticlusterZones, foundMulticlusterZones, test.input) + } + } +} diff --git a/plugin/kubernetes/xfr.go b/plugin/kubernetes/xfr.go index 6a5faff2c..9d064caaa 100644 --- a/plugin/kubernetes/xfr.go +++ b/plugin/kubernetes/xfr.go @@ -29,9 +29,7 @@ func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, erro } ch := make(chan []dns.RR) - zonePath := msg.Path(zone, "coredns") - serviceList := k.APIConn.ServiceList() go func() { // ixfr fallback @@ -41,7 +39,6 @@ func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, erro return } ch <- soa - nsAddrs := k.nsAddrs(false, false, zone) nsHosts := make(map[string]struct{}) for _, nsAddr := range nsAddrs { @@ -51,98 +48,191 @@ func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, erro ch <- []dns.RR{&dns.NS{Hdr: dns.RR_Header{Name: zone, Rrtype: dns.TypeNS, Class: dns.ClassINET, Ttl: k.ttl}, Ns: nsHost}} } ch <- nsAddrs - } - sort.Slice(serviceList, func(i, j int) bool { - return serviceList[i].Name < serviceList[j].Name - }) - - for _, svc := range serviceList { - if !k.namespaceExposed(svc.Namespace) { - continue + if !k.isMultiClusterZone(zone) { + k.transferServices(ch, zonePath) + } else { + k.transferMultiClusterServices(ch, zonePath) } - svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name} - switch svc.Type { - case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer: - clusterIP := net.ParseIP(svc.ClusterIPs[0]) - if clusterIP != nil { - var host string - for _, ip := range svc.ClusterIPs { - s := msg.Service{Host: ip, TTL: k.ttl} - s.Key = strings.Join(svcBase, "/") - // Change host from IP to Name for SRV records - host = emitAddressRecord(ch, s) + ch <- soa + close(ch) + } + }() + return ch, nil +} + +func (k *Kubernetes) transferServices(ch chan []dns.RR, zonePath string) { + serviceList := k.APIConn.ServiceList() + sort.Slice(serviceList, func(i, j int) 
bool { + return serviceList[i].Name < serviceList[j].Name + }) + + for _, svc := range serviceList { + if !k.namespaceExposed(svc.Namespace) { + continue + } + svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name} + switch svc.Type { + case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer: + clusterIP := net.ParseIP(svc.ClusterIPs[0]) + if clusterIP != nil { + var host string + for _, ip := range svc.ClusterIPs { + s := msg.Service{Host: ip, TTL: k.ttl} + s.Key = strings.Join(svcBase, "/") + + // Change host from IP to Name for SRV records + host = emitAddressRecord(ch, s) + } + + for _, p := range svc.Ports { + s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join(svcBase, "/") + + // Need to generate this to handle use cases for peer-finder + // ref: https://github.com/coredns/coredns/pull/823 + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)} + + // As per spec unnamed ports do not have a srv record + // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records + if p.Name == "" { + continue } - for _, p := range svc.Ports { - s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+p.Name)), "/") + + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)} + } + + // Skip endpoint discovery if clusterIP is defined + continue + } + + endpointsList := k.APIConn.EpIndex(svc.Name + "." 
+ svc.Namespace) + + for _, ep := range endpointsList { + for _, eps := range ep.Subsets { + srvWeight := calcSRVWeight(len(eps.Addresses)) + for _, addr := range eps.Addresses { + s := msg.Service{Host: addr.IP, TTL: k.ttl} s.Key = strings.Join(svcBase, "/") + // We don't need to change the msg.Service host from IP to Name yet + // so disregard the return value here + emitAddressRecord(ch, s) - // Need to generate this to handle use cases for peer-finder - // ref: https://github.com/coredns/coredns/pull/823 - ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)} + s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/") + // Change host from IP to Name for SRV records + host := emitAddressRecord(ch, s) + s.Host = host + for _, p := range eps.Ports { + // As per spec unnamed ports do not have a srv record + // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records + if p.Name == "" { + continue + } + + s.Port = int(p.Port) + + s.Key = strings.Join(append(svcBase, strings.ToLower("_"+p.Protocol), strings.ToLower("_"+p.Name)), "/") + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), srvWeight)} + } + } + } + } + + case api.ServiceTypeExternalName: + + s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl} + if t, _ := s.HostType(); t == dns.TypeCNAME { + ch <- []dns.RR{s.NewCNAME(msg.Domain(s.Key), s.Host)} + } + } + } +} + +func (k *Kubernetes) transferMultiClusterServices(ch chan []dns.RR, zonePath string) { + serviceImportList := k.APIConn.ServiceImportList() + sort.Slice(serviceImportList, func(i, j int) bool { + return serviceImportList[i].Name < serviceImportList[j].Name + }) + + for _, svcImport := range serviceImportList { + if !k.namespaceExposed(svcImport.Namespace) { + continue + } + svcBase := []string{zonePath, Svc, svcImport.Namespace, svcImport.Name} + var clusterIP net.IP + if len(svcImport.ClusterIPs) > 0 { + clusterIP = net.ParseIP(svcImport.ClusterIPs[0]) + } + if 
clusterIP != nil { + var host string + for _, ip := range svcImport.ClusterIPs { + s := msg.Service{Host: ip, TTL: k.ttl} + s.Key = strings.Join(svcBase, "/") + + // Change host from IP to Name for SRV records + host = emitAddressRecord(ch, s) + } + + for _, p := range svcImport.Ports { + s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join(svcBase, "/") + + // Need to generate this to handle use cases for peer-finder + // ref: https://github.com/coredns/coredns/pull/823 + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)} + + // As per spec unnamed ports do not have a srv record + // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records + if p.Name == "" { + continue + } + + s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+p.Name)), "/") + + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)} + } + + // Skip endpoint discovery if clusterIP is defined + continue + } + + endpointsList := k.APIConn.McEpIndex(svcImport.Name + "." 
+ svcImport.Namespace) + + for _, ep := range endpointsList { + for _, eps := range ep.Subsets { + srvWeight := calcSRVWeight(len(eps.Addresses)) + for _, addr := range eps.Addresses { + s := msg.Service{Host: addr.IP, TTL: k.ttl} + s.Key = strings.Join(svcBase, "/") + // We don't need to change the msg.Service host from IP to Name yet + // so disregard the return value here + emitAddressRecord(ch, s) + + s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/") + // Change host from IP to Name for SRV records + host := emitAddressRecord(ch, s) + s.Host = host + + for _, p := range eps.Ports { // As per spec unnamed ports do not have a srv record // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records if p.Name == "" { continue } - s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+p.Name)), "/") + s.Port = int(p.Port) - ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)} + s.Key = strings.Join(append(svcBase, strings.ToLower("_"+p.Protocol), strings.ToLower("_"+p.Name)), "/") + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), srvWeight)} } - - // Skip endpoint discovery if clusterIP is defined - continue - } - - endpointsList := k.APIConn.EpIndex(svc.Name + "." 
+ svc.Namespace) - - for _, ep := range endpointsList { - for _, eps := range ep.Subsets { - srvWeight := calcSRVWeight(len(eps.Addresses)) - for _, addr := range eps.Addresses { - s := msg.Service{Host: addr.IP, TTL: k.ttl} - s.Key = strings.Join(svcBase, "/") - // We don't need to change the msg.Service host from IP to Name yet - // so disregard the return value here - emitAddressRecord(ch, s) - - s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/") - // Change host from IP to Name for SRV records - host := emitAddressRecord(ch, s) - s.Host = host - - for _, p := range eps.Ports { - // As per spec unnamed ports do not have a srv record - // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records - if p.Name == "" { - continue - } - - s.Port = int(p.Port) - - s.Key = strings.Join(append(svcBase, strings.ToLower("_"+p.Protocol), strings.ToLower("_"+p.Name)), "/") - ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), srvWeight)} - } - } - } - } - - case api.ServiceTypeExternalName: - - s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl} - if t, _ := s.HostType(); t == dns.TypeCNAME { - ch <- []dns.RR{s.NewCNAME(msg.Domain(s.Key), s.Host)} } } } - ch <- soa - close(ch) - }() - return ch, nil + } } // emitAddressRecord generates a new A or AAAA record based on the msg.Service and writes it to a channel. 
diff --git a/plugin/kubernetes/xfr_test.go b/plugin/kubernetes/xfr_test.go index 61e5d0af6..dcd57380c 100644 --- a/plugin/kubernetes/xfr_test.go +++ b/plugin/kubernetes/xfr_test.go @@ -38,7 +38,24 @@ func TestKubernetesAXFR(t *testing.T) { if err != nil { t.Error(err) } - validateAXFR(t, ch) + validateAXFR(t, ch, false) +} + +func TestKubernetesMultiClusterAXFR(t *testing.T) { + k := New([]string{"cluster.local.", "clusterset.local."}) + k.opts.multiclusterZones = []string{"clusterset.local."} + k.APIConn = &APIConnServeTest{} + k.Namespaces = map[string]struct{}{"testns": {}, "kube-system": {}} + k.localIPs = []net.IP{net.ParseIP("10.0.0.10")} + + dnsmsg := &dns.Msg{} + dnsmsg.SetAxfr("clusterset.local.") + + ch, err := k.Transfer("clusterset.local.", 0) + if err != nil { + t.Error(err) + } + validateAXFR(t, ch, true) } func TestKubernetesIXFRFallback(t *testing.T) { @@ -54,7 +71,7 @@ func TestKubernetesIXFRFallback(t *testing.T) { if err != nil { t.Error(err) } - validateAXFR(t, ch) + validateAXFR(t, ch, false) } func TestKubernetesIXFRCurrent(t *testing.T) { @@ -87,7 +104,7 @@ func TestKubernetesIXFRCurrent(t *testing.T) { } } -func validateAXFR(t *testing.T, ch <-chan []dns.RR) { +func validateAXFR(t *testing.T, ch <-chan []dns.RR, multicluster bool) { xfr := []dns.RR{} for rrs := range ch { xfr = append(xfr, rrs...) @@ -96,7 +113,11 @@ func validateAXFR(t *testing.T, ch <-chan []dns.RR) { t.Error("Invalid transfer response, does not start with SOA record") } - zp := dns.NewZoneParser(strings.NewReader(expectedZone), "", "") + expected := expectedZone + if multicluster { + expected = expectedMultiClusterZone + } + zp := dns.NewZoneParser(strings.NewReader(expected), "", "") i := 0 for rr, ok := zp.Next(); ok; rr, ok = zp.Next() { if !dns.IsDuplicate(rr, xfr[i]) { @@ -154,3 +175,46 @@ svcempty.testns.svc.cluster.local. 5 IN SRV 0 100 80 svcempty.testns.svc.cluster _http._tcp.svcempty.testns.svc.cluster.local. 5 IN SRV 0 100 80 svcempty.testns.svc.cluster.local. 
cluster.local. 5 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 3 7200 1800 86400 5 ` + +const expectedMultiClusterZone = ` +clusterset.local. 5 IN SOA ns.dns.clusterset.local. hostmaster.clusterset.local. 3 7200 1800 86400 5 +clusterset.local. 5 IN NS ns.dns.clusterset.local. +ns.dns.clusterset.local. 5 IN A 10.0.0.10 +hdls1.testns.svc.clusterset.local. 5 IN A 172.0.0.2 +172-0-0-2.hdls1.testns.svc.clusterset.local. 5 IN A 172.0.0.2 +_http._tcp.hdls1.testns.svc.clusterset.local. 5 IN SRV 0 16 80 172-0-0-2.hdls1.testns.svc.clusterset.local. +hdls1.testns.svc.clusterset.local. 5 IN A 172.0.0.3 +172-0-0-3.hdls1.testns.svc.clusterset.local. 5 IN A 172.0.0.3 +_http._tcp.hdls1.testns.svc.clusterset.local. 5 IN SRV 0 16 80 172-0-0-3.hdls1.testns.svc.clusterset.local. +hdls1.testns.svc.clusterset.local. 5 IN A 172.0.0.4 +dup-name.hdls1.testns.svc.clusterset.local. 5 IN A 172.0.0.4 +_http._tcp.hdls1.testns.svc.clusterset.local. 5 IN SRV 0 16 80 dup-name.hdls1.testns.svc.clusterset.local. +hdls1.testns.svc.clusterset.local. 5 IN A 172.0.0.5 +dup-name.hdls1.testns.svc.clusterset.local. 5 IN A 172.0.0.5 +_http._tcp.hdls1.testns.svc.clusterset.local. 5 IN SRV 0 16 80 dup-name.hdls1.testns.svc.clusterset.local. +hdls1.testns.svc.clusterset.local. 5 IN AAAA 5678:abcd::1 +5678-abcd--1.hdls1.testns.svc.clusterset.local. 5 IN AAAA 5678:abcd::1 +_http._tcp.hdls1.testns.svc.clusterset.local. 5 IN SRV 0 16 80 5678-abcd--1.hdls1.testns.svc.clusterset.local. +hdls1.testns.svc.clusterset.local. 5 IN AAAA 5678:abcd::2 +5678-abcd--2.hdls1.testns.svc.clusterset.local. 5 IN AAAA 5678:abcd::2 +_http._tcp.hdls1.testns.svc.clusterset.local. 5 IN SRV 0 16 80 5678-abcd--2.hdls1.testns.svc.clusterset.local. +hdlsprtls.testns.svc.clusterset.local. 5 IN A 172.0.0.20 +172-0-0-20.hdlsprtls.testns.svc.clusterset.local. 5 IN A 172.0.0.20 +kubedns.kube-system.svc.clusterset.local. 5 IN A 10.0.0.10 +kubedns.kube-system.svc.clusterset.local. 
5 IN SRV 0 100 53 kubedns.kube-system.svc.clusterset.local. +_dns._udp.kubedns.kube-system.svc.clusterset.local. 5 IN SRV 0 100 53 kubedns.kube-system.svc.clusterset.local. +svc-dual-stack.testns.svc.clusterset.local. 5 IN A 10.0.0.3 +svc-dual-stack.testns.svc.clusterset.local. 5 IN AAAA 10::3 +svc-dual-stack.testns.svc.clusterset.local. 5 IN SRV 0 100 80 svc-dual-stack.testns.svc.clusterset.local. +_http._tcp.svc-dual-stack.testns.svc.clusterset.local. 5 IN SRV 0 100 80 svc-dual-stack.testns.svc.clusterset.local. +svc1.testns.svc.clusterset.local. 5 IN A 10.0.0.1 +svc1.testns.svc.clusterset.local. 5 IN SRV 0 100 80 svc1.testns.svc.clusterset.local. +_http._tcp.svc1.testns.svc.clusterset.local. 5 IN SRV 0 100 80 svc1.testns.svc.clusterset.local. +svc6.testns.svc.clusterset.local. 5 IN AAAA 1234:abcd::1 +svc6.testns.svc.clusterset.local. 5 IN SRV 0 100 80 svc6.testns.svc.clusterset.local. +_http._tcp.svc6.testns.svc.clusterset.local. 5 IN SRV 0 100 80 svc6.testns.svc.clusterset.local. +svcempty.testns.svc.clusterset.local. 5 IN A 10.0.0.1 +svcempty.testns.svc.clusterset.local. 5 IN SRV 0 100 80 svcempty.testns.svc.clusterset.local. +_http._tcp.svcempty.testns.svc.clusterset.local. 5 IN SRV 0 100 80 svcempty.testns.svc.clusterset.local. +clusterset.local. 5 IN SOA ns.dns.clusterset.local. hostmaster.clusterset.local. 3 7200 1800 86400 5 +`