diff --git a/applications/rtdt-manager/README.md b/applications/rtdt-manager/README.md index 64c16fc92e7c453f4f1fa1fad30e7a7c8007edf3..11f756cf2455bcc6910b7bebda1cec2eda8f5806 100644 --- a/applications/rtdt-manager/README.md +++ b/applications/rtdt-manager/README.md @@ -7,35 +7,37 @@ however since that application was defunct, it was determined that it would be s ### Building You can clone and build this program like this: -``` -git clone git@code.fbi.h-da.de:danet/gosdn.git -make build-gosdn -make containerize-gosdn -make build-rtdt-manager +```bash +$ git clone git@code.fbi.h-da.de:danet/gosdn.git +$ make build-gosdn +$ make containerize-gosdn +$ make build-rtdt-manager ``` The application relies on building a local gnmi-target first based on the `interface-enabled-test` branch. For this, clone the gnmi-target repo: -``` -git clone git@code.fbi.h-da.de:danet/gnmi-target.git -git checkout interface-enabled-test -make all +```bash +$ git clone git@code.fbi.h-da.de:danet/gnmi-target.git +$ git checkout interface-enabled-test +$ make all ``` The file under `yang/yang.go` was copied from the danet gnmi-target repo ([https://code.fbi.h-da.de/danet/gnmi-target](https://code.fbi.h-da.de/danet/gnmi-target)) after building it with `make all`. It is necessary so that YANG paths received through events in the callback function can be parsed and their type can be extracted. This is the mechanism used to replicate changes from the Physical Network to the Network Digital Twin (See [#Theory of Operation](#theory-of-operation)). -Starting the application is done for example like this: -``` -./artifacts/rtdt-manager -u admin -a 172.100.0.5:55055 -p TestPassword -c applications/rtdt-manager/data/base-clab.yaml --sdnconfig applications/rtdt-manager/test/downloaded-config.json --with-twin -``` ### Theory of Operation +Starting the application can be done for example like this: + +```bash +$ ./artifacts/rtdt-manager -u admin -a 172.100.0.5:55055 -p TestPassword -c applications/rtdt-manager/data/base-clab.yaml --sdnconfig applications/rtdt-manager/test/downloaded-config.json +``` + The application expects a .yaml containerlab topology file which specifies the base gosdn environment that should be used. It also takes a .json SDN configuration file which mirrors the Topology MongoDB collection. This specifies how the physical network looks. -In a use-case with an actual physical network that uses arist switches, this would be assumed to be just retrievable from the database directly. +In a use-case with an actual physical network that uses e.g. arista EOS-based switches, this would be assumed to be just retrievable from the database directly. -When starting up, the application first creates the physical network. It does this by first taking the base Containerlab .yaml file to configure the physical network and to +When starting up, the application first creates the physical network automatically. It does this by first taking the base Containerlab .yaml file to configure the physical network and to determine the addresses and configuration of the goSDN Controller, Plugin-Registry, MongoDB and MongoDB-Express, and RabbitMQ and then extracting the rest of the topology from the SDN-config file in .json. This is combined into a Containerlab configuration that specifies the entire physical network and once this has been deployed with Containerlab, the .json file is applied to the MongoDB database store with the Northbound Interface (NBI) configuration management service. diff --git a/applications/rtdt-manager/main.go b/applications/rtdt-manager/main.go index 9d1b119eb8ff4f1d104e5465196149c547c27355..1983201e222d4628713fb62321b375e773e74a07 100644 --- a/applications/rtdt-manager/main.go +++ b/applications/rtdt-manager/main.go @@ -73,7 +73,7 @@ func main() { return } - err = rtdtMan.InitEventSystemRealnet() + err = rtdtMan.InitEventSystem() if err != nil { fmt.Printf("In main(): %v\n", err) return diff --git a/applications/rtdt-manager/rtdt-manager/rtdt-manager.go b/applications/rtdt-manager/rtdt-manager/rtdt-manager.go index 00e5203409b3bf87687468b2748b3c9f64cc8968..464b42ffd72f9348a78b7fa9d4d6b3950c14c50b 100644 --- a/applications/rtdt-manager/rtdt-manager/rtdt-manager.go +++ b/applications/rtdt-manager/rtdt-manager/rtdt-manager.go @@ -194,30 +194,49 @@ func (r *RtdtManager) LaunchTwin(twinSubnetIPv4, twinSubnetIPv6, twinName string twin := venv.NewVEnv(twinName, twinClabFName, "admin", "TestPassword", &r.waitGroup, nil) r.rtdt_twins = append(r.rtdt_twins, twin) + // Plugins are broken in goSDN, wait for fix for now, not having + // them set here should cause plugins to be started TwinSdnConfig.Plugins = nil TwinSdnConfig.WriteSdnConfig(twinName + ".json") err = twin.ApplyConfiguration(&TwinSdnConfig) if err != nil { fmt.Printf("Failed to apply configuration: %v\n", err) } + err = r.InitEventSystem() + if err != nil { + fmt.Printf("Failed to initEventSystem for twin: %s\n", twin.Name) + } return nil } -// Apply the changes from a twin back to realnet - NOT USED YET -func (r *RtdtManager) ApplyChanges(twinName string) error { +// Apply the changes from a twin back to realnet +func (r *RtdtManager) ApplyEvents(twinName string) error { var twin *venv.VEnv for _, tw := range r.rtdt_twins { if tw.Name == twinName { twin = tw } } - for _, change := range *twin.GetSavedChanges() { - mneid := change.Mneid - path := change.Path.String() - val := change.Value - //apiOp := change.ApiOp - r.realnet.SetGnmiPath(path, val.GetStringVal(), mneid, false) + // The changes we play back to realnet should not be synced back to us: + r.realnet.SyncBack = false + + var err error + if twin != nil { + if len(twin.SavedEvents) < 1 { + fmt.Printf("No events have been recorded, nothing to do!\n") + return nil + } + for _, event := range twin.SavedEvents { + for path, value := range event.PathsAndValuesMap { + err = r.realnet.SetGnmiPath(path, value, event.EntityID.String(), false) + if err != nil { + return fmt.Errorf("Applying events failed: %w", err) + } + } + } } + twin.SavedEvents = nil // reset events + r.realnet.SyncBack = true // Turn sync back on return nil } @@ -286,7 +305,7 @@ func (r *RtdtManager) Run() error { scanner := bufio.NewScanner(os.Stdin) for { - fmt.Print("$: ") + fmt.Print("$cmd (type 'h' to show commands):") if !scanner.Scan() { fmt.Println("Failed to read from stdin. Exiting!") close(r.stopChan) @@ -295,6 +314,9 @@ func (r *RtdtManager) Run() error { inputLine := scanner.Text() tokens := strings.Fields(inputLine) + if len(tokens) == 0 { + continue + } switch tokens[0] { case "launch-twin": if len(tokens) == 1 { @@ -313,18 +335,30 @@ func (r *RtdtManager) Run() error { fmt.Printf("Wrong number of arguments for launch-twin!\nUsage: launch-twin <IPv4 range> <IPv6 range> <twin name>\n") break } + case "playback": + if len(r.rtdt_twins) < 1 { + fmt.Printf("No twin from which to apply events!\n") + break + } + fmt.Printf("Applying recorded events in twin back to realnet!\n") + err := r.ApplyEvents(r.rtdt_twins[0].Name) // For now, support only one twin + if err != nil { + fmt.Printf("Failed to apply recorded events: %v\n", err) + } + case "benchmark": fmt.Printf("Launching benchmark!\n") r.RunBenchmark0() case "exit", "quit": close(r.stopChan) return - case "help": + case "help", "h": fmt.Println("Available commands:") - fmt.Println(" launch-twin Launch a twin with default options") - fmt.Println(" launch-twin <IPv4> <IPv6> <name> Launch a twin with specified network ranges and name") - fmt.Println(" benchmark Measure propagation delay of twin") - fmt.Println(" exit / quit Exit the program") + fmt.Println(" launch-twin Launch a twin with default options") + fmt.Println(" launch-twin <IPv4> <IPv6> <name> Launch a twin with specified network ranges and name") + fmt.Println(" playback Play changes recorded by twin back to realnet") + fmt.Println(" benchmark Measure propagation delay of twin") + fmt.Println(" exit / quit Exit the program") } } }() @@ -335,54 +369,97 @@ func (r *RtdtManager) Run() error { } // Receive events from realnet VEnv -func (r *RtdtManager) InitEventSystemRealnet() error { - fmt.Println("Starting Event System for realnet!") - // realnet_auth := r.realnet.GetAuth() - // ctx := realnet_auth.CreateContextWithAuthorization() - // queueCredentials, err := registration.Register(ctx, realnet_auth.GetAddress(), "basic-interface-monitoring", "SecurePresharedToken") - var queueAddress string - - for nodename, node := range r.realnet.GetClabData().Topology.Nodes { - if nodename == "rabbitmq" { - queueAddress = "amqp://guest:guest@" + node.MgmtIPv4 + ":5672/" +func (r *RtdtManager) InitEventSystem() error { + // Start realnet event system if not started yet + if !r.realnet.EventSystemStarted { + fmt.Println("Starting Event System for realnet!") + + // realnet_auth := r.realnet.GetAuth() + // ctx := realnet_auth.CreateContextWithAuthorization() + // queueCredentials, err := registration.Register(ctx, realnet_auth.GetAddress(), "basic-interface-monitoring", "SecurePresharedToken") + queueAddress, err := r.realnet.FindQueueAddress() + if err != nil { + return fmt.Errorf("Error in InitEventSystem(): %w", err) + } + + eventServiceMNE, err := event.NewEventService(queueAddress, []event.Topic{event.ManagedNetworkElement}) + if err != nil { + return fmt.Errorf("Failed to attach to event system: %w", err) } + eventServiceUser, err := event.NewEventService(queueAddress, []event.Topic{event.User}) + if err != nil { + return fmt.Errorf("Failed to attach to event system: %w", err) + } + + // Can have different callback per type per topic (e.g. adding mne or updating mne) + eventServiceMNE.SubscribeToEventType([]event.TypeToCallbackTuple{ + // {Type: event.Type(event.Update), Callback: r.updateMNECallbackRealnet}, + // {Type: event.Type(event.Add), Callback: r.updateMNECallbackRealnet}, + // {Type: event.Type(event.Delete), Callback: r.updateMNECallbackRealnet}, + {Type: event.Type(event.Subscribe), Callback: r.updateMNECallbackRealnet}, + }) + eventServiceUser.SubscribeToEventType([]event.TypeToCallbackTuple{ + {Type: event.Type(event.Update), Callback: r.userEventCallback}, + {Type: event.Type(event.Add), Callback: r.userEventCallback}, + {Type: event.Type(event.Delete), Callback: r.userEventCallback}, + }) + // Now iterate over all topics of service and create goRoutines + // that consumes queue + // This function is supposed to be removed in the future? + eventServiceMNE.SetupEventReciever(make(chan os.Signal, 1)) // doesn't seem to use stop channel internally.. + eventServiceUser.SetupEventReciever(make(chan os.Signal, 1)) // doesn't seem to use stop channel internally.. + fmt.Println("Subscribed to events user and mne for realnet") + r.realnet.EventSystemStarted = true } - // TODO: Find out how I can receive the ip address here (it returns rabbitmq) - // if err != nil { - // return fmt.Errorf("Encountered error while trying to register event system: %w", err) - // } - // You have to have one event service for a topic - eventServiceMNE, err := event.NewEventService(queueAddress, []event.Topic{event.ManagedNetworkElement}) - if err != nil { - return fmt.Errorf("Failed to attach to event system: %w", err) + + if len(r.rtdt_twins) < 1 { + return nil } - eventServiceUser, err := event.NewEventService(queueAddress, []event.Topic{event.User}) - if err != nil { - return fmt.Errorf("Failed to attach to event system: %w", err) - } - - // Can have different callback per type per topic (e.g. adding mne or updating mne) - eventServiceMNE.SubscribeToEventType([]event.TypeToCallbackTuple{ - {Type: event.Type(event.Update), Callback: r.updateMNECallbackRealnet}, - {Type: event.Type(event.Add), Callback: r.updateMNECallbackRealnet}, - {Type: event.Type(event.Delete), Callback: r.updateMNECallbackRealnet}, - {Type: event.Type(event.Subscribe), Callback: r.updateMNECallbackRealnet}, - }) - eventServiceUser.SubscribeToEventType([]event.TypeToCallbackTuple{ - {Type: event.Type(event.Update), Callback: r.userEventCallback}, - {Type: event.Type(event.Add), Callback: r.userEventCallback}, - {Type: event.Type(event.Delete), Callback: r.userEventCallback}, - }) - // Now iterate over all topics of service and create goRoutines - // that consumes queue - // This function is supposed to be removed in the future? - eventServiceMNE.SetupEventReciever(make(chan os.Signal, 1)) // doesn't seem to use stop channel internally.. - eventServiceUser.SetupEventReciever(make(chan os.Signal, 1)) // doesn't seem to use stop channel internally.. - fmt.Println("Subscribed to events user and mne") + for _, twin := range r.rtdt_twins { + if !twin.EventSystemStarted { + twinQueueAddress, err := twin.FindQueueAddress() + if err != nil { + return fmt.Errorf("Error in InitEventSystem() for twin %s: %w", twin.Name, err) + } + eventServMneTwin, err := event.NewEventService(twinQueueAddress, []event.Topic{event.ManagedNetworkElement}) + if err != nil { + return fmt.Errorf("Failed to attach to twin event system: %w", err) + } + eventServMneTwin.SubscribeToEventType([]event.TypeToCallbackTuple{ + {Type: event.Type(event.Subscribe), Callback: r.updateMNECallbackTwin}, + }) + eventServMneTwin.SetupEventReciever(make(chan os.Signal, 1)) + twin.EventSystemStarted = true + } + } + return nil } +func (r *RtdtManager) updateMNECallbackTwin(event *event.Event) { + // Don't save changes we receive from realnet + if !r.rtdt_twins[0].SyncBack { + return + } + // Get the relevant twin first + fmt.Println("--------------------------------") + fmt.Println("---------- MNE EVENT IN TWIN -----------") + fmt.Println("EventID: ", event.ID.ID()) + fmt.Println("Event Type: ", event.Type) + fmt.Println("PathsAndValuesMap: ", event.PathsAndValuesMap) + fmt.Println("EntityID", event.EntityID) + + for path, value := range event.PathsAndValuesMap { + r.rtdt_twins[0].SavedEvents = append(r.rtdt_twins[0].SavedEvents, event) + fmt.Printf("Saved change with path %s and value %s\n", path, value) + } + +} + func (r *RtdtManager) updateMNECallbackRealnet(event *event.Event) { + if !r.realnet.SyncBack { + return + } fmt.Println("--------------------------------") fmt.Println("---------- MNE EVENT -----------") fmt.Println("EventID: ", event.ID.ID()) @@ -400,8 +477,9 @@ func (r *RtdtManager) updateMNECallbackRealnet(event *event.Event) { //TODO: This is where some selection process should happen to select the right twin based on the event var twin *venv.VEnv if len(r.rtdt_twins) > 0 { - fmt.Println("------------Found twin!") - twin = r.rtdt_twins[0] // just support one twin for now + fmt.Println("Found twin to apply change to:", r.rtdt_twins[0].Name) + twin = r.rtdt_twins[0] // just support one twin for now + r.rtdt_twins[0].SyncBack = false // Don't record this in twin } else { fmt.Println("Event triggered but no twin to apply it to exists (yet)") return @@ -412,6 +490,9 @@ func (r *RtdtManager) updateMNECallbackRealnet(event *event.Event) { // Based on EntityID, get the gnmi target from twin's ClabConfig // First get hostname of realnet node realnetNode := r.realnet.GetSdnConfig().GetNodeByUUID(event.EntityID.String()) + // LINK UP/DOWN + interfaceRegex := regexp.MustCompile(`/interfaces/interface\[name=([^]]+)]/state/oper-status`) + regexMatch := interfaceRegex.FindStringSubmatch(path) // Get the ID of parallel mne in twin network // parallel nodes are nodes that have the same name in twin and realnet @@ -423,14 +504,15 @@ func (r *RtdtManager) updateMNECallbackRealnet(event *event.Event) { } var err error + // Some explicitly supported paths // MTU Change if strings.HasPrefix(path, prefix) && strings.HasSuffix(path, suffixMTU) { fmt.Println("--- CHANGE MTU TRIGGERED ---") fmt.Println("Value of new MTU: ", value) twin.SetGnmiPath(path, value, twinEntityID, false) - } - // Hostname change - if strings.HasPrefix(path, prefixHostname) { + // Set Hostname + } else if strings.HasPrefix(path, prefixHostname) { + // Hostname change fmt.Println("--- CHANGE HOSTNAME TRIGGERED ---") for _, twin := range r.rtdt_twins { if twin == nil { @@ -444,22 +526,24 @@ func (r *RtdtManager) updateMNECallbackRealnet(event *event.Event) { return } } - } - // LINK UP/DOWN - re := regexp.MustCompile(`/interfaces/interface\[name=([^]]+)]/state/oper-status`) - match := re.FindStringSubmatch(path) - if match != nil && len(r.rtdt_twins) > 0 { - fmt.Println("Setting interface", match[1], "UP/DOWN") - fmt.Printf("match: %v\n", match) - path := "/interfaces/interface[name=" + match[1] + "]/config/enabled" + // Interface UP/Down + } else if regexMatch != nil && len(r.rtdt_twins) > 0 { + + fmt.Println("Setting interface", regexMatch[1], "UP/DOWN") + fmt.Printf("match: %v\n", regexMatch) + path := "/interfaces/interface[name=" + regexMatch[1] + "]/config/enabled" if value == "DOWN" { value = "false" } else { value = "true" } twin.SetGnmiPath(path, value, twinEntityID, false) + // CATCHALL + } else { + twin.SetGnmiPath(path, value, twinEntityID, false) } } + twin.SyncBack = true } func (r *RtdtManager) userEventCallback(event *event.Event) { @@ -469,7 +553,3 @@ func (r *RtdtManager) userEventCallback(event *event.Event) { fmt.Println("Event Type: ", event.Type) fmt.Println("PathsAndValuesMap: ", event.PathsAndValuesMap) } - -func (r *RtdtManager) updateMneCallbackTwin(event *event.Event) { - -} diff --git a/applications/rtdt-manager/venv/venv.go b/applications/rtdt-manager/venv/venv.go index 3113f298e34c559fa4dd9dccae5eb6e34b46b87c..26839ba9316cf85863c95d74d1eff579e77b1193 100644 --- a/applications/rtdt-manager/venv/venv.go +++ b/applications/rtdt-manager/venv/venv.go @@ -15,6 +15,7 @@ import ( "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd" topoPb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/topology" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" + "code.fbi.h-da.de/danet/gosdn/application-framework/event" clab "code.fbi.h-da.de/danet/gosdn/applications/rtdt-manager/clab-config" clabconfig "code.fbi.h-da.de/danet/gosdn/applications/rtdt-manager/clab-config" "code.fbi.h-da.de/danet/gosdn/applications/rtdt-manager/gosdnutil" @@ -41,7 +42,10 @@ type VEnv struct { StopChan <-chan struct{} waitGroup *sync.WaitGroup containerRegistryURL string - savedChanges []*networkelement.ChangeRequest + SavedEvents []*event.Event + // This is an ugly way of temporarily ignoring events to avoid e.g. infinite sync loops: + SyncBack bool + EventSystemStarted bool } // Accepts a yaml filename to deploy a container lab environment @@ -117,6 +121,7 @@ func NewVEnv(name, clabFilename, user, pass string, wg *sync.WaitGroup, sdnConfi waitGroup: wg, sdnConfig: sdnConfig, containerRegistryURL: "registry.code.fbi.h-da.de/danet/gnmi-target/debian:interface-enabled-test", // TODO: Could let user choose + SyncBack: true, } } @@ -249,7 +254,25 @@ func (v *VEnv) UploadTopology() error { } func getTypedValue(value, ytype string) (*gnmi.TypedValue, error) { + // Special case for interface up/down events + if value == "DOWN" { + value = "false" + } else if value == "UP" { + value = "true" + } + fmt.Printf("getTypedValue: value=%s, ytype=%s\n", value, ytype) switch ytype { + case "bool": + { + boolVal, err := strconv.ParseBool(value) + if err == nil { + return &gnmi.TypedValue{ + Value: &gnmi.TypedValue_BoolVal{BoolVal: boolVal}, + }, nil + } else { + return nil, err + } + } case "uint16": { uintVal, err := strconv.ParseUint(value, 10, 64) @@ -284,8 +307,8 @@ func getTypedValue(value, ytype string) (*gnmi.TypedValue, error) { } func (v *VEnv) SetGnmiPath(path, value, mneid string, save bool) error { - ctx := v.auth.CreateContextWithAuthorization() fmt.Println("--IN SETGNMIPATH-----------------------") + ctx := v.auth.CreateContextWithAuthorization() mneService := networkelement.NewNetworkElementServiceClient(v.conn) gnmiPath, err := ygot.StringToStructuredPath(path) if err != nil { @@ -351,9 +374,9 @@ func (v *VEnv) SetGnmiPath(path, value, mneid string, save bool) error { } else { fmt.Println("Successfully applied changes:", clResponse) } - if save { - v.savedChanges = append(v.savedChanges, changeRequest) - } + // if save { + // v.SavedChanges = append(v.SavedChanges, changeRequest) + // } return nil } @@ -385,6 +408,18 @@ func (v *VEnv) fetchTopology() error { return nil } +func (v *VEnv) FindQueueAddress() (string, error) { + var queueAddress string + for nodename, node := range v.GetClabData().Topology.Nodes { + if nodename == "rabbitmq" { + // This is hardcoded for now, for later this could use credentials passed via CLI + queueAddress = "amqp://guest:guest@" + node.MgmtIPv4 + ":5672/" + return queueAddress, nil + } + } + return "", fmt.Errorf("Couldn't retrieve queue address!") +} + // {G,S}ETTERS func (v VEnv) GetName() string { return v.Name @@ -411,6 +446,6 @@ func (v *VEnv) GetSdnConfig() *sdnconfig.SdnConfig { return v.sdnConfig } -func (v *VEnv) GetSavedChanges() *[]*networkelement.ChangeRequest { - return &v.savedChanges +func (v *VEnv) GetSavedChanges() []*event.Event { + return v.SavedEvents }