Skip to content
Snippets Groups Projects
notifications.go 6.42 KiB
Newer Older
  • Learn to ignore specific revisions
  • Martin Stiemerling's avatar
    Martin Stiemerling committed
    package notifications
    
    
    // Thanks to https://github.com/krazygaurav/pubsub-go, which served as the starting point!
    
    import (
    	"crypto/rand"
    	"fmt"
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	"github.com/openconfig/gnmi/proto/gnmi"
    
    	log "github.com/sirupsen/logrus"
    )
    
    
    /* This implements a notification service based on the publish/subscribe principle.
     * It is a simple pub/sub service with some predefined topics.
     * A more advanced version could use YANG paths instead of predefined topics,
     * but that sounds more like a task for an external pub/sub system.
     */
    
    
    type InfoChangeMessage struct {
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	yangPath string             // topic of the message
    	body     *gnmi.Notification // the changes as gnmi notification
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    // NewInfoChangeMessage builds a message that carries the gNMI notification msg
    // together with the YANG path (topic) it belongs to, and returns a pointer to it.
    func NewInfoChangeMessage(msg *gnmi.Notification, yangPath string) *InfoChangeMessage {
    	m := new(InfoChangeMessage)
    	m.body = msg
    	m.yangPath = yangPath
    	return m
    }
    // GetYangPath reports the topic (YANG path) stored in the message.
    func (m *InfoChangeMessage) GetYangPath() string {
    	topic := m.yangPath
    	return topic
    }
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    // GetMessageBody reports the gNMI notification carried by the message.
    func (m *InfoChangeMessage) GetMessageBody() *gnmi.Notification {
    	notification := m.body
    	return notification
    }
    
    // Subscribers is a collection of subscribers keyed by subscriber id.
    type Subscribers map[string]*Subscriber
    
    // Dispatcher keeps track of the registered subscribers and of which
    // subscribers are subscribed to which topic.
    type Dispatcher struct {
    	subscribers Subscribers            // all registered subscribers, keyed by subscriber id
    	topics      map[string]Subscribers // topic -> subscribers subscribed to that topic
    	mu          sync.RWMutex           // mutex taken by the Dispatcher methods around map access
    }
    
    // NewDispatcher returns a Dispatcher whose subscriber and topic maps are
    // initialised and empty.
    func NewDispatcher() *Dispatcher {
    	d := new(Dispatcher)
    	d.subscribers = make(Subscribers)
    	d.topics = make(map[string]Subscribers)
    	return d
    }
    
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    func (b *Dispatcher) AddSubscriber(streamRequestor gnmi.GNMI_SubscribeServer) *Subscriber {
    
    	b.mu.Lock()
    	defer b.mu.Unlock()
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	id, s := CreateNewSubscriber(streamRequestor)
    
    	b.subscribers[id] = s
    	return s
    }
    
    // RemoveSubscriber unsubscribes s from every topic it is subscribed to,
    // drops it from the dispatcher's subscriber map and finally destructs it.
    func (b *Dispatcher) RemoveSubscriber(s *Subscriber) {
    	for name := range s.topics {
    		b.Unsubscribe(s, name)
    	}

    	// The dispatcher lock is only needed for the map update itself;
    	// it is released again before the subscriber is destructed.
    	func() {
    		b.mu.Lock()
    		defer b.mu.Unlock()
    		delete(b.subscribers, s.id)
    	}()

    	s.Destruct()
    }
    
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    // Broadcast sends msg to every subscriber of each of the given topics.
    // A subscriber that is subscribed to several of the topics receives one
    // message per topic, each tagged with the respective topic.
    //
    // Each delivery runs in its own goroutine, so Broadcast itself does not wait
    // for the receivers.
    func (b *Dispatcher) Broadcast(msg *gnmi.Notification, topics []string) {
    	for _, topic := range topics {
    		// Copy the subscribers while holding the read lock: the topic map is
    		// modified by Subscribe/Unsubscribe and must not be iterated unguarded.
    		b.mu.RLock()
    		receivers := make([]*Subscriber, 0, len(b.topics[topic]))
    		for _, s := range b.topics[topic] {
    			receivers = append(receivers, s)
    		}
    		b.mu.RUnlock()

    		for _, s := range receivers {
    			m := NewInfoChangeMessage(msg, topic)
    			go func(s *Subscriber) {
    				s.Signal(m)
    			}(s)
    		}
    	}
    }
    
    // GetSubscribers returns how many subscribers are currently subscribed to
    // topic; an unknown topic yields 0.
    func (b *Dispatcher) GetSubscribers(topic string) int {
    	b.mu.RLock()
    	defer b.mu.RUnlock()

    	subs := b.topics[topic]
    	return len(subs)
    }
    
    // Subscribe registers s for topic, creating the topic's subscriber map on
    // first use, and records the topic on the subscriber as well.
    func (b *Dispatcher) Subscribe(s *Subscriber, topic string) {
    	b.mu.Lock()
    	defer b.mu.Unlock()

    	subs := b.topics[topic]
    	if subs == nil {
    		subs = Subscribers{}
    		b.topics[topic] = subs
    	}
    	s.AddTopic(topic)
    	subs[s.id] = s

    	log.Printf("%s Subscribed for topic: %s", s.id, topic)
    }
    
    func (b *Dispatcher) Unsubscribe(s *Subscriber, topic string) {
    	// unsubscribe to given topic
    	b.mu.RLock()
    	defer b.mu.RUnlock()
    
    	delete(b.topics[topic], s.id)
    	s.RemoveTopic(topic)
    
    	log.Printf("%s Unsubscribed for topic: %s", s.id, topic)
    
    // Publish delivers msg to the active subscribers of every topic that
    // pathToStringList derives from stringPath. Each delivered message is tagged
    // with stringPath itself, not with the matching topic.
    //
    // If stringPath cannot be parsed, or the topic list cannot be derived, the
    // error is logged and nothing is published. Each delivery runs in its own
    // goroutine, so Publish itself does not wait for the receivers.
    func (b *Dispatcher) Publish(stringPath string, msg *gnmi.Notification) {
    	path, err := ygot.StringToStructuredPath(stringPath)
    	if err != nil {
    		log.Errorf("Publish/ygot.StringToStructuredPath of %s failed with %v", stringPath, err)
    		return
    	}

    	topics, err := pathToStringList(path)
    	log.Debugf("topics: %v", topics)
    	if err != nil {
    		log.Errorf("Publish/pathToStringList failed with %v", err)
    		return
    	}

    	for _, topic := range topics {
    		// Copy the subscribers while holding the read lock: the topic map is
    		// modified by Subscribe/Unsubscribe and must not be iterated unguarded.
    		b.mu.RLock()
    		interestedSubscribers := make([]*Subscriber, 0, len(b.topics[topic]))
    		for _, s := range b.topics[topic] {
    			interestedSubscribers = append(interestedSubscribers, s)
    		}
    		b.mu.RUnlock()

    		log.Debugf("checking topic: %s, has %d subscribers\n", topic, len(interestedSubscribers))

    		for _, s := range interestedSubscribers {
    			if !s.active {
    				// Skip only this subscriber; returning here would silently
    				// drop the message for all remaining subscribers and topics.
    				continue
    			}
    			m := NewInfoChangeMessage(msg, stringPath)
    			go func(s *Subscriber) {
    				s.Signal(m)
    			}(s)
    		}
    	}
    }
    
    
    func pathToStringList(path *gnmi.Path) ([]string, error) {
    
    	for p := path; len(p.Elem) != 0; p.Elem = p.Elem[:len(p.Elem)-1] {
    		pathStrings, err := ygot.PathToStrings(path)
    		if err != nil {
    			return nil, err
    		}
    		pathStringsAsSingleString := "/" + stdpath.Join(pathStrings...)
    		paths = append(paths, pathStringsAsSingleString)
    
    		if len(p.Elem[len(p.Elem)-1].Key) > 0 {
    			schemaPath, err := ygot.PathToSchemaPath(p)
    			if err != nil {
    				return nil, err
    			}
    			paths = append(paths, schemaPath)
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	id              string                  // id of subscriber
    	messages        chan *InfoChangeMessage // messages channel
    	topics          map[string]bool         // topics it is subscribed to.
    	streamRequestor gnmi.GNMI_SubscribeServer
    	active          bool         // if given subscriber is active
    	mutex           sync.RWMutex // lock
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    func CreateNewSubscriber(streamRequestor gnmi.GNMI_SubscribeServer) (string, *Subscriber) {
    
    	// returns a new subscriber.
    	b := make([]byte, 8)
    	_, err := rand.Read(b)
    	if err != nil {
    		log.Fatal(err)
    	}
    	id := fmt.Sprintf("%X-%X", b[4:8], b[0:4])
    
    	log.Printf("New subscriber with ID: %s\n", id)
    
    	return id, &Subscriber{
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    		id:              id,
    		messages:        make(chan *InfoChangeMessage),
    		topics:          map[string]bool{},
    		streamRequestor: streamRequestor,
    		active:          true,
    
    	}
    }
    
    // AddTopic records topic in the subscriber's set of subscribed topics.
    //
    // NOTE(review): the topics map is written while only the read lock is held,
    // so concurrent AddTopic/RemoveTopic calls do not exclude each other.
    // Switching to Lock/Unlock needs care because Signal keeps the read lock
    // while it sends on the message channel — confirm before changing.
    func (s *Subscriber) AddTopic(topic string) {
    	// add topic to the subscriber
    	s.mutex.RLock()
    	defer s.mutex.RUnlock()
    	s.topics[topic] = true
    }
    
    // RemoveTopic deletes topic from the subscriber's set of subscribed topics;
    // removing a topic that is not present is a no-op.
    //
    // NOTE(review): the topics map is modified while only the read lock is held,
    // so concurrent AddTopic/RemoveTopic calls do not exclude each other.
    // Switching to Lock/Unlock needs care because Signal keeps the read lock
    // while it sends on the message channel — confirm before changing.
    func (s *Subscriber) RemoveTopic(topic string) {
    	// remove topic from the subscriber
    	s.mutex.RLock()
    	defer s.mutex.RUnlock()
    	delete(s.topics, topic)
    }
    
    func (s *Subscriber) GetTopics() []string {
    	// Get all topic of the subscriber
    	s.mutex.RLock()
    	defer s.mutex.RUnlock()
    	topics := []string{}
    
    		topics = append(topics, topic)
    	}
    	return topics
    }
    
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    // GetstreamRequestor returns the gNMI subscribe stream stored on the subscriber.
    func (s *Subscriber) GetstreamRequestor() gnmi.GNMI_SubscribeServer {
    	stream := s.streamRequestor
    	return stream
    }
    
    
    // Destruct marks the subscriber as inactive and closes its message channel.
    //
    // NOTE(review): active is written while only the read lock is held, and a
    // second call would close an already-closed channel, which panics in Go —
    // confirm that callers invoke this exactly once per subscriber.
    func (s *Subscriber) Destruct() {
    	// destructor for subscriber.
    	s.mutex.RLock()
    	defer s.mutex.RUnlock()
    	s.active = false
    	close(s.messages)
    }
    
    // Signal sends msg on the subscriber's message channel if the subscriber is
    // still marked active; otherwise the message is dropped.
    func (s *Subscriber) Signal(msg *InfoChangeMessage) {
    	// The read lock is held for the whole send, including any time spent
    	// waiting for a receiver on the channel.
    	s.mutex.RLock()
    	defer s.mutex.RUnlock()
    	if s.active {
    		s.messages <- msg
    	}
    }
    
    // Listen receives messages from the subscriber's message channel and logs
    // each one together with the subscriber id and the message's YANG path.
    //
    // It blocks while the channel is open and returns once the channel has been
    // closed (Destruct closes it) and all pending messages have been logged.
    func (s *Subscriber) Listen() {
    	// Ranging over the channel ends on close; a bare receive in an endless
    	// loop would spin forever on a closed channel.
    	for msg := range s.messages {
    		log.Infof("Subscriber %s, received: %s from topic: %s\n", s.id, msg.GetMessageBody(), msg.GetYangPath())
    	}
    }
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    
    // ReturnListenChannel exposes the channel on which the subscriber's messages
    // are delivered.
    func (s *Subscriber) ReturnListenChannel() (pubChannel chan *InfoChangeMessage) {
    	pubChannel = s.messages
    	return pubChannel
    }