From ff2b51a07892b38f3bcf3d0749fde178ff723656 Mon Sep 17 00:00:00 2001
From: Malte Bauch <malte.bauch@stud.h-da.de>
Date: Fri, 11 Aug 2023 10:30:43 +0000
Subject: [PATCH] Resolve "Application that allows to access events through
 websocket"

See merge request danet/gosdn!490

Co-authored-by: Fabian Seidl <fabian.seidl@h-da.de>
---
 Makefile                                      |   7 +-
 applications/ws-events/app.go                 |  65 +++++++++++
 applications/ws-events/main.go                |  43 +++++++
 applications/ws-events/ws-events.Dockerfile   |  16 +++
 .../ws-events.Dockerfile.dockerignore         |  29 +++++
 applications/ws-events/ws.go                  | 105 ++++++++++++++++++
 go.mod                                        |   4 +
 go.sum                                        |   9 ++
 8 files changed, 277 insertions(+), 1 deletion(-)
 create mode 100644 applications/ws-events/app.go
 create mode 100644 applications/ws-events/main.go
 create mode 100644 applications/ws-events/ws-events.Dockerfile
 create mode 100644 applications/ws-events/ws-events.Dockerfile.dockerignore
 create mode 100644 applications/ws-events/ws.go

diff --git a/Makefile b/Makefile
index 7195c32ef..aef433ec6 100644
--- a/Makefile
+++ b/Makefile
@@ -74,7 +74,6 @@ generate-csbi-yang-models: install-tools
 
 build: pre build-gosdn build-gosdnc build-plugin-registry build-venv-manager build-arista-routing-engine-app build-hostname-checker-app build-basic-interface-monitoring-app
 
-
 build-plugins:
 	for d in ./plugins/examples/*/ ; do\
 		d="$${d%*/}";\
@@ -110,6 +109,9 @@ build-hostname-checker-app: pre
 build-basic-interface-monitoring-app: pre
 	$(GOBUILD) -trimpath -o $(BUILD_ARTIFACTS_PATH)/basic-interface-monitoring ./applications/basic-interface-monitoring
 
+build-ws-events-app: pre
+	$(GOBUILD) -trimpath -o $(BUILD_ARTIFACTS_PATH)/ws-events ./applications/ws-events
+
 containerize-all: containerize-gosdn containerize-gosdnc containerize-plugin-registry
 
 containerize-gosdn:
@@ -133,6 +135,9 @@ containerize-arista-routing-engine-app:
 containerize-hostname-checker-app:
 	docker buildx build --rm -t hostname-checker-app -f applications/hostname-checker/hostname-checker.Dockerfile .
 
+containerize-ws-events-app:
+	docker buildx build --rm -t ws-events-app -f applications/ws-events/ws-events.Dockerfile .
+
 containerlab-start: create-clab-dir containerize-all generate-gnmi-target-certs
 	cd $(CLAB_DIR) &&\
 	sudo containerlab deploy --topo $(MAKEFILE_DIR)dev_env_data/clab/gosdn.clab.yaml
