use tickers instead of time.After to avoid memory leak (#5220)

Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
This commit is contained in:
Chris O'Haver
2022-03-04 02:36:02 -05:00
committed by GitHub
parent d40d224271
commit 967814161a
5 changed files with 31 additions and 13 deletions

View File

@@ -84,12 +84,16 @@ func (h *Azure) Run(ctx context.Context) error {
return err return err
} }
go func() { go func() {
delay := 1 * time.Minute
timer := time.NewTimer(delay)
defer timer.Stop()
for { for {
timer.Reset(delay)
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Debugf("Breaking out of Azure update loop for %v: %v", h.zoneNames, ctx.Err()) log.Debugf("Breaking out of Azure update loop for %v: %v", h.zoneNames, ctx.Err())
return return
case <-time.After(1 * time.Minute): case <-timer.C:
if err := h.updateZones(ctx); err != nil && ctx.Err() == nil { if err := h.updateZones(ctx); err != nil && ctx.Err() == nil {
log.Errorf("Failed to update zones %v: %v", h.zoneNames, err) log.Errorf("Failed to update zones %v: %v", h.zoneNames, err)
} }

View File

@@ -82,12 +82,16 @@ func (h *CloudDNS) Run(ctx context.Context) error {
return err return err
} }
go func() { go func() {
delay := 1 * time.Minute
timer := time.NewTimer(delay)
defer timer.Stop()
for { for {
timer.Reset(delay)
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Debugf("Breaking out of CloudDNS update loop for %v: %v", h.zoneNames, ctx.Err()) log.Debugf("Breaking out of CloudDNS update loop for %v: %v", h.zoneNames, ctx.Err())
return return
case <-time.After(1 * time.Minute): case <-timer.C:
if err := h.updateZones(ctx); err != nil && ctx.Err() == nil /* Don't log error if ctx expired. */ { if err := h.updateZones(ctx); err != nil && ctx.Err() == nil /* Don't log error if ctx expired. */ {
log.Errorf("Failed to update zones %v: %v", h.zoneNames, err) log.Errorf("Failed to update zones %v: %v", h.zoneNames, err)
} }

View File

@@ -92,8 +92,10 @@ func (d *dio) write(payload *tap.Dnstap) error {
} }
func (d *dio) serve() { func (d *dio) serve() {
timeout := time.After(d.flushTimeout) timeout := time.NewTimer(d.flushTimeout)
defer timeout.Stop()
for { for {
timeout.Reset(d.flushTimeout)
select { select {
case <-d.quit: case <-d.quit:
if d.enc == nil { if d.enc == nil {
@@ -106,7 +108,7 @@ func (d *dio) serve() {
if err := d.write(&payload); err != nil { if err := d.write(&payload); err != nil {
d.dial() d.dial()
} }
case <-timeout: case <-timeout.C:
if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 { if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 {
log.Warningf("Dropped dnstap messages: %d", dropped) log.Warningf("Dropped dnstap messages: %d", dropped)
} }
@@ -115,7 +117,6 @@ func (d *dio) serve() {
} else { } else {
d.enc.flush() d.enc.flush()
} }
timeout = time.After(d.flushTimeout)
} }
} }
} }

View File

@@ -275,19 +275,25 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
k.APIConn.Run() k.APIConn.Run()
}() }()
timeout := time.After(5 * time.Second) timeout := 5 * time.Second
logWaiting := time.After(500 * time.Millisecond) timeoutTicker := time.NewTicker(timeout)
ticker := time.NewTicker(100 * time.Millisecond) defer timeoutTicker.Stop()
defer ticker.Stop() logDelay := 500 * time.Millisecond
logTicker := time.NewTicker(logDelay)
defer logTicker.Stop()
checkSyncTicker := time.NewTicker(100 * time.Millisecond)
defer checkSyncTicker.Stop()
for { for {
timeoutTicker.Reset(timeout)
logTicker.Reset(logDelay)
select { select {
case <-ticker.C: case <-checkSyncTicker.C:
if k.APIConn.HasSynced() { if k.APIConn.HasSynced() {
return nil return nil
} }
case <-logWaiting: case <-logTicker.C:
log.Info("waiting for Kubernetes API before starting server") log.Info("waiting for Kubernetes API before starting server")
case <-timeout: case <-timeoutTicker.C:
log.Warning("starting server with unsynced Kubernetes API") log.Warning("starting server with unsynced Kubernetes API")
return nil return nil
} }

View File

@@ -84,12 +84,15 @@ func (h *Route53) Run(ctx context.Context) error {
return err return err
} }
go func() { go func() {
timer := time.NewTimer(h.refresh)
defer timer.Stop()
for { for {
timer.Reset(h.refresh)
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Debugf("Breaking out of Route53 update loop for %v: %v", h.zoneNames, ctx.Err()) log.Debugf("Breaking out of Route53 update loop for %v: %v", h.zoneNames, ctx.Err())
return return
case <-time.After(h.refresh): case <-timer.C:
if err := h.updateZones(ctx); err != nil && ctx.Err() == nil /* Don't log error if ctx expired. */ { if err := h.updateZones(ctx); err != nil && ctx.Err() == nil /* Don't log error if ctx expired. */ {
log.Errorf("Failed to update zones %v: %v", h.zoneNames, err) log.Errorf("Failed to update zones %v: %v", h.zoneNames, err)
} }