From 24ff94ffaa995c53bf8fb7dde3c9d38d8b5c82e4 Mon Sep 17 00:00:00 2001 From: Joel Takvorian <joel.takvorian@qaraywa.net> Date: Mon, 28 Apr 2025 17:49:05 +0200 Subject: [PATCH] Simplify previous test fix (#684) --- pkg/flow/limiter_test.go | 25 +++++++++---------------- 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/pkg/flow/limiter_test.go b/pkg/flow/limiter_test.go index 9b2134b48..eb0e65866 100644 --- a/pkg/flow/limiter_test.go +++ b/pkg/flow/limiter_test.go @@ -2,7 +2,6 @@ package flow import ( "strconv" - "sync" "testing" "time" @@ -81,22 +80,16 @@ func capacityLimiterPipe() (chan<- []*model.Record, <-chan []*model.Record) { limiter := node.AsMiddle((NewCapacityLimiter(metrics.NewMetrics(&metrics.Settings{}))).Limit) term := node.AsTerminal(func(termIn <-chan []*model.Record) { // Let records accumulate in the channel, and release only when the flow stops growing - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - prev := -1 - for { - n := len(termIn) - if n == prev { - // No new record - return - } - prev = n - time.Sleep(50 * time.Millisecond) + prev := -1 + for { + n := len(termIn) + if n == prev { + // No new record + break } - }() - wg.Wait() + prev = n + time.Sleep(50 * time.Millisecond) + } // Start output for i := range termIn { -- GitLab