diff --git a/Makefile b/Makefile index a565d271685a02d666957a215c31eeec3f7e1bb4..598b161d4c9820be2153622ad52313834138c527 100644 --- a/Makefile +++ b/Makefile @@ -126,6 +126,7 @@ generate: prereqs ## Generate artifacts of the code repo (pkg/ebpf and pkg/proto go generate ./pkg/... @echo "### Generating gRPC and Protocol Buffers code" PATH="$(shell pwd)/protoc/bin:$$PATH" protoc --go_out=pkg --go-grpc_out=pkg proto/flow.proto + PATH="$(shell pwd)/protoc/bin:$$PATH" protoc --go_out=pkg --go-grpc_out=pkg proto/packet.proto .PHONY: docker-generate docker-generate: ## Create the container that generates the eBPF binaries diff --git a/README.md b/README.md index e678abc3399c72ee442dc335d1f13732147ac537..65f112cad3c00b5790a98ffae94120bc873a44a4 100644 --- a/README.md +++ b/README.md @@ -46,8 +46,8 @@ configured by our [Network Observability Operator](https://github.com/netobserv/ Anyway you can run it directly as an executable from your command line: ``` -export FLOWS_TARGET_HOST=... -export FLOWS_TARGET_PORT=... +export HOST=... +export PORT=... sudo -E bin/netobserv-ebpf-agent ``` diff --git a/deployments/flp-daemonset-cap.yml b/deployments/flp-daemonset-cap.yml index 23a4c4864773995702c3b0aa7939aedf78965362..f7dbea2a2ee0625677232657319aebfc66238827 100644 --- a/deployments/flp-daemonset-cap.yml +++ b/deployments/flp-daemonset-cap.yml @@ -32,11 +32,11 @@ spec: - SYS_RESOURCE runAsUser: 0 env: - - name: FLOWS_TARGET_HOST + - name: HOST valueFrom: fieldRef: fieldPath: status.hostIP - - name: FLOWS_TARGET_PORT + - name: PORT value: "9999" --- apiVersion: apps/v1 diff --git a/deployments/flp-daemonset.yml b/deployments/flp-daemonset.yml index 5c9d331c83302539e198ea38da7756c72d7ff42b..ab73b730233018a9e88c9f5eaf39e2ebac6d2c59 100644 --- a/deployments/flp-daemonset.yml +++ b/deployments/flp-daemonset.yml @@ -27,11 +27,11 @@ spec: privileged: true runAsUser: 0 env: - - name: FLOWS_TARGET_HOST + - name: HOST valueFrom: fieldRef: fieldPath: status.hostIP - - name: FLOWS_TARGET_PORT + - name: PORT value: "9999" --- apiVersion: apps/v1 diff --git a/deployments/flp-service.yml b/deployments/flp-service.yml index d4e2d6185568d5875f7769cd312ca71839d86ce4..e48dd7f9c83c1a1fd31dd779f8d699c76eede5e0 100644 --- a/deployments/flp-service.yml +++ b/deployments/flp-service.yml @@ -27,9 +27,9 @@ spec: privileged: true runAsUser: 0 env: - - name: FLOWS_TARGET_HOST + - name: HOST value: "flp" - - name: FLOWS_TARGET_PORT + - name: PORT value: "9999" --- apiVersion: v1 diff --git a/docs/config.md b/docs/config.md index 022aa6586ef8c6b52693e1e11b9ff4475964cba7..5a104b305e692366ceed494cd6c778a4e9c0326a 100644 --- a/docs/config.md +++ b/docs/config.md @@ -5,8 +5,8 @@ _Please also refer to the file [config.go](../pkg/agent/config.go) which is the The following environment variables are available to configure the NetObserv eBFP Agent: * `EXPORT` (default: `grpc`). Flows' exporter protocol. Accepted values are: `grpc`, `kafka`, `ipfix+udp`, `ipfix+tcp` or `direct-flp`. In `direct-flp` mode, [flowlogs-pipeline](https://github.com/netobserv/flowlogs-pipeline) is run internally from the agent, allowing more filtering, transformations and exporting options. -* `FLOWS_TARGET_HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the target Flow collector. -* `FLOWS_TARGET_PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the target flow collector. +* `HOST` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Host name or IP of the Flow collector. +* `PORT` (required if `EXPORT` is `grpc` or `ipfix+[tcp/udp]`). Port of the flow collector. * `GRPC_MESSAGE_MAX_FLOWS` (default: `10000`). Specifies the limit, in number of flows, of each GRPC message. Messages larger than that number will be split and submitted sequentially. * `AGENT_IP` (optional). Allows overriding the reported Agent IP address on each flow. diff --git a/e2e/cluster/base/04-agent.yml b/e2e/cluster/base/04-agent.yml index 7771b9f5d591374d597ebf75fb4fdaeb78a81576..d79b9ca2993c411bc56f4e88e77e5e52425bdb20 100644 --- a/e2e/cluster/base/04-agent.yml +++ b/e2e/cluster/base/04-agent.yml @@ -26,11 +26,11 @@ spec: value: 200ms - name: LOG_LEVEL value: debug - - name: FLOWS_TARGET_HOST + - name: HOST valueFrom: fieldRef: fieldPath: status.hostIP - - name: FLOWS_TARGET_PORT + - name: PORT value: "9999" volumeMounts: - name: bpf-kernel-debug diff --git a/e2e/ipfix/manifests/30-agent.yml b/e2e/ipfix/manifests/30-agent.yml index 8a0b09fc3c9e24692ed2ac42fb5c9f0374af3e3c..bef3564f533401f48257eca012c1fa0f2ffe997e 100644 --- a/e2e/ipfix/manifests/30-agent.yml +++ b/e2e/ipfix/manifests/30-agent.yml @@ -28,11 +28,11 @@ spec: value: 200ms - name: LOG_LEVEL value: debug - - name: FLOWS_TARGET_HOST + - name: HOST valueFrom: fieldRef: fieldPath: status.hostIP - - name: FLOWS_TARGET_PORT + - name: PORT value: "9999" volumeMounts: - name: bpf-kernel-debug diff --git a/examples/flowlogs-dump/README.md b/examples/flowlogs-dump/README.md index a5169b78f42539075760f7ea6393baebcb39b894..79c307f1a335edc843a2f770fcc792448e2fbccd 100644 --- a/examples/flowlogs-dump/README.md +++ b/examples/flowlogs-dump/README.md @@ -14,7 +14,7 @@ go build -mod vendor -o bin/flowlogs-dump-collector examples/flowlogs-dump/serve ``` Start the agent using: ```bash -sudo FLOWS_TARGET_HOST=127.0.0.1 FLOWS_TARGET_PORT=9999 ./bin/netobserv-ebpf-agent +sudo HOST=127.0.0.1 PORT=9999 ./bin/netobserv-ebpf-agent ``` Start the flowlogs-dump-collector using: (in a secondary shell) diff --git a/examples/flowlogs-dump/server/flowlogs-dump-collector.go b/examples/flowlogs-dump/server/flowlogs-dump-collector.go index d3a5d0a74ed91d89fe7df7e1995b2f2d4758c73f..83869bc09a483b17f644dd1d600db8d8f95bb55e 100644 --- a/examples/flowlogs-dump/server/flowlogs-dump-collector.go +++ b/examples/flowlogs-dump/server/flowlogs-dump-collector.go @@ -22,7 +22,7 @@ import ( "log" "net" - "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" + grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" ) diff --git a/examples/packetcapture-dump/README.md b/examples/packetcapture-dump/README.md index 9bdd1802b2225f8258ce7f5ef1fc21f9ca4173e6..9f21cf68290bdcde012faeec6dcd777f93608470 100644 --- a/examples/packetcapture-dump/README.md +++ b/examples/packetcapture-dump/README.md @@ -1,4 +1,4 @@ -# packetcapture-client +# Packet Capture TCP Client ## How to run @@ -12,14 +12,14 @@ Build the packetcapture-dump-collector (the client that receives full packets fr ```bash go build -mod vendor -o bin/packetcapture-client examples/packetcapture-dump/client/packetcapture-client.go ``` -Start the agent using: +Start the packetcapture-client using: (in a secondary shell) ```bash -sudo PCA_SERVER_PORT=9990 ENABLE_PCA=true PCA_FILTER=tcp,22 ./bin/netobserv-ebpf-agent +./bin/packetcapture-client -outfile=capture.pcap ``` -Start the packetcapture-client using: (in a secondary shell) +Start the agent using: ```bash -./bin/packetcapture-client -outfile=capture.pcap +sudo HOST=localhost PORT=9990 ENABLE_PCA=true PCA_FILTER=tcp,22 ./bin/netobserv-ebpf-agent ``` You should see output such as: @@ -29,13 +29,13 @@ By default, the read packets are printed on stdout. To write to a pcap file use flag '-outfile=[filename]' This creates a file [filename] and writes packets to it. To view captured packets 'tcpdump -r [filename]'. - -07-24-2023 07:58:59.264323 : Received Packet of length 24 -07-24-2023 07:59:04.268965 : Received Packet of length 410 -07-24-2023 07:59:04.269048 : Received Packet of length 644 -07-24-2023 07:59:04.269087 : Received Packet of length 224 -07-24-2023 07:59:04.269125 : Received Packet of length 82 -07-24-2023 07:59:04.269173 : Received Packet of length 148 +writting into capture.pcap +03-22-2024 10:48:44.941828 : Received Packet of length 136 +03-22-2024 10:48:44.942901 : Received Packet of length 106 +03-22-2024 10:48:44.943597 : Received Packet of length 110 +03-22-2024 10:48:44.944182 : Received Packet of length 70 +03-22-2024 10:48:44.944447 : Received Packet of length 70 +03-22-2024 10:48:44.944644 : Received Packet of length 138 ... ``` diff --git a/examples/packetcapture-dump/client/packetcapture-client.go b/examples/packetcapture-dump/client/packetcapture-client.go index bdb605fe80742858f7f14e22d3670aaeaf76a89a..114a5bb67f5e0bd30ed5819e33c6a36e941670e2 100644 --- a/examples/packetcapture-dump/client/packetcapture-client.go +++ b/examples/packetcapture-dump/client/packetcapture-client.go @@ -20,17 +20,24 @@ package main import ( "flag" "fmt" - "net" "os" "time" + + "github.com/netobserv/netobserv-ebpf-agent/pkg/exporter" + grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket" + + "github.com/google/gopacket/layers" ) var ( - PORT = flag.String("connect_port", "9990", "TCP port to connect to for packet stream") - HOST = flag.String("connect_host", "localhost", "Packet Capture Agent IP") - FILENAME = flag.String("outfile", "", "Create and write to Filename.pcap") + PORT = flag.Int("port", 9990, "gRPC collector port for packet stream") + FILENAME = flag.String("outfile", "", "Create and write to <Filename>.pcap") ) +// Setting Snapshot length to 0 sets it to maximum packet size +var snapshotlen uint32 + func check(e error) { if e != nil { panic(e) @@ -45,47 +52,41 @@ func main() { fmt.Println("To view captured packets 'tcpdump -r [filename]'.") flag.Parse() - tcpServer, err := net.ResolveTCPAddr("tcp", *HOST+":"+*PORT) - - if err != nil { - println("ResolveTCPAddr failed:", err.Error()) - os.Exit(1) - } - conn, err := net.DialTCP("tcp", nil, tcpServer) + flowPackets := make(chan *pbpacket.Packet, 100) + collector, err := grpc.StartCollector(*PORT, flowPackets) if err != nil { - println("Dial failed:", err.Error()) + fmt.Println("StartCollector failed:", err.Error()) os.Exit(1) } + var f *os.File if *FILENAME != "" { f, err = os.Create(*FILENAME) if err != nil { + fmt.Println("Create file failed:", err.Error()) + os.Exit(1) + } + // write pcap file header + _, err = f.Write(exporter.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet)) + if err != nil { + fmt.Println("Write file header failed:", err.Error()) os.Exit(1) } + fmt.Println("writting into", *FILENAME) + defer f.Close() - for { - received := make([]byte, 65535) - n, err := conn.Read(received) - if err != nil { - println("Read data failed:", err.Error()) - os.Exit(1) - } - _, err = f.Write(received[:n]) + for fp := range flowPackets { + _, err = f.Write(fp.Pcap.Value) check(err) dt := time.Now() - fmt.Println(dt.Format("01-02-2006 15:04:05.000000"), ": Received Packet of length ", n) + fmt.Println(dt.Format("01-02-2006 15:04:05.000000"), ": Received Packet of length ", len(fp.Pcap.Value)) } } else { - fmt.Println("into else") - for { - received := make([]byte, 65535) - n, err := conn.Read(received) - if err != nil { - println("Read data failed:", err.Error()) - os.Exit(1) - } - fmt.Println(received[:n]) + fmt.Println("printing stdout without saving in file") + + for fp := range flowPackets { + fmt.Println(fp.Pcap.Value) } } - conn.Close() + collector.Close() } diff --git a/examples/performance/deployment.yml b/examples/performance/deployment.yml index de1a327c6f65860cc1c87eef8ee26ed514cad448..23007eafcd0d90f75900677bfbb8d629c0c40f8c 100644 --- a/examples/performance/deployment.yml +++ b/examples/performance/deployment.yml @@ -67,9 +67,9 @@ spec: value: call_error,cares_resolver,dns_resolver - name: GRPC_DNS_RESOLVER value: "ares" - - name: FLOWS_TARGET_HOST + - name: HOST value: "packet-counter" - - name: FLOWS_TARGET_PORT + - name: PORT value: "9999" # resources: # limits: diff --git a/examples/performance/server/packet-counter-collector.go b/examples/performance/server/packet-counter-collector.go index 23ab52e6d9c3fed7fb3349f393fe9dbee2263d60..5f354c275b973b3cfdd344bf1108150d664af585 100644 --- a/examples/performance/server/packet-counter-collector.go +++ b/examples/performance/server/packet-counter-collector.go @@ -5,7 +5,7 @@ import ( "log" "time" - "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" + grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "github.com/paulbellamy/ratecounter" ) diff --git a/examples/systemd/netobserv-ebpf-agent b/examples/systemd/netobserv-ebpf-agent index 716e465106dab899d7e224bc224a04aa524ab589..585ec9f39d717fe7f9e282a2bba5b3dba9538957 100644 --- a/examples/systemd/netobserv-ebpf-agent +++ b/examples/systemd/netobserv-ebpf-agent @@ -3,5 +3,5 @@ # /etc/default/netobserv-ebpf-agent DIRECTION=both -FLOWS_TARGET_HOST=127.0.0.1 -FLOWS_TARGET_PORT=9999 +HOST=127.0.0.1 +PORT=9999 diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index e8bf27517e85f8f9f7ab80c73280481a1126ac4b..00329ed089515e0785015df1ec13d1d78308eebb 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -135,6 +135,9 @@ type ebpfFlowFetcher interface { func FlowsAgent(cfg *Config) (*Flows, error) { alog.Info("initializing Flows agent") + // manage deprecated configs + manageDeprecatedConfigs(cfg) + // configure informer for new interfaces var informer = configureInformer(cfg, alog) @@ -286,16 +289,16 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*fl case "direct-flp": return buildDirectFLPExporter(cfg) default: - return nil, fmt.Errorf("wrong export type %s. Admitted values are grpc, kafka", cfg.Export) + return nil, fmt.Errorf("wrong flow export type %s", cfg.Export) } } func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*flow.Record], error) { - if cfg.TargetHost == "" || cfg.TargetPort == 0 { + if cfg.Host == "" || cfg.Port == 0 { return nil, fmt.Errorf("missing target host or port: %s:%d", - cfg.TargetHost, cfg.TargetPort) + cfg.Host, cfg.Port) } - grpcExporter, err := exporter.StartGRPCProto(cfg.TargetHost, cfg.TargetPort, cfg.GRPCMessageMaxFlows, m) + grpcExporter, err := exporter.StartGRPCProto(cfg.Host, cfg.Port, cfg.GRPCMessageMaxFlows, m) if err != nil { return nil, err } @@ -361,11 +364,11 @@ func buildKafkaExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*f } func buildIPFIXExporter(cfg *Config, proto string) (node.TerminalFunc[[]*flow.Record], error) { - if cfg.TargetHost == "" || cfg.TargetPort == 0 { + if cfg.Host == "" || cfg.Port == 0 { return nil, fmt.Errorf("missing target host or port: %s:%d", - cfg.TargetHost, cfg.TargetPort) + cfg.Host, cfg.Port) } - ipfix, err := exporter.StartIPFIXExporter(cfg.TargetHost, cfg.TargetPort, proto) + ipfix, err := exporter.StartIPFIXExporter(cfg.Host, cfg.Port, proto) if err != nil { return nil, err } diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 7c2172676447b1359f005fb346432f8189263bed..2768faaa4efed7f0523d5a61de157590e6c90bed 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -29,10 +29,10 @@ func TestFlowsAgent_InvalidConfigs(t *testing.T) { c: Config{Export: "foo"}, }, { d: "GRPC: missing host", - c: Config{Export: "grpc", TargetPort: 3333}, + c: Config{Export: "grpc", Port: 3333}, }, { d: "GRPC: missing port", - c: Config{Export: "grpc", TargetHost: "flp"}, + c: Config{Export: "grpc", Host: "flp"}, }, { d: "Kafka: missing brokers", c: Config{Export: "kafka"}, diff --git a/pkg/agent/config.go b/pkg/agent/config.go index 5b3907ef4ae5f79dfe6c21a53163242a1186c203..2e408befa111a6e7869b000a34767d8ef5ed78ca 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -2,8 +2,12 @@ package agent import ( "time" + + "github.com/sirupsen/logrus" ) +var clog = logrus.WithField("component", "config") + const ( ListenPoll = "poll" ListenWatch = "watch" @@ -34,14 +38,15 @@ type Config struct { // in the AgentID field of each flow. Accepted values are: any (default), ipv4, ipv6. // If the AgentIP configuration property is set, this property has no effect. AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"` - // Export selects the flows' exporter protocol. Accepted values are: grpc (default), kafka, - // ipfix+udp, ipfix+tcp or direct-flp. + // Export selects the exporter protocol. + // Accepted values for Flows are: grpc (default), kafka, ipfix+udp, ipfix+tcp or direct-flp. + // Accepted values for Packets are: grpc (default) or tcp Export string `env:"EXPORT" envDefault:"grpc"` - // TargetHost is the host name or IP of the target Flow collector, when the EXPORT variable is + // Host is the host name or IP of the Flow collector, when the EXPORT variable is // set to "grpc" - TargetHost string `env:"FLOWS_TARGET_HOST"` - // TargetPort is the port the target Flow collector, when the EXPORT variable is set to "grpc" - TargetPort int `env:"FLOWS_TARGET_PORT"` + Host string `env:"HOST"` + // Port is the port the Flow collector, when the EXPORT variable is set to "grpc" + Port int `env:"PORT"` // GRPCMessageMaxFlows specifies the limit, in number of flows, of each GRPC message. Messages // larger than that number will be split and submitted sequentially. GRPCMessageMaxFlows int `env:"GRPC_MESSAGE_MAX_FLOWS" envDefault:"10000"` @@ -164,8 +169,6 @@ type Config struct { // PCAFilters set the filters to determine packets to filter using Packet Capture Agent (PCA). It is a comma separated set. // The format is [protocol], [port number] Example: PCA_FILTER = "tcp,80". Currently, we support 'tcp','udp','sctp' for protocol. PCAFilters string `env:"PCA_FILTER"` - // PCAServerPort is the port PCA Server starts at, when ENABLE_PCA variable is set to true. - PCAServerPort int `env:"PCA_SERVER_PORT" envDefault:"9990"` // MetricsEnable enables http server to collect ebpf agent metrics, default is false. MetricsEnable bool `env:"METRICS_ENABLE" envDefault:"false"` // MetricsServerAddress is the address of the server that collects ebpf agent metrics. @@ -174,4 +177,30 @@ type Config struct { MetricsPort int `env:"METRICS_SERVER_PORT" envDefault:"9090"` // MetricsPrefix is the prefix of the metrics that are sent to the server. MetricsPrefix string `env:"METRICS_PREFIX" envDefault:"ebpf_agent_"` + + /* Deprecated configs are listed below this line + * See manageDeprecatedConfigs function for details + */ + + // Deprecated FlowsTargetHost replaced by TargetHost + FlowsTargetHost string `env:"FLOWS_TARGET_HOST"` + // Deprecated FlowsTargetPort replaced by TargetPort + FlowsTargetPort int `env:"FLOWS_TARGET_PORT"` + // Deprecated PCAServerPort replaced by TargetPort + PCAServerPort int `env:"PCA_SERVER_PORT"` +} + +func manageDeprecatedConfigs(cfg *Config) { + if len(cfg.FlowsTargetHost) != 0 { + clog.Infof("Using deprecated FlowsTargetHost %s", cfg.FlowsTargetHost) + cfg.Host = cfg.FlowsTargetHost + } + + if cfg.FlowsTargetPort != 0 { + clog.Infof("Using deprecated FlowsTargetPort %d", cfg.FlowsTargetPort) + cfg.Port = cfg.FlowsTargetPort + } else if cfg.PCAServerPort != 0 { + clog.Infof("Using deprecated PCAServerPort %d", cfg.PCAServerPort) + cfg.Port = cfg.PCAServerPort + } } diff --git a/pkg/agent/packets_agent.go b/pkg/agent/packets_agent.go index 619fe33d326f150e1bbffa754c15a4485e626b0c..cea30eeb9537252894e7d78f404e93a9352785a7 100644 --- a/pkg/agent/packets_agent.go +++ b/pkg/agent/packets_agent.go @@ -49,6 +49,9 @@ type ebpfPacketFetcher interface { func PacketsAgent(cfg *Config) (*Packets, error) { plog.Info("initializing Packets agent") + // manage deprecated configs + manageDeprecatedConfigs(cfg) + // configure informer for new interfaces informer := configureInformer(cfg, plog) @@ -131,18 +134,27 @@ func packetsAgent(cfg *Config, }, nil } -func buildPacketExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord], error) { - if cfg.PCAServerPort == 0 { - return nil, fmt.Errorf("missing PCA Server port: %d", - cfg.PCAServerPort) +func buildGRPCPacketExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord], error) { + if cfg.Host == "" || cfg.Port == 0 { + return nil, fmt.Errorf("missing target host or port for PCA: %s:%d", + cfg.Host, cfg.Port) } - pcapStreamer, err := exporter.StartPCAPSend(fmt.Sprintf("%d", cfg.PCAServerPort)) + plog.Info("starting gRPC Packet send") + pcapStreamer, err := exporter.StartGRPCPacketSend(cfg.Host, cfg.Port) if err != nil { return nil, err } - return pcapStreamer.ExportFlows, err + return pcapStreamer.ExportGRPCPackets, nil +} +func buildPacketExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord], error) { + switch cfg.Export { + case "grpc": + return buildGRPCPacketExporter(cfg) + default: + return nil, fmt.Errorf("unsupported packet export type %s", cfg.Export) + } } // Run a Packets agent. The function will keep running in the same thread diff --git a/pkg/exporter/grpc_packets.go b/pkg/exporter/grpc_packets.go new file mode 100644 index 0000000000000000000000000000000000000000..ea820d8621e2ff2220738f2d67f4de3d92fb091c --- /dev/null +++ b/pkg/exporter/grpc_packets.go @@ -0,0 +1,82 @@ +package exporter + +import ( + "context" + "fmt" + + "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" + grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet" + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket" + + "github.com/google/gopacket" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/types/known/anypb" +) + +type GRPCPacketProto struct { + hostIP string + hostPort int + clientConn *grpc.ClientConnection +} + +var gplog = logrus.WithField("component", "packet/GRPCPackets") + +// WritePacket writes the given packet data out to gRPC. +func writeGRPCPacket(ci gopacket.CaptureInfo, data []byte, conn *grpc.ClientConnection) error { + if ci.CaptureLength != len(data) { + return fmt.Errorf("capture length %d does not match data length %d", ci.CaptureLength, len(data)) + } + if ci.CaptureLength > ci.Length { + return fmt.Errorf("invalid capture info %+v: capture length > length", ci) + } + gplog.Debugf("Sending Packet to client. Length: %d", len(data)) + b, err := GetPacketHeader(ci) + if err != nil { + return fmt.Errorf("error writing packet header: %w", err) + } + // write 16 byte packet header & data all at once + _, err = conn.Client().Send(context.TODO(), &pbpacket.Packet{ + Pcap: &anypb.Any{ + Value: append(b, data...), + }, + }) + return err +} + +func StartGRPCPacketSend(hostIP string, hostPort int) (*GRPCPacketProto, error) { + clientConn, err := grpc.ConnectClient(hostIP, hostPort) + if err != nil { + return nil, err + } + return &GRPCPacketProto{ + hostIP: hostIP, + hostPort: hostPort, + clientConn: clientConn, + }, nil +} + +func (p *GRPCPacketProto) ExportGRPCPackets(in <-chan []*flow.PacketRecord) { + for packetRecord := range in { + var errs []error + for _, packet := range packetRecord { + packetStream := packet.Stream + packetTimestamp := packet.Time + if len(packetStream) != 0 { + captureInfo := gopacket.CaptureInfo{ + Timestamp: packetTimestamp, + CaptureLength: len(packetStream), + Length: len(packetStream), + } + if err := writeGRPCPacket(captureInfo, packetStream, p.clientConn); err != nil { + errs = append(errs, err) + } + } + } + if len(errs) != 0 { + gplog.Errorf("%d errors while sending packets:\n%s", len(errs), errs) + } + } + if err := p.clientConn.Close(); err != nil { + gplog.WithError(err).Warn("couldn't close packet export client") + } +} diff --git a/pkg/exporter/grpc_proto.go b/pkg/exporter/grpc_proto.go index 159fdc5bbd3bcf31effb6f11429d9ffca2c3624c..77277592df14438d99a5893d1cf116981e501824 100644 --- a/pkg/exporter/grpc_proto.go +++ b/pkg/exporter/grpc_proto.go @@ -4,7 +4,7 @@ import ( "context" "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" - "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" + grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/metrics" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" diff --git a/pkg/exporter/grpc_proto_test.go b/pkg/exporter/grpc_proto_test.go index 010e77d22a96a86e2ff438d55f898b40f428afbe..bdaa6bfbc309643ccaee4ad256d154d934a7cb96 100644 --- a/pkg/exporter/grpc_proto_test.go +++ b/pkg/exporter/grpc_proto_test.go @@ -11,7 +11,7 @@ import ( "github.com/mariomac/guara/pkg/test" "github.com/netobserv/netobserv-ebpf-agent/pkg/ebpf" "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" - "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" + grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/pkg/exporter/packets.go b/pkg/exporter/packets.go deleted file mode 100644 index a07d38e8e185d61dc1d5b4966e779a9c3032a890..0000000000000000000000000000000000000000 --- a/pkg/exporter/packets.go +++ /dev/null @@ -1,135 +0,0 @@ -package exporter - -import ( - "encoding/binary" - "fmt" - "net" - - "github.com/google/gopacket" - "github.com/google/gopacket/layers" - - "github.com/netobserv/netobserv-ebpf-agent/pkg/flow" - - "github.com/sirupsen/logrus" -) - -type PCAPStream struct { - hostPort string - clientConn net.Conn -} - -// PCAP Magic number is fixed for each endianness. -const pcapMagicNumber = 0xA1B2C3D4 -const versionMajor = 2 -const versionMinor = 4 -const nanosPerMicro = 1000 - -var plog = logrus.WithField("component", "packet/Packets") - -// Setting Snapshot length to 0 sets it to maximum packet size -var snapshotlen uint32 - -func writePCAPFileHeader(snaplen uint32, linktype layers.LinkType, conn net.Conn) error { - var buf [24]byte - binary.LittleEndian.PutUint32(buf[0:4], pcapMagicNumber) - binary.LittleEndian.PutUint16(buf[4:6], versionMajor) - binary.LittleEndian.PutUint16(buf[6:8], versionMinor) - binary.LittleEndian.PutUint32(buf[16:20], snaplen) - binary.LittleEndian.PutUint32(buf[20:24], uint32(linktype)) - _, err := conn.Write(buf[:]) - if err != nil { - plog.Fatal(err) - } - return err -} - -func writePacketHeader(ci gopacket.CaptureInfo, conn net.Conn) error { - var buf [16]byte - t := ci.Timestamp - if t.IsZero() { - return fmt.Errorf("incoming packet does not have a timestamp. Ignoring packet") - } - secs := t.Unix() - usecs := t.Nanosecond() / nanosPerMicro - binary.LittleEndian.PutUint32(buf[0:4], uint32(secs)) - binary.LittleEndian.PutUint32(buf[4:8], uint32(usecs)) - binary.LittleEndian.PutUint32(buf[8:12], uint32(ci.CaptureLength)) - binary.LittleEndian.PutUint32(buf[12:16], uint32(ci.Length)) - _, err := conn.Write(buf[:]) - if err != nil { - plog.Fatal(err) - } - return err -} - -// WritePacket writes the given packet data out to the file. -func writePacket(ci gopacket.CaptureInfo, data []byte, conn net.Conn) error { - if ci.CaptureLength != len(data) { - return fmt.Errorf("capture length %d does not match data length %d", ci.CaptureLength, len(data)) - } - if ci.CaptureLength > ci.Length { - return fmt.Errorf("invalid capture info %+v: capture length > length", ci) - } - plog.Debugf("Sending Packet to client. Length: %d", len(data)) - //Write 16 byte packet header - if err := writePacketHeader(ci, conn); err != nil { - return fmt.Errorf("error writing packet header: %w", err) - } - - _, err := conn.Write(data) - if err != nil { - plog.Fatal(err) - } - return err -} - -// FIXME: Only after client connects to it, the agent starts collecting and sending packets. -// This behavior needs to be fixed. -func StartPCAPSend(hostPort string) (*PCAPStream, error) { - PORT := ":" + hostPort - l, err := net.Listen("tcp", PORT) - if err != nil { - return nil, err - } - defer l.Close() - clientConn, err := l.Accept() - - if err != nil { - return nil, err - } - - return &PCAPStream{ - hostPort: hostPort, - clientConn: clientConn, - }, nil - -} - -func (p *PCAPStream) ExportFlows(in <-chan []*flow.PacketRecord) { - - //Create handler by opening PCAP stream - Write 24 byte size PCAP File Header - err := writePCAPFileHeader(snapshotlen, layers.LinkTypeEthernet, p.clientConn) - if err != nil { - plog.Fatal(err) - } - for packetRecord := range in { - if len(packetRecord) > 0 { - for _, packet := range packetRecord { - packetStream := packet.Stream - packetTimestamp := packet.Time - if len(packetStream) != 0 { - captureInfo := gopacket.CaptureInfo{ - Timestamp: packetTimestamp, - CaptureLength: len(packetStream), - Length: len(packetStream), - } - err = writePacket(captureInfo, packetStream, p.clientConn) - if err != nil { - plog.Fatal(err) - } - } - } - } - } - -} diff --git a/pkg/exporter/packets_proto.go b/pkg/exporter/packets_proto.go new file mode 100644 index 0000000000000000000000000000000000000000..0fb335cc65b9e8fcb950ce1a4a1d826a9bab35ff --- /dev/null +++ b/pkg/exporter/packets_proto.go @@ -0,0 +1,40 @@ +package exporter + +import ( + "encoding/binary" + "fmt" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" +) + +// PCAP Magic number is fixed for each endianness. +const pcapMagicNumber = 0xA1B2C3D4 +const versionMajor = 2 +const versionMinor = 4 +const nanosPerMicro = 1000 + +func GetPCAPFileHeader(snaplen uint32, linktype layers.LinkType) []byte { + var buf [24]byte + binary.LittleEndian.PutUint32(buf[0:4], pcapMagicNumber) + binary.LittleEndian.PutUint16(buf[4:6], versionMajor) + binary.LittleEndian.PutUint16(buf[6:8], versionMinor) + binary.LittleEndian.PutUint32(buf[16:20], snaplen) + binary.LittleEndian.PutUint32(buf[20:24], uint32(linktype)) + return buf[:] +} + +func GetPacketHeader(ci gopacket.CaptureInfo) ([]byte, error) { + var buf [16]byte + t := ci.Timestamp + if t.IsZero() { + return nil, fmt.Errorf("incoming packet does not have a timestamp. Ignoring packet") + } + secs := t.Unix() + usecs := t.Nanosecond() / nanosPerMicro + binary.LittleEndian.PutUint32(buf[0:4], uint32(secs)) + binary.LittleEndian.PutUint32(buf[4:8], uint32(usecs)) + binary.LittleEndian.PutUint32(buf[8:12], uint32(ci.CaptureLength)) + binary.LittleEndian.PutUint32(buf[12:16], uint32(ci.Length)) + return buf[:], nil +} diff --git a/pkg/grpc/client.go b/pkg/grpc/flow/client.go similarity index 88% rename from pkg/grpc/client.go rename to pkg/grpc/flow/client.go index 5d58857304054a453cffd6ad3d3824c13a287620..12f117f080e6548c76dddf0e70620f81792937c5 100644 --- a/pkg/grpc/client.go +++ b/pkg/grpc/flow/client.go @@ -1,5 +1,5 @@ -// Package grpc provides the basic interfaces to build a gRPC+Protobuf flows client & server -package grpc +// Package flowgrpc provides the basic interfaces to build a gRPC+Protobuf flows client & server +package flowgrpc import ( "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" diff --git a/pkg/grpc/grpc_test.go b/pkg/grpc/flow/grpc_test.go similarity index 99% rename from pkg/grpc/grpc_test.go rename to pkg/grpc/flow/grpc_test.go index ff3824ca3c6df375dc320e70eaa61ef3c8fd5413..791319a65a53002372c109ebc30d7568ec638502 100644 --- a/pkg/grpc/grpc_test.go +++ b/pkg/grpc/flow/grpc_test.go @@ -1,4 +1,4 @@ -package grpc +package flowgrpc import ( "context" diff --git a/pkg/grpc/server.go b/pkg/grpc/flow/server.go similarity index 99% rename from pkg/grpc/server.go rename to pkg/grpc/flow/server.go index 90edada506ffa4a15a99cb7b1ff2d465994a427b..ca5ee8daef128fa291754ee9fc5f46e656f3692f 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/flow/server.go @@ -1,4 +1,4 @@ -package grpc +package flowgrpc import ( "context" diff --git a/pkg/grpc/packet/client.go b/pkg/grpc/packet/client.go new file mode 100644 index 0000000000000000000000000000000000000000..3af22e131dfc3bb87bba2236a43beab7021eedd8 --- /dev/null +++ b/pkg/grpc/packet/client.go @@ -0,0 +1,37 @@ +// Package pktgrpc provides the basic interfaces to build a gRPC+Protobuf packet client & server +package pktgrpc + +import ( + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket" + "github.com/netobserv/netobserv-ebpf-agent/pkg/utils" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// ClientConnection wraps a gRPC+protobuf connection +type ClientConnection struct { + client pbpacket.CollectorClient + conn *grpc.ClientConn +} + +func ConnectClient(hostIP string, hostPort int) (*ClientConnection, error) { + // TODO: allow configuring some options (keepalive, backoff...) + socket := utils.GetSocket(hostIP, hostPort) + conn, err := grpc.Dial(socket, + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + return &ClientConnection{ + client: pbpacket.NewCollectorClient(conn), + conn: conn, + }, nil +} + +func (cp *ClientConnection) Client() pbpacket.CollectorClient { + return cp.client +} + +func (cp *ClientConnection) Close() error { + return cp.conn.Close() +} diff --git a/pkg/grpc/packet/grpc_test.go b/pkg/grpc/packet/grpc_test.go new file mode 100644 index 0000000000000000000000000000000000000000..7282c2612164938dcde913c0966387b667f82e4d --- /dev/null +++ b/pkg/grpc/packet/grpc_test.go @@ -0,0 +1,88 @@ +package pktgrpc + +import ( + "context" + "testing" + "time" + + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket" + + "github.com/mariomac/guara/pkg/test" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/anypb" +) + +const timeout = 5 * time.Second + +func TestGRPCCommunication(t *testing.T) { + port, err := test.FreeTCPPort() + require.NoError(t, err) + serverOut := make(chan *pbpacket.Packet) + _, err = StartCollector(port, serverOut) + require.NoError(t, err) + cc, err := ConnectClient("127.0.0.1", port) + require.NoError(t, err) + client := cc.Client() + + value := []byte("test") + go func() { + _, err = client.Send(context.Background(), + &pbpacket.Packet{ + Pcap: &anypb.Any{ + Value: value, + }, + }) + require.NoError(t, err) + }() + + var rs *pbpacket.Packet + select { + case rs = <-serverOut: + case <-time.After(timeout): + require.Fail(t, "timeout waiting for packet") + } + assert.NotNil(t, rs.Pcap) + assert.EqualValues(t, value, rs.Pcap.Value) + select { + case rs = <-serverOut: + assert.Failf(t, "shouldn't have received any packet", "Got: %#v", rs) + default: + //ok! + } +} + +func TestConstructorOptions(t *testing.T) { + port, err := test.FreeTCPPort() + require.NoError(t, err) + intercepted := make(chan struct{}) + // Override the default GRPC collector to verify that StartCollector is applying the + // passed options + _, err = StartCollector(port, make(chan *pbpacket.Packet), + WithGRPCServerOptions(grpc.UnaryInterceptor(func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (resp interface{}, err error) { + close(intercepted) + return handler(ctx, req) + }))) + require.NoError(t, err) + cc, err := ConnectClient("127.0.0.1", port) + require.NoError(t, err) + client := cc.Client() + + go func() { + _, err = client.Send(context.Background(), + &pbpacket.Packet{Pcap: &anypb.Any{}}) + require.NoError(t, err) + }() + + select { + case <-intercepted: + case <-time.After(timeout): + require.Fail(t, "timeout waiting for unary interceptor to work") + } +} diff --git a/pkg/grpc/packet/server.go b/pkg/grpc/packet/server.go new file mode 100644 index 0000000000000000000000000000000000000000..272814857d2d59936f22a371ae4e8ff5a2d5caff --- /dev/null +++ b/pkg/grpc/packet/server.go @@ -0,0 +1,77 @@ +package pktgrpc + +import ( + "context" + "fmt" + "net" + + "github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket" + + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" +) + +// CollectorServer wraps a Flow Collector connection & session +type CollectorServer struct { + grpcServer *grpc.Server +} + +type collectorOptions struct { + grpcServerOptions []grpc.ServerOption +} + +// CollectorOption allows overriding the default configuration of the CollectorServer instance. +// Use them in the StartCollector function. +type CollectorOption func(options *collectorOptions) + +func WithGRPCServerOptions(options ...grpc.ServerOption) CollectorOption { + return func(copt *collectorOptions) { + copt.grpcServerOptions = options + } +} + +// StartCollector listens in background for gRPC+Protobuf flows in the given port, and forwards each +// set of *pbpacket.Packet by the provided channel. +func StartCollector( + port int, pktForwarder chan<- *pbpacket.Packet, options ...CollectorOption, +) (*CollectorServer, error) { + copts := collectorOptions{} + for _, opt := range options { + opt(&copts) + } + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return nil, err + } + grpcServer := grpc.NewServer(copts.grpcServerOptions...) + pbpacket.RegisterCollectorServer(grpcServer, &collectorAPI{ + pktForwarder: pktForwarder, + }) + reflection.Register(grpcServer) + go func() { + if err := grpcServer.Serve(lis); err != nil { + panic("error connecting to server: " + err.Error()) + } + }() + return &CollectorServer{ + grpcServer: grpcServer, + }, nil +} + +func (c *CollectorServer) Close() error { + c.grpcServer.Stop() + return nil +} + +type collectorAPI struct { + pbpacket.UnimplementedCollectorServer + pktForwarder chan<- *pbpacket.Packet +} + +var okReply = &pbpacket.CollectorReply{} + +func (c *collectorAPI) Send(_ context.Context, pkts *pbpacket.Packet) (*pbpacket.CollectorReply, error) { + c.pktForwarder <- pkts + return okReply, nil +} diff --git a/pkg/pbpacket/packet.pb.go b/pkg/pbpacket/packet.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..38712fdf839e268d301ac52088850475bf31a9e0 --- /dev/null +++ b/pkg/pbpacket/packet.pb.go @@ -0,0 +1,208 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.6 +// source: proto/packet.proto + +package pbpacket + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + anypb "google.golang.org/protobuf/types/known/anypb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// The request message containing the Packet +type Packet struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Pcap *anypb.Any `protobuf:"bytes,1,opt,name=pcap,proto3" json:"pcap,omitempty"` +} + +func (x *Packet) Reset() { + *x = Packet{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_packet_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Packet) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Packet) ProtoMessage() {} + +func (x *Packet) ProtoReflect() protoreflect.Message { + mi := &file_proto_packet_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Packet.ProtoReflect.Descriptor instead. +func (*Packet) Descriptor() ([]byte, []int) { + return file_proto_packet_proto_rawDescGZIP(), []int{0} +} + +func (x *Packet) GetPcap() *anypb.Any { + if x != nil { + return x.Pcap + } + return nil +} + +// intentionally empty +type CollectorReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CollectorReply) Reset() { + *x = CollectorReply{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_packet_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CollectorReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CollectorReply) ProtoMessage() {} + +func (x *CollectorReply) ProtoReflect() protoreflect.Message { + mi := &file_proto_packet_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CollectorReply.ProtoReflect.Descriptor instead. +func (*CollectorReply) Descriptor() ([]byte, []int) { + return file_proto_packet_proto_rawDescGZIP(), []int{1} +} + +var File_proto_packet_proto protoreflect.FileDescriptor + +var file_proto_packet_proto_rawDesc = []byte{ + 0x0a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x70, 0x62, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x19, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, + 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x32, 0x0a, 0x06, 0x50, 0x61, 0x63, + 0x6b, 0x65, 0x74, 0x12, 0x28, 0x0a, 0x04, 0x70, 0x63, 0x61, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x04, 0x70, 0x63, 0x61, 0x70, 0x22, 0x10, 0x0a, + 0x0e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x32, + 0x41, 0x0a, 0x09, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x34, 0x0a, 0x04, + 0x53, 0x65, 0x6e, 0x64, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x2e, + 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x18, 0x2e, 0x70, 0x62, 0x70, 0x61, 0x63, 0x6b, 0x65, + 0x74, 0x2e, 0x43, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, + 0x22, 0x00, 0x42, 0x0c, 0x5a, 0x0a, 0x2e, 0x2f, 0x70, 0x62, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_packet_proto_rawDescOnce sync.Once + file_proto_packet_proto_rawDescData = file_proto_packet_proto_rawDesc +) + +func file_proto_packet_proto_rawDescGZIP() []byte { + file_proto_packet_proto_rawDescOnce.Do(func() { + file_proto_packet_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_packet_proto_rawDescData) + }) + return file_proto_packet_proto_rawDescData +} + +var file_proto_packet_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_proto_packet_proto_goTypes = []interface{}{ + (*Packet)(nil), // 0: pbpacket.Packet + (*CollectorReply)(nil), // 1: pbpacket.CollectorReply + (*anypb.Any)(nil), // 2: google.protobuf.Any +} +var file_proto_packet_proto_depIdxs = []int32{ + 2, // 0: pbpacket.Packet.pcap:type_name -> google.protobuf.Any + 0, // 1: pbpacket.Collector.Send:input_type -> pbpacket.Packet + 1, // 2: pbpacket.Collector.Send:output_type -> pbpacket.CollectorReply + 2, // [2:3] is the sub-list for method output_type + 1, // [1:2] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_proto_packet_proto_init() } +func file_proto_packet_proto_init() { + if File_proto_packet_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_packet_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Packet); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_packet_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CollectorReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_packet_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_packet_proto_goTypes, + DependencyIndexes: file_proto_packet_proto_depIdxs, + MessageInfos: file_proto_packet_proto_msgTypes, + }.Build() + File_proto_packet_proto = out.File + file_proto_packet_proto_rawDesc = nil + file_proto_packet_proto_goTypes = nil + file_proto_packet_proto_depIdxs = nil +} diff --git a/pkg/pbpacket/packet_grpc.pb.go b/pkg/pbpacket/packet_grpc.pb.go new file mode 100644 index 0000000000000000000000000000000000000000..7ec98b708fe0b5b71ee5ffa8d23f87d1b761928a --- /dev/null +++ b/pkg/pbpacket/packet_grpc.pb.go @@ -0,0 +1,105 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.6 +// source: proto/packet.proto + +package pbpacket + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// CollectorClient is the client API for Collector service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type CollectorClient interface { + Send(ctx context.Context, in *Packet, opts ...grpc.CallOption) (*CollectorReply, error) +} + +type collectorClient struct { + cc grpc.ClientConnInterface +} + +func NewCollectorClient(cc grpc.ClientConnInterface) CollectorClient { + return &collectorClient{cc} +} + +func (c *collectorClient) Send(ctx context.Context, in *Packet, opts ...grpc.CallOption) (*CollectorReply, error) { + out := new(CollectorReply) + err := c.cc.Invoke(ctx, "/pbpacket.Collector/Send", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// CollectorServer is the server API for Collector service. +// All implementations must embed UnimplementedCollectorServer +// for forward compatibility +type CollectorServer interface { + Send(context.Context, *Packet) (*CollectorReply, error) + mustEmbedUnimplementedCollectorServer() +} + +// UnimplementedCollectorServer must be embedded to have forward compatible implementations. +type UnimplementedCollectorServer struct { +} + +func (UnimplementedCollectorServer) Send(context.Context, *Packet) (*CollectorReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method Send not implemented") +} +func (UnimplementedCollectorServer) mustEmbedUnimplementedCollectorServer() {} + +// UnsafeCollectorServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to CollectorServer will +// result in compilation errors. +type UnsafeCollectorServer interface { + mustEmbedUnimplementedCollectorServer() +} + +func RegisterCollectorServer(s grpc.ServiceRegistrar, srv CollectorServer) { + s.RegisterService(&Collector_ServiceDesc, srv) +} + +func _Collector_Send_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Packet) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(CollectorServer).Send(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/pbpacket.Collector/Send", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(CollectorServer).Send(ctx, req.(*Packet)) + } + return interceptor(ctx, in, info, handler) +} + +// Collector_ServiceDesc is the grpc.ServiceDesc for Collector service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Collector_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "pbpacket.Collector", + HandlerType: (*CollectorServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Send", + Handler: _Collector_Send_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "proto/packet.proto", +} diff --git a/proto/README.md b/proto/README.md new file mode 100644 index 0000000000000000000000000000000000000000..ea7268e61783660ad69cfbaf329f296304796330 --- /dev/null +++ b/proto/README.md @@ -0,0 +1,23 @@ +# Protobuf + gRPC + +Golang files are automatically updated when running `make generate` according to `.proto` files content. + +# Development + +## Manually update flow gRPC + +Run the following commands to update `flow.pb.go` and `flow_grpc.pb.go`: + +```bash +protoc --go_out=./pkg/ ./proto/flow.proto +protoc --go-grpc_out=./pkg/ ./proto/flow.proto +``` + +## Manually update packet gRPC + +Run the following commands to update `packet.pb.go` and `packet_grpc.pb.go`: + +```bash +protoc --go_out=./pkg/ ./proto/packet.proto +protoc --go-grpc_out=./pkg/ ./proto/packet.proto +``` \ No newline at end of file diff --git a/proto/packet.proto b/proto/packet.proto new file mode 100644 index 0000000000000000000000000000000000000000..d50fd389d7e886839078aa0b803ebea1e6df2c2f --- /dev/null +++ b/proto/packet.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +option go_package = "./pbpacket"; + +package pbpacket; + +import "google/protobuf/any.proto"; + +service Collector { + rpc Send (Packet) returns (CollectorReply) {} +} + +// The request message containing the Packet +message Packet { + google.protobuf.Any pcap = 1; +} + +// intentionally empty +message CollectorReply {} diff --git a/scripts/agent.yml b/scripts/agent.yml index 1b2499a09f871606e2d7c666d9f6ebe3ed4ae134..c7ad86c23e5564cc38ac727d1768ab74c7b0097c 100644 --- a/scripts/agent.yml +++ b/scripts/agent.yml @@ -34,11 +34,11 @@ spec: value: 200ms - name: LOG_LEVEL value: debug - - name: FLOWS_TARGET_HOST + - name: HOST valueFrom: fieldRef: fieldPath: status.hostIP - - name: FLOWS_TARGET_PORT + - name: PORT value: "9999" - name: ENABLE_RTT value: "true" diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest/ingest_grpc.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest/ingest_grpc.go index 754a33abb171190ba0f69a478cae215bbee5d6c2..db9750b02d463a7c6299f3e157c58d1babc2c86f 100644 --- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest/ingest_grpc.go +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest/ingest_grpc.go @@ -9,7 +9,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/operational" "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils" "github.com/netobserv/netobserv-ebpf-agent/pkg/decode" - "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc" + grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/flow" "github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow" "github.com/sirupsen/logrus"