Handle K8s middleware NS record (#662)

* commit for testing in cluster

* commit for testing in cluster

* refactor and add ns.dns record

* Release 007

* reduce heap allocations

* gofmt

* revert accidental Makefile commits

* restore prior rcode for disabled pod mode

* revert Makefile deltas

* add unit tests

* more unit tests

* make isRequestInReverseRange easier to test

* more unit tests

* addressing review feedback

* commit setup.go
This commit is contained in:
Chris O'Haver
2017-05-22 16:05:48 -04:00
committed by John Belamaric
parent 024f56682d
commit 7f950e496a
9 changed files with 537 additions and 97 deletions

View File

@@ -28,22 +28,23 @@ import (
// Kubernetes implements a middleware that connects to a Kubernetes cluster.
type Kubernetes struct {
Next middleware.Handler
Zones []string
primaryZone int
Proxy proxy.Proxy // Proxy for looking up names during the resolution process
APIEndpoint string
APICertAuth string
APIClientCert string
APIClientKey string
APIConn *dnsController
ResyncPeriod time.Duration
Namespaces []string
LabelSelector *unversionedapi.LabelSelector
Selector *labels.Selector
PodMode string
ReverseCidrs []net.IPNet
Fallthrough bool
Next middleware.Handler
Zones []string
primaryZone int
Proxy proxy.Proxy // Proxy for looking up names during the resolution process
APIEndpoint string
APICertAuth string
APIClientCert string
APIClientKey string
APIConn dnsController
ResyncPeriod time.Duration
Namespaces []string
LabelSelector *unversionedapi.LabelSelector
Selector *labels.Selector
PodMode string
ReverseCidrs []net.IPNet
Fallthrough bool
interfaceAddrs InterfaceAddrser
}
const (
@@ -83,36 +84,49 @@ type recordRequest struct {
var errNoItems = errors.New("no items found")
var errNsNotExposed = errors.New("namespace is not exposed")
var errInvalidRequest = errors.New("invalid query name")
var errZoneNotFound = errors.New("zone not found")
var errApiBadPodType = errors.New("expected type *api.Pod")
var errPodsDisabled = errors.New("pod records disabled")
// Services implements the ServiceBackend interface.
func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware.Options) ([]msg.Service, []msg.Service, error) {
func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware.Options) (svcs []msg.Service, debug []msg.Service, err error) {
r, e := k.parseRequest(state.Name(), state.Type())
r, e := k.parseRequest(state.Name(), state.QType())
if e != nil {
return nil, nil, e
}
switch state.Type() {
case "A", "SRV":
if state.Type() == "A" && isDefaultNS(state.Name(), r) {
// If this is an A request for "ns.dns", respond with a "fake" record for coredns.
// SOA records always use this hardcoded name
svcs = append(svcs, k.defaultNSMsg(r))
return svcs, nil, nil
}
s, e := k.Records(r)
return s, nil, e // Haven't implemented debug queries yet.
case "TXT":
s, e := k.recordsForTXT(r)
return s, nil, e
err := k.recordsForTXT(r, &svcs)
return svcs, nil, err
case "NS":
err = k.recordsForNS(r, &svcs)
return svcs, nil, err
}
return nil, nil, nil
}
func (k *Kubernetes) recordsForTXT(r recordRequest) ([]msg.Service, error) {
func (k *Kubernetes) recordsForTXT(r recordRequest, svcs *[]msg.Service) (err error) {
switch r.typeName {
case "dns-version":
s := msg.Service{
Text: DNSSchemaVersion,
TTL: 28800,
Key: msg.Path(r.typeName+"."+r.zone, "coredns")}
return []msg.Service{s}, nil
Key: msg.Path(strings.Join([]string{r.typeName, r.zone}, "."), "coredns")}
*svcs = append(*svcs, s)
return nil
}
return nil, nil
return nil
}
// PrimaryZone will return the first non-reverse zone being handled by this middleware
@@ -122,6 +136,7 @@ func (k *Kubernetes) PrimaryZone() string {
// Reverse implements the ServiceBackend interface.
func (k *Kubernetes) Reverse(state request.Request, exact bool, opt middleware.Options) ([]msg.Service, []msg.Service, error) {
ip := dnsutil.ExtractAddressFromReverse(state.Name())
if ip == "" {
return nil, nil, nil
@@ -131,8 +146,8 @@ func (k *Kubernetes) Reverse(state request.Request, exact bool, opt middleware.O
return records, nil, nil
}
func (k *Kubernetes) isRequestInReverseRange(state request.Request) bool {
ip := dnsutil.ExtractAddressFromReverse(state.Name())
func (k *Kubernetes) isRequestInReverseRange(name string) bool {
ip := dnsutil.ExtractAddressFromReverse(name)
for _, c := range k.ReverseCidrs {
if c.Contains(net.ParseIP(ip)) {
return true
@@ -186,7 +201,8 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
}
// InitKubeCache initializes a new Kubernetes cache.
func (k *Kubernetes) InitKubeCache() error {
func (k *Kubernetes) InitKubeCache() (err error) {
config, err := k.getClientConfig()
if err != nil {
@@ -216,12 +232,11 @@ func (k *Kubernetes) InitKubeCache() error {
return err
}
func (k *Kubernetes) parseRequest(lowerCasedName, qtype string) (r recordRequest, err error) {
func (k *Kubernetes) parseRequest(lowerCasedName string, qtype uint16) (r recordRequest, err error) {
// 3 Possible cases
// SRV Request: _port._protocol.service.namespace.type.zone
// A Request (endpoint): endpoint.service.namespace.type.zone
// A Request (service): service.namespace.type.zone
// separate zone from rest of lowerCasedName
var segs []string
for _, z := range k.Zones {
@@ -234,11 +249,19 @@ func (k *Kubernetes) parseRequest(lowerCasedName, qtype string) (r recordRequest
}
}
if r.zone == "" {
return r, errors.New("zone not found")
return r, errZoneNotFound
}
if qtype == dns.TypeNS {
return r, nil
}
if qtype == dns.TypeA && isDefaultNS(lowerCasedName, r) {
return r, nil
}
offset := 0
if qtype == "SRV" {
if qtype == dns.TypeSRV {
if len(segs) != 5 {
return r, errInvalidRequest
}
@@ -268,7 +291,7 @@ func (k *Kubernetes) parseRequest(lowerCasedName, qtype string) (r recordRequest
}
offset = 2
}
if qtype == "A" && len(segs) == 4 {
if qtype == dns.TypeA && len(segs) == 4 {
// This is an endpoint A record request. Get first element as endpoint.
r.endpoint = segs[0]
offset = 1
@@ -282,7 +305,7 @@ func (k *Kubernetes) parseRequest(lowerCasedName, qtype string) (r recordRequest
return r, nil
}
if len(segs) == 1 && qtype == "TXT" {
if len(segs) == 1 && qtype == dns.TypeTXT {
r.typeName = segs[0]
return r, nil
}
@@ -328,37 +351,35 @@ func endpointHostname(addr api.EndpointAddress) string {
return ""
}
func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, zone string) []msg.Service {
var records []msg.Service
func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, zone string) (records []msg.Service) {
zonePath := msg.Path(zone, "coredns")
for _, svc := range services {
key := svc.name + "." + svc.namespace + ".svc." + zone
if svc.addr == api.ClusterIPNone {
// This is a headless service, create records for each endpoint
for _, ep := range svc.endpoints {
ephostname := endpointHostname(ep.addr)
s := msg.Service{
Key: msg.Path(strings.ToLower(ephostname+"."+key), "coredns"),
Host: ep.addr.IP, Port: int(ep.port.Port),
Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name, endpointHostname(ep.addr)}, "/"),
Host: ep.addr.IP,
Port: int(ep.port.Port),
}
records = append(records, s)
}
} else {
// Create records for each exposed port...
for _, p := range svc.ports {
s := msg.Service{Key: msg.Path(strings.ToLower(key), "coredns"), Host: svc.addr, Port: int(p.Port)}
s := msg.Service{
Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/"),
Host: svc.addr,
Port: int(p.Port)}
records = append(records, s)
}
}
}
for _, p := range pods {
key := p.name + "." + p.namespace + ".pod." + zone
s := msg.Service{
Key: msg.Path(strings.ToLower(key), "coredns"),
Key: strings.Join([]string{zonePath, "pod", p.namespace, p.name}, "/"),
Host: p.addr,
}
records = append(records, s)
@@ -376,7 +397,7 @@ func ipFromPodName(podname string) string {
func (k *Kubernetes) findPods(namespace, podname string) (pods []pod, err error) {
if k.PodMode == PodModeDisabled {
return pods, errors.New("pod records disabled")
return pods, errPodsDisabled
}
var ip string
@@ -393,16 +414,13 @@ func (k *Kubernetes) findPods(namespace, podname string) (pods []pod, err error)
}
// PodModeVerified
objList, err := k.APIConn.podLister.Indexer.ByIndex(podIPIndex, ip)
if err != nil {
return nil, err
}
objList := k.APIConn.PodIndex(ip)
nsWildcard := symbolContainsWildcard(namespace)
for _, o := range objList {
p, ok := o.(*api.Pod)
if !ok {
return nil, errors.New("expected type *api.Pod")
return nil, errApiBadPodType
}
// If namespace has a wildcard, filter results against Corefile namespace list.
if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(p.Namespace, k.Namespaces)) {
@@ -461,10 +479,8 @@ func (k *Kubernetes) findServices(r recordRequest) ([]service, error) {
continue
}
// Headless service
endpointsList, err := k.APIConn.epLister.List()
if err != nil {
continue
}
endpointsList := k.APIConn.EndpointsList()
for _, ep := range endpointsList.Items {
if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace {
continue
@@ -500,24 +516,19 @@ func symbolMatches(queryString, candidateString string, wildcard bool) bool {
// If a service cluster ip does not match, it checks all endpoints
func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service {
// First check services with cluster ips
svcList, err := k.APIConn.svcLister.List(labels.Everything())
if err != nil {
return nil
}
svcList := k.APIConn.ServiceList()
for _, service := range svcList {
if (len(k.Namespaces) > 0) && !dnsstrings.StringInSlice(service.Namespace, k.Namespaces) {
continue
}
if service.Spec.ClusterIP == ip {
domain := service.Name + "." + service.Namespace + ".svc." + k.PrimaryZone()
domain := strings.Join([]string{service.Name, service.Namespace, "svc", k.PrimaryZone()}, ".")
return []msg.Service{{Host: domain}}
}
}
// If no cluster ips match, search endpoints
epList, err := k.APIConn.epLister.List()
if err != nil {
return nil
}
epList := k.APIConn.EndpointsList()
for _, ep := range epList.Items {
if (len(k.Namespaces) > 0) && !dnsstrings.StringInSlice(ep.ObjectMeta.Namespace, k.Namespaces) {
continue
@@ -525,7 +536,7 @@ func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service {
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
if addr.IP == ip {
domain := endpointHostname(addr) + "." + ep.ObjectMeta.Name + "." + ep.ObjectMeta.Namespace + ".svc." + k.PrimaryZone()
domain := strings.Join([]string{endpointHostname(addr), ep.ObjectMeta.Name, ep.ObjectMeta.Namespace, "svc", k.PrimaryZone()}, ".")
return []msg.Service{{Host: domain}}
}
}