mw/kubernetes: remove kService and kPod

Remove the intermediate step of gathering everyhing in a kPod and
kService and extracting the msg.Service from there. Now findPods and
findServices return []msg.Service.

This cuts down on the code and also removed the double looping of
finding the data we need, so it should be faster.
This commit is contained in:
Miek Gieben
2017-08-22 07:53:11 +01:00
parent 0191ec74d8
commit aacb91ef0b
6 changed files with 95 additions and 148 deletions

View File

@@ -75,27 +75,6 @@ const (
DNSSchemaVersion = "1.0.1"
)
type endpoint struct {
addr api.EndpointAddress
port api.EndpointPort
}
// kService is a service as retrieved via the k8s API.
type kService struct {
name string
namespace string
addr string
ports []api.ServicePort
endpoints []endpoint
}
// kPod is a pod as retrieved via the k8s API.
type kPod struct {
name string
namespace string
addr string
}
var (
errNoItems = errors.New("no items found")
errNsNotExposed = errors.New("namespace is not exposed")
@@ -301,7 +280,7 @@ func (k *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
// Entries looks up services in kubernetes.
func (k *Kubernetes) Entries(state request.Request) ([]msg.Service, error) {
r, e := k.parseRequest(state)
r, e := parseRequest(state)
if e != nil {
return nil, e
}
@@ -310,16 +289,13 @@ func (k *Kubernetes) Entries(state request.Request) ([]msg.Service, error) {
return nil, errNsNotExposed
}
services, pods, err := k.get(r)
if err != nil {
return nil, err
}
if len(services) == 0 && len(pods) == 0 {
return nil, errNoItems
if r.podOrSvc == Pod {
pods, err := k.findPods(r, state.Zone)
return pods, err
}
records := k.getRecordsForK8sItems(services, pods, state.Zone)
return records, nil
services, err := k.findServices(r, state.Zone)
return services, err
}
func endpointHostname(addr api.EndpointAddress) string {
@@ -335,51 +311,18 @@ func endpointHostname(addr api.EndpointAddress) string {
return ""
}
func (k *Kubernetes) getRecordsForK8sItems(services []kService, pods []kPod, zone string) (records []msg.Service) {
zonePath := msg.Path(zone, "coredns")
for _, svc := range services {
if svc.addr == api.ClusterIPNone || len(svc.endpoints) > 0 {
// This is a headless service or endpoints are present, create records for each endpoint
for _, ep := range svc.endpoints {
s := msg.Service{Host: ep.addr.IP, Port: int(ep.port.Port)}
s.Key = strings.Join([]string{zonePath, Svc, svc.namespace, svc.name, endpointHostname(ep.addr)}, "/")
records = append(records, s)
}
continue
}
// Create records for each exposed port...
for _, p := range svc.ports {
s := msg.Service{Host: svc.addr, Port: int(p.Port)}
s.Key = strings.Join([]string{zonePath, Svc, svc.namespace, svc.name}, "/")
records = append(records, s)
}
// If the addr is not an IP (i.e. an external service), add the record ...
s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.namespace, svc.name}, "/"), Host: svc.addr}
if t, _ := s.HostType(); t == dns.TypeCNAME {
s.Key = strings.Join([]string{zonePath, Svc, svc.namespace, svc.name}, "/")
records = append(records, s)
}
}
for _, p := range pods {
s := msg.Service{Key: strings.Join([]string{zonePath, Pod, p.namespace, p.name}, "/"), Host: p.addr}
records = append(records, s)
}
return records
}
func (k *Kubernetes) findPods(namespace, podname string) (pods []kPod, err error) {
// findPods returns the pods matching r from the cache.
func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, err error) {
if k.PodMode == PodModeDisabled {
return pods, errPodsDisabled
return nil, errPodsDisabled
}
var ip string
namespace := r.namespace
podname := r.service
zonePath := msg.Path(zone, "coredns")
ip := ""
err = errNoItems
if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") {
ip = strings.Replace(podname, "-", ".", -1)
} else {
@@ -387,9 +330,7 @@ func (k *Kubernetes) findPods(namespace, podname string) (pods []kPod, err error
}
if k.PodMode == PodModeInsecure {
s := kPod{name: podname, namespace: namespace, addr: ip}
pods = append(pods, s)
return pods, nil
return []msg.Service{{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip}}, nil
}
// PodModeVerified
@@ -406,29 +347,20 @@ func (k *Kubernetes) findPods(namespace, podname string) (pods []kPod, err error
}
// check for matching ip and namespace
if ip == p.Status.PodIP && match(namespace, p.Namespace) {
s := kPod{name: podname, namespace: namespace, addr: ip}
s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip}
pods = append(pods, s)
return pods, nil
err = nil
}
}
return pods, nil
return pods, err
}
// get retrieves matching data from the cache.
func (k *Kubernetes) get(r recordRequest) (services []kService, pods []kPod, err error) {
switch r.podOrSvc {
case Pod:
pods, err = k.findPods(r.namespace, r.service)
return nil, pods, err
default:
services, err = k.findServices(r)
return services, nil, err
}
}
func (k *Kubernetes) findServices(r recordRequest) ([]kService, error) {
// findServices returns the services matching r from the cache.
func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) {
serviceList := k.APIConn.ServiceList()
var resultItems []kService
zonePath := msg.Path(zone, "coredns")
err = errNoItems // Set to errNoItems to signal really nothing found, gets reset when name is matched.
for _, svc := range serviceList {
if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) {
@@ -440,17 +372,17 @@ func (k *Kubernetes) findServices(r recordRequest) ([]kService, error) {
if wildcard(r.namespace) && !k.namespaceExposed(svc.Namespace) {
continue
}
s := kService{name: svc.Name, namespace: svc.Namespace}
// Endpoint query or headless service
if svc.Spec.ClusterIP == api.ClusterIPNone || r.endpoint != "" {
s.addr = svc.Spec.ClusterIP
endpointsList := k.APIConn.EndpointsList()
for _, ep := range endpointsList.Items {
if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace {
continue
}
err = nil
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
@@ -466,36 +398,45 @@ func (k *Kubernetes) findServices(r recordRequest) ([]kService, error) {
if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {
continue
}
s.endpoints = append(s.endpoints, endpoint{addr: addr, port: p})
s := msg.Service{Host: addr.IP, Port: int(p.Port)}
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, endpointHostname(addr)}, "/")
services = append(services, s)
}
}
}
}
if len(s.endpoints) > 0 {
resultItems = append(resultItems, s)
}
continue
}
// External service
if svc.Spec.ExternalName != "" {
s.addr = svc.Spec.ExternalName
resultItems = append(resultItems, s)
continue
s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.Spec.ExternalName}
if t, _ := s.HostType(); t == dns.TypeCNAME {
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
services = append(services, s)
err = nil
continue
}
}
// ClusterIP service
s.addr = svc.Spec.ClusterIP
for _, p := range svc.Spec.Ports {
if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {
continue
}
s.ports = append(s.ports, p)
}
resultItems = append(resultItems, s)
err = nil
s := msg.Service{Host: svc.Spec.ClusterIP, Port: int(p.Port)}
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
services = append(services, s)
}
}
return resultItems, nil
return services, err
}
// match checks if a and b are equal taking wildcards into account.
@@ -514,39 +455,6 @@ func wildcard(s string) bool {
return s == "*" || s == "any"
}
// serviceRecordForIP gets a service record with a cluster ip matching the ip argument
// If a service cluster ip does not match, it checks all endpoints
func (k *Kubernetes) serviceRecordForIP(ip, name string) []msg.Service {
// First check services with cluster ips
svcList := k.APIConn.ServiceList()
for _, service := range svcList {
if (len(k.Namespaces) > 0) && !k.namespaceExposed(service.Namespace) {
continue
}
if service.Spec.ClusterIP == ip {
domain := strings.Join([]string{service.Name, service.Namespace, Svc, k.primaryZone()}, ".")
return []msg.Service{{Host: domain}}
}
}
// If no cluster ips match, search endpoints
epList := k.APIConn.EndpointsList()
for _, ep := range epList.Items {
if (len(k.Namespaces) > 0) && !k.namespaceExposed(ep.ObjectMeta.Namespace) {
continue
}
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
if addr.IP == ip {
domain := strings.Join([]string{endpointHostname(addr), ep.ObjectMeta.Name, ep.ObjectMeta.Namespace, Svc, k.primaryZone()}, ".")
return []msg.Service{{Host: domain}}
}
}
}
}
return nil
}
// namespaceExposed returns true when the namespace is exposed.
func (k *Kubernetes) namespaceExposed(namespace string) bool {
_, ok := k.Namespaces[namespace]