package nucleus import ( "context" "fmt" "strings" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/event" eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" "code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" "github.com/google/uuid" gpb "github.com/openconfig/gnmi/proto/gnmi" log "github.com/sirupsen/logrus" ) const ( subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds // TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. gNMISubscribeMode string = "stream" gNMIStreamMode string = "on_change" ) // NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles // responses by triggering the internal event process. type NetworkElementWatcher struct { pndStore networkdomain.PndStore networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper eventService eventInterfaces.Service } // networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine. type networkelementSubscriptionHelper struct { stopSubscribeCtx context.Context stopFunc context.CancelFunc } // NewNetworkElementWatcher takes a pndStore to subscribe to network element paths. func NewNetworkElementWatcher(pndStore networkdomain.PndStore, eventService eventInterfaces.Service) *NetworkElementWatcher { return &NetworkElementWatcher{ pndStore: pndStore, networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper), eventService: eventService, } } // SubToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions. // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}} // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second). func (n *NetworkElementWatcher) SubToNetworkElements(paths [][]string, opts *gnmi.SubscribeOptions) { if opts == nil { opts = &gnmi.SubscribeOptions{ Mode: gNMISubscribeMode, StreamMode: gNMIStreamMode, Paths: paths, SampleInterval: subscribeSampleInterval, } } pnds, err := n.pndStore.GetAll() if err != nil { log.Error(err) } for _, pnd := range pnds { n.subscribeToPndNetworkElements(pnd, opts) } } func (n *NetworkElementWatcher) subscribeToPndNetworkElements(pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) { for _, mne := range pnd.NetworkElements() { subID := uuid.New() stopContext, cancel := context.WithCancel(context.Background()) n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{ stopSubscribeCtx: stopContext, stopFunc: cancel, }) go n.callSubscribe(stopContext, pnd.ID().String(), mne, opts) } } func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, pndID string, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { gNMIOptionsCtx := context.Background() gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) // SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check // from which network element a response was sent if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, n.handleSubscribeResponse, &transport.SubscriptionInformation{ PndID: pndID, NetworkElementID: mne.ID().String(), NetworkElementName: mne.Name(), StopContext: stopContext, }); err != nil { log.Error(err) } } func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) { //TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future n.networkelementSubcriptions[subID] = devSub } // StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions. func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() { for key := range n.networkelementSubcriptions { n.StopAndRemoveNetworkElementSubscription(key) } } // StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map // of network element subscriptions. func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) { n.networkelementSubcriptions[subID].stopFunc() delete(n.networkelementSubcriptions, subID) } // handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish // from which network element a subscribe response was sent including improved error handling. func (n *NetworkElementWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) { switch resp := resp.Response.(type) { case *gpb.SubscribeResponse_Error: log.Error(&customerrs.SubscribeResponseError{ PndID: subscriptionInfo.PndID, NetworkElementID: subscriptionInfo.NetworkElementID, NetworkElementName: subscriptionInfo.NetworkElementName, Err: fmt.Sprint("SubscribeResponse_Error"), }) case *gpb.SubscribeResponse_SyncResponse: if !resp.SyncResponse { log.Error(&customerrs.SubscribeSyncResponseError{ PndID: subscriptionInfo.PndID, NetworkElementID: subscriptionInfo.NetworkElementID, NetworkElementName: subscriptionInfo.NetworkElementName, }) } case *gpb.SubscribeResponse_Update: n.handleSubscribeResponseUpdate(resp, subscriptionInfo) default: log.Infof("Invalid SubscribeResponse, %v", resp) } } func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) { pndID, err := uuid.Parse(subscriptionInfo.PndID) if err != nil { log.Error(err) } pnd, err := n.pndStore.Get(store.Query{ID: pndID}) if err != nil { log.Error(err) } mne, err := pnd.GetNetworkElement(subscriptionInfo.NetworkElementID) if err != nil { log.Error(err) } err = mne.Transport().ProcessControlPlaneSubscribeResponse(resp) if err != nil { log.Error(err) } pathsAndValues := make(map[string]string, len(resp.Update.Update)) for _, update := range resp.Update.Update { pathString := "" // go through elem to build full path for _, elem := range update.Path.Elem { // remove unwanted parts of path string example: "name:\"system\"" -> "system" filteredElem := elem.String()[strings.Index(elem.String(), ":\"")+2 : len(elem.String())-1] pathString += filteredElem + "/" } pathsAndValues[pathString] = update.Val.GetStringVal() } pubEvent := event.NewMneUpdateEvent(mne.ID(), pathsAndValues) if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { go func() { n.eventService.Reconnect() retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent) if retryErr != nil { log.Error(retryErr) } }() } }