kubernetes: add multicluster support (#7266)

* kubernetes: add multicluster support

Add multicluster support through the Multi-Cluster Services API (MCS-API),
enabled by a new `multiclusterZones` option in the kubernetes plugin.

When one or more multicluster zones are passed to the kubernetes plugin, it
will start watching ServiceImport objects and their associated
EndpointSlices.

Signed-off-by: Arthur Outhenin-Chalandre <arthur@cri.epita.fr>

* kubernetes: implement xfr support for multicluster zones

Signed-off-by: Arthur Outhenin-Chalandre <arthur@cri.epita.fr>

---------

Signed-off-by: Arthur Outhenin-Chalandre <arthur@cri.epita.fr>
This commit is contained in:
Arthur Outhenin-Chalandre
2025-05-19 07:58:16 +02:00
committed by GitHub
parent 76b199f829
commit 5c71bd0b87
23 changed files with 1634 additions and 298 deletions

View File

@@ -15,29 +15,45 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
mcsClientset "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned/typed/apis/v1alpha1"
)
const (
podIPIndex = "PodIP"
svcNameNamespaceIndex = "ServiceNameNamespace"
svcIPIndex = "ServiceIP"
svcExtIPIndex = "ServiceExternalIP"
epNameNamespaceIndex = "EndpointNameNamespace"
epIPIndex = "EndpointsIP"
podIPIndex = "PodIP"
svcNameNamespaceIndex = "ServiceNameNamespace"
svcIPIndex = "ServiceIP"
svcExtIPIndex = "ServiceExternalIP"
epNameNamespaceIndex = "EndpointNameNamespace"
epIPIndex = "EndpointsIP"
svcImportNameNamespaceIndex = "ServiceImportNameNamespace"
mcEpNameNamespaceIndex = "MultiClusterEndpointsImportNameNamespace"
)
// ModifiedMode selects which of the controller's "last modified" timestamps
// is reported by Modified.
type ModifiedMode int

