diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index 2bdca7f27..ca764ce6d 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -38,7 +38,26 @@ func setup(c *caddy.Controller) error { }) c.OnStartup(func() error { - go t.c.Run() + go func() { + for { + opts := []grpc.DialOption{grpc.WithInsecure()} + if t.tlsConfig != nil { + opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(t.tlsConfig))} + } + + redo: + + t.c, err = xds.New(t.hosts[0], t.node, opts...) + err := t.c.Run() + if err != nil { + log.Warning(err) + time.Sleep(2 * time.Second) // back off foo + goto redo + } + // err == nil + break + } + }() metrics.MustRegister(c, xds.ClusterGauge) return nil }) @@ -47,9 +66,8 @@ func setup(c *caddy.Controller) error { } func parseTraffic(c *caddy.Controller) (*Traffic, error) { - node := "coredns" toHosts := []string{} - t := &Traffic{} + t := &Traffic{node: "coredns"} var ( err error tlsConfig *tls.Config @@ -85,7 +103,7 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) { if len(args) != 1 { return nil, c.ArgErr() } - node = args[0] + t.node = args[0] case "tls": args := c.RemainingArgs() if len(args) > 3 { @@ -109,19 +127,13 @@ func parseTraffic(c *caddy.Controller) (*Traffic, error) { } } - opts := []grpc.DialOption{grpc.WithInsecure()} if tlsConfig != nil { + t.tlsConfig = tlsConfig if tlsServerName != "" { - tlsConfig.ServerName = tlsServerName + t.tlsConfig.ServerName = tlsServerName } - opts = []grpc.DialOption{grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))} } - - // TODO: only the first host is used, need to figure out how to reconcile multiple upstream providers. - if t.c, err = xds.New(toHosts[0], node, opts...); err != nil { - return nil, err - } - + t.hosts = toHosts return t, nil } diff --git a/plugin/traffic/traffic.go b/plugin/traffic/traffic.go index 01e115b48..a73a93823 100644 --- a/plugin/traffic/traffic.go +++ b/plugin/traffic/traffic.go @@ -2,6 +2,7 @@ package traffic import ( "context" + "crypto/tls" "fmt" "strconv" "strings" @@ -17,7 +18,11 @@ import ( // Traffic is a plugin that load balances according to assignments. type Traffic struct { - c *xds.Client + c *xds.Client + node string + tlsConfig *tls.Config + hosts []string + id string health bool origins []string diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 3d4b9fdd9..7213ac73d 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -24,7 +24,6 @@ import ( "context" "fmt" "sync" - "time" "github.com/coredns/coredns/coremain" clog "github.com/coredns/coredns/plugin/pkg/log" @@ -79,21 +78,19 @@ func New(addr, node string, opts ...grpc.DialOption) (*Client, error) { func (c *Client) Stop() error { c.cancel(); return c.cc.Close() } // Run starts all goroutines and gathers the clusters and endpoint information from the upstream manager. -func (c *Client) Run() { +func (c *Client) Run() error { first := true for { select { case <-c.ctx.Done(): - return + return nil default: } cli := xdspb.NewAggregatedDiscoveryServiceClient(c.cc) stream, err := cli.StreamAggregatedResources(c.ctx) if err != nil { - log.Debug(err) - time.Sleep(2 * time.Second) // grpc's client.go does more spiffy exp. backoff, do we really need that? - continue + return err } if first { @@ -107,7 +104,7 @@ func (c *Client) Run() { } if err := c.receive(stream); err != nil { - log.Warning(err) + return err } } }