Skip to content
Snippets Groups Projects
Commit eb88f77f authored by Malte Bauch's avatar Malte Bauch
Browse files

Comments added

parent c3a3484f
No related branches found
No related tags found
1 merge request!490Resolve "Application that allows to access events through websocket"
......@@ -22,9 +22,10 @@ type Application struct {
}
// Run runs the application.
func (a *Application) Run() {
func (a *Application) Run() error {
signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM)
// subscribe to Add,Delete and Update event types.
a.eventService.SubscribeToEventType([]event.TypeToCallbackTuple{
{Type: event.Add, Callback: a.callback},
{Type: event.Delete, Callback: a.callback},
......@@ -43,16 +44,18 @@ func (a *Application) Run() {
err := svr.Start()
if err != nil {
fmt.Printf("Server start failed: %v\n", err)
return
return fmt.Errorf("Server start failed: %w\n", err)
}
<-a.stopChannel
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
svr.Shutdown(ctx)
return svr.Shutdown(ctx)
}
// callback handles an incoming event from the event system. The event is
// turned into JSON encoding and published to all websocket clients through the
// ClientManager.
func (a *Application) callback(event *event.Event) {
b, err := json.Marshal(event)
if err != nil {
......
......@@ -13,12 +13,15 @@ func main() {
controllerAddress := flag.String("controllerAddress", "localhost:55055", "The address to a goSDN controller to register at.")
flag.Parse()
// Register the application at the controller
queueCredentials, err := registration.Register(*controllerAddress, "ws-events", "SecurePresharedToken")
if err != nil {
logrus.Errorf("failed to register application on control plane. %v", err)
os.Exit(1)
}
// New EventService with topic ManagedNetworkElement (could also be
// extended with other topics to subscribe to)
eventService, err := event.NewEventService(
queueCredentials,
[]event.Topic{event.ManagedNetworkElement},
......@@ -33,5 +36,8 @@ func main() {
stopChannel: make(chan os.Signal, 1),
}
app.Run()
// Run the Application
if err := app.Run(); err != nil {
logrus.Error(err)
}
}
......@@ -18,11 +18,14 @@ var (
keepaliveTimer = time.Second * 60
)
// ClientManager holds all the current active websocket connections as so
// called clients.
type ClientManager struct {
mux sync.Mutex
clients map[*websocket.Conn]struct{}
}
// Register adds a new client (a websocket connection) to the ClientManager.
func (cMngr *ClientManager) Register(client *websocket.Conn) {
cMngr.mux.Lock()
defer cMngr.mux.Unlock()
......@@ -30,6 +33,8 @@ func (cMngr *ClientManager) Register(client *websocket.Conn) {
cMngr.clients[client] = struct{}{}
logrus.Println("Added new client: ", client.RemoteAddr())
}
// Deregister removes a client (a websocket connection) from the ClientManager.
func (cMngr *ClientManager) Deregister(client *websocket.Conn) {
cMngr.mux.Lock()
defer cMngr.mux.Unlock()
......@@ -37,6 +42,9 @@ func (cMngr *ClientManager) Deregister(client *websocket.Conn) {
delete(cMngr.clients, client)
logrus.Println("Removed client: ", client.RemoteAddr())
}
// Publish sends the given byte slice to all the clients (a client is a
// websocket connection).
func (cMngr *ClientManager) Publish(message []byte) {
cMngr.mux.Lock()
defer cMngr.mux.Unlock()
......@@ -52,12 +60,15 @@ func (cMngr *ClientManager) Publish(message []byte) {
}
}
// NewClientManager returns a new websocket ClientManager.
func NewClientManager() *ClientManager {
return &ClientManager{
clients: make(map[*websocket.Conn]struct{}),
}
}
// newUpgrader creates a new websocket.Upgrader and implements the OnOpen,
// OnClose and OnMessage functions.
func newUpgrader() *websocket.Upgrader {
u := websocket.NewUpgrader()
......@@ -70,12 +81,15 @@ func newUpgrader() *websocket.Upgrader {
})
u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
c.SetReadDeadline(time.Now().Add(keepaliveTimer))
if err := c.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
logrus.Printf("Could not update ReadDeadline: %v\n", err)
}
})
return u
}
// onWebsocket is the handling function for a new websocket.
func onWebsocket(w http.ResponseWriter, r *http.Request) {
upgrader := newUpgrader()
conn, err := upgrader.Upgrade(w, r, nil)
......@@ -84,5 +98,8 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) {
return
}
conn.SetReadDeadline(time.Now().Add(keepaliveTimer))
if err := conn.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
logrus.Printf("Could not set initial ReadDeadline: %v\n", err)
return
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment