diff --git a/plugin/traffic/HACKING.md b/plugin/traffic/HACKING.md index 97d7a4781..0e908da62 100644 --- a/plugin/traffic/HACKING.md +++ b/plugin/traffic/HACKING.md @@ -26,14 +26,15 @@ $ cd ~/src/github.com/envoyproxy/go-control-plane % make integration.xds ~~~ -This runs a binary from pkg/test/main. Now we're testing xDS, but there is also aDS (which does -everything including xDS). I'm still figuring out what do to here. +This runs a binary from pkg/test/main. Now we're testing aDS. The script stops, unless you have Envoy installed (which I haven't), but you can run it manually: ~~~ sh -./bin/test --xds=xds --runtimes=1 -debug # for xds +./bin/test --xds=ads --runtimes=2 -debug # for ads ~~~ This fails with `timeout waiting for the first request`, means you're consumer wasn't quick enough in asking for xDS assignments. + +Use insecure. diff --git a/plugin/traffic/setup.go b/plugin/traffic/setup.go index 306ce4043..d4f8509de 100644 --- a/plugin/traffic/setup.go +++ b/plugin/traffic/setup.go @@ -1,6 +1,7 @@ package traffic import ( + "fmt" "math/rand" "time" @@ -32,7 +33,8 @@ func setup(c *caddy.Controller) error { return t }) - t.c.WatchCluster("xds_experimental", func(xds.CDSUpdate, error) {}) + t.c.WatchCluster("", func(x xds.CDSUpdate, _ error) { fmt.Printf("CDSUpdate: %+v\n", x) }) + t.c.WatchEndpoints("", func(x *xds.EDSUpdate, _ error) { fmt.Printf("EDSUpdate: %+v\n", x) }) return nil } diff --git a/plugin/traffic/xds/cds.go b/plugin/traffic/xds/cds.go index 381b0d0c9..c51d0a46a 100644 --- a/plugin/traffic/xds/cds.go +++ b/plugin/traffic/xds/cds.go @@ -28,6 +28,7 @@ import ( // handleCDSResponse processes an CDS response received from the xDS server. On // receipt of a good response, it also invokes the registered watcher callback. func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { + println("handlCDSResponse") v2c.mu.Lock() defer v2c.mu.Unlock() @@ -47,6 +48,7 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { if !ok { return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message) } + fmt.Printf("CLUSTER %+v\n", cluster) update, err := validateCluster(cluster) if err != nil { return err diff --git a/plugin/traffic/xds/v2client.go b/plugin/traffic/xds/v2client.go index c6e850850..40215e40c 100644 --- a/plugin/traffic/xds/v2client.go +++ b/plugin/traffic/xds/v2client.go @@ -115,7 +115,7 @@ func (v2c *v2Client) run() { } if retries != 0 { - t := time.NewTimer(v2c.backoff(retries)) + t := time.NewTimer(1 * time.Second) // backoff bla bla. select { case <-t.C: case <-v2c.ctx.Done(): @@ -127,14 +127,12 @@ func (v2c *v2Client) run() { } retries++ - println("SENDING STUFF, retries", retries) cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc) stream, err := cli.StreamAggregatedResources(v2c.ctx) //, grpc.WaitForReady(true)) if err != nil { log.Infof("xds: ADS stream creation failed: %v", err) continue } - println("created ads stream") // send() could be blocked on reading updates from the different update // channels when it is not actually sending out messages. So, we need a @@ -145,7 +143,6 @@ func (v2c *v2Client) run() { if v2c.recv(stream) { retries = 0 } - println("sending has succeeded") close(done) } } @@ -286,6 +283,8 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) { case *watchInfo: println("watchInfo") target, typeURL, version, nonce, send = v2c.processWatchInfo(t) + println(target, typeURL, version, nonce, send) + fmt.Printf("%+v\n", target) case *ackInfo: println("ackInfo") target, typeURL, version, nonce, send = v2c.processAckInfo(t) @@ -305,19 +304,25 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) { // recv receives xDS responses on the provided ADS stream and branches out to // message specific handlers. func (v2c *v2Client) recv(stream adsStream) bool { + println("v2 recv") success := false for { + println("WATIIGNM") resp, err := stream.Recv() // TODO: call watch callbacks with error when stream is broken. + println("DONE") if err != nil { log.Warningf("xds: ADS stream recv failed: %v", err) return success } + println("RECEIVING") var respHandleErr error switch resp.GetTypeUrl() { case cdsURL: + println("CDS") respHandleErr = v2c.handleCDSResponse(resp) case edsURL: + println("EDS") respHandleErr = v2c.handleEDSResponse(resp) default: log.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl()) @@ -375,9 +380,7 @@ func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel fun } func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) { - println("watch") v2c.sendCh.Put(wi) - println("returning from watch") return func() { v2c.mu.Lock() defer v2c.mu.Unlock() @@ -399,6 +402,7 @@ func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) { // Caller should hold v2c.mu func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { if existing := v2c.watchMap[wi.typeURL]; existing != nil { + println("cancel") existing.cancel() } @@ -408,7 +412,10 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { // we need to access the watchInfo, which is stored in the watchMap. case cdsURL: clusterName := wi.target[0] + println("CDS URLS", clusterName) if update, ok := v2c.cdsCache[clusterName]; ok { + println("UPDATE SEEN, ok") + var err error if v2c.watchMap[cdsURL] == nil { err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)