Implement notifies for transfer plugin (#3972)

* Fix notifies in transfer plugin

Signed-off-by: Miek Gieben <miek@miek.nl>

* Make it compile

Signed-off-by: Miek Gieben <miek@miek.nl>

* Port more plugins

Signed-off-by: Miek Gieben <miek@miek.nl>

* golint

Signed-off-by: Miek Gieben <miek@miek.nl>

* Fix tests

Signed-off-by: Miek Gieben <miek@miek.nl>

* Fix notifies in transfer plugin

Signed-off-by: Miek Gieben <miek@miek.nl>

* Make it compile

Signed-off-by: Miek Gieben <miek@miek.nl>

* Port more plugins

Signed-off-by: Miek Gieben <miek@miek.nl>

* golint

Signed-off-by: Miek Gieben <miek@miek.nl>

* Fix tests

Signed-off-by: Miek Gieben <miek@miek.nl>

* Fix tests

Signed-off-by: Miek Gieben <miek@miek.nl>

* really fix test

Signed-off-by: Miek Gieben <miek@miek.nl>

* Implement ixfr fallback and unify file and auto for transfering

Signed-off-by: Miek Gieben <miek@miek.nl>

* Add transfer tests

copied and modified from #3452

Signed-off-by: Miek Gieben <miek@miek.nl>

* Test correct selection of plugin

Signed-off-by: Miek Gieben <miek@miek.nl>

* add upstream back in

Signed-off-by: Miek Gieben <miek@miek.nl>

* Implement ixfr fallback and unify file and auto for transfering

Signed-off-by: Miek Gieben <miek@miek.nl>

* fix test

Signed-off-by: Miek Gieben <miek@miek.nl>

* properly merge

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-07-07 21:38:07 +02:00
committed by GitHub
parent 435d27b58d
commit 68f1dd5ddf
42 changed files with 705 additions and 986 deletions

View File

@@ -4,207 +4,151 @@ import (
"context"
"math"
"net"
"sort"
"strings"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/etcd/msg"
"github.com/coredns/coredns/plugin/transfer"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
)
const transferLength = 2000
// Serial implements the Transferer interface.
func (k *Kubernetes) Serial(state request.Request) uint32 { return uint32(k.APIConn.Modified()) }
// MinTTL implements the Transferer interface.
func (k *Kubernetes) MinTTL(state request.Request) uint32 { return k.ttl }
// Transfer implements the Transferer interface.
func (k *Kubernetes) Transfer(ctx context.Context, state request.Request) (int, error) {
if !k.transferAllowed(state) {
return dns.RcodeRefused, nil
}
// Get all services.
rrs := make(chan dns.RR)
go k.transfer(rrs, state.Zone)
records := []dns.RR{}
for r := range rrs {
records = append(records, r)
}
if len(records) == 0 {
return dns.RcodeServerFailure, nil
}
ch := make(chan *dns.Envelope)
tr := new(dns.Transfer)
soa, err := plugin.SOA(ctx, k, state.Zone, state, plugin.Options{})
// Transfer implements the transfer.Transfer interface.
func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, error) {
// state is not used here, hence the empty request.Request{]
soa, err := plugin.SOA(context.TODO(), k, zone, request.Request{}, plugin.Options{})
if err != nil {
return dns.RcodeServerFailure, nil
return nil, transfer.ErrNotAuthoritative
}
records = append(soa, records...)
records = append(records, soa...)
go func(ch chan *dns.Envelope) {
j, l := 0, 0
log.Infof("Outgoing transfer of %d records of zone %s to %s started", len(records), state.Zone, state.IP())
for i, r := range records {
l += dns.Len(r)
if l > transferLength {
ch <- &dns.Envelope{RR: records[j:i]}
l = 0
j = i
}
}
if j < len(records) {
ch <- &dns.Envelope{RR: records[j:]}
}
close(ch)
}(ch)
tr.Out(state.W, state.Req, ch)
// Defer closing to the client
state.W.Hijack()
return dns.RcodeSuccess, nil
}
// transferAllowed checks if incoming request for transferring the zone is allowed according to the ACLs.
// Note: This is copied from zone.transferAllowed, but should eventually be factored into a common transfer pkg.
func (k *Kubernetes) transferAllowed(state request.Request) bool {
for _, t := range k.TransferTo {
if t == "*" {
return true
}
// If remote IP matches we accept.
remote := state.IP()
to, _, err := net.SplitHostPort(t)
if err != nil {
continue
}
if to == remote {
return true
}
}
return false
}
func (k *Kubernetes) transfer(c chan dns.RR, zone string) {
defer close(c)
ch := make(chan []dns.RR)
zonePath := msg.Path(zone, "coredns")
serviceList := k.APIConn.ServiceList()
for _, svc := range serviceList {
if !k.namespaceExposed(svc.Namespace) {
continue
go func() {
// ixfr fallback
if serial != 0 && soa[0].(*dns.SOA).Serial == serial {
ch <- soa
close(ch)
return
}
svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name}
switch svc.Type {
case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer:
clusterIP := net.ParseIP(svc.ClusterIP)
if clusterIP != nil {
s := msg.Service{Host: svc.ClusterIP, TTL: k.ttl}
s.Key = strings.Join(svcBase, "/")
ch <- soa
// Change host from IP to Name for SRV records
host := emitAddressRecord(c, s)
sort.Slice(serviceList, func(i, j int) bool {
return serviceList[i].Name < serviceList[j].Name
})
for _, p := range svc.Ports {
s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join(svcBase, "/")
// Need to generate this to handle use cases for peer-finder
// ref: https://github.com/coredns/coredns/pull/823
c <- s.NewSRV(msg.Domain(s.Key), 100)
// As per spec unnamed ports do not have a srv record
// https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
if p.Name == "" {
continue
}
s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
c <- s.NewSRV(msg.Domain(s.Key), 100)
}
// Skip endpoint discovery if clusterIP is defined
for _, svc := range serviceList {
if !k.namespaceExposed(svc.Namespace) {
continue
}
svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name}
switch svc.Type {
endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace)
case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer:
clusterIP := net.ParseIP(svc.ClusterIP)
if clusterIP != nil {
s := msg.Service{Host: svc.ClusterIP, TTL: k.ttl}
s.Key = strings.Join(svcBase, "/")
for _, ep := range endpointsList {
if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
// Change host from IP to Name for SRV records
host := emitAddressRecord(ch, s)
for _, p := range svc.Ports {
s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join(svcBase, "/")
// Need to generate this to handle use cases for peer-finder
// ref: https://github.com/coredns/coredns/pull/823
ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)}
// As per spec unnamed ports do not have a srv record
// https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
if p.Name == "" {
continue
}
s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)}
}
// Skip endpoint discovery if clusterIP is defined
continue
}
for _, eps := range ep.Subsets {
srvWeight := calcSRVWeight(len(eps.Addresses))
for _, addr := range eps.Addresses {
s := msg.Service{Host: addr.IP, TTL: k.ttl}
s.Key = strings.Join(svcBase, "/")
// We don't need to change the msg.Service host from IP to Name yet
// so disregard the return value here
emitAddressRecord(c, s)
endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace)
s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/")
// Change host from IP to Name for SRV records
host := emitAddressRecord(c, s)
s.Host = host
for _, ep := range endpointsList {
if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
continue
}
for _, p := range eps.Ports {
// As per spec unnamed ports do not have a srv record
// https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
if p.Name == "" {
continue
for _, eps := range ep.Subsets {
srvWeight := calcSRVWeight(len(eps.Addresses))
for _, addr := range eps.Addresses {
s := msg.Service{Host: addr.IP, TTL: k.ttl}
s.Key = strings.Join(svcBase, "/")
// We don't need to change the msg.Service host from IP to Name yet
// so disregard the return value here
emitAddressRecord(ch, s)
s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/")
// Change host from IP to Name for SRV records
host := emitAddressRecord(ch, s)
s.Host = host
for _, p := range eps.Ports {
// As per spec unnamed ports do not have a srv record
// https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
if p.Name == "" {
continue
}
s.Port = int(p.Port)
s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), srvWeight)}
}
s.Port = int(p.Port)
s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
c <- s.NewSRV(msg.Domain(s.Key), srvWeight)
}
}
}
}
case api.ServiceTypeExternalName:
case api.ServiceTypeExternalName:
s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl}
if t, _ := s.HostType(); t == dns.TypeCNAME {
c <- s.NewCNAME(msg.Domain(s.Key), s.Host)
s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl}
if t, _ := s.HostType(); t == dns.TypeCNAME {
ch <- []dns.RR{s.NewCNAME(msg.Domain(s.Key), s.Host)}
}
}
}
}
ch <- soa
close(ch)
}()
return ch, nil
}
// emitAddressRecord generates a new A or AAAA record based on the msg.Service and writes it to
// a channel.
// emitAddressRecord generates a new A or AAAA record based on the msg.Service and writes it to a channel.
// emitAddressRecord returns the host name from the generated record.
func emitAddressRecord(c chan dns.RR, message msg.Service) string {
ip := net.ParseIP(message.Host)
var host string
dnsType, _ := message.HostType()
func emitAddressRecord(c chan<- []dns.RR, s msg.Service) string {
ip := net.ParseIP(s.Host)
dnsType, _ := s.HostType()
switch dnsType {
case dns.TypeA:
arec := message.NewA(msg.Domain(message.Key), ip)
host = arec.Hdr.Name
c <- arec
r := s.NewA(msg.Domain(s.Key), ip)
c <- []dns.RR{r}
return r.Hdr.Name
case dns.TypeAAAA:
arec := message.NewAAAA(msg.Domain(message.Key), ip)
host = arec.Hdr.Name
c <- arec
r := s.NewAAAA(msg.Domain(s.Key), ip)
c <- []dns.RR{r}
return r.Hdr.Name
}
return host
return ""
}
// calcSrvWeight borrows the logic implemented in plugin.SRV for dynamically