Files
coredns/plugin/dnstap/dnstapio/io.go
Miek Gieben b3b8a7e4b7 plugin/dnstap: various cleanups (#4179)
* plugin/dnstap: various cleanups

A recent issue made me look into this plugin, I suspect various other
cleanups (hopefully deletion of code) can be made as well

Remove identical functions ToClientQuery etc, and just use tap.Message
as the base type in plugin. Keep msg/ for a few helper functions that
may proof useful.

This remove the whole test directory as we will just check the things we
are interested in which gives much better feedback and keeps that code
closer together.

tapwr dir is also not needed, writer_test.go was just duplicating the
tests already done. This moves writer.go to the top directory.

Make the only user of dnstap, the forward plugin, use the newer code
also remove the test, a better test there would be a full e2e test to
see the correct thing happens.

Cleanup the Tapper interface and move it to dnstapio where it belongs,
remove higher level interfaces that are not used. This remove
dnstap.Tapper and dnstap.IORoutines.

Use the standard mechanism for getting access to a plugin and remove
shuffling the plugin into the context.

Signed-off-by: Miek Gieben <miek@miek.nl>

* use opts to get the correct proto

Signed-off-by: Miek Gieben <miek@miek.nl>

* Various fixes

Signed-off-by: Miek Gieben <miek@miek.nl>

* Remove bad addr test, as dnstap is only called from within coredns where these fields have been preparsed

Signed-off-by: Miek Gieben <miek@miek.nl>

* dnstap: remove saving the error

all these fields have been preparsed, no need for dnstap to be pedantic
and check (and save!) this error again.

Simplifies it a bit more.

Signed-off-by: Miek Gieben <miek@miek.nl>

* Update plugin/forward/dnstap.go

Co-authored-by: Ruslan Drozhdzh <30860269+rdrozhdzh@users.noreply.github.com>

* Code review

Signed-off-by: Miek Gieben <miek@miek.nl>

* add back in preferUDP

Signed-off-by: Miek Gieben <miek@miek.nl>

* nit

Signed-off-by: Miek Gieben <miek@miek.nl>

Co-authored-by: Ruslan Drozhdzh <30860269+rdrozhdzh@users.noreply.github.com>
2020-10-12 19:10:35 +02:00

144 lines
2.9 KiB
Go

package dnstapio
import (
"net"
"sync/atomic"
"time"
clog "github.com/coredns/coredns/plugin/pkg/log"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
)
var log = clog.NewWithPlugin("dnstap")
const (
tcpWriteBufSize = 1024 * 1024
tcpTimeout = 4 * time.Second
flushTimeout = 1 * time.Second
queueSize = 10000
)
// Tapper interface is used in testing to mock the Dnstap method.
type Tapper interface {
Dnstap(tap.Dnstap)
}
// dio implements the Tapper interface.
type dio struct {
endpoint string
socket bool
conn net.Conn
enc *dnstapEncoder
queue chan tap.Dnstap
dropped uint32
quit chan struct{}
}
// New returns a new and initialized pointer to a dio.
func New(endpoint string, socket bool) *dio {
return &dio{
endpoint: endpoint,
socket: socket,
enc: newDnstapEncoder(&fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true,
}),
queue: make(chan tap.Dnstap, queueSize),
quit: make(chan struct{}),
}
}
func (d *dio) newConnect() error {
var err error
if d.socket {
if d.conn, err = net.Dial("unix", d.endpoint); err != nil {
return err
}
} else {
if d.conn, err = net.DialTimeout("tcp", d.endpoint, tcpTimeout); err != nil {
return err
}
if tcpConn, ok := d.conn.(*net.TCPConn); ok {
tcpConn.SetWriteBuffer(tcpWriteBufSize)
tcpConn.SetNoDelay(false)
}
}
return d.enc.resetWriter(d.conn)
}
// Connect connects to the dnstap endpoint.
func (d *dio) Connect() {
if err := d.newConnect(); err != nil {
log.Error("No connection to dnstap endpoint")
}
go d.serve()
}
// Dnstap enqueues the payload for log.
func (d *dio) Dnstap(payload tap.Dnstap) {
select {
case d.queue <- payload:
default:
atomic.AddUint32(&d.dropped, 1)
}
}
func (d *dio) closeConnection() {
d.enc.close()
if d.conn != nil {
d.conn.Close()
d.conn = nil
}
}
// Close waits until the I/O routine is finished to return.
func (d *dio) Close() { close(d.quit) }
func (d *dio) flushBuffer() {
if d.conn == nil {
if err := d.newConnect(); err != nil {
return
}
log.Info("Reconnected to dnstap")
}
if err := d.enc.flushBuffer(); err != nil {
log.Warningf("Connection lost: %s", err)
d.closeConnection()
if err := d.newConnect(); err != nil {
log.Errorf("Cannot connect to dnstap: %s", err)
} else {
log.Info("Reconnected to dnstap")
}
}
}
func (d *dio) write(payload *tap.Dnstap) {
if err := d.enc.writeMsg(payload); err != nil {
atomic.AddUint32(&d.dropped, 1)
}
}
func (d *dio) serve() {
timeout := time.After(flushTimeout)
for {
select {
case <-d.quit:
d.flushBuffer()
d.closeConnection()
return
case payload := <-d.queue:
d.write(&payload)
case <-timeout:
if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 {
log.Warningf("Dropped dnstap messages: %d", dropped)
}
d.flushBuffer()
timeout = time.After(flushTimeout)
}
}
}