Skip to content
Snippets Groups Projects
Unverified Commit 1db7150f authored by Mario Macias's avatar Mario Macias Committed by GitHub
Browse files

NETOBSERV-661: fix crash on shutdown (#88)

* NETOBSERV-661: fix crash on shutdown

* Fix linting

* moved Debug to Error message
parent 45b4757d
Branches
Tags
No related merge requests found
# Build the manager binary # Build the manager binary
FROM registry.access.redhat.com/ubi8/go-toolset:1.18.4 as builder FROM registry.access.redhat.com/ubi9/go-toolset:1.18.4 as builder
ARG SW_VERSION="unknown" ARG SW_VERSION="unknown"
ARG GOVERSION="1.18.4" ARG GOVERSION="1.18.4"
...@@ -20,7 +20,7 @@ COPY Makefile Makefile ...@@ -20,7 +20,7 @@ COPY Makefile Makefile
RUN make compile RUN make compile
# Create final image from minimal + built binary # Create final image from minimal + built binary
FROM registry.access.redhat.com/ubi8/ubi-minimal:8.7 FROM registry.access.redhat.com/ubi9/ubi-minimal:9.1.0
WORKDIR / WORKDIR /
COPY --from=builder /opt/app-root/bin/netobserv-ebpf-agent . COPY --from=builder /opt/app-root/bin/netobserv-ebpf-agent .
USER 65532:65532 USER 65532:65532
......
...@@ -6,7 +6,6 @@ The following graph provides a birds' eye view on how the different components a ...@@ -6,7 +6,6 @@ The following graph provides a birds' eye view on how the different components a
For more info on each component, please check their corresponding Go docs. For more info on each component, please check their corresponding Go docs.
```mermaid ```mermaid
flowchart TD flowchart TD
E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer) E(ebpf.FlowFetcher) --> |"pushes via<br/>RingBuffer"| RB(flow.RingBufTracer)
...@@ -14,6 +13,7 @@ flowchart TD ...@@ -14,6 +13,7 @@ flowchart TD
E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer) E --> |"polls<br/>PerCPUHashMap"| M(flow.MapTracer)
RB --> |chan *flow.Record| ACC(flow.Accounter) RB --> |chan *flow.Record| ACC(flow.Accounter)
RB -.-> |flushes| M
ACC --> |"chan []*flow.Record"| DD(flow.Deduper) ACC --> |"chan []*flow.Record"| DD(flow.Deduper)
M --> |"chan []*flow.Record"| DD M --> |"chan []*flow.Record"| DD
...@@ -26,4 +26,4 @@ flowchart TD ...@@ -26,4 +26,4 @@ flowchart TD
CL --> |"chan []*flow.Record"| DC(flow.Decorator) CL --> |"chan []*flow.Record"| DC(flow.Decorator)
DC --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto") DC --> |"chan []*flow.Record"| EX("export.GRPCProto<br/>or<br/>export.KafkaProto")
``` ```
\ No newline at end of file
...@@ -233,22 +233,25 @@ func (m *FlowFetcher) Close() error { ...@@ -233,22 +233,25 @@ func (m *FlowFetcher) Close() error {
m.objects = nil m.objects = nil
} }
for iface, ef := range m.egressFilters { for iface, ef := range m.egressFilters {
log.WithField("interface", iface).Debug("deleting egress filter") log := log.WithField("interface", iface)
if err := netlink.FilterDel(ef); err != nil { log.Debug("deleting egress filter")
if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(ef), log); err != nil {
errs = append(errs, fmt.Errorf("deleting egress filter: %w", err)) errs = append(errs, fmt.Errorf("deleting egress filter: %w", err))
} }
} }
m.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{} m.egressFilters = map[ifaces.Interface]*netlink.BpfFilter{}
for iface, igf := range m.ingressFilters { for iface, igf := range m.ingressFilters {
log.WithField("interface", iface).Debug("deleting ingress filter") log := log.WithField("interface", iface)
if err := netlink.FilterDel(igf); err != nil { log.Debug("deleting ingress filter")
if err := doIgnoreNoDev(netlink.FilterDel, netlink.Filter(igf), log); err != nil {
errs = append(errs, fmt.Errorf("deleting ingress filter: %w", err)) errs = append(errs, fmt.Errorf("deleting ingress filter: %w", err))
} }
} }
m.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{} m.ingressFilters = map[ifaces.Interface]*netlink.BpfFilter{}
for iface, qd := range m.qdiscs { for iface, qd := range m.qdiscs {
log.WithField("interface", iface).Debug("deleting Qdisc") log := log.WithField("interface", iface)
if err := netlink.QdiscDel(qd); err != nil { log.Debug("deleting Qdisc")
if err := doIgnoreNoDev(netlink.QdiscDel, netlink.Qdisc(qd), log); err != nil {
errs = append(errs, fmt.Errorf("deleting qdisc: %w", err)) errs = append(errs, fmt.Errorf("deleting qdisc: %w", err))
} }
} }
...@@ -264,6 +267,28 @@ func (m *FlowFetcher) Close() error { ...@@ -264,6 +267,28 @@ func (m *FlowFetcher) Close() error {
return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`) return errors.New(`errors: "` + strings.Join(errStrings, `", "`) + `"`)
} }
// doIgnoreNoDev invokes sysCall on dev and reports its outcome, treating a
// "no such device" failure as non-fatal:
//   - if sysCall succeeds, nil is returned;
//   - if sysCall fails with an error matching unix.ENODEV, the error is logged
//     at error level through the provided log entry and nil is returned;
//   - any other error is returned to the caller unchanged.
//
// An ENODEV can show up when the agent is deployed as part of the Network
// Observability pipeline: undeploying the FlowCollector may make the agent try
// to remove resources from Pods that were removed immediately before (e.g.
// flowlogs-pipeline or the console plugin). The logged message tells the user
// that the error can be ignored in that situation.
//
// The function is generic because the functions passed as sysCall accept
// different argument types.
func doIgnoreNoDev[T any](sysCall func(T) error, dev T, log *logrus.Entry) error {
	err := sysCall(dev)
	if err == nil {
		return nil
	}
	// Anything that is not a missing-device error is a real failure for the caller.
	if !errors.Is(err, unix.ENODEV) {
		return err
	}
	// NOTE(review): the concatenation below yields doubled spaces in the final
	// message; the text is kept exactly as it was.
	log.WithError(err).Error("can't delete. Ignore this error if other pods or interfaces " +
		" are also being deleted at this moment. For example, if you are undeploying " +
		" a FlowCollector or Deployment where this agent is part of")
	return nil
}
func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) { func (m *FlowFetcher) ReadRingBuf() (ringbuf.Record, error) {
return m.ringbufReader.Read() return m.ringbufReader.Read()
} }
......
...@@ -48,8 +48,6 @@ func (m *MapTracer) TraceLoop(ctx context.Context) node.StartFunc[[]*Record] { ...@@ -48,8 +48,6 @@ func (m *MapTracer) TraceLoop(ctx context.Context) node.StartFunc[[]*Record] {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
mtlog.Debug("triggering flow eviction after context cancelation")
m.Flush()
evictionTicker.Stop() evictionTicker.Stop()
mtlog.Debug("exiting trace loop due to context cancellation") mtlog.Debug("exiting trace loop due to context cancellation")
return return
...@@ -77,14 +75,14 @@ func (m *MapTracer) evictionSynchronization(ctx context.Context, out chan<- []*R ...@@ -77,14 +75,14 @@ func (m *MapTracer) evictionSynchronization(ctx context.Context, out chan<- []*R
return return
default: default:
mtlog.Debug("evictionSynchronization signal received") mtlog.Debug("evictionSynchronization signal received")
m.evictFlows(out) m.evictFlows(ctx, out)
} }
m.evictionCond.L.Unlock() m.evictionCond.L.Unlock()
} }
} }
func (m *MapTracer) evictFlows(forwardFlows chan<- []*Record) { func (m *MapTracer) evictFlows(ctx context.Context, forwardFlows chan<- []*Record) {
// it's important that this monotonic timer reports same or approximate values as kernel-side bpf_ktime_get_ns() // it's important that this monotonic timer reports same or approximate values as kernel-side bpf_ktime_get_ns()
monotonicTimeNow := monotime.Now() monotonicTimeNow := monotime.Now()
currentTime := time.Now() currentTime := time.Now()
...@@ -109,7 +107,12 @@ func (m *MapTracer) evictFlows(forwardFlows chan<- []*Record) { ...@@ -109,7 +107,12 @@ func (m *MapTracer) evictFlows(forwardFlows chan<- []*Record) {
)) ))
} }
m.lastEvictionNs = laterFlowNs m.lastEvictionNs = laterFlowNs
forwardFlows <- forwardingFlows select {
case <-ctx.Done():
mtlog.Debug("skipping flow eviction as agent is being stopped")
default:
forwardFlows <- forwardingFlows
}
mtlog.Debugf("%d flows evicted", len(forwardingFlows)) mtlog.Debugf("%d flows evicted", len(forwardingFlows))
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment