diff --git a/applications/ws-events/app.go b/applications/ws-events/app.go index 1629de8f643709a109463e021d35589ce8f89489..8eab01500e9a059cfcf47fd74811813dd5d8e806 100644 --- a/applications/ws-events/app.go +++ b/applications/ws-events/app.go @@ -22,9 +22,10 @@ type Application struct { } // Run runs the application. -func (a *Application) Run() { +func (a *Application) Run() error { signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM) + // subscribe to Add,Delete and Update event types. a.eventService.SubscribeToEventType([]event.TypeToCallbackTuple{ {Type: event.Add, Callback: a.callback}, {Type: event.Delete, Callback: a.callback}, @@ -43,16 +44,18 @@ func (a *Application) Run() { err := svr.Start() if err != nil { - fmt.Printf("Server start failed: %v\n", err) - return + return fmt.Errorf("Server start failed: %w\n", err) } <-a.stopChannel ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - svr.Shutdown(ctx) + return svr.Shutdown(ctx) } +// callback handles an incoming event from the event system. The event is +// turned into JSON encoding and published to all websocket clients through the +// ClientManager. func (a *Application) callback(event *event.Event) { b, err := json.Marshal(event) if err != nil { diff --git a/applications/ws-events/main.go b/applications/ws-events/main.go index 096d304b7268fb6bd5305753f333be4f17419036..74131c99192356f6090a4555325ae1b949597b52 100644 --- a/applications/ws-events/main.go +++ b/applications/ws-events/main.go @@ -13,12 +13,15 @@ func main() { controllerAddress := flag.String("controllerAddress", "localhost:55055", "The address to a goSDN controller to register at.") flag.Parse() + // Register the application at the controller queueCredentials, err := registration.Register(*controllerAddress, "ws-events", "SecurePresharedToken") if err != nil { logrus.Errorf("failed to register application on control plane. %v", err) os.Exit(1) } + // New EventService with topic ManagedNetworkElement (could also be + // extended with other topics to subscribe to) eventService, err := event.NewEventService( queueCredentials, []event.Topic{event.ManagedNetworkElement}, @@ -33,5 +36,8 @@ func main() { stopChannel: make(chan os.Signal, 1), } - app.Run() + // Run the Application + if err := app.Run(); err != nil { + logrus.Error(err) + } } diff --git a/applications/ws-events/ws.go b/applications/ws-events/ws.go index 534791f0df4651f5755e5cb7a8ff42d5d2a43233..1477af1602f718fb2918159601f7701f6ea1f399 100644 --- a/applications/ws-events/ws.go +++ b/applications/ws-events/ws.go @@ -18,11 +18,14 @@ var ( keepaliveTimer = time.Second * 60 ) +// ClientManager holds all the current active websocket connections as so +// called clients. type ClientManager struct { mux sync.Mutex clients map[*websocket.Conn]struct{} } +// Register adds a new client (a websocket connection) to the ClientManager. func (cMngr *ClientManager) Register(client *websocket.Conn) { cMngr.mux.Lock() defer cMngr.mux.Unlock() @@ -30,6 +33,8 @@ func (cMngr *ClientManager) Register(client *websocket.Conn) { cMngr.clients[client] = struct{}{} logrus.Println("Added new client: ", client.RemoteAddr()) } + +// Deregister removes a client (a websocket connection) from the ClientManager. func (cMngr *ClientManager) Deregister(client *websocket.Conn) { cMngr.mux.Lock() defer cMngr.mux.Unlock() @@ -37,6 +42,9 @@ func (cMngr *ClientManager) Deregister(client *websocket.Conn) { delete(cMngr.clients, client) logrus.Println("Removed client: ", client.RemoteAddr()) } + +// Publish sends the given byte slice to all the clients (a client is a +// websocket connection). func (cMngr *ClientManager) Publish(message []byte) { cMngr.mux.Lock() defer cMngr.mux.Unlock() @@ -52,12 +60,15 @@ func (cMngr *ClientManager) Publish(message []byte) { } } +// NewClientManager returns a new websocket ClientManager. func NewClientManager() *ClientManager { return &ClientManager{ clients: make(map[*websocket.Conn]struct{}), } } +// newUpgrader creates a new websocket.Upgrader and implements the OnOpen, +// OnClose and OnMessage functions. func newUpgrader() *websocket.Upgrader { u := websocket.NewUpgrader() @@ -70,12 +81,15 @@ func newUpgrader() *websocket.Upgrader { }) u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) { - c.SetReadDeadline(time.Now().Add(keepaliveTimer)) + if err := c.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil { + logrus.Printf("Could not update ReadDeadline: %v\n", err) + } }) return u } +// onWebsocket is the handling function for a new websocket. func onWebsocket(w http.ResponseWriter, r *http.Request) { upgrader := newUpgrader() conn, err := upgrader.Upgrade(w, r, nil) @@ -84,5 +98,8 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) { return } - conn.SetReadDeadline(time.Now().Add(keepaliveTimer)) + if err := conn.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil { + logrus.Printf("Could not set initial ReadDeadline: %v\n", err) + return + } }