package nucleus import ( "context" "errors" "fmt" "strconv" "sync" "code.fbi.h-da.de/danet/gosdn/controller/config" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/event" eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" "code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi" "github.com/google/uuid" gpb "github.com/openconfig/gnmi/proto/gnmi" ygotutil "github.com/openconfig/ygot/util" "github.com/openconfig/ygot/ygot" log "github.com/sirupsen/logrus" ) const ( subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds // Note: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI. gNMISubscribeMode string = "stream" gNMIStreamMode string = "on_change" numberWorkers int = 5 workerQueueSize int = 1000 ) // NetworkElementWatcher is a component that subscribes to network elements via gNMI from within the controller and handles // responses by triggering the internal event process. type NetworkElementWatcher struct { mneService networkelement.Service networkelementSubscriptionsMutex sync.Mutex networkelementSubcriptions map[uuid.UUID]*networkelementSubscriptionHelper eventService eventInterfaces.Service } // networkelementSubscriptionHelper is used to store information to stop a running subscribe go routine. type networkelementSubscriptionHelper struct { SubID string MneID string MneName string PndID string stopSubscribeCtx context.Context stopFunc context.CancelFunc Opts *gnmi.SubscribeOptions } // NewNetworkElementWatcher allows to subscribe to network element paths. func NewNetworkElementWatcher(mneService networkelement.Service, eventService eventInterfaces.Service) *NetworkElementWatcher { return &NetworkElementWatcher{ mneService: mneService, networkelementSubcriptions: make(map[uuid.UUID]*networkelementSubscriptionHelper), eventService: eventService, } } // SubscribeToNetworkElements subscribes to every available network element in each network domain according to provided SubscribeOptions. // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second). func (n *NetworkElementWatcher) SubscribeToNetworkElements(subScriptionInfos []*networkelementSubscriptionHelper) { var tmpOpts *gnmi.SubscribeOptions if len(subScriptionInfos) == 0 { tmpOpts = &gnmi.SubscribeOptions{ Mode: gNMISubscribeMode, StreamMode: gNMIStreamMode, SampleInterval: subscribeSampleInterval, } } mnes, err := n.mneService.GetAll() if err != nil { log.Error(err) return } for _, mne := range mnes { if len(subScriptionInfos) > 0 { tmpOpts, err = getOptionsForMne(mne.ID().String(), subScriptionInfos) if err != nil { log.Infof("Couldn't find options for mne %s, reason: %v. \n Skipping subscription.", mne.Name(), err) continue } } n.subscribeToNetworkElement(mne, tmpOpts) } } // SubscribeToNetworkElement subscribes to the provided network element according to provided SubscribeOptions. // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second). func (n *NetworkElementWatcher) SubscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { if opts == nil { opts = &gnmi.SubscribeOptions{ Mode: gNMISubscribeMode, StreamMode: gNMIStreamMode, SampleInterval: subscribeSampleInterval, } } n.subscribeToNetworkElement(mne, opts) } // SubscribeToNetworkElementWithID subscribes to the network element matching the provided ID according to provided SubscribeOptions. // SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second). func (n *NetworkElementWatcher) SubscribeToNetworkElementWithID(mneID uuid.UUID, opts *gnmi.SubscribeOptions) error { if opts == nil { opts = &gnmi.SubscribeOptions{ Mode: gNMISubscribeMode, StreamMode: gNMIStreamMode, SampleInterval: subscribeSampleInterval, } } mne, err := n.mneService.Get(store.Query{ID: mneID}) if err != nil { log.Error(err) return err } n.subscribeToNetworkElement(mne, opts) return nil } func (n *NetworkElementWatcher) subscribeToNetworkElement(mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { subID := uuid.New() opts.Paths = n.mergeGnmiSubscriptions(mne.GetGnmiSubscriptionPaths(), config.GetGnmiSubscriptionPaths()) stopContext, cancel := context.WithCancel(context.Background()) n.addToNetworkElementSubscriptions(subID, &networkelementSubscriptionHelper{ stopSubscribeCtx: stopContext, stopFunc: cancel, SubID: subID.String(), MneID: mne.ID().String(), MneName: mne.Name(), PndID: mne.PndID().String(), Opts: opts, }) go n.callSubscribe(stopContext, mne, opts) } // callSubscribe spawns a worker pool to handle gNMI subscription updates for each individual subscription // and then sets up the gNMI subscription listener. func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne networkelement.NetworkElement, opts *gnmi.SubscribeOptions) { gNMIOptionsCtx := context.Background() gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize) for i := 1; i <= numberWorkers; i++ { name := "Worker " + strconv.Itoa(i) worker := NewSubscriptionQueueWorker(name, n.handleSubscribeResponse) go worker.HandleGnmiSubscriptionUpdates(subInfoChan) } // SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check // from which network element a response was sent if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{ PndID: mne.PndID().String(), NetworkElementID: mne.ID().String(), NetworkElementName: mne.Name(), StopContext: stopContext, }, subInfoChan); err != nil { log.Error(err) } } func (n *NetworkElementWatcher) addToNetworkElementSubscriptions(subID uuid.UUID, devSub *networkelementSubscriptionHelper) { n.networkelementSubscriptionsMutex.Lock() defer n.networkelementSubscriptionsMutex.Unlock() n.networkelementSubcriptions[subID] = devSub } // StopAndRemoveAllNetworkElementSubscriptions stops and removes all the available running subscriptions. func (n *NetworkElementWatcher) StopAndRemoveAllNetworkElementSubscriptions() { for key := range n.networkelementSubcriptions { n.StopAndRemoveNetworkElementSubscription(key) } } // StopAndRemoveNetworkElementSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map // of network element subscriptions. func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uuid.UUID) { n.networkelementSubscriptionsMutex.Lock() defer n.networkelementSubscriptionsMutex.Unlock() n.networkelementSubcriptions[subID].stopFunc() delete(n.networkelementSubcriptions, subID) } // handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish // from which network element a subscribe response was sent including improved error handling. func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) { log.Debugf("Received Subscribe response: MNE ID: %s, MNE Name: %s, SubResponse: %v", subscriptionInfo.NetworkElementID, subscriptionInfo.NetworkElementName, subscriptionInfo.SubResponse) if subscriptionInfo.SubResponse == nil { // Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console // if the target that was subscribed to is not reachable anymore. // log.Error("Error: subresponse == nil") } else { switch resp := subscriptionInfo.SubResponse.Response.(type) { case *gpb.SubscribeResponse_Error: log.Error(&customerrs.SubscribeResponseError{ PndID: subscriptionInfo.PndID, NetworkElementID: subscriptionInfo.NetworkElementID, NetworkElementName: subscriptionInfo.NetworkElementName, Err: fmt.Sprint("SubscribeResponse_Error"), }) case *gpb.SubscribeResponse_SyncResponse: if !resp.SyncResponse { log.Error(&customerrs.SubscribeSyncResponseError{ PndID: subscriptionInfo.PndID, NetworkElementID: subscriptionInfo.NetworkElementID, NetworkElementName: subscriptionInfo.NetworkElementName, }) } case *gpb.SubscribeResponse_Update: n.handleSubscribeResponseUpdate(resp, subscriptionInfo) default: log.Infof("Invalid SubscribeResponse, %v", resp) } } } func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) { pathsAndValues := make(map[string]string, len(resp.Update.Update)) if resp.Update == nil || len(resp.Update.Update) == 0 { log.Debugf("handleSubscribeResponseUpdate empty update or updates; Update: %v, InnerUpdates: %v", resp.Update, resp.Update.Update) return } for _, update := range resp.Update.Update { pathString, err := ygot.PathToString(update.Path) if err != nil { log.Errorf("Error trying to create a string from path: %v", err) } switch v := update.GetVal().GetValue().(type) { case *gpb.TypedValue_StringVal: pathsAndValues[pathString] = update.Val.GetStringVal() case *gpb.TypedValue_JsonIetfVal: pathsAndValues[pathString] = string(update.GetVal().GetJsonIetfVal()) case *gpb.TypedValue_UintVal: pathsAndValues[pathString] = fmt.Sprintf("%d", update.Val.GetUintVal()) default: log.Errorf("The given value of type: %T, provided by a SubResponse from network element with ID: %s is not supported", v, subscriptionInfo.NetworkElementID) return } } mneID, err := uuid.Parse(subscriptionInfo.NetworkElementID) if err != nil { log.Errorf("Error trying to parse uuid, could not handle subscription response: %v", err) } pubEvent := event.NewGnmiSubscribeEvent(mneID, pathsAndValues) if err := n.eventService.PublishEvent(NetworkElementEventTopic, pubEvent); err != nil { go func() { n.eventService.Reconnect() retryErr := n.eventService.RetryPublish(NetworkElementEventTopic, pubEvent) if retryErr != nil { log.Error(retryErr) } }() } } // mergeGnmiSubscriptions takes paths for gNMI Subscriptions from two sources (the MNE and a config file) // and merges them to one set of subscription paths without duplicates. // Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}. func (n *NetworkElementWatcher) mergeGnmiSubscriptions(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig [][]string) [][]string { // create slice with all elements var tempJoinedPaths = append(gNMISusbcriptionPathsFromMne, gNMISusbcriptionPathsFromConfig...) // remove duplicates inResult := make(map[string]bool) var joinedPaths [][]string for _, pathSlice := range tempJoinedPaths { path := ygotutil.SlicePathToString(pathSlice) if _, ok := inResult[path]; !ok { inResult[path] = true joinedPaths = append(joinedPaths, pathSlice) } } return joinedPaths } // GetAllSubscriptionInformations returns the information of all running sunscriptions. func (n *NetworkElementWatcher) GetAllSubscriptionInformations() []*networkelementSubscriptionHelper { var information []*networkelementSubscriptionHelper n.networkelementSubscriptionsMutex.Lock() defer n.networkelementSubscriptionsMutex.Unlock() for _, info := range n.networkelementSubcriptions { information = append(information, info) } return information } // GetSubscriptionInformations returns the information for one specific subscription referenced by its ID. func (n *NetworkElementWatcher) GetSubscriptionInformations(subID uuid.UUID) (*networkelementSubscriptionHelper, error) { n.networkelementSubscriptionsMutex.Lock() defer n.networkelementSubscriptionsMutex.Unlock() information, ok := n.networkelementSubcriptions[subID] if !ok { return nil, errors.New(fmt.Sprintf("Couldn't retrieve information for subscription with ID: %s", subID.String())) } return information, nil } // getOptionsForMne checks if there is a match of the mneID with all the mne IDs provided in the slice of subscription // information and returns the related subscribe options or an error. func getOptionsForMne(mneID string, subInfos []*networkelementSubscriptionHelper) (*gnmi.SubscribeOptions, error) { for _, subInfo := range subInfos { if subInfo.MneID == mneID { return subInfo.Opts, nil } } return nil, fmt.Errorf("error: did not find subscription infos matching to provided mne ID: %s", mneID) }