Skip to content
Snippets Groups Projects
subscribe.go 7.53 KiB
Newer Older
  • Learn to ignore specific revisions
  • Martin Stiemerling's avatar
    Martin Stiemerling committed
    package gnmiserver
    
    import (
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	"time"
    
    
    	"code.fbi.h-da.de/danet/gnmi-target/internal/notifications"
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	"github.com/openconfig/gnmi/proto/gnmi"
    	"github.com/openconfig/ygot/ygot"
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    
    	log "github.com/sirupsen/logrus"
    )
    
    /* Subscribe: First cut of an implementation
     *
     * Subscribe is called as go-routine and does not need to spawn a new go routine
     */
    func (s *Server) Subscribe(stream gnmi.GNMI_SubscribeServer) error {
    	var subscribeMode gnmi.SubscriptionList_Mode
    
    	// Sanitize the incoming Stream
    	subscriptionRequest, err := stream.Recv()
    	if err != nil {
    		log.Infof("Subscribe failed stream with %s", err)
    		return status.Error(codes.Internal, "stream.Recv() failed.")
    	}
    
    	// retrieve subscription paths
    	subscriptions := subscriptionRequest.GetSubscribe()
    
    
    	// Check if all of the subscription paths exist. If one is not part of the model, an error is returned.
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	for _, sub := range subscriptions.Subscription {
    
    		path := sub.GetPath()
    		// NOTE: Using ytypes.GetNode might not be the best solution to
    		// check if a given path exists. Since we do not really care about
    		// the node at all in this case.
    		if _, err := ytypes.GetNode(s.model.schemaTreeRoot, s.config, sub.GetPath()); err != nil {
    			return status.Error(codes.Internal, fmt.Sprintf("The provided path: %s, does not exist, subscribe not possible.", path.String()))
    		}
    
    		// Print the path a subscription is requested for.
    
    		log.Infof("Received subscribe for gnmi path %s with mode %s", path.String(), subscriptions.GetMode().String())
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	}
    
    	subscribeMode = subscriptions.GetMode()
    
    	switch subscribeMode {
    	case gnmi.SubscriptionList_ONCE:
    		log.Infoln("Subscribe ONCE is not yet implemented.")
    		return status.Error(codes.Unimplemented, "Subscribe ONCE is not yet implemented.")
    	case gnmi.SubscriptionList_POLL:
    		log.Infoln("Subscribe POLL is not yet implemented.")
    		return status.Error(codes.Unimplemented, "Subscribe POLL is not yet implemented.")
    	case gnmi.SubscriptionList_STREAM:
    
    		for _, sub := range subscriptions.Subscription {
    			switch sub.GetMode() {
    			case gnmi.SubscriptionMode_ON_CHANGE:
    				log.Infoln("Serving STREAM Subscribe SubscriptionMode_ON_CHANGE.")
    
    				if err := s.handleStreamOnChangeSubscription(stream, subscriptions); err != nil {
    					return status.Error(codes.Internal, err.Error())
    				}
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    				log.Infoln("Stopping STREAM Subscribe SubscriptionMode_ON_CHANGE.")
    				return nil
    			case gnmi.SubscriptionMode_SAMPLE:
    				log.Infoln("SubscriptionMode_SAMPLE is not yet implemented.")
    				return status.Error(codes.Unimplemented, "SubscriptionMode_SAMPLE is not yet implemented.")
    			case gnmi.SubscriptionMode_TARGET_DEFINED:
    				log.Infoln("SubscriptionMode_TARGET_DEFINED is not yet implemented.")
    				return status.Error(codes.Unimplemented, "SubscriptionMode_TARGET_DEFINED is not yet implemented.")
    			}
    		}
    
    	}
    
    	return status.Error(codes.Unimplemented, "Subscribe ONCE is not yet implemented.")
    }
    
    // Subscribes to all required path with a single subscriber.
    
    func (s *Server) handleStreamOnChangeSubscription(stream gnmi.GNMI_SubscribeServer, subscriptions *gnmi.SubscriptionList) error {
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	streamSub := s.YangModelChangeDispatcher.AddSubscriber(stream)
    
    
    	// remove subscriber if done
    	defer s.YangModelChangeDispatcher.RemoveSubscriber(streamSub)
    
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	for _, sub := range subscriptions.Subscription {
    
    		path, err := ygot.PathToString(sub.GetPath())
    		if err != nil {
    			return err
    		}
    		s.YangModelChangeDispatcher.Subscribe(streamSub, path)
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	}
    
    	// Handler takes care of serving the stream
    	s.streamOnChangeSubscriptionHandler(streamSub)
    
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    }
    
    func (s *Server) streamOnChangeSubscriptionHandler(notificationSubscriber *notifications.Subscriber) {
    	notificationChannel := notificationSubscriber.ReturnListenChannel()
    
    	streamReceiver := notificationSubscriber.GetstreamRequestor()
    
    
    	// TODO: send initial subscribe notification -> move into own function.
    	for _, topic := range notificationSubscriber.GetTopics() {
    		path, err := ygot.StringToStructuredPath(topic)
    		if err != nil {
    			log.Error(err)
    			return
    		}
    		node, err := ytypes.GetNode(s.model.schemaTreeRoot, s.config, path)
    		if err != nil {
    			log.Error(err)
    			return
    		}
    
    		var notifications []*gnmi.Notification
    		if node[0].Schema.IsLeaf() {
    			v, err := ygot.EncodeTypedValue(node[0].Data, gnmi.Encoding_JSON)
    			if err != nil {
    				log.Error(err)
    				return
    			}
    			notifications = []*gnmi.Notification{
    				&gnmi.Notification{
    					Timestamp: time.Now().UnixNano(),
    					Update: []*gnmi.Update{
    						&gnmi.Update{
    							Path: node[0].Path,
    							Val:  v,
    						},
    					},
    				},
    			}
    		} else {
    			notifications, err = ygot.TogNMINotifications(node[0].Data.(ygot.GoStruct), time.Now().UnixNano(), ygot.GNMINotificationsConfig{UsePathElem: true})
    			if err != nil {
    				log.Error(err)
    				return
    			}
    		}
    
    		for _, notification := range notifications {
    			sendErr := streamReceiver.Send(&gnmi.SubscribeResponse{
    				Response: &gnmi.SubscribeResponse_Update{
    					Update: notification,
    				},
    			})
    			if sendErr != nil {
    				log.Infof("streamReceiver.Send(subUpdate) failed with %s", sendErr)
    				return
    			}
    
    			sendErr = streamReceiver.Send(createSubscriptionSyncResponse())
    			if sendErr != nil {
    				log.Infof("streamReceiver.Send(createSubscriptionSyncResponse) failed with %s", sendErr)
    				return
    			}
    		}
    	}
    
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    	for {
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    			log.Infof("streamOnChangeSubscriptionHandler received topic: %s\n", msg.GetYangPath())
    			// generate subscription response
    			myNotification := msg.GetMessageBody()
    
    			updates := myNotification.GetUpdate()
    			if updates != nil {
    				for _, update := range updates {
    					subUpdate := createUpdateSubscriptionResponse(update)
    
    					sendErr := streamReceiver.Send(subUpdate)
    					if sendErr != nil {
    						log.Infof("streamReceiver.Send(subUpdate) failed with %s", sendErr)
    						return
    					}
    				}
    			}
    			deletes := myNotification.GetDelete()
    			if deletes != nil {
    				for _, deleted := range deletes {
    					deletedResponse := createDeleteSubscriptionResponse(deleted)
    
    					sendErr := streamReceiver.Send(deletedResponse)
    					if sendErr != nil {
    						log.Infof("streamReceiver.Send(deletedResponse) failed with %s", sendErr)
    						return
    					}
    				}
    			}
    			sendErr := streamReceiver.Send(createSubscriptionSyncResponse())
    			if sendErr != nil {
    				log.Infof("streamReceiver.Send(createSubscriptionSyncResponse) failed with %s", sendErr)
    				return
    			}
    
    Martin Stiemerling's avatar
    Martin Stiemerling committed
    		}
    	}
    }
    
    func createUpdateSubscriptionResponse(update *gnmi.Update) *gnmi.SubscribeResponse {
    	updates := make([]*gnmi.Update, 0)
    	updates = append(updates, update)
    
    	// create gnmi notification
    	notification := &gnmi.Notification{
    		Timestamp: time.Now().UnixNano(),
    		Update:    updates,
    	}
    
    	responseUpdate := &gnmi.SubscribeResponse_Update{
    		Update: notification,
    	}
    	return &gnmi.SubscribeResponse{
    		Response: responseUpdate,
    	}
    }
    
    func createDeleteSubscriptionResponse(deletedPath *gnmi.Path) *gnmi.SubscribeResponse {
    	deletedPaths := make([]*gnmi.Path, 0)
    	deletedPaths = append(deletedPaths, deletedPath)
    
    	// create gnmi notification with deleted paths
    	notification := &gnmi.Notification{
    		Timestamp: time.Now().UnixNano(),
    		Delete:    deletedPaths,
    	}
    
    	responseUpdate := &gnmi.SubscribeResponse_Update{
    		Update: notification,
    	}
    	return &gnmi.SubscribeResponse{
    		Response: responseUpdate,
    	}
    }
    
    func createSubscriptionSyncResponse() *gnmi.SubscribeResponse {
    	responseSync := &gnmi.SubscribeResponse_SyncResponse{
    		SyncResponse: true,
    	}
    	return &gnmi.SubscribeResponse{
    		Response: responseSync,
    	}
    }