From 0ef4b55d8d7c87a5f2e43e2fbb22cdef0a2e7b26 Mon Sep 17 00:00:00 2001 From: cangming Date: Tue, 25 Nov 2025 04:14:21 +0800 Subject: [PATCH] plugin/pkg/uniq: fix data race with sync.RWMutex (#7707) Add RWMutex to protect concurrent map access in Set, Unset, and ForEach methods. Change New() to return *U pointer type for proper synchronization. Signed-off-by: Cangming H --- plugin/pkg/uniq/uniq.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/plugin/pkg/uniq/uniq.go b/plugin/pkg/uniq/uniq.go index 5f95e41d2..a04c61232 100644 --- a/plugin/pkg/uniq/uniq.go +++ b/plugin/pkg/uniq/uniq.go @@ -2,9 +2,12 @@ // identical events will only be processed once. package uniq +import "sync" + // U keeps track of item to be done. type U struct { - u map[string]item + mu sync.RWMutex + u map[string]item } type item struct { @@ -13,10 +16,24 @@ type item struct { } // New returns a new initialized U. -func New() U { return U{u: make(map[string]item)} } +func New() *U { return &U{u: make(map[string]item)} } // Set sets function f in U under key. If the key already exists it is not overwritten. -func (u U) Set(key string, f func() error) { +func (u *U) Set(key string, f func() error) { + // Read lock for check + u.mu.RLock() + _, exists := u.u[key] + u.mu.RUnlock() + + if exists { + return + } + + // Write lock for modification + u.mu.Lock() + defer u.mu.Unlock() + + // Double-check to avoid TOCTOU if _, ok := u.u[key]; ok { return } @@ -24,12 +41,17 @@ func (u U) Set(key string, f func() error) { } // Unset removes the key. -func (u U) Unset(key string) { +func (u *U) Unset(key string) { + u.mu.Lock() + defer u.mu.Unlock() delete(u.u, key) } // ForEach iterates over u and executes f for each element that is 'todo' and sets it to 'done'. -func (u U) ForEach() error { +func (u *U) ForEach() error { + u.mu.Lock() + defer u.mu.Unlock() + for k, v := range u.u { if v.state == todo { v.f()