Skip to content
Snippets Groups Projects
Commit 881253d1 authored by Malte Bauch's avatar Malte Bauch
Browse files

Comments added

parent f181cd09
Branches main
No related tags found
No related merge requests found
...@@ -22,9 +22,10 @@ type Application struct { ...@@ -22,9 +22,10 @@ type Application struct {
} }
// Run runs the application. // Run runs the application.
func (a *Application) Run() { func (a *Application) Run() error {
signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM) signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM)
// subscribe to Add,Delete and Update event types.
a.eventService.SubscribeToEventType([]event.TypeToCallbackTuple{ a.eventService.SubscribeToEventType([]event.TypeToCallbackTuple{
{Type: event.Add, Callback: a.callback}, {Type: event.Add, Callback: a.callback},
{Type: event.Delete, Callback: a.callback}, {Type: event.Delete, Callback: a.callback},
...@@ -43,16 +44,18 @@ func (a *Application) Run() { ...@@ -43,16 +44,18 @@ func (a *Application) Run() {
err := svr.Start() err := svr.Start()
if err != nil { if err != nil {
fmt.Printf("Server start failed: %v\n", err) return fmt.Errorf("Server start failed: %w\n", err)
return
} }
<-a.stopChannel <-a.stopChannel
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel() defer cancel()
svr.Shutdown(ctx) return svr.Shutdown(ctx)
} }
// callback handles an incoming event from the event system. The event is
// turned into JSON encoding and published to all websocket clients through the
// ClientManager.
func (a *Application) callback(event *event.Event) { func (a *Application) callback(event *event.Event) {
b, err := json.Marshal(event) b, err := json.Marshal(event)
if err != nil { if err != nil {
......
...@@ -13,12 +13,15 @@ func main() { ...@@ -13,12 +13,15 @@ func main() {
controllerAddress := flag.String("controllerAddress", "localhost:55055", "The address to a goSDN controller to register at.") controllerAddress := flag.String("controllerAddress", "localhost:55055", "The address to a goSDN controller to register at.")
flag.Parse() flag.Parse()
// Register the application at the controller
queueCredentials, err := registration.Register(*controllerAddress, "ws-events", "SecurePresharedToken") queueCredentials, err := registration.Register(*controllerAddress, "ws-events", "SecurePresharedToken")
if err != nil { if err != nil {
logrus.Errorf("failed to register application on control plane. %v", err) logrus.Errorf("failed to register application on control plane. %v", err)
os.Exit(1) os.Exit(1)
} }
// New EventService with topic ManagedNetworkElement (could also be
// extended with other topics to subscribe to)
eventService, err := event.NewEventService( eventService, err := event.NewEventService(
queueCredentials, queueCredentials,
[]event.Topic{event.ManagedNetworkElement}, []event.Topic{event.ManagedNetworkElement},
...@@ -33,5 +36,8 @@ func main() { ...@@ -33,5 +36,8 @@ func main() {
stopChannel: make(chan os.Signal, 1), stopChannel: make(chan os.Signal, 1),
} }
app.Run() // Run the Application
if err := app.Run(); err != nil {
logrus.Error(err)
}
} }
...@@ -18,11 +18,14 @@ var ( ...@@ -18,11 +18,14 @@ var (
keepaliveTimer = time.Second * 60 keepaliveTimer = time.Second * 60
) )
// ClientManager holds all the current active websocket connections as so
// called clients.
type ClientManager struct { type ClientManager struct {
mux sync.Mutex mux sync.Mutex
clients map[*websocket.Conn]struct{} clients map[*websocket.Conn]struct{}
} }
// Register adds a new client (a websocket connection) to the ClientManager.
func (cMngr *ClientManager) Register(client *websocket.Conn) { func (cMngr *ClientManager) Register(client *websocket.Conn) {
cMngr.mux.Lock() cMngr.mux.Lock()
defer cMngr.mux.Unlock() defer cMngr.mux.Unlock()
...@@ -30,6 +33,8 @@ func (cMngr *ClientManager) Register(client *websocket.Conn) { ...@@ -30,6 +33,8 @@ func (cMngr *ClientManager) Register(client *websocket.Conn) {
cMngr.clients[client] = struct{}{} cMngr.clients[client] = struct{}{}
logrus.Println("Added new client: ", client.RemoteAddr()) logrus.Println("Added new client: ", client.RemoteAddr())
} }
// Deregister removes a client (a websocket connection) from the ClientManager.
func (cMngr *ClientManager) Deregister(client *websocket.Conn) { func (cMngr *ClientManager) Deregister(client *websocket.Conn) {
cMngr.mux.Lock() cMngr.mux.Lock()
defer cMngr.mux.Unlock() defer cMngr.mux.Unlock()
...@@ -37,6 +42,9 @@ func (cMngr *ClientManager) Deregister(client *websocket.Conn) { ...@@ -37,6 +42,9 @@ func (cMngr *ClientManager) Deregister(client *websocket.Conn) {
delete(cMngr.clients, client) delete(cMngr.clients, client)
logrus.Println("Removed client: ", client.RemoteAddr()) logrus.Println("Removed client: ", client.RemoteAddr())
} }
// Publish sends the given byte slice to all the clients (a client is a
// websocket connection).
func (cMngr *ClientManager) Publish(message []byte) { func (cMngr *ClientManager) Publish(message []byte) {
cMngr.mux.Lock() cMngr.mux.Lock()
defer cMngr.mux.Unlock() defer cMngr.mux.Unlock()
...@@ -52,12 +60,15 @@ func (cMngr *ClientManager) Publish(message []byte) { ...@@ -52,12 +60,15 @@ func (cMngr *ClientManager) Publish(message []byte) {
} }
} }
// NewClientManager returns a new websocket ClientManager.
func NewClientManager() *ClientManager { func NewClientManager() *ClientManager {
return &ClientManager{ return &ClientManager{
clients: make(map[*websocket.Conn]struct{}), clients: make(map[*websocket.Conn]struct{}),
} }
} }
// newUpgrader creates a new websocket.Upgrader and implements the OnOpen,
// OnClose and OnMessage functions.
func newUpgrader() *websocket.Upgrader { func newUpgrader() *websocket.Upgrader {
u := websocket.NewUpgrader() u := websocket.NewUpgrader()
...@@ -70,12 +81,15 @@ func newUpgrader() *websocket.Upgrader { ...@@ -70,12 +81,15 @@ func newUpgrader() *websocket.Upgrader {
}) })
u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) { u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
c.SetReadDeadline(time.Now().Add(keepaliveTimer)) if err := c.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
logrus.Printf("Could not update ReadDeadline: %v\n", err)
}
}) })
return u return u
} }
// onWebsocket is the handling function for a new websocket.
func onWebsocket(w http.ResponseWriter, r *http.Request) { func onWebsocket(w http.ResponseWriter, r *http.Request) {
upgrader := newUpgrader() upgrader := newUpgrader()
conn, err := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
...@@ -84,5 +98,8 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) { ...@@ -84,5 +98,8 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) {
return return
} }
conn.SetReadDeadline(time.Now().Add(keepaliveTimer)) if err := conn.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
logrus.Printf("Could not set initial ReadDeadline: %v\n", err)
return
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment