Skip to content
Snippets Groups Projects
Unverified Commit 24ff94ff authored by Joel Takvorian's avatar Joel Takvorian Committed by GitHub
Browse files

Simplify previous test fix (#684)

parent e0b2df2f
No related branches found
No related tags found
No related merge requests found
...@@ -2,7 +2,6 @@ package flow ...@@ -2,7 +2,6 @@ package flow
import ( import (
"strconv" "strconv"
"sync"
"testing" "testing"
"time" "time"
...@@ -81,22 +80,16 @@ func capacityLimiterPipe() (chan<- []*model.Record, <-chan []*model.Record) { ...@@ -81,22 +80,16 @@ func capacityLimiterPipe() (chan<- []*model.Record, <-chan []*model.Record) {
limiter := node.AsMiddle((NewCapacityLimiter(metrics.NewMetrics(&metrics.Settings{}))).Limit) limiter := node.AsMiddle((NewCapacityLimiter(metrics.NewMetrics(&metrics.Settings{}))).Limit)
term := node.AsTerminal(func(termIn <-chan []*model.Record) { term := node.AsTerminal(func(termIn <-chan []*model.Record) {
// Let records accumulate in the channel, and release only when the flow stops growing // Let records accumulate in the channel, and release only when the flow stops growing
wg := sync.WaitGroup{} prev := -1
wg.Add(1) for {
go func() { n := len(termIn)
defer wg.Done() if n == prev {
prev := -1 // No new record
for { break
n := len(termIn)
if n == prev {
// No new record
return
}
prev = n
time.Sleep(50 * time.Millisecond)
} }
}() prev = n
wg.Wait() time.Sleep(50 * time.Millisecond)
}
// Start output // Start output
for i := range termIn { for i := range termIn {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment