diff --git a/pkg/flow/limiter_test.go b/pkg/flow/limiter_test.go index 9b2134b48e93a07e430a71df31fa796e9142cedd..eb0e65866fd634affd4ec1a4c9ae1cb07e11a32f 100644 --- a/pkg/flow/limiter_test.go +++ b/pkg/flow/limiter_test.go @@ -2,7 +2,6 @@ package flow import ( "strconv" - "sync" "testing" "time" @@ -81,22 +80,16 @@ func capacityLimiterPipe() (chan<- []*model.Record, <-chan []*model.Record) { limiter := node.AsMiddle((NewCapacityLimiter(metrics.NewMetrics(&metrics.Settings{}))).Limit) term := node.AsTerminal(func(termIn <-chan []*model.Record) { // Let records accumulate in the channel, and release only when the flow stops growing - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - prev := -1 - for { - n := len(termIn) - if n == prev { - // No new record - return - } - prev = n - time.Sleep(50 * time.Millisecond) + prev := -1 + for { + n := len(termIn) + if n == prev { + // No new record + break } - }() - wg.Wait() + prev = n + time.Sleep(50 * time.Millisecond) + } // Start output for i := range termIn {