Dnstap plugin refactoring (#1257)

This commit is contained in:
Uladzimir Trehubenka
2017-11-28 00:36:14 +03:00
committed by Miek Gieben
parent 06006fac56
commit 6d6e1357b9
7 changed files with 223 additions and 416 deletions

View File

@@ -2,67 +2,100 @@ package dnstapio
import (
"log"
"net"
"time"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
"github.com/golang/protobuf/proto"
)
// DnstapIO wraps the dnstap I/O routine.
type DnstapIO struct {
protocol Protocol
queue chan tap.Dnstap
stop chan bool
const (
tcpTimeout = 4 * time.Second
flushTimeout = 1 * time.Second
queueSize = 1000
)
type dnstapIO struct {
enc *fs.Encoder
conn net.Conn
queue chan tap.Dnstap
}
// Protocol is either `out.TCP` or `out.Socket`.
type Protocol interface {
// Write takes a single frame at once.
Write([]byte) (int, error)
Close() error
// New returns a new and initialized DnstapIO.
func New() DnstapIO {
return &dnstapIO{queue: make(chan tap.Dnstap, queueSize)}
}
// New dnstap I/O routine from Protocol.
func New(w Protocol) *DnstapIO {
dio := DnstapIO{}
dio.protocol = w
dio.queue = make(chan tap.Dnstap, 10)
dio.stop = make(chan bool)
// DnstapIO interface
type DnstapIO interface {
Connect(endpoint string, socket bool) error
Dnstap(payload tap.Dnstap)
Close()
}
// Connect connects to the dnstop endpoint.
func (dio *dnstapIO) Connect(endpoint string, socket bool) error {
var err error
if socket {
dio.conn, err = net.Dial("unix", endpoint)
} else {
dio.conn, err = net.DialTimeout("tcp", endpoint, tcpTimeout)
}
if err != nil {
return err
}
dio.enc, err = fs.NewEncoder(dio.conn, &fs.EncoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true,
})
if err != nil {
return err
}
go dio.serve()
return &dio
return nil
}
// Dnstap enqueues the payload for log.
func (dio *DnstapIO) Dnstap(payload tap.Dnstap) {
func (dio *dnstapIO) Dnstap(payload tap.Dnstap) {
select {
case dio.queue <- payload:
default:
log.Println("[WARN] Dnstap payload dropped.")
}
}
func (dio *DnstapIO) serve() {
for {
select {
case payload := <-dio.queue:
frame, err := proto.Marshal(&payload)
if err == nil {
dio.protocol.Write(frame)
} else {
log.Printf("[ERROR] Invalid dnstap payload dropped: %s\n", err)
}
case <-dio.stop:
close(dio.queue)
dio.stop <- true
return
}
log.Printf("[ERROR] Dnstap payload dropped")
}
}
// Close waits until the I/O routine is finished to return.
func (dio DnstapIO) Close() error {
dio.stop <- true
<-dio.stop
close(dio.stop)
return dio.protocol.Close()
func (dio *dnstapIO) Close() {
close(dio.queue)
}
func (dio *dnstapIO) serve() {
timeout := time.After(flushTimeout)
for {
select {
case payload, ok := <-dio.queue:
if !ok {
dio.enc.Close()
dio.conn.Close()
return
}
frame, err := proto.Marshal(&payload)
if err != nil {
log.Printf("[ERROR] Invalid dnstap payload dropped: %s", err)
continue
}
_, err = dio.enc.Write(frame)
if err != nil {
log.Printf("[ERROR] Cannot write dnstap payload: %s", err)
continue
}
case <-timeout:
err := dio.enc.Flush()
if err != nil {
log.Printf("[ERROR] Cannot flush dnstap payloads: %s", err)
}
timeout = time.After(flushTimeout)
}
}
}