Its working

Signed-off-by: Miek Gieben <miek@miek.nl>
This commit is contained in:
Miek Gieben
2020-01-15 20:33:53 +01:00
parent 1652395efa
commit e24c9703e5
30 changed files with 200 additions and 3971 deletions

View File

@@ -0,0 +1,112 @@
package xds
import (
"fmt"
"math/rand"
"net"
"sync"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
)
type assignment struct {
mu sync.RWMutex
cla map[string]*xdspb.ClusterLoadAssignment
version int // not sure what do with and if we should discard all clusters.
}
func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) {
// If cla is nil we just found a cluster, check if we already know about it, or if we need to make a new entry.
a.mu.Lock()
defer a.mu.Unlock()
_, ok := a.cla[cluster]
if !ok {
log.Debugf("Adding cluster %q", cluster)
a.cla[cluster] = cla
return
}
if cla == nil {
return
}
fmt.Printf("%+v\n", cla)
log.Debugf("Updating cluster %q", cluster)
a.cla[cluster] = cla
}
// ClusterLoadAssignment returns the healthy endpoints and their weight.
func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment {
a.mu.RLock()
cla, ok := a.cla[cluster]
a.mu.RUnlock()
if !ok {
return nil
}
return cla
}
func (a assignment) Clusters() []string {
a.mu.RLock()
defer a.mu.RUnlock()
clusters := make([]string, len(a.cla))
i := 0
for k := range a.cla {
clusters[i] = k
i++
}
return clusters
}
// Select selects a backend from cla, using weighted random selection. It only selects
// backends that are reporting healthy.
func (a assignment) Select(cluster string) net.IP {
cla := a.ClusterLoadAssignment(cluster)
if cla == nil {
return nil
}
total := 0
i := 0
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
// if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
// continue
// }
total += int(lb.GetLoadBalancingWeight().GetValue())
i++
}
}
if total == 0 {
// all weights are 0, randomly select one of the endpoints.
r := rand.Intn(i)
i := 0
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
// if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
// continue
// }
if r == i {
return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress())
}
i++
}
}
return nil
}
r := rand.Intn(total) + 1
for _, ep := range cla.Endpoints {
for _, lb := range ep.GetLbEndpoints() {
// if lb.GetHealthStatus() != corepb.HealthStatus_HEALTHY {
// continue
// }
r -= int(lb.GetLoadBalancingWeight().GetValue())
if r <= 0 {
return net.ParseIP(lb.GetEndpoint().GetAddress().GetSocketAddress().GetAddress())
}
}
}
return nil
}

View File

@@ -22,7 +22,7 @@ package xds
import (
"context"
"sync"
"net"
"time"
clog "github.com/coredns/coredns/plugin/pkg/log"
@@ -34,7 +34,7 @@ import (
"google.golang.org/grpc"
)
var log = clog.NewWithPlugin("traffic xds:")
var log = clog.NewWithPlugin("traffic: xds")
const (
cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
@@ -52,45 +52,6 @@ type Client struct {
stop chan struct{}
}
type assignment struct {
mu sync.RWMutex
cla map[string]*xdspb.ClusterLoadAssignment
version int // not sure what do with and if we should discard all clusters.
}
func (a assignment) SetClusterLoadAssignment(cluster string, cla *xdspb.ClusterLoadAssignment) {
// if cla is nil we just found a cluster, check if we already know about it, or if we need to make
// a new entry
a.mu.Lock()
defer a.mu.Unlock()
_, ok := a.cla[cluster]
if !ok {
a.cla[cluster] = cla
return
}
if cla == nil {
return
}
a.cla[cluster] = cla
}
func (a assignment) ClusterLoadAssignment(cluster string) *xdspb.ClusterLoadAssignment {
return nil
}
func (a assignment) Clusters() []string {
a.mu.RLock()
defer a.mu.RUnlock()
clusters := make([]string, len(a.cla))
i := 0
for k := range a.cla {
clusters[i] = k
i++
}
return clusters
}
// New returns a new client that's dialed to addr using node as the local identifier.
func New(addr, node string) (*Client, error) {
// todo credentials
@@ -144,7 +105,7 @@ func (c *Client) Receive(stream adsStream) error {
resp, err := stream.Recv()
if err != nil {
log.Warningf("Trouble receiving from the gRPC connection: %s", err)
time.Sleep(1 * time.Second) // better.
time.Sleep(10 * time.Second) // better.
}
switch resp.GetTypeUrl() {
@@ -160,8 +121,7 @@ func (c *Client) Receive(stream adsStream) error {
}
c.assignments.SetClusterLoadAssignment(cluster.GetName(), nil)
}
println("CDS", len(resp.GetResources()), "processed")
log.Debug("Cluster discovery processed with %d resources", len(resp.GetResources()))
log.Debugf("Cluster discovery processed with %d resources", len(resp.GetResources()))
// ack the CDS proto, with we we've got. (empty version would be NACK)
if err := c.ClusterDiscovery(stream, resp.GetVersionInfo(), resp.GetNonce(), c.assignments.Clusters()); err != nil {
log.Warningf("Failed to acknowledge cluster discovery: %s", err)
@@ -188,13 +148,14 @@ func (c *Client) Receive(stream adsStream) error {
c.assignments.SetClusterLoadAssignment(cla.GetClusterName(), cla)
// ack the bloody thing
}
println("EDS", len(resp.GetResources()), "processed")
log.Debug("Endpoint discovery processed with %d resources", len(resp.GetResources()))
log.Debugf("Endpoint discovery processed with %d resources", len(resp.GetResources()))
default:
log.Warningf("Unknown response URL for discovery: %q", resp.GetTypeUrl())
continue
}
}
return nil
}
// Select is a small wrapper. bla bla, keeps assigmens private.
func (c *Client) Select(cluster string) net.IP { return c.assignments.Select(cluster) }