const (
	// ModifiedInternal selects the timestamp of the most recent changes to
	// services.
	ModifiedInternal ModifiedMode = iota
	// ModifiedExternal selects the timestamp of the most recent changes to
	// services with external facing IP addresses.
	ModifiedExternal
	// ModifiedMultiCluster selects the timestamp of the most recent changes
	// to multi cluster services.
	ModifiedMultiCluster
)
type dnsController interface {
ServiceList() []*object.Service
EndpointsList() []*object.Endpoints
ServiceImportList() []*object.ServiceImport
SvcIndex(string) []*object.Service
SvcIndexReverse(string) []*object.Service
SvcExtIndexReverse(string) []*object.Service
SvcImportIndex(string) []*object.ServiceImport
PodIndex(string) []*object.Pod
EpIndex(string) []*object.Endpoints
EpIndexReverse(string) []*object.Endpoints
McEpIndex(string) []*object.MultiClusterEndpoints
GetNodeByName(context.Context, string) (*api.Node, error)
GetNamespaceByName(string) (*object.Namespace, error)
@@ -46,9 +62,8 @@ type dnsController interface {
HasSynced() bool
Stop() error
// Modified returns the timestamp of the most recent changes to services. If the passed bool is true, it should
// return the timestamp of the most recent changes to services with external facing IP addresses
Modified(bool) int64
// Modified returns the timestamp of the most recent changes to services.
Modified(ModifiedMode) int64
}
type dnsControl struct {
@@ -56,24 +71,32 @@ type dnsControl struct {
// It needs to be first because it is guaranteed to be 8-byte
// aligned ( we use sync.LoadAtomic with this )
modified int64
// multiClusterModified tracks timestamp of the most recent changes to
// multi cluster services
multiClusterModified int64
// extModified tracks timestamp of the most recent changes to
// services with external facing IP addresses
extModified int64
client kubernetes.Interface
client kubernetes.Interface
mcsClient mcsClientset.MulticlusterV1alpha1Interface
selector labels.Selector
namespaceSelector labels.Selector
svcController cache.Controller
podController cache.Controller
epController cache.Controller
nsController cache.Controller
svcController cache.Controller
podController cache.Controller
epController cache.Controller
nsController cache.Controller
svcImportController cache.Controller
mcEpController cache.Controller
svcLister cache.Indexer
podLister cache.Indexer
epLister cache.Indexer
nsLister cache.Store
svcLister cache.Indexer
podLister cache.Indexer
epLister cache.Indexer
nsLister cache.Store
svcImportLister cache.Indexer
mcEpLister cache.Indexer
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
@@ -82,8 +105,9 @@ type dnsControl struct {
shutdown bool
stopCh chan struct{}
zones []string
endpointNameMode bool
zones []string
endpointNameMode bool
multiclusterZones []string
}
type dnsControlOpts struct {
@@ -97,19 +121,22 @@ type dnsControlOpts struct {
namespaceLabelSelector *meta.LabelSelector
namespaceSelector labels.Selector
zones []string
endpointNameMode bool
zones []string
endpointNameMode bool
multiclusterZones []string
}
// newdnsController creates a controller for CoreDNS.
func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl {
func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, mcsClient mcsClientset.MulticlusterV1alpha1Interface, opts dnsControlOpts) *dnsControl {
dns := dnsControl{
client: kubeClient,
mcsClient: mcsClient,
selector: opts.selector,
namespaceSelector: opts.namespaceSelector,
stopCh: make(chan struct{}),
zones: opts.zones,
endpointNameMode: opts.endpointNameMode,
multiclusterZones: opts.multiclusterZones,
}
dns.svcLister, dns.svcController = object.NewIndexerInformer(
@@ -164,6 +191,35 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
object.DefaultProcessor(object.ToNamespace, nil),
)
// The multicluster informers are only created when at least one multicluster
// zone is configured; otherwise mcEpLister/mcEpController and
// svcImportLister/svcImportController are left unset by this block.
if len(opts.multiclusterZones) > 0 {
// Only EndpointSlices that carry the mcs.LabelServiceName label are of
// interest here; the requirement is added on top of dns.selector when one
// is set.
// NOTE(review): the error from labels.NewRequirement is discarded —
// presumably it cannot fail for a constant key with the Exists operator
// and no values; confirm, since a nil mcsEpReq would panic below.
mcsEpReq, _ := labels.NewRequirement(mcs.LabelServiceName, selection.Exists, []string{})
mcsEpSelector := dns.selector
if mcsEpSelector == nil {
mcsEpSelector = labels.NewSelector()
}
mcsEpSelector = mcsEpSelector.Add(*mcsEpReq)
// EndpointSlices are converted with object.EndpointSliceToMultiClusterEndpoints
// and indexed under mcEpNameNamespaceIndex.
dns.mcEpLister, dns.mcEpController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, mcsEpSelector),
WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, mcsEpSelector),
},
&discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{mcEpNameNamespaceIndex: mcEpNameNamespaceIndexFunc},
object.DefaultProcessor(object.EndpointSliceToMultiClusterEndpoints, dns.EndpointSliceLatencyRecorder()),
)
// ServiceImports are converted with object.ToServiceImport and indexed
// under svcImportNameNamespaceIndex.
// NOTE(review): dns.namespaceSelector is passed as the selector, and
// serviceImportListFunc/serviceImportWatchFunc send it as the label
// selector on the ServiceImport objects themselves — confirm this is
// intended rather than dns.selector.
dns.svcImportLister, dns.svcImportController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: serviceImportListFunc(ctx, dns.mcsClient, api.NamespaceAll, dns.namespaceSelector),
WatchFunc: serviceImportWatchFunc(ctx, dns.mcsClient, api.NamespaceAll, dns.namespaceSelector),
},
&mcs.ServiceImport{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{svcImportNameNamespaceIndex: svcImportNameNamespaceIndexFunc},
object.DefaultProcessor(object.ToServiceImport, nil),
)
}
return &dns
}
@@ -174,6 +230,7 @@ func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorde
},
}
}
func (dns *dnsControl) EndpointSliceLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
@@ -234,6 +291,22 @@ func epIPIndexFunc(obj interface{}) ([]string, error) {
return ep.IndexIP, nil
}
// svcImportNameNamespaceIndexFunc is the index function registered for
// svcImportNameNamespaceIndex. It yields the Index field of an
// *object.ServiceImport as the only key, and errObj for any other type.
func svcImportNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
	if svcImport, isSvcImport := obj.(*object.ServiceImport); isSvcImport {
		return []string{svcImport.Index}, nil
	}
	return nil, errObj
}
// mcEpNameNamespaceIndexFunc is the index function registered for
// mcEpNameNamespaceIndex. It yields the Index field of an
// *object.MultiClusterEndpoints as the only key, and errObj for any other
// type.
func mcEpNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
	if endpoints, isMcEp := obj.(*object.MultiClusterEndpoints); isMcEp {
		return []string{endpoints.Index}, nil
	}
	return nil, errObj
}
func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
@@ -274,6 +347,15 @@ func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Sel
}
}
// serviceImportListFunc builds the list half of a ListWatch for ServiceImport
// objects in namespace ns. When s is non-nil, its string form is set as the
// label selector of every list request before it is sent through c.
func serviceImportListFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
	list := func(listOpts meta.ListOptions) (runtime.Object, error) {
		if s != nil {
			listOpts.LabelSelector = s.String()
		}
		return c.ServiceImports(ns).List(ctx, listOpts)
	}
	return list
}
func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
@@ -314,6 +396,15 @@ func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Se
}
}
// serviceImportWatchFunc builds the watch half of a ListWatch for
// ServiceImport objects in namespace ns. When s is non-nil, its string form is
// set as the label selector of every watch request before it is sent through
// c.
func serviceImportWatchFunc(ctx context.Context, c mcsClientset.MulticlusterV1alpha1Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
	watchFn := func(watchOpts meta.ListOptions) (watch.Interface, error) {
		if s != nil {
			watchOpts.LabelSelector = s.String()
		}
		return c.ServiceImports(ns).Watch(ctx, watchOpts)
	}
	return watchFn
}
// Stop stops the controller.
func (dns *dnsControl) Stop() error {
dns.stopLock.Lock()
@@ -342,6 +433,12 @@ func (dns *dnsControl) Run() {
go dns.podController.Run(dns.stopCh)
}
go dns.nsController.Run(dns.stopCh)
if dns.svcImportController != nil {
go dns.svcImportController.Run(dns.stopCh)
}
if dns.mcEpController != nil {
go dns.mcEpController.Run(dns.stopCh)
}
<-dns.stopCh
}
@@ -357,7 +454,15 @@ func (dns *dnsControl) HasSynced() bool {
c = dns.podController.HasSynced()
}
d := dns.nsController.HasSynced()
return a && b && c && d
// The multicluster controllers are optional: each gets its own flag that
// defaults to true, so a controller that was never created does not block
// sync, while one that exists must report synced itself. The results must not
// be written to c, which holds the pod controller's sync state.
e := true
if dns.svcImportController != nil {
	e = dns.svcImportController.HasSynced()
}
f := true
if dns.mcEpController != nil {
	f = dns.mcEpController.HasSynced()
}
return a && b && c && d && e && f
}
func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
@@ -372,6 +477,18 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
return svcs
}
// ServiceImportList returns every *object.ServiceImport currently held by the
// ServiceImport informer's store; entries of any other type are skipped.
//
// The ServiceImport informer is only created when multicluster zones are
// configured, so svcImportLister may be unset. In that case nil is returned
// instead of panicking on a nil lister, mirroring the nil checks Run and
// HasSynced apply to the matching controller.
func (dns *dnsControl) ServiceImportList() (svcs []*object.ServiceImport) {
	if dns.svcImportLister == nil {
		return nil
	}
	for _, item := range dns.svcImportLister.List() {
		if svcImport, ok := item.(*object.ServiceImport); ok {
			svcs = append(svcs, svcImport)
		}
	}
	return svcs
}
func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
os := dns.epLister.List()
for _, o := range os {
@@ -446,6 +563,21 @@ func (dns *dnsControl) SvcExtIndexReverse(ip string) (svcs []*object.Service) {
return svcs
}
// SvcImportIndex returns the ServiceImports stored under idx in the
// svcImportNameNamespaceIndex index; entries of any other type are skipped.
//
// It returns nil when the index lookup fails, when nothing matches, or when
// the ServiceImport informer was never created (svcImportLister is only set
// up when multicluster zones are configured) — the latter instead of
// panicking on a nil lister.
func (dns *dnsControl) SvcImportIndex(idx string) (svcs []*object.ServiceImport) {
	if dns.svcImportLister == nil {
		return nil
	}
	items, err := dns.svcImportLister.ByIndex(svcImportNameNamespaceIndex, idx)
	if err != nil {
		return nil
	}
	for _, item := range items {
		if svcImport, ok := item.(*object.ServiceImport); ok {
			svcs = append(svcs, svcImport)
		}
	}
	return svcs
}
func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
@@ -476,6 +608,21 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
return ep
}
// McEpIndex returns the multicluster endpoints stored under idx in the
// mcEpNameNamespaceIndex index; entries of any other type are skipped.
//
// It returns nil when the index lookup fails, when nothing matches, or when
// the multicluster EndpointSlice informer was never created (mcEpLister is
// only set up when multicluster zones are configured) — the latter instead of
// panicking on a nil lister.
func (dns *dnsControl) McEpIndex(idx string) (ep []*object.MultiClusterEndpoints) {
	if dns.mcEpLister == nil {
		return nil
	}
	items, err := dns.mcEpLister.ByIndex(mcEpNameNamespaceIndex, idx)
	if err != nil {
		return nil
	}
	for _, item := range items {
		if mcEp, ok := item.(*object.MultiClusterEndpoints); ok {
			ep = append(ep, mcEp)
		}
	}
	return ep
}
// GetNodeByName return the node by name. If nothing is found an error is
// returned. This query causes a round trip to the k8s API server, so use
// sparingly. Currently, this is only used for Federation.
@@ -523,12 +670,20 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
if emod {
dns.updateExtModified()
}
case *object.ServiceImport:
if !serviceImportEquivalent(oldObj, newObj) {
dns.updateMultiClusterModified()
}
case *object.Pod:
dns.updateModified()
case *object.Endpoints:
if !endpointsEquivalent(oldObj.(*object.Endpoints), newObj.(*object.Endpoints)) {
dns.updateModified()
}
case *object.MultiClusterEndpoints:
if !multiclusterEndpointsEquivalent(oldObj.(*object.MultiClusterEndpoints), newObj.(*object.MultiClusterEndpoints)) {
dns.updateMultiClusterModified()
}
default:
log.Warningf("Updates for %T not supported.", ob)
}
@@ -596,6 +751,23 @@ func endpointsEquivalent(a, b *object.Endpoints) bool {
return true
}
// multiclusterEndpointsEquivalent reports whether a and b can be treated as
// the same multicluster endpoints, i.e. whether an update from one to the
// other can be ignored. They are equivalent when both are non-nil, their
// embedded Endpoints pass endpointsEquivalent, and their ClusterId matches.
func multiclusterEndpointsEquivalent(a, b *object.MultiClusterEndpoints) bool {
	// A missing side is never equivalent, even when both sides are missing.
	if a == nil || b == nil {
		return false
	}
	return endpointsEquivalent(&a.Endpoints, &b.Endpoints) && a.ClusterId == b.ClusterId
}
// serviceModified checks the services passed for changes that result in changes
// to internal and or external records. It returns two booleans, one for internal
// record changes, and a second for external record changes
@@ -651,11 +823,52 @@ func serviceModified(oldObj, newObj interface{}) (intSvc, extSvc bool) {
return intSvc, extSvc
}
func (dns *dnsControl) Modified(external bool) int64 {
if external {
return atomic.LoadInt64(&dns.extModified)
// serviceImportEquivalent checks if the update to a ServiceImport is something
// that matters to us or if they are effectively equivalent.
func serviceImportEquivalent(oldObj, newObj interface{}) bool {
if oldObj != nil && newObj == nil {
return false
}
return atomic.LoadInt64(&dns.modified)
if oldObj == nil && newObj != nil {
return false
}
newSvc := newObj.(*object.ServiceImport)
oldSvc := oldObj.(*object.ServiceImport)
if oldSvc.Type != newSvc.Type {
return false
}
// All Port fields are mutable, affecting both internal/external zone records
if len(oldSvc.Ports) != len(newSvc.Ports) {
return false
}
for i := range oldSvc.Ports {
if oldSvc.Ports[i].Name != newSvc.Ports[i].Name {
return false
}
if oldSvc.Ports[i].Port != newSvc.Ports[i].Port {
return false
}
if oldSvc.Ports[i].Protocol != newSvc.Ports[i].Protocol {
return false
}
}
return true
}
// Modified returns the timestamp of the most recent change for the record set
// selected by mode: dns.modified for ModifiedInternal, dns.extModified for
// ModifiedExternal and dns.multiClusterModified for ModifiedMultiCluster. The
// value is read atomically. Any other mode yields -1.
func (dns *dnsControl) Modified(mode ModifiedMode) int64 {
	var stamp *int64
	switch mode {
	case ModifiedInternal:
		stamp = &dns.modified
	case ModifiedExternal:
		stamp = &dns.extModified
	case ModifiedMultiCluster:
		stamp = &dns.multiClusterModified
	default:
		return -1
	}
	return atomic.LoadInt64(stamp)
}
// updateModified set dns.modified to the current time.
@@ -664,6 +877,12 @@ func (dns *dnsControl) updateModified() {
atomic.StoreInt64(&dns.modified, unix)
}
// updateMultiClusterModified atomically sets dns.multiClusterModified to the
// current Unix time.
func (dns *dnsControl) updateMultiClusterModified() {
	atomic.StoreInt64(&dns.multiClusterModified, time.Now().Unix())
}
// updateExtModified set dns.extModified to the current time.
func (dns *dnsControl) updateExtModified() {
unix := time.Now().Unix()