diff --git a/go.mod b/go.mod index a1aa60d601ed235ae606fa0a58c74b430a5e5dae..41f0a1e108f5d171612f38b32dff805101c5ba02 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/netobserv/flowlogs-pipeline v0.1.11-0.20231123152750-f3b03fa192aa github.com/netobserv/gopipes v0.3.0 github.com/paulbellamy/ratecounter v0.2.0 - github.com/segmentio/kafka-go v0.4.45 + github.com/segmentio/kafka-go v0.4.46 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 github.com/vishvananda/netlink v1.1.0 diff --git a/go.sum b/go.sum index 2aa3d92a8643b5a0dfd38009d89ec86790cf42b2..97a3694c836574a7bf08bed0e4242f8f07747d83 100644 --- a/go.sum +++ b/go.sum @@ -755,8 +755,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= -github.com/segmentio/kafka-go v0.4.45 h1:prqrZp1mMId4kI6pyPolkLsH6sWOUmDxmmucbL4WS6E= -github.com/segmentio/kafka-go v0.4.45/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3CgBI= +github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= diff --git a/vendor/github.com/segmentio/kafka-go/README.md b/vendor/github.com/segmentio/kafka-go/README.md index 304c1603be099ccdadfb04ec49b837f8a229a813..e178788251b6f208872fdc4a0256e8897b0a1412 100644 --- a/vendor/github.com/segmentio/kafka-go/README.md +++ b/vendor/github.com/segmentio/kafka-go/README.md @@ -401,7 +401,7 @@ for i := 0; i < retries; i++ { // attempt to create topic prior to publishing the message err = w.WriteMessages(ctx, messages...) - if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue } diff --git a/vendor/github.com/segmentio/kafka-go/balancer.go b/vendor/github.com/segmentio/kafka-go/balancer.go index f4768cf8835676021a6bd1a8612fd853c200d1a1..4136fce7b81e9adf01451a1ac4b6f44056af1bf9 100644 --- a/vendor/github.com/segmentio/kafka-go/balancer.go +++ b/vendor/github.com/segmentio/kafka-go/balancer.go @@ -36,11 +36,14 @@ func (f BalancerFunc) Balance(msg Message, partitions ...int) int { } // RoundRobin is an Balancer implementation that equally distributes messages -// across all available partitions. +// across all available partitions. It can take an optional chunk size to send +// ChunkSize messages to the same partition before moving to the next partition. +// This can be used to improve batch sizes. type RoundRobin struct { + ChunkSize int // Use a 32 bits integer so RoundRobin values don't need to be aligned to // apply atomic increments. - offset uint32 + counter uint32 } // Balance satisfies the Balancer interface. @@ -49,8 +52,14 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int { } func (rr *RoundRobin) balance(partitions []int) int { - length := uint32(len(partitions)) - offset := atomic.AddUint32(&rr.offset, 1) - 1 + if rr.ChunkSize < 1 { + rr.ChunkSize = 1 + } + + length := len(partitions) + counterNow := atomic.LoadUint32(&rr.counter) + offset := int(counterNow / uint32(rr.ChunkSize)) + atomic.AddUint32(&rr.counter, 1) return partitions[offset%length] } @@ -122,7 +131,7 @@ var ( // // The logic to calculate the partition is: // -// hasher.Sum32() % len(partitions) => partition +// hasher.Sum32() % len(partitions) => partition // // By default, Hash uses the FNV-1a algorithm. This is the same algorithm used // by the Sarama Producer and ensures that messages produced by kafka-go will @@ -173,7 +182,7 @@ func (h *Hash) Balance(msg Message, partitions ...int) int { // // The logic to calculate the partition is: // -// (int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition +// (int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition // // By default, ReferenceHash uses the FNV-1a algorithm. This is the same algorithm as // the Sarama NewReferenceHashPartitioner and ensures that messages produced by kafka-go will diff --git a/vendor/modules.txt b/vendor/modules.txt index e8c32c137f401a55358c8a2cbb250faa7719e267..b4e2e532a618a51446861ac2cd9e5ab9c7a1333b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -340,7 +340,7 @@ github.com/prometheus/prometheus/util/strutil # github.com/rs/xid v1.5.0 ## explicit; go 1.12 github.com/rs/xid -# github.com/segmentio/kafka-go v0.4.45 +# github.com/segmentio/kafka-go v0.4.46 ## explicit; go 1.15 github.com/segmentio/kafka-go github.com/segmentio/kafka-go/compress