Skip to content
Snippets Groups Projects
Unverified Commit fb3ae596 authored by Mario Macias's avatar Mario Macias Committed by GitHub
Browse files

Added ICMP single-packet test (#42)

* Added ICMP single-packet test

* Fix timestamp comparison to avoid getting twice the same record
parent bbc573bc
No related branches found
No related tags found
No related merge requests found
...@@ -243,7 +243,6 @@ func (bt *FlowCaptureTester) lokiQuery(t *testing.T, logQL string) tester.LokiQu ...@@ -243,7 +243,6 @@ func (bt *FlowCaptureTester) lokiQuery(t *testing.T, logQL string) tester.LokiQu
require.NotNil(t, query) require.NotNil(t, query)
require.NotEmpty(t, query.Data.Result) require.NotEmpty(t, query.Data.Result)
}, test.Interval(time.Second)) }, test.Interval(time.Second))
require.NotEmpty(t, query.Data.Result)
result := query.Data.Result[0] result := query.Data.Result[0]
return result return result
} }
......
...@@ -3,12 +3,21 @@ ...@@ -3,12 +3,21 @@
package basic package basic
import ( import (
"context"
"path" "path"
"testing" "testing"
"time" "time"
"github.com/mariomac/guara/pkg/test"
"github.com/netobserv/netobserv-ebpf-agent/e2e/cluster" "github.com/netobserv/netobserv-ebpf-agent/e2e/cluster"
"github.com/netobserv/netobserv-ebpf-agent/e2e/cluster/tester"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"
) )
const ( const (
...@@ -42,3 +51,120 @@ func TestBasicFlowCapture(t *testing.T) { ...@@ -42,3 +51,120 @@ func TestBasicFlowCapture(t *testing.T) {
} }
bt.DoTest(t) bt.DoTest(t)
} }
// TestSinglePacketFlows uses a known packet size and number to check that,
// (1) packets are aggregated only once,
// (2) once packets are evicted, no more flows are aggregated on top of them.
func TestSinglePacketFlows(t *testing.T) {
var pingerIP, serverPodIP string
var latestFlowMS time.Time
testCluster.TestEnv().Test(t, features.New("single-packet flow capture").Setup(
func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
kclient, err := kubernetes.NewForConfig(cfg.Client().RESTConfig())
require.NoError(t, err)
// extract pinger Pod information from kubernetes
test.Eventually(t, testTimeout, func(t require.TestingT) {
client, err := kclient.CoreV1().Pods(namespace).
Get(ctx, "pinger", metav1.GetOptions{})
require.NoError(t, err)
require.NotEmpty(t, client.Status.PodIP)
pingerIP = client.Status.PodIP
}, test.Interval(time.Second))
// extract server (ping destination) pod information from kubernetes
test.Eventually(t, testTimeout, func(t require.TestingT) {
server, err := kclient.CoreV1().Pods(namespace).
List(ctx, metav1.ListOptions{LabelSelector: "app=server"})
require.NoError(t, err)
require.Len(t, server.Items, 1)
require.NotEmpty(t, server.Items)
require.NotEmpty(t, server.Items[0].Status.PodIP)
serverPodIP = server.Items[0].Status.PodIP
}, test.Interval(time.Second))
return ctx
},
).Assess("correctness of single, small ICMP packet from pinger to server",
func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
pods, err := tester.NewPods(cfg)
require.NoError(t, err)
logrus.WithField("destinationIP", serverPodIP).Info("Sending ICMP packet")
stdOut, stdErr, err := pods.Execute(ctx, namespace, "pinger",
"ping", "-c", "1", serverPodIP)
require.NoError(t, err)
logrus.WithFields(logrus.Fields{"stdOut": stdOut, "stdErr": stdErr}).Info("ping sent")
sent, recv := getPingFlows(t, time.Now().Add(-time.Minute))
assert.Equal(t, pingerIP, sent["SrcAddr"])
assert.Equal(t, serverPodIP, sent["DstAddr"])
assert.EqualValues(t, 98, sent["Bytes"]) // default ping data size + IP+ICMP headers
assert.EqualValues(t, 1, sent["Packets"])
assert.Equal(t, pingerIP, recv["DstAddr"])
assert.Equal(t, serverPodIP, recv["SrcAddr"])
assert.EqualValues(t, 98, recv["Bytes"]) // default ping data size + IP+ICMP headers
assert.EqualValues(t, 1, recv["Packets"])
latestFlowMS = asTime(recv["TimeFlowEndMs"])
return ctx
},
).Assess("correctness of another ICMP packet contained in another flow",
func(ctx context.Context, t *testing.T, cfg *envconf.Config) context.Context {
pods, err := tester.NewPods(cfg)
require.NoError(t, err)
logrus.WithField("destinationIP", serverPodIP).Info("Sending ICMP packet")
stdOut, stdErr, err := pods.Execute(ctx, namespace, "pinger",
"ping", "-s", "100", "-c", "1", serverPodIP)
require.NoError(t, err)
logrus.WithFields(logrus.Fields{"stdOut": stdOut, "stdErr": stdErr}).Info("ping sent")
// We filter by time to avoid getting twice the same flows
sent, recv := getPingFlows(t, latestFlowMS)
assert.Equal(t, pingerIP, sent["SrcAddr"])
assert.Equal(t, serverPodIP, sent["DstAddr"])
assert.EqualValues(t, 142, sent["Bytes"]) // 100-byte data size + IP+ICMP headers
assert.EqualValues(t, 1, sent["Packets"])
assert.Equal(t, pingerIP, recv["DstAddr"])
assert.Equal(t, serverPodIP, recv["SrcAddr"])
assert.EqualValues(t, 142, recv["Bytes"]) // 100-byte data size + IP+ICMP headers
assert.EqualValues(t, 1, recv["Packets"])
return ctx
},
).Feature())
}
func getPingFlows(t *testing.T, newerThan time.Time) (sent, recv map[string]interface{}) {
logrus.Info("Verifying that the request/return ICMP packets have been captured individually")
var query *tester.LokiQueryResponse
var err error
test.Eventually(t, testTimeout, func(t require.TestingT) {
query, err = testCluster.Loki().
Query(1, `{SrcK8S_OwnerName="pinger",DstK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP
require.NoError(t, err)
require.NotNil(t, query)
require.Len(t, query.Data.Result, 1)
if len(query.Data.Result) > 0 {
sent, err = query.Data.Result[0].Values[0].FlowData()
require.NoError(t, err)
require.Less(t, newerThan.UnixMilli(),
asTime(sent["TimeFlowStartMs"]).UnixMilli())
}
}, test.Interval(time.Second))
test.Eventually(t, testTimeout, func(t require.TestingT) {
query, err = testCluster.Loki().
Query(1, `{DstK8S_OwnerName="pinger",SrcK8S_OwnerName="server"}|="\"Proto\":1,"`) // Proto 1 == ICMP
require.NoError(t, err)
require.NotNil(t, query)
require.Len(t, query.Data.Result, 1)
if len(query.Data.Result) > 0 {
recv, err = query.Data.Result[0].Values[0].FlowData()
require.NoError(t, err)
require.Less(t, newerThan.UnixMilli(),
asTime(sent["TimeFlowStartMs"]).UnixMilli())
}
}, test.Interval(time.Second))
return sent, recv
}
...@@ -42,4 +42,14 @@ spec: ...@@ -42,4 +42,14 @@ spec:
spec: spec:
containers: containers:
- name: nginx - name: nginx
image: nginx:latest image: nginx:latest
\ No newline at end of file ---
# Used for single-packet test
apiVersion: v1
kind: Pod
metadata:
name: pinger
spec:
containers:
- name: pinger
image: ibmcom/ping
...@@ -11,10 +11,11 @@ import ( ...@@ -11,10 +11,11 @@ import (
) )
const ( const (
pathReady = "/ready" pathReady = "/ready"
pathQuery = "/loki/api/v1/query" pathQueryRange = "/loki/api/v1/query_range"
queryArgLimit = "limit" queryArgLimit = "limit"
queryArgQuery = "query" queryArgQuery = "query"
queryStep = "step=30m"
) )
var llog = logrus.WithField("component", "loki.Tester") var llog = logrus.WithField("component", "loki.Tester")
...@@ -52,8 +53,8 @@ func (l *Loki) Ready() error { ...@@ -52,8 +53,8 @@ func (l *Loki) Ready() error {
// Query executes an arbitrary logQL query, given a limit in the results // Query executes an arbitrary logQL query, given a limit in the results
func (l *Loki) Query(limit int, logQL string) (*LokiQueryResponse, error) { func (l *Loki) Query(limit int, logQL string) (*LokiQueryResponse, error) {
status, body, err := l.get(fmt.Sprintf("%s?%s=%d&%s=%s", status, body, err := l.get(fmt.Sprintf("%s?%s=%d&%s&%s=%s",
pathQuery, queryArgLimit, limit, pathQueryRange, queryArgLimit, limit, queryStep,
queryArgQuery, url.QueryEscape(logQL))) queryArgQuery, url.QueryEscape(logQL)))
if err != nil { if err != nil {
return nil, fmt.Errorf("loki request error: %w", err) return nil, fmt.Errorf("loki request error: %w", err)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment