diff --git a/applications/ws-events/main.go b/applications/ws-events/main.go index 53ca1520d2d68918434565b58239abeee2ffb7fa..096d304b7268fb6bd5305753f333be4f17419036 100644 --- a/applications/ws-events/main.go +++ b/applications/ws-events/main.go @@ -1,6 +1,7 @@ package main import ( + "flag" "os" "code.fbi.h-da.de/danet/gosdn/application-framework/event" @@ -9,7 +10,10 @@ import ( ) func main() { - queueCredentials, err := registration.Register("localhost:55055", "ws-events", "SecurePresharedToken") + controllerAddress := flag.String("controllerAddress", "localhost:55055", "The address to a goSDN controller to register at.") + flag.Parse() + + queueCredentials, err := registration.Register(*controllerAddress, "ws-events", "SecurePresharedToken") if err != nil { logrus.Errorf("failed to register application on control plane. %v", err) os.Exit(1) diff --git a/applications/ws-events/ws.go b/applications/ws-events/ws.go index 8db7fba8c72ada9e2618d5624f48cffbcefcd709..e863c0da1c1d0d10853b32d9640ab9d4bd875a0b 100644 --- a/applications/ws-events/ws.go +++ b/applications/ws-events/ws.go @@ -2,47 +2,62 @@ package main import ( "net/http" + "sync" "time" + "github.com/hashicorp/go-multierror" "github.com/lesismal/nbio/nbhttp/websocket" "github.com/sirupsen/logrus" - "golang.org/x/sync/errgroup" ) +// This implementation follows the code example at: +// https://github.com/lesismal/nbio/issues/92#issuecomment-922183823 + var ( clientManager = NewClientManager() keepaliveTimer = time.Second * 30 ) type ClientManager struct { + mux sync.Mutex clients map[*websocket.Conn]struct{} } func (cMngr *ClientManager) Register(client *websocket.Conn) { + cMngr.mux.Lock() + defer cMngr.mux.Unlock() + cMngr.clients[client] = struct{}{} logrus.Println("Added new client: ", client.RemoteAddr()) } func (cMngr *ClientManager) Deregister(client *websocket.Conn) { + cMngr.mux.Lock() + defer cMngr.mux.Unlock() + delete(cMngr.clients, client) logrus.Println("Removed client: ", client.RemoteAddr()) } func (cMngr *ClientManager) Publish(message []byte) { - var eg errgroup.Group + cMngr.mux.Lock() + defer cMngr.mux.Unlock() + + var eg multierror.Group for c := range cMngr.clients { eg.Go(func() error { - logrus.Println("Publishing to client", c.RemoteAddr()) return c.WriteMessage(websocket.TextMessage, message) }) - if err := eg.Wait(); err != nil { - logrus.Printf("Websocket failed to broadcast message to: %s, %v\n", c.Conn.RemoteAddr().String(), err) - } + } + if err := eg.Wait(); err != nil { + logrus.Printf("Publish encountered errors while broadcasting: %v\n", err) } } + func NewClientManager() *ClientManager { return &ClientManager{ clients: make(map[*websocket.Conn]struct{}), } } + func newUpgrader() *websocket.Upgrader { u := websocket.NewUpgrader() diff --git a/go.mod b/go.mod index b2a7b0a9a01e95960f9c4b5a7204f18de04de72a..7418d3c61f556f6f6bd91a2f967f1479b47b076b 100644 --- a/go.mod +++ b/go.mod @@ -89,6 +89,7 @@ require ( ) require ( + github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-plugin v1.4.10 github.com/lesismal/nbio v1.3.17 google.golang.org/genproto/googleapis/api v0.0.0-20230807174057-1744710a1577 @@ -100,6 +101,7 @@ require ( atomicgo.dev/schedule v0.0.2 // indirect github.com/containerd/console v1.0.3 // indirect github.com/fatih/color v1.15.0 // indirect + github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-hclog v1.5.0 // indirect github.com/hashicorp/yamux v0.1.1 // indirect github.com/lesismal/llib v1.1.12 // indirect diff --git a/go.sum b/go.sum index 2a87df3a10a293e16542e1dfe1f1e7cf1fa3d8c4..449ad0e68f3b71c402da08e8051570ef87803da1 100644 --- a/go.sum +++ b/go.sum @@ -540,6 +540,7 @@ github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBt github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c= @@ -548,6 +549,7 @@ github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-plugin v1.4.10 h1:xUbmA4jC6Dq163/fWcp8P3JuHilrHHMLNRxzGQJ9hNk= github.com/hashicorp/go-plugin v1.4.10/go.mod h1:6/1TEzT0eQznvI/gV2CM29DLSkAK/e58mUWKVsPaph0=