plugin/k8s_external: Resolve headless services (#5505)

*add option for resolving headless Services without external IPs in k8s_external

Signed-off-by: Tomas Kohout <tomas.kohout1995@gmail.com>
This commit is contained in:
TomasKohout
2022-08-30 20:59:27 +02:00
committed by GitHub
parent b218b56063
commit 6782b7fb42
15 changed files with 491 additions and 81 deletions

View File

@@ -11,9 +11,21 @@ import (
"github.com/miekg/dns"
)
// Those constants are used to distinguish between records in ExternalServices headless
// return values.
// They are always appendedn to key in a map which is
// either base service key eg. /com/example/namespace/service/endpoint or
// /com/example/namespace/service/_http/_tcp/port.protocol
// this will allow us to distinguish services in implementation of Transfer protocol
// see plugin/k8s_external/transfer.go
const (
Endpoint = "endpoint"
PortProtocol = "port.protocol"
)
// External implements the ExternalFunc call from the external plugin.
// It returns any services matching in the services' ExternalIPs.
func (k *Kubernetes) External(state request.Request) ([]msg.Service, int) {
// It returns any services matching in the services' ExternalIPs and if enabled, headless endpoints..
func (k *Kubernetes) External(state request.Request, headless bool) ([]msg.Service, int) {
if state.QType() == dns.TypePTR {
ip := dnsutil.ExtractAddressFromReverse(state.Name())
if ip != "" {
@@ -33,10 +45,11 @@ func (k *Kubernetes) External(state request.Request) ([]msg.Service, int) {
if last < 0 {
return nil, dns.RcodeServerFailure
}
// We are dealing with a fairly normal domain name here, but we still need to have the service
// and the namespace:
// service.namespace.<base>
var port, protocol string
// We are dealing with a fairly normal domain name here, but we still need to have the service,
// namespace and if present, endpoint:
// service.namespace.<base> or
// endpoint.service.namespace.<base>
var port, protocol, endpoint string
namespace := segs[last]
if !k.namespaceExposed(namespace) {
return nil, dns.RcodeNameError
@@ -49,7 +62,10 @@ func (k *Kubernetes) External(state request.Request) ([]msg.Service, int) {
service := segs[last]
last--
if last == 1 {
if last == 0 {
endpoint = stripUnderscore(segs[last])
last--
} else if last == 1 {
protocol = stripUnderscore(segs[last])
port = stripUnderscore(segs[last-1])
last -= 2
@@ -60,8 +76,13 @@ func (k *Kubernetes) External(state request.Request) ([]msg.Service, int) {
return nil, dns.RcodeNameError
}
var (
endpointsList []*object.Endpoints
serviceList []*object.Service
)
idx := object.ServiceKey(service, namespace)
serviceList := k.APIConn.SvcIndex(idx)
serviceList = k.APIConn.SvcIndex(idx)
services := []msg.Service{}
zonePath := msg.Path(state.Zone, coredns)
@@ -75,16 +96,47 @@ func (k *Kubernetes) External(state request.Request) ([]msg.Service, int) {
continue
}
for _, ip := range svc.ExternalIPs {
for _, p := range svc.Ports {
if !(matchPortAndProtocol(port, p.Name, protocol, string(p.Protocol))) {
if headless && len(svc.ExternalIPs) == 0 && (svc.Headless() || endpoint != "") {
if endpointsList == nil {
endpointsList = k.APIConn.EpIndex(idx)
}
// Endpoint query or headless service
for _, ep := range endpointsList {
if object.EndpointsKey(svc.Name, svc.Namespace) != ep.Index {
continue
}
rcode = dns.RcodeSuccess
s := msg.Service{Host: ip, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join([]string{zonePath, svc.Namespace, svc.Name}, "/")
services = append(services, s)
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
if endpoint != "" && !match(endpoint, endpointHostname(addr, k.endpointNameMode)) {
continue
}
for _, p := range eps.Ports {
if !(matchPortAndProtocol(port, p.Name, protocol, p.Protocol)) {
continue
}
s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join([]string{zonePath, svc.Namespace, svc.Name, endpointHostname(addr, k.endpointNameMode)}, "/")
services = append(services, s)
}
}
}
}
continue
} else {
for _, ip := range svc.ExternalIPs {
for _, p := range svc.Ports {
if !(matchPortAndProtocol(port, p.Name, protocol, string(p.Protocol))) {
continue
}
rcode = dns.RcodeSuccess
s := msg.Service{Host: ip, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join([]string{zonePath, svc.Namespace, svc.Name}, "/")
services = append(services, s)
}
}
}
}
@@ -96,31 +148,65 @@ func (k *Kubernetes) External(state request.Request) ([]msg.Service, int) {
}
// ExternalAddress returns the external service address(es) for the CoreDNS service.
func (k *Kubernetes) ExternalAddress(state request.Request) []dns.RR {
func (k *Kubernetes) ExternalAddress(state request.Request, headless bool) []dns.RR {
// If CoreDNS is running inside the Kubernetes cluster: k.nsAddrs() will return the external IPs of the services
// targeting the CoreDNS Pod.
// If CoreDNS is running outside of the Kubernetes cluster: k.nsAddrs() will return the first non-loopback IP
// address seen on the local system it is running on. This could be the wrong answer if coredns is using the *bind*
// plugin to bind to a different IP address.
return k.nsAddrs(true, state.Zone)
return k.nsAddrs(true, headless, state.Zone)
}
// ExternalServices returns all services with external IPs
func (k *Kubernetes) ExternalServices(zone string) (services []msg.Service) {
// ExternalServices returns all services with external IPs and if enabled headless services
func (k *Kubernetes) ExternalServices(zone string, headless bool) (services []msg.Service, headlessServices map[string][]msg.Service) {
zonePath := msg.Path(zone, coredns)
headlessServices = make(map[string][]msg.Service)
for _, svc := range k.APIConn.ServiceList() {
for _, ip := range svc.ExternalIPs {
for _, p := range svc.Ports {
s := msg.Service{Host: ip, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join([]string{zonePath, svc.Namespace, svc.Name}, "/")
services = append(services, s)
s.Key = strings.Join(append([]string{zonePath, svc.Namespace, svc.Name}, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
s.TargetStrip = 2
services = append(services, s)
// Endpoints and headless services
if headless && len(svc.ExternalIPs) == 0 && svc.Headless() {
idx := object.ServiceKey(svc.Name, svc.Namespace)
endpointsList := k.APIConn.EpIndex(idx)
for _, ep := range endpointsList {
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
// we need to have some answers grouped together
// 1. for endpoint requests eg. endpoint-0.service.example.com - will always have one endpoint
// 2. for service requests eg. service.example.com - can have multiple endpoints
// 3. for port.protocol requests eg. _http._tcp.service.example.com - can have multiple endpoints
for _, p := range eps.Ports {
s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl}
baseSvc := strings.Join([]string{zonePath, svc.Namespace, svc.Name}, "/")
s.Key = strings.Join([]string{baseSvc, endpointHostname(addr, k.endpointNameMode)}, "/")
headlessServices[strings.Join([]string{baseSvc, Endpoint}, "/")] = append(headlessServices[strings.Join([]string{baseSvc, Endpoint}, "/")], s)
// As per spec unnamed ports do not have a srv record
// https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
if p.Name == "" {
continue
}
s.Host = msg.Domain(s.Key)
s.Key = strings.Join(append([]string{zonePath, svc.Namespace, svc.Name}, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
headlessServices[strings.Join([]string{s.Key, PortProtocol}, "/")] = append(headlessServices[strings.Join([]string{s.Key, PortProtocol}, "/")], s)
}
}
}
}
continue
} else {
for _, ip := range svc.ExternalIPs {
for _, p := range svc.Ports {
s := msg.Service{Host: ip, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join([]string{zonePath, svc.Namespace, svc.Name}, "/")
services = append(services, s)
s.Key = strings.Join(append([]string{zonePath, svc.Namespace, svc.Name}, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
s.TargetStrip = 2
services = append(services, s)
}
}
}
}
return services
return services, headlessServices
}
//ExternalSerial returns the serial of the external zone

View File

@@ -43,6 +43,22 @@ var extCases = []struct {
{
Qname: "svc0.svc-nons.example.com.", Rcode: dns.RcodeNameError,
},
{
Qname: "svc-headless.testns.example.com.", Rcode: dns.RcodeSuccess,
Msg: []msg.Service{
{Host: "1.2.3.4", Port: 80, TTL: 5, Weight: 50, Key: "/c/org/example/testns/svc-headless"},
{Host: "1.2.3.5", Port: 80, TTL: 5, Weight: 50, Key: "/c/org/example/testns/svc-headless"},
},
},
{
Qname: "endpoint-0.svc-headless.testns.example.com.", Rcode: dns.RcodeSuccess,
Msg: []msg.Service{
{Host: "1.2.3.4", Port: 80, TTL: 5, Weight: 100, Key: "/c/org/example/testns/svc-headless/endpoint-0"},
},
},
{
Qname: "endpoint-1.svc-nons.testns.example.com.", Rcode: dns.RcodeNameError,
},
}
func TestExternal(t *testing.T) {
@@ -54,7 +70,7 @@ func TestExternal(t *testing.T) {
for i, tc := range extCases {
state := testRequest(tc.Qname)
svc, rcode := k.External(state)
svc, rcode := k.External(state, true)
if x := tc.Rcode; x != rcode {
t.Errorf("Test %d, expected rcode %d, got %d", i, x, rcode)
@@ -75,15 +91,23 @@ func TestExternal(t *testing.T) {
type external struct{}
func (external) HasSynced() bool { return true }
func (external) Run() {}
func (external) Stop() error { return nil }
func (external) EpIndexReverse(string) []*object.Endpoints { return nil }
func (external) SvcIndexReverse(string) []*object.Service { return nil }
func (external) SvcExtIndexReverse(string) []*object.Service { return nil }
func (external) Modified(bool) int64 { return 0 }
func (external) EpIndex(s string) []*object.Endpoints { return nil }
func (external) EndpointsList() []*object.Endpoints { return nil }
func (external) HasSynced() bool { return true }
func (external) Run() {}
func (external) Stop() error { return nil }
func (external) EpIndexReverse(string) []*object.Endpoints { return nil }
func (external) SvcIndexReverse(string) []*object.Service { return nil }
func (external) SvcExtIndexReverse(string) []*object.Service { return nil }
func (external) Modified(bool) int64 { return 0 }
func (external) EpIndex(s string) []*object.Endpoints {
return epIndexExternal[s]
}
func (external) EndpointsList() []*object.Endpoints {
var eps []*object.Endpoints
for _, ep := range epIndexExternal {
eps = append(eps, ep...)
}
return eps
}
func (external) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return nil, nil }
func (external) SvcIndex(s string) []*object.Service { return svcIndexExternal[s] }
func (external) PodIndex(string) []*object.Pod { return nil }
@@ -94,6 +118,41 @@ func (external) GetNamespaceByName(name string) (*object.Namespace, error) {
}, nil
}
var epIndexExternal = map[string][]*object.Endpoints{
"svc-headless.testns": {
{
Name: "svc-headless",
Namespace: "testns",
Index: "svc-headless.testns",
Subsets: []object.EndpointSubset{
{
Ports: []object.EndpointPort{
{
Port: 80,
Name: "http",
Protocol: "TCP",
},
},
Addresses: []object.EndpointAddress{
{
IP: "1.2.3.4",
Hostname: "endpoint-svc-0",
NodeName: "test-node",
TargetRefName: "endpoint-svc-0",
},
{
IP: "1.2.3.5",
Hostname: "endpoint-svc-1",
NodeName: "test-node",
TargetRefName: "endpoint-svc-1",
},
},
},
},
},
},
}
var svcIndexExternal = map[string][]*object.Service{
"svc1.testns": {
{
@@ -115,6 +174,15 @@ var svcIndexExternal = map[string][]*object.Service{
Ports: []api.ServicePort{{Name: "http", Protocol: "tcp", Port: 80}},
},
},
"svc-headless.testns": {
{
Name: "svc-headless",
Namespace: "testns",
Type: api.ServiceTypeClusterIP,
ClusterIPs: []string{api.ClusterIPNone},
Ports: []api.ServicePort{{Name: "http", Protocol: "tcp", Port: 80}},
},
},
}
func (external) ServiceList() []*object.Service {

View File

@@ -112,7 +112,7 @@ func (k *Kubernetes) Services(ctx context.Context, state request.Request, exact
case dns.TypeNS:
// We can only get here if the qname equals the zone, see ServeDNS in handler.go.
nss := k.nsAddrs(false, state.Zone)
nss := k.nsAddrs(false, false, state.Zone)
var svcs []msg.Service
for _, ns := range nss {
if ns.Header().Rrtype == dns.TypeA {
@@ -127,7 +127,7 @@ func (k *Kubernetes) Services(ctx context.Context, state request.Request, exact
}
if isDefaultNS(state.Name(), state.Zone) {
nss := k.nsAddrs(false, state.Zone)
nss := k.nsAddrs(false, false, state.Zone)
var svcs []msg.Service
for _, ns := range nss {
if ns.Header().Rrtype == dns.TypeA && state.QType() == dns.TypeA {

View File

@@ -13,7 +13,7 @@ func isDefaultNS(name, zone string) bool {
// nsAddrs returns the A or AAAA records for the CoreDNS service in the cluster. If the service cannot be found,
// it returns a record for the local address of the machine we're running on.
func (k *Kubernetes) nsAddrs(external bool, zone string) []dns.RR {
func (k *Kubernetes) nsAddrs(external, headless bool, zone string) []dns.RR {
var (
svcNames []string
svcIPs []net.IP
@@ -31,10 +31,21 @@ func (k *Kubernetes) nsAddrs(external bool, zone string) []dns.RR {
for _, svc := range svcs {
if external {
svcName := strings.Join([]string{svc.Name, svc.Namespace, zone}, ".")
for _, exIP := range svc.ExternalIPs {
svcNames = append(svcNames, svcName)
svcIPs = append(svcIPs, net.ParseIP(exIP))
if headless && svc.Headless() {
for _, s := range endpoint.Subsets {
for _, a := range s.Addresses {
svcNames = append(svcNames, endpointHostname(a, k.endpointNameMode)+"."+svcName)
svcIPs = append(svcIPs, net.ParseIP(a.IP))
}
}
} else {
for _, exIP := range svc.ExternalIPs {
svcNames = append(svcNames, svcName)
svcIPs = append(svcIPs, net.ParseIP(exIP))
}
}
continue
}
svcName := strings.Join([]string{svc.Name, svc.Namespace, Svc, zone}, ".")

View File

@@ -103,7 +103,7 @@ func TestNsAddrs(t *testing.T) {
k.APIConn = &APIConnTest{}
k.localIPs = []net.IP{net.ParseIP("10.244.0.20")}
cdrs := k.nsAddrs(false, k.Zones[0])
cdrs := k.nsAddrs(false, false, k.Zones[0])
if len(cdrs) != 3 {
t.Fatalf("Expected 3 results, got %v", len(cdrs))
@@ -137,13 +137,36 @@ func TestNsAddrs(t *testing.T) {
}
}
func TestNsAddrsExternalHeadless(t *testing.T) {
k := New([]string{"example.com."})
k.APIConn = &APIConnTest{}
k.localIPs = []net.IP{net.ParseIP("10.244.0.20")}
// there are only headless sevices
cdrs := k.nsAddrs(true, true, k.Zones[0])
if len(cdrs) != 1 {
t.Fatalf("Expected 0 results, got %v", cdrs)
}
cdr := cdrs[0]
expected := "10.244.0.20"
if cdr.(*dns.A).A.String() != expected {
t.Errorf("Expected A address to be %q, got %q", expected, cdr.(*dns.A).A.String())
}
expected = "10-244-0-20.hdls-dns-service.kube-system.example.com."
if cdr.Header().Name != expected {
t.Errorf("Expected record name to be %q, got %q", expected, cdr.Header().Name)
}
}
func TestNsAddrsExternal(t *testing.T) {
k := New([]string{"example.com."})
k.APIConn = &APIConnTest{}
k.localIPs = []net.IP{net.ParseIP("10.244.0.20")}
// initially no services have an external IP ...
cdrs := k.nsAddrs(true, k.Zones[0])
cdrs := k.nsAddrs(true, false, k.Zones[0])
if len(cdrs) != 0 {
t.Fatalf("Expected 0 results, got %v", len(cdrs))
@@ -151,7 +174,7 @@ func TestNsAddrsExternal(t *testing.T) {
// Add an external IP to one of the services ...
svcs[0].ExternalIPs = []string{"1.2.3.4"}
cdrs = k.nsAddrs(true, k.Zones[0])
cdrs = k.nsAddrs(true, false, k.Zones[0])
if len(cdrs) != 1 {
t.Fatalf("Expected 1 results, got %v", len(cdrs))
@@ -166,3 +189,31 @@ func TestNsAddrsExternal(t *testing.T) {
t.Errorf("Expected record name to be %q, got %q", expected, cdr.Header().Name)
}
}
func TestNsAddrsExternalWithPreexistingExternalIP(t *testing.T) {
k := New([]string{"example.com."})
k.APIConn = &APIConnTest{}
k.localIPs = []net.IP{net.ParseIP("10.244.0.20")}
svcs[0].ExternalIPs = []string{"1.2.3.4"}
// initially no services have an external IP ...
cdrs := k.nsAddrs(true, false, k.Zones[0])
if len(cdrs) != 1 {
t.Fatalf("Expected 1 results, got %v", len(cdrs))
}
if len(cdrs) != 1 {
t.Fatalf("Expected 1 results, got %v", len(cdrs))
}
cdr := cdrs[0]
expected := "1.2.3.4"
if cdr.(*dns.A).A.String() != expected {
t.Errorf("Expected A address to be %q, got %q", expected, cdr.(*dns.A).A.String())
}
expected = "dns-service.kube-system.example.com."
if cdr.Header().Name != expected {
t.Errorf("Expected record name to be %q, got %q", expected, cdr.Header().Name)
}
}

View File

@@ -42,7 +42,7 @@ func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, erro
}
ch <- soa
nsAddrs := k.nsAddrs(false, zone)
nsAddrs := k.nsAddrs(false, false, zone)
nsHosts := make(map[string]struct{})
for _, nsAddr := range nsAddrs {
nsHost := nsAddr.Header().Name