diff --git a/go.mod b/go.mod
index 0323789dcfb468b23fcee8454162785ed3ff7bd9..a1aa60d601ed235ae606fa0a58c74b430a5e5dae 100644
--- a/go.mod
+++ b/go.mod
@@ -13,7 +13,7 @@ require (
 	github.com/netobserv/flowlogs-pipeline v0.1.11-0.20231123152750-f3b03fa192aa
 	github.com/netobserv/gopipes v0.3.0
 	github.com/paulbellamy/ratecounter v0.2.0
-	github.com/segmentio/kafka-go v0.4.44
+	github.com/segmentio/kafka-go v0.4.45
 	github.com/sirupsen/logrus v1.9.3
 	github.com/stretchr/testify v1.8.4
 	github.com/vishvananda/netlink v1.1.0
diff --git a/go.sum b/go.sum
index 0aa47f2464d4d14192e403a3918d1d4139660090..2aa3d92a8643b5a0dfd38009d89ec86790cf42b2 100644
--- a/go.sum
+++ b/go.sum
@@ -755,8 +755,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
 github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
 github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
-github.com/segmentio/kafka-go v0.4.44 h1:Vjjksniy0WSTZ7CuVJrz1k04UoZeTc77UV6Yyk6tLY4=
-github.com/segmentio/kafka-go v0.4.44/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
+github.com/segmentio/kafka-go v0.4.45 h1:prqrZp1mMId4kI6pyPolkLsH6sWOUmDxmmucbL4WS6E=
+github.com/segmentio/kafka-go v0.4.45/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVOfURM5KMd8qg=
 github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
 github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
diff --git a/vendor/github.com/segmentio/kafka-go/alterpartitionreassignments.go b/vendor/github.com/segmentio/kafka-go/alterpartitionreassignments.go
index ec76dbd8bafe90c23e1cb53694829f649eee5ec0..dd67d003b32331da5e273e271c30bcfc1d5d527e 100644
--- a/vendor/github.com/segmentio/kafka-go/alterpartitionreassignments.go
+++ b/vendor/github.com/segmentio/kafka-go/alterpartitionreassignments.go
@@ -13,7 +13,8 @@ type AlterPartitionReassignmentsRequest struct {
 	// Address of the kafka broker to send the request to.
 	Addr net.Addr
 
-	// Topic is the name of the topic to alter partitions in.
+	// Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to
+	// reassign to multiple topics.
 	Topic string
 
 	// Assignments is the list of partition reassignments to submit to the API.
@@ -26,10 +27,13 @@ type AlterPartitionReassignmentsRequest struct {
 // AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
 // partition.
 type AlterPartitionReassignmentsRequestAssignment struct {
+	// Topic is the name of the topic to alter partitions in. If empty, the value of Topic in AlterPartitionReassignmentsRequest is used.
+	Topic string
+
 	// PartitionID is the ID of the partition to make the reassignments in.
 	PartitionID int
 
-	// BrokerIDs is a slice of brokers to set the partition replicas to.
+	// BrokerIDs is a slice of brokers to set the partition replicas to, or null to cancel a pending reassignment for this partition.
 	BrokerIDs []int
 }
 
@@ -46,6 +50,9 @@ type AlterPartitionReassignmentsResponse struct {
 // AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
 // doing reassignments for a single partition.
 type AlterPartitionReassignmentsResponsePartitionResult struct {
+	// Topic is the topic name.
+	Topic string
+
 	// PartitionID is the ID of the partition that was altered.
 	PartitionID int
 
@@ -58,16 +65,29 @@ func (c *Client) AlterPartitionReassignments(
 	ctx context.Context,
 	req *AlterPartitionReassignmentsRequest,
 ) (*AlterPartitionReassignmentsResponse, error) {
-	apiPartitions := []alterpartitionreassignments.RequestPartition{}
+	apiTopicMap := make(map[string]*alterpartitionreassignments.RequestTopic)
 
 	for _, assignment := range req.Assignments {
+		topic := assignment.Topic
+		if topic == "" {
+			topic = req.Topic
+		}
+
+		apiTopic := apiTopicMap[topic]
+		if apiTopic == nil {
+			apiTopic = &alterpartitionreassignments.RequestTopic{
+				Name: topic,
+			}
+			apiTopicMap[topic] = apiTopic
+		}
+
 		replicas := []int32{}
 		for _, brokerID := range assignment.BrokerIDs {
 			replicas = append(replicas, int32(brokerID))
 		}
 
-		apiPartitions = append(
-			apiPartitions,
+		apiTopic.Partitions = append(
+			apiTopic.Partitions,
 			alterpartitionreassignments.RequestPartition{
 				PartitionIndex: int32(assignment.PartitionID),
 				Replicas:       replicas,
@@ -77,12 +97,10 @@ func (c *Client) AlterPartitionReassignments(
 
 	apiReq := &alterpartitionreassignments.Request{
 		TimeoutMs: int32(req.Timeout.Milliseconds()),
-		Topics: []alterpartitionreassignments.RequestTopic{
-			{
-				Name:       req.Topic,
-				Partitions: apiPartitions,
-			},
-		},
+	}
+
+	for _, apiTopic := range apiTopicMap {
+		apiReq.Topics = append(apiReq.Topics, *apiTopic)
 	}
 
 	protoResp, err := c.roundTrip(
@@ -104,6 +122,7 @@ func (c *Client) AlterPartitionReassignments(
 			resp.PartitionResults = append(
 				resp.PartitionResults,
 				AlterPartitionReassignmentsResponsePartitionResult{
+					Topic:       topicResult.Name,
 					PartitionID: int(partitionResult.PartitionIndex),
 					Error:       makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
 				},
diff --git a/vendor/github.com/segmentio/kafka-go/createacls.go b/vendor/github.com/segmentio/kafka-go/createacls.go
index 672f6fdce9187719d071754242c8bfbb3c9b5314..60197417175927be261b8ab6005232ac0135c5a9 100644
--- a/vendor/github.com/segmentio/kafka-go/createacls.go
+++ b/vendor/github.com/segmentio/kafka-go/createacls.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"strings"
 	"time"
 
 	"github.com/segmentio/kafka-go/protocol/createacls"
@@ -42,6 +43,43 @@ const (
 	ACLPermissionTypeAllow   ACLPermissionType = 3
 )
 
+func (apt ACLPermissionType) String() string {
+	mapping := map[ACLPermissionType]string{
+		ACLPermissionTypeUnknown: "Unknown",
+		ACLPermissionTypeAny:     "Any",
+		ACLPermissionTypeDeny:    "Deny",
+		ACLPermissionTypeAllow:   "Allow",
+	}
+	s, ok := mapping[apt]
+	if !ok {
+		s = mapping[ACLPermissionTypeUnknown]
+	}
+	return s
+}
+
+// MarshalText transforms an ACLPermissionType into its string representation.
+func (apt ACLPermissionType) MarshalText() ([]byte, error) {
+	return []byte(apt.String()), nil
+}
+
+// UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType.
+func (apt *ACLPermissionType) UnmarshalText(text []byte) error {
+	normalized := strings.ToLower(string(text))
+	mapping := map[string]ACLPermissionType{
+		"unknown": ACLPermissionTypeUnknown,
+		"any":     ACLPermissionTypeAny,
+		"deny":    ACLPermissionTypeDeny,
+		"allow":   ACLPermissionTypeAllow,
+	}
+	parsed, ok := mapping[normalized]
+	if !ok {
+		*apt = ACLPermissionTypeUnknown
+		return fmt.Errorf("cannot parse %s as an ACLPermissionType", normalized)
+	}
+	*apt = parsed
+	return nil
+}
+
 type ACLOperationType int8
 
 const (
@@ -60,6 +98,62 @@ const (
 	ACLOperationTypeIdempotentWrite ACLOperationType = 12
 )
 
+func (aot ACLOperationType) String() string {
+	mapping := map[ACLOperationType]string{
+		ACLOperationTypeUnknown:         "Unknown",
+		ACLOperationTypeAny:             "Any",
+		ACLOperationTypeAll:             "All",
+		ACLOperationTypeRead:            "Read",
+		ACLOperationTypeWrite:           "Write",
+		ACLOperationTypeCreate:          "Create",
+		ACLOperationTypeDelete:          "Delete",
+		ACLOperationTypeAlter:           "Alter",
+		ACLOperationTypeDescribe:        "Describe",
+		ACLOperationTypeClusterAction:   "ClusterAction",
+		ACLOperationTypeDescribeConfigs: "DescribeConfigs",
+		ACLOperationTypeAlterConfigs:    "AlterConfigs",
+		ACLOperationTypeIdempotentWrite: "IdempotentWrite",
+	}
+	s, ok := mapping[aot]
+	if !ok {
+		s = mapping[ACLOperationTypeUnknown]
+	}
+	return s
+}
+
+// MarshalText transforms an ACLOperationType into its string representation.
+func (aot ACLOperationType) MarshalText() ([]byte, error) {
+	return []byte(aot.String()), nil
+}
+
+// UnmarshalText takes a string representation of the resource type and converts it to an ACLPermissionType.
+func (aot *ACLOperationType) UnmarshalText(text []byte) error {
+	normalized := strings.ToLower(string(text))
+	mapping := map[string]ACLOperationType{
+		"unknown":         ACLOperationTypeUnknown,
+		"any":             ACLOperationTypeAny,
+		"all":             ACLOperationTypeAll,
+		"read":            ACLOperationTypeRead,
+		"write":           ACLOperationTypeWrite,
+		"create":          ACLOperationTypeCreate,
+		"delete":          ACLOperationTypeDelete,
+		"alter":           ACLOperationTypeAlter,
+		"describe":        ACLOperationTypeDescribe,
+		"clusteraction":   ACLOperationTypeClusterAction,
+		"describeconfigs": ACLOperationTypeDescribeConfigs,
+		"alterconfigs":    ACLOperationTypeAlterConfigs,
+		"idempotentwrite": ACLOperationTypeIdempotentWrite,
+	}
+	parsed, ok := mapping[normalized]
+	if !ok {
+		*aot = ACLOperationTypeUnknown
+		return fmt.Errorf("cannot parse %s as an ACLOperationType", normalized)
+	}
+	*aot = parsed
+	return nil
+
+}
+
 type ACLEntry struct {
 	ResourceType        ResourceType
 	ResourceName        string
diff --git a/vendor/github.com/segmentio/kafka-go/listpartitionreassignments.go b/vendor/github.com/segmentio/kafka-go/listpartitionreassignments.go
new file mode 100644
index 0000000000000000000000000000000000000000..aa01fff3f1b20784028ddfbf2f68676664983e1f
--- /dev/null
+++ b/vendor/github.com/segmentio/kafka-go/listpartitionreassignments.go
@@ -0,0 +1,135 @@
+package kafka
+
+import (
+	"context"
+	"net"
+	"time"
+
+	"github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
+)
+
+// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.
+type ListPartitionReassignmentsRequest struct {
+	// Address of the kafka broker to send the request to.
+	Addr net.Addr
+
+	// Topics we want reassignments for, mapped by their name, or nil to list everything.
+	Topics map[string]ListPartitionReassignmentsRequestTopic
+
+	// Timeout is the amount of time to wait for the request to complete.
+	Timeout time.Duration
+}
+
+// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single
+// topic.
+type ListPartitionReassignmentsRequestTopic struct {
+	// The partitions to list partition reassignments for.
+	PartitionIndexes []int
+}
+
+// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.
+type ListPartitionReassignmentsResponse struct {
+	// Error is set to a non-nil value including the code and message if a top-level
+	// error was encountered.
+	Error error
+
+	// Topics contains results for each topic, mapped by their name.
+	Topics map[string]ListPartitionReassignmentsResponseTopic
+}
+
+// ListPartitionReassignmentsResponseTopic contains the detailed result of
+// ongoing reassignments for a topic.
+type ListPartitionReassignmentsResponseTopic struct {
+	// Partitions contains result for topic partitions.
+	Partitions []ListPartitionReassignmentsResponsePartition
+}
+
+// ListPartitionReassignmentsResponsePartition contains the detailed result of
+// ongoing reassignments for a single partition.
+type ListPartitionReassignmentsResponsePartition struct {
+	// PartitionIndex contains index of the partition.
+	PartitionIndex int
+
+	// Replicas contains the current replica set.
+	Replicas []int
+
+	// AddingReplicas contains the set of replicas we are currently adding.
+	AddingReplicas []int
+
+	// RemovingReplicas contains the set of replicas we are currently removing.
+	RemovingReplicas []int
+}
+
+func (c *Client) ListPartitionReassignments(
+	ctx context.Context,
+	req *ListPartitionReassignmentsRequest,
+) (*ListPartitionReassignmentsResponse, error) {
+	apiReq := &listpartitionreassignments.Request{
+		TimeoutMs: int32(req.Timeout.Milliseconds()),
+	}
+
+	for topicName, topicReq := range req.Topics {
+		apiReq.Topics = append(
+			apiReq.Topics,
+			listpartitionreassignments.RequestTopic{
+				Name:             topicName,
+				PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes),
+			},
+		)
+	}
+
+	protoResp, err := c.roundTrip(
+		ctx,
+		req.Addr,
+		apiReq,
+	)
+	if err != nil {
+		return nil, err
+	}
+	apiResp := protoResp.(*listpartitionreassignments.Response)
+
+	resp := &ListPartitionReassignmentsResponse{
+		Error:  makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
+		Topics: make(map[string]ListPartitionReassignmentsResponseTopic),
+	}
+
+	for _, topicResult := range apiResp.Topics {
+		respTopic := ListPartitionReassignmentsResponseTopic{}
+		for _, partitionResult := range topicResult.Partitions {
+			respTopic.Partitions = append(
+				respTopic.Partitions,
+				ListPartitionReassignmentsResponsePartition{
+					PartitionIndex:   int(partitionResult.PartitionIndex),
+					Replicas:         int32ToIntArray(partitionResult.Replicas),
+					AddingReplicas:   int32ToIntArray(partitionResult.AddingReplicas),
+					RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas),
+				},
+			)
+		}
+		resp.Topics[topicResult.Name] = respTopic
+	}
+
+	return resp, nil
+}
+
+func intToInt32Array(arr []int) []int32 {
+	if arr == nil {
+		return nil
+	}
+	res := make([]int32, len(arr))
+	for i := range arr {
+		res[i] = int32(arr[i])
+	}
+	return res
+}
+
+func int32ToIntArray(arr []int32) []int {
+	if arr == nil {
+		return nil
+	}
+	res := make([]int, len(arr))
+	for i := range arr {
+		res[i] = int(arr[i])
+	}
+	return res
+}
diff --git a/vendor/github.com/segmentio/kafka-go/protocol/alterpartitionreassignments/alterpartitionreassignments.go b/vendor/github.com/segmentio/kafka-go/protocol/alterpartitionreassignments/alterpartitionreassignments.go
index 4894a2e6a7821ce0e14c8fdee7fcf30505d4304d..7f8d2ed2fa4fcbbde18beda905415fdaf4f5053a 100644
--- a/vendor/github.com/segmentio/kafka-go/protocol/alterpartitionreassignments/alterpartitionreassignments.go
+++ b/vendor/github.com/segmentio/kafka-go/protocol/alterpartitionreassignments/alterpartitionreassignments.go
@@ -23,7 +23,7 @@ type RequestTopic struct {
 
 type RequestPartition struct {
 	PartitionIndex int32   `kafka:"min=v0,max=v0"`
-	Replicas       []int32 `kafka:"min=v0,max=v0"`
+	Replicas       []int32 `kafka:"min=v0,max=v0,nullable"`
 }
 
 func (r *Request) ApiKey() protocol.ApiKey {
diff --git a/vendor/github.com/segmentio/kafka-go/protocol/listpartitionreassignments/listpartitionreassignments.go b/vendor/github.com/segmentio/kafka-go/protocol/listpartitionreassignments/listpartitionreassignments.go
new file mode 100644
index 0000000000000000000000000000000000000000..d26a641016e7074d57f2d6585554f9906020be43
--- /dev/null
+++ b/vendor/github.com/segmentio/kafka-go/protocol/listpartitionreassignments/listpartitionreassignments.go
@@ -0,0 +1,70 @@
+package listpartitionreassignments
+
+import "github.com/segmentio/kafka-go/protocol"
+
+func init() {
+	protocol.Register(&Request{}, &Response{})
+}
+
+// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListPartitionReassignments.
+
+type Request struct {
+	// We need at least one tagged field to indicate that this is a "flexible" message
+	// type.
+	_ struct{} `kafka:"min=v0,max=v0,tag"`
+
+	TimeoutMs int32          `kafka:"min=v0,max=v0"`
+	Topics    []RequestTopic `kafka:"min=v0,max=v0,nullable"`
+}
+
+type RequestTopic struct {
+	// We need at least one tagged field to indicate that this is a "flexible" message
+	// type.
+	_ struct{} `kafka:"min=v0,max=v0,tag"`
+
+	Name             string  `kafka:"min=v0,max=v0"`
+	PartitionIndexes []int32 `kafka:"min=v0,max=v0"`
+}
+
+func (r *Request) ApiKey() protocol.ApiKey {
+	return protocol.ListPartitionReassignments
+}
+
+func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) {
+	return cluster.Brokers[cluster.Controller], nil
+}
+
+type Response struct {
+	// We need at least one tagged field to indicate that this is a "flexible" message
+	// type.
+	_ struct{} `kafka:"min=v0,max=v0,tag"`
+
+	ThrottleTimeMs int32           `kafka:"min=v0,max=v0"`
+	ErrorCode      int16           `kafka:"min=v0,max=v0"`
+	ErrorMessage   string          `kafka:"min=v0,max=v0,nullable"`
+	Topics         []ResponseTopic `kafka:"min=v0,max=v0"`
+}
+
+type ResponseTopic struct {
+	// We need at least one tagged field to indicate that this is a "flexible" message
+	// type.
+	_ struct{} `kafka:"min=v0,max=v0,tag"`
+
+	Name       string              `kafka:"min=v0,max=v0"`
+	Partitions []ResponsePartition `kafka:"min=v0,max=v0"`
+}
+
+type ResponsePartition struct {
+	// We need at least one tagged field to indicate that this is a "flexible" message
+	// type.
+	_ struct{} `kafka:"min=v0,max=v0,tag"`
+
+	PartitionIndex   int32   `kafka:"min=v0,max=v0"`
+	Replicas         []int32 `kafka:"min=v0,max=v0"`
+	AddingReplicas   []int32 `kafka:"min=v0,max=v0"`
+	RemovingReplicas []int32 `kafka:"min=v0,max=v0"`
+}
+
+func (r *Response) ApiKey() protocol.ApiKey {
+	return protocol.ListPartitionReassignments
+}
diff --git a/vendor/github.com/segmentio/kafka-go/resource.go b/vendor/github.com/segmentio/kafka-go/resource.go
index f5c2e73a5269ddead846c5014d6f9cc626241bc4..b9be107c27ef7ea5e215506d176fbec98c2d9557 100644
--- a/vendor/github.com/segmentio/kafka-go/resource.go
+++ b/vendor/github.com/segmentio/kafka-go/resource.go
@@ -1,5 +1,10 @@
 package kafka
 
+import (
+	"fmt"
+	"strings"
+)
+
 // https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
 type ResourceType int8
 
@@ -15,6 +20,50 @@ const (
 	ResourceTypeDelegationToken ResourceType = 6
 )
 
+func (rt ResourceType) String() string {
+	mapping := map[ResourceType]string{
+		ResourceTypeUnknown: "Unknown",
+		ResourceTypeAny:     "Any",
+		ResourceTypeTopic:   "Topic",
+		ResourceTypeGroup:   "Group",
+		// Note that ResourceTypeBroker and ResourceTypeCluster have the same value.
+		// A map cannot have duplicate values so we just use the same value for both.
+		ResourceTypeCluster:         "Cluster",
+		ResourceTypeTransactionalID: "Transactionalid",
+		ResourceTypeDelegationToken: "Delegationtoken",
+	}
+	s, ok := mapping[rt]
+	if !ok {
+		s = mapping[ResourceTypeUnknown]
+	}
+	return s
+}
+
+func (rt ResourceType) MarshalText() ([]byte, error) {
+	return []byte(rt.String()), nil
+}
+
+func (rt *ResourceType) UnmarshalText(text []byte) error {
+	normalized := strings.ToLower(string(text))
+	mapping := map[string]ResourceType{
+		"unknown":         ResourceTypeUnknown,
+		"any":             ResourceTypeAny,
+		"topic":           ResourceTypeTopic,
+		"group":           ResourceTypeGroup,
+		"broker":          ResourceTypeBroker,
+		"cluster":         ResourceTypeCluster,
+		"transactionalid": ResourceTypeTransactionalID,
+		"delegationtoken": ResourceTypeDelegationToken,
+	}
+	parsed, ok := mapping[normalized]
+	if !ok {
+		*rt = ResourceTypeUnknown
+		return fmt.Errorf("cannot parse %s as a ResourceType", normalized)
+	}
+	*rt = parsed
+	return nil
+}
+
 // https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/PatternType.java
 type PatternType int8
 
@@ -35,3 +84,40 @@ const (
 	// that start with 'foo'.
 	PatternTypePrefixed PatternType = 4
 )
+
+func (pt PatternType) String() string {
+	mapping := map[PatternType]string{
+		PatternTypeUnknown:  "Unknown",
+		PatternTypeAny:      "Any",
+		PatternTypeMatch:    "Match",
+		PatternTypeLiteral:  "Literal",
+		PatternTypePrefixed: "Prefixed",
+	}
+	s, ok := mapping[pt]
+	if !ok {
+		s = mapping[PatternTypeUnknown]
+	}
+	return s
+}
+
+func (pt PatternType) MarshalText() ([]byte, error) {
+	return []byte(pt.String()), nil
+}
+
+func (pt *PatternType) UnmarshalText(text []byte) error {
+	normalized := strings.ToLower(string(text))
+	mapping := map[string]PatternType{
+		"unknown":  PatternTypeUnknown,
+		"any":      PatternTypeAny,
+		"match":    PatternTypeMatch,
+		"literal":  PatternTypeLiteral,
+		"prefixed": PatternTypePrefixed,
+	}
+	parsed, ok := mapping[normalized]
+	if !ok {
+		*pt = PatternTypeUnknown
+		return fmt.Errorf("cannot parse %s as a PatternType", normalized)
+	}
+	*pt = parsed
+	return nil
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 4ec00af01783de112900e251422958d158048956..e8c32c137f401a55358c8a2cbb250faa7719e267 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -340,7 +340,7 @@ github.com/prometheus/prometheus/util/strutil
 # github.com/rs/xid v1.5.0
 ## explicit; go 1.12
 github.com/rs/xid
-# github.com/segmentio/kafka-go v0.4.44
+# github.com/segmentio/kafka-go v0.4.45
 ## explicit; go 1.15
 github.com/segmentio/kafka-go
 github.com/segmentio/kafka-go/compress
@@ -379,6 +379,7 @@ github.com/segmentio/kafka-go/protocol/joingroup
 github.com/segmentio/kafka-go/protocol/leavegroup
 github.com/segmentio/kafka-go/protocol/listgroups
 github.com/segmentio/kafka-go/protocol/listoffsets
+github.com/segmentio/kafka-go/protocol/listpartitionreassignments
 github.com/segmentio/kafka-go/protocol/metadata
 github.com/segmentio/kafka-go/protocol/offsetcommit
 github.com/segmentio/kafka-go/protocol/offsetdelete