Correctly re-establish the stream

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-02-05 16:10:58 +01:00
parent a47aea02f8
commit 72dac14b48
3 changed files with 35 additions and 21 deletions

View File

@@ -38,7 +38,26 @@ func setup(c *caddy.Controller) error {
}) })
c.OnStartup(func() error { c.OnStartup(func() error {
go t.c.Run() go func() {
for {
opts := []grpc.DialOption{grpc.WithInsecure()}
if t.tlsConfig != nil {
opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(t.tlsConfig))}
}
redo:
t.c, err = xds.New(t.hosts[0], t.node, opts...)
err := t.c.Run()
if err != nil {
log.Warning(err)
time.Sleep(2 * time.Second) // back off foo
goto redo
}
// err == nil
break
}
}()
metrics.MustRegister(c, xds.ClusterGauge) metrics.MustRegister(c, xds.ClusterGauge)
return nil return nil
}) })
@@ -47,9 +66,8 @@ func setup(c *caddy.Controller) error {
} }
func parseTraffic(c *caddy.Controller) (*Traffic, error) { func parseTraffic(c *caddy.Controller) (*Traffic, error) {
node := "coredns"
toHosts := []string{} toHosts := []string{}
t := &Traffic{} t := &Traffic{node: "coredns"}
var ( var (
err error err error
tlsConfig *tls.Config tlsConfig *tls.Config
@@ -85,7 +103,7 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) {
if len(args) != 1 { if len(args) != 1 {
return nil, c.ArgErr() return nil, c.ArgErr()
} }
node = args[0] t.node = args[0]
case "tls": case "tls":
args := c.RemainingArgs() args := c.RemainingArgs()
if len(args) > 3 { if len(args) > 3 {
@@ -109,19 +127,13 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) {
} }
} }
opts := []grpc.DialOption{grpc.WithInsecure()}
if tlsConfig != nil { if tlsConfig != nil {
t.tlsConfig = tlsConfig
if tlsServerName != "" { if tlsServerName != "" {
tlsConfig.ServerName = tlsServerName t.tlsConfig.ServerName = tlsServerName
} }
opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))}
} }
t.hosts = toHosts
// TODO: only the first host is used, need to figure out how to reconcile multiple upstream providers.
if t.c, err = xds.New(toHosts[0], node, opts...); err != nil {
return nil, err
}
return t, nil return t, nil
} }

View File

@@ -2,6 +2,7 @@ package traffic
import ( import (
"context" "context"
"crypto/tls"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
@@ -17,7 +18,11 @@ import (
// Traffic is a plugin that load balances according to assignments. // Traffic is a plugin that load balances according to assignments.
type Traffic struct { type Traffic struct {
c *xds.Client c *xds.Client
node string
tlsConfig *tls.Config
hosts []string
id string id string
health bool health bool
origins []string origins []string

View File

@@ -24,7 +24,6 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/coredns/coredns/coremain" "github.com/coredns/coredns/coremain"
clog "github.com/coredns/coredns/plugin/pkg/log" clog "github.com/coredns/coredns/plugin/pkg/log"
@@ -79,21 +78,19 @@ func New(addr, node string, opts ...grpc.DialOption) (*Client, error) {
func (c *Client) Stop() error { c.cancel(); return c.cc.Close() } func (c *Client) Stop() error { c.cancel(); return c.cc.Close() }
// Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager. // Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager.
func (c *Client) Run() { func (c *Client) Run() error {
first := true first := true
for { for {
select { select {
case <-c.ctx.Done(): case <-c.ctx.Done():
return return nil
default: default:
} }
cli := xdspb.NewAggregatedDiscoveryServiceClient(c.cc) cli := xdspb.NewAggregatedDiscoveryServiceClient(c.cc)
stream, err := cli.StreamAggregatedResources(c.ctx) stream, err := cli.StreamAggregatedResources(c.ctx)
if err != nil { if err != nil {
log.Debug(err) return err
time.Sleep(2 * time.Second) // grpc's client.go does more spiffy exp. backoff, do we really need that?
continue
} }
if first { if first {
@@ -107,7 +104,7 @@ func (c *Client) Run() {
} }
if err := c.receive(stream); err != nil { if err := c.receive(stream); err != nil {
log.Warning(err) return err
} }
} }
} }