Switch over to k8s notification API (#202)

* Merge notification code by @aledbf and update for recent changes.
* Fix travis environment to correctly build with k8s.io and forked repositories.
* Refactored kubernetes Corefile parser
* Added lots of Corefile parsing tests
This commit is contained in:
Michael Richmond
2016-08-05 18:19:51 -07:00
committed by GitHub
parent 604d2a3730
commit 6d90b745e0
13 changed files with 609 additions and 237 deletions

View File

@@ -25,15 +25,25 @@ before_install:
- env
before_script:
# Fix repo pathname for golang imports.
# When building in a forked repo the import path will be incorrect.
# Fix is to detect this case and create a symbolic link for the real import name.
# Note: This assumes that both the upstream "real" repo and the fork are hosted
# at the same domain. (eg. github.com)
- ( export UPSTREAM="miekg/coredns" && export REPONAME=`pwd | rev | cut -d "/" -f 1-2 | rev` && test "$REPO" != "$UPSTREAM" && mkdir -p ../../`echo $UPSTREAM | cut -d "/" -f 1` && ln -s ../$REPONAME ../../$UPSTREAM )
# Download etcd, unpack and launch
- curl -L https://github.com/coreos/etcd/releases/download/v2.3.1/etcd-v2.3.1-linux-amd64.tar.gz -o etcd-v2.3.1-linux-amd64.tar.gz
- tar xzvf etcd-v2.3.1-linux-amd64.tar.gz
- ./etcd-v2.3.1-linux-amd64/etcd &
- go get
- go get github.com/coreos/go-etcd/etcd
# If docker is available, pull the kubernetes hyperkube image down and launch kubernetes.
- if which docker &>/dev/null ; then docker pull gcr.io/google_containers/hyperkube-amd64:v1.2.4 ; docker ps -a ; fi
- pwd
- if which docker &>/dev/null ; then ./middleware/kubernetes/test/00_run_k8s.sh && ./middleware/kubernetes/test/10_setup_kubectl.sh && ./middleware/kubernetes/test/20_setup_k8s_services.sh ; docker ps -a ; fi
# Get golang dependencies, and build coredns binary
- go get -v -d
- go get github.com/coreos/go-etcd/etcd
- go build -v -ldflags="-s -w"
script:
- go test -tags etcd -race -bench=. ./...
# Run kubernetes integration tests only if kubectl is available. i.e. If kubernetes was launched
- ./middleware/kubernetes/test/kubectl version && go test -tags k8s -race -bench=. -run 'TestK8sIntegration' ./test

View File

@@ -4,13 +4,16 @@ BUILD_VERBOSE := -v
TEST_VERBOSE :=
TEST_VERBOSE := -v
DOCKER_IMAGE_NAME := $$USER/coredns
all:
go build $(BUILD_VERBOSE)
go build $(BUILD_VERBOSE) -ldflags="-s -w"
.PHONY: docker
docker:
GOOS=linux go build -a -tags netgo -installsuffix netgo
docker build -t $$USER/coredns .
docker: all
GOOS=linux go build -a -tags netgo -installsuffix netgo -ldflags="-s -w"
docker build -t $(DOCKER_IMAGE_NAME) .
.PHONY: deps
deps:
@@ -22,8 +25,15 @@ test:
.PHONY: testk8s
testk8s:
# With -args --v=100 the k8s API response data will be printed in the log:
#go test $(TEST_VERBOSE) -tags=k8s -run 'TestK8sIntegration' ./test -args --v=100
# Without the k8s API response data:
go test $(TEST_VERBOSE) -tags=k8s -run 'TestK8sIntegration' ./test
.PHONY: testk8s-setup
testk8s-setup:
go test -v ./core/setup -run TestKubernetes
.PHONY: clean
clean:
go clean

View File

@@ -1,32 +1,34 @@
package setup
import (
"errors"
"log"
"strings"
"time"
"github.com/miekg/coredns/middleware"
"github.com/miekg/coredns/middleware/kubernetes"
k8sc "github.com/miekg/coredns/middleware/kubernetes/k8sclient"
"github.com/miekg/coredns/middleware/kubernetes/nametemplate"
"github.com/miekg/coredns/middleware/proxy"
)
const (
defaultK8sEndpoint = "http://localhost:8080"
defaultNameTemplate = "{service}.{namespace}.{zone}"
defaultResyncPeriod = 5 * time.Minute
)
// Kubernetes sets up the kubernetes middleware.
func Kubernetes(c *Controller) (middleware.Middleware, error) {
log.Printf("[debug] controller %v\n", c)
// TODO: Determine if subzone support required
kubernetes, err := kubernetesParse(c)
if err != nil {
return nil, err
}
err = kubernetes.StartKubeCache()
if err != nil {
return nil, err
}
log.Printf("[debug] after parse and start KubeCache, APIconn is: %v", kubernetes.APIConn)
return func(next middleware.Handler) middleware.Handler {
kubernetes.Next = next
return kubernetes
@@ -34,78 +36,73 @@ func Kubernetes(c *Controller) (middleware.Middleware, error) {
}
func kubernetesParse(c *Controller) (kubernetes.Kubernetes, error) {
k8s := kubernetes.Kubernetes{
Proxy: proxy.New([]string{}),
}
var (
endpoints = []string{defaultK8sEndpoint}
template = defaultNameTemplate
namespaces = []string{}
)
var err error
template := defaultNameTemplate
k8s.APIConn = k8sc.NewK8sConnector(endpoints[0])
k8s := kubernetes.Kubernetes{
ResyncPeriod: defaultResyncPeriod,
}
k8s.NameTemplate = new(nametemplate.NameTemplate)
k8s.NameTemplate.SetTemplate(template)
// TODO: expose resync period in Corefile
for c.Next() {
if c.Val() == "kubernetes" {
zones := c.RemainingArgs()
log.Printf("[debug] Zones: %v", zones)
if len(zones) == 0 {
k8s.Zones = c.ServerBlockHosts
log.Printf("[debug] Zones(from ServerBlockHosts): %v", zones)
} else {
// Normalize requested zones
k8s.Zones = kubernetes.NormalizeZoneList(zones)
}
// TODO: clean this parsing up
middleware.Zones(k8s.Zones).FullyQualify()
if k8s.Zones == nil || len(k8s.Zones) < 1 {
err = errors.New("Zone name must be provided for kubernetes middleware.")
log.Printf("[debug] %v\n", err)
return kubernetes.Kubernetes{}, err
}
log.Printf("[debug] c data: %v\n", c)
if c.NextBlock() {
// TODO(miek): 2 switches?
switch c.Val() {
case "endpoint":
args := c.RemainingArgs()
if len(args) == 0 {
return kubernetes.Kubernetes{}, c.ArgErr()
}
endpoints = args
k8s.APIConn = k8sc.NewK8sConnector(endpoints[0])
case "namespaces":
args := c.RemainingArgs()
if len(args) == 0 {
return kubernetes.Kubernetes{}, c.ArgErr()
}
namespaces = args
k8s.Namespaces = append(k8s.Namespaces, namespaces...)
}
for c.Next() {
for c.NextBlock() {
switch c.Val() {
case "template":
args := c.RemainingArgs()
if len(args) == 0 {
return kubernetes.Kubernetes{}, c.ArgErr()
}
template = strings.Join(args, "")
err := k8s.NameTemplate.SetTemplate(template)
if len(args) != 0 {
template := strings.Join(args, "")
err = k8s.NameTemplate.SetTemplate(template)
if err != nil {
return kubernetes.Kubernetes{}, err
}
case "namespaces":
args := c.RemainingArgs()
if len(args) == 0 {
} else {
log.Printf("[debug] 'template' keyword provided without any template value.")
return kubernetes.Kubernetes{}, c.ArgErr()
}
namespaces = args
k8s.Namespaces = append(k8s.Namespaces, namespaces...)
case "namespaces":
args := c.RemainingArgs()
if len(args) != 0 {
k8s.Namespaces = append(k8s.Namespaces, args...)
} else {
log.Printf("[debug] 'namespaces' keyword provided without any namespace values.")
return kubernetes.Kubernetes{}, c.ArgErr()
}
case "endpoint":
args := c.RemainingArgs()
if len(args) != 0 {
k8s.APIEndpoint = args[0]
} else {
log.Printf("[debug] 'endpoint' keyword provided without any endpoint url value.")
return kubernetes.Kubernetes{}, c.ArgErr()
}
}
}
return k8s, nil
}
}
return kubernetes.Kubernetes{}, nil
err = errors.New("Kubernetes setup called without keyword 'kubernetes' in Corefile")
log.Printf("[ERROR] %v\n", err)
return kubernetes.Kubernetes{}, err
}

View File

@@ -5,37 +5,19 @@ import (
"testing"
)
/*
kubernetes coredns.local {
# Use url for k8s API endpoint
endpoint http://localhost:8080
# Assemble k8s record names with the template
template {service}.{namespace}.{zone}
# Only expose the k8s namespace "demo"
#namespaces demo
}
*/
func TestKubernetesParse(t *testing.T) {
tests := []struct {
description string
input string
shouldErr bool
expectedErrContent string // substring from the expected error. Empty for positive cases.
expectedZoneCount int // expected count of defined zones. '-1' for negative cases.
expectedZoneCount int // expected count of defined zones.
expectedNTValid bool // NameTemplate to be initialized and valid
expectedNSCount int // expected count of namespaces. '-1' for negative cases.
expectedNSCount int // expected count of namespaces.
}{
// positive
// TODO: not specifiying a zone maybe should error out.
{
`kubernetes`,
false,
"",
0,
true,
0,
},
{
"kubernetes keyword with one zone",
`kubernetes coredns.local`,
false,
"",
@@ -44,6 +26,7 @@ func TestKubernetesParse(t *testing.T) {
0,
},
{
"kubernetes keyword with multiple zones",
`kubernetes coredns.local test.local`,
false,
"",
@@ -52,6 +35,17 @@ func TestKubernetesParse(t *testing.T) {
0,
},
{
"kubernetes keyword with zone and empty braces",
`kubernetes coredns.local {
}`,
false,
"",
1,
true,
0,
},
{
"endpoint keyword with url",
`kubernetes coredns.local {
endpoint http://localhost:9090
}`,
@@ -62,6 +56,7 @@ func TestKubernetesParse(t *testing.T) {
0,
},
{
"template keyword with valid template",
`kubernetes coredns.local {
template {service}.{namespace}.{zone}
}`,
@@ -72,6 +67,7 @@ func TestKubernetesParse(t *testing.T) {
0,
},
{
"namespaces keyword with one namespace",
`kubernetes coredns.local {
namespaces demo
}`,
@@ -82,6 +78,7 @@ func TestKubernetesParse(t *testing.T) {
1,
},
{
"namespaces keyword with multiple namespaces",
`kubernetes coredns.local {
namespaces demo test
}`,
@@ -91,9 +88,40 @@ func TestKubernetesParse(t *testing.T) {
true,
2,
},
{
"fully specified valid config",
`kubernetes coredns.local test.local {
endpoint http://localhost:8080
template {service}.{namespace}.{zone}
namespaces demo test
}`,
false,
"",
2,
true,
2,
},
// negative
{
"no kubernetes keyword",
"",
true,
"Kubernetes setup called without keyword 'kubernetes' in Corefile",
-1,
false,
-1,
},
{
"kubernetes keyword without a zone",
`kubernetes`,
true,
"Zone name must be provided for kubernetes middleware",
-1,
true,
0,
},
{
"endpoint keyword without an endpoint value",
`kubernetes coredns.local {
endpoint
}`,
@@ -103,56 +131,56 @@ func TestKubernetesParse(t *testing.T) {
true,
-1,
},
// No template provided for template line.
{
"template keyword without a template value",
`kubernetes coredns.local {
template
}`,
true,
"",
"Wrong argument count or unexpected line ending after 'template'",
-1,
false,
-1,
0,
},
// Invalid template provided
{
"template keyword with an invalid template value",
`kubernetes coredns.local {
template {namespace}.{zone}
}`,
true,
"",
"Record name template does not pass NameTemplate validation",
-1,
false,
-1,
0,
},
/*
// No valid provided for namespaces
{
"namespace keyword without a namespace value",
`kubernetes coredns.local {
namespaces
}`,
true,
"",
"Parse error: Wrong argument count or unexpected line ending after 'namespaces'",
-1,
true,
-1,
},
*/
}
t.Logf("Parser test cases count: %v", len(tests))
for i, test := range tests {
c := NewTestController(test.input)
k8sController, err := kubernetesParse(c)
t.Logf("i: %v\n", i)
t.Logf("controller: %v\n", k8sController)
t.Logf("setup test: %2v -- %v\n", i, test.description)
//t.Logf("controller: %v\n", k8sController)
if test.shouldErr && err == nil {
t.Errorf("Test %d: Expected error, but found one for input '%s'. Error was: '%v'", i, test.input, err)
t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err)
}
if err != nil {
if !test.shouldErr {
t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err)
continue
}
if test.shouldErr && (len(test.expectedErrContent) < 1) {
@@ -160,14 +188,13 @@ func TestKubernetesParse(t *testing.T) {
}
if test.shouldErr && (test.expectedZoneCount >= 0) {
t.Fatalf("Test %d: Test marked as expecting an error, but provides value for expectedZoneCount!=-1 for input '%s'. Error was: '%v'", i, test.input, err)
t.Errorf("Test %d: Test marked as expecting an error, but provides value for expectedZoneCount!=-1 for input '%s'. Error was: '%v'", i, test.input, err)
}
if !strings.Contains(err.Error(), test.expectedErrContent) {
t.Errorf("Test %d: Expected error to contain: %v, found error: %v, input: %s", i, test.expectedErrContent, err, test.input)
}
return
continue
}
// No error was raised, so validate initialization of k8sController
@@ -191,7 +218,8 @@ func TestKubernetesParse(t *testing.T) {
foundNSCount := len(k8sController.Namespaces)
if foundNSCount != test.expectedNSCount {
t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d namespaces. Instead found %d namespaces: '%v' for input '%s'", i, test.expectedNSCount, foundNSCount, k8sController.Namespaces, test.input)
}
t.Logf("k8sController is: %v", k8sController)
t.Logf("k8sController.Namespaces is: %v", k8sController.Namespaces)
}
}
}

70
kubernetes-rc.yaml Normal file
View File

@@ -0,0 +1,70 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: coredns-configmap
namespace: kube-system
data:
corefile: |
.:53 {
kubernetes coredns.local {
}
#cache 160 coredns.local
errors stdout
log stdout
}
---
apiVersion: v1
kind: ReplicationController
metadata:
labels:
k8s-app: kube-dns
kubernetes.io/cluster-service: "true"
version: v20
name: kube-dns-v20
namespace: kube-system
spec:
replicas: 1
selector:
k8s-app: kube-dns
version: v20
template:
metadata:
labels:
k8s-app: kube-dns
kubernetes.io/cluster-service: "true"
version: v20
spec:
containers:
- args:
- -conf=/cfg/corefile
image: aledbf/kube-coredns:0.6
imagePullPolicy: IfNotPresent
name: kube-dns
ports:
- containerPort: 53
name: dns
protocol: UDP
- containerPort: 53
name: dns-tcp
protocol: TCP
volumeMounts:
- name: config-volume
mountPath: /cfg
- args:
- -cmd=nslookup kubernetes.default.svc.cluster.local localhost >/dev/null
- -port=8080
image: gcr.io/google_containers/exechealthz:1.0
imagePullPolicy: IfNotPresent
name: healthz
ports:
- containerPort: 8080
protocol: TCP
resources:
limits:
cpu: 10m
memory: 20Mi
dnsPolicy: Default
volumes:
- name: config-volume
configMap:
name: coredns-configmap

View File

@@ -276,11 +276,13 @@ TBD:
* Update kubernetes middleware documentation to describe running CoreDNS as a
SkyDNS replacement. (Include descriptions of different ways to pass CoreFile
to coredns command.)
* Remove dependency on healthz for health checking in
`kubernetes-rc.yaml` file.
* Expose load-balancer IP addresses.
* Calculate SRV priority based on number of instances running.
(See SkyDNS README.md)
* Functional work
* (done) ~~Implement wildcard-based lookup. Minimally support `*`, consider `?` as well.~~
* (done. '?' not supported yet) ~~Implement wildcard-based lookup. Minimally support `*`, consider `?` as well.~~
* (done) ~~Note from Miek on PR 181: "SkyDNS also supports the word `any`.~~
* Implement SkyDNS-style synthetic zones such as "svc" to group k8s objects. (This
should be optional behavior.) Also look at "pod" synthetic zones.
@@ -303,17 +305,14 @@ TBD:
* Performance
* Improve lookup to reduce size of query result obtained from k8s API.
(namespace-based?, other ideas?)
* Caching of k8s API dataset.
* Caching/notification of k8s API dataset. (See aledbf fork for
implementation ideas.)
* DNS response caching is good, but we should also cache at the http query
level as well. (Take a look at https://github.com/patrickmn/go-cache as
a potential expiring cache implementation for the http API queries.)
* Push notifications from k8s for data changes rather than pull via API?
* Additional features:
* Implement namespace filtering to different zones. That is, zone "a.b"
publishes services from namespace "foo", and zone "x.y" publishes services
from namespaces "bar" and "baz". (Basic version implemented -- need test cases.)
* Reverse IN-ADDR entries for services. (Is there any value in supporting
reverse lookup records?
reverse lookup records?)
* How to support label specification in Corefile to allow use of labels to
indicate zone? (Is this even useful?) For example, the following
configuration exposes all services labeled for the "staging" environment
@@ -334,11 +333,14 @@ TBD:
flattening to lower case and mapping of non-DNS characters to DNS characters
in a standard way.)
* Expose arbitrary kubernetes repository data as TXT records?
* Support custom user-provided templates for k8s names. A string provided
* (done) ~~Support custom user-provided templates for k8s names. A string provided
in the middleware configuration like `{service}.{namespace}.{type}` defines
the template of how to construct record names for the zone. This example
would produce `myservice.mynamespace.svc.cluster.local`. (Basic template
implemented. Need to slice zone out of current template implementation.)
implemented. Need to slice zone out of current template implementation.)~~
* (done) ~~Implement namespace filtering to different zones. That is, zone "a.b"
publishes services from namespace "foo", and zone "x.y" publishes services
from namespaces "bar" and "baz". (Basic version implemented -- need test cases.)~~
* DNS Correctness
* Do we need to generate synthetic zone records for namespaces?
* Do we need to generate synthetic zone records for the skydns synthetic zones?
@@ -347,10 +349,10 @@ TBD:
using the `cache` directive. Tested working using 20s cache timeout
and A-record queries. Automate testing with cache in place.
* Automate CoreDNS performance tests. Initially for zone files, and for
pre-loaded k8s API cache.
pre-loaded k8s API cache. With and without CoreDNS response caching.
* Try to get rid of kubernetes launch scripts by moving operations into
.travis.yml file.
* ~~Implement test cases for http data parsing using dependency injection
for http get operations.~~
* ~~Automate integration testing with kubernetes. (k8s launch and service start-up
automation is in middleware/kubernetes/tests)~~
* ~~Automate integration testing with kubernetes. (k8s launch and service
start-up automation is in middleware/kubernetes/tests)~~

View File

@@ -0,0 +1,195 @@
package kubernetes
import (
"fmt"
"log"
"sync"
"time"
"github.com/miekg/coredns/middleware/kubernetes/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
var (
namespace = api.NamespaceAll
)
type dnsController struct {
client *client.Client
endpController *framework.Controller
svcController *framework.Controller
nsController *framework.Controller
svcLister cache.StoreToServiceLister
endpLister cache.StoreToEndpointsLister
nsLister util.StoreToNamespaceLister
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
stopLock sync.Mutex
shutdown bool
stopCh chan struct{}
}
// newDNSController creates a controller for coredns
func newdnsController(kubeClient *client.Client, resyncPeriod time.Duration) *dnsController {
dns := dnsController{
client: kubeClient,
stopCh: make(chan struct{}),
}
dns.endpLister.Store, dns.endpController = framework.NewInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(dns.client, namespace),
WatchFunc: endpointsWatchFunc(dns.client, namespace),
},
&api.Endpoints{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
dns.svcLister.Store, dns.svcController = framework.NewInformer(
&cache.ListWatch{
ListFunc: serviceListFunc(dns.client, namespace),
WatchFunc: serviceWatchFunc(dns.client, namespace),
},
&api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
dns.nsLister.Store, dns.nsController = framework.NewInformer(
&cache.ListWatch{
ListFunc: namespaceListFunc(dns.client),
WatchFunc: namespaceWatchFunc(dns.client),
},
&api.Namespace{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
return &dns
}
func serviceListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Services(ns).List(opts)
}
}
func serviceWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.Services(ns).Watch(options)
}
}
func endpointsListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Endpoints(ns).List(opts)
}
}
func endpointsWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.Endpoints(ns).Watch(options)
}
}
func namespaceListFunc(c *client.Client) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Namespaces().List(opts)
}
}
func namespaceWatchFunc(c *client.Client) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.Namespaces().Watch(options)
}
}
func (dns *dnsController) controllersInSync() bool {
return dns.svcController.HasSynced() && dns.endpController.HasSynced()
}
// Stop stops the controller.
func (dns *dnsController) Stop() error {
dns.stopLock.Lock()
defer dns.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if !dns.shutdown {
close(dns.stopCh)
log.Println("shutting down controller queues")
dns.shutdown = true
return nil
}
return fmt.Errorf("shutdown already in progress")
}
// Run starts the controller.
func (dns *dnsController) Run() {
log.Println("[debug] starting coredns controller")
go dns.endpController.Run(dns.stopCh)
go dns.svcController.Run(dns.stopCh)
go dns.nsController.Run(dns.stopCh)
<-dns.stopCh
log.Println("[debug] shutting down coredns controller")
}
func (dns *dnsController) GetNamespaceList() *api.NamespaceList {
nsList, err := dns.nsLister.List()
if err != nil {
return &api.NamespaceList{}
}
return &nsList
}
func (dns *dnsController) GetServiceList() *api.ServiceList {
log.Printf("[debug] here in GetServiceList")
svcList, err := dns.svcLister.List()
if err != nil {
return &api.ServiceList{}
}
return &svcList
}
// GetServicesByNamespace returns a map of
// namespacename :: [ kubernetesService ]
func (dns *dnsController) GetServicesByNamespace() map[string][]api.Service {
k8sServiceList := dns.GetServiceList()
if k8sServiceList == nil {
return nil
}
items := make(map[string][]api.Service, len(k8sServiceList.Items))
for _, i := range k8sServiceList.Items {
namespace := i.Namespace
items[namespace] = append(items[namespace], i)
}
return items
}
// GetServiceInNamespace returns the Service that matches
// servicename in the namespace
func (dns *dnsController) GetServiceInNamespace(namespace string, servicename string) *api.Service {
svcKey := fmt.Sprintf("%v/%v", namespace, servicename)
svcObj, svcExists, err := dns.svcLister.Store.GetByKey(svcKey)
if err != nil {
log.Printf("error getting service %v from the cache: %v\n", svcKey, err)
return nil
}
if !svcExists {
log.Printf("service %v does not exists\n", svcKey)
return nil
}
return svcObj.(*api.Service)
}

View File

@@ -3,6 +3,7 @@ package kubernetes
import (
"fmt"
"log"
"strings"
"github.com/miekg/coredns/middleware"
@@ -18,6 +19,26 @@ func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.M
return dns.RcodeServerFailure, fmt.Errorf("can only deal with ClassINET")
}
m := new(dns.Msg)
m.SetReply(r)
m.Authoritative, m.RecursionAvailable, m.Compress = true, true, true
// TODO: find an alternative to this block
if strings.HasSuffix(state.Name(), arpaSuffix) {
ip, _ := extractIP(state.Name())
records := k.getServiceRecordForIP(ip, state.Name())
if len(records) > 0 {
srvPTR := &records[0]
m.Answer = append(m.Answer, srvPTR.NewPTR(state.QName(), ip))
m = dedup(m)
state.SizeAndDo(m)
m, _ = state.Scrub(m)
w.WriteMsg(m)
return dns.RcodeSuccess, nil
}
}
// Check that query matches one of the zones served by this middleware,
// otherwise delegate to the next in the pipeline.
zone := middleware.Zones(k.Zones).Matches(state.Name())
@@ -28,10 +49,6 @@ func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.M
return k.Next.ServeDNS(ctx, w, r)
}
m := new(dns.Msg)
m.SetReply(r)
m.Authoritative, m.RecursionAvailable, m.Compress = true, true, true
var (
records, extra []dns.RR
err error

View File

@@ -4,33 +4,70 @@ package kubernetes
import (
"errors"
"log"
"strings"
"time"
"github.com/miekg/coredns/middleware"
k8sc "github.com/miekg/coredns/middleware/kubernetes/k8sclient"
"github.com/miekg/coredns/middleware/kubernetes/msg"
"github.com/miekg/coredns/middleware/kubernetes/nametemplate"
"github.com/miekg/coredns/middleware/kubernetes/util"
"github.com/miekg/coredns/middleware/proxy"
"github.com/miekg/dns"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
)
const (
defaultResyncPeriod = 5 * time.Minute
)
type Kubernetes struct {
Next middleware.Handler
Zones []string
Proxy proxy.Proxy // Proxy for looking up names during the resolution process
APIConn *k8sc.K8sConnector
APIEndpoint string
APIConn *dnsController
ResyncPeriod time.Duration
NameTemplate *nametemplate.NameTemplate
Namespaces []string
}
func (g *Kubernetes) StartKubeCache() error {
// For a custom api server or running outside a k8s cluster
// set URL in env.KUBERNETES_MASTER
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
overrides := &clientcmd.ConfigOverrides{}
if len(g.APIEndpoint) > 0 {
overrides.ClusterInfo = clientcmdapi.Cluster{Server: g.APIEndpoint}
}
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
config, err := clientConfig.ClientConfig()
if err != nil {
log.Printf("[debug] error connecting to the client: %v", err)
return err
}
kubeClient, err := unversioned.New(config)
if err != nil {
log.Printf("[ERROR] Failed to create kubernetes notification controller: %v", err)
return err
}
g.APIConn = newdnsController(kubeClient, g.ResyncPeriod)
go g.APIConn.Run()
return err
}
// getZoneForName returns the zone string that matches the name and a
// list of the DNS labels from name that are within the zone.
// For example, if "coredns.local" is a zone configured for the
// Kubernetes middleware, then getZoneForName("a.b.coredns.local")
// will return ("coredns.local", ["a", "b"]).
func (g Kubernetes) getZoneForName(name string) (string, []string) {
func (g *Kubernetes) getZoneForName(name string) (string, []string) {
var zone string
var serviceSegments []string
@@ -51,7 +88,14 @@ func (g Kubernetes) getZoneForName(name string) (string, []string) {
// If exact is true, it will lookup just
// this name. This is used when find matches when completing SRV lookups
// for instance.
func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
func (g *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
// TODO: refector this.
// Right now GetNamespaceFromSegmentArray do not supports PRE queries
if strings.HasSuffix(name, arpaSuffix) {
ip, _ := extractIP(name)
records := g.getServiceRecordForIP(ip, name)
return records, nil
}
var (
serviceName string
namespace string
@@ -99,6 +143,7 @@ func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
return nil, nil
}
log.Printf("before g.Get(namespace, nsWildcard, serviceName, serviceWildcard): %v %v %v %v", namespace, nsWildcard, serviceName, serviceWildcard)
k8sItems, err := g.Get(namespace, nsWildcard, serviceName, serviceWildcard)
log.Printf("[debug] k8s items: %v\n", k8sItems)
if err != nil {
@@ -115,7 +160,7 @@ func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
}
// TODO: assemble name from parts found in k8s data based on name template rather than reusing query string
func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, values nametemplate.NameValues) []msg.Service {
func (g *Kubernetes) getRecordsForServiceItems(serviceItems []api.Service, values nametemplate.NameValues) []msg.Service {
var records []msg.Service
for _, item := range serviceItems {
@@ -131,7 +176,7 @@ func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, v
// Create records for each exposed port...
for _, p := range item.Spec.Ports {
log.Printf("[debug] port: %v\n", p.Port)
s := msg.Service{Host: clusterIP, Port: p.Port}
s := msg.Service{Host: clusterIP, Port: int(p.Port)}
records = append(records, s)
}
}
@@ -141,22 +186,24 @@ func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, v
}
// Get performs the call to the Kubernetes http API.
func (g Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]k8sc.ServiceItem, error) {
serviceList, err := g.APIConn.GetServiceList()
func (g *Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]api.Service, error) {
serviceList := g.APIConn.GetServiceList()
/* TODO: Remove?
if err != nil {
log.Printf("[ERROR] Getting service list produced error: %v", err)
return nil, err
}
*/
var resultItems []k8sc.ServiceItem
var resultItems []api.Service
for _, item := range serviceList.Items {
if symbolMatches(namespace, item.Metadata.Namespace, nsWildcard) && symbolMatches(servicename, item.Metadata.Name, serviceWildcard) {
if symbolMatches(namespace, item.Namespace, nsWildcard) && symbolMatches(servicename, item.Name, serviceWildcard) {
// If namespace has a wildcard, filter results against Corefile namespace list.
// (Namespaces without a wildcard were filtered before the call to this function.)
if nsWildcard && (len(g.Namespaces) > 0) && (!util.StringInSlice(item.Metadata.Namespace, g.Namespaces)) {
log.Printf("[debug] Namespace '%v' is not published by Corefile\n", item.Metadata.Namespace)
if nsWildcard && (len(g.Namespaces) > 0) && (!util.StringInSlice(item.Namespace, g.Namespaces)) {
log.Printf("[debug] Namespace '%v' is not published by Corefile\n", item.Namespace)
continue
}
resultItems = append(resultItems, item)
@@ -179,104 +226,26 @@ func symbolMatches(queryString string, candidateString string, wildcard bool) bo
return result
}
// TODO: Remove these unused functions. One is related to Ttl calculation
// Implement Ttl and priority calculation based on service count before
// removing this code.
/*
// splitDNSName separates the name into DNS segments and reverses the segments.
func (g Kubernetes) splitDNSName(name string) []string {
l := dns.SplitDomainName(name)
for i, j := 0, len(l)-1; i < j; i, j = i+1, j-1 {
l[i], l[j] = l[j], l[i]
}
return l
}
*/
// skydns/local/skydns/east/staging/web
// skydns/local/skydns/west/production/web
//
// skydns/local/skydns/*/*/web
// skydns/local/skydns/*/web
/*
// loopNodes recursively loops through the nodes and returns all the values. The nodes' keyname
// will be match against any wildcards when star is true.
func (g Kubernetes) loopNodes(ns []*etcdc.Node, nameParts []string, star bool, bx map[msg.Service]bool) (sx []msg.Service, err error) {
if bx == nil {
bx = make(map[msg.Service]bool)
}
Nodes:
for _, n := range ns {
if n.Dir {
nodes, err := g.loopNodes(n.Nodes, nameParts, star, bx)
if err != nil {
return nil, err
}
sx = append(sx, nodes...)
continue
}
if star {
keyParts := strings.Split(n.Key, "/")
for i, n := range nameParts {
if i > len(keyParts)-1 {
// name is longer than key
continue Nodes
}
if n == "*" || n == "any" {
continue
}
if keyParts[i] != n {
continue Nodes
}
}
}
serv := new(msg.Service)
if err := json.Unmarshal([]byte(n.Value), serv); err != nil {
return nil, err
}
b := msg.Service{Host: serv.Host, Port: serv.Port, Priority: serv.Priority, Weight: serv.Weight, Text: serv.Text, Key: n.Key}
if _, ok := bx[b]; ok {
continue
}
bx[b] = true
serv.Key = n.Key
serv.Ttl = g.Ttl(n, serv)
if serv.Priority == 0 {
serv.Priority = priority
}
sx = append(sx, *serv)
}
return sx, nil
}
// Ttl returns the smaller of the kubernetes TTL and the service's
// TTL. If neither of these are set (have a zero value), a default is used.
func (g Kubernetes) Ttl(node *etcdc.Node, serv *msg.Service) uint32 {
kubernetesTtl := uint32(node.TTL)
if kubernetesTtl == 0 && serv.Ttl == 0 {
return ttl
}
if kubernetesTtl == 0 {
return serv.Ttl
}
if serv.Ttl == 0 {
return kubernetesTtl
}
if kubernetesTtl < serv.Ttl {
return kubernetesTtl
}
return serv.Ttl
}
*/
// kubernetesNameError checks if the error is ErrorCodeKeyNotFound from kubernetes.
func isKubernetesNameError(err error) bool {
return false
}
func (g *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service {
svcList, err := g.APIConn.svcLister.List()
if err != nil {
return nil
}
for _, service := range svcList.Items {
if service.Spec.ClusterIP == ip {
return []msg.Service{msg.Service{Host: ip}}
}
}
return nil
}
const (
priority = 10 // default priority when nothing is set
ttl = 300 // default ttl when nothing is set

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"math"
"net"
"strings"
"time"
"github.com/miekg/coredns/middleware"
@@ -12,6 +13,11 @@ import (
"github.com/miekg/dns"
)
const (
// arpaSuffix is the standard suffix for PTR IP reverse lookups.
arpaSuffix = ".in-addr.arpa."
)
func (k Kubernetes) records(state middleware.State, exact bool) ([]msg.Service, error) {
services, err := k.Records(state.Name(), exact)
if err != nil {
@@ -64,13 +70,13 @@ func (k Kubernetes) A(zone string, state middleware.State, previousRecords []dns
// We should already have found it
continue
}
m1, e1 := k.Proxy.Lookup(state, target, state.QType())
if e1 != nil {
mes, err := k.Proxy.Lookup(state, target, state.QType())
if err != nil {
continue
}
// Len(m1.Answer) > 0 here is well?
// Len(mes.Answer) > 0 here is well?
records = append(records, newRecord)
records = append(records, m1.Answer...)
records = append(records, mes.Answer...)
continue
case ip.To4() != nil:
records = append(records, serv.NewA(state.QName(), ip.To4()))
@@ -285,7 +291,33 @@ func (k Kubernetes) SOA(zone string, state middleware.State) *dns.SOA {
}
}
// TODO(miek): DNSKEY and friends... intercepted by the DNSSEC middleware?
func (k Kubernetes) PTR(zone string, state middleware.State) ([]dns.RR, error) {
reverseIP, ok := extractIP(state.Name())
if !ok {
return nil, fmt.Errorf("does not support reverse lookup for %s", state.QName())
}
records := make([]dns.RR, 1)
services, err := k.records(state, false)
if err != nil {
return nil, err
}
for _, serv := range services {
ip := net.ParseIP(serv.Host)
if reverseIP != serv.Host {
continue
}
switch {
case ip.To4() != nil:
records = append(records, serv.NewPTR(state.QName(), ip.To4().String()))
break
case ip.To4() == nil:
// nodata?
}
}
return records, nil
}
func isDuplicateCNAME(r *dns.CNAME, records []dns.RR) bool {
for _, rec := range records {
@@ -300,6 +332,27 @@ func isDuplicateCNAME(r *dns.CNAME, records []dns.RR) bool {
func copyState(state middleware.State, target string, typ uint16) middleware.State {
state1 := middleware.State{W: state.W, Req: state.Req.Copy()}
state1.Req.Question[0] = dns.Question{dns.Fqdn(target), dns.ClassINET, typ}
state1.Req.Question[0] = dns.Question{Name: dns.Fqdn(target), Qtype: dns.ClassINET, Qclass: typ}
return state1
}
// extractIP turns a standard PTR reverse record lookup name
// into an IP address
func extractIP(reverseName string) (string, bool) {
if !strings.HasSuffix(reverseName, arpaSuffix) {
return "", false
}
search := strings.TrimSuffix(reverseName, arpaSuffix)
// reverse the segments and then combine them
segments := reverseArray(strings.Split(search, "."))
return strings.Join(segments, "."), true
}
func reverseArray(arr []string) []string {
for i := 0; i < len(arr)/2; i++ {
j := len(arr) - i - 1
arr[i], arr[j] = arr[j], arr[i]
}
return arr
}

View File

@@ -77,6 +77,11 @@ func (s *Service) NewNS(name string) *dns.NS {
return &dns.NS{Hdr: dns.RR_Header{Name: name, Rrtype: dns.TypeNS, Class: dns.ClassINET, Ttl: s.Ttl}, Ns: host}
}
// NewPTR returns a new PTR record based on the Service.
func (s *Service) NewPTR(name string, target string) *dns.PTR {
return &dns.PTR{Hdr: dns.RR_Header{Name: name, Rrtype: dns.TypePTR, Class: dns.ClassINET, Ttl: s.Ttl}, Ptr: dns.Fqdn(target)}
}
// Group checks the services in sx, it looks for a Group attribute on the shortest
// keys. If there are multiple shortest keys *and* the group attribute disagrees (and
// is not empty), we don't consider it a group.

View File

@@ -69,7 +69,7 @@ run_and_expose_service() {
wait_until_k8s_ready
NAMESPACES="demo test"
NAMESPACES="demo poddemo test"
create_namespaces
echo ""

View File

@@ -3,6 +3,9 @@ package util
import (
"strings"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
)
// StringInSlice check whether string a is a member of slice.
@@ -24,3 +27,16 @@ const (
WildcardStar = "*"
WildcardAny = "any"
)
// StoreToNamespaceLister makes a Store that lists Namespaces.
type StoreToNamespaceLister struct {
cache.Store
}
// List lists all Namespaces in the store.
func (s *StoreToNamespaceLister) List() (ns api.NamespaceList, err error) {
for _, m := range s.Store.List() {
ns.Items = append(ns.Items, *(m.(*api.Namespace)))
}
return ns, nil
}