diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go
index bec9f3b5e6852a4d2008a099dfda4a6a2b9d968b..cfd2c5e768c40ee0060a1f8b621871243661cb58 100644
--- a/pkg/agent/agent.go
+++ b/pkg/agent/agent.go
@@ -363,7 +363,7 @@ func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error)
 		},
 		NumberOfRecordsExportedByKafka: m.CreateNumberOfRecordsExportedByKafka(),
 		ExportedRecordsBatchSize:       m.CreateKafkaBatchSize(),
-		ErrCanNotExportToKafka:         m.CreateErrorCanNotWriteToKafka(),
+		Errors:                         m.GetErrorsCounter(),
 	}).ExportFlows, nil
 }
 
diff --git a/pkg/exporter/grpc_proto.go b/pkg/exporter/grpc_proto.go
index 1e41b003323b9cc78c49f698c54091f5d96ddd8c..ea6bee947693d6780d2b615c2278e15bcd79f6a5 100644
--- a/pkg/exporter/grpc_proto.go
+++ b/pkg/exporter/grpc_proto.go
@@ -27,7 +27,7 @@ type GRPCProto struct {
 	maxFlowsPerMessage            int
 	numberOfRecordsExportedByGRPC prometheus.Counter
 	exportedRecordsBatchSize      prometheus.Counter
-	errExportByGRPC               prometheus.Counter
+	errors                        *metrics.ErrorCounter
 }
 
 func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
@@ -42,7 +42,7 @@ func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metr
 		maxFlowsPerMessage:            maxFlowsPerMessage,
 		numberOfRecordsExportedByGRPC: m.CreateNumberOfRecordsExportedByGRPC(),
 		exportedRecordsBatchSize:      m.CreateGRPCBatchSize(),
-		errExportByGRPC:               m.CreateErrorCanNotWriteToGRPC(),
+		errors:                        m.GetErrorsCounter(),
 	}, nil
 }
 
@@ -56,6 +56,7 @@ func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
 		for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
 			log.Debugf("sending %d records", len(pbRecords.Entries))
 			if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
+				g.errors.WithValues("CantWriteMessage", "grpc").Inc()
 				log.WithError(err).Error("couldn't send flow records to collector")
 			}
 			g.numberOfRecordsExportedByGRPC.Add(float64(len(pbRecords.Entries)))
@@ -63,6 +64,6 @@
 	}
 	if err := g.clientConn.Close(); err != nil {
 		log.WithError(err).Warn("couldn't close flow export client")
-		g.errExportByGRPC.Inc()
+		g.errors.WithValues("CantCloseClient", "grpc").Inc()
 	}
 }
diff --git a/pkg/exporter/kafka_proto.go b/pkg/exporter/kafka_proto.go
index c24dcfcc0a54a7e1a19a71ccabdf06b4b019e600..97ef4a99000d37969f9622e83625de5acf1eeaec 100644
--- a/pkg/exporter/kafka_proto.go
+++ b/pkg/exporter/kafka_proto.go
@@ -4,6 +4,7 @@ import (
 	"context"
 
 	"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
+	"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
 
 	"github.com/prometheus/client_golang/prometheus"
 	kafkago "github.com/segmentio/kafka-go"
@@ -23,7 +24,7 @@ type KafkaProto struct {
 	Writer                         kafkaWriter
 	NumberOfRecordsExportedByKafka prometheus.Counter
 	ExportedRecordsBatchSize       prometheus.Counter
-	ErrCanNotExportToKafka         prometheus.Counter
+	Errors                         *metrics.ErrorCounter
 }
 
 func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) {
@@ -52,6 +53,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) {
 		pbBytes, err := proto.Marshal(flowToPB(record))
 		if err != nil {
 			klog.WithError(err).Debug("can't encode protobuf message. Ignoring")
+			kp.Errors.WithValues("CantEncodeMessage", "kafka").Inc()
 			continue
 		}
 		msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)})
@@ -60,7 +62,7 @@
 
 	if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil {
 		klog.WithError(err).Error("can't write messages into Kafka")
-		kp.ErrCanNotExportToKafka.Inc()
+		kp.Errors.WithValues("CantWriteMessage", "kafka").Inc()
 	}
 	kp.NumberOfRecordsExportedByKafka.Add(float64(len(records)))
 }
diff --git a/pkg/exporter/kafka_proto_test.go b/pkg/exporter/kafka_proto_test.go
index 3bae7a7d0751b35f061051f66c1e15c64e797b91..b53236c3ba89ce24e15672eadd62f36b25755d39 100644
--- a/pkg/exporter/kafka_proto_test.go
+++ b/pkg/exporter/kafka_proto_test.go
@@ -41,7 +41,7 @@ func TestProtoConversion(t *testing.T) {
 		Writer:                         &wc,
 		NumberOfRecordsExportedByKafka: m.CreateNumberOfRecordsExportedByKafka(),
 		ExportedRecordsBatchSize:       m.CreateKafkaBatchSize(),
-		ErrCanNotExportToKafka:         m.CreateErrorCanNotWriteToKafka(),
+		Errors:                         m.GetErrorsCounter(),
 	}
 	input := make(chan []*flow.Record, 11)
 	record := flow.Record{}
diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go
index 3e0bad377447b0503e6dcb9df3995de55726a118..ebb9956ed1ad8def79af236ef14e371b8c10cf5c 100644
--- a/pkg/flow/tracer_map.go
+++ b/pkg/flow/tracer_map.go
@@ -29,7 +29,7 @@ type MapTracer struct {
 	hmapEvictionCounter        prometheus.Counter
 	numberOfEvictedFlows       prometheus.Counter
 	timeSpentinLookupAndDelete prometheus.Histogram
-	errCanNotDeleteflows       prometheus.Counter
+	errors                     *metrics.ErrorCounter
 }
 
 type mapFetcher interface {
@@ -47,7 +47,7 @@ func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout
 		hmapEvictionCounter:        m.CreateHashMapCounter(),
 		numberOfEvictedFlows:       m.CreateNumberOfEvictedFlows(),
 		timeSpentinLookupAndDelete: m.CreateTimeSpendInLookupAndDelete(),
-		errCanNotDeleteflows:       m.CreateCanNotDeleteFlows(),
+		errors:                     m.GetErrorsCounter(),
 	}
 }
 
@@ -105,7 +105,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows c
 
 	var forwardingFlows []*Record
 	laterFlowNs := uint64(0)
-	flows := m.mapFetcher.LookupAndDeleteMap(m.errCanNotDeleteflows)
+	flows := m.mapFetcher.LookupAndDeleteMap(m.errors.WithValues("CannotDeleteFlows", ""))
 	elapsed := time.Since(currentTime)
 	for flowKey, flowMetrics := range flows {
 		aggregatedMetrics := m.aggregate(flowMetrics)
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index aa5de60448514669a695b9860a2765abb8c69aff..dfb4f71a7f2e8765dbcc74daecf0e225d98a9d21 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -88,22 +88,13 @@ var (
 		"Sampling rate seconds",
 		TypeGauge,
 	)
-	errCanNotWriteRecordsTotal = defineMetric(
-		"err_can_not_write_records_total",
-		"error can not write records total",
+	errorsCounter = defineMetric(
+		"errors_total",
+		"errors counter",
 		TypeCounter,
+		"error", "exporter",
 	)
-	errReadingRingBufferMapTotal = defineMetric(
-		"err_reading_ring_buffer_map_total",
-		"Error reading ring buffer map total",
-		TypeCounter,
-	)
-	errCanNotDeleteFlowEntriesTotal = defineMetric(
-		"err_can_not_delete_flow_entries_total",
-		"Error can not delete flow entries total",
-		TypeCounter,
-	)
 )
 
 func (def *MetricDefinition) mapLabels(labels []string) prometheus.Labels {
@@ -155,6 +146,17 @@ func (m *Metrics) NewCounter(def *MetricDefinition, labels ...string) prometheus
 	return c
 }
 
+func (m *Metrics) NewCounterVec(def *MetricDefinition) *prometheus.CounterVec {
+	verifyMetricType(def, TypeCounter)
+	fullName := m.Settings.Prefix + def.Name
+	c := prometheus.NewCounterVec(prometheus.CounterOpts{
+		Name: fullName,
+		Help: def.Help,
+	}, def.Labels)
+	m.register(c, fullName)
+	return c
+}
+
 func (m *Metrics) NewGauge(def *MetricDefinition, labels ...string) prometheus.Gauge {
 	verifyMetricType(def, TypeGauge)
 	fullName := m.Settings.Prefix + def.Name
@@ -220,18 +222,16 @@ func (m *Metrics) CreateSamplingRate() prometheus.Gauge {
 	return m.NewGauge(&samplingRateSeconds)
 }
 
-func (m *Metrics) CreateErrorCanNotWriteToGRPC() prometheus.Counter {
-	return m.NewCounter(&errCanNotWriteRecordsTotal, "grpc")
-}
-
-func (m *Metrics) CreateErrorCanNotWriteToKafka() prometheus.Counter {
-	return m.NewCounter(&errCanNotWriteRecordsTotal, "kafka")
+func (m *Metrics) GetErrorsCounter() *ErrorCounter {
+	return &ErrorCounter{
+		vec: m.NewCounterVec(&errorsCounter),
+	}
 }
 
-func (m *Metrics) CreateCanNotReadFromRingBufferMap() prometheus.Counter {
-	return m.NewCounter(&errReadingRingBufferMapTotal)
+type ErrorCounter struct {
+	vec *prometheus.CounterVec
 }
 
-func (m *Metrics) CreateCanNotDeleteFlows() prometheus.Counter {
-	return m.NewCounter(&errCanNotDeleteFlowEntriesTotal)
+func (c *ErrorCounter) WithValues(errName, exporter string) prometheus.Counter {
+	return c.vec.WithLabelValues(errName, exporter)
 }
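
Note for reviewers: the diff consolidates three single-purpose error counters into one errors_total CounterVec labeled by error and exporter. Below is a minimal usage sketch, assuming a *metrics.Metrics instance m is constructed elsewhere in the agent (construction is outside this diff); the package name and the recordErrors helper are illustrative, not part of the change:

package exportersketch // illustrative package, not part of the diff

import (
	"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
	"github.com/prometheus/client_golang/prometheus"
)

// recordErrors is a hypothetical helper showing the new error-counting flow.
func recordErrors(m *metrics.Metrics) {
	// GetErrorsCounter registers the prefixed errors_total CounterVec
	// and wraps it in an ErrorCounter.
	errs := m.GetErrorsCounter()

	// Each distinct (error, exporter) label pair becomes its own series,
	// e.g. <prefix>errors_total{error="CantWriteMessage", exporter="kafka"}.
	errs.WithValues("CantWriteMessage", "kafka").Inc()

	// WithValues returns a plain prometheus.Counter, so a pre-bound child
	// can be handed to code that only expects a Counter, as tracer_map.go
	// does for LookupAndDeleteMap (with an empty exporter label):
	var deleteErrs prometheus.Counter = errs.WithValues("CannotDeleteFlows", "")
	deleteErrs.Inc()
}

The empty exporter value in the tracer_map.go call keeps the label schema uniform for errors that do not originate from an exporter, at the cost of one empty-labeled series.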