diff --git a/go.mod b/go.mod
index ddfeb672f649f8ead8d9149201d1d53681ff4b65..0e69e054dead1dcdef5d99a186f94ebee1c8c1a5 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/netobserv/flowlogs-pipeline v0.1.11-0.20231123152750-f3b03fa192aa
 	github.com/netobserv/gopipes v0.3.0
 	github.com/paulbellamy/ratecounter v0.2.0
-	github.com/segmentio/kafka-go v0.4.46
+	github.com/segmentio/kafka-go v0.4.47
 	github.com/sirupsen/logrus v1.9.3
 	github.com/stretchr/testify v1.8.4
 	github.com/vishvananda/netlink v1.1.0
diff --git a/go.sum b/go.sum
index 1bf1a9c592af7c17dbe0b896a0168625dd018e2d..57bae83feefcdf8bb68fe0ed8b2fdc190d5bcc55 100644
--- a/go.sum
+++ b/go.sum
@@ -755,8 +755,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
 github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
 github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
-github.com/segmentio/kafka-go v0.4.46 h1:Sx8/kvtY+/G8nM0roTNnFezSJj3bT2sW0Xy/YY3CgBI=
-github.com/segmentio/kafka-go v0.4.46/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
+github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
+github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
 github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
 github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
diff --git a/vendor/github.com/segmentio/kafka-go/balancer.go b/vendor/github.com/segmentio/kafka-go/balancer.go
index 4136fce7b81e9adf01451a1ac4b6f44056af1bf9..ee3a258854c739f52641a4f0e13c52ca61fcbf11 100644
--- a/vendor/github.com/segmentio/kafka-go/balancer.go
+++ b/vendor/github.com/segmentio/kafka-go/balancer.go
@@ -7,7 +7,6 @@ import (
 	"math/rand"
 	"sort"
 	"sync"
-	"sync/atomic"
 )
 
 // The Balancer interface provides an abstraction of the message distribution
@@ -42,8 +41,10 @@ func (f BalancerFunc) Balance(msg Message, partitions ...int) int {
 type RoundRobin struct {
 	ChunkSize int
 	// Use a 32 bits integer so RoundRobin values don't need to be aligned to
-	// apply atomic increments.
+	// apply increments.
 	counter uint32
+
+	mutex sync.Mutex
 }
 
 // Balance satisfies the Balancer interface.
@@ -52,14 +53,17 @@ func (rr *RoundRobin) Balance(msg Message, partitions ...int) int {
 }
 
 func (rr *RoundRobin) balance(partitions []int) int {
+	rr.mutex.Lock()
+	defer rr.mutex.Unlock()
+
 	if rr.ChunkSize < 1 {
 		rr.ChunkSize = 1
 	}
 
 	length := len(partitions)
-	counterNow := atomic.LoadUint32(&rr.counter)
+	counterNow := rr.counter
 	offset := int(counterNow / uint32(rr.ChunkSize))
-	atomic.AddUint32(&rr.counter, 1)
+	rr.counter++
 	return partitions[offset%length]
 }
 
diff --git a/vendor/github.com/segmentio/kafka-go/protocol/protocol.go b/vendor/github.com/segmentio/kafka-go/protocol/protocol.go
index ebf91a7982d41b894c813ab380b0bf6b56910907..3d0a7b8ddd8ea40671b0d05b862c6f895e764ac6 100644
--- a/vendor/github.com/segmentio/kafka-go/protocol/protocol.go
+++ b/vendor/github.com/segmentio/kafka-go/protocol/protocol.go
@@ -213,6 +213,37 @@ func Register(req, res Message) {
 	}
 }
 
+// OverrideTypeMessage is an interface implemented by messages that want to override the standard
+// request/response types for a given API.
+type OverrideTypeMessage interface {
+	TypeKey() OverrideTypeKey
+}
+
+type OverrideTypeKey int16
+
+const (
+	RawProduceOverride OverrideTypeKey = 0
+)
+
+var overrideApiTypes [numApis]map[OverrideTypeKey]apiType
+
+func RegisterOverride(req, res Message, key OverrideTypeKey) {
+	k1 := req.ApiKey()
+	k2 := res.ApiKey()
+
+	if k1 != k2 {
+		panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2))
+	}
+
+	if overrideApiTypes[k1] == nil {
+		overrideApiTypes[k1] = make(map[OverrideTypeKey]apiType)
+	}
+	overrideApiTypes[k1][key] = apiType{
+		requests:  typesOf(req),
+		responses: typesOf(res),
+	}
+}
+
 func typesOf(v interface{}) []messageType {
 	return makeTypes(reflect.TypeOf(v).Elem())
 }
