diff --git a/go.mod b/go.mod index c9656aee4cb4dfca747abae3411d2eb4818cf7a5..6052ad4ba7f343011c12a100caf3c28bd5a471bf 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/vishvananda/netlink v1.3.0 github.com/vishvananda/netns v0.0.5 github.com/vladimirvivien/gexe v0.4.1 - github.com/vmware/go-ipfix v0.12.0 + github.com/vmware/go-ipfix v0.13.0 golang.org/x/sys v0.30.0 google.golang.org/grpc v1.70.0 google.golang.org/protobuf v1.36.5 @@ -94,7 +94,7 @@ require ( github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 // indirect github.com/netsampler/goflow2 v1.3.7 // indirect github.com/ovn-org/libovsdb v0.7.1-0.20240820095311-ce1951614a20 // indirect - github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/pion/dtls/v2 v2.2.12 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/transport/v2 v2.2.10 // indirect @@ -108,7 +108,7 @@ require ( github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/safchain/ethtool v0.3.1-0.20231027162144-83e5e0097c91 // indirect github.com/spf13/cobra v1.8.1 // indirect - github.com/spf13/pflag v1.0.5 // indirect + github.com/spf13/pflag v1.0.6 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/urfave/cli/v2 v2.27.2 // indirect github.com/x448/float16 v0.8.4 // indirect diff --git a/go.sum b/go.sum index 0c57a1f359901e30d33b9273baae0bdadee3a2ae..a5a1852db4cd5ff251e6eee56f757725104b0396 100644 --- a/go.sum +++ b/go.sum @@ -768,8 +768,8 @@ github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= -github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= -github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pion/dtls/v2 v2.2.12 h1:KP7H5/c1EiVAAKUmXyCzPiQe5+bCJrpOeKg/L05dunk= github.com/pion/dtls/v2 v2.2.12/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -893,8 +893,9 @@ github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3k github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= -github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= @@ -939,8 +940,8 @@ github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zd github.com/vishvananda/netns v0.0.5/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= github.com/vladimirvivien/gexe v0.4.1 h1:W9gWkp8vSPjDoXDu04Yp4KljpVMaSt8IQuHswLDd5LY= github.com/vladimirvivien/gexe v0.4.1/go.mod h1:3gjgTqE2c0VyHnU5UOIwk7gyNzZDGulPb/DJPgcw64E= -github.com/vmware/go-ipfix v0.12.0 h1:a4YXeCWTa251aZO7u7e9dKDOoU2eHJID45SPlq9j+HI= -github.com/vmware/go-ipfix v0.12.0/go.mod h1:9PiutVWLhQQ6WHncRrGkH0i2Rx82DEOKhu80VSd9jds= +github.com/vmware/go-ipfix v0.13.0 h1:v3paBzd7oq7LEU1SzDwD5RGoYcGROLQycYyN3EzLvDk= +github.com/vmware/go-ipfix v0.13.0/go.mod h1:UTIR38AuEePzrWYjQOvnORCYRG33xZJ56E0K75mSosM= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= diff --git a/vendor/github.com/pierrec/lz4/v4/internal/lz4stream/block.go b/vendor/github.com/pierrec/lz4/v4/internal/lz4stream/block.go index e96465460c5d488b8e1a6bdfe66c97f952b08f69..04aaca8480d3f5371ff3e20c2078ae09b0beeafa 100644 --- a/vendor/github.com/pierrec/lz4/v4/internal/lz4stream/block.go +++ b/vendor/github.com/pierrec/lz4/v4/internal/lz4stream/block.go @@ -246,7 +246,7 @@ func (b *FrameDataBlock) Compress(f *Frame, src []byte, level lz4block.Compressi b.src = src // keep track of the source for content checksum if f.Descriptor.Flags.BlockChecksum() { - b.Checksum = xxh32.ChecksumZero(src) + b.Checksum = xxh32.ChecksumZero(b.Data) } return b } @@ -328,7 +328,7 @@ func (b *FrameDataBlock) Uncompress(f *Frame, dst, dict []byte, sum bool) ([]byt dst = dst[:n] } if f.Descriptor.Flags.BlockChecksum() { - if c := xxh32.ChecksumZero(dst); c != b.Checksum { + if c := xxh32.ChecksumZero(b.data); c != b.Checksum { err := fmt.Errorf("%w: got %x; expected %x", lz4errors.ErrInvalidBlockChecksum, c, b.Checksum) return nil, err } diff --git a/vendor/github.com/spf13/pflag/.editorconfig b/vendor/github.com/spf13/pflag/.editorconfig new file mode 100644 index 0000000000000000000000000000000000000000..4492e9f9fe15be31c56bb65e3710820a114804a4 --- /dev/null +++ b/vendor/github.com/spf13/pflag/.editorconfig @@ -0,0 +1,12 @@ +root = true + +[*] +charset = utf-8 +end_of_line = lf +indent_size = 4 +indent_style = space +insert_final_newline = true +trim_trailing_whitespace = true + +[*.go] +indent_style = tab diff --git a/vendor/github.com/spf13/pflag/.golangci.yaml b/vendor/github.com/spf13/pflag/.golangci.yaml new file mode 100644 index 0000000000000000000000000000000000000000..b274f248451b3c9d6949d266e9d01ac59955e1ff --- /dev/null +++ b/vendor/github.com/spf13/pflag/.golangci.yaml @@ -0,0 +1,4 @@ +linters: + disable-all: true + enable: + - nolintlint diff --git a/vendor/github.com/spf13/pflag/flag.go b/vendor/github.com/spf13/pflag/flag.go index 24a5036e95b61b7dd9a66674ee897cf4768a5c06..7c058de3744a1a4c2028908bc2116cf13e939f24 100644 --- a/vendor/github.com/spf13/pflag/flag.go +++ b/vendor/github.com/spf13/pflag/flag.go @@ -160,7 +160,7 @@ type FlagSet struct { args []string // arguments after flags argsLenAtDash int // len(args) when a '--' was located when parsing, or -1 if no -- errorHandling ErrorHandling - output io.Writer // nil means stderr; use out() accessor + output io.Writer // nil means stderr; use Output() accessor interspersed bool // allow interspersed option/non-option args normalizeNameFunc func(f *FlagSet, name string) NormalizedName @@ -255,13 +255,20 @@ func (f *FlagSet) normalizeFlagName(name string) NormalizedName { return n(f, name) } -func (f *FlagSet) out() io.Writer { +// Output returns the destination for usage and error messages. os.Stderr is returned if +// output was not set or was set to nil. +func (f *FlagSet) Output() io.Writer { if f.output == nil { return os.Stderr } return f.output } +// Name returns the name of the flag set. +func (f *FlagSet) Name() string { + return f.name +} + // SetOutput sets the destination for usage and error messages. // If output is nil, os.Stderr is used. func (f *FlagSet) SetOutput(output io.Writer) { @@ -358,7 +365,7 @@ func (f *FlagSet) ShorthandLookup(name string) *Flag { } if len(name) > 1 { msg := fmt.Sprintf("can not look up shorthand which is more than one ASCII character: %q", name) - fmt.Fprintf(f.out(), msg) + fmt.Fprintf(f.Output(), msg) panic(msg) } c := name[0] @@ -482,7 +489,7 @@ func (f *FlagSet) Set(name, value string) error { } if flag.Deprecated != "" { - fmt.Fprintf(f.out(), "Flag --%s has been deprecated, %s\n", flag.Name, flag.Deprecated) + fmt.Fprintf(f.Output(), "Flag --%s has been deprecated, %s\n", flag.Name, flag.Deprecated) } return nil } @@ -523,7 +530,7 @@ func Set(name, value string) error { // otherwise, the default values of all defined flags in the set. func (f *FlagSet) PrintDefaults() { usages := f.FlagUsages() - fmt.Fprint(f.out(), usages) + fmt.Fprint(f.Output(), usages) } // defaultIsZeroValue returns true if the default value for this flag represents @@ -758,7 +765,7 @@ func PrintDefaults() { // defaultUsage is the default function to print a usage message. func defaultUsage(f *FlagSet) { - fmt.Fprintf(f.out(), "Usage of %s:\n", f.name) + fmt.Fprintf(f.Output(), "Usage of %s:\n", f.name) f.PrintDefaults() } @@ -844,7 +851,7 @@ func (f *FlagSet) AddFlag(flag *Flag) { _, alreadyThere := f.formal[normalizedFlagName] if alreadyThere { msg := fmt.Sprintf("%s flag redefined: %s", f.name, flag.Name) - fmt.Fprintln(f.out(), msg) + fmt.Fprintln(f.Output(), msg) panic(msg) // Happens only if flags are declared with identical names } if f.formal == nil { @@ -860,7 +867,7 @@ func (f *FlagSet) AddFlag(flag *Flag) { } if len(flag.Shorthand) > 1 { msg := fmt.Sprintf("%q shorthand is more than one ASCII character", flag.Shorthand) - fmt.Fprintf(f.out(), msg) + fmt.Fprintf(f.Output(), msg) panic(msg) } if f.shorthands == nil { @@ -870,7 +877,7 @@ func (f *FlagSet) AddFlag(flag *Flag) { used, alreadyThere := f.shorthands[c] if alreadyThere { msg := fmt.Sprintf("unable to redefine %q shorthand in %q flagset: it's already used for %q flag", c, f.name, used.Name) - fmt.Fprintf(f.out(), msg) + fmt.Fprintf(f.Output(), msg) panic(msg) } f.shorthands[c] = flag @@ -909,7 +916,7 @@ func VarP(value Value, name, shorthand, usage string) { func (f *FlagSet) failf(format string, a ...interface{}) error { err := fmt.Errorf(format, a...) if f.errorHandling != ContinueOnError { - fmt.Fprintln(f.out(), err) + fmt.Fprintln(f.Output(), err) f.usage() } return err @@ -1060,7 +1067,7 @@ func (f *FlagSet) parseSingleShortArg(shorthands string, args []string, fn parse } if flag.ShorthandDeprecated != "" { - fmt.Fprintf(f.out(), "Flag shorthand -%s has been deprecated, %s\n", flag.Shorthand, flag.ShorthandDeprecated) + fmt.Fprintf(f.Output(), "Flag shorthand -%s has been deprecated, %s\n", flag.Shorthand, flag.ShorthandDeprecated) } err = fn(flag, value) diff --git a/vendor/github.com/spf13/pflag/ip.go b/vendor/github.com/spf13/pflag/ip.go index 3d414ba69fe1d3881c1551a5cdf2850b3886d35a..06b8bcb5721595c38ef394aed505b063e63f5ecd 100644 --- a/vendor/github.com/spf13/pflag/ip.go +++ b/vendor/github.com/spf13/pflag/ip.go @@ -16,6 +16,9 @@ func newIPValue(val net.IP, p *net.IP) *ipValue { func (i *ipValue) String() string { return net.IP(*i).String() } func (i *ipValue) Set(s string) error { + if s == "" { + return nil + } ip := net.ParseIP(strings.TrimSpace(s)) if ip == nil { return fmt.Errorf("failed to parse IP: %q", s) diff --git a/vendor/github.com/spf13/pflag/ipnet_slice.go b/vendor/github.com/spf13/pflag/ipnet_slice.go new file mode 100644 index 0000000000000000000000000000000000000000..6b541aa8798cf1b3937abc4d19e6e1d13464b10b --- /dev/null +++ b/vendor/github.com/spf13/pflag/ipnet_slice.go @@ -0,0 +1,147 @@ +package pflag + +import ( + "fmt" + "io" + "net" + "strings" +) + +// -- ipNetSlice Value +type ipNetSliceValue struct { + value *[]net.IPNet + changed bool +} + +func newIPNetSliceValue(val []net.IPNet, p *[]net.IPNet) *ipNetSliceValue { + ipnsv := new(ipNetSliceValue) + ipnsv.value = p + *ipnsv.value = val + return ipnsv +} + +// Set converts, and assigns, the comma-separated IPNet argument string representation as the []net.IPNet value of this flag. +// If Set is called on a flag that already has a []net.IPNet assigned, the newly converted values will be appended. +func (s *ipNetSliceValue) Set(val string) error { + + // remove all quote characters + rmQuote := strings.NewReplacer(`"`, "", `'`, "", "`", "") + + // read flag arguments with CSV parser + ipNetStrSlice, err := readAsCSV(rmQuote.Replace(val)) + if err != nil && err != io.EOF { + return err + } + + // parse ip values into slice + out := make([]net.IPNet, 0, len(ipNetStrSlice)) + for _, ipNetStr := range ipNetStrSlice { + _, n, err := net.ParseCIDR(strings.TrimSpace(ipNetStr)) + if err != nil { + return fmt.Errorf("invalid string being converted to CIDR: %s", ipNetStr) + } + out = append(out, *n) + } + + if !s.changed { + *s.value = out + } else { + *s.value = append(*s.value, out...) + } + + s.changed = true + + return nil +} + +// Type returns a string that uniquely represents this flag's type. +func (s *ipNetSliceValue) Type() string { + return "ipNetSlice" +} + +// String defines a "native" format for this net.IPNet slice flag value. +func (s *ipNetSliceValue) String() string { + + ipNetStrSlice := make([]string, len(*s.value)) + for i, n := range *s.value { + ipNetStrSlice[i] = n.String() + } + + out, _ := writeAsCSV(ipNetStrSlice) + return "[" + out + "]" +} + +func ipNetSliceConv(val string) (interface{}, error) { + val = strings.Trim(val, "[]") + // Emtpy string would cause a slice with one (empty) entry + if len(val) == 0 { + return []net.IPNet{}, nil + } + ss := strings.Split(val, ",") + out := make([]net.IPNet, len(ss)) + for i, sval := range ss { + _, n, err := net.ParseCIDR(strings.TrimSpace(sval)) + if err != nil { + return nil, fmt.Errorf("invalid string being converted to CIDR: %s", sval) + } + out[i] = *n + } + return out, nil +} + +// GetIPNetSlice returns the []net.IPNet value of a flag with the given name +func (f *FlagSet) GetIPNetSlice(name string) ([]net.IPNet, error) { + val, err := f.getFlagType(name, "ipNetSlice", ipNetSliceConv) + if err != nil { + return []net.IPNet{}, err + } + return val.([]net.IPNet), nil +} + +// IPNetSliceVar defines a ipNetSlice flag with specified name, default value, and usage string. +// The argument p points to a []net.IPNet variable in which to store the value of the flag. +func (f *FlagSet) IPNetSliceVar(p *[]net.IPNet, name string, value []net.IPNet, usage string) { + f.VarP(newIPNetSliceValue(value, p), name, "", usage) +} + +// IPNetSliceVarP is like IPNetSliceVar, but accepts a shorthand letter that can be used after a single dash. +func (f *FlagSet) IPNetSliceVarP(p *[]net.IPNet, name, shorthand string, value []net.IPNet, usage string) { + f.VarP(newIPNetSliceValue(value, p), name, shorthand, usage) +} + +// IPNetSliceVar defines a []net.IPNet flag with specified name, default value, and usage string. +// The argument p points to a []net.IPNet variable in which to store the value of the flag. +func IPNetSliceVar(p *[]net.IPNet, name string, value []net.IPNet, usage string) { + CommandLine.VarP(newIPNetSliceValue(value, p), name, "", usage) +} + +// IPNetSliceVarP is like IPNetSliceVar, but accepts a shorthand letter that can be used after a single dash. +func IPNetSliceVarP(p *[]net.IPNet, name, shorthand string, value []net.IPNet, usage string) { + CommandLine.VarP(newIPNetSliceValue(value, p), name, shorthand, usage) +} + +// IPNetSlice defines a []net.IPNet flag with specified name, default value, and usage string. +// The return value is the address of a []net.IPNet variable that stores the value of that flag. +func (f *FlagSet) IPNetSlice(name string, value []net.IPNet, usage string) *[]net.IPNet { + p := []net.IPNet{} + f.IPNetSliceVarP(&p, name, "", value, usage) + return &p +} + +// IPNetSliceP is like IPNetSlice, but accepts a shorthand letter that can be used after a single dash. +func (f *FlagSet) IPNetSliceP(name, shorthand string, value []net.IPNet, usage string) *[]net.IPNet { + p := []net.IPNet{} + f.IPNetSliceVarP(&p, name, shorthand, value, usage) + return &p +} + +// IPNetSlice defines a []net.IPNet flag with specified name, default value, and usage string. +// The return value is the address of a []net.IP variable that stores the value of the flag. +func IPNetSlice(name string, value []net.IPNet, usage string) *[]net.IPNet { + return CommandLine.IPNetSliceP(name, "", value, usage) +} + +// IPNetSliceP is like IPNetSlice, but accepts a shorthand letter that can be used after a single dash. +func IPNetSliceP(name, shorthand string, value []net.IPNet, usage string) *[]net.IPNet { + return CommandLine.IPNetSliceP(name, shorthand, value, usage) +} diff --git a/vendor/github.com/spf13/pflag/string_array.go b/vendor/github.com/spf13/pflag/string_array.go index 4894af818023bf132665556333e84426f80d7cc8..d1ff0a96ba0b5e4b67fc39db6ee85d125494f147 100644 --- a/vendor/github.com/spf13/pflag/string_array.go +++ b/vendor/github.com/spf13/pflag/string_array.go @@ -31,11 +31,7 @@ func (s *stringArrayValue) Append(val string) error { func (s *stringArrayValue) Replace(val []string) error { out := make([]string, len(val)) for i, d := range val { - var err error out[i] = d - if err != nil { - return err - } } *s.value = out return nil diff --git a/vendor/github.com/vmware/go-ipfix/pkg/collector/process.go b/vendor/github.com/vmware/go-ipfix/pkg/collector/process.go index 5f406b9d3da8b0603baa2271f2f7854d3e2c5c69..3fa4bb5d70c844add6fe5877a27c5f4e1bb56461 100644 --- a/vendor/github.com/vmware/go-ipfix/pkg/collector/process.go +++ b/vendor/github.com/vmware/go-ipfix/pkg/collector/process.go @@ -28,7 +28,6 @@ import ( "github.com/vmware/go-ipfix/pkg/entities" "github.com/vmware/go-ipfix/pkg/registry" - "github.com/vmware/go-ipfix/pkg/util" ) // DecodingMode specifies how unknown information elements (in templates) are handled when decoding. @@ -84,11 +83,16 @@ type CollectingProcess struct { // decoding. decodingMode DecodingMode // caCert, serverCert and serverKey are for storing encryption info when using TLS/DTLS - caCert []byte - serverCert []byte - serverKey []byte - wg sync.WaitGroup - numOfRecordsReceived uint64 + caCert []byte + serverCert []byte + serverKey []byte + wg sync.WaitGroup + // stats for IPFIX objects received + numOfMessagesReceived uint64 + numOfTemplateSetsReceived uint64 + numOfDataSetsReceived uint64 + numOfTemplateRecordsReceived uint64 + numOfDataRecordsReceived uint64 // clock implementation: enables injecting a fake clock for testing clock clock } @@ -181,10 +185,18 @@ func (cp *CollectingProcess) GetMsgChan() <-chan *entities.Message { return cp.messageChan } +// GetNumRecordsReceived returns the number of data records received by the collector. func (cp *CollectingProcess) GetNumRecordsReceived() int64 { cp.mutex.RLock() defer cp.mutex.RUnlock() - return int64(cp.numOfRecordsReceived) + return int64(cp.numOfDataRecordsReceived) +} + +// GetNumMessagesReceived returns the number of IPFIX messages received by the collector. +func (cp *CollectingProcess) GetNumMessagesReceived() int64 { + cp.mutex.RLock() + defer cp.mutex.RUnlock() + return int64(cp.numOfMessagesReceived) } func (cp *CollectingProcess) GetNumConnToCollector() int64 { @@ -193,17 +205,46 @@ func (cp *CollectingProcess) GetNumConnToCollector() int64 { return int64(len(cp.sessions)) } -func (cp *CollectingProcess) incrementNumRecordsReceived() { +func (cp *CollectingProcess) incrementReceivedStats(numMessages, numTemplateSets, numDataSets, numTemplateRecords, numDataRecords uint64) { cp.mutex.Lock() defer cp.mutex.Unlock() - cp.numOfRecordsReceived = cp.numOfRecordsReceived + 1 + cp.numOfMessagesReceived += numMessages + cp.numOfTemplateSetsReceived += numTemplateSets + cp.numOfDataSetsReceived += numDataSets + cp.numOfTemplateRecordsReceived += numTemplateRecords + cp.numOfDataRecordsReceived += numDataRecords +} + +func decodeMessageHeader(buf *bytes.Buffer, version *uint16, length *uint16, exportTime *uint32, sequenceNum *uint32, obsDomainID *uint32) error { + data := buf.Next(entities.MsgHeaderLength) + if len(data) < entities.MsgHeaderLength { + return fmt.Errorf("buffer too short") + } + bigEndian := binary.BigEndian + *version = bigEndian.Uint16(data) + *length = bigEndian.Uint16(data[2:]) + *exportTime = bigEndian.Uint32(data[4:]) + *sequenceNum = bigEndian.Uint32(data[8:]) + *obsDomainID = bigEndian.Uint32(data[12:]) + return nil +} + +func decodeSetHeader(buf *bytes.Buffer, setID, setLen *uint16) error { + data := buf.Next(entities.SetHeaderLen) + if len(data) < entities.SetHeaderLen { + return fmt.Errorf("buffer too short") + } + bigEndian := binary.BigEndian + *setID = bigEndian.Uint16(data) + *setLen = bigEndian.Uint16(data[2:]) + return nil } func (cp *CollectingProcess) decodePacket(session *transportSession, packetBuffer *bytes.Buffer, exportAddress string) (*entities.Message, error) { var length, version, setID, setLen uint16 - var exportTime, sequencNum, obsDomainID uint32 - if err := util.Decode(packetBuffer, binary.BigEndian, &version, &length, &exportTime, &sequencNum, &obsDomainID, &setID, &setLen); err != nil { - return nil, err + var exportTime, sequenceNum, obsDomainID uint32 + if err := decodeMessageHeader(packetBuffer, &version, &length, &exportTime, &sequenceNum, &obsDomainID); err != nil { + return nil, fmt.Errorf("failed to decode IPFIX message header: %w", err) } if version != uint16(10) { return nil, fmt.Errorf("collector only supports IPFIX (v10); invalid version %d received", version) @@ -213,7 +254,7 @@ func (cp *CollectingProcess) decodePacket(session *transportSession, packetBuffe message.SetVersion(version) message.SetMessageLen(length) message.SetExportTime(exportTime) - message.SetSequenceNum(sequencNum) + message.SetSequenceNum(sequenceNum) message.SetObsDomainID(obsDomainID) // handle IPv6 address which may involve [] @@ -223,6 +264,16 @@ func (cp *CollectingProcess) decodePacket(session *transportSession, packetBuffe exportAddress = strings.Replace(exportAddress, "]", "", -1) message.SetExportAddress(exportAddress) + // At the moment we assume exactly one set per IPFIX message. + if packetBuffer.Len() == 0 { + return nil, fmt.Errorf("empty IPFIX message") + } + if err := decodeSetHeader(packetBuffer, &setID, &setLen); err != nil { + return nil, fmt.Errorf("failed to decode set header: %w", err) + } + + var numTemplateSets, numDataSets, numTemplateRecords, numDataRecords uint64 + var set entities.Set var err error if setID == entities.TemplateSetID { @@ -230,42 +281,93 @@ func (cp *CollectingProcess) decodePacket(session *transportSession, packetBuffe if err != nil { return nil, fmt.Errorf("error in decoding message: %v", err) } + numTemplateSets += 1 + numTemplateRecords += uint64(set.GetNumberOfRecords()) } else { set, err = cp.decodeDataSet(session, packetBuffer, obsDomainID, setID) if err != nil { return nil, fmt.Errorf("error in decoding message: %v", err) } + numDataSets += 1 + numDataRecords += uint64(set.GetNumberOfRecords()) } message.AddSet(set) + cp.incrementReceivedStats(1, numTemplateSets, numDataSets, numTemplateRecords, numDataRecords) + // the thread(s)/client(s) executing the code will get blocked until the message is consumed/read in other goroutines. cp.messageChan <- message - cp.incrementNumRecordsReceived() return message, nil } +func decodeTemplateRecordHeader(buf *bytes.Buffer, templateID, fieldCount *uint16) error { + data := buf.Next(entities.TemplateRecordHeaderLength) + if len(data) < entities.TemplateRecordHeaderLength { + return fmt.Errorf("buffer too short") + } + bigEndian := binary.BigEndian + *templateID = bigEndian.Uint16(data) + *fieldCount = bigEndian.Uint16(data[2:]) + return nil +} + +func decodeTemplateRecordField(buf *bytes.Buffer, elementID, elementLength *uint16, enterpriseID *uint32) error { + data := buf.Next(4) + if len(data) < 4 { + return fmt.Errorf("buffer too short") + } + bigEndian := binary.BigEndian + *elementID = bigEndian.Uint16(data) + *elementLength = bigEndian.Uint16(data[2:]) + // check whether enterprise ID is 0 or not + isNonIANARegistry := (*elementID & 0x8000) > 0 + if isNonIANARegistry { + /* + Encoding format for Enterprise-Specific Information Elements: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + |1| Information element id. = 15 | Field Length = 4 (16 bits) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Enterprise number (32 bits) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + 1: 1 bit + Information element id: 15 bits + Field Length: 16 bits + Enterprise ID: 32 bits + (Reference: https://tools.ietf.org/html/rfc7011#appendix-A.2.2) + */ + data = buf.Next(4) + if len(data) < 4 { + return fmt.Errorf("buffer too short") + } + *enterpriseID = bigEndian.Uint32(data) + // clear enterprise bit + *elementID &= 0x7fff + } else { + *enterpriseID = registry.IANAEnterpriseID + } + return nil +} + func (cp *CollectingProcess) decodeTemplateSet(session *transportSession, templateBuffer *bytes.Buffer, obsDomainID uint32) (entities.Set, error) { + // At the moment we assume exactly one record per template set. var templateID uint16 var fieldCount uint16 - if err := util.Decode(templateBuffer, binary.BigEndian, &templateID, &fieldCount); err != nil { - return nil, err + if err := decodeTemplateRecordHeader(templateBuffer, &templateID, &fieldCount); err != nil { + return nil, fmt.Errorf("failed to decode template record header: %w", err) } decodeField := func() (entities.InfoElementWithValue, error) { var element *entities.InfoElement - var enterpriseID uint32 var elementID uint16 - // check whether enterprise ID is 0 or not - elementid := make([]byte, 2) var elementLength uint16 - err := util.Decode(templateBuffer, binary.BigEndian, &elementid, &elementLength) - if err != nil { - return nil, err + var enterpriseID uint32 + if err := decodeTemplateRecordField(templateBuffer, &elementID, &elementLength, &enterpriseID); err != nil { + return nil, fmt.Errorf("failed to decode template record field: %w", err) } - isNonIANARegistry := elementid[0]>>7 == 1 - if !isNonIANARegistry { - elementID = binary.BigEndian.Uint16(elementid) - enterpriseID = registry.IANAEnterpriseID + var err error + if enterpriseID == registry.IANAEnterpriseID { element, err = registry.GetInfoElementFromID(elementID, enterpriseID) if err != nil { if cp.decodingMode == DecodingModeStrict { @@ -275,27 +377,6 @@ func (cp *CollectingProcess) decodeTemplateSet(session *transportSession, templa element = entities.NewInfoElement("", elementID, entities.OctetArray, enterpriseID, elementLength) } } else { - /* - Encoding format for Enterprise-Specific Information Elements: - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - |1| Information element id. = 15 | Field Length = 4 (16 bits) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Enterprise number (32 bits) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - 1: 1 bit - Information element id: 15 bits - Field Length: 16 bits - Enterprise ID: 32 bits - (Reference: https://tools.ietf.org/html/rfc7011#appendix-A.2.2) - */ - err = util.Decode(templateBuffer, binary.BigEndian, &enterpriseID) - if err != nil { - return nil, err - } - elementid[0] = elementid[0] ^ 0x80 - elementID = binary.BigEndian.Uint16(elementid) element, err = registry.GetInfoElementFromID(elementID, enterpriseID) if err != nil { if cp.decodingMode == DecodingModeStrict { @@ -358,7 +439,11 @@ func (cp *CollectingProcess) decodeDataSet(session *transportSession, dataBuffer for _, ie := range template { var length int if ie.Len == entities.VariableLength { // string / octet array - length = getFieldLength(dataBuffer) + var err error + length, err = getFieldLength(dataBuffer) + if err != nil { + return nil, fmt.Errorf("failed to read variable field length: %w", err) + } } else { length = int(ie.Len) } @@ -393,22 +478,22 @@ func getMessageLength(reader *bufio.Reader) (int, error) { if err != nil { return 0, err } - var msgLen uint16 - err = util.Decode(bytes.NewBuffer(partialHeader[2:]), binary.BigEndian, &msgLen) - if err != nil { - return 0, fmt.Errorf("cannot decode message: %w", err) - } - return int(msgLen), nil + return int(binary.BigEndian.Uint16(partialHeader[2:])), nil } // getFieldLength returns string field length for data record // (encoding reference: https://tools.ietf.org/html/rfc7011#appendix-A.5) -func getFieldLength(dataBuffer *bytes.Buffer) int { - oneByte, _ := dataBuffer.ReadByte() +func getFieldLength(dataBuffer *bytes.Buffer) (int, error) { + oneByte, err := dataBuffer.ReadByte() + if err != nil { + return 0, err + } if oneByte < 255 { // string length is less than 255 - return int(oneByte) + return int(oneByte), nil + } + twoBytes := dataBuffer.Next(2) + if len(twoBytes) < 2 { + return 0, fmt.Errorf("buffer too short") } - var lengthTwoBytes uint16 - util.Decode(dataBuffer, binary.BigEndian, &lengthTwoBytes) - return int(lengthTwoBytes) + return int(binary.BigEndian.Uint16(twoBytes)), nil } diff --git a/vendor/github.com/vmware/go-ipfix/pkg/collector/tcp.go b/vendor/github.com/vmware/go-ipfix/pkg/collector/tcp.go index d0a98e7df5f0bc60d9961e928ae389c9d5d757bd..a5ad0cf44128fb780e7fd92125e00748074c8ff3 100644 --- a/vendor/github.com/vmware/go-ipfix/pkg/collector/tcp.go +++ b/vendor/github.com/vmware/go-ipfix/pkg/collector/tcp.go @@ -87,6 +87,7 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) { go func() { defer cp.wg.Done() defer close(doneCh) + var b bytes.Buffer for { length, err := getMessageLength(reader) if errors.Is(err, io.EOF) { @@ -97,13 +98,21 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) { klog.ErrorS(err, "Error when retrieving message length") return } - buff := make([]byte, length) + // Make sure we have enough capacity for the message. + b.Grow(length) + // The buff slice is guaranteed to have a capacity >= length, and will have + // a length of 0. + buff := b.AvailableBuffer() + // Increase the length of buff to fit the message. Note that slices can be + // resliced up to their capacity. + buff = buff[:length] _, err = io.ReadFull(reader, buff) if err != nil { klog.ErrorS(err, "Error when reading the message") return } - message, err := cp.decodePacket(session, bytes.NewBuffer(buff), address) + b.Write(buff) + message, err := cp.decodePacket(session, &b, address) if err != nil { // This can be an invalid template record, or invalid data record. // We close the connection, which is the best way to let the client @@ -113,6 +122,7 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) { } klog.V(4).InfoS("Processed message from exporter", "observationDomainID", message.GetObsDomainID(), "setType", message.GetSet().GetSetType(), "numRecords", message.GetSet().GetNumberOfRecords()) + b.Reset() } }() select { diff --git a/vendor/github.com/vmware/go-ipfix/pkg/entities/ie.go b/vendor/github.com/vmware/go-ipfix/pkg/entities/ie.go index 3b3190bd26742a5c9181062b38f51ff2e32bd619..07b4da62060894fdfbdcf75b41683c2b6bb69319 100644 --- a/vendor/github.com/vmware/go-ipfix/pkg/entities/ie.go +++ b/vendor/github.com/vmware/go-ipfix/pkg/entities/ie.go @@ -488,8 +488,8 @@ func EncodeToIEDataType(dataType IEDataType, val interface{}) ([]byte, error) { if !ok { return nil, fmt.Errorf("val argument %v is not of type net.IP for this element", val) } - if ipv4Add := v.To4(); ipv4Add != nil { - return ipv4Add, nil + if ipv4Addr := v.To4(); ipv4Addr != nil { + return ipv4Addr, nil } else { return nil, fmt.Errorf("provided IP %v does not belong to IPv4 address family", v) } @@ -499,8 +499,8 @@ func EncodeToIEDataType(dataType IEDataType, val interface{}) ([]byte, error) { if !ok { return nil, fmt.Errorf("val argument %v is not of type net.IP for this element", val) } - if ipv6Add := v.To16(); ipv6Add != nil { - return ipv6Add, nil + if ipv6Addr := v.To16(); ipv6Addr != nil { + return ipv6Addr, nil } else { return nil, fmt.Errorf("provided IPv6 address %v is not of correct length", v) } @@ -590,14 +590,14 @@ func encodeInfoElementValueToBuff(element InfoElementWithValue, buffer []byte, i case MacAddress: copy(buffer[index:], element.GetMacAddressValue()) case Ipv4Address: - if ipv4Add := element.GetIPAddressValue().To4(); ipv4Add != nil { - copy(buffer[index:], ipv4Add) + if ipv4Addr := element.GetIPAddressValue().To4(); ipv4Addr != nil { + copy(buffer[index:], ipv4Addr) } else { return fmt.Errorf("provided IP %v does not belong to IPv4 address family", element.GetIPAddressValue()) } case Ipv6Address: - if ipv6Add := element.GetIPAddressValue().To16(); ipv6Add != nil { - copy(buffer[index:], ipv6Add) + if ipv6Addr := element.GetIPAddressValue().To16(); ipv6Addr != nil { + copy(buffer[index:], ipv6Addr) } else { return fmt.Errorf("provided IPv6 address %v is not of correct length", element.GetIPAddressValue()) } @@ -620,3 +620,94 @@ func encodeInfoElementValueToBuff(element InfoElementWithValue, buffer []byte, i } return nil } + +// appendInfoElementValueToBuffer appends the encoded element value to the provided buffer. +func appendInfoElementValueToBuffer(element InfoElementWithValue, buffer []byte) ([]byte, error) { + switch element.GetDataType() { + case OctetArray: + v := element.GetOctetArrayValue() + ieLen := element.GetInfoElement().Len + if ieLen < VariableLength { + // fixed length case + if len(v) != int(ieLen) { + return nil, fmt.Errorf("invalid value for fixed-length octet array: length mismatch") + } + buffer = append(buffer, v...) + } else if len(v) < 255 { + buffer = append(buffer, byte(len(v))) + buffer = append(buffer, v...) + } else if len(v) <= math.MaxUint16 { + buffer = append(buffer, byte(255)) + buffer = binary.BigEndian.AppendUint16(buffer, uint16(len(v))) + buffer = append(buffer, v...) + } else { + return nil, fmt.Errorf("provided OctetArray value is too long and cannot be encoded: len=%d, maxlen=%d", len(v), math.MaxUint16) + } + case Unsigned8: + buffer = append(buffer, element.GetUnsigned8Value()) + case Unsigned16: + buffer = binary.BigEndian.AppendUint16(buffer, element.GetUnsigned16Value()) + case Unsigned32: + buffer = binary.BigEndian.AppendUint32(buffer, element.GetUnsigned32Value()) + case Unsigned64: + buffer = binary.BigEndian.AppendUint64(buffer, element.GetUnsigned64Value()) + case Signed8: + buffer = append(buffer, byte(element.GetSigned8Value())) + case Signed16: + buffer = binary.BigEndian.AppendUint16(buffer, uint16(element.GetSigned16Value())) + case Signed32: + buffer = binary.BigEndian.AppendUint32(buffer, uint32(element.GetSigned32Value())) + case Signed64: + buffer = binary.BigEndian.AppendUint64(buffer, uint64(element.GetSigned64Value())) + case Float32: + buffer = binary.BigEndian.AppendUint32(buffer, math.Float32bits(element.GetFloat32Value())) + case Float64: + buffer = binary.BigEndian.AppendUint64(buffer, math.Float64bits(element.GetFloat64Value())) + case Boolean: + // Following boolean spec from RFC7011 + indicator := byte(1) + if !element.GetBooleanValue() { + indicator = byte(2) + } + buffer = append(buffer, indicator) + case DateTimeSeconds: + buffer = binary.BigEndian.AppendUint32(buffer, element.GetUnsigned32Value()) + case DateTimeMilliseconds: + buffer = binary.BigEndian.AppendUint64(buffer, element.GetUnsigned64Value()) + // Currently only supporting seconds and milliseconds + case DateTimeMicroseconds, DateTimeNanoseconds: + // TODO: RFC 7011 has extra spec for these data types. Need to follow that + return nil, fmt.Errorf("API does not support micro and nano seconds types yet") + case MacAddress: + buffer = append(buffer, element.GetMacAddressValue()...) + case Ipv4Address: + if ipv4Add := element.GetIPAddressValue().To4(); ipv4Add != nil { + buffer = append(buffer, ipv4Add...) + } else { + return nil, fmt.Errorf("provided IP %v does not belong to IPv4 address family", element.GetIPAddressValue()) + } + case Ipv6Address: + if ipv6Add := element.GetIPAddressValue().To16(); ipv6Add != nil { + buffer = append(buffer, ipv6Add...) + } else { + return nil, fmt.Errorf("provided IPv6 address %v is not of correct length", element.GetIPAddressValue()) + } + case String: + v := element.GetStringValue() + if len(v) < 255 { + buffer = append(buffer, byte(len(v))) + // See https://pkg.go.dev/builtin#append + // As a special case, it is legal to append a string to a byte slice + buffer = append(buffer, v...) + } else if len(v) <= math.MaxUint16 { + buffer = append(buffer, byte(255)) // marker byte for long strings + buffer = binary.BigEndian.AppendUint16(buffer, uint16(len(v))) + buffer = append(buffer, v...) + } else { + return nil, fmt.Errorf("provided String value is too long and cannot be encoded: len=%d, maxlen=%d", len(v), math.MaxUint16) + } + default: + return nil, fmt.Errorf("API supports only valid information elements with datatypes given in RFC7011") + } + return buffer, nil +} diff --git a/vendor/github.com/vmware/go-ipfix/pkg/entities/message.go b/vendor/github.com/vmware/go-ipfix/pkg/entities/message.go index 8db7adc20ec9ad665887291c1d490d44367efb2b..0899021b4c7df5da5a2b425d08417c99764f8ced 100644 --- a/vendor/github.com/vmware/go-ipfix/pkg/entities/message.go +++ b/vendor/github.com/vmware/go-ipfix/pkg/entities/message.go @@ -123,6 +123,5 @@ func (m *Message) GetMsgHeader() []byte { } func (m *Message) ResetMsgHeader() { - m.msgHeader = nil - m.msgHeader = make([]byte, MsgHeaderLength) + clear(m.msgHeader) } diff --git a/vendor/github.com/vmware/go-ipfix/pkg/entities/record.go b/vendor/github.com/vmware/go-ipfix/pkg/entities/record.go index e823d1ab87054109dc6dfa8fc8044c77c47a682a..a3a68ab9729e00a2e4b4e71c82a2ac24538f4a87 100644 --- a/vendor/github.com/vmware/go-ipfix/pkg/entities/record.go +++ b/vendor/github.com/vmware/go-ipfix/pkg/entities/record.go @@ -17,8 +17,6 @@ package entities import ( "encoding/binary" "fmt" - - "k8s.io/klog/v2" ) //go:generate mockgen -copyright_file ../../license_templates/license_header.raw.txt -destination=testing/mock_record.go -package=testing github.com/vmware/go-ipfix/pkg/entities Record @@ -29,11 +27,14 @@ import ( // To begin with, we will have local buffer in record. // Have an interface and expose functions to user. +const TemplateRecordHeaderLength = 4 + type Record interface { PrepareRecord() error AddInfoElement(element InfoElementWithValue) error // TODO: Functions for multiple elements as well. - GetBuffer() []byte + GetBuffer() ([]byte, error) + AppendToBuffer(buffer []byte) ([]byte, error) GetTemplateID() uint16 GetFieldCount() uint16 GetOrderedElementList() []InfoElementWithValue @@ -51,7 +52,6 @@ type baseRecord struct { orderedElementList []InfoElementWithValue isDecoding bool len int - Record } type dataRecord struct { @@ -203,20 +203,31 @@ func (d *dataRecord) PrepareRecord() error { return nil } -func (d *dataRecord) GetBuffer() []byte { +func (d *dataRecord) GetBuffer() ([]byte, error) { if len(d.buffer) == d.len || d.isDecoding { - return d.buffer + return d.buffer, nil } d.buffer = make([]byte, d.len) index := 0 for _, element := range d.orderedElementList { - err := encodeInfoElementValueToBuff(element, d.buffer, index) - if err != nil { - klog.Error(err) + if err := encodeInfoElementValueToBuff(element, d.buffer, index); err != nil { + return nil, err } index += element.GetLength() } - return d.buffer + return d.buffer, nil +} + +// Callers should ensure that the provided slice has enough capacity (e.g., by calling +// GetRecordLength), in order to avoid memory allocations. +func (d *dataRecord) AppendToBuffer(buffer []byte) ([]byte, error) { + var err error + for _, element := range d.orderedElementList { + if buffer, err = appendInfoElementValueToBuffer(element, buffer); err != nil { + return nil, err + } + } + return buffer, nil } func (d *dataRecord) GetRecordLength() int { @@ -236,6 +247,11 @@ func (d *dataRecord) AddInfoElement(element InfoElementWithValue) error { return nil } +// This method is only meaningful for template records. +func (d *dataRecord) GetMinDataRecordLen() uint16 { + return 0 +} + func (d *dataRecord) GetRecordType() ContentType { return Data } @@ -281,8 +297,12 @@ func (t *templateRecord) AddInfoElement(element InfoElementWithValue) error { return nil } -func (t *templateRecord) GetBuffer() []byte { - return t.buffer +func (t *templateRecord) GetBuffer() ([]byte, error) { + return t.buffer, nil +} + +func (t *templateRecord) AppendToBuffer(buffer []byte) ([]byte, error) { + return append(buffer, t.buffer...), nil } func (t *templateRecord) GetRecordLength() int { diff --git a/vendor/github.com/vmware/go-ipfix/pkg/exporter/buffered.go b/vendor/github.com/vmware/go-ipfix/pkg/exporter/buffered.go new file mode 100644 index 0000000000000000000000000000000000000000..f4272531f4f66818c751d698babae89cf8a81dca --- /dev/null +++ b/vendor/github.com/vmware/go-ipfix/pkg/exporter/buffered.go @@ -0,0 +1,199 @@ +// Copyright 2025 VMware, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exporter + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + + "github.com/vmware/go-ipfix/pkg/entities" +) + +// BufferedIPFIXExporter wraps an ExportingProcess instance and supports buffering data records +// before sending them. BufferedIPFIXExporter is not safe for usage by multiple goroutines. There +// should be a single BufferedIPFIXExporter created for a given ExportingProcess. +// While the BufferedIPFIXExporter supports sending JSON records, in that case it mostly acts as a +// passthrough to the underlying ExportingProcess (no actual buffering). +type BufferedIPFIXExporter struct { + ep *ExportingProcess + templateSet entities.Set + // maps templateID to the corresponding buffer for data records. Note that entries are never + // deleted from this map. + messages map[uint16]*bufferedMessage + jsonBuffer bytes.Buffer +} + +type bufferedMessage struct { + ep *ExportingProcess + templateID uint16 + buffer []byte + numRecords int +} + +func newBufferedMessage(ep *ExportingProcess, templateID uint16) *bufferedMessage { + m := &bufferedMessage{ + ep: ep, + templateID: templateID, + buffer: make([]byte, 0, ep.maxMsgSize), + numRecords: 0, + } + m.reset() + return m +} + +// NewBufferedIPFIXExporter creates a BufferedIPFIXExporter . +func NewBufferedIPFIXExporter(ep *ExportingProcess) *BufferedIPFIXExporter { + bufferedExporter := &BufferedIPFIXExporter{ + ep: ep, + templateSet: entities.NewSet(false), + } + if !ep.sendJSONRecord { + bufferedExporter.messages = make(map[uint16]*bufferedMessage) + } + return bufferedExporter +} + +func (e *BufferedIPFIXExporter) addTemplateRecord(record entities.Record) error { + templateID := record.GetTemplateID() + // If the templateID already exists, we should send corresponding buffered data records + // immediately, as they may not match the new template definition. + if m, ok := e.messages[templateID]; ok { + if err := m.flush(); err != nil { + return fmt.Errorf("error when flushing buffered records for templateID %d: %w", templateID, err) + } + } + e.templateSet.ResetSet() + if err := e.templateSet.PrepareSet(entities.Template, entities.TemplateSetID); err != nil { + return err + } + if err := e.templateSet.AddRecordV3(record); err != nil { + return err + } + // It's important to use the method from ExporterProcess, for template management purposes. + _, err := e.ep.SendSet(e.templateSet) + return err +} + +func (e *BufferedIPFIXExporter) addDataRecord(record entities.Record) error { + templateID := record.GetTemplateID() + m, ok := e.messages[templateID] + if ok { + return m.addRecord(record) + } + m = newBufferedMessage(e.ep, templateID) + e.messages[templateID] = m + return m.addRecord(record) +} + +// AddRecord adds a record to be sent to the destination collector. If it is a template record, then +// it will be sent to the collector right away. If it is a data record, it will be added to the +// buffer. If adding the record to the buffer would cause the buffer length to exceed the max +// message size, the buffer is flushed first. Note that because data records are serialized to the +// buffer immediately, it is safe for the provided record to be mutated as soon as this function +// returns. +func (e *BufferedIPFIXExporter) AddRecord(record entities.Record) error { + recordType := record.GetRecordType() + if recordType == entities.Template { + // We don't send templates for JSON records + if e.ep.sendJSONRecord { + return nil + } + return e.addTemplateRecord(record) + } else if recordType == entities.Data { + if e.ep.sendJSONRecord { + _, _, err := e.ep.createAndSendJSONRecords([]entities.Record{record}, &e.jsonBuffer) + return err + } + return e.addDataRecord(record) + } + return fmt.Errorf("invalid record type: %v", recordType) +} + +// Flush sends all buffered data records immediately. +func (e *BufferedIPFIXExporter) Flush() error { + if e.ep.sendJSONRecord { + return nil + } + for _, m := range e.messages { + if err := m.flush(); err != nil { + return err + } + } + return nil +} + +func (m *bufferedMessage) addRecord(record entities.Record) error { + recordLength := record.GetRecordLength() + if len(m.buffer)+recordLength > m.ep.maxMsgSize { + if m.numRecords == 0 { + return fmt.Errorf("record is too big to fit into single message") + } + if _, err := m.sendMessage(); err != nil { + return err + } + } + var err error + m.buffer, err = record.AppendToBuffer(m.buffer) + if err != nil { + return err + } + m.numRecords += 1 + return nil +} + +func (m *bufferedMessage) flush() error { + if m.numRecords == 0 { + return nil + } + _, err := m.sendMessage() + return err +} + +func (m *bufferedMessage) reset() { + const headerLength = entities.MsgHeaderLength + entities.SetHeaderLen + m.buffer = m.buffer[:headerLength] + m.numRecords = 0 +} + +func encodeMessageHeader(buf []byte, version, length uint16, exportTime, seqNumber, obsDomainID uint32) { + bigEndian := binary.BigEndian + bigEndian.PutUint16(buf, version) + bigEndian.PutUint16(buf[2:], length) + bigEndian.PutUint32(buf[4:], exportTime) + bigEndian.PutUint32(buf[8:], seqNumber) + bigEndian.PutUint32(buf[12:], obsDomainID) +} + +func encodeSetHeader(buf []byte, templateID, length uint16) { + bigEndian := binary.BigEndian + bigEndian.PutUint16(buf, templateID) + bigEndian.PutUint16(buf[2:], length) +} + +func (m *bufferedMessage) sendMessage() (int, error) { + now := time.Now() + m.ep.seqNumber = m.ep.seqNumber + uint32(m.numRecords) + msgLen := len(m.buffer) + encodeMessageHeader(m.buffer, 10, uint16(msgLen), uint32(now.Unix()), m.ep.seqNumber, m.ep.obsDomainID) + encodeSetHeader(m.buffer[entities.MsgHeaderLength:], m.templateID, uint16(msgLen-entities.MsgHeaderLength)) + n, err := m.ep.connToCollector.Write(m.buffer) + if err != nil { + return n, err + } + m.reset() + return n, nil +} diff --git a/vendor/github.com/vmware/go-ipfix/pkg/exporter/msg.go b/vendor/github.com/vmware/go-ipfix/pkg/exporter/msg.go index 6f5bdd2448238be329ad79518beb00f4b66614f9..ea0d066b70e73173965da6964dbfbfc191d967e8 100644 --- a/vendor/github.com/vmware/go-ipfix/pkg/exporter/msg.go +++ b/vendor/github.com/vmware/go-ipfix/pkg/exporter/msg.go @@ -41,7 +41,7 @@ func WriteIPFIXMsgToBuffer(set entities.Set, obsDomainID uint32, seqNumber uint3 msgLen := entities.MsgHeaderLength + set.GetSetLength() if msgLen > entities.MaxSocketMsgSize { // This is applicable for both TCP and UDP sockets. - return msgLen, fmt.Errorf("message size exceeds max socket buffer size") + return 0, fmt.Errorf("message size exceeds max socket buffer size") } // Set the fields in the message header. @@ -56,9 +56,17 @@ func WriteIPFIXMsgToBuffer(set entities.Set, obsDomainID uint32, seqNumber uint3 buf.Grow(msgLen) buf.Write(msg.GetMsgHeader()) buf.Write(set.GetHeaderBuffer()) + b := buf.AvailableBuffer() for _, record := range set.GetRecords() { - buf.Write(record.GetBuffer()) + var err error + // Calls to AppendToBuffer won't cause memory allocations as the + // buffer is already guaranteed to have enough capacity. + b, err = record.AppendToBuffer(b) + if err != nil { + return 0, err + } } + buf.Write(b) return msgLen, nil } diff --git a/vendor/github.com/vmware/go-ipfix/pkg/exporter/process.go b/vendor/github.com/vmware/go-ipfix/pkg/exporter/process.go index 82638d55e06c595263347a805ea50c45a3d2d33c..958df804658dfdb846f2f5e572311800849e2275 100644 --- a/vendor/github.com/vmware/go-ipfix/pkg/exporter/process.go +++ b/vendor/github.com/vmware/go-ipfix/pkg/exporter/process.go @@ -48,8 +48,7 @@ type templateValue struct { // 3. Supports only TCP and UDP; one session at a time. SCTP is not supported. // 4. UDP needs to send PMTU size packets as per RFC7011. In order to guarantee // this, maxMsgSize should be set correctly. maxMsgSize is the maximum -// payload (IPFIX message) size, not the maximum packet size. You need to -// compute maxMsgSize based on the desired maximum packet size. If +// payload (IPFIX message) size, not the maximum packet size. If // maxMsgSize is not set correctly, the message may be fragmented. type ExportingProcess struct { connToCollector net.Conn @@ -94,17 +93,68 @@ type ExporterInput struct { // JSONBufferLen is recommended for sending json records. If not given a // valid value, we use a default of 5000B JSONBufferLen int - // For UDP, this should be set by taking into account the PMTU and - // header sizes. - MaxMsgSize int + // MaxMsgSize can be used to provide a custom maximum IPFIX message + // size. If it is omitted, we will use an appropriate default based on + // the configured protocol. For UDP, we want to avoid fragmentation, so + // the MaxMsgSize should be set by taking into account the PMTU and + // header sizes. The recommended approach is to keep MaxMsgSize unset + // and provide the correct PMTU value. + MaxMsgSize int + // PathMTU is used to calculate the maximum message size when the + // protocol is UDP. It is ignored for TCP. If both MaxMsgSize and + // PathMTU are set, and MaxMsgSize is incompatible with the provided + // PathMTU, exporter initialization will fail. + PathMTU int CheckConnInterval time.Duration } +func calculateMaxMsgSize(proto string, requestedSize int, pathMTU int, isIPv6 bool) (int, error) { + if requestedSize > 0 && (requestedSize < entities.MinSupportedMsgSize || requestedSize > entities.MaxSocketMsgSize) { + return 0, fmt.Errorf("requested message size should be between %d and %d", entities.MinSupportedMsgSize, entities.MaxSocketMsgSize) + } + if proto == "tcp" { + if requestedSize == 0 { + return entities.MaxSocketMsgSize, nil + } else { + return requestedSize, nil + } + } + // UDP protocol + if pathMTU == 0 { + if requestedSize == 0 { + klog.InfoS("Neither max IPFIX message size nor PMTU were provided, defaulting to min message size", "messageSize", entities.MinSupportedMsgSize) + return entities.MinSupportedMsgSize, nil + } + klog.InfoS("PMTU was not provided, configured message size may cause fragmentation", "messageSize", requestedSize) + return requestedSize, nil + } + // 20-byte IPv4, 8-byte UDP header + mtuDeduction := 28 + if isIPv6 { + // An extra 20 bytes for IPv6 + mtuDeduction += 20 + } + maxMsgSize := pathMTU - mtuDeduction + if maxMsgSize < entities.MinSupportedMsgSize { + return 0, fmt.Errorf("provided PMTU %d is not large enough to accommodate min message size %d", pathMTU, entities.MinSupportedMsgSize) + } + if requestedSize > maxMsgSize { + return 0, fmt.Errorf("requested message size %d exceeds max message size %d calculated from provided PMTU", requestedSize, maxMsgSize) + } + if requestedSize > 0 { + return requestedSize, nil + } + return maxMsgSize, nil +} + // InitExportingProcess takes in collector address(net.Addr format), obsID(observation ID) // and tempRefTimeout(template refresh timeout). tempRefTimeout is applicable only // for collectors listening over UDP; unit is seconds. For TCP, you can pass any // value and it will be ignored. For UDP, if 0 is passed, 600s is used as the default. func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { + if input.CollectorProtocol != "tcp" && input.CollectorProtocol != "udp" { + return nil, fmt.Errorf("unsupported collector protocol: %s", input.CollectorProtocol) + } var conn net.Conn var err error if input.TLSClientConfig != nil { @@ -151,6 +201,15 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { return nil, err } } + var isIPv6 bool + switch addr := conn.RemoteAddr().(type) { + case *net.TCPAddr: + isIPv6 = addr.IP.To4() == nil + case *net.UDPAddr: + isIPv6 = addr.IP.To4() == nil + default: + return nil, fmt.Errorf("unsupported net.Addr type %T", addr) + } expProc := &ExportingProcess{ connToCollector: conn, obsDomainID: input.ObservationDomainID, @@ -169,13 +228,12 @@ func InitExportingProcess(input ExporterInput) (*ExportingProcess, error) { expProc.jsonBufferLen = input.JSONBufferLen } } else { - if input.MaxMsgSize == 0 { - expProc.maxMsgSize = entities.MaxSocketMsgSize - } else if input.MaxMsgSize < entities.MinSupportedMsgSize { - return nil, fmt.Errorf("maxMsgSize cannot be less than 512B") - } else { - expProc.maxMsgSize = input.MaxMsgSize + maxMsgSize, err := calculateMaxMsgSize(input.CollectorProtocol, input.MaxMsgSize, input.PathMTU, isIPv6) + if err != nil { + return nil, err } + klog.InfoS("Calculated max IPFIX message size", "size", maxMsgSize) + expProc.maxMsgSize = maxMsgSize } // Start a goroutine to check whether the collector has already closed the TCP connection. @@ -549,7 +607,8 @@ func (ep *ExportingProcess) dataRecSanityCheck(rec entities.Record) error { if rec.GetFieldCount() != uint16(len(ep.templatesMap[templateID].elements)) { return fmt.Errorf("process: field count of data does not match templateID %d", templateID) } - if len(rec.GetBuffer()) < int(ep.templatesMap[templateID].minDataRecLen) { + + if rec.GetRecordLength() < int(ep.templatesMap[templateID].minDataRecLen) { return fmt.Errorf("process: Data Record does not pass the min required length (%d) check for template ID %d", ep.templatesMap[templateID].minDataRecLen, templateID) } return nil diff --git a/vendor/github.com/vmware/go-ipfix/pkg/util/util.go b/vendor/github.com/vmware/go-ipfix/pkg/util/util.go deleted file mode 100644 index 88b5701ce06a726505cd8dd7ee57041ba416b187..0000000000000000000000000000000000000000 --- a/vendor/github.com/vmware/go-ipfix/pkg/util/util.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2020 VMware, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "encoding/binary" - "fmt" - "io" -) - -// Decode decodes data from io reader to specified interfaces -/* Example: -var num1 uint16 -var num2 uint32 -// read the buffer 2 bytes and 4 bytes sequentially -// decode and output corresponding uint16 and uint32 number into num1 and num2 respectively -err := Decode(buffer, &num1, &num2) -*/ -func Decode(buffer io.Reader, byteOrder binary.ByteOrder, outputs ...interface{}) error { - var err error - for _, out := range outputs { - err = binary.Read(buffer, byteOrder, out) - if err != nil { - return fmt.Errorf("error in decoding data: %v", err) - } - } - return nil -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 9c59aa9f1f1b50f4d478f70757e2d693e5ee6b2d..a699520ce71c2a297beeaa2903e7c93035e7c823 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -373,7 +373,7 @@ github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types # github.com/paulbellamy/ratecounter v0.2.0 ## explicit github.com/paulbellamy/ratecounter -# github.com/pierrec/lz4/v4 v4.1.21 +# github.com/pierrec/lz4/v4 v4.1.22 ## explicit; go 1.14 github.com/pierrec/lz4/v4 github.com/pierrec/lz4/v4/internal/lz4block @@ -518,7 +518,7 @@ github.com/sirupsen/logrus # github.com/spf13/cobra v1.8.1 ## explicit; go 1.15 github.com/spf13/cobra -# github.com/spf13/pflag v1.0.5 +# github.com/spf13/pflag v1.0.6 ## explicit; go 1.12 github.com/spf13/pflag # github.com/stretchr/objx v0.5.2 @@ -550,13 +550,12 @@ github.com/vladimirvivien/gexe/net github.com/vladimirvivien/gexe/prog github.com/vladimirvivien/gexe/str github.com/vladimirvivien/gexe/vars -# github.com/vmware/go-ipfix v0.12.0 +# github.com/vmware/go-ipfix v0.13.0 ## explicit; go 1.23.0 github.com/vmware/go-ipfix/pkg/collector github.com/vmware/go-ipfix/pkg/entities github.com/vmware/go-ipfix/pkg/exporter github.com/vmware/go-ipfix/pkg/registry -github.com/vmware/go-ipfix/pkg/util # github.com/x448/float16 v0.8.4 ## explicit; go 1.11 github.com/x448/float16