diff --git a/applications/ws-events/app.go b/applications/ws-events/app.go
new file mode 100644
index 000000000..8eab01500
--- /dev/null
+++ b/applications/ws-events/app.go
@@ -0,0 +1,65 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"code.fbi.h-da.de/danet/gosdn/application-framework/event"
+	"github.com/lesismal/nbio/nbhttp"
+	"github.com/sirupsen/logrus"
+)
+
+// Application is an example for a sdn application.
+type Application struct {
+	eventService event.ServiceInterface
+	stopChannel  chan os.Signal
+}
+
+// Run runs the application.
+func (a *Application) Run() error {
+	signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM)
+
+	// subscribe to Add,Delete and Update event types.
+	a.eventService.SubscribeToEventType([]event.TypeToCallbackTuple{
+		{Type: event.Add, Callback: a.callback},
+		{Type: event.Delete, Callback: a.callback},
+		{Type: event.Update, Callback: a.callback},
+	})
+	a.eventService.SetupEventReciever(a.stopChannel)
+
+	mux := &http.ServeMux{}
+	mux.HandleFunc("/events", onWebsocket)
+
+	svr := nbhttp.NewServer(nbhttp.Config{
+		Network: "tcp",
+		Addrs:   []string{"localhost:4000"},
+		Handler: mux,
+	})
+
+	err := svr.Start()
+	if err != nil {
+		return fmt.Errorf("Server start failed: %w\n", err)
+	}
+
+	<-a.stopChannel
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
+	defer cancel()
+	return svr.Shutdown(ctx)
+}
+
+// callback handles an incoming event from the event system. The event is
+// turned into JSON encoding and published to all websocket clients through the
+// ClientManager.
+func (a *Application) callback(event *event.Event) {
+	b, err := json.Marshal(event)
+	if err != nil {
+		logrus.Error("Failed marshal of event")
+	}
+	clientManager.Publish(b)
+}
diff --git a/applications/ws-events/main.go b/applications/ws-events/main.go
new file mode 100644
index 000000000..74131c991
--- /dev/null
+++ b/applications/ws-events/main.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+	"flag"
+	"os"
+
+	"code.fbi.h-da.de/danet/gosdn/application-framework/event"
+	"code.fbi.h-da.de/danet/gosdn/application-framework/registration"
+	"github.com/sirupsen/logrus"
+)
+
+func main() {
+	controllerAddress := flag.String("controllerAddress", "localhost:55055", "The address to a goSDN controller to register at.")
+	flag.Parse()
+
+	// Register the application at the controller
+	queueCredentials, err := registration.Register(*controllerAddress, "ws-events", "SecurePresharedToken")
+	if err != nil {
+		logrus.Errorf("failed to register application on control plane. %v", err)
+		os.Exit(1)
+	}
+
+	// New EventService with topic ManagedNetworkElement (could also be
+	// extended with other topics to subscribe to)
+	eventService, err := event.NewEventService(
+		queueCredentials,
+		[]event.Topic{event.ManagedNetworkElement},
+	)
+	if err != nil {
+		logrus.Errorf("failed to create event service. %v", err)
+		os.Exit(1)
+	}
+
+	app := &Application{
+		eventService: eventService,
+		stopChannel:  make(chan os.Signal, 1),
+	}
+
+	// Run the Application
+	if err := app.Run(); err != nil {
+		logrus.Error(err)
+	}
+}
diff --git a/applications/ws-events/ws-events.Dockerfile b/applications/ws-events/ws-events.Dockerfile
new file mode 100644
index 000000000..442629743
--- /dev/null
+++ b/applications/ws-events/ws-events.Dockerfile
@@ -0,0 +1,16 @@
+ARG GOLANG_VERSION=1.21
+ARG BUILDARGS
+ARG $GITLAB_PROXY
+
+FROM ${GITLAB_PROXY}golang:$GOLANG_VERSION-alpine as builder
+WORKDIR /app/
+RUN apk add --no-cache build-base
+RUN apk add --no-cache bash
+COPY . .
+RUN --mount=type=cache,target=/root/go/pkg/mod \
+    --mount=type=cache,target=/root/.cache/go-build \
+    make build-ws-events-app
+
+FROM scratch
+COPY --from=builder /app/artifacts/ws-events /app/artifacts/ws-events
+ENTRYPOINT ["./artifacts/ws-events"]
diff --git a/applications/ws-events/ws-events.Dockerfile.dockerignore b/applications/ws-events/ws-events.Dockerfile.dockerignore
new file mode 100644
index 000000000..596599720
--- /dev/null
+++ b/applications/ws-events/ws-events.Dockerfile.dockerignore
@@ -0,0 +1,29 @@
+.git
+.gitlab
+build
+documentation
+mocks
+test
+clab-gosdn_csbi_arista_base
+clab-gosdn_sts_demo_basic
+.cobra.yaml
+.dockeringore
+.gitlab-ci.yaml
+ARCHITECTURE.md
+CONTRIBUTING.md
+README.md
+artifacts
+build-tools
+cli
+config
+csbi
+dev_env_data
+docker_volume_backup
+docs
+documentation
+forks
+lab-vm
+models
+plugin-registry
+plugins
+scripts
diff --git a/applications/ws-events/ws.go b/applications/ws-events/ws.go
new file mode 100644
index 000000000..1477af160
--- /dev/null
+++ b/applications/ws-events/ws.go
@@ -0,0 +1,105 @@
+package main
+
+import (
+	"net/http"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/go-multierror"
+	"github.com/lesismal/nbio/nbhttp/websocket"
+	"github.com/sirupsen/logrus"
+)
+
+// This implementation follows the code example at:
+// https://github.com/lesismal/nbio/issues/92#issuecomment-922183823
+
+var (
+	clientManager  = NewClientManager()
+	keepaliveTimer = time.Second * 60
+)
+
+// ClientManager holds all the current active websocket connections as so
+// called clients.
+type ClientManager struct {
+	mux     sync.Mutex
+	clients map[*websocket.Conn]struct{}
+}
+
+// Register adds a new client (a websocket connection) to the ClientManager.
+func (cMngr *ClientManager) Register(client *websocket.Conn) {
+	cMngr.mux.Lock()
+	defer cMngr.mux.Unlock()
+
+	cMngr.clients[client] = struct{}{}
+	logrus.Println("Added new client: ", client.RemoteAddr())
+}
+
+// Deregister removes a client (a websocket connection) from the ClientManager.
+func (cMngr *ClientManager) Deregister(client *websocket.Conn) {
+	cMngr.mux.Lock()
+	defer cMngr.mux.Unlock()
+
+	delete(cMngr.clients, client)
+	logrus.Println("Removed client: ", client.RemoteAddr())
+}
+
+// Publish sends the given byte slice to all the clients (a client is a
+// websocket connection).
+func (cMngr *ClientManager) Publish(message []byte) {
+	cMngr.mux.Lock()
+	defer cMngr.mux.Unlock()
+
+	var eg multierror.Group
+	for c := range cMngr.clients {
+		eg.Go(func() error {
+			return c.WriteMessage(websocket.TextMessage, message)
+		})
+	}
+	if err := eg.Wait(); err != nil {
+		logrus.Printf("Publish encountered errors while broadcasting: %v\n", err)
+	}
+}
+
+// NewClientManager returns a new websocket ClientManager.
+func NewClientManager() *ClientManager {
+	return &ClientManager{
+		clients: make(map[*websocket.Conn]struct{}),
+	}
+}
+
+// newUpgrader creates a new websocket.Upgrader and implements the OnOpen,
+// OnClose and OnMessage functions.
+func newUpgrader() *websocket.Upgrader {
+	u := websocket.NewUpgrader()
+
+	u.OnOpen(func(c *websocket.Conn) {
+		clientManager.Register(c)
+	})
+
+	u.OnClose(func(c *websocket.Conn, err error) {
+		clientManager.Deregister(c)
+	})
+
+	u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
+		if err := c.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
+			logrus.Printf("Could not update ReadDeadline: %v\n", err)
+		}
+	})
+
+	return u
+}
+
+// onWebsocket is the handling function for a new websocket.
+func onWebsocket(w http.ResponseWriter, r *http.Request) {
+	upgrader := newUpgrader()
+	conn, err := upgrader.Upgrade(w, r, nil)
+	if err != nil {
+		logrus.Println("Could not create websocket")
+		return
+	}
+
+	if err := conn.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
+		logrus.Printf("Could not set initial ReadDeadline: %v\n", err)
+		return
+	}
+}
diff --git a/go.mod b/go.mod
index b4f15fb19..7418d3c61 100644
--- a/go.mod
+++ b/go.mod
@@ -89,7 +89,9 @@ require (
 )
 
 require (
+	github.com/hashicorp/go-multierror v1.1.1
 	github.com/hashicorp/go-plugin v1.4.10
+	github.com/lesismal/nbio v1.3.17
 	google.golang.org/genproto/googleapis/api v0.0.0-20230807174057-1744710a1577
 )
 
