From 519ef9ca799b35b94fe91084243bd10b47ac9dc8 Mon Sep 17 00:00:00 2001 From: Miek Gieben Date: Mon, 13 Jan 2020 11:21:20 +0100 Subject: [PATCH] more Signed-off-by: Miek Gieben --- plugin/traffic/HACKING.md | 19 +++++++++++++++++++ plugin/traffic/README.md | 11 +++++++++++ plugin/traffic/setup.go | 3 +++ plugin/traffic/xds/client.go | 6 ++++++ plugin/traffic/xds/v2client.go | 14 ++++++++++++-- plugin/traffic/xds_bootstrap.json | 2 +- 6 files changed, 52 insertions(+), 3 deletions(-) diff --git a/plugin/traffic/HACKING.md b/plugin/traffic/HACKING.md index 0c425c0bf..97d7a4781 100644 --- a/plugin/traffic/HACKING.md +++ b/plugin/traffic/HACKING.md @@ -18,3 +18,22 @@ https://github.com/envoyproxy/learnenvoy/blob/master/_articles/service-discovery Cluster: A cluster is a group of logically similar endpoints that Envoy connects to. In v2, RDS routes points to clusters, CDS provides cluster configuration and Envoy discovers the cluster members via EDS. + +# Testing + +~~~ sh +$ cd ~/src/github.com/envoyproxy/go-control-plane +% make integration.xds +~~~ + +This runs a binary from pkg/test/main. Now we're testing xDS, but there is also aDS (which does +everything including xDS). I'm still figuring out what do to here. + +The script stops, unless you have Envoy installed (which I haven't), but you can run it manually: + +~~~ sh +./bin/test --xds=xds --runtimes=1 -debug # for xds +~~~ + +This fails with `timeout waiting for the first request`, means you're consumer wasn't quick enough +in asking for xDS assignments. diff --git a/plugin/traffic/README.md b/plugin/traffic/README.md index f578c7a39..6c07418aa 100644 --- a/plugin/traffic/README.md +++ b/plugin/traffic/README.md @@ -42,6 +42,17 @@ Findign the xDS endpoint. traffic ~~~ +The extended syntax: + +~~~ +traffic { + server grpc://dsdsd + id ID + } +~~~ + +* id **ID** is how *traffic* identifies itself to the control plane. + This enables traffic load balancing for all (sub-)domains named in the server block. ## Examples diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index befca9fc4..306ce4043 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -7,6 +7,7 @@ import ( "github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/plugin" clog "github.com/coredns/coredns/plugin/pkg/log" + "github.com/coredns/coredns/plugin/traffic/xds" "github.com/caddyserver/caddy" ) @@ -31,6 +32,8 @@ func setup(c *caddy.Controller) error { return t }) + t.c.WatchCluster("xds_experimental", func(xds.CDSUpdate, error) {}) + return nil } diff --git a/plugin/traffic/xds/client.go b/plugin/traffic/xds/client.go index 7104bb7c6..783ffc82c 100644 --- a/plugin/traffic/xds/client.go +++ b/plugin/traffic/xds/client.go @@ -69,6 +69,8 @@ func New(opts Options) (*Client, error) { return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", opts.Config.BalancerName, err) } + println("dialed balancer at", opts.Config.BalancerName) + c := &Client{ opts: opts, cc: cc, @@ -85,6 +87,10 @@ func (c *Client) Close() { c.cc.Close() } +func (c *Client) Run() { + c.v2c.run() +} + // ServiceUpdate contains update about the service. type ServiceUpdate struct { Cluster string diff --git a/plugin/traffic/xds/v2client.go b/plugin/traffic/xds/v2client.go index dc0db0883..c6e850850 100644 --- a/plugin/traffic/xds/v2client.go +++ b/plugin/traffic/xds/v2client.go @@ -127,12 +127,14 @@ func (v2c *v2Client) run() { } retries++ + println("SENDING STUFF, retries", retries) cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc) - stream, err := cli.StreamAggregatedResources(v2c.ctx, grpc.WaitForReady(true)) + stream, err := cli.StreamAggregatedResources(v2c.ctx) //, grpc.WaitForReady(true)) if err != nil { log.Infof("xds: ADS stream creation failed: %v", err) continue } + println("created ads stream") // send() could be blocked on reading updates from the different update // channels when it is not actually sending out messages. So, we need a @@ -143,11 +145,12 @@ func (v2c *v2Client) run() { if v2c.recv(stream) { retries = 0 } + println("sending has succeeded") close(done) } } -// sendRequest sends a request for provided typeURL and resource on the provided +// endRequest sends a request for provided typeURL and resource on the provided // stream. // // version is the ack version to be sent with the request @@ -261,9 +264,12 @@ func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, versi // on the provided ADS stream. func (v2c *v2Client) send(stream adsStream, done chan struct{}) { if !v2c.sendExisting(stream) { + println("not existing stream") return } + println("in send") + for { select { case <-v2c.ctx.Done(): @@ -278,8 +284,10 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) { ) switch t := u.(type) { case *watchInfo: + println("watchInfo") target, typeURL, version, nonce, send = v2c.processWatchInfo(t) case *ackInfo: + println("ackInfo") target, typeURL, version, nonce, send = v2c.processAckInfo(t) } if !send { @@ -367,7 +375,9 @@ func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel fun } func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) { + println("watch") v2c.sendCh.Put(wi) + println("returning from watch") return func() { v2c.mu.Lock() defer v2c.mu.Unlock() diff --git a/plugin/traffic/xds_bootstrap.json b/plugin/traffic/xds_bootstrap.json index c27cf394c..e6df9bbbb 100644 --- a/plugin/traffic/xds_bootstrap.json +++ b/plugin/traffic/xds_bootstrap.json @@ -6,7 +6,7 @@ } }, "xds_servers" : [{ - "server_uri": "trafficdirector.googleapis.com:443", + "server_uri": "localhost:18000", "channel_creds": [ { "type": "google_default" } ]