pkg/up: add generic run-this-functions (#1481)

This adds a generic way of start a check function to check a backend.
This package can be used to kick off healthchecks. The package makes
sure only 1 is run at any one time.

It should allow for:
See upstream error -> kick off healthcheck

and not to worry about overwhelming the upstream with a barrage of
queries.
This commit is contained in:
Miek Gieben
2018-02-05 22:00:29 +00:00
committed by GitHub
parent 2ce88a40c1
commit fb1cafe5fa
2 changed files with 106 additions and 0 deletions

66
plugin/pkg/up/up.go Normal file
View File

@@ -0,0 +1,66 @@
package up
import (
"sync"
"time"
)
// Probe is used to run a single Func until it returns true (indicating a target is healthy). If an Func
// is already in progress no new one will be added, i.e. there is always a maximum of 1 checks in flight.
type Probe struct {
do chan Func
stop chan bool
target string
sync.Mutex
inprogress bool
}
// Func is used to determine if a target is alive. If so this function must return true.
type Func func(target string) bool
// New returns a pointer to an intialized Probe.
func New() *Probe {
return &Probe{stop: make(chan bool), do: make(chan Func)}
}
// Do will probe target, if a probe is already in progress this is a noop.
func (p *Probe) Do(f Func) { p.do <- f }
// Stop stops the probing.
func (p *Probe) Stop() { p.stop <- true }
// Start will start the probe manager, after which probes can be initialized with Do.
func (p *Probe) Start(target string, interval time.Duration) { go p.start(target, interval) }
func (p *Probe) start(target string, interval time.Duration) {
for {
select {
case <-p.stop:
return
case f := <-p.do:
p.Lock()
if p.inprogress {
p.Unlock()
continue
}
p.inprogress = true
p.Unlock()
// Passed the lock. Now run f for as long it returns false. If a true is returned
// we return from the goroutine and we can accept another Func to run.
go func() {
for {
if ok := f(target); ok {
break
}
time.Sleep(interval)
}
p.Lock()
p.inprogress = false
p.Unlock()
}()
}
}
}

40
plugin/pkg/up/up_test.go Normal file
View File

@@ -0,0 +1,40 @@
package up
import (
"sync"
"sync/atomic"
"testing"
"time"
)
func TestUp(t *testing.T) {
pr := New()
wg := sync.WaitGroup{}
hits := int32(0)
upfunc := func(s string) bool {
atomic.AddInt32(&hits, 1)
// Sleep tiny amount so that our other pr.Do() calls hit the lock.
time.Sleep(3 * time.Millisecond)
wg.Done()
return true
}
pr.Start("nonexistent", 5*time.Millisecond)
defer pr.Stop()
// These functions AddInt32 to the same hits variable, but we only want to wait when
// upfunc finishes, as that only calls Done() on the waitgroup.
upfuncNoWg := func(s string) bool { atomic.AddInt32(&hits, 1); return true }
wg.Add(1)
pr.Do(upfunc)
pr.Do(upfuncNoWg)
pr.Do(upfuncNoWg)
wg.Wait()
h := atomic.LoadInt32(&hits)
if h != 1 {
t.Errorf("Expected hits to be %d, got %d", 1, h)
}
}