diff --git a/Dockerfile b/Dockerfile index 0ef78dc7b5458a55b8850f03f15990ebe30cdc2e..ae57888e59bdada7f1ab1ced3435a50c79b0bb5a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ # Build the manager binary -FROM registry.access.redhat.com/ubi8/go-toolset:1.18.4 as builder +FROM registry.access.redhat.com/ubi9/go-toolset:1.18.4 as builder ARG SW_VERSION="unknown" ARG GOVERSION="1.18.4" @@ -20,7 +20,7 @@ COPY Makefile Makefile RUN make compile # Create final image from minimal + built binary -FROM registry.access.redhat.com/ubi8/ubi-minimal:8.7 +FROM registry.access.redhat.com/ubi9/ubi-minimal:9.1.0 WORKDIR / COPY --from=builder /opt/app-root/bin/netobserv-ebpf-agent . USER 65532:65532 diff --git a/docs/architecture.md b/docs/architecture.md index 3e799481c9e874a293c0920943ff133aa7ffb0c5..f659b92a219fe28b97673dc3fea12d8d43fa281c 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -6,7 +6,6 @@ The following graph provides a birds' eye view on how the different components a For more info on each component, please check their corresponding Go docs. 
- ```mermaid flowchart TD E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer) @@ -14,6 +13,7 @@ flowchart TD E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer) RB --> |chan *flow.Record| ACC(flow.Accounter) + RB -.-> |flushes| M ACC --> |"chan []*flow.Record"| DD(flow.Deduper) M --> |"chan []*flow.Record"| DD @@ -26,4 +26,4 @@ flowchart TD CL --> |"chan []*flow.Record"| DC(flow.Decorator) DC --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto") -``` \ No newline at end of file +``` diff --git a/pkg/ebpf/tracer.go b/pkg/ebpf/tracer.go index 2ae07741f93b47964a0cbcad055ffe1df19e3e6d..b504e7cc57ba128222b573e62248ce244bb40a19 100644 --- a/pkg/ebpf/tracer.go +++ b/pkg/ebpf/tracer.go @@ -233,22 +233,25 @@ func (m *FlowFetcher) Close() error { m.objects = nil } for iface, ef := range m.egressFilters { - log.WithField("interface", iface).Debug("deleting egress filter") - if err := netlink.FilterDel(ef); err != nil { + log := log.WithField("interface", iface) + log.Debug("deleting egress filter") + if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(ef), log); err != nil { errs = append(errs, fmt.Errorf("deleting egress filter: %w", err)) } } m.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} for iface, igf := range m.ingressFilters { - log.WithField("interface", iface).Debug("deleting ingress filter") - if err := netlink.FilterDel(igf); err != nil { + log := log.WithField("interface", iface) + log.Debug("deleting ingress filter") + if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(igf), log); err != nil { errs = append(errs, fmt.Errorf("deleting ingress filter: %w", err)) } } m.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} for iface, qd := range m.qdiscs { - log.WithField("interface", iface).Debug("deleting Qdisc") - if err := netlink.QdiscDel(qd); err != nil { + log := log.WithField("interface", iface) + log.Debug("deleting Qdisc") + if err := doIgnoreNoDev(netlink.QdiscDel, 
netlink.Qdisc(qd), log); err != nil { errs = append(errs, fmt.Errorf("deleting qdisc: %w", err)) } } @@ -264,6 +267,28 @@ func (m *FlowFetcher) Close() error { return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`) } +// doIgnoreNoDev runs the provided syscall over the provided device and ignores the error +// if the cause is a non-existing device (the error is just logged, not returned). +// If the agent is deployed as part of the Network Observability pipeline, normally +// undeploying the FlowCollector could cause the agent to try to remove resources +// from Pods that have been removed immediately before (e.g. flowlogs-pipeline or the +// console plugin), so we avoid returning some errors that would unnecessarily raise the +// user's attention. +// This function uses generics because the provided functions accept different argument +// types. +func doIgnoreNoDev[T any](sysCall func(T) error, dev T, log *logrus.Entry) error { + if err := sysCall(dev); err != nil { + if errors.Is(err, unix.ENODEV) { + log.WithError(err).Error("can't delete. Ignore this error if other pods or interfaces " + + " are also being deleted at this moment. 
For example, if you are undeploying " + + " a FlowCollector or Deployment where this agent is part of") + } else { + return err + } + } + return nil +} + func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) { return m.ringbufReader.Read() } diff --git a/pkg/flow/tracer_map.go b/pkg/flow/tracer_map.go index a71c88dda594690c32d945c811c77c8628368bc9..197ccc5177d85349f9080cccc5d8ba2a29b25229 100644 --- a/pkg/flow/tracer_map.go +++ b/pkg/flow/tracer_map.go @@ -48,8 +48,6 @@ func (m *MapTracer) TraceLoop(ctx context.Context) node.StartFunc[[]*Record] { for { select { case <-ctx.Done(): - mtlog.Debug("triggering flow eviction after context cancelation") - m.Flush() evictionTicker.Stop() mtlog.Debug("exiting trace loop due to context cancellation") return @@ -77,14 +75,14 @@ func (m *MapTracer) evictionSynchronization(ctx context.Context, out chan<- []*R return default: mtlog.Debug("evictionSynchronization signal received") - m.evictFlows(out) + m.evictFlows(ctx, out) } m.evictionCond.L.Unlock() } } -func (m *MapTracer) evictFlows(forwardFlows chan<- []*Record) { +func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Record) { // it's important that this monotonic timer reports same or approximate values as kernel-side bpf_ktime_get_ns() monotonicTimeNow := monotime.Now() currentTime := time.Now() @@ -109,7 +107,12 @@ func (m *MapTracer) evictFlows(forwardFlows chan<- []*Record) { )) } m.lastEvictionNs = laterFlowNs - forwardFlows <- forwardingFlows + select { + case <-ctx.Done(): + mtlog.Debug("skipping flow eviction as agent is being stopped") + default: + forwardFlows <- forwardingFlows + } mtlog.Debugf("%d flows evicted", len(forwardingFlows)) }