diff --git a/vendor/github.com/segmentio/kafka-go/protocol/rawproduce/rawproduce.go b/vendor/github.com/segmentio/kafka-go/protocol/rawproduce/rawproduce.go
new file mode 100644
index 0000000000000000000000000000000000000000..bad83138d51b80edece907557b3685d9144663ea
--- /dev/null
+++ b/vendor/github.com/segmentio/kafka-go/protocol/rawproduce/rawproduce.go
@@ -0,0 +1,91 @@
+package rawproduce
+
+import (
+	"fmt"
+
+	"github.com/segmentio/kafka-go/protocol"
+	"github.com/segmentio/kafka-go/protocol/produce"
+)
+
+func init() {
+	// Register a type override so that raw produce requests will be encoded with the correct type.
+	req := &Request{}
+	protocol.RegisterOverride(req, &produce.Response{}, req.TypeKey())
+}
+
+type Request struct {
+	TransactionalID string         `kafka:"min=v3,max=v8,nullable"`
+	Acks            int16          `kafka:"min=v0,max=v8"`
+	Timeout         int32          `kafka:"min=v0,max=v8"`
+	Topics          []RequestTopic `kafka:"min=v0,max=v8"`
+}
+
+func (r *Request) ApiKey() protocol.ApiKey { return protocol.Produce }
+
+func (r *Request) TypeKey() protocol.OverrideTypeKey { return protocol.RawProduceOverride }
+
+func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
+	broker := protocol.Broker{ID: -1}
+
+	for i := range r.Topics {
+		t := &r.Topics[i]
+
+		topic, ok := cluster.Topics[t.Topic]
+		if !ok {
+			return broker, NewError(protocol.NewErrNoTopic(t.Topic))
+		}
+
+		for j := range t.Partitions {
+			p := &t.Partitions[j]
+
+			partition, ok := topic.Partitions[p.Partition]
+			if !ok {
+				return broker, NewError(protocol.NewErrNoPartition(t.Topic, p.Partition))
+			}
+
+			if b, ok := cluster.Brokers[partition.Leader]; !ok {
+				return broker, NewError(protocol.NewErrNoLeader(t.Topic, p.Partition))
+			} else if broker.ID < 0 {
+				broker = b
+			} else if b.ID != broker.ID {
+				return broker, NewError(fmt.Errorf("mismatching leaders (%d!=%d)", b.ID, broker.ID))
+			}
+		}
+	}
+
+	return broker, nil
+}
+
+func (r *Request) HasResponse() bool {
+	return r.Acks != 0
+}
+
+type RequestTopic struct {
+	Topic      string             `kafka:"min=v0,max=v8"`
+	Partitions []RequestPartition `kafka:"min=v0,max=v8"`
+}
+
+type RequestPartition struct {
+	Partition int32                 `kafka:"min=v0,max=v8"`
+	RecordSet protocol.RawRecordSet `kafka:"min=v0,max=v8"`
+}
+
+var (
+	_ protocol.BrokerMessage = (*Request)(nil)
+)
+
+type Error struct {
+	Err error
+}
+
+func NewError(err error) *Error {
+	return &Error{Err: err}
+}
+
+func (e *Error) Error() string {
+	return fmt.Sprintf("fetch request error: %v", e.Err)
+}
+
+func (e *Error) Unwrap() error {
+	return e.Err
+}
diff --git a/vendor/github.com/segmentio/kafka-go/protocol/record.go b/vendor/github.com/segmentio/kafka-go/protocol/record.go
index 84594868b88c530e6a39ca12746e0f1a8b1569e2..e11af4dcc0269660f95c61337cab5dc4ac51c70c 100644
--- a/vendor/github.com/segmentio/kafka-go/protocol/record.go
+++ b/vendor/github.com/segmentio/kafka-go/protocol/record.go
@@ -292,6 +292,46 @@ func (rs *RecordSet) WriteTo(w io.Writer) (int64, error) {
 	return n, nil
 }
 
