Newer
Older
// thanks to this https://github.com/krazygaurav/pubsub-go as starting point!
import (
"crypto/rand"
"fmt"
Malte Bauch
committed
stdpath "path"
Malte Bauch
committed
"github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus"
)
/* This implements a notification service based on the publish/subscribe principle.
* It is a simple pub/sub service with some predefined topics
* A more advanced usage could use yang paths instead of predefined topics,
* but this sounds more like a task for an external pub/sub system
*/
type InfoChangeMessage struct {
yangPath string // topic of the message
body *gnmi.Notification // the changes as gnmi notification
// NewInfoChangeMessage builds a message that carries msg as its body and
// yangPath as its topic, and returns a pointer to it.
func NewInfoChangeMessage(msg *gnmi.Notification, yangPath string) *InfoChangeMessage {
	m := new(InfoChangeMessage)
	m.yangPath = yangPath
	m.body = msg
	return m
}
// GetYangPath reports the topic (YANG path) stored in the message.
func (m *InfoChangeMessage) GetYangPath() string {
	topic := m.yangPath
	return topic
}
// GetMessageBody reports the gNMI notification stored in the message.
func (m *InfoChangeMessage) GetMessageBody() *gnmi.Notification {
	notification := m.body
	return notification
}
// Subscribers is a set of subscribers indexed by a string key; the Dispatcher
// in this file keys it by subscriber id.
type Subscribers map[string]*Subscriber
// Dispatcher is the pub/sub hub: it keeps the registered subscribers and, per
// topic, the subscribers interested in that topic.
type Dispatcher struct {
subscribers Subscribers // map of subscribers id:Subscriber
topics map[string]Subscribers // map of topic to subscribers
mu sync.RWMutex // mutex guarding subscribers and topics
}
// NewDispatcher returns a Dispatcher whose subscriber and topic maps are
// allocated and empty.
func NewDispatcher() *Dispatcher {
	d := new(Dispatcher)
	d.subscribers = make(Subscribers)
	d.topics = make(map[string]Subscribers)
	return d
}
// AddSubscriber creates a new subscriber for streamRequestor, registers it
// with the dispatcher under its generated id and returns it.
func (b *Dispatcher) AddSubscriber(streamRequestor gnmi.GNMI_SubscribeServer) *Subscriber {
	// Create the subscriber before taking the lock; only the map insert
	// needs to be serialized.
	id, s := CreateNewSubscriber(streamRequestor)

	b.mu.Lock()
	defer b.mu.Unlock()
	b.subscribers[id] = s
	return s
}
// RemoveSubscriber unsubscribes s from every topic it is subscribed to,
// removes it from the dispatcher's subscriber list and finally destructs it.
func (b *Dispatcher) RemoveSubscriber(s *Subscriber) {
	// GetTopics returns a snapshot taken under the subscriber's lock, so the
	// topic set is not read unguarded while Unsubscribe modifies it.
	for _, topic := range s.GetTopics() {
		b.Unsubscribe(s, topic)
	}

	// remove subscriber from list of subscribers.
	b.mu.Lock()
	delete(b.subscribers, s.id)
	b.mu.Unlock()

	s.Destruct()
}
// Broadcast sends msg to every subscriber of each of the given topics. Each
// subscriber receives its own InfoChangeMessage tagged with the topic it was
// reached through; delivery happens in a separate goroutine per subscriber.
func (b *Dispatcher) Broadcast(msg *gnmi.Notification, topics []string) {
	// The topic map is mutated by Subscribe/Unsubscribe, so it must be read
	// under the dispatcher lock. Only goroutines are started while the lock
	// is held; the (possibly blocking) sends run outside of it.
	b.mu.RLock()
	defer b.mu.RUnlock()

	for _, topic := range topics {
		for _, s := range b.topics[topic] {
			m := NewInfoChangeMessage(msg, topic)
			go func(s *Subscriber) {
				s.Signal(m)
			}(s)
		}
	}
}
// GetSubscribers reports how many subscribers are currently registered for
// the given topic.
func (b *Dispatcher) GetSubscribers(topic string) int {
	b.mu.RLock()
	count := len(b.topics[topic])
	b.mu.RUnlock()
	return count
}
func (b *Dispatcher) Subscribe(s *Subscriber, topic string) {
// subscribe to given topic
b.mu.Lock()
defer b.mu.Unlock()
if b.topics[topic] == nil {
b.topics[topic] = Subscribers{}
}
s.AddTopic(topic)
b.topics[topic][s.id] = s
Malte Bauch
committed
log.Printf("%s Subscribed for topic: %s", s.id, topic)
}
func (b *Dispatcher) Unsubscribe(s *Subscriber, topic string) {
// unsubscribe to given topic
b.mu.RLock()
defer b.mu.RUnlock()
delete(b.topics[topic], s.id)
s.RemoveTopic(topic)
Malte Bauch
committed
log.Printf("%s Unsubscribed for topic: %s", s.id, topic)

Neil-Jocelyn Schark
committed
// Publish the message for given topic.
Malte Bauch
committed
func (b *Dispatcher) Publish(stringPath string, msg *gnmi.Notification) {
path, err := ygot.StringToStructuredPath(stringPath)
if err != nil {

Neil-Jocelyn Schark
committed
log.Errorf("Publish/ygot.StringToStructuredPath of %s failed with %v", stringPath, err)
return
Malte Bauch
committed
}
topics, err := pathToStringList(path)
log.Debugf("topics: %v", topics)
Malte Bauch
committed
if err != nil {

Neil-Jocelyn Schark
committed
log.Errorf("Publish/pathToStringList failed with %v", err)
return
Malte Bauch
committed
}
for _, topic := range topics {
b.mu.RLock()
interestedSubscribers := b.topics[topic]
b.mu.RUnlock()
log.Debugf("checking topic: %s, has %d subscribers\n", topic, len(interestedSubscribers))
Malte Bauch
committed
for _, s := range interestedSubscribers {
m := NewInfoChangeMessage(msg, stringPath)
if !s.active {
return
}
go (func(s *Subscriber) {
s.Signal(m)
})(s)
}
}
}

Neil-Jocelyn Schark
committed
// TODO: rename and add description.
Malte Bauch
committed
func pathToStringList(path *gnmi.Path) ([]string, error) {
paths := []string{"/"}
Malte Bauch
committed
for p := path; len(p.Elem) != 0; p.Elem = p.Elem[:len(p.Elem)-1] {
pathStrings, err := ygot.PathToStrings(path)
if err != nil {
return nil, err
}
pathStringsAsSingleString := "/" + stdpath.Join(pathStrings...)
paths = append(paths, pathStringsAsSingleString)
if len(p.Elem[len(p.Elem)-1].Key) > 0 {
schemaPath, err := ygot.PathToSchemaPath(p)
if err != nil {
return nil, err
}
paths = append(paths, schemaPath)
Malte Bauch
committed
return paths, nil
}
type Subscriber struct {
id string // id of subscriber
messages chan *InfoChangeMessage // messages channel
topics map[string]bool // topics it is subscribed to.
streamRequestor gnmi.GNMI_SubscribeServer
active bool // if given subscriber is active
mutex sync.RWMutex // lock
func CreateNewSubscriber(streamRequestor gnmi.GNMI_SubscribeServer) (string, *Subscriber) {
// returns a new subscriber.
b := make([]byte, 8)
_, err := rand.Read(b)
if err != nil {
log.Fatal(err)
}
id := fmt.Sprintf("%X-%X", b[4:8], b[0:4])
Malte Bauch
committed
log.Printf("New subscriber with ID: %s\n", id)
return id, &Subscriber{
id: id,
messages: make(chan *InfoChangeMessage),
topics: map[string]bool{},
streamRequestor: streamRequestor,
active: true,
}
}
// AddTopic records topic in the subscriber's set of subscribed topics.
func (s *Subscriber) AddTopic(topic string) {
	// The topic map is written, so the exclusive lock is required; a read
	// lock would permit concurrent map writes.
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.topics[topic] = true
}
// RemoveTopic removes topic from the subscriber's set of subscribed topics.
// Removing a topic that is not present is a no-op.
func (s *Subscriber) RemoveTopic(topic string) {
	// The topic map is modified, so the exclusive lock is required; a read
	// lock would permit concurrent map writes.
	s.mutex.Lock()
	defer s.mutex.Unlock()
	delete(s.topics, topic)
}
// GetTopics returns the topics the subscriber is subscribed to as a newly
// allocated slice, in no particular order. The slice is empty (not nil) when
// there are no topics.
func (s *Subscriber) GetTopics() []string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	names := make([]string, 0, len(s.topics))
	for name := range s.topics {
		names = append(names, name)
	}
	return names
}
// GetstreamRequestor returns the gNMI subscribe stream this subscriber was
// created with.
func (s *Subscriber) GetstreamRequestor() gnmi.GNMI_SubscribeServer {
	stream := s.streamRequestor
	return stream
}
// Destruct marks the subscriber inactive and closes its message channel.
// Calling it again on an already destructed subscriber does nothing.
//
// The exclusive lock is taken because the active flag is written and the
// channel is closed. Signal holds the read lock while it sends, so Destruct
// waits for sends that are already in flight instead of closing the channel
// underneath them.
// NOTE(review): this means a receiver must still drain the channel while
// sends are pending, otherwise Destruct blocks — confirm against the callers.
func (s *Subscriber) Destruct() {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if !s.active {
		// Already destructed; closing the channel twice would panic.
		return
	}
	s.active = false
	close(s.messages)
}
// Signal sends msg on the subscriber's message channel if the subscriber is
// still active; for an inactive subscriber the message is dropped. The read
// lock is held for the duration of the send.
func (s *Subscriber) Signal(msg *InfoChangeMessage) {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	if !s.active {
		return
	}
	s.messages <- msg
}
// Listen receives from the subscriber's message channel and logs every
// message it gets. It blocks while the channel is open and returns once the
// channel has been closed and drained, rather than spinning on the closed
// channel.
func (s *Subscriber) Listen() {
	for msg := range s.messages {
		log.Infof("Subscriber %s, received: %s from topic: %s\n", s.id, msg.GetMessageBody(), msg.GetYangPath())
	}
}
// ReturnListenChannel exposes the subscriber's message channel so a caller
// can receive the published messages itself.
func (s *Subscriber) ReturnListenChannel() (pubChannel chan *InfoChangeMessage) {
	pubChannel = s.messages
	return pubChannel
}