From 1867e5e834a1a129ca0f277398dc8c2ca25465dd Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Dec 2023 12:33:59 +0000 Subject: [PATCH] Bump github.com/segmentio/kafka-go from 0.4.45 to 0.4.46 (#234) Bumps [github.com/segmentio/kafka-go](https://github.com/segmentio/kafka-go) from 0.4.45 to 0.4.46. - [Release notes](https://github.com/segmentio/kafka-go/releases) - [Commits](https://github.com/segmentio/kafka-go/compare/v0.4.45...v0.4.46) --- updated-dependencies: - dependency-name: github.com/segmentio/kafka-go dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- .../github.com/segmentio/kafka-go/README.md | 2 +- .../github.com/segmentio/kafka-go/balancer.go | 21 +++++++++++++------ vendor/modules.txt | 2 +- 5 files changed, 20 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index a1aa60d6..41f0a1e1 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/netobserv/flowlogs-pipeline v0.1.11-0.20231123152750-f3b03fa192aa github.com/netobserv/gopipes v0.3.0 github.com/paulbellamy/ratecounter v0.2.0 - github.com/segmentio/kafka-go v0.4.45 + github.com/segmentio/kafka-go v0.4.46 github.com/sirupsen/logrus v1.9.3 github.com/stretchr/testify v1.8.4 github.com/vishvananda/netlink v1.1.0 diff --git a/go.sum b/go.sum index 2aa3d92a..97a3694c 100644 --- a/go.sum +++ b/go.sum @@ -755,8 +755,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= -github.com/segmentio/kafka-go v0.4.45 h1:prqrZp1mMId4kI6pyPolkLsH6sWOUmDxmmucbL4WS6E= -github.com/segmentio/kafka-go v0.4.45/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= +github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3CgBI= +github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= diff --git a/vendor/github.com/segmentio/kafka-go/README.md b/vendor/github.com/segmentio/kafka-go/README.md index 304c1603..e1787882 100644 --- a/vendor/github.com/segmentio/kafka-go/README.md +++ b/vendor/github.com/segmentio/kafka-go/README.md @@ -401,7 +401,7 @@ for i := 0; i < retries; i++ { // attempt to create topic prior to publishing the message err = w.WriteMessages(ctx, messages...) - if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue } diff --git a/vendor/github.com/segmentio/kafka-go/balancer.go b/vendor/github.com/segmentio/kafka-go/balancer.go index f4768cf8..4136fce7 100644 --- a/vendor/github.com/segmentio/kafka-go/balancer.go +++ b/vendor/github.com/segmentio/kafka-go/balancer.go @@ -36,11 +36,14 @@ func (f BalancerFunc) Balance(msg Message, partitions ...int) int { } // RoundRobin is an Balancer implementation that equally distributes messages -// across all available partitions. +// across all available partitions. It can take an optional chunk size to send +// ChunkSize messages to the same partition before moving to the next partition. +// This can be used to improve batch sizes. type RoundRobin struct { + ChunkSize int // Use a 32 bits integer so RoundRobin values don't need to be aligned to // apply atomic increments. - offset uint32 + counter uint32 } // Balance satisfies the Balancer interface. @@ -49,8 +52,14 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int { } func (rr *RoundRobin) balance(partitions []int) int { - length := uint32(len(partitions)) - offset := atomic.AddUint32(&rr.offset, 1) - 1 + if rr.ChunkSize < 1 { + rr.ChunkSize = 1 + } + + length := len(partitions) + counterNow := atomic.LoadUint32(&rr.counter) + offset := int(counterNow / uint32(rr.ChunkSize)) + atomic.AddUint32(&rr.counter, 1) return partitions[offset%length] } @@ -122,7 +131,7 @@ var ( // // The logic to calculate the partition is: // -// hasher.Sum32() % len(partitions) => partition +// hasher.Sum32() % len(partitions) => partition // // By default, Hash uses the FNV-1a algorithm. This is the same algorithm used // by the Sarama Producer and ensures that messages produced by kafka-go will @@ -173,7 +182,7 @@ func (h *Hash) Balance(msg Message, partitions ...int) int { // // The logic to calculate the partition is: // -// (int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition +// (int32(hasher.Sum32()) & 0x7fffffff) % len(partitions) => partition // // By default, ReferenceHash uses the FNV-1a algorithm. This is the same algorithm as // the Sarama NewReferenceHashPartitioner and ensures that messages produced by kafka-go will diff --git a/vendor/modules.txt b/vendor/modules.txt index e8c32c13..b4e2e532 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -340,7 +340,7 @@ github.com/prometheus/prometheus/util/strutil # github.com/rs/xid v1.5.0 ## explicit; go 1.12 github.com/rs/xid -# github.com/segmentio/kafka-go v0.4.45 +# github.com/segmentio/kafka-go v0.4.46 ## explicit; go 1.15 github.com/segmentio/kafka-go github.com/segmentio/kafka-go/compress -- GitLab