@@ -99,8 +101,10 @@ require (
 	atomicgo.dev/schedule v0.0.2 // indirect
 	github.com/containerd/console v1.0.3 // indirect
 	github.com/fatih/color v1.15.0 // indirect
+	github.com/hashicorp/errwrap v1.1.0 // indirect
 	github.com/hashicorp/go-hclog v1.5.0 // indirect
 	github.com/hashicorp/yamux v0.1.1 // indirect
+	github.com/lesismal/llib v1.1.12 // indirect
 	github.com/lithammer/fuzzysearch v1.1.7 // indirect
 	github.com/mitchellh/go-testing-interface v1.14.1 // indirect
 	github.com/moby/patternmatcher v0.5.0 // indirect
diff --git a/go.sum b/go.sum
index a50be444e..449ad0e68 100644
--- a/go.sum
+++ b/go.sum
@@ -540,6 +540,7 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt
 github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
 github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
 github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
 github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
 github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
@@ -548,6 +549,7 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
 github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
 github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I=
 github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
+github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
 github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
 github.com/hashicorp/go-plugin v1.4.10 h1:xUbmA4jC6Dq163/fWcp8P3JuHilrHHMLNRxzGQJ9hNk=
 github.com/hashicorp/go-plugin v1.4.10/go.mod h1:6/1TEzT0eQznvI/gV2CM29DLSkAK/e58mUWKVsPaph0=
@@ -628,6 +630,10 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
 github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
+github.com/lesismal/llib v1.1.12 h1:KJFB8bL02V+QGIvILEw/w7s6bKj9Ps9Px97MZP2EOk0=
+github.com/lesismal/llib v1.1.12/go.mod h1:70tFXXe7P1FZ02AU9l8LgSOK7d7sRrpnkUr3rd3gKSg=
+github.com/lesismal/nbio v1.3.17 h1:rsDwVdRfFK/QcFckDBgdGIZTf6Y98XWHRQWDY+EldO8=
+github.com/lesismal/nbio v1.3.17/go.mod h1:KWlouFT5cgDdW5sMX8RsHASUMGniea9X0XIellZ0B38=
 github.com/linuxkit/virtsock v0.0.0-20201010232012-f8cee7dfc7a3/go.mod h1:3r6x7q95whyfWQpmGZTu3gk3v2YkMi05HEzl7Tf7YEo=
 github.com/lithammer/fuzzysearch v1.1.7 h1:q8rZNmBIUkqxsxb/IlwsXVbCoPIH/0juxjFHY0UIwhU=
 github.com/lithammer/fuzzysearch v1.1.7/go.mod h1:ZhIlfRGxnD8qa9car/yplC6GmnM14CS07BYAKJJBK2I=
@@ -1073,10 +1079,12 @@ golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPh
 golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
 golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
 golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
+golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
 golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
 golang.org/x/crypto v0.12.0 h1:tFM/ta59kqch6LlvYnPa0yx5a83cL2nHflFhYKvv9Yk=
 golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -1169,6 +1177,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
 golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
 golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
 golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
+golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20210825183410-e898025ed96a/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-- 
GitLab