Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"net/http"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/lesismal/nbio/nbhttp/websocket"
"github.com/sirupsen/logrus"
)
// This implementation follows the code example at:
// https://github.com/lesismal/nbio/issues/92#issuecomment-922183823
var (
clientManager = NewClientManager()
keepaliveTimer = time.Second * 60
)
// ClientManager holds all the current active websocket connections as so
// called clients.
type ClientManager struct {
mux sync.Mutex
clients map[*websocket.Conn]struct{}
}
// Register adds a new client (a websocket connection) to the ClientManager.
func (cMngr *ClientManager) Register(client *websocket.Conn) {
cMngr.mux.Lock()
defer cMngr.mux.Unlock()
cMngr.clients[client] = struct{}{}
logrus.Println("Added new client: ", client.RemoteAddr())
}
// Deregister removes a client (a websocket connection) from the ClientManager.
func (cMngr *ClientManager) Deregister(client *websocket.Conn) {
cMngr.mux.Lock()
defer cMngr.mux.Unlock()
delete(cMngr.clients, client)
logrus.Println("Removed client: ", client.RemoteAddr())
}
// Publish sends the given byte slice to all the clients (a client is a
// websocket connection).
func (cMngr *ClientManager) Publish(message []byte) {
cMngr.mux.Lock()
defer cMngr.mux.Unlock()
var eg multierror.Group
for c := range cMngr.clients {
eg.Go(func() error {
return c.WriteMessage(websocket.TextMessage, message)
})
}
if err := eg.Wait(); err != nil {
logrus.Printf("Publish encountered errors while broadcasting: %v\n", err)
}
}
// NewClientManager returns a new websocket ClientManager.
func NewClientManager() *ClientManager {
return &ClientManager{
clients: make(map[*websocket.Conn]struct{}),
}
}
// newUpgrader creates a new websocket.Upgrader and implements the OnOpen,
// OnClose and OnMessage functions.
func newUpgrader() *websocket.Upgrader {
u := websocket.NewUpgrader()
u.OnOpen(func(c *websocket.Conn) {
clientManager.Register(c)
})
u.OnClose(func(c *websocket.Conn, err error) {
clientManager.Deregister(c)
})
u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
if err := c.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
logrus.Printf("Could not update ReadDeadline: %v\n", err)
}
})
return u
}
// onWebsocket is the handling function for a new websocket.
func onWebsocket(w http.ResponseWriter, r *http.Request) {
upgrader := newUpgrader()
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logrus.Println("Could not create websocket")
return
}
if err := conn.SetReadDeadline(time.Now().Add(keepaliveTimer)); err != nil {
logrus.Printf("Could not set initial ReadDeadline: %v\n", err)
return
}
}