+// RawRecordSet represents a record set for a RawProduce request. The record set is
+// represented as a raw sequence of pre-encoded record set bytes.
+type RawRecordSet struct {
+	// Reader exposes the raw sequence of record set bytes.
+	Reader io.Reader
+}
+
+// ReadFrom reads the representation of a record set from r into rrs. It re-uses the
+// existing RecordSet.ReadFrom implementation to first read/decode data into a RecordSet,
+// then writes/encodes the RecordSet to a buffer referenced by the RawRecordSet.
+//
+// Note: re-using the RecordSet.ReadFrom implementation makes this suboptimal from a
+// performance standpoint as it require an extra copy of the record bytes. Holding off
+// on optimizing, as this code path is only invoked in tests.
+func (rrs *RawRecordSet) ReadFrom(r io.Reader) (int64, error) {
+	rs := &RecordSet{}
+	n, err := rs.ReadFrom(r)
+	if err != nil {
+		return 0, err
+	}
+
+	buf := &bytes.Buffer{}
+	rs.WriteTo(buf)
+	*rrs = RawRecordSet{
+		Reader: buf,
+	}
+
+	return n, nil
+}
+
+// WriteTo writes the RawRecordSet to an io.Writer. Since this is a raw record set representation, all that is
+// done here is copying bytes from the underlying reader to the specified writer.
+func (rrs *RawRecordSet) WriteTo(w io.Writer) (int64, error) {
+	if rrs.Reader == nil {
+		return 0, ErrNoRecord
+	}
+
+	return io.Copy(w, rrs.Reader)
+}
+
 func makeTime(t int64) time.Time {
 	return time.Unix(t/1000, (t%1000)*int64(time.Millisecond))
 }
diff --git a/vendor/github.com/segmentio/kafka-go/protocol/request.go b/vendor/github.com/segmentio/kafka-go/protocol/request.go
index 8b99e053711b1ad12968df5e2729d12e88b6bf6c..135b938bb441cc816b72b626dd7d3b2b66a97e70 100644
--- a/vendor/github.com/segmentio/kafka-go/protocol/request.go
+++ b/vendor/github.com/segmentio/kafka-go/protocol/request.go
@@ -81,6 +81,12 @@ func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID s
 		return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
 	}
 
+	if typedMessage, ok := msg.(OverrideTypeMessage); ok {
+		typeKey := typedMessage.TypeKey()
+		overrideType := overrideApiTypes[apiKey][typeKey]
+		t = &overrideType
+	}
+
 	minVersion := t.minVersion()
 	maxVersion := t.maxVersion()
 
diff --git a/vendor/github.com/segmentio/kafka-go/protocol/response.go b/vendor/github.com/segmentio/kafka-go/protocol/response.go
index 6194803133c94a165023a24183c1e2924aac0900..a43bd0237a135e6d50fcc6b3a8e9eb6791d74d1b 100644
--- a/vendor/github.com/segmentio/kafka-go/protocol/response.go
+++ b/vendor/github.com/segmentio/kafka-go/protocol/response.go
@@ -95,6 +95,12 @@ func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Messa
 		return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
 	}
 
+	if typedMessage, ok := msg.(OverrideTypeMessage); ok {
+		typeKey := typedMessage.TypeKey()
+		overrideType := overrideApiTypes[apiKey][typeKey]
+		t = &overrideType
+	}
+
 	minVersion := t.minVersion()
 	maxVersion := t.maxVersion()
 
