Skip to content
Snippets Groups Projects
Unverified Commit b3c02b8e authored by Joel Takvorian's avatar Joel Takvorian Committed by GitHub
Browse files

Share errors counter across components (#278)

- All errors can now be found from metric "errors_total"
- Use error name and exporter as labels
parent cbbdaddf
No related branches found
No related tags found
No related merge requests found
......@@ -363,7 +363,7 @@ func buildKafkaExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error)
},
NumberOfRecordsExportedByKafka: m.CreateNumberOfRecordsExportedByKafka(),
ExportedRecordsBatchSize: m.CreateKafkaBatchSize(),
ErrCanNotExportToKafka: m.CreateErrorCanNotWriteToKafka(),
Errors: m.GetErrorsCounter(),
}).ExportFlows, nil
}
......
......@@ -27,7 +27,7 @@ type GRPCProto struct {
maxFlowsPerMessage int
numberOfRecordsExportedByGRPC prometheus.Counter
exportedRecordsBatchSize prometheus.Counter
errExportByGRPC prometheus.Counter
errors *metrics.ErrorCounter
}
func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metrics.Metrics) (*GRPCProto, error) {
......@@ -42,7 +42,7 @@ func StartGRPCProto(hostIP string, hostPort int, maxFlowsPerMessage int, m *metr
maxFlowsPerMessage: maxFlowsPerMessage,
numberOfRecordsExportedByGRPC: m.CreateNumberOfRecordsExportedByGRPC(),
exportedRecordsBatchSize: m.CreateGRPCBatchSize(),
errExportByGRPC: m.CreateErrorCanNotWriteToGRPC(),
errors: m.GetErrorsCounter(),
}, nil
}
......@@ -56,6 +56,7 @@ func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
for _, pbRecords := range flowsToPB(inputRecords, g.maxFlowsPerMessage) {
log.Debugf("sending %d records", len(pbRecords.Entries))
if _, err := g.clientConn.Client().Send(context.TODO(), pbRecords); err != nil {
g.errors.WithValues("CantWriteMessage", "grpc").Inc()
log.WithError(err).Error("couldn't send flow records to collector")
}
g.numberOfRecordsExportedByGRPC.Add(float64(len(pbRecords.Entries)))
......@@ -63,6 +64,6 @@ func (g *GRPCProto) ExportFlows(input <-chan []*flow.Record) {
}
if err := g.clientConn.Close(); err != nil {
log.WithError(err).Warn("couldn't close flow export client")
g.errExportByGRPC.Inc()
g.errors.WithValues("CantCloseClient", "grpc").Inc()
}
}
......@@ -4,6 +4,7 @@ import (
"context"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/metrics"
"github.com/prometheus/client_golang/prometheus"
kafkago "github.com/segmentio/kafka-go"
......@@ -23,7 +24,7 @@ type KafkaProto struct {
Writer kafkaWriter
NumberOfRecordsExportedByKafka prometheus.Counter
ExportedRecordsBatchSize prometheus.Counter
ErrCanNotExportToKafka prometheus.Counter
Errors *metrics.ErrorCounter
}
func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) {
......@@ -52,6 +53,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) {
pbBytes, err := proto.Marshal(flowToPB(record))
if err != nil {
klog.WithError(err).Debug("can't encode protobuf message. Ignoring")
kp.Errors.WithValues("CantEncodeMessage", "kafka").Inc()
continue
}
msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)})
......@@ -60,7 +62,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) {
if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil {
klog.WithError(err).Error("can't write messages into Kafka")
kp.ErrCanNotExportToKafka.Inc()
kp.Errors.WithValues("CantWriteMessage", "kafka").Inc()
}
kp.NumberOfRecordsExportedByKafka.Add(float64(len(records)))
}
......
......@@ -41,7 +41,7 @@ func TestProtoConversion(t *testing.T) {
Writer: &wc,
NumberOfRecordsExportedByKafka: m.CreateNumberOfRecordsExportedByKafka(),
ExportedRecordsBatchSize: m.CreateKafkaBatchSize(),
ErrCanNotExportToKafka: m.CreateErrorCanNotWriteToKafka(),
Errors: m.GetErrorsCounter(),
}
input := make(chan []*flow.Record, 11)
record := flow.Record{}
......
......@@ -29,7 +29,7 @@ type MapTracer struct {
hmapEvictionCounter prometheus.Counter
numberOfEvictedFlows prometheus.Counter
timeSpentinLookupAndDelete prometheus.Histogram
errCanNotDeleteflows prometheus.Counter
errors *metrics.ErrorCounter
}
type mapFetcher interface {
......@@ -47,7 +47,7 @@ func NewMapTracer(fetcher mapFetcher, evictionTimeout, staleEntriesEvictTimeout
hmapEvictionCounter: m.CreateHashMapCounter(),
numberOfEvictedFlows: m.CreateNumberOfEvictedFlows(),
timeSpentinLookupAndDelete: m.CreateTimeSpendInLookupAndDelete(),
errCanNotDeleteflows: m.CreateCanNotDeleteFlows(),
errors: m.GetErrorsCounter(),
}
}
......@@ -105,7 +105,7 @@ func (m *MapTracer) evictFlows(ctx context.Context, forceGC bool, forwardFlows c
var forwardingFlows []*Record
laterFlowNs := uint64(0)
flows := m.mapFetcher.LookupAndDeleteMap(m.errCanNotDeleteflows)
flows := m.mapFetcher.LookupAndDeleteMap(m.errors.WithValues("CannotDeleteFlows", ""))
elapsed := time.Since(currentTime)
for flowKey, flowMetrics := range flows {
aggregatedMetrics := m.aggregate(flowMetrics)
......
......@@ -88,22 +88,13 @@ var (
"Sampling rate seconds",
TypeGauge,
)
errCanNotWriteRecordsTotal = defineMetric(
"err_can_not_write_records_total",
"error can not write records total",
errorsCounter = defineMetric(
"errors_total",
"errors counter",
TypeCounter,
"error",
"exporter",
)
errReadingRingBufferMapTotal = defineMetric(
"err_reading_ring_buffer_map_total",
"Error reading ring buffer map total",
TypeCounter,
)
errCanNotDeleteFlowEntriesTotal = defineMetric(
"err_can_not_delete_flow_entries_total",
"Error can not delete flow entries total",
TypeCounter,
)
)
func (def *MetricDefinition) mapLabels(labels []string) prometheus.Labels {
......@@ -155,6 +146,17 @@ func (m *Metrics) NewCounter(def *MetricDefinition, labels ...string) prometheus
return c
}
// NewCounterVec builds, registers, and returns a labeled counter vector
// from the given metric definition. The metric name is the definition's
// name with the configured settings prefix prepended, and the
// definition's label names become the vector's dimensions.
func (m *Metrics) NewCounterVec(def *MetricDefinition) *prometheus.CounterVec {
	verifyMetricType(def, TypeCounter)
	name := m.Settings.Prefix + def.Name
	opts := prometheus.CounterOpts{Name: name, Help: def.Help}
	vec := prometheus.NewCounterVec(opts, def.Labels)
	m.register(vec, name)
	return vec
}
func (m *Metrics) NewGauge(def *MetricDefinition, labels ...string) prometheus.Gauge {
verifyMetricType(def, TypeGauge)
fullName := m.Settings.Prefix + def.Name
......@@ -220,18 +222,16 @@ func (m *Metrics) CreateSamplingRate() prometheus.Gauge {
return m.NewGauge(&samplingRateSeconds)
}
func (m *Metrics) CreateErrorCanNotWriteToGRPC() prometheus.Counter {
return m.NewCounter(&errCanNotWriteRecordsTotal, "grpc")
}
func (m *Metrics) CreateErrorCanNotWriteToKafka() prometheus.Counter {
return m.NewCounter(&errCanNotWriteRecordsTotal, "kafka")
// GetErrorsCounter returns an ErrorCounter wrapping the shared
// "errors_total" counter vector, labeled by error name and exporter,
// so all components report errors through a single metric.
func (m *Metrics) GetErrorsCounter() *ErrorCounter {
	counter := ErrorCounter{vec: m.NewCounterVec(&errorsCounter)}
	return &counter
}
func (m *Metrics) CreateCanNotReadFromRingBufferMap() prometheus.Counter {
return m.NewCounter(&errReadingRingBufferMapTotal)
// ErrorCounter wraps the shared errors counter vector so that call
// sites can increment a counter for a given error name and exporter.
type ErrorCounter struct {
	// vec is the underlying labeled Prometheus counter vector.
	vec *prometheus.CounterVec
}
func (m *Metrics) CreateCanNotDeleteFlows() prometheus.Counter {
return m.NewCounter(&errCanNotDeleteFlowEntriesTotal)
// WithValues returns the Prometheus counter for the given error-name
// and exporter label values, creating the child counter on first use.
func (c *ErrorCounter) WithValues(errName, exporter string) prometheus.Counter {
	child := c.vec.WithLabelValues(errName, exporter)
	return child
}
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment.