Newer
Older
import (
"encoding/binary"
"fmt"
"io"
"net"
"time"
Mohamed S. Mahmoud
committed
"github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf"
// Values according to field 61 in https://www.iana.org/assignments/ipfix/ipfix.xhtml
const (
DirectionIngress = uint8(0)
DirectionEgress = uint8(1)
)
const MacLen = 6
// IPv6Type value as defined in IEEE 802: https://www.iana.org/assignments/ieee-802-numbers/ieee-802-numbers.xhtml
const IPv6Type = 0x86DD
type MacAddr [MacLen]uint8
type Direction uint8
// IPAddr encodes v4 and v6 IPs with a fixed length.
// IPv4 addresses are encoded as IPv6 addresses with prefix ::ffff/96
// as described in https://datatracker.ietf.org/doc/html/rfc4038#section-4.2
// (same behavior as Go's net.IP type)
type IPAddr [net.IPv6len]uint8
Mohamed S. Mahmoud
committed
type RawRecord ebpf.BpfFlowRecordT
// Record contains accumulated metrics from a flow
type Record struct {
RawRecord
// TODO: redundant field from RecordMetrics. Reorganize structs
Mohamed S. Mahmoud
committed
TimeFlowStart time.Time
TimeFlowEnd time.Time
DNSLatency time.Duration
Interface string
// Duplicate tells whether this flow has another duplicate so it has to be excluded from
// any metrics' aggregation (e.g. bytes/second rates between two pods).
// The reason for this field is that the same flow can be observed from multiple interfaces,
// so the agent needs to choose only a view of the same flow and mark the others as
// "exclude from aggregation". Otherwise rates, sums, etc... values would be multiplied by the
// number of interfaces this flow is observed from.
Duplicate bool
// AgentIP provides information about the source of the flow (the Agent that traced it)
AgentIP net.IP
// Calculated RTT which is set when record is created by calling NewRecord
TimeFlowRtt time.Duration
func NewRecord(
Mohamed S. Mahmoud
committed
key ebpf.BpfFlowId,
metrics *ebpf.BpfFlowMetrics,
currentTime time.Time,
monotonicCurrentTime uint64,
) *Record {
Mohamed S. Mahmoud
committed
startDelta := time.Duration(monotonicCurrentTime - metrics.StartMonoTimeTs)
endDelta := time.Duration(monotonicCurrentTime - metrics.EndMonoTimeTs)
var record = Record{
RawRecord: RawRecord{
Mohamed S. Mahmoud
committed
Id: key,
Metrics: *metrics,
},
TimeFlowStart: currentTime.Add(-startDelta),
TimeFlowEnd: currentTime.Add(-endDelta),
}
if metrics.FlowRtt != 0 {
Mohamed S. Mahmoud
committed
record.TimeFlowRtt = time.Duration(metrics.FlowRtt)
}
Mohamed S. Mahmoud
committed
if metrics.DnsRecord.Latency != 0 {
record.DNSLatency = time.Duration(metrics.DnsRecord.Latency)
}
return &record
// IP returns the net.IP equivalent object
Mohamed S. Mahmoud
committed
func IP(ia IPAddr) net.IP {
}
// IntEncodeV4 encodes an IPv4 address as an integer (in network encoding, big endian).
// It assumes that the passed IP is already IPv4. Otherwise it would just encode the
// last 4 bytes of an IPv6 address
Mohamed S. Mahmoud
committed
func IntEncodeV4(ia [net.IPv6len]uint8) uint32 {
return binary.BigEndian.Uint32(ia[net.IPv6len-net.IPv4len : net.IPv6len])
func (ia *IPAddr) MarshalJSON() ([]byte, error) {
Mohamed S. Mahmoud
committed
return []byte(`"` + IP(*ia).String() + `"`), nil
func (m *MacAddr) String() string {
return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x", m[0], m[1], m[2], m[3], m[4], m[5])
}
func (m *MacAddr) MarshalJSON() ([]byte, error) {
return []byte("\"" + m.String() + "\""), nil
}
// ReadFrom reads a Record from a binary source, in LittleEndian order
func ReadFrom(reader io.Reader) (*RawRecord, error) {
var fr RawRecord
err := binary.Read(reader, binary.LittleEndian, &fr)
return &fr, err