mirror of
				https://github.com/coredns/coredns.git
				synced 2025-11-03 18:53:13 -05:00 
			
		
		
		
	@@ -26,14 +26,15 @@ $ cd ~/src/github.com/envoyproxy/go-control-plane
 | 
				
			|||||||
% make integration.xds
 | 
					% make integration.xds
 | 
				
			||||||
~~~
 | 
					~~~
 | 
				
			||||||
 | 
					
 | 
				
			||||||
This runs a binary from pkg/test/main. Now we're testing xDS, but there is also aDS (which does
 | 
					This runs a binary from pkg/test/main. Now we're testing aDS.
 | 
				
			||||||
everything including xDS). I'm still figuring out what do to here.
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
The script stops, unless you have Envoy installed (which I haven't), but you can run it manually:
 | 
					The script stops, unless you have Envoy installed (which I haven't), but you can run it manually:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
~~~ sh
 | 
					~~~ sh
 | 
				
			||||||
./bin/test --xds=xds --runtimes=1 -debug  # for xds
 | 
					./bin/test --xds=ads --runtimes=2 -debug  # for ads
 | 
				
			||||||
~~~
 | 
					~~~
 | 
				
			||||||
 | 
					
 | 
				
			||||||
This fails with `timeout waiting for the first request`, means you're consumer wasn't quick enough
 | 
					This fails with `timeout waiting for the first request`, means you're consumer wasn't quick enough
 | 
				
			||||||
in asking for xDS assignments.
 | 
					in asking for xDS assignments.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Use insecure.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1,6 +1,7 @@
 | 
				
			|||||||
package traffic
 | 
					package traffic
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
	"math/rand"
 | 
						"math/rand"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -32,7 +33,8 @@ func setup(c *caddy.Controller) error {
 | 
				
			|||||||
		return t
 | 
							return t
 | 
				
			||||||
	})
 | 
						})
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	t.c.WatchCluster("xds_experimental", func(xds.CDSUpdate, error) {})
 | 
						t.c.WatchCluster("", func(x xds.CDSUpdate, _ error) { fmt.Printf("CDSUpdate: %+v\n", x) })
 | 
				
			||||||
 | 
						t.c.WatchEndpoints("", func(x *xds.EDSUpdate, _ error) { fmt.Printf("EDSUpdate: %+v\n", x) })
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -28,6 +28,7 @@ import (
 | 
				
			|||||||
// handleCDSResponse processes an CDS response received from the xDS server. On
 | 
					// handleCDSResponse processes an CDS response received from the xDS server. On
 | 
				
			||||||
// receipt of a good response, it also invokes the registered watcher callback.
 | 
					// receipt of a good response, it also invokes the registered watcher callback.
 | 
				
			||||||
func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
 | 
					func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
 | 
				
			||||||
 | 
						println("handlCDSResponse")
 | 
				
			||||||
	v2c.mu.Lock()
 | 
						v2c.mu.Lock()
 | 
				
			||||||
	defer v2c.mu.Unlock()
 | 
						defer v2c.mu.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -47,6 +48,7 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
 | 
				
			|||||||
		if !ok {
 | 
							if !ok {
 | 
				
			||||||
			return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message)
 | 
								return fmt.Errorf("xds: unexpected resource type: %T in CDS response", resource.Message)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							fmt.Printf("CLUSTER %+v\n", cluster)
 | 
				
			||||||
		update, err := validateCluster(cluster)
 | 
							update, err := validateCluster(cluster)
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			return err
 | 
								return err
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -115,7 +115,7 @@ func (v2c *v2Client) run() {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		if retries != 0 {
 | 
							if retries != 0 {
 | 
				
			||||||
			t := time.NewTimer(v2c.backoff(retries))
 | 
								t := time.NewTimer(1 * time.Second) // backoff bla bla.
 | 
				
			||||||
			select {
 | 
								select {
 | 
				
			||||||
			case <-t.C:
 | 
								case <-t.C:
 | 
				
			||||||
			case <-v2c.ctx.Done():
 | 
								case <-v2c.ctx.Done():
 | 
				
			||||||
@@ -127,14 +127,12 @@ func (v2c *v2Client) run() {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		retries++
 | 
							retries++
 | 
				
			||||||
		println("SENDING STUFF, retries", retries)
 | 
					 | 
				
			||||||
		cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc)
 | 
							cli := adsgrpc.NewAggregatedDiscoveryServiceClient(v2c.cc)
 | 
				
			||||||
		stream, err := cli.StreamAggregatedResources(v2c.ctx) //, grpc.WaitForReady(true))
 | 
							stream, err := cli.StreamAggregatedResources(v2c.ctx) //, grpc.WaitForReady(true))
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			log.Infof("xds: ADS stream creation failed: %v", err)
 | 
								log.Infof("xds: ADS stream creation failed: %v", err)
 | 
				
			||||||
			continue
 | 
								continue
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		println("created ads stream")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// send() could be blocked on reading updates from the different update
 | 
							// send() could be blocked on reading updates from the different update
 | 
				
			||||||
		// channels when it is not actually sending out messages. So, we need a
 | 
							// channels when it is not actually sending out messages. So, we need a
 | 
				
			||||||
@@ -145,7 +143,6 @@ func (v2c *v2Client) run() {
 | 
				
			|||||||
		if v2c.recv(stream) {
 | 
							if v2c.recv(stream) {
 | 
				
			||||||
			retries = 0
 | 
								retries = 0
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		println("sending has succeeded")
 | 
					 | 
				
			||||||
		close(done)
 | 
							close(done)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -286,6 +283,8 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
 | 
				
			|||||||
			case *watchInfo:
 | 
								case *watchInfo:
 | 
				
			||||||
				println("watchInfo")
 | 
									println("watchInfo")
 | 
				
			||||||
				target, typeURL, version, nonce, send = v2c.processWatchInfo(t)
 | 
									target, typeURL, version, nonce, send = v2c.processWatchInfo(t)
 | 
				
			||||||
 | 
									println(target, typeURL, version, nonce, send)
 | 
				
			||||||
 | 
									fmt.Printf("%+v\n", target)
 | 
				
			||||||
			case *ackInfo:
 | 
								case *ackInfo:
 | 
				
			||||||
				println("ackInfo")
 | 
									println("ackInfo")
 | 
				
			||||||
				target, typeURL, version, nonce, send = v2c.processAckInfo(t)
 | 
									target, typeURL, version, nonce, send = v2c.processAckInfo(t)
 | 
				
			||||||
@@ -305,19 +304,25 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
 | 
				
			|||||||
// recv receives xDS responses on the provided ADS stream and branches out to
 | 
					// recv receives xDS responses on the provided ADS stream and branches out to
 | 
				
			||||||
// message specific handlers.
 | 
					// message specific handlers.
 | 
				
			||||||
func (v2c *v2Client) recv(stream adsStream) bool {
 | 
					func (v2c *v2Client) recv(stream adsStream) bool {
 | 
				
			||||||
 | 
						println("v2 recv")
 | 
				
			||||||
	success := false
 | 
						success := false
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
 | 
							println("WATIIGNM")
 | 
				
			||||||
		resp, err := stream.Recv()
 | 
							resp, err := stream.Recv()
 | 
				
			||||||
		// TODO: call watch callbacks with error when stream is broken.
 | 
							// TODO: call watch callbacks with error when stream is broken.
 | 
				
			||||||
 | 
							println("DONE")
 | 
				
			||||||
		if err != nil {
 | 
							if err != nil {
 | 
				
			||||||
			log.Warningf("xds: ADS stream recv failed: %v", err)
 | 
								log.Warningf("xds: ADS stream recv failed: %v", err)
 | 
				
			||||||
			return success
 | 
								return success
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
							println("RECEIVING")
 | 
				
			||||||
		var respHandleErr error
 | 
							var respHandleErr error
 | 
				
			||||||
		switch resp.GetTypeUrl() {
 | 
							switch resp.GetTypeUrl() {
 | 
				
			||||||
		case cdsURL:
 | 
							case cdsURL:
 | 
				
			||||||
 | 
								println("CDS")
 | 
				
			||||||
			respHandleErr = v2c.handleCDSResponse(resp)
 | 
								respHandleErr = v2c.handleCDSResponse(resp)
 | 
				
			||||||
		case edsURL:
 | 
							case edsURL:
 | 
				
			||||||
 | 
								println("EDS")
 | 
				
			||||||
			respHandleErr = v2c.handleEDSResponse(resp)
 | 
								respHandleErr = v2c.handleEDSResponse(resp)
 | 
				
			||||||
		default:
 | 
							default:
 | 
				
			||||||
			log.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl())
 | 
								log.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl())
 | 
				
			||||||
@@ -375,9 +380,7 @@ func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel fun
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) {
 | 
					func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) {
 | 
				
			||||||
	println("watch")
 | 
					 | 
				
			||||||
	v2c.sendCh.Put(wi)
 | 
						v2c.sendCh.Put(wi)
 | 
				
			||||||
	println("returning from watch")
 | 
					 | 
				
			||||||
	return func() {
 | 
						return func() {
 | 
				
			||||||
		v2c.mu.Lock()
 | 
							v2c.mu.Lock()
 | 
				
			||||||
		defer v2c.mu.Unlock()
 | 
							defer v2c.mu.Unlock()
 | 
				
			||||||
@@ -399,6 +402,7 @@ func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) {
 | 
				
			|||||||
// Caller should hold v2c.mu
 | 
					// Caller should hold v2c.mu
 | 
				
			||||||
func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
 | 
					func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
 | 
				
			||||||
	if existing := v2c.watchMap[wi.typeURL]; existing != nil {
 | 
						if existing := v2c.watchMap[wi.typeURL]; existing != nil {
 | 
				
			||||||
 | 
							println("cancel")
 | 
				
			||||||
		existing.cancel()
 | 
							existing.cancel()
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -408,7 +412,10 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
 | 
				
			|||||||
	// we need to access the watchInfo, which is stored in the watchMap.
 | 
						// we need to access the watchInfo, which is stored in the watchMap.
 | 
				
			||||||
	case cdsURL:
 | 
						case cdsURL:
 | 
				
			||||||
		clusterName := wi.target[0]
 | 
							clusterName := wi.target[0]
 | 
				
			||||||
 | 
							println("CDS URLS", clusterName)
 | 
				
			||||||
		if update, ok := v2c.cdsCache[clusterName]; ok {
 | 
							if update, ok := v2c.cdsCache[clusterName]; ok {
 | 
				
			||||||
 | 
								println("UPDATE SEEN, ok")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
			var err error
 | 
								var err error
 | 
				
			||||||
			if v2c.watchMap[cdsURL] == nil {
 | 
								if v2c.watchMap[cdsURL] == nil {
 | 
				
			||||||
				err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)
 | 
									err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user