Skip to content
Snippets Groups Projects
Unverified Commit 08030df2 authored by Julien Pinsonneau's avatar Julien Pinsonneau Committed by GitHub
Browse files

NETOBSERV-1471 gRPC export for packet capture (#291)

* grpc export for packet capture

* add missing error check when write

* addressed feedback

* update pb doc + Makefile

* allow deprecated configs

* remove target mentions

* deprecated config fixes

* removed tcp packet capture and updated example

* updated import order

* different than zero

* aggregate errors

* import order 2

* misc return nill
parent 6e00a92e
Branches
Tags
No related merge requests found
Showing
with 215 additions and 87 deletions
...@@ -126,6 +126,7 @@ generate: prereqs ## Generate artifacts of the code repo (pkg/ebpf and pkg/proto ...@@ -126,6 +126,7 @@ generate: prereqs ## Generate artifacts of the code repo (pkg/ebpf and pkg/proto
go generate ./pkg/... go generate ./pkg/...
@echo "### Generating gRPC and Protocol Buffers code" @echo "### Generating gRPC and Protocol Buffers code"
PATH="$(shell pwd)/protoc/bin:$$PATH" protoc --go_out=pkg --go-grpc_out=pkg proto/flow.proto PATH="$(shell pwd)/protoc/bin:$$PATH" protoc --go_out=pkg --go-grpc_out=pkg proto/flow.proto
PATH="$(shell pwd)/protoc/bin:$$PATH" protoc --go_out=pkg --go-grpc_out=pkg proto/packet.proto
.PHONY: docker-generate .PHONY: docker-generate
docker-generate: ## Create the container that generates the eBPF binaries docker-generate: ## Create the container that generates the eBPF binaries
......
...@@ -46,8 +46,8 @@ configured by our [Network Observability Operator](https://github.com/netobserv/ ...@@ -46,8 +46,8 @@ configured by our [Network Observability Operator](https://github.com/netobserv/
Anyway you can run it directly as an executable from your command line: Anyway you can run it directly as an executable from your command line:
``` ```
export FLOWS_TARGET_HOST=... export HOST=...
export FLOWS_TARGET_PORT=... export PORT=...
sudo -E bin/netobserv-ebpf-agent sudo -E bin/netobserv-ebpf-agent
``` ```
......
...@@ -32,11 +32,11 @@ spec: ...@@ -32,11 +32,11 @@ spec:
- SYS_RESOURCE - SYS_RESOURCE
runAsUser: 0 runAsUser: 0
env: env:
- name: FLOWS_TARGET_HOST - name: HOST
valueFrom: valueFrom:
fieldRef: fieldRef:
fieldPath: status.hostIP fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT - name: PORT
value: "9999" value: "9999"
--- ---
apiVersion: apps/v1 apiVersion: apps/v1
......
...@@ -27,11 +27,11 @@ spec: ...@@ -27,11 +27,11 @@ spec:
privileged: true privileged: true
runAsUser: 0 runAsUser: 0
env: env:
- name: FLOWS_TARGET_HOST - name: HOST
valueFrom: valueFrom:
fieldRef: fieldRef:
fieldPath: status.hostIP fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT - name: PORT
value: "9999" value: "9999"
--- ---
apiVersion: apps/v1 apiVersion: apps/v1
......
...@@ -27,9 +27,9 @@ spec: ...@@ -27,9 +27,9 @@ spec:
privileged: true privileged: true
runAsUser: 0 runAsUser: 0
env: env:
- name: FLOWS_TARGET_HOST - name: HOST
value: "flp" value: "flp"
- name: FLOWS_TARGET_PORT - name: PORT
value: "9999" value: "9999"
--- ---
apiVersion: v1 apiVersion: v1
......
...@@ -5,8 +5,8 @@ _Please also refer to the file [config.go](../pkg/agent/config.go) which is the ...@@ -5,8 +5,8 @@ _Please also refer to the file [config.go](../pkg/agent/config.go) which is the
The following environment variables are available to configure the NetObserv eBPF Agent: The following environment variables are available to configure the NetObserv eBPF Agent:
* `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc`, `kafka`, `ipfix+udp`, `ipfix+tcp` or `direct-flp`. In `direct-flp` mode, [flowlogs-pipeline](https://github.com/netobserv/flowlogs-pipeline) is run internally from the agent, allowing more filtering, transformations and exporting options. * `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc`, `kafka`, `ipfix+udp`, `ipfix+tcp` or `direct-flp`. In `direct-flp` mode, [flowlogs-pipeline](https://github.com/netobserv/flowlogs-pipeline) is run internally from the agent, allowing more filtering, transformations and exporting options.
* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector. * `HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the Flow collector.
* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector. * `PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the flow collector.
* `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC * `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC
message. Messages larger than that number will be split and submitted sequentially. message. Messages larger than that number will be split and submitted sequentially.
* `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow. * `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow.
......
...@@ -26,11 +26,11 @@ spec: ...@@ -26,11 +26,11 @@ spec:
value: 200ms value: 200ms
- name: LOG_LEVEL - name: LOG_LEVEL
value: debug value: debug
- name: FLOWS_TARGET_HOST - name: HOST
valueFrom: valueFrom:
fieldRef: fieldRef:
fieldPath: status.hostIP fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT - name: PORT
value: "9999" value: "9999"
volumeMounts: volumeMounts:
- name: bpf-kernel-debug - name: bpf-kernel-debug
......
...@@ -28,11 +28,11 @@ spec: ...@@ -28,11 +28,11 @@ spec:
value: 200ms value: 200ms
- name: LOG_LEVEL - name: LOG_LEVEL
value: debug value: debug
- name: FLOWS_TARGET_HOST - name: HOST
valueFrom: valueFrom:
fieldRef: fieldRef:
fieldPath: status.hostIP fieldPath: status.hostIP
- name: FLOWS_TARGET_PORT - name: PORT
value: "9999" value: "9999"
volumeMounts: volumeMounts:
- name: bpf-kernel-debug - name: bpf-kernel-debug
......
...@@ -14,7 +14,7 @@ go build -mod vendor -o bin/flowlogs-dump-collector examples/flowlogs-dump/serve ...@@ -14,7 +14,7 @@ go build -mod vendor -o bin/flowlogs-dump-collector examples/flowlogs-dump/serve
``` ```
Start the agent using: Start the agent using:
```bash ```bash
sudo FLOWS_TARGET_HOST=127.0.0.1 FLOWS_TARGET_PORT=9999 ./bin/netobserv-ebpf-agent sudo HOST=127.0.0.1 PORT=9999 ./bin/netobserv-ebpf-agent
``` ```
Start the flowlogs-dump-collector using: (in a secondary shell) Start the flowlogs-dump-collector using: (in a secondary shell)
......
...@@ -22,7 +22,7 @@ import ( ...@@ -22,7 +22,7 @@ import (
"log" "log"
"net" "net"
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
) )
......
# packetcapture-client # Packet Capture TCP Client
## How to run ## How to run
...@@ -12,14 +12,14 @@ Build the packetcapture-dump-collector (the client that receives full packets fr ...@@ -12,14 +12,14 @@ Build the packetcapture-dump-collector (the client that receives full packets fr
```bash ```bash
go build -mod vendor -o bin/packetcapture-client examples/packetcapture-dump/client/packetcapture-client.go go build -mod vendor -o bin/packetcapture-client examples/packetcapture-dump/client/packetcapture-client.go
``` ```
Start the agent using: Start the packetcapture-client using: (in a secondary shell)
```bash ```bash
sudo PCA_SERVER_PORT=9990 ENABLE_PCA=true PCA_FILTER=tcp,22 ./bin/netobserv-ebpf-agent ./bin/packetcapture-client -outfile=capture.pcap
``` ```
Start the packetcapture-client using: (in a secondary shell) Start the agent using:
```bash ```bash
./bin/packetcapture-client -outfile=capture.pcap sudo HOST=localhost PORT=9990 ENABLE_PCA=true PCA_FILTER=tcp,22 ./bin/netobserv-ebpf-agent
``` ```
You should see output such as: You should see output such as:
...@@ -29,13 +29,13 @@ By default, the read packets are printed on stdout. ...@@ -29,13 +29,13 @@ By default, the read packets are printed on stdout.
To write to a pcap file use flag '-outfile=[filename]' To write to a pcap file use flag '-outfile=[filename]'
This creates a file [filename] and writes packets to it. This creates a file [filename] and writes packets to it.
To view captured packets 'tcpdump -r [filename]'. To view captured packets 'tcpdump -r [filename]'.
writting into capture.pcap
07-24-2023 07:58:59.264323 : Received Packet of length 24 03-22-2024 10:48:44.941828 : Received Packet of length 136
07-24-2023 07:59:04.268965 : Received Packet of length 410 03-22-2024 10:48:44.942901 : Received Packet of length 106
07-24-2023 07:59:04.269048 : Received Packet of length 644 03-22-2024 10:48:44.943597 : Received Packet of length 110
07-24-2023 07:59:04.269087 : Received Packet of length 224 03-22-2024 10:48:44.944182 : Received Packet of length 70
07-24-2023 07:59:04.269125 : Received Packet of length 82 03-22-2024 10:48:44.944447 : Received Packet of length 70
07-24-2023 07:59:04.269173 : Received Packet of length 148 03-22-2024 10:48:44.944644 : Received Packet of length 138
... ...
``` ```
......
...@@ -20,17 +20,24 @@ package main ...@@ -20,17 +20,24 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"net"
"os" "os"
"time" "time"
"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/google/gopacket/layers"
) )
var ( var (
PORT = flag.String("connect_port", "9990", "TCP port to connect to for packet stream") PORT = flag.Int("port", 9990, "gRPC collector port for packet stream")
HOST = flag.String("connect_host", "localhost", "Packet Capture Agent IP") FILENAME = flag.String("outfile", "", "Create and write to <Filename>.pcap")
FILENAME = flag.String("outfile", "", "Create and write to Filename.pcap")
) )
// Setting Snapshot length to 0 sets it to maximum packet size
var snapshotlen uint32
func check(e error) { func check(e error) {
if e != nil { if e != nil {
panic(e) panic(e)
...@@ -45,47 +52,41 @@ func main() { ...@@ -45,47 +52,41 @@ func main() {
fmt.Println("To view captured packets 'tcpdump -r [filename]'.") fmt.Println("To view captured packets 'tcpdump -r [filename]'.")
flag.Parse() flag.Parse()
tcpServer, err := net.ResolveTCPAddr("tcp", *HOST+":"+*PORT) flowPackets := make(chan *pbpacket.Packet, 100)
collector, err := grpc.StartCollector(*PORT, flowPackets)
if err != nil {
println("ResolveTCPAddr failed:", err.Error())
os.Exit(1)
}
conn, err := net.DialTCP("tcp", nil, tcpServer)
if err != nil { if err != nil {
println("Dial failed:", err.Error()) fmt.Println("StartCollector failed:", err.Error())
os.Exit(1) os.Exit(1)
} }
var f *os.File var f *os.File
if *FILENAME != "" { if *FILENAME != "" {
f, err = os.Create(*FILENAME) f, err = os.Create(*FILENAME)
if err != nil { if err != nil {
fmt.Println("Create file failed:", err.Error())
os.Exit(1)
}
// write pcap file header
_, err = f.Write(exporter.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
if err != nil {
fmt.Println("Write file header failed:", err.Error())
os.Exit(1) os.Exit(1)
} }
fmt.Println("writting into", *FILENAME)
defer f.Close() defer f.Close()
for { for fp := range flowPackets {
received := make([]byte, 65535) _, err = f.Write(fp.Pcap.Value)
n, err := conn.Read(received)
if err != nil {
println("Read data failed:", err.Error())
os.Exit(1)
}
_, err = f.Write(received[:n])
check(err) check(err)
dt := time.Now() dt := time.Now()
fmt.Println(dt.Format("01-02-2006 15:04:05.000000"), ": Received Packet of length ", n) fmt.Println(dt.Format("01-02-2006 15:04:05.000000"), ": Received Packet of length ", len(fp.Pcap.Value))
} }
} else { } else {
fmt.Println("into else") fmt.Println("printing stdout without saving in file")
for {
received := make([]byte, 65535) for fp := range flowPackets {
n, err := conn.Read(received) fmt.Println(fp.Pcap.Value)
if err != nil {
println("Read data failed:", err.Error())
os.Exit(1)
}
fmt.Println(received[:n])
} }
} }
conn.Close() collector.Close()
} }
...@@ -67,9 +67,9 @@ spec: ...@@ -67,9 +67,9 @@ spec:
value: call_error,cares_resolver,dns_resolver value: call_error,cares_resolver,dns_resolver
- name: GRPC_DNS_RESOLVER - name: GRPC_DNS_RESOLVER
value: "ares" value: "ares"
- name: FLOWS_TARGET_HOST - name: HOST
value: "packet-counter" value: "packet-counter"
- name: FLOWS_TARGET_PORT - name: PORT
value: "9999" value: "9999"
# resources: # resources:
# limits: # limits:
......
...@@ -5,7 +5,7 @@ import ( ...@@ -5,7 +5,7 @@ import (
"log" "log"
"time" "time"
"github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/paulbellamy/ratecounter" "github.com/paulbellamy/ratecounter"
) )
......
...@@ -3,5 +3,5 @@ ...@@ -3,5 +3,5 @@
# /etc/default/netobserv-ebpf-agent # /etc/default/netobserv-ebpf-agent
DIRECTION=both DIRECTION=both
FLOWS_TARGET_HOST=127.0.0.1 HOST=127.0.0.1
FLOWS_TARGET_PORT=9999 PORT=9999
...@@ -135,6 +135,9 @@ type ebpfFlowFetcher interface { ...@@ -135,6 +135,9 @@ type ebpfFlowFetcher interface {
func FlowsAgent(cfg *Config) (*Flows, error) { func FlowsAgent(cfg *Config) (*Flows, error) {
alog.Info("initializing Flows agent") alog.Info("initializing Flows agent")
// manage deprecated configs
manageDeprecatedConfigs(cfg)
// configure informer for new interfaces // configure informer for new interfaces
var informer = configureInformer(cfg, alog) var informer = configureInformer(cfg, alog)
...@@ -286,16 +289,16 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*fl ...@@ -286,16 +289,16 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*fl
case "direct-flp": case "direct-flp":
return buildDirectFLPExporter(cfg) return buildDirectFLPExporter(cfg)
default: default:
return nil, fmt.Errorf("wrong export type %s. Admitted values are grpc, kafka", cfg.Export) return nil, fmt.Errorf("wrong flow export type %s", cfg.Export)
} }
} }
func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) { func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 { if cfg.Host == "" || cfg.Port == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d", return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort) cfg.Host, cfg.Port)
} }
grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m) grpcExporter, err := exporter.StartGRPCProto(cfg.Host, cfg.Port, cfg.GRPCMessageMaxFlows, m)
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -361,11 +364,11 @@ func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*f ...@@ -361,11 +364,11 @@ func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*f
} }
func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*flow.Record], error) { func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*flow.Record], error) {
if cfg.TargetHost == "" || cfg.TargetPort == 0 { if cfg.Host == "" || cfg.Port == 0 {
return nil, fmt.Errorf("missing target host or port: %s:%d", return nil, fmt.Errorf("missing target host or port: %s:%d",
cfg.TargetHost, cfg.TargetPort) cfg.Host, cfg.Port)
} }
ipfix, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, proto) ipfix, err := exporter.StartIPFIXExporter(cfg.Host, cfg.Port, proto)
if err != nil { if err != nil {
return nil, err return nil, err
} }
......
...@@ -29,10 +29,10 @@ func TestFlowsAgent_InvalidConfigs(t *testing.T) { ...@@ -29,10 +29,10 @@ func TestFlowsAgent_InvalidConfigs(t *testing.T) {
c: Config{Export: "foo"}, c: Config{Export: "foo"},
}, { }, {
d: "GRPC: missing host", d: "GRPC: missing host",
c: Config{Export: "grpc", TargetPort: 3333}, c: Config{Export: "grpc", Port: 3333},
}, { }, {
d: "GRPC: missing port", d: "GRPC: missing port",
c: Config{Export: "grpc", TargetHost: "flp"}, c: Config{Export: "grpc", Host: "flp"},
}, { }, {
d: "Kafka: missing brokers", d: "Kafka: missing brokers",
c: Config{Export: "kafka"}, c: Config{Export: "kafka"},
......
...@@ -2,8 +2,12 @@ package agent ...@@ -2,8 +2,12 @@ package agent
import ( import (
"time" "time"
"github.com/sirupsen/logrus"
) )
var clog = logrus.WithField("component", "config")
const ( const (
ListenPoll = "poll" ListenPoll = "poll"
ListenWatch = "watch" ListenWatch = "watch"
...@@ -34,14 +38,15 @@ type Config struct { ...@@ -34,14 +38,15 @@ type Config struct {
// in the AgentID field of each flow. Accepted values are: any (default), ipv4, ipv6. // in the AgentID field of each flow. Accepted values are: any (default), ipv4, ipv6.
// If the AgentIP configuration property is set, this property has no effect. // If the AgentIP configuration property is set, this property has no effect.
AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"` AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"`
// Export selects the flows' exporter protocol. Accepted values are: grpc (default), kafka, // Export selects the exporter protocol.
// ipfix+udp, ipfix+tcp or direct-flp. // Accepted values for Flows are: grpc (default), kafka, ipfix+udp, ipfix+tcp or direct-flp.
// Accepted values for Packets are: grpc (default) or tcp
Export string `env:"EXPORT" envDefault:"grpc"` Export string `env:"EXPORT" envDefault:"grpc"`
// TargetHost is the host name or IP of the target Flow collector, when the EXPORT variable is // Host is the host name or IP of the Flow collector, when the EXPORT variable is
// set to "grpc" // set to "grpc"
TargetHost string `env:"FLOWS_TARGET_HOST"` Host string `env:"HOST"`
// TargetPort is the port the target Flow collector, when the EXPORT variable is set to "grpc" // Port is the port the Flow collector, when the EXPORT variable is set to "grpc"
TargetPort int `env:"FLOWS_TARGET_PORT"` Port int `env:"PORT"`
// GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages // GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages
// larger than that number will be split and submitted sequentially. // larger than that number will be split and submitted sequentially.
GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"` GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"`
...@@ -164,8 +169,6 @@ type Config struct { ...@@ -164,8 +169,6 @@ type Config struct {
// PCAFilters set the filters to determine packets to filter using Packet Capture Agent (PCA). It is a comma separated set. // PCAFilters set the filters to determine packets to filter using Packet Capture Agent (PCA). It is a comma separated set.
// The format is [protocol], [port number] Example: PCA_FILTER = "tcp,80". Currently, we support 'tcp','udp','sctp' for protocol. // The format is [protocol], [port number] Example: PCA_FILTER = "tcp,80". Currently, we support 'tcp','udp','sctp' for protocol.
PCAFilters string `env:"PCA_FILTER"` PCAFilters string `env:"PCA_FILTER"`
// PCAServerPort is the port PCA Server starts at, when ENABLE_PCA variable is set to true.
PCAServerPort int `env:"PCA_SERVER_PORT" envDefault:"9990"`
// MetricsEnable enables http server to collect ebpf agent metrics, default is false. // MetricsEnable enables http server to collect ebpf agent metrics, default is false.
MetricsEnable bool `env:"METRICS_ENABLE" envDefault:"false"` MetricsEnable bool `env:"METRICS_ENABLE" envDefault:"false"`
// MetricsServerAddress is the address of the server that collects ebpf agent metrics. // MetricsServerAddress is the address of the server that collects ebpf agent metrics.
...@@ -174,4 +177,30 @@ type Config struct { ...@@ -174,4 +177,30 @@ type Config struct {
MetricsPort int `env:"METRICS_SERVER_PORT" envDefault:"9090"` MetricsPort int `env:"METRICS_SERVER_PORT" envDefault:"9090"`
// MetricsPrefix is the prefix of the metrics that are sent to the server. // MetricsPrefix is the prefix of the metrics that are sent to the server.
MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"` MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"`
/* Deprecated configs are listed below this line
* See manageDeprecatedConfigs function for details
*/
// Deprecated FlowsTargetHost replaced by TargetHost
FlowsTargetHost string `env:"FLOWS_TARGET_HOST"`
// Deprecated FlowsTargetPort replaced by TargetPort
FlowsTargetPort int `env:"FLOWS_TARGET_PORT"`
// Deprecated PCAServerPort replaced by TargetPort
PCAServerPort int `env:"PCA_SERVER_PORT"`
}
func manageDeprecatedConfigs(cfg *Config) {
if len(cfg.FlowsTargetHost) != 0 {
clog.Infof("Using deprecated FlowsTargetHost %s", cfg.FlowsTargetHost)
cfg.Host = cfg.FlowsTargetHost
}
if cfg.FlowsTargetPort != 0 {
clog.Infof("Using deprecated FlowsTargetPort %d", cfg.FlowsTargetPort)
cfg.Port = cfg.FlowsTargetPort
} else if cfg.PCAServerPort != 0 {
clog.Infof("Using deprecated PCAServerPort %d", cfg.PCAServerPort)
cfg.Port = cfg.PCAServerPort
}
} }
...@@ -49,6 +49,9 @@ type ebpfPacketFetcher interface { ...@@ -49,6 +49,9 @@ type ebpfPacketFetcher interface {
func PacketsAgent(cfg *Config) (*Packets, error) { func PacketsAgent(cfg *Config) (*Packets, error) {
plog.Info("initializing Packets agent") plog.Info("initializing Packets agent")
// manage deprecated configs
manageDeprecatedConfigs(cfg)
// configure informer for new interfaces // configure informer for new interfaces
informer := configureInformer(cfg, plog) informer := configureInformer(cfg, plog)
...@@ -131,18 +134,27 @@ func packetsAgent(cfg *Config, ...@@ -131,18 +134,27 @@ func packetsAgent(cfg *Config,
}, nil }, nil
} }
func buildPacketExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord], error) { func buildGRPCPacketExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord], error) {
if cfg.PCAServerPort == 0 { if cfg.Host == "" || cfg.Port == 0 {
return nil, fmt.Errorf("missing PCA Server port: %d", return nil, fmt.Errorf("missing target host or port for PCA: %s:%d",
cfg.PCAServerPort) cfg.Host, cfg.Port)
} }
pcapStreamer, err := exporter.StartPCAPSend(fmt.Sprintf("%d", cfg.PCAServerPort)) plog.Info("starting gRPC Packet send")
pcapStreamer, err := exporter.StartGRPCPacketSend(cfg.Host, cfg.Port)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return pcapStreamer.ExportFlows, err return pcapStreamer.ExportGRPCPackets, nil
}
func buildPacketExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord], error) {
switch cfg.Export {
case "grpc":
return buildGRPCPacketExporter(cfg)
default:
return nil, fmt.Errorf("unsupported packet export type %s", cfg.Export)
}
} }
// Run a Packets agent. The function will keep running in the same thread // Run a Packets agent. The function will keep running in the same thread
......
package exporter
import (
"context"
"fmt"
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/google/gopacket"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/anypb"
)
// GRPCPacketProto holds the state of the gRPC packet exporter: the collector
// endpoint (host/port) it was started against and the established gRPC client
// connection used to stream captured packets.
type GRPCPacketProto struct {
	hostIP     string
	hostPort   int
	clientConn *grpc.ClientConnection
}
// gplog is the structured logger for the gRPC packet exporter component.
var gplog = logrus.WithField("component", "packet/GRPCPackets")
// writeGRPCPacket validates the capture metadata for a single packet and
// sends the packet (pcap record header + payload, in one message) to the
// collector over the given gRPC client connection.
// It returns an error when the capture info is inconsistent with the data,
// when the pcap record header cannot be built, or when the gRPC send fails.
func writeGRPCPacket(ci gopacket.CaptureInfo, data []byte, conn *grpc.ClientConnection) error {
	// The capture info must describe exactly the bytes we are about to send.
	if ci.CaptureLength != len(data) {
		return fmt.Errorf("capture length %d does not match data length %d", ci.CaptureLength, len(data))
	}
	// A capture may be truncated (CaptureLength < Length) but can never be
	// larger than the original packet.
	if ci.CaptureLength > ci.Length {
		return fmt.Errorf("invalid capture info %+v: capture length > length", ci)
	}
	gplog.Debugf("Sending Packet to client. Length: %d", len(data))
	// Build the per-record pcap header from the capture info.
	b, err := GetPacketHeader(ci)
	if err != nil {
		return fmt.Errorf("error writing packet header: %w", err)
	}
	// write 16 byte packet header & data all at once
	_, err = conn.Client().Send(context.TODO(), &pbpacket.Packet{
		Pcap: &anypb.Any{
			Value: append(b, data...),
		},
	})
	return err
}
// StartGRPCPacketSend dials the packet collector at hostIP:hostPort and
// returns an exporter ready to stream captured packets over gRPC.
// It returns an error when the client connection cannot be established.
func StartGRPCPacketSend(hostIP string, hostPort int) (*GRPCPacketProto, error) {
	conn, err := grpc.ConnectClient(hostIP, hostPort)
	if err != nil {
		return nil, err
	}
	exporter := &GRPCPacketProto{
		hostIP:     hostIP,
		hostPort:   hostPort,
		clientConn: conn,
	}
	return exporter, nil
}
// ExportGRPCPackets consumes batches of captured packets from the input
// channel and streams each non-empty packet to the collector over gRPC.
// Send failures within a batch are aggregated and logged once per batch;
// the client connection is closed after the channel is exhausted.
func (p *GRPCPacketProto) ExportGRPCPackets(in <-chan []*flow.PacketRecord) {
	for batch := range in {
		var sendErrs []error
		for _, record := range batch {
			payload := record.Stream
			if len(payload) == 0 {
				// nothing captured for this record; skip it
				continue
			}
			// The full payload is sent, so capture length equals the
			// original packet length.
			ci := gopacket.CaptureInfo{
				Timestamp:     record.Time,
				CaptureLength: len(payload),
				Length:        len(payload),
			}
			if err := writeGRPCPacket(ci, payload, p.clientConn); err != nil {
				sendErrs = append(sendErrs, err)
			}
		}
		if len(sendErrs) > 0 {
			gplog.Errorf("%d errors while sending packets:\n%s", len(sendErrs), sendErrs)
		}
	}
	if err := p.clientConn.Close(); err != nil {
		gplog.WithError(err).Warn("couldn't close packet export client")
	}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment