diff --git a/README.md b/README.md index 41cd9e6ee672e571a4464c721e74081408787186..31fb761bfa1f81f4910b6e665e501e3bca3b479d 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,12 @@ egress flows on a Linux host (required a Kernel 4.18+ with eBPF enabled). make build ``` +To build the agent image and push it to your Docker / Quay repository, run: + +```bash +IMG=quay.io/myaccount/netobserv-ebpf-agent:dev make image-build image-push +``` + ## How to configure The eBPF Agent is configured by means of environment variables. Check the diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index c91eaee61405da17d1b2e7c5e2a233238df3cbf9..cef54b45766abfdb43eb6105d1f72541eb408f3c 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -100,6 +100,14 @@ func FlowsAgent(cfg *Config) (*Flows, error) { return nil, fmt.Errorf("wrong Kafka compression value %s. Admitted values are "+ "none, gzip, snappy, lz4, zstd: %w", cfg.KafkaCompression, err) } + transport := kafkago.Transport{} + if cfg.KafkaEnableTLS { + tlsConfig, err := buildTLSConfig(cfg) + if err != nil { + return nil, err + } + transport.TLS = tlsConfig + } exportFunc = (&exporter.KafkaJSON{ Writer: &kafkago.Writer{ Addr: kafkago.TCP(cfg.KafkaBrokers...), @@ -114,6 +122,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) { BatchBytes: cfg.KafkaBatchBytes, Async: cfg.KafkaAsync, Compression: compression, + Transport: &transport, }, }).ExportFlows default: diff --git a/pkg/agent/config.go b/pkg/agent/config.go index b79687eb4489e8975c038b55b034baf9b62f4f40..e0ca50d9c4fbcac6979c44000deabe75e03fc9c7 100644 --- a/pkg/agent/config.go +++ b/pkg/agent/config.go @@ -67,6 +67,16 @@ type Config struct { // KafkaCompression sets the compression codec to be used to compress messages. The accepted // values are: none (default), gzip, snappy, lz4, zstd. KafkaCompression string `env:"KAFKA_COMPRESSION" envDefault:"none"` + // KafkaEnableTLS set true to enable TLS + KafkaEnableTLS bool `env:"KAFKA_ENABLE_TLS" envDefault:"false"` + // KafkaTLSInsecureSkipVerify skips server certificate verification in TLS connections + KafkaTLSInsecureSkipVerify bool `env:"KAFKA_TLS_INSECURE_SKIP_VERIFY" envDefault:"false"` + // KafkaTLSCACertPath is the path to the Kafka server certificate for TLS connections + KafkaTLSCACertPath string `env:"KAFKA_TLS_CA_CERT_PATH"` + // KafkaTLSUserCertPath is the path to the user (client) certificate for mTLS connections + KafkaTLSUserCertPath string `env:"KAFKA_TLS_USER_CERT_PATH"` + // KafkaTLSUserKeyPath is the path to the user (client) private key for mTLS connections + KafkaTLSUserKeyPath string `env:"KAFKA_TLS_USER_KEY_PATH"` // ProfilePort sets the listening port for Go's Pprof tool. If it is not set, profile is disabled ProfilePort int `env:"PROFILE_PORT"` } diff --git a/pkg/agent/tls.go b/pkg/agent/tls.go new file mode 100644 index 0000000000000000000000000000000000000000..24af3d8f4eb267330153930bc50bc7da0b2735c6 --- /dev/null +++ b/pkg/agent/tls.go @@ -0,0 +1,39 @@ +package agent + +import ( + "crypto/tls" + "crypto/x509" + "io/ioutil" +) + +func buildTLSConfig(cfg *Config) (*tls.Config, error) { + tlsConfig := &tls.Config{ + InsecureSkipVerify: cfg.KafkaTLSInsecureSkipVerify, + } + if cfg.KafkaTLSCACertPath != "" { + caCert, err := ioutil.ReadFile(cfg.KafkaTLSCACertPath) + if err != nil { + return nil, err + } + tlsConfig.RootCAs = x509.NewCertPool() + tlsConfig.RootCAs.AppendCertsFromPEM(caCert) + + if cfg.KafkaTLSUserCertPath != "" && cfg.KafkaTLSUserKeyPath != "" { + userCert, err := ioutil.ReadFile(cfg.KafkaTLSUserCertPath) + if err != nil { + return nil, err + } + userKey, err := ioutil.ReadFile(cfg.KafkaTLSUserKeyPath) + if err != nil { + return nil, err + } + pair, err := tls.X509KeyPair([]byte(userCert), []byte(userKey)) + if err != nil { + return nil, err + } + tlsConfig.Certificates = []tls.Certificate{pair} + } + return tlsConfig, nil + } + return nil, nil +}