package nucleus import ( "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/database" errors "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors" "code.fbi.h-da.de/danet/gosdn/controller/store" "github.com/google/uuid" log "github.com/sirupsen/logrus" "github.com/spf13/viper" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // DatabasePndStore is used to store PrincipalNetworkDomains type DatabasePndStore struct { pndStoreName string pendingChannels map[uuid.UUID]chan device.Details csbiClient cpb.CsbiServiceClient } // Get takes a PrincipalNetworkDomain's UUID or name and returns the PrincipalNetworkDomain. If the requested // PrincipalNetworkDomain does not exist an error is returned. func (s *DatabasePndStore) Get(query store.Query) (networkdomain.NetworkDomain, error) { var loadedPND LoadedPnd client, ctx, cancel := database.GetMongoConnection() defer cancel() defer client.Disconnect(ctx) db := client.Database(database.DatabaseName) collection := db.Collection(s.pndStoreName) result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.ID.String()}}) if result == nil { return nil, nil } err := result.Decode(&loadedPND) if err != nil { log.Printf("Failed marshalling %v", err) return nil, errors.ErrCouldNotMarshall{StoreName: pndStoreName} } csbiClient, err := s.getCsbiClient() if err != nil { return nil, err } newPnd, err := NewPND( loadedPND.Name, loadedPND.Description, uuid.MustParse(loadedPND.ID), csbiClient, s.callback, ) if err != nil { return nil, err } return newPnd, nil } // GetAll returns all stored pnds. func (s *DatabasePndStore) GetAll() ([]networkdomain.NetworkDomain, error) { var loadedPnds []LoadedPnd var pnds []networkdomain.NetworkDomain client, ctx, cancel := database.GetMongoConnection() defer cancel() defer client.Disconnect(ctx) db := client.Database(database.DatabaseName) collection := db.Collection(s.pndStoreName) cursor, err := collection.Find(ctx, bson.D{}) if err != nil { return nil, err } defer cursor.Close(ctx) err = cursor.All(ctx, &loadedPnds) if err != nil { log.Printf("Failed marshalling %v", err) return nil, errors.ErrCouldNotMarshall{StoreName: pndStoreName} } csbiClient, err := s.getCsbiClient() if err != nil { return nil, err } for _, loadedPND := range loadedPnds { newPnd, err := NewPND( loadedPND.Name, loadedPND.Description, uuid.MustParse(loadedPND.ID), csbiClient, s.callback, ) if err != nil { return nil, err } pnds = append(pnds, newPnd) } return pnds, nil } // Add adds a pnd to the pnd store. func (s *DatabasePndStore) Add(pnd networkdomain.NetworkDomain) error { client, ctx, cancel := database.GetMongoConnection() defer cancel() defer client.Disconnect(ctx) _, err := client.Database(database.DatabaseName). Collection(s.pndStoreName). InsertOne(ctx, pnd) if err != nil { return errors.ErrCouldNotCreate{StoreName: pndStoreName} } return nil } // Delete deletes a pnd. // It also deletes all assosicated devices and sbis. func (s *DatabasePndStore) Delete(pnd networkdomain.NetworkDomain) error { client, ctx, cancel := database.GetMongoConnection() defer cancel() defer client.Disconnect(ctx) db := client.Database(database.DatabaseName) collection := db.Collection(s.pndStoreName) _, err := collection.DeleteOne(ctx, bson.D{primitive.E{Key: pnd.ID().String()}}) if err != nil { return err } // TODO: Delete all assosicated devices + SBIs return nil } // PendingChannels holds channels used communicate with pending // cSBI deployments func (s *DatabasePndStore) PendingChannels(id uuid.UUID, parseErrors ...error) (chan device.Details, error) { ch, ok := s.pendingChannels[id] if !ok { return nil, &errors.ErrNotFound{ID: id} } return ch, nil } // AddPendingChannel adds a pending channel to the map func (s *DatabasePndStore) AddPendingChannel(id uuid.UUID, ch chan device.Details) { s.pendingChannels[id] = ch } // RemovePendingChannel removes a pending channel from the map func (s *DatabasePndStore) RemovePendingChannel(id uuid.UUID) { delete(s.pendingChannels, id) } func (s *DatabasePndStore) callback(id uuid.UUID, ch chan device.Details) { if ch != nil { s.AddPendingChannel(id, ch) log.Infof("pending channel %v added", id) } else { s.RemovePendingChannel(id) log.Infof("pending channel %v removed", id) } } func (s *DatabasePndStore) getCsbiClient() (cpb.CsbiServiceClient, error) { if s.csbiClient == nil { orchestrator := viper.GetString("csbi-orchestrator") conn, err := grpc.Dial(orchestrator, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } s.csbiClient = cpb.NewCsbiServiceClient(conn) } return s.csbiClient, nil }