diff --git a/go.mod b/go.mod
index 5b030c9c6aab17cf2bd1fa1fb6ffa9dfa3a0ca9d..bf777ab8f543c53215faac6370ccdb4f5ece48a3 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,7 @@ require (
 	github.com/gopacket/gopacket v1.3.1
 	github.com/mariomac/guara v0.0.0-20250408105519-1e4dbdfb7136
 	github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118
-	github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250425123404-cab7e8f74fae
+	github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250502080315-db59695ae6de
 	github.com/netobserv/gopipes v0.3.0
 	github.com/ovn-org/ovn-kubernetes/go-controller v0.0.0-20250227173154-57a2590a1d16
 	github.com/paulbellamy/ratecounter v0.2.0
@@ -71,7 +71,7 @@ require (
 	github.com/google/go-cmp v0.7.0 // indirect
 	github.com/google/gofuzz v1.2.0 // indirect
 	github.com/google/uuid v1.6.0 // indirect
-	github.com/gorilla/websocket v1.5.3 // indirect
+	github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
 	github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 // indirect
 	github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb // indirect
diff --git a/go.sum b/go.sum
index 91b7992dc1ee9d349a3a8e40345b6c101fdb406d..ed877dad7b5d57379e89d8b0eab92d60a833480c 100644
--- a/go.sum
+++ b/go.sum
@@ -157,8 +157,8 @@ github.com/googleapis/gax-go/v2 v2.14.1 h1:hb0FFeiPaQskmvakKu5EbCbpntQn48jyHuvrk
 github.com/googleapis/gax-go/v2 v2.14.1/go.mod h1:Hb/NubMaVM88SrNkvl8X/o8XWwDJEPqouaLeN2IUxoA=
 github.com/gopacket/gopacket v1.3.1 h1:ZppWyLrOJNZPe5XkdjLbtuTkfQoxQ0xyMJzQCqtqaPU=
 github.com/gopacket/gopacket v1.3.1/go.mod h1:3I13qcqSpB2R9fFQg866OOgzylYkZxLTmkvcXhvf6qg=
-github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
-github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
+github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 h1:JeSE6pjso5THxAzdVpqr6/geYxZytqFMBCOtn/ujyeo=
+github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674/go.mod h1:r4w70xmWCQKmi1ONH4KIaBptdivuRPyosB9RmPlGEwA=
 github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
 github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
 github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo=
@@ -256,8 +256,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
 github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
-github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250425123404-cab7e8f74fae h1:5L1LB2ZuarJjtkhiaOHxMCOS1LmmdUfCZ73stmpz2DM=
-github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250425123404-cab7e8f74fae/go.mod h1:8555ajG+SsFidnKPu+QVPRpcm+btC6esFgdhNio+Z1I=
+github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250502080315-db59695ae6de h1:Tsjw77SH/RGvho8QL4A4yTuZyidrDBRS2jFL8iBItxc=
+github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250502080315-db59695ae6de/go.mod h1:w43fHzPa+/Q6zP5elc6XuqgkWvzZzoduGf8DGb/GbO8=
 github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+60=
 github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
 github.com/netobserv/loki-client-go v0.0.0-20250425113517-526b43e51847 h1:hjzhVZSSKIOmAzHbGUV4JhVIPkgKs/UtrWDx6JSVKMw=
diff --git a/vendor/github.com/gorilla/websocket/README.md b/vendor/github.com/gorilla/websocket/README.md
index d33ed7fdd8f89da3821a20c30d416803ca2d4adf..ff8bfab0b20f36da96e6bc88c09f842a203489f2 100644
--- a/vendor/github.com/gorilla/websocket/README.md
+++ b/vendor/github.com/gorilla/websocket/README.md
@@ -10,10 +10,10 @@ Gorilla WebSocket is a [Go](http://golang.org/) implementation of the
 ### Documentation
 
 * [API Reference](https://pkg.go.dev/github.com/gorilla/websocket?tab=doc)
-* [Chat example](https://github.com/gorilla/websocket/tree/master/examples/chat)
-* [Command example](https://github.com/gorilla/websocket/tree/master/examples/command)
-* [Client and server example](https://github.com/gorilla/websocket/tree/master/examples/echo)
-* [File watch example](https://github.com/gorilla/websocket/tree/master/examples/filewatch)
+* [Chat example](https://github.com/gorilla/websocket/tree/main/examples/chat)
+* [Command example](https://github.com/gorilla/websocket/tree/main/examples/command)
+* [Client and server example](https://github.com/gorilla/websocket/tree/main/examples/echo)
+* [File watch example](https://github.com/gorilla/websocket/tree/main/examples/filewatch)
 
 ### Status
 
@@ -29,5 +29,4 @@ package API is stable.
 
 The Gorilla WebSocket package passes the server tests in the [Autobahn Test
 Suite](https://github.com/crossbario/autobahn-testsuite) using the application in the [examples/autobahn
-subdirectory](https://github.com/gorilla/websocket/tree/master/examples/autobahn).
-
+subdirectory](https://github.com/gorilla/websocket/tree/main/examples/autobahn).
diff --git a/vendor/github.com/gorilla/websocket/client.go b/vendor/github.com/gorilla/websocket/client.go
index 04fdafee18ea87bd40e981393f3405157180949f..00917ea3418a4471577368143b965d82cdfcd2a6 100644
--- a/vendor/github.com/gorilla/websocket/client.go
+++ b/vendor/github.com/gorilla/websocket/client.go
@@ -11,7 +11,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"net"
 	"net/http"
 	"net/http/httptrace"
@@ -52,18 +51,34 @@ func NewClient(netConn net.Conn, u *url.URL, requestHeader http.Header, readBufS
 //
 // It is safe to call Dialer's methods concurrently.
 type Dialer struct {
+	// The following custom dial functions can be set to establish
+	// connections to either the backend server or the proxy (if it
+	// exists). The scheme of the dialed entity (either backend or
+	// proxy) determines which custom dial function is selected:
+	// either NetDialTLSContext for HTTPS or NetDialContext/NetDial
+	// for HTTP. Since the "Proxy" function can determine the scheme
+	// dynamically, it can make sense to set multiple custom dial
+	// functions simultaneously.
+	//
 	// NetDial specifies the dial function for creating TCP connections. If
-	// NetDial is nil, net.Dial is used.
+	// NetDial is nil, net.Dialer DialContext is used.
+	// If "Proxy" field is also set, this function dials the proxy--not
+	// the backend server.
 	NetDial func(network, addr string) (net.Conn, error)
 
 	// NetDialContext specifies the dial function for creating TCP connections. If
 	// NetDialContext is nil, NetDial is used.
+	// If "Proxy" field is also set, this function dials the proxy--not
+	// the backend server.
 	NetDialContext func(ctx context.Context, network, addr string) (net.Conn, error)
 
 	// NetDialTLSContext specifies the dial function for creating TLS/TCP connections. If
 	// NetDialTLSContext is nil, NetDialContext is used.
 	// If NetDialTLSContext is set, Dial assumes the TLS handshake is done there and
 	// TLSClientConfig is ignored.
+	// If "Proxy" field is also set, this function dials the proxy (and performs
+	// the TLS handshake with the proxy, ignoring TLSClientConfig). In this TLS proxy
+	// dialing case the TLSClientConfig could still be necessary for TLS to the backend server.
 	NetDialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)
 
 	// Proxy specifies a function to return a proxy for a given
@@ -74,7 +89,7 @@ type Dialer struct {
 
 	// TLSClientConfig specifies the TLS configuration to use with tls.Client.
 	// If nil, the default configuration is used.
-	// If either NetDialTLS or NetDialTLSContext are set, Dial assumes the TLS handshake
+	// If NetDialTLSContext is set, Dial assumes the TLS handshake
 	// is done there and TLSClientConfig is ignored.
 	TLSClientConfig *tls.Config
 
@@ -245,71 +260,16 @@ func (d *Dialer) DialContext(ctx context.Context, urlStr string, requestHeader h
 		defer cancel()
 	}
 
-	// Get network dial function.
-	var netDial func(network, add string) (net.Conn, error)
-
-	switch u.Scheme {
-	case "http":
-		if d.NetDialContext != nil {
-			netDial = func(network, addr string) (net.Conn, error) {
-				return d.NetDialContext(ctx, network, addr)
-			}
-		} else if d.NetDial != nil {
-			netDial = d.NetDial
-		}
-	case "https":
-		if d.NetDialTLSContext != nil {
-			netDial = func(network, addr string) (net.Conn, error) {
-				return d.NetDialTLSContext(ctx, network, addr)
-			}
-		} else if d.NetDialContext != nil {
-			netDial = func(network, addr string) (net.Conn, error) {
-				return d.NetDialContext(ctx, network, addr)
-			}
-		} else if d.NetDial != nil {
-			netDial = d.NetDial
-		}
-	default:
-		return nil, nil, errMalformedURL
-	}
-
-	if netDial == nil {
-		netDialer := &net.Dialer{}
-		netDial = func(network, addr string) (net.Conn, error) {
-			return netDialer.DialContext(ctx, network, addr)
-		}
-	}
-
-	// If needed, wrap the dial function to set the connection deadline.
-	if deadline, ok := ctx.Deadline(); ok {
-		forwardDial := netDial
-		netDial = func(network, addr string) (net.Conn, error) {
-			c, err := forwardDial(network, addr)
-			if err != nil {
-				return nil, err
-			}
-			err = c.SetDeadline(deadline)
-			if err != nil {
-				c.Close()
-				return nil, err
-			}
-			return c, nil
-		}
-	}
-
-	// If needed, wrap the dial function to connect through a proxy.
+	var proxyURL *url.URL
 	if d.Proxy != nil {
-		proxyURL, err := d.Proxy(req)
+		proxyURL, err = d.Proxy(req)
 		if err != nil {
 			return nil, nil, err
 		}
-		if proxyURL != nil {
-			dialer, err := proxy_FromURL(proxyURL, netDialerFunc(netDial))
-			if err != nil {
-				return nil, nil, err
-			}
-			netDial = dialer.Dial
-		}
+	}
+	netDial, err := d.netDialFn(ctx, proxyURL, u)
+	if err != nil {
+		return nil, nil, err
 	}
 
 	hostPort, hostNoPort := hostPortNoPort(u)
@@ -318,7 +278,7 @@ func (d *Dialer) DialContext(ctx context.Context, urlStr string, requestHeader h
 		trace.GetConn(hostPort)
 	}
 
-	netConn, err := netDial("tcp", hostPort)
+	netConn, err := netDial(ctx, "tcp", hostPort)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -328,14 +288,20 @@ func (d *Dialer) DialContext(ctx context.Context, urlStr string, requestHeader h
 		})
 	}
 
+	// Close the network connection when returning an error. The variable
+	// netConn is set to nil before the success return at the end of the
+	// function.
 	defer func() {
 		if netConn != nil {
-			netConn.Close()
+			// It's safe to ignore the error from Close() because this code is
+			// only executed when returning a more important error to the
+			// application.
+			_ = netConn.Close()
 		}
 	}()
 
-	if u.Scheme == "https" && d.NetDialTLSContext == nil {
-		// If NetDialTLSContext is set, assume that the TLS handshake has already been done
+	// Do TLS handshake over established connection if a proxy exists.
+	if proxyURL != nil && u.Scheme == "https" {
 
 		cfg := cloneTLSConfig(d.TLSClientConfig)
 		if cfg.ServerName == "" {
@@ -400,7 +366,7 @@ func (d *Dialer) DialContext(ctx context.Context, urlStr string, requestHeader h
 		// debugging.
 		buf := make([]byte, 1024)
 		n, _ := io.ReadFull(resp.Body, buf)
-		resp.Body = ioutil.NopCloser(bytes.NewReader(buf[:n]))
+		resp.Body = io.NopCloser(bytes.NewReader(buf[:n]))
 		return nil, resp, ErrBadHandshake
 	}
 
@@ -418,17 +384,134 @@ func (d *Dialer) DialContext(ctx context.Context, urlStr string, requestHeader h
 		break
 	}
 
-	resp.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
+	resp.Body = io.NopCloser(bytes.NewReader([]byte{}))
 	conn.subprotocol = resp.Header.Get("Sec-Websocket-Protocol")
 
-	netConn.SetDeadline(time.Time{})
-	netConn = nil // to avoid close in defer.
+	if err := netConn.SetDeadline(time.Time{}); err != nil {
+		return nil, resp, err
+	}
+
+	// Success! Set netConn to nil to stop the deferred function above from
+	// closing the network connection.
+	netConn = nil
+
 	return conn, resp, nil
 }
 
+// Returns the dial function to establish the connection to either the backend
+// server or the proxy (if it exists). If the dialed entity is HTTPS, then the
+// returned dial function *also* performs the TLS handshake to the dialed entity.
+// NOTE: If a proxy exists, it is possible for a second TLS handshake to be
+// necessary over the established connection.
+func (d *Dialer) netDialFn(ctx context.Context, proxyURL *url.URL, backendURL *url.URL) (netDialerFunc, error) {
+	var netDial netDialerFunc
+	if proxyURL != nil {
+		netDial = d.netDialFromURL(proxyURL)
+	} else {
+		netDial = d.netDialFromURL(backendURL)
+	}
+	// If needed, wrap the dial function to set the connection deadline.
+	if deadline, ok := ctx.Deadline(); ok {
+		netDial = netDialWithDeadline(netDial, deadline)
+	}
+	// Proxy dialing is wrapped to implement CONNECT method and possibly proxy auth.
+	if proxyURL != nil {
+		return proxyFromURL(proxyURL, netDial)
+	}
+	return netDial, nil
+}
+
+// Returns function to create the connection depending on the Dialer's
+// custom dialing functions and the passed URL of entity connecting to.
+func (d *Dialer) netDialFromURL(u *url.URL) netDialerFunc {
+	var netDial netDialerFunc
+	switch {
+	case d.NetDialContext != nil:
+		netDial = d.NetDialContext
+	case d.NetDial != nil:
+		netDial = func(ctx context.Context, net, addr string) (net.Conn, error) {
+			return d.NetDial(net, addr)
+		}
+	default:
+		netDial = (&net.Dialer{}).DialContext
+	}
+	// If dialed entity is HTTPS, then either use custom TLS dialing function (if exists)
+	// or wrap the previously computed "netDial" to use TLS config for handshake.
+	if u.Scheme == "https" {
+		if d.NetDialTLSContext != nil {
+			netDial = d.NetDialTLSContext
+		} else {
+			netDial = netDialWithTLSHandshake(netDial, d.TLSClientConfig, u)
+		}
+	}
+	return netDial
+}
+
+// Returns wrapped "netDial" function, performing TLS handshake after connecting.
+func netDialWithTLSHandshake(netDial netDialerFunc, tlsConfig *tls.Config, u *url.URL) netDialerFunc {
+	return func(ctx context.Context, unused, addr string) (net.Conn, error) {
+		hostPort, hostNoPort := hostPortNoPort(u)
+		trace := httptrace.ContextClientTrace(ctx)
+		if trace != nil && trace.GetConn != nil {
+			trace.GetConn(hostPort)
+		}
+		// Creates TCP connection to addr using passed "netDial" function.
+		conn, err := netDial(ctx, "tcp", addr)
+		if err != nil {
+			return nil, err
+		}
+		cfg := cloneTLSConfig(tlsConfig)
+		if cfg.ServerName == "" {
+			cfg.ServerName = hostNoPort
+		}
+		tlsConn := tls.Client(conn, cfg)
+		// Do the TLS handshake using TLSConfig over the wrapped connection.
+		if trace != nil && trace.TLSHandshakeStart != nil {
+			trace.TLSHandshakeStart()
+		}
+		err = doHandshake(ctx, tlsConn, cfg)
+		if trace != nil && trace.TLSHandshakeDone != nil {
+			trace.TLSHandshakeDone(tlsConn.ConnectionState(), err)
+		}
+		if err != nil {
+			tlsConn.Close()
+			return nil, err
+		}
+		return tlsConn, nil
+	}
+}
+
+// Returns wrapped "netDial" function, setting passed deadline.
+func netDialWithDeadline(netDial netDialerFunc, deadline time.Time) netDialerFunc {
+	return func(ctx context.Context, network, addr string) (net.Conn, error) {
+		c, err := netDial(ctx, network, addr)
+		if err != nil {
+			return nil, err
+		}
+		err = c.SetDeadline(deadline)
+		if err != nil {
+			c.Close()
+			return nil, err
+		}
+		return c, nil
+	}
+}
+
 func cloneTLSConfig(cfg *tls.Config) *tls.Config {
 	if cfg == nil {
 		return &tls.Config{}
 	}
 	return cfg.Clone()
 }
+
+func doHandshake(ctx context.Context, tlsConn *tls.Conn, cfg *tls.Config) error {
+	if err := tlsConn.HandshakeContext(ctx); err != nil {
+		return err
+	}
+	if !cfg.InsecureSkipVerify {
+		if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
+			return err
+		}
+	}
+	return nil
+}
diff --git a/vendor/github.com/gorilla/websocket/compression.go b/vendor/github.com/gorilla/websocket/compression.go
index 813ffb1e84336da415256244366e09bcd65b6765..fe1079edbc60ecd7f16674995f0cc8ca2199b95d 100644
--- a/vendor/github.com/gorilla/websocket/compression.go
+++ b/vendor/github.com/gorilla/websocket/compression.go
@@ -33,7 +33,11 @@ func decompressNoContextTakeover(r io.Reader) io.ReadCloser {
 		"\x01\x00\x00\xff\xff"
 
 	fr, _ := flateReaderPool.Get().(io.ReadCloser)
-	fr.(flate.Resetter).Reset(io.MultiReader(r, strings.NewReader(tail)), nil)
+	mr := io.MultiReader(r, strings.NewReader(tail))
+	if err := fr.(flate.Resetter).Reset(mr, nil); err != nil {
+		// Reset never fails, but handle error in case that changes.
+		fr = flate.NewReader(mr)
+	}
 	return &flateReadWrapper{fr}
 }
 
diff --git a/vendor/github.com/gorilla/websocket/conn.go b/vendor/github.com/gorilla/websocket/conn.go
index 5161ef81f6292b36e6819a719c42fef48219f12b..9562ffd4978ccabea0da2f06256e1c281bcefb7f 100644
--- a/vendor/github.com/gorilla/websocket/conn.go
+++ b/vendor/github.com/gorilla/websocket/conn.go
@@ -6,11 +6,10 @@ package websocket
 
 import (
 	"bufio"
+	"crypto/rand"
 	"encoding/binary"
 	"errors"
 	"io"
-	"io/ioutil"
-	"math/rand"
 	"net"
 	"strconv"
 	"strings"
@@ -181,16 +180,16 @@ var (
 	errInvalidControlFrame = errors.New("websocket: invalid control frame")
 )
 
-func newMaskKey() [4]byte {
-	n := rand.Uint32()
-	return [4]byte{byte(n), byte(n >> 8), byte(n >> 16), byte(n >> 24)}
-}
+// maskRand is an io.Reader for generating mask bytes. The reader is initialized
+// to crypto/rand Reader. Tests swap the reader to a math/rand reader for
+// reproducible results.
+var maskRand = rand.Reader
 
-func hideTempErr(err error) error {
-	if e, ok := err.(net.Error); ok && e.Temporary() {
-		err = &netError{msg: e.Error(), timeout: e.Timeout()}
-	}
-	return err
+// newMaskKey returns a new 32 bit value for masking client frames.
+func newMaskKey() [4]byte {
+	var k [4]byte
+	_, _ = io.ReadFull(maskRand, k[:])
+	return k
 }
 
 func isControl(frameType int) bool {
@@ -358,7 +357,6 @@ func (c *Conn) RemoteAddr() net.Addr {
 // Write methods
 
 func (c *Conn) writeFatal(err error) error {
-	err = hideTempErr(err)
 	c.writeErrMu.Lock()
 	if c.writeErr == nil {
 		c.writeErr = err
@@ -372,7 +370,9 @@ func (c *Conn) read(n int) ([]byte, error) {
 	if err == io.EOF {
 		err = errUnexpectedEOF
 	}
-	c.br.Discard(len(p))
+	// Discard is guaranteed to succeed because the number of bytes to discard
+	// is less than or equal to the number of bytes buffered.
+	_, _ = c.br.Discard(len(p))
 	return p, err
 }
 
@@ -387,7 +387,9 @@ func (c *Conn) write(frameType int, deadline time.Time, buf0, buf1 []byte) error
 		return err
 	}
 
-	c.conn.SetWriteDeadline(deadline)
+	if err := c.conn.SetWriteDeadline(deadline); err != nil {
+		return c.writeFatal(err)
+	}
 	if len(buf1) == 0 {
 		_, err = c.conn.Write(buf0)
 	} else {
@@ -397,7 +399,7 @@ func (c *Conn) write(frameType int, deadline time.Time, buf0, buf1 []byte) error
 		return c.writeFatal(err)
 	}
 	if frameType == CloseMessage {
-		c.writeFatal(ErrCloseSent)
+		_ = c.writeFatal(ErrCloseSent)
 	}
 	return nil
 }
@@ -436,21 +438,27 @@ func (c *Conn) WriteControl(messageType int, data []byte, deadline time.Time) er
 		maskBytes(key, 0, buf[6:])
 	}
 
-	d := 1000 * time.Hour
-	if !deadline.IsZero() {
-		d = deadline.Sub(time.Now())
+	if deadline.IsZero() {
+		// No timeout for zero time.
+		<-c.mu
+	} else {
+		d := time.Until(deadline)
 		if d < 0 {
 			return errWriteTimeout
 		}
+		select {
+		case <-c.mu:
+		default:
+			timer := time.NewTimer(d)
+			select {
+			case <-c.mu:
+				timer.Stop()
+			case <-timer.C:
+				return errWriteTimeout
+			}
+		}
 	}
 
-	timer := time.NewTimer(d)
-	select {
-	case <-c.mu:
-		timer.Stop()
-	case <-timer.C:
-		return errWriteTimeout
-	}
 	defer func() { c.mu <- struct{}{} }()
 
 	c.writeErrMu.Lock()
@@ -460,13 +468,14 @@ func (c *Conn) WriteControl(messageType int, data []byte, deadline time.Time) er
 		return err
 	}
 
-	c.conn.SetWriteDeadline(deadline)
-	_, err = c.conn.Write(buf)
-	if err != nil {
+	if err := c.conn.SetWriteDeadline(deadline); err != nil {
+		return c.writeFatal(err)
+	}
+	if _, err = c.conn.Write(buf); err != nil {
 		return c.writeFatal(err)
 	}
 	if messageType == CloseMessage {
-		c.writeFatal(ErrCloseSent)
+		_ = c.writeFatal(ErrCloseSent)
 	}
 	return err
 }
@@ -630,7 +639,7 @@ func (w *messageWriter) flushFrame(final bool, extra []byte) error {
 	}
 
 	if final {
-		w.endMessage(errWriteClosed)
+		_ = w.endMessage(errWriteClosed)
 		return nil
 	}
 
@@ -795,7 +804,7 @@ func (c *Conn) advanceFrame() (int, error) {
 	// 1. Skip remainder of previous frame.
 
 	if c.readRemaining > 0 {
-		if _, err := io.CopyN(ioutil.Discard, c.br, c.readRemaining); err != nil {
+		if _, err := io.CopyN(io.Discard, c.br, c.readRemaining); err != nil {
 			return noFrame, err
 		}
 	}
@@ -817,7 +826,7 @@ func (c *Conn) advanceFrame() (int, error) {
 	rsv2 := p[0]&rsv2Bit != 0
 	rsv3 := p[0]&rsv3Bit != 0
 	mask := p[1]&maskBit != 0
-	c.setReadRemaining(int64(p[1] & 0x7f))
+	_ = c.setReadRemaining(int64(p[1] & 0x7f)) // will not fail because argument is >= 0
 
 	c.readDecompress = false
 	if rsv1 {
@@ -922,7 +931,8 @@ func (c *Conn) advanceFrame() (int, error) {
 		}
 
 		if c.readLimit > 0 && c.readLength > c.readLimit {
-			c.WriteControl(CloseMessage, FormatCloseMessage(CloseMessageTooBig, ""), time.Now().Add(writeWait))
+			// Make a best effort to send a close message describing the problem.
+			_ = c.WriteControl(CloseMessage, FormatCloseMessage(CloseMessageTooBig, ""), time.Now().Add(writeWait))
 			return noFrame, ErrReadLimit
 		}
 
@@ -934,7 +944,7 @@ func (c *Conn) advanceFrame() (int, error) {
 	var payload []byte
 	if c.readRemaining > 0 {
 		payload, err = c.read(int(c.readRemaining))
-		c.setReadRemaining(0)
+		_ = c.setReadRemaining(0) // will not fail because argument is >= 0
 		if err != nil {
 			return noFrame, err
 		}
@@ -981,7 +991,8 @@ func (c *Conn) handleProtocolError(message string) error {
 	if len(data) > maxControlFramePayloadSize {
 		data = data[:maxControlFramePayloadSize]
 	}
-	c.WriteControl(CloseMessage, data, time.Now().Add(writeWait))
+	// Make a best effor to send a close message describing the problem.
+	_ = c.WriteControl(CloseMessage, data, time.Now().Add(writeWait))
 	return errors.New("websocket: " + message)
 }
 
@@ -1008,7 +1019,7 @@ func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
 	for c.readErr == nil {
 		frameType, err := c.advanceFrame()
 		if err != nil {
-			c.readErr = hideTempErr(err)
+			c.readErr = err
 			break
 		}
 
@@ -1048,13 +1059,13 @@ func (r *messageReader) Read(b []byte) (int, error) {
 				b = b[:c.readRemaining]
 			}
 			n, err := c.br.Read(b)
-			c.readErr = hideTempErr(err)
+			c.readErr = err
 			if c.isServer {
 				c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
 			}
 			rem := c.readRemaining
 			rem -= int64(n)
-			c.setReadRemaining(rem)
+			_ = c.setReadRemaining(rem) // rem is guaranteed to be >= 0
 			if c.readRemaining > 0 && c.readErr == io.EOF {
 				c.readErr = errUnexpectedEOF
 			}
@@ -1069,7 +1080,7 @@ func (r *messageReader) Read(b []byte) (int, error) {
 		frameType, err := c.advanceFrame()
 		switch {
 		case err != nil:
-			c.readErr = hideTempErr(err)
+			c.readErr = err
 		case frameType == TextMessage || frameType == BinaryMessage:
 			c.readErr = errors.New("websocket: internal error, unexpected text or binary in Reader")
 		}
@@ -1094,7 +1105,7 @@ func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
 	if err != nil {
 		return messageType, nil, err
 	}
-	p, err = ioutil.ReadAll(r)
+	p, err = io.ReadAll(r)
 	return messageType, p, err
 }
 
@@ -1136,7 +1147,8 @@ func (c *Conn) SetCloseHandler(h func(code int, text string) error) {
 	if h == nil {
 		h = func(code int, text string) error {
 			message := FormatCloseMessage(code, "")
-			c.WriteControl(CloseMessage, message, time.Now().Add(writeWait))
+			// Make a best effor to send the close message.
+			_ = c.WriteControl(CloseMessage, message, time.Now().Add(writeWait))
 			return nil
 		}
 	}
@@ -1158,13 +1170,9 @@ func (c *Conn) PingHandler() func(appData string) error {
 func (c *Conn) SetPingHandler(h func(appData string) error) {
 	if h == nil {
 		h = func(message string) error {
-			err := c.WriteControl(PongMessage, []byte(message), time.Now().Add(writeWait))
-			if err == ErrCloseSent {
-				return nil
-			} else if e, ok := err.(net.Error); ok && e.Temporary() {
-				return nil
-			}
-			return err
+			// Make a best effort to send the pong message.
+			_ = c.WriteControl(PongMessage, []byte(message), time.Now().Add(writeWait))
+			return nil
 		}
 	}
 	c.handlePing = h
diff --git a/vendor/github.com/gorilla/websocket/proxy.go b/vendor/github.com/gorilla/websocket/proxy.go
index e0f466b72fbba37ae29eca5334dbf623542ecfc9..d716a058847a0ae670b74670bbe60c5318ff6907 100644
--- a/vendor/github.com/gorilla/websocket/proxy.go
+++ b/vendor/github.com/gorilla/websocket/proxy.go
@@ -6,34 +6,52 @@ package websocket
 
 import (
 	"bufio"
+	"bytes"
+	"context"
 	"encoding/base64"
 	"errors"
 	"net"
 	"net/http"
 	"net/url"
 	"strings"
+
+	"golang.org/x/net/proxy"
 )
 
-type netDialerFunc func(network, addr string) (net.Conn, error)
+type netDialerFunc func(ctx context.Context, network, addr string) (net.Conn, error)
 
 func (fn netDialerFunc) Dial(network, addr string) (net.Conn, error) {
-	return fn(network, addr)
+	return fn(context.Background(), network, addr)
 }
 
-func init() {
-	proxy_RegisterDialerType("http", func(proxyURL *url.URL, forwardDialer proxy_Dialer) (proxy_Dialer, error) {
-		return &httpProxyDialer{proxyURL: proxyURL, forwardDial: forwardDialer.Dial}, nil
-	})
+func (fn netDialerFunc) DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
+	return fn(ctx, network, addr)
+}
+
+func proxyFromURL(proxyURL *url.URL, forwardDial netDialerFunc) (netDialerFunc, error) {
+	if proxyURL.Scheme == "http" || proxyURL.Scheme == "https" {
+		return (&httpProxyDialer{proxyURL: proxyURL, forwardDial: forwardDial}).DialContext, nil
+	}
+	dialer, err := proxy.FromURL(proxyURL, forwardDial)
+	if err != nil {
+		return nil, err
+	}
+	if d, ok := dialer.(proxy.ContextDialer); ok {
+		return d.DialContext, nil
+	}
+	return func(ctx context.Context, net, addr string) (net.Conn, error) {
+		return dialer.Dial(net, addr)
+	}, nil
 }
 
 type httpProxyDialer struct {
 	proxyURL    *url.URL
-	forwardDial func(network, addr string) (net.Conn, error)
+	forwardDial netDialerFunc
 }
 
-func (hpd *httpProxyDialer) Dial(network string, addr string) (net.Conn, error) {
+func (hpd *httpProxyDialer) DialContext(ctx context.Context, network string, addr string) (net.Conn, error) {
 	hostPort, _ := hostPortNoPort(hpd.proxyURL)
-	conn, err := hpd.forwardDial(network, hostPort)
+	conn, err := hpd.forwardDial(ctx, network, hostPort)
 	if err != nil {
 		return nil, err
 	}
@@ -46,7 +64,6 @@ func (hpd *httpProxyDialer) Dial(network string, addr string) (net.Conn, error)
 			connectHeader.Set("Proxy-Authorization", "Basic "+credential)
 		}
 	}
-
 	connectReq := &http.Request{
 		Method: http.MethodConnect,
 		URL:    &url.URL{Opaque: addr},
@@ -59,7 +76,7 @@ func (hpd *httpProxyDialer) Dial(network string, addr string) (net.Conn, error)
 		return nil, err
 	}
 
-	// Read response. It's OK to use and discard buffered reader here becaue
+	// Read response. It's OK to use and discard buffered reader here because
 	// the remote server does not speak until spoken to.
 	br := bufio.NewReader(conn)
 	resp, err := http.ReadResponse(br, connectReq)
@@ -68,8 +85,18 @@ func (hpd *httpProxyDialer) Dial(network string, addr string) (net.Conn, error)
 		return nil, err
 	}
 
-	if resp.StatusCode != 200 {
-		conn.Close()
+	// Close the response body to silence false positives from linters. Reset
+	// the buffered reader first to ensure that Close() does not read from
+	// conn.
+	// Note: Applications must call resp.Body.Close() on a response returned
+	// http.ReadResponse to inspect trailers or read another response from the
+	// buffered reader. The call to resp.Body.Close() does not release
+	// resources.
+	br.Reset(bytes.NewReader(nil))
+	_ = resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		_ = conn.Close()
 		f := strings.SplitN(resp.Status, " ", 2)
 		return nil, errors.New(f[1])
 	}
diff --git a/vendor/github.com/gorilla/websocket/server.go b/vendor/github.com/gorilla/websocket/server.go
index bb335974321331f4b5e430e286765f717769f5b1..02ea01fdcd7ee7d30dee0d7e90ec0073e102573a 100644
--- a/vendor/github.com/gorilla/websocket/server.go
+++ b/vendor/github.com/gorilla/websocket/server.go
@@ -6,8 +6,7 @@ package websocket
 
 import (
 	"bufio"
-	"errors"
-	"io"
+	"net"
 	"net/http"
 	"net/url"
 	"strings"
@@ -101,8 +100,8 @@ func checkSameOrigin(r *http.Request) bool {
 func (u *Upgrader) selectSubprotocol(r *http.Request, responseHeader http.Header) string {
 	if u.Subprotocols != nil {
 		clientProtocols := Subprotocols(r)
-		for _, serverProtocol := range u.Subprotocols {
-			for _, clientProtocol := range clientProtocols {
+		for _, clientProtocol := range clientProtocols {
+			for _, serverProtocol := range u.Subprotocols {
 				if clientProtocol == serverProtocol {
 					return clientProtocol
 				}
@@ -130,7 +129,8 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
 	}
 
 	if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {
-		return u.returnError(w, r, http.StatusBadRequest, badHandshake+"'websocket' token not found in 'Upgrade' header")
+		w.Header().Set("Upgrade", "websocket")
+		return u.returnError(w, r, http.StatusUpgradeRequired, badHandshake+"'websocket' token not found in 'Upgrade' header")
 	}
 
 	if r.Method != http.MethodGet {
@@ -172,28 +172,37 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
 		}
 	}
 
-	h, ok := w.(http.Hijacker)
-	if !ok {
-		return u.returnError(w, r, http.StatusInternalServerError, "websocket: response does not implement http.Hijacker")
-	}
-	var brw *bufio.ReadWriter
-	netConn, brw, err := h.Hijack()
+	netConn, brw, err := http.NewResponseController(w).Hijack()
 	if err != nil {
-		return u.returnError(w, r, http.StatusInternalServerError, err.Error())
+		return u.returnError(w, r, http.StatusInternalServerError,
+			"websocket: hijack: "+err.Error())
 	}
 
-	if brw.Reader.Buffered() > 0 {
-		netConn.Close()
-		return nil, errors.New("websocket: client sent data before handshake is complete")
-	}
+	// Close the network connection when returning an error. The variable
+	// netConn is set to nil before the success return at the end of the
+	// function.
+	defer func() {
+		if netConn != nil {
+			// It's safe to ignore the error from Close() because this code is
+			// only executed when returning a more important error to the
+			// application.
+			_ = netConn.Close()
+		}
+	}()
 
 	var br *bufio.Reader
-	if u.ReadBufferSize == 0 && bufioReaderSize(netConn, brw.Reader) > 256 {
-		// Reuse hijacked buffered reader as connection reader.
+	if u.ReadBufferSize == 0 && brw.Reader.Size() > 256 {
+		// Use hijacked buffered reader as the connection reader.
 		br = brw.Reader
+	} else if brw.Reader.Buffered() > 0 {
+		// Wrap the network connection to read buffered data in brw.Reader
+		// before reading from the network connection. This should be rare
+		// because a client must not send message data before receiving the
+		// handshake response.
+		netConn = &brNetConn{br: brw.Reader, Conn: netConn}
 	}
 
-	buf := bufioWriterBuffer(netConn, brw.Writer)
+	buf := brw.Writer.AvailableBuffer()
 
 	var writeBuf []byte
 	if u.WriteBufferPool == nil && u.WriteBufferSize == 0 && len(buf) >= maxFrameHeaderSize+256 {
@@ -247,20 +256,30 @@ func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeade
 	}
 	p = append(p, "\r\n"...)
 
-	// Clear deadlines set by HTTP server.
-	netConn.SetDeadline(time.Time{})
-
 	if u.HandshakeTimeout > 0 {
-		netConn.SetWriteDeadline(time.Now().Add(u.HandshakeTimeout))
+		if err := netConn.SetWriteDeadline(time.Now().Add(u.HandshakeTimeout)); err != nil {
+			return nil, err
+		}
+	} else {
+		// Clear deadlines set by HTTP server.
+		if err := netConn.SetDeadline(time.Time{}); err != nil {
+			return nil, err
+		}
 	}
+
 	if _, err = netConn.Write(p); err != nil {
-		netConn.Close()
 		return nil, err
 	}
 	if u.HandshakeTimeout > 0 {
-		netConn.SetWriteDeadline(time.Time{})
+		if err := netConn.SetWriteDeadline(time.Time{}); err != nil {
+			return nil, err
+		}
 	}
 
+	// Success! Set netConn to nil to stop the deferred function above from
+	// closing the network connection.
+	netConn = nil
+
 	return c, nil
 }
 
@@ -327,39 +346,28 @@ func IsWebSocketUpgrade(r *http.Request) bool {
 		tokenListContainsValue(r.Header, "Upgrade", "websocket")
 }
 
-// bufioReaderSize size returns the size of a bufio.Reader.
-func bufioReaderSize(originalReader io.Reader, br *bufio.Reader) int {
-	// This code assumes that peek on a reset reader returns
-	// bufio.Reader.buf[:0].
-	// TODO: Use bufio.Reader.Size() after Go 1.10
-	br.Reset(originalReader)
-	if p, err := br.Peek(0); err == nil {
-		return cap(p)
-	}
-	return 0
+type brNetConn struct {
+	br *bufio.Reader
+	net.Conn
 }
 
-// writeHook is an io.Writer that records the last slice passed to it vio
-// io.Writer.Write.
-type writeHook struct {
-	p []byte
+func (b *brNetConn) Read(p []byte) (n int, err error) {
+	if b.br != nil {
+		// Limit read to buferred data.
+		if n := b.br.Buffered(); len(p) > n {
+			p = p[:n]
+		}
+		n, err = b.br.Read(p)
+		if b.br.Buffered() == 0 {
+			b.br = nil
+		}
+		return n, err
+	}
+	return b.Conn.Read(p)
 }
 
-func (wh *writeHook) Write(p []byte) (int, error) {
-	wh.p = p
-	return len(p), nil
+// NetConn returns the underlying connection that is wrapped by b.
+func (b *brNetConn) NetConn() net.Conn {
+	return b.Conn
 }
 
-// bufioWriterBuffer grabs the buffer from a bufio.Writer.
-func bufioWriterBuffer(originalWriter io.Writer, bw *bufio.Writer) []byte {
-	// This code assumes that bufio.Writer.buf[:1] is passed to the
-	// bufio.Writer's underlying writer.
-	var wh writeHook
-	bw.Reset(&wh)
-	bw.WriteByte(0)
-	bw.Flush()
-
-	bw.Reset(originalWriter)
-
-	return wh.p[:cap(wh.p)]
-}
diff --git a/vendor/github.com/gorilla/websocket/tls_handshake.go b/vendor/github.com/gorilla/websocket/tls_handshake.go
deleted file mode 100644
index a62b68ccb11e34561b04c7919d7f9badd47c0353..0000000000000000000000000000000000000000
--- a/vendor/github.com/gorilla/websocket/tls_handshake.go
+++ /dev/null
@@ -1,21 +0,0 @@
-//go:build go1.17
-// +build go1.17
-
-package websocket
-
-import (
-	"context"
-	"crypto/tls"
-)
-
-func doHandshake(ctx context.Context, tlsConn *tls.Conn, cfg *tls.Config) error {
-	if err := tlsConn.HandshakeContext(ctx); err != nil {
-		return err
-	}
-	if !cfg.InsecureSkipVerify {
-		if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
-			return err
-		}
-	}
-	return nil
-}
diff --git a/vendor/github.com/gorilla/websocket/tls_handshake_116.go b/vendor/github.com/gorilla/websocket/tls_handshake_116.go
deleted file mode 100644
index e1b2b44f6e6c8cb3a188dfa8f9db6fe3253b262f..0000000000000000000000000000000000000000
--- a/vendor/github.com/gorilla/websocket/tls_handshake_116.go
+++ /dev/null
@@ -1,21 +0,0 @@
-//go:build !go1.17
-// +build !go1.17
-
-package websocket
-
-import (
-	"context"
-	"crypto/tls"
-)
-
-func doHandshake(ctx context.Context, tlsConn *tls.Conn, cfg *tls.Config) error {
-	if err := tlsConn.Handshake(); err != nil {
-		return err
-	}
-	if !cfg.InsecureSkipVerify {
-		if err := tlsConn.VerifyHostname(cfg.ServerName); err != nil {
-			return err
-		}
-	}
-	return nil
-}
diff --git a/vendor/github.com/gorilla/websocket/x_net_proxy.go b/vendor/github.com/gorilla/websocket/x_net_proxy.go
deleted file mode 100644
index 2e668f6b8821e4129856122630dc5691e2f1612a..0000000000000000000000000000000000000000
--- a/vendor/github.com/gorilla/websocket/x_net_proxy.go
+++ /dev/null
@@ -1,473 +0,0 @@
-// Code generated by golang.org/x/tools/cmd/bundle. DO NOT EDIT.
-//go:generate bundle -o x_net_proxy.go golang.org/x/net/proxy
-
-// Package proxy provides support for a variety of protocols to proxy network
-// data.
-//
-
-package websocket
-
-import (
-	"errors"
-	"io"
-	"net"
-	"net/url"
-	"os"
-	"strconv"
-	"strings"
-	"sync"
-)
-
-type proxy_direct struct{}
-
-// Direct is a direct proxy: one that makes network connections directly.
-var proxy_Direct = proxy_direct{}
-
-func (proxy_direct) Dial(network, addr string) (net.Conn, error) {
-	return net.Dial(network, addr)
-}
-
-// A PerHost directs connections to a default Dialer unless the host name
-// requested matches one of a number of exceptions.
-type proxy_PerHost struct {
-	def, bypass proxy_Dialer
-
-	bypassNetworks []*net.IPNet
-	bypassIPs      []net.IP
-	bypassZones    []string
-	bypassHosts    []string
-}
-
-// NewPerHost returns a PerHost Dialer that directs connections to either
-// defaultDialer or bypass, depending on whether the connection matches one of
-// the configured rules.
-func proxy_NewPerHost(defaultDialer, bypass proxy_Dialer) *proxy_PerHost {
-	return &proxy_PerHost{
-		def:    defaultDialer,
-		bypass: bypass,
-	}
-}
-
-// Dial connects to the address addr on the given network through either
-// defaultDialer or bypass.
-func (p *proxy_PerHost) Dial(network, addr string) (c net.Conn, err error) {
-	host, _, err := net.SplitHostPort(addr)
-	if err != nil {
-		return nil, err
-	}
-
-	return p.dialerForRequest(host).Dial(network, addr)
-}
-
-func (p *proxy_PerHost) dialerForRequest(host string) proxy_Dialer {
-	if ip := net.ParseIP(host); ip != nil {
-		for _, net := range p.bypassNetworks {
-			if net.Contains(ip) {
-				return p.bypass
-			}
-		}
-		for _, bypassIP := range p.bypassIPs {
-			if bypassIP.Equal(ip) {
-				return p.bypass
-			}
-		}
-		return p.def
-	}
-
-	for _, zone := range p.bypassZones {
-		if strings.HasSuffix(host, zone) {
-			return p.bypass
-		}
-		if host == zone[1:] {
-			// For a zone ".example.com", we match "example.com"
-			// too.
-			return p.bypass
-		}
-	}
-	for _, bypassHost := range p.bypassHosts {
-		if bypassHost == host {
-			return p.bypass
-		}
-	}
-	return p.def
-}
-
-// AddFromString parses a string that contains comma-separated values
-// specifying hosts that should use the bypass proxy. Each value is either an
-// IP address, a CIDR range, a zone (*.example.com) or a host name
-// (localhost). A best effort is made to parse the string and errors are
-// ignored.
-func (p *proxy_PerHost) AddFromString(s string) {
-	hosts := strings.Split(s, ",")
-	for _, host := range hosts {
-		host = strings.TrimSpace(host)
-		if len(host) == 0 {
-			continue
-		}
-		if strings.Contains(host, "/") {
-			// We assume that it's a CIDR address like 127.0.0.0/8
-			if _, net, err := net.ParseCIDR(host); err == nil {
-				p.AddNetwork(net)
-			}
-			continue
-		}
-		if ip := net.ParseIP(host); ip != nil {
-			p.AddIP(ip)
-			continue
-		}
-		if strings.HasPrefix(host, "*.") {
-			p.AddZone(host[1:])
-			continue
-		}
-		p.AddHost(host)
-	}
-}
-
-// AddIP specifies an IP address that will use the bypass proxy. Note that
-// this will only take effect if a literal IP address is dialed. A connection
-// to a named host will never match an IP.
-func (p *proxy_PerHost) AddIP(ip net.IP) {
-	p.bypassIPs = append(p.bypassIPs, ip)
-}
-
-// AddNetwork specifies an IP range that will use the bypass proxy. Note that
-// this will only take effect if a literal IP address is dialed. A connection
-// to a named host will never match.
-func (p *proxy_PerHost) AddNetwork(net *net.IPNet) {
-	p.bypassNetworks = append(p.bypassNetworks, net)
-}
-
-// AddZone specifies a DNS suffix that will use the bypass proxy. A zone of
-// "example.com" matches "example.com" and all of its subdomains.
-func (p *proxy_PerHost) AddZone(zone string) {
-	if strings.HasSuffix(zone, ".") {
-		zone = zone[:len(zone)-1]
-	}
-	if !strings.HasPrefix(zone, ".") {
-		zone = "." + zone
-	}
-	p.bypassZones = append(p.bypassZones, zone)
-}
-
-// AddHost specifies a host name that will use the bypass proxy.
-func (p *proxy_PerHost) AddHost(host string) {
-	if strings.HasSuffix(host, ".") {
-		host = host[:len(host)-1]
-	}
-	p.bypassHosts = append(p.bypassHosts, host)
-}
-
-// A Dialer is a means to establish a connection.
-type proxy_Dialer interface {
-	// Dial connects to the given address via the proxy.
-	Dial(network, addr string) (c net.Conn, err error)
-}
-
-// Auth contains authentication parameters that specific Dialers may require.
-type proxy_Auth struct {
-	User, Password string
-}
-
-// FromEnvironment returns the dialer specified by the proxy related variables in
-// the environment.
-func proxy_FromEnvironment() proxy_Dialer {
-	allProxy := proxy_allProxyEnv.Get()
-	if len(allProxy) == 0 {
-		return proxy_Direct
-	}
-
-	proxyURL, err := url.Parse(allProxy)
-	if err != nil {
-		return proxy_Direct
-	}
-	proxy, err := proxy_FromURL(proxyURL, proxy_Direct)
-	if err != nil {
-		return proxy_Direct
-	}
-
-	noProxy := proxy_noProxyEnv.Get()
-	if len(noProxy) == 0 {
-		return proxy
-	}
-
-	perHost := proxy_NewPerHost(proxy, proxy_Direct)
-	perHost.AddFromString(noProxy)
-	return perHost
-}
-
-// proxySchemes is a map from URL schemes to a function that creates a Dialer
-// from a URL with such a scheme.
-var proxy_proxySchemes map[string]func(*url.URL, proxy_Dialer) (proxy_Dialer, error)
-
-// RegisterDialerType takes a URL scheme and a function to generate Dialers from
-// a URL with that scheme and a forwarding Dialer. Registered schemes are used
-// by FromURL.
-func proxy_RegisterDialerType(scheme string, f func(*url.URL, proxy_Dialer) (proxy_Dialer, error)) {
-	if proxy_proxySchemes == nil {
-		proxy_proxySchemes = make(map[string]func(*url.URL, proxy_Dialer) (proxy_Dialer, error))
-	}
-	proxy_proxySchemes[scheme] = f
-}
-
-// FromURL returns a Dialer given a URL specification and an underlying
-// Dialer for it to make network requests.
-func proxy_FromURL(u *url.URL, forward proxy_Dialer) (proxy_Dialer, error) {
-	var auth *proxy_Auth
-	if u.User != nil {
-		auth = new(proxy_Auth)
-		auth.User = u.User.Username()
-		if p, ok := u.User.Password(); ok {
-			auth.Password = p
-		}
-	}
-
-	switch u.Scheme {
-	case "socks5":
-		return proxy_SOCKS5("tcp", u.Host, auth, forward)
-	}
-
-	// If the scheme doesn't match any of the built-in schemes, see if it
-	// was registered by another package.
-	if proxy_proxySchemes != nil {
-		if f, ok := proxy_proxySchemes[u.Scheme]; ok {
-			return f(u, forward)
-		}
-	}
-
-	return nil, errors.New("proxy: unknown scheme: " + u.Scheme)
-}
-
-var (
-	proxy_allProxyEnv = &proxy_envOnce{
-		names: []string{"ALL_PROXY", "all_proxy"},
-	}
-	proxy_noProxyEnv = &proxy_envOnce{
-		names: []string{"NO_PROXY", "no_proxy"},
-	}
-)
-
-// envOnce looks up an environment variable (optionally by multiple
-// names) once. It mitigates expensive lookups on some platforms
-// (e.g. Windows).
-// (Borrowed from net/http/transport.go)
-type proxy_envOnce struct {
-	names []string
-	once  sync.Once
-	val   string
-}
-
-func (e *proxy_envOnce) Get() string {
-	e.once.Do(e.init)
-	return e.val
-}
-
-func (e *proxy_envOnce) init() {
-	for _, n := range e.names {
-		e.val = os.Getenv(n)
-		if e.val != "" {
-			return
-		}
-	}
-}
-
-// SOCKS5 returns a Dialer that makes SOCKSv5 connections to the given address
-// with an optional username and password. See RFC 1928 and RFC 1929.
-func proxy_SOCKS5(network, addr string, auth *proxy_Auth, forward proxy_Dialer) (proxy_Dialer, error) {
-	s := &proxy_socks5{
-		network: network,
-		addr:    addr,
-		forward: forward,
-	}
-	if auth != nil {
-		s.user = auth.User
-		s.password = auth.Password
-	}
-
-	return s, nil
-}
-
-type proxy_socks5 struct {
-	user, password string
-	network, addr  string
-	forward        proxy_Dialer
-}
-
-const proxy_socks5Version = 5
-
-const (
-	proxy_socks5AuthNone     = 0
-	proxy_socks5AuthPassword = 2
-)
-
-const proxy_socks5Connect = 1
-
-const (
-	proxy_socks5IP4    = 1
-	proxy_socks5Domain = 3
-	proxy_socks5IP6    = 4
-)
-
-var proxy_socks5Errors = []string{
-	"",
-	"general failure",
-	"connection forbidden",
-	"network unreachable",
-	"host unreachable",
-	"connection refused",
-	"TTL expired",
-	"command not supported",
-	"address type not supported",
-}
-
-// Dial connects to the address addr on the given network via the SOCKS5 proxy.
-func (s *proxy_socks5) Dial(network, addr string) (net.Conn, error) {
-	switch network {
-	case "tcp", "tcp6", "tcp4":
-	default:
-		return nil, errors.New("proxy: no support for SOCKS5 proxy connections of type " + network)
-	}
-
-	conn, err := s.forward.Dial(s.network, s.addr)
-	if err != nil {
-		return nil, err
-	}
-	if err := s.connect(conn, addr); err != nil {
-		conn.Close()
-		return nil, err
-	}
-	return conn, nil
-}
-
-// connect takes an existing connection to a socks5 proxy server,
-// and commands the server to extend that connection to target,
-// which must be a canonical address with a host and port.
-func (s *proxy_socks5) connect(conn net.Conn, target string) error {
-	host, portStr, err := net.SplitHostPort(target)
-	if err != nil {
-		return err
-	}
-
-	port, err := strconv.Atoi(portStr)
-	if err != nil {
-		return errors.New("proxy: failed to parse port number: " + portStr)
-	}
-	if port < 1 || port > 0xffff {
-		return errors.New("proxy: port number out of range: " + portStr)
-	}
-
-	// the size here is just an estimate
-	buf := make([]byte, 0, 6+len(host))
-
-	buf = append(buf, proxy_socks5Version)
-	if len(s.user) > 0 && len(s.user) < 256 && len(s.password) < 256 {
-		buf = append(buf, 2 /* num auth methods */, proxy_socks5AuthNone, proxy_socks5AuthPassword)
-	} else {
-		buf = append(buf, 1 /* num auth methods */, proxy_socks5AuthNone)
-	}
-
-	if _, err := conn.Write(buf); err != nil {
-		return errors.New("proxy: failed to write greeting to SOCKS5 proxy at " + s.addr + ": " + err.Error())
-	}
-
-	if _, err := io.ReadFull(conn, buf[:2]); err != nil {
-		return errors.New("proxy: failed to read greeting from SOCKS5 proxy at " + s.addr + ": " + err.Error())
-	}
-	if buf[0] != 5 {
-		return errors.New("proxy: SOCKS5 proxy at " + s.addr + " has unexpected version " + strconv.Itoa(int(buf[0])))
-	}
-	if buf[1] == 0xff {
-		return errors.New("proxy: SOCKS5 proxy at " + s.addr + " requires authentication")
-	}
-
-	// See RFC 1929
-	if buf[1] == proxy_socks5AuthPassword {
-		buf = buf[:0]
-		buf = append(buf, 1 /* password protocol version */)
-		buf = append(buf, uint8(len(s.user)))
-		buf = append(buf, s.user...)
-		buf = append(buf, uint8(len(s.password)))
-		buf = append(buf, s.password...)
-
-		if _, err := conn.Write(buf); err != nil {
-			return errors.New("proxy: failed to write authentication request to SOCKS5 proxy at " + s.addr + ": " + err.Error())
-		}
-
-		if _, err := io.ReadFull(conn, buf[:2]); err != nil {
-			return errors.New("proxy: failed to read authentication reply from SOCKS5 proxy at " + s.addr + ": " + err.Error())
-		}
-
-		if buf[1] != 0 {
-			return errors.New("proxy: SOCKS5 proxy at " + s.addr + " rejected username/password")
-		}
-	}
-
-	buf = buf[:0]
-	buf = append(buf, proxy_socks5Version, proxy_socks5Connect, 0 /* reserved */)
-
-	if ip := net.ParseIP(host); ip != nil {
-		if ip4 := ip.To4(); ip4 != nil {
-			buf = append(buf, proxy_socks5IP4)
-			ip = ip4
-		} else {
-			buf = append(buf, proxy_socks5IP6)
-		}
-		buf = append(buf, ip...)
-	} else {
-		if len(host) > 255 {
-			return errors.New("proxy: destination host name too long: " + host)
-		}
-		buf = append(buf, proxy_socks5Domain)
-		buf = append(buf, byte(len(host)))
-		buf = append(buf, host...)
-	}
-	buf = append(buf, byte(port>>8), byte(port))
-
-	if _, err := conn.Write(buf); err != nil {
-		return errors.New("proxy: failed to write connect request to SOCKS5 proxy at " + s.addr + ": " + err.Error())
-	}
-
-	if _, err := io.ReadFull(conn, buf[:4]); err != nil {
-		return errors.New("proxy: failed to read connect reply from SOCKS5 proxy at " + s.addr + ": " + err.Error())
-	}
-
-	failure := "unknown error"
-	if int(buf[1]) < len(proxy_socks5Errors) {
-		failure = proxy_socks5Errors[buf[1]]
-	}
-
-	if len(failure) > 0 {
-		return errors.New("proxy: SOCKS5 proxy at " + s.addr + " failed to connect: " + failure)
-	}
-
-	bytesToDiscard := 0
-	switch buf[3] {
-	case proxy_socks5IP4:
-		bytesToDiscard = net.IPv4len
-	case proxy_socks5IP6:
-		bytesToDiscard = net.IPv6len
-	case proxy_socks5Domain:
-		_, err := io.ReadFull(conn, buf[:1])
-		if err != nil {
-			return errors.New("proxy: failed to read domain length from SOCKS5 proxy at " + s.addr + ": " + err.Error())
-		}
-		bytesToDiscard = int(buf[0])
-	default:
-		return errors.New("proxy: got unknown address type " + strconv.Itoa(int(buf[3])) + " from SOCKS5 proxy at " + s.addr)
-	}
-
-	if cap(buf) < bytesToDiscard {
-		buf = make([]byte, bytesToDiscard)
-	} else {
-		buf = buf[:bytesToDiscard]
-	}
-	if _, err := io.ReadFull(conn, buf); err != nil {
-		return errors.New("proxy: failed to read address from SOCKS5 proxy at " + s.addr + ": " + err.Error())
-	}
-
-	// Also need to discard the port number
-	if _, err := io.ReadFull(conn, buf[:2]); err != nil {
-		return errors.New("proxy: failed to read port from SOCKS5 proxy at " + s.addr + ": " + err.Error())
-	}
-
-	return nil
-}
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/transform_filter.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/transform_filter.go
index e41d7cd90d0c4794acc5c4fc850a0d74abde403a..29f0b0cfc7f1167ab0afbc01e90db8945bac0449 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/transform_filter.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/transform_filter.go
@@ -37,7 +37,7 @@ const (
 	RemoveEntryIfEqual       TransformFilterEnum = "remove_entry_if_equal"        // removes the entry if the field value equals specified value
 	RemoveEntryIfNotEqual    TransformFilterEnum = "remove_entry_if_not_equal"    // removes the entry if the field value does not equal specified value
 	RemoveEntryAllSatisfied  TransformFilterEnum = "remove_entry_all_satisfied"   // removes the entry if all of the defined rules are satisfied
-	KeepEntryAllSatisfied    TransformFilterEnum = "keep_entry_all_satisfied"     // keeps the entry if the set of rules are all satisfied
+	KeepEntryQuery           TransformFilterEnum = "keep_entry_query"             // keeps the entry if it matches the query
 	AddField                 TransformFilterEnum = "add_field"                    // adds (input) field to the entry; overrides previous value if present (key=input, value=value)
 	AddFieldIfDoesntExist    TransformFilterEnum = "add_field_if_doesnt_exist"    // adds a field to the entry if the field does not exist
 	AddFieldIf               TransformFilterEnum = "add_field_if"                 // add output field set to assignee if input field satisfies criteria from parameters field
@@ -56,23 +56,12 @@ const (
 	RemoveEntryIfNotEqualD    TransformFilterRemoveEntryEnum = "remove_entry_if_not_equal"    // removes the entry if the field value does not equal specified value
 )
 
-type TransformFilterKeepEntryEnum string
-
-const (
-	KeepEntryIfExists        TransformFilterKeepEntryEnum = "keep_entry_if_exists"          // keeps the entry if the field exists
-	KeepEntryIfDoesntExist   TransformFilterKeepEntryEnum = "keep_entry_if_doesnt_exist"    // keeps the entry if the field does not exist
-	KeepEntryIfEqual         TransformFilterKeepEntryEnum = "keep_entry_if_equal"           // keeps the entry if the field value equals specified value
-	KeepEntryIfNotEqual      TransformFilterKeepEntryEnum = "keep_entry_if_not_equal"       // keeps the entry if the field value does not equal specified value
-	KeepEntryIfRegexMatch    TransformFilterKeepEntryEnum = "keep_entry_if_regex_match"     // keeps the entry if the field value matches the specified regex
-	KeepEntryIfNotRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_not_regex_match" // keeps the entry if the field value does not match the specified regex
-)
-
 type TransformFilterRule struct {
 	Type                    TransformFilterEnum              `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
 	RemoveField             *TransformFilterGenericRule      `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
 	RemoveEntry             *TransformFilterGenericRule      `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
 	RemoveEntryAllSatisfied []*RemoveEntryRule               `yaml:"removeEntryAllSatisfied,omitempty" json:"removeEntryAllSatisfied,omitempty" doc:"configuration for remove_entry_all_satisfied rule"`
-	KeepEntryAllSatisfied   []*KeepEntryRule                 `yaml:"keepEntryAllSatisfied,omitempty" json:"keepEntryAllSatisfied,omitempty" doc:"configuration for keep_entry rule"`
+	KeepEntryQuery          string                           `yaml:"keepEntryQuery,omitempty" json:"keepEntryQuery,omitempty" doc:"configuration for keep_entry rule"`
 	KeepEntrySampling       uint16                           `yaml:"keepEntrySampling,omitempty" json:"keepEntrySampling,omitempty" doc:"sampling value for keep_entry type: 1 flow on <sampling> is kept"`
 	AddField                *TransformFilterGenericRule      `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
 	AddFieldIfDoesntExist   *TransformFilterGenericRule      `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
@@ -93,9 +82,6 @@ func (r *TransformFilterRule) preprocess() {
 	for i := range r.RemoveEntryAllSatisfied {
 		r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
 	}
-	for i := range r.KeepEntryAllSatisfied {
-		r.KeepEntryAllSatisfied[i].KeepEntry.preprocess()
-	}
 	for i := range r.ConditionalSampling {
 		r.ConditionalSampling[i].preprocess()
 	}
@@ -127,11 +113,6 @@ type RemoveEntryRule struct {
 	RemoveEntry *TransformFilterGenericRule    `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
 }
 
-type KeepEntryRule struct {
-	Type      TransformFilterKeepEntryEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
-	KeepEntry *TransformFilterGenericRule  `yaml:"keepEntry,omitempty" json:"keepEntry,omitempty" doc:"configuration for keep_entry_* rules"`
-}
-
 type SamplingCondition struct {
 	Value uint16             `yaml:"value,omitempty" json:"value,omitempty" doc:"sampling value: 1 flow on <sampling> is kept"`
 	Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/eval.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/eval.go
new file mode 100644
index 0000000000000000000000000000000000000000..f37604b29b34b0c4941d19e1e9bc50d8ba670cac
--- /dev/null
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/eval.go
@@ -0,0 +1,33 @@
+package dsl
+
+import (
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
+)
+
+type tree struct {
+	logicalOp string
+	children  []*tree
+	predicate filters.Predicate
+}
+
+func (t *tree) apply(flow config.GenericMap) bool {
+	if t.predicate != nil {
+		return t.predicate(flow)
+	}
+	if t.logicalOp == operatorAnd {
+		for _, child := range t.children {
+			if !child.apply(flow) {
+				return false
+			}
+		}
+		return true
+	}
+	// t.logicalOp == operatorOr
+	for _, child := range t.children {
+		if child.apply(flow) {
+			return true
+		}
+	}
+	return false
+}
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/expr.y b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/expr.y
new file mode 100644
index 0000000000000000000000000000000000000000..e01a9e7a68b36b7108041ca8b82ba53065b06214
--- /dev/null
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/expr.y
@@ -0,0 +1,42 @@
+%{
+package dsl
+%}
+
+%union{
+	expr  Expression
+	value string
+	intValue int
+}
+
+%type <expr> root
+%type <expr> expr
+
+%token <value> NF_FIELD STRING AND OR EQ NEQ GT LT GE LE REG NREG OPEN_PARENTHESIS CLOSE_PARENTHESIS WITH WITHOUT
+%token <intValue> NUMBER
+%left AND
+%left OR
+%%
+
+root:
+        expr {
+		$$ = $1
+		yylex.(*Lexer).result = $$
+	}
+
+expr:
+	OPEN_PARENTHESIS expr CLOSE_PARENTHESIS { $$ = ParenthesisExpr{inner: $2} }
+	| expr AND expr { $$ = LogicalExpr{left: $1, operator: operatorAnd, right: $3} }
+	| expr OR expr { $$ = LogicalExpr{left: $1, operator: operatorOr, right: $3} }
+	| WITH OPEN_PARENTHESIS NF_FIELD CLOSE_PARENTHESIS { $$ = WithExpr{key: $3} }
+	| WITHOUT OPEN_PARENTHESIS NF_FIELD CLOSE_PARENTHESIS { $$ = WithoutExpr{key: $3} }
+	| NF_FIELD EQ STRING { $$ = EqExpr{key: $1, value: $3} }
+	| NF_FIELD NEQ STRING { $$ = NEqExpr{key: $1, value: $3} }
+	| NF_FIELD EQ NUMBER { $$ = EqNumExpr{key: $1, value: $3} }
+	| NF_FIELD NEQ NUMBER { $$ = NEqNumExpr{key: $1, value: $3} }
+	| NF_FIELD LT NUMBER { $$ = LessThanExpr{key: $1, value: $3} }
+	| NF_FIELD GT NUMBER { $$ = GreaterThanExpr{key: $1, value: $3} }
+	| NF_FIELD LE NUMBER { $$ = LessOrEqualThanExpr{key: $1, value: $3} }
+	| NF_FIELD GE NUMBER { $$ = GreaterOrEqualThanExpr{key: $1, value: $3} }
+	| NF_FIELD REG STRING { $$ = RegExpr{key: $1, value: $3} }
+	| NF_FIELD NREG STRING { $$ = NRegExpr{key: $1, value: $3} }
+%%
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/expr.y.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/expr.y.go
new file mode 100644
index 0000000000000000000000000000000000000000..0c560d4cb3806e72ce1909abd4253f1f56b89bf9
--- /dev/null
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/expr.y.go
@@ -0,0 +1,569 @@
+// Code generated by goyacc -o pkg/dsl/expr.y.go pkg/dsl/expr.y. DO NOT EDIT.
+
+//line pkg/dsl/expr.y:2
+package dsl
+
+import __yyfmt__ "fmt"
+
+//line pkg/dsl/expr.y:2
+
+//line pkg/dsl/expr.y:5
+type yySymType struct {
+	yys      int
+	expr     Expression
+	value    string
+	intValue int
+}
+
+const NF_FIELD = 57346
+const STRING = 57347
+const AND = 57348
+const OR = 57349
+const EQ = 57350
+const NEQ = 57351
+const GT = 57352
+const LT = 57353
+const GE = 57354
+const LE = 57355
+const REG = 57356
+const NREG = 57357
+const OPEN_PARENTHESIS = 57358
+const CLOSE_PARENTHESIS = 57359
+const WITH = 57360
+const WITHOUT = 57361
+const NUMBER = 57362
+
+var yyToknames = [...]string{
+	"$end",
+	"error",
+	"$unk",
+	"NF_FIELD",
+	"STRING",
+	"AND",
+	"OR",
+	"EQ",
+	"NEQ",
+	"GT",
+	"LT",
+	"GE",
+	"LE",
+	"REG",
+	"NREG",
+	"OPEN_PARENTHESIS",
+	"CLOSE_PARENTHESIS",
+	"WITH",
+	"WITHOUT",
+	"NUMBER",
+}
+
+var yyStatenames = [...]string{}
+
+const yyEofCode = 1
+const yyErrCode = 2
+const yyInitialStackSize = 16
+
+//line pkg/dsl/expr.y:42
+
+//line yacctab:1
+var yyExca = [...]int8{
+	-1, 1,
+	1, -1,
+	-2, 0,
+}
+
+const yyPrivate = 57344
+
+const yyLast = 39
+
+var yyAct = [...]int8{
+	6, 12, 13, 15, 14, 17, 16, 18, 19, 27,
+	25, 32, 3, 34, 4, 5, 11, 31, 30, 29,
+	7, 8, 8, 36, 28, 26, 35, 10, 2, 7,
+	8, 22, 9, 33, 24, 23, 20, 21, 1,
+}
+
+var yyPact = [...]int16{
+	-4, -1000, 23, -4, 11, 0, -7, -4, -4, 14,
+	31, 30, 5, 4, -1, -2, -3, -9, 28, 8,
+	15, -1000, -1000, 9, 6, -1000, -1000, -1000, -1000, -1000,
+	-1000, -1000, -1000, -1000, -1000, -1000, -1000,
+}
+
+var yyPgo = [...]int8{
+	0, 38, 28,
+}
+
+var yyR1 = [...]int8{
+	0, 1, 2, 2, 2, 2, 2, 2, 2, 2,
+	2, 2, 2, 2, 2, 2, 2,
+}
+
+var yyR2 = [...]int8{
+	0, 1, 3, 3, 3, 4, 4, 3, 3, 3,
+	3, 3, 3, 3, 3, 3, 3,
+}
+
+var yyChk = [...]int16{
+	-1000, -1, -2, 16, 18, 19, 4, 6, 7, -2,
+	16, 16, 8, 9, 11, 10, 13, 12, 14, 15,
+	-2, -2, 17, 4, 4, 5, 20, 5, 20, 20,
+	20, 20, 20, 5, 5, 17, 17,
+}
+
+var yyDef = [...]int8{
+	0, -2, 1, 0, 0, 0, 0, 0, 0, 0,
+	0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+	3, 4, 2, 0, 0, 7, 9, 8, 10, 11,
+	12, 13, 14, 15, 16, 5, 6,
+}
+
+var yyTok1 = [...]int8{
+	1,
+}
+
+var yyTok2 = [...]int8{
+	2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
+	12, 13, 14, 15, 16, 17, 18, 19, 20,
+}
+
+var yyTok3 = [...]int8{
+	0,
+}
+
+var yyErrorMessages = [...]struct {
+	state int
+	token int
+	msg   string
+}{}
+
+//line yaccpar:1
+
+/*	parser for yacc output	*/
+
+var (
+	yyDebug        = 0
+	yyErrorVerbose = false
+)
+
+type yyLexer interface {
+	Lex(lval *yySymType) int
+	Error(s string)
+}
+
+type yyParser interface {
+	Parse(yyLexer) int
+	Lookahead() int
+}
+
+type yyParserImpl struct {
+	lval  yySymType
+	stack [yyInitialStackSize]yySymType
+	char  int
+}
+
+func (p *yyParserImpl) Lookahead() int {
+	return p.char
+}
+
+func yyNewParser() yyParser {
+	return &yyParserImpl{}
+}
+
+const yyFlag = -1000
+
+func yyTokname(c int) string {
+	if c >= 1 && c-1 < len(yyToknames) {
+		if yyToknames[c-1] != "" {
+			return yyToknames[c-1]
+		}
+	}
+	return __yyfmt__.Sprintf("tok-%v", c)
+}
+
+func yyStatname(s int) string {
+	if s >= 0 && s < len(yyStatenames) {
+		if yyStatenames[s] != "" {
+			return yyStatenames[s]
+		}
+	}
+	return __yyfmt__.Sprintf("state-%v", s)
+}
+
+func yyErrorMessage(state, lookAhead int) string {
+	const TOKSTART = 4
+
+	if !yyErrorVerbose {
+		return "syntax error"
+	}
+
+	for _, e := range yyErrorMessages {
+		if e.state == state && e.token == lookAhead {
+			return "syntax error: " + e.msg
+		}
+	}
+
+	res := "syntax error: unexpected " + yyTokname(lookAhead)
+
+	// To match Bison, suggest at most four expected tokens.
+	expected := make([]int, 0, 4)
+
+	// Look for shiftable tokens.
+	base := int(yyPact[state])
+	for tok := TOKSTART; tok-1 < len(yyToknames); tok++ {
+		if n := base + tok; n >= 0 && n < yyLast && int(yyChk[int(yyAct[n])]) == tok {
+			if len(expected) == cap(expected) {
+				return res
+			}
+			expected = append(expected, tok)
+		}
+	}
+
+	if yyDef[state] == -2 {
+		i := 0
+		for yyExca[i] != -1 || int(yyExca[i+1]) != state {
+			i += 2
+		}
+
+		// Look for tokens that we accept or reduce.
+		for i += 2; yyExca[i] >= 0; i += 2 {
+			tok := int(yyExca[i])
+			if tok < TOKSTART || yyExca[i+1] == 0 {
+				continue
+			}
+			if len(expected) == cap(expected) {
+				return res
+			}
+			expected = append(expected, tok)
+		}
+
+		// If the default action is to accept or reduce, give up.
+		if yyExca[i+1] != 0 {
+			return res
+		}
+	}
+
+	for i, tok := range expected {
+		if i == 0 {
+			res += ", expecting "
+		} else {
+			res += " or "
+		}
+		res += yyTokname(tok)
+	}
+	return res
+}
+
+func yylex1(lex yyLexer, lval *yySymType) (char, token int) {
+	token = 0
+	char = lex.Lex(lval)
+	if char <= 0 {
+		token = int(yyTok1[0])
+		goto out
+	}
+	if char < len(yyTok1) {
+		token = int(yyTok1[char])
+		goto out
+	}
+	if char >= yyPrivate {
+		if char < yyPrivate+len(yyTok2) {
+			token = int(yyTok2[char-yyPrivate])
+			goto out
+		}
+	}
+	for i := 0; i < len(yyTok3); i += 2 {
+		token = int(yyTok3[i+0])
+		if token == char {
+			token = int(yyTok3[i+1])
+			goto out
+		}
+	}
+
+out:
+	if token == 0 {
+		token = int(yyTok2[1]) /* unknown char */
+	}
+	if yyDebug >= 3 {
+		__yyfmt__.Printf("lex %s(%d)\n", yyTokname(token), uint(char))
+	}
+	return char, token
+}
+
+func yyParse(yylex yyLexer) int {
+	return yyNewParser().Parse(yylex)
+}
+
+func (yyrcvr *yyParserImpl) Parse(yylex yyLexer) int {
+	var yyn int
+	var yyVAL yySymType
+	var yyDollar []yySymType
+	_ = yyDollar // silence set and not used
+	yyS := yyrcvr.stack[:]
+
+	Nerrs := 0   /* number of errors */
+	Errflag := 0 /* error recovery flag */
+	yystate := 0
+	yyrcvr.char = -1
+	yytoken := -1 // yyrcvr.char translated into internal numbering
+	defer func() {
+		// Make sure we report no lookahead when not parsing.
+		yystate = -1
+		yyrcvr.char = -1
+		yytoken = -1
+	}()
+	yyp := -1
+	goto yystack
+
+ret0:
+	return 0
+
+ret1:
+	return 1
+
+yystack:
+	/* put a state and value onto the stack */
+	if yyDebug >= 4 {
+		__yyfmt__.Printf("char %v in %v\n", yyTokname(yytoken), yyStatname(yystate))
+	}
+
+	yyp++
+	if yyp >= len(yyS) {
+		nyys := make([]yySymType, len(yyS)*2)
+		copy(nyys, yyS)
+		yyS = nyys
+	}
+	yyS[yyp] = yyVAL
+	yyS[yyp].yys = yystate
+
+yynewstate:
+	yyn = int(yyPact[yystate])
+	if yyn <= yyFlag {
+		goto yydefault /* simple state */
+	}
+	if yyrcvr.char < 0 {
+		yyrcvr.char, yytoken = yylex1(yylex, &yyrcvr.lval)
+	}
+	yyn += yytoken
+	if yyn < 0 || yyn >= yyLast {
+		goto yydefault
+	}
+	yyn = int(yyAct[yyn])
+	if int(yyChk[yyn]) == yytoken { /* valid shift */
+		yyrcvr.char = -1
+		yytoken = -1
+		yyVAL = yyrcvr.lval
+		yystate = yyn
+		if Errflag > 0 {
+			Errflag--
+		}
+		goto yystack
+	}
+
+yydefault:
+	/* default state action */
+	yyn = int(yyDef[yystate])
+	if yyn == -2 {
+		if yyrcvr.char < 0 {
+			yyrcvr.char, yytoken = yylex1(yylex, &yyrcvr.lval)
+		}
+
+		/* look through exception table */
+		xi := 0
+		for {
+			if yyExca[xi+0] == -1 && int(yyExca[xi+1]) == yystate {
+				break
+			}
+			xi += 2
+		}
+		for xi += 2; ; xi += 2 {
+			yyn = int(yyExca[xi+0])
+			if yyn < 0 || yyn == yytoken {
+				break
+			}
+		}
+		yyn = int(yyExca[xi+1])
+		if yyn < 0 {
+			goto ret0
+		}
+	}
+	if yyn == 0 {
+		/* error ... attempt to resume parsing */
+		switch Errflag {
+		case 0: /* brand new error */
+			yylex.Error(yyErrorMessage(yystate, yytoken))
+			Nerrs++
+			if yyDebug >= 1 {
+				__yyfmt__.Printf("%s", yyStatname(yystate))
+				__yyfmt__.Printf(" saw %s\n", yyTokname(yytoken))
+			}
+			fallthrough
+
+		case 1, 2: /* incompletely recovered error ... try again */
+			Errflag = 3
+
+			/* find a state where "error" is a legal shift action */
+			for yyp >= 0 {
+				yyn = int(yyPact[yyS[yyp].yys]) + yyErrCode
+				if yyn >= 0 && yyn < yyLast {
+					yystate = int(yyAct[yyn]) /* simulate a shift of "error" */
+					if int(yyChk[yystate]) == yyErrCode {
+						goto yystack
+					}
+				}
+
+				/* the current p has no shift on "error", pop stack */
+				if yyDebug >= 2 {
+					__yyfmt__.Printf("error recovery pops state %d\n", yyS[yyp].yys)
+				}
+				yyp--
+			}
+			/* there is no state on the stack with an error shift ... abort */
+			goto ret1
+
+		case 3: /* no shift yet; clobber input char */
+			if yyDebug >= 2 {
+				__yyfmt__.Printf("error recovery discards %s\n", yyTokname(yytoken))
+			}
+			if yytoken == yyEofCode {
+				goto ret1
+			}
+			yyrcvr.char = -1
+			yytoken = -1
+			goto yynewstate /* try again in the same state */
+		}
+	}
+
+	/* reduction by production yyn */
+	if yyDebug >= 2 {
+		__yyfmt__.Printf("reduce %v in:\n\t%v\n", yyn, yyStatname(yystate))
+	}
+
+	yynt := yyn
+	yypt := yyp
+	_ = yypt // guard against "declared and not used"
+
+	yyp -= int(yyR2[yyn])
+	// yyp is now the index of $0. Perform the default action. Iff the
+	// reduced production is ε, $1 is possibly out of range.
+	if yyp+1 >= len(yyS) {
+		nyys := make([]yySymType, len(yyS)*2)
+		copy(nyys, yyS)
+		yyS = nyys
+	}
+	yyVAL = yyS[yyp+1]
+
+	/* consult goto table to find next state */
+	yyn = int(yyR1[yyn])
+	yyg := int(yyPgo[yyn])
+	yyj := yyg + yyS[yyp].yys + 1
+
+	if yyj >= yyLast {
+		yystate = int(yyAct[yyg])
+	} else {
+		yystate = int(yyAct[yyj])
+		if int(yyChk[yystate]) != -yyn {
+			yystate = int(yyAct[yyg])
+		}
+	}
+	// dummy call; replaced with literal code
+	switch yynt {
+
+	case 1:
+		yyDollar = yyS[yypt-1 : yypt+1]
+//line pkg/dsl/expr.y:21
+		{
+			yyVAL.expr = yyDollar[1].expr
+			yylex.(*Lexer).result = yyVAL.expr
+		}
+	case 2:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:27
+		{
+			yyVAL.expr = ParenthesisExpr{inner: yyDollar[2].expr}
+		}
+	case 3:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:28
+		{
+			yyVAL.expr = LogicalExpr{left: yyDollar[1].expr, operator: operatorAnd, right: yyDollar[3].expr}
+		}
+	case 4:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:29
+		{
+			yyVAL.expr = LogicalExpr{left: yyDollar[1].expr, operator: operatorOr, right: yyDollar[3].expr}
+		}
+	case 5:
+		yyDollar = yyS[yypt-4 : yypt+1]
+//line pkg/dsl/expr.y:30
+		{
+			yyVAL.expr = WithExpr{key: yyDollar[3].value}
+		}
+	case 6:
+		yyDollar = yyS[yypt-4 : yypt+1]
+//line pkg/dsl/expr.y:31
+		{
+			yyVAL.expr = WithoutExpr{key: yyDollar[3].value}
+		}
+	case 7:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:32
+		{
+			yyVAL.expr = EqExpr{key: yyDollar[1].value, value: yyDollar[3].value}
+		}
+	case 8:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:33
+		{
+			yyVAL.expr = NEqExpr{key: yyDollar[1].value, value: yyDollar[3].value}
+		}
+	case 9:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:34
+		{
+			yyVAL.expr = EqNumExpr{key: yyDollar[1].value, value: yyDollar[3].intValue}
+		}
+	case 10:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:35
+		{
+			yyVAL.expr = NEqNumExpr{key: yyDollar[1].value, value: yyDollar[3].intValue}
+		}
+	case 11:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:36
+		{
+			yyVAL.expr = LessThanExpr{key: yyDollar[1].value, value: yyDollar[3].intValue}
+		}
+	case 12:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:37
+		{
+			yyVAL.expr = GreaterThanExpr{key: yyDollar[1].value, value: yyDollar[3].intValue}
+		}
+	case 13:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:38
+		{
+			yyVAL.expr = LessOrEqualThanExpr{key: yyDollar[1].value, value: yyDollar[3].intValue}
+		}
+	case 14:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:39
+		{
+			yyVAL.expr = GreaterOrEqualThanExpr{key: yyDollar[1].value, value: yyDollar[3].intValue}
+		}
+	case 15:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:40
+		{
+			yyVAL.expr = RegExpr{key: yyDollar[1].value, value: yyDollar[3].value}
+		}
+	case 16:
+		yyDollar = yyS[yypt-3 : yypt+1]
+//line pkg/dsl/expr.y:41
+		{
+			yyVAL.expr = NRegExpr{key: yyDollar[1].value, value: yyDollar[3].value}
+		}
+	}
+	goto yystack /* stack new state and value */
+}
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/lexer.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/lexer.go
new file mode 100644
index 0000000000000000000000000000000000000000..50289a64f4b650f10e6984922994cc05f0599946
--- /dev/null
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/dsl/lexer.go
@@ -0,0 +1,236 @@
+package dsl
+
+import (
+	"errors"
+	"fmt"
+	"regexp"
+	"strconv"
+	"strings"
+	"text/scanner"
+
+	"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
+)
+
+const (
+	operatorOr  = "or"
+	operatorAnd = "and"
+)
+
+var syntaxTokens = map[string]int{
+	"=":         EQ,
+	"!=":        NEQ,
+	"=~":        REG,
+	"!~":        NREG,
+	">":         GT,
+	"<":         LT,
+	">=":        GE,
+	"<=":        LE,
+	operatorOr:  OR,
+	operatorAnd: AND,
+	"with":      WITH,
+	"without":   WITHOUT,
+	"(":         OPEN_PARENTHESIS,
+	")":         CLOSE_PARENTHESIS,
+}
+
+type Expression interface {
+	toTree() (*tree, error)
+}
+
+type ParenthesisExpr struct {
+	inner Expression
+}
+
+func (e ParenthesisExpr) toTree() (*tree, error) {
+	return e.inner.toTree()
+}
+
+type kvPair struct {
+	key   string
+	value string
+}
+
+type kvPairInt struct {
+	key   string
+	value int
+}
+
+type EqExpr kvPair
+
+func (e EqExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.Equal(e.key, e.value, false)}, nil
+}
+
+type NEqExpr kvPair
+
+func (e NEqExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.NotEqual(e.key, e.value, false)}, nil
+}
+
+type EqNumExpr kvPairInt
+
+func (e EqNumExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.NumEquals(e.key, e.value)}, nil
+}
+
+type NEqNumExpr kvPairInt
+
+func (e NEqNumExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.NumNotEquals(e.key, e.value)}, nil
+}
+
+type LessThanExpr kvPairInt
+
+func (e LessThanExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.LessThan(e.key, e.value)}, nil
+}
+
+type GreaterThanExpr kvPairInt
+
+func (e GreaterThanExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.GreaterThan(e.key, e.value)}, nil
+}
+
+type LessOrEqualThanExpr kvPairInt
+
+func (e LessOrEqualThanExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.LessOrEqualThan(e.key, e.value)}, nil
+}
+
+type GreaterOrEqualThanExpr kvPairInt
+
+func (e GreaterOrEqualThanExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.GreaterOrEqualThan(e.key, e.value)}, nil
+}
+
+type RegExpr kvPair
+
+func (e RegExpr) toTree() (*tree, error) {
+	r, err := regexp.Compile(e.value)
+	if err != nil {
+		return nil, fmt.Errorf("invalid regex filter: cannot compile regex [%w]", err)
+	}
+	return &tree{predicate: filters.Regex(e.key, r)}, nil
+}
+
+type NRegExpr kvPair
+
+func (e NRegExpr) toTree() (*tree, error) {
+	r, err := regexp.Compile(e.value)
+	if err != nil {
+		return nil, fmt.Errorf("invalid regex filter: cannot compile regex [%w]", err)
+	}
+	return &tree{predicate: filters.NotRegex(e.key, r)}, nil
+}
+
+type WithExpr struct {
+	key string
+}
+
+func (e WithExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.Presence(e.key)}, nil
+}
+
+type WithoutExpr struct {
+	key string
+}
+
+func (e WithoutExpr) toTree() (*tree, error) {
+	return &tree{predicate: filters.Absence(e.key)}, nil
+}
+
+type LogicalExpr struct {
+	left     Expression
+	operator string
+	right    Expression
+}
+
+func (le LogicalExpr) toTree() (*tree, error) {
+	left, err := le.left.toTree()
+	if err != nil {
+		return nil, err
+	}
+	right, err := le.right.toTree()
+	if err != nil {
+		return nil, err
+	}
+	return &tree{
+		logicalOp: le.operator,
+		children:  []*tree{left, right},
+	}, nil
+}
+
+type Lexer struct {
+	scanner.Scanner
+	errs   []error
+	result Expression
+}
+
+func (l *Lexer) Lex(lval *yySymType) int {
+	token := l.Scan()
+	if token == scanner.EOF {
+		return 0
+	}
+	tokenText := l.TokenText()
+	lval.value = tokenText
+
+	switch token {
+	case scanner.Int:
+		// Reading arbitrary number
+		res, err := strconv.ParseInt(tokenText, 10, 64)
+		if err != nil {
+			l.Error(err.Error())
+			return 0
+		}
+		lval.intValue = int(res)
+		return NUMBER
+	case scanner.Float:
+		l.Error("Float values are currently unsupported")
+		return 0
+	case scanner.String, scanner.RawString:
+		// Reading arbitrary double-quotes delimited string
+		var err error
+		lval.value, err = strconv.Unquote(tokenText)
+		if err != nil {
+			l.Error(err.Error())
+			return 0
+		}
+		return STRING
+	}
+
+	// Check if this is a syntaxToken
+
+	// Some characters are read as a token, such as "=", regardless of what follows
+	// To read "=~" as a token, we need to Peek next rune manually
+	tokenNext := tokenText + string(l.Peek())
+	if tok, ok := syntaxTokens[tokenNext]; ok {
+		l.Next()
+		return tok
+	}
+
+	if tok, ok := syntaxTokens[strings.ToLower(tokenText)]; ok {
+		return tok
+	}
+
+	// When none of the above returned, this must be a NetFlow field name
+	return NF_FIELD
+}
+
+func (l *Lexer) Error(msg string) {
+	l.errs = append(l.errs, fmt.Errorf("%s: %d:%d", msg, l.Line, l.Column))
+}
+
+func Parse(s string) (filters.Predicate, error) {
+	l := new(Lexer)
+	l.Init(strings.NewReader(s))
+	yyErrorVerbose = true
+	yyParse(l)
+	if len(l.errs) > 0 {
+		return nil, errors.Join(l.errs...)
+	}
+	t, err := l.result.toTree()
+	if err != nil {
+		return nil, err
+	}
+	return t.apply, nil
+}
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/transform_filter.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/transform_filter.go
index 0d8dd718953dc3469a2f222a4ce8a174199dfaf4..a2ab7285ce48968ae2dbe8152a52f89b0b302e2d 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/transform_filter.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/transform/transform_filter.go
@@ -27,6 +27,7 @@ import (
 	"github.com/Knetic/govaluate"
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/dsl"
 	"github.com/netobserv/flowlogs-pipeline/pkg/utils"
 	"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
 	"github.com/sirupsen/logrus"
@@ -39,12 +40,12 @@ var (
 
 type Filter struct {
 	Rules     []api.TransformFilterRule
-	KeepRules []predicatesRule
+	KeepRules []predicateRule
 }
 
-type predicatesRule struct {
-	predicates []filters.Predicate
-	sampling   uint16
+type predicateRule struct {
+	predicate filters.Predicate
+	sampling  uint16
 }
 
 // Transform transforms a flow; if false is returned as a second argument, the entry is dropped
@@ -55,7 +56,7 @@ func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
 	if len(f.KeepRules) > 0 {
 		keep := false
 		for _, r := range f.KeepRules {
-			if applyPredicates(outputEntry, r) {
+			if applyPredicate(outputEntry, r) {
 				keep = true
 				break
 			}
@@ -162,9 +163,9 @@ func applyRule(entry config.GenericMap, labels map[string]string, rule *api.Tran
 		return !isRemoveEntrySatisfied(entry, rule.RemoveEntryAllSatisfied)
 	case api.ConditionalSampling:
 		return sample(entry, rule.ConditionalSampling)
-	case api.KeepEntryAllSatisfied:
+	case api.KeepEntryQuery:
 		// This should be processed only in "applyPredicates". Failure to do so is a bug.
-		tlog.Panicf("unexpected KeepEntryAllSatisfied: %v", rule)
+		tlog.Panicf("unexpected KeepEntryQuery: %v", rule)
 	default:
 		tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
 	}
@@ -181,16 +182,11 @@ func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRul
 	return true
 }
 
-func applyPredicates(entry config.GenericMap, rule predicatesRule) bool {
+func applyPredicate(entry config.GenericMap, rule predicateRule) bool {
 	if !rollSampling(rule.sampling) {
 		return false
 	}
-	for _, p := range rule.predicates {
-		if !p(entry) {
-			return false
-		}
-	}
-	return true
+	return rule.predicate(entry)
 }
 
 func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
@@ -209,22 +205,21 @@ func rollSampling(value uint16) bool {
 // NewTransformFilter create a new filter transform
 func NewTransformFilter(params config.StageParam) (Transformer, error) {
 	tlog.Debugf("entering NewTransformFilter")
-	keepRules := []predicatesRule{}
+	keepRules := []predicateRule{}
 	rules := []api.TransformFilterRule{}
 	if params.Transform != nil && params.Transform.Filter != nil {
 		params.Transform.Filter.Preprocess()
 		for i := range params.Transform.Filter.Rules {
 			baseRules := &params.Transform.Filter.Rules[i]
-			if baseRules.Type == api.KeepEntryAllSatisfied {
-				pr := predicatesRule{sampling: baseRules.KeepEntrySampling}
-				for _, keepRule := range baseRules.KeepEntryAllSatisfied {
-					pred, err := filters.FromKeepEntry(keepRule)
-					if err != nil {
-						return nil, err
-					}
-					pr.predicates = append(pr.predicates, pred)
+			if baseRules.Type == api.KeepEntryQuery {
+				predicate, err := dsl.Parse(baseRules.KeepEntryQuery)
+				if err != nil {
+					return nil, err
 				}
-				keepRules = append(keepRules, pr)
+				keepRules = append(keepRules, predicateRule{
+					sampling:  baseRules.KeepEntrySampling,
+					predicate: predicate,
+				})
 			} else {
 				rules = append(rules, *baseRules)
 			}
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/utils/filters/filters.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/utils/filters/filters.go
index 8bb12fcb6cff40d950857635d3cfefe64eae45fd..21b62dac34245ab25cc1ae73d30a30f20d020dad 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/utils/filters/filters.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/utils/filters/filters.go
@@ -1,11 +1,9 @@
 package filters
 
 import (
-	"fmt"
 	"regexp"
 	"strings"
 
-	"github.com/netobserv/flowlogs-pipeline/pkg/api"
 	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/netobserv/flowlogs-pipeline/pkg/utils"
 )
@@ -69,6 +67,41 @@ func NotEqual(key string, filterValue any, convertString bool) Predicate {
 	return func(flow config.GenericMap) bool { return !pred(flow) }
 }
 
+func NumEquals(key string, filterValue int) Predicate {
+	return castIntAndCheck(key, func(i int) bool { return i == filterValue })
+}
+
+func NumNotEquals(key string, filterValue int) Predicate {
+	return castIntAndCheck(key, func(i int) bool { return i != filterValue })
+}
+
+func LessThan(key string, filterValue int) Predicate {
+	return castIntAndCheck(key, func(i int) bool { return i < filterValue })
+}
+
+func GreaterThan(key string, filterValue int) Predicate {
+	return castIntAndCheck(key, func(i int) bool { return i > filterValue })
+}
+
+func LessOrEqualThan(key string, filterValue int) Predicate {
+	return castIntAndCheck(key, func(i int) bool { return i <= filterValue })
+}
+
+func GreaterOrEqualThan(key string, filterValue int) Predicate {
+	return castIntAndCheck(key, func(i int) bool { return i >= filterValue })
+}
+
+func castIntAndCheck(key string, check func(int) bool) Predicate {
+	return func(flow config.GenericMap) bool {
+		if val, found := flow[key]; found {
+			if cast, err := utils.ConvertToInt(val); err == nil {
+				return check(cast)
+			}
+		}
+		return false
+	}
+}
+
 func Regex(key string, filterRegex *regexp.Regexp) Predicate {
 	return func(flow config.GenericMap) bool {
 		if val, found := flow[key]; found {
@@ -113,41 +146,3 @@ func injectVars(flow config.GenericMap, filterValue string, varLookups [][]strin
 	}
 	return injected
 }
-
-func FromKeepEntry(from *api.KeepEntryRule) (Predicate, error) {
-	switch from.Type {
-	case api.KeepEntryIfExists:
-		return Presence(from.KeepEntry.Input), nil
-	case api.KeepEntryIfDoesntExist:
-		return Absence(from.KeepEntry.Input), nil
-	case api.KeepEntryIfEqual:
-		return Equal(from.KeepEntry.Input, from.KeepEntry.Value, true), nil
-	case api.KeepEntryIfNotEqual:
-		return NotEqual(from.KeepEntry.Input, from.KeepEntry.Value, true), nil
-	case api.KeepEntryIfRegexMatch:
-		if r, err := compileRegex(from.KeepEntry); err != nil {
-			return nil, err
-		} else {
-			return Regex(from.KeepEntry.Input, r), nil
-		}
-	case api.KeepEntryIfNotRegexMatch:
-		if r, err := compileRegex(from.KeepEntry); err != nil {
-			return nil, err
-		} else {
-			return NotRegex(from.KeepEntry.Input, r), nil
-		}
-	}
-	return nil, fmt.Errorf("keep entry rule type not recognized: %s", from.Type)
-}
-
-func compileRegex(from *api.TransformFilterGenericRule) (*regexp.Regexp, error) {
-	s, ok := from.Value.(string)
-	if !ok {
-		return nil, fmt.Errorf("invalid regex keep rule: rule value must be a string [%v]", from)
-	}
-	r, err := regexp.Compile(s)
-	if err != nil {
-		return nil, fmt.Errorf("invalid regex keep rule: cannot compile regex [%w]", err)
-	}
-	return r, nil
-}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 8c80844f74867119042005ed7dc6e82fbcc67b72..0ad3ec9d7ca538204c3cccd8efe7e0d76a6334c7 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -194,8 +194,8 @@ github.com/google/uuid
 ## explicit; go 1.22.0
 github.com/gopacket/gopacket
 github.com/gopacket/gopacket/layers
-# github.com/gorilla/websocket v1.5.3
-## explicit; go 1.12
+# github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674
+## explicit; go 1.20
 github.com/gorilla/websocket
 # github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
 ## explicit; go 1.21
@@ -298,10 +298,11 @@ github.com/mwitkow/go-conntrack
 # github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f
 ## explicit
 github.com/mxk/go-flowrate/flowrate
-# github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250425123404-cab7e8f74fae
+# github.com/netobserv/flowlogs-pipeline v1.9.0-crc0.0.20250502080315-db59695ae6de
 ## explicit; go 1.23.0
 github.com/netobserv/flowlogs-pipeline/pkg/api
 github.com/netobserv/flowlogs-pipeline/pkg/config
+github.com/netobserv/flowlogs-pipeline/pkg/dsl
 github.com/netobserv/flowlogs-pipeline/pkg/kafka
 github.com/netobserv/flowlogs-pipeline/pkg/operational
 github.com/netobserv/flowlogs-pipeline/pkg/pipeline