diff --git a/vendor/github.com/segmentio/kafka-go/rawproduce.go b/vendor/github.com/segmentio/kafka-go/rawproduce.go
new file mode 100644
index 0000000000000000000000000000000000000000..5928cb2f834f8cfbc59e3be788a624cefe312ce2
--- /dev/null
+++ b/vendor/github.com/segmentio/kafka-go/rawproduce.go
@@ -0,0 +1,103 @@
+package kafka
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net"
+
+	"github.com/segmentio/kafka-go/protocol"
+	produceAPI "github.com/segmentio/kafka-go/protocol/produce"
+	"github.com/segmentio/kafka-go/protocol/rawproduce"
+)
+
+// RawProduceRequest represents a request sent to a kafka broker to produce records
+// to a topic partition. The request contains a pre-encoded/raw record set.
+type RawProduceRequest struct {
+	// Address of the kafka broker to send the request to.
+	Addr net.Addr
+
+	// The topic to produce the records to.
+	Topic string
+
+	// The partition to produce the records to.
+	Partition int
+
+	// The level of required acknowledgements to ask the kafka broker for.
+	RequiredAcks RequiredAcks
+
+	// The message format version used when encoding the records.
+	//
+	// By default, the client automatically determine which version should be
+	// used based on the version of the Produce API supported by the server.
+	MessageVersion int
+
+	// An optional transaction id when producing to the kafka broker is part of
+	// a transaction.
+	TransactionalID string
+
+	// The sequence of records to produce to the topic partition.
+	RawRecords protocol.RawRecordSet
+}
+
+// RawProduce sends a raw produce request to a kafka broker and returns the response.
+//
+// If the request contained no records, an error wrapping protocol.ErrNoRecord
+// is returned.
+//
+// When the request is configured with RequiredAcks=none, both the response and
+// the error will be nil on success.
+func (c *Client) RawProduce(ctx context.Context, req *RawProduceRequest) (*ProduceResponse, error) {
+	m, err := c.roundTrip(ctx, req.Addr, &rawproduce.Request{
+		TransactionalID: req.TransactionalID,
+		Acks:            int16(req.RequiredAcks),
+		Timeout:         c.timeoutMs(ctx, defaultProduceTimeout),
+		Topics: []rawproduce.RequestTopic{{
+			Topic: req.Topic,
+			Partitions: []rawproduce.RequestPartition{{
+				Partition: int32(req.Partition),
+				RecordSet: req.RawRecords,
+			}},
+		}},
+	})
+
+	switch {
+	case err == nil:
+	case errors.Is(err, protocol.ErrNoRecord):
+		return new(ProduceResponse), nil
+	default:
+		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", err)
+	}
+
+	if req.RequiredAcks == RequireNone {
+		return nil, nil
+	}
+
+	res := m.(*produceAPI.Response)
+	if len(res.Topics) == 0 {
+		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoTopic)
+	}
+	topic := &res.Topics[0]
+	if len(topic.Partitions) == 0 {
+		return nil, fmt.Errorf("kafka.(*Client).RawProduce: %w", protocol.ErrNoPartition)
+	}
+	partition := &topic.Partitions[0]
+
+	ret := &ProduceResponse{
+		Throttle:       makeDuration(res.ThrottleTimeMs),
+		Error:          makeError(partition.ErrorCode, partition.ErrorMessage),
+		BaseOffset:     partition.BaseOffset,
+		LogAppendTime:  makeTime(partition.LogAppendTime),
+		LogStartOffset: partition.LogStartOffset,
+	}
+
+	if len(partition.RecordErrors) != 0 {
+		ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))
+
+		for _, recErr := range partition.RecordErrors {
+			ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
+		}
+	}
+
+	return ret, nil
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index f735bcf604de71f1505ba311e9c86416f24a6d9e..b2ea8da0be0c6fac269450e345683ad0660917a5 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -340,7 +340,7 @@ github.com/prometheus/prometheus/util/strutil
 # github.com/rs/xid v1.5.0
 ## explicit; go 1.12
 github.com/rs/xid
-# github.com/segmentio/kafka-go v0.4.46
+# github.com/segmentio/kafka-go v0.4.47
 ## explicit; go 1.15
 github.com/segmentio/kafka-go
 github.com/segmentio/kafka-go/compress
@@ -385,6 +385,7 @@ github.com/segmentio/kafka-go/protocol/offsetcommit
 github.com/segmentio/kafka-go/protocol/offsetdelete
 github.com/segmentio/kafka-go/protocol/offsetfetch
 github.com/segmentio/kafka-go/protocol/produce
+github.com/segmentio/kafka-go/protocol/rawproduce
 github.com/segmentio/kafka-go/protocol/saslauthenticate
 github.com/segmentio/kafka-go/protocol/saslhandshake
 github.com/segmentio/kafka-go/protocol/syncgroup