Skip to content
Snippets Groups Projects
Unverified Commit 036919ce authored by Joel Takvorian's avatar Joel Takvorian Committed by GitHub
Browse files

Merge pull request #44 from jotak/kafka-tls

NETOBSERV-397 Implement TLS for Kafka connection in the agent
parents 83db69e7 6b2189ba
No related branches found
No related tags found
No related merge requests found
......@@ -17,6 +17,12 @@ egress flows on a Linux host (required a Kernel 4.18+ with eBPF enabled).
make build
```
To build the agent image and push it to your Docker / Quay repository, run:
```bash
IMG=quay.io/myaccount/netobserv-ebpf-agent:dev make image-build image-push
```
## How to configure
The eBPF Agent is configured by means of environment variables. Check the
......
......@@ -46,5 +46,10 @@ The following environment variables are available to configure the NetObserv eBF
means that errors are ignored since the caller will not receive the returned value.
* `KAFKA_COMPRESSION` (default: `none`). Compression codec to be used to compress messages. Accepted
values: `none`, `gzip`, `snappy`, `lz4`, `zstd`.
* `KAFKA_ENABLE_TLS` (default: false). If `true`, enable TLS encryption for Kafka messages. The following settings are used only when TLS is enabled:
* `KAFKA_TLS_INSECURE_SKIP_VERIFY` (default: false). Skips server certificate verification in TLS connections.
* `KAFKA_TLS_CA_CERT_PATH` (default: unset). Path to the Kafka server certificate for TLS connections.
* `KAFKA_TLS_USER_CERT_PATH` (default: unset). Path to the user (client) certificate for mutual TLS connections.
* `KAFKA_TLS_USER_KEY_PATH` (default: unset). Path to the user (client) private key for mutual TLS connections.
* `PROFILE_PORT` (default: unset). Sets the listening port for [Go's Pprof tool](https://pkg.go.dev/net/http/pprof).
If it is not set, profile is disabled.
......@@ -100,6 +100,14 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
return nil, fmt.Errorf("wrong Kafka compression value %s. Admitted values are "+
"none, gzip, snappy, lz4, zstd: %w", cfg.KafkaCompression, err)
}
transport := kafkago.Transport{}
if cfg.KafkaEnableTLS {
tlsConfig, err := buildTLSConfig(cfg)
if err != nil {
return nil, err
}
transport.TLS = tlsConfig
}
exportFunc = (&exporter.KafkaJSON{
Writer: &kafkago.Writer{
Addr: kafkago.TCP(cfg.KafkaBrokers...),
......@@ -114,6 +122,7 @@ func FlowsAgent(cfg *Config) (*Flows, error) {
BatchBytes: cfg.KafkaBatchBytes,
Async: cfg.KafkaAsync,
Compression: compression,
Transport: &transport,
},
}).ExportFlows
default:
......
......@@ -67,6 +67,16 @@ type Config struct {
// KafkaCompression sets the compression codec to be used to compress messages. The accepted
// values are: none (default), gzip, snappy, lz4, zstd.
KafkaCompression string `env:"KAFKA_COMPRESSION" envDefault:"none"`
// KafkaEnableTLS set true to enable TLS
KafkaEnableTLS bool `env:"KAFKA_ENABLE_TLS" envDefault:"false"`
// KafkaTLSInsecureSkipVerify skips server certificate verification in TLS connections
KafkaTLSInsecureSkipVerify bool `env:"KAFKA_TLS_INSECURE_SKIP_VERIFY" envDefault:"false"`
// KafkaTLSCACertPath is the path to the Kafka server certificate for TLS connections
KafkaTLSCACertPath string `env:"KAFKA_TLS_CA_CERT_PATH"`
// KafkaTLSUserCertPath is the path to the user (client) certificate for mTLS connections
KafkaTLSUserCertPath string `env:"KAFKA_TLS_USER_CERT_PATH"`
// KafkaTLSUserKeyPath is the path to the user (client) private key for mTLS connections
KafkaTLSUserKeyPath string `env:"KAFKA_TLS_USER_KEY_PATH"`
// ProfilePort sets the listening port for Go's Pprof tool. If it is not set, profile is disabled
ProfilePort int `env:"PROFILE_PORT"`
}
package agent
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
)
func buildTLSConfig(cfg *Config) (*tls.Config, error) {
tlsConfig := &tls.Config{
InsecureSkipVerify: cfg.KafkaTLSInsecureSkipVerify,
}
if cfg.KafkaTLSCACertPath != "" {
caCert, err := ioutil.ReadFile(cfg.KafkaTLSCACertPath)
if err != nil {
return nil, err
}
tlsConfig.RootCAs = x509.NewCertPool()
tlsConfig.RootCAs.AppendCertsFromPEM(caCert)
if cfg.KafkaTLSUserCertPath != "" && cfg.KafkaTLSUserKeyPath != "" {
userCert, err := ioutil.ReadFile(cfg.KafkaTLSUserCertPath)
if err != nil {
return nil, err
}
userKey, err := ioutil.ReadFile(cfg.KafkaTLSUserKeyPath)
if err != nil {
return nil, err
}
pair, err := tls.X509KeyPair([]byte(userCert), []byte(userKey))
if err != nil {
return nil, err
}
tlsConfig.Certificates = []tls.Certificate{pair}
}
return tlsConfig, nil
}
return nil, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment