Skip to content
Snippets Groups Projects
Commit 7b83eebe authored by Malte Bauch's avatar Malte Bauch
Browse files

Comments added

parent 8a972dd1
Branches
Tags
No related merge requests found
Pipeline #159207 failed
...@@ -22,9 +22,10 @@ type Application struct { ...@@ -22,9 +22,10 @@ type Application struct {
} }
// Run runs the application. // Run runs the application.
func (a *Application) Run() { func (a *Application) Run() error {
signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM) signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM)
// subscribe to Add,Delete and Update event types.
a.eventService.SubscribeToEventType([]event.TypeToCallbackTuple{ a.eventService.SubscribeToEventType([]event.TypeToCallbackTuple{
{Type: event.Add, Callback: a.callback}, {Type: event.Add, Callback: a.callback},
{Type: event.Delete, Callback: a.callback}, {Type: event.Delete, Callback: a.callback},
...@@ -43,16 +44,18 @@ func (a *Application) Run() { ...@@ -43,16 +44,18 @@ func (a *Application) Run() {
err := svr.Start() err := svr.Start()
if err != nil { if err != nil {
fmt.Printf("Server start failed: %v\n", err) return fmt.Errorf("Server start failed: %w\n", err)
return
} }
<-a.stopChannel <-a.stopChannel
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel() defer cancel()
svr.Shutdown(ctx) return svr.Shutdown(ctx)
} }
// callback handles an incoming event from the event system. The event is
// turned into JSON encoding and published to all websocket clients through the
// ClientManager.
func (a *Application) callback(event *event.Event) { func (a *Application) callback(event *event.Event) {
b, err := json.Marshal(event) b, err := json.Marshal(event)
if err != nil { if err != nil {
......
...@@ -13,12 +13,15 @@ func main() { ...@@ -13,12 +13,15 @@ func main() {
controllerAddress := flag.String("controllerAddress", "localhost:55055", "The address to a goSDN controller to register at.") controllerAddress := flag.String("controllerAddress", "localhost:55055", "The address to a goSDN controller to register at.")
flag.Parse() flag.Parse()
// Register the application at the controller
queueCredentials, err := registration.Register(*controllerAddress, "ws-events", "SecurePresharedToken") queueCredentials, err := registration.Register(*controllerAddress, "ws-events", "SecurePresharedToken")
if err != nil { if err != nil {
logrus.Errorf("failed to register application on control plane. %v", err) logrus.Errorf("failed to register application on control plane. %v", err)
os.Exit(1) os.Exit(1)
} }
// New EventService with topic ManagedNetworkElement (could also be
// extended with other topics to subscribe to)
eventService, err := event.NewEventService( eventService, err := event.NewEventService(
queueCredentials, queueCredentials,
[]event.Topic{event.ManagedNetworkElement}, []event.Topic{event.ManagedNetworkElement},
...@@ -33,5 +36,8 @@ func main() { ...@@ -33,5 +36,8 @@ func main() {
stopChannel: make(chan os.Signal, 1), stopChannel: make(chan os.Signal, 1),
} }
app.Run() // Run the Application
if err := app.Run(); err != nil {
logrus.Error(err)
}
} }
...@@ -18,11 +18,14 @@ var ( ...@@ -18,11 +18,14 @@ var (
keepaliveTimer = time.Second * 60 keepaliveTimer = time.Second * 60
) )
// ClientManager holds all the current active websocket connections as so
// called clients.
type ClientManager struct { type ClientManager struct {
mux sync.Mutex mux sync.Mutex
clients map[*websocket.Conn]struct{} clients map[*websocket.Conn]struct{}
} }
// Register adds a new client (a websocket connection) to the ClientManager.
func (cMngr *ClientManager) Register(client *websocket.Conn) { func (cMngr *ClientManager) Register(client *websocket.Conn) {
cMngr.mux.Lock() cMngr.mux.Lock()
defer cMngr.mux.Unlock() defer cMngr.mux.Unlock()
...@@ -30,6 +33,8 @@ func (cMngr *ClientManager) Register(client *websocket.Conn) { ...@@ -30,6 +33,8 @@ func (cMngr *ClientManager) Register(client *websocket.Conn) {
cMngr.clients[client] = struct{}{} cMngr.clients[client] = struct{}{}
logrus.Println("Added new client: ", client.RemoteAddr()) logrus.Println("Added new client: ", client.RemoteAddr())
} }
// Deregister removes a client (a websocket connection) from the ClientManager.
func (cMngr *ClientManager) Deregister(client *websocket.Conn) { func (cMngr *ClientManager) Deregister(client *websocket.Conn) {
cMngr.mux.Lock() cMngr.mux.Lock()
defer cMngr.mux.Unlock() defer cMngr.mux.Unlock()
...@@ -37,6 +42,9 @@ func (cMngr *ClientManager) Deregister(client *websocket.Conn) { ...@@ -37,6 +42,9 @@ func (cMngr *ClientManager) Deregister(client *websocket.Conn) {
delete(cMngr.clients, client) delete(cMngr.clients, client)
logrus.Println("Removed client: ", client.RemoteAddr()) logrus.Println("Removed client: ", client.RemoteAddr())
} }
// Publish sends the given byte slice to all the clients (a client is a
// websocket connection).
func (cMngr *ClientManager) Publish(message []byte) { func (cMngr *ClientManager) Publish(message []byte) {
cMngr.mux.Lock() cMngr.mux.Lock()
defer cMngr.mux.Unlock() defer cMngr.mux.Unlock()
...@@ -52,12 +60,15 @@ func (cMngr *ClientManager) Publish(message []byte) { ...@@ -52,12 +60,15 @@ func (cMngr *ClientManager) Publish(message []byte) {
} }
} }
// NewClientManager returns a new websocket ClientManager.
func NewClientManager() *ClientManager { func NewClientManager() *ClientManager {
return &ClientManager{ return &ClientManager{
clients: make(map[*websocket.Conn]struct{}), clients: make(map[*websocket.Conn]struct{}),
} }
} }
// newUpgrader creates a new websocket.Upgrader and implements the OnOpen,
// OnClose and OnMessage functions.
func newUpgrader() *websocket.Upgrader { func newUpgrader() *websocket.Upgrader {
u := websocket.NewUpgrader() u := websocket.NewUpgrader()
...@@ -70,12 +81,15 @@ func newUpgrader() *websocket.Upgrader { ...@@ -70,12 +81,15 @@ func newUpgrader() *websocket.Upgrader {
}) })
u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) { u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
c.SetReadDeadline(time.Now().Add(keepaliveTimer)) if err := c.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
logrus.Printf("Could not update ReadDeadline: %v\n", err)
}
}) })
return u return u
} }
// onWebsocket is the handling function for a new websocket.
func onWebsocket(w http.ResponseWriter, r *http.Request) { func onWebsocket(w http.ResponseWriter, r *http.Request) {
upgrader := newUpgrader() upgrader := newUpgrader()
conn, err := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
...@@ -84,5 +98,8 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) { ...@@ -84,5 +98,8 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) {
return return
} }
conn.SetReadDeadline(time.Now().Add(keepaliveTimer)) if err := conn.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
logrus.Printf("Could not set initial ReadDeadline: %v\n", err)
return
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment