Newer
Older
package nucleus
import (
"fmt"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
// DatabaseNetworkElementStore is used to store Network Elements.
type DatabaseNetworkElementStore struct {
}
// NewDatabaseNetworkElementStore returns a NetworkElementStore.
func NewDatabaseNetworkElementStore(pndUUID uuid.UUID, db *mongo.Database) networkelement.Store {
storeName := fmt.Sprintf("networkElement-store-%s.json", pndUUID.String())
collection := db.Collection(storeName)
return &DatabaseNetworkElementStore{
}
}
// Get takes a NetworkElement's UUID or name and returns the NetworkElement.
func (s *DatabaseNetworkElementStore) Get(ctx context.Context, query store.Query) (networkelement.LoadedNetworkElement, error) {
var loadedNetworkElement networkelement.LoadedNetworkElement
if query.ID.String() != "" {
loadedNetworkElement, err := s.getByID(ctx, query.ID)
if err != nil {
return loadedNetworkElement, err
}
return loadedNetworkElement, nil
}
loadedNetworkElement, err := s.getByName(ctx, query.Name)
if err != nil {
return loadedNetworkElement, err
}
return loadedNetworkElement, nil
}
func (s *DatabaseNetworkElementStore) getByID(ctx context.Context, idOfNetworkElement uuid.UUID) (loadedNetworkElement networkelement.LoadedNetworkElement, err error) {
result := s.collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: idOfNetworkElement.String()}})
if result == nil {
return loadedNetworkElement, customerrs.CouldNotFindError{ID: idOfNetworkElement}
}
err = result.Decode(&loadedNetworkElement)
if err != nil {
log.Printf("Failed marshalling %v", err)
return loadedNetworkElement, customerrs.CouldNotMarshallError{Identifier: idOfNetworkElement, Type: loadedNetworkElement, Err: err}
}
return loadedNetworkElement, nil
}
func (s *DatabaseNetworkElementStore) getByName(ctx context.Context, nameOfNetworkElement string) (loadedNetworkElement networkelement.LoadedNetworkElement, err error) {
result := s.collection.FindOne(ctx, bson.D{primitive.E{Key: "name", Value: nameOfNetworkElement}})
if result == nil {
return loadedNetworkElement, customerrs.CouldNotFindError{Name: nameOfNetworkElement}
}
err = result.Decode(&loadedNetworkElement)
if err != nil {
log.Printf("Failed marshalling %v", err)
return loadedNetworkElement, customerrs.CouldNotMarshallError{Identifier: nameOfNetworkElement, Type: loadedNetworkElement, Err: err}
}
return loadedNetworkElement, nil
}
// GetAll returns all stored network elements.
func (s *DatabaseNetworkElementStore) GetAll(ctx context.Context) (loadedNetworkElements []networkelement.LoadedNetworkElement, err error) {
cursor, err := s.collection.Find(ctx, bson.D{})
if err != nil {
return nil, err
}
defer func() {
if ferr := cursor.Close(ctx); ferr != nil {
fErrString := ferr.Error()
err = fmt.Errorf("InternalError=%w DeferError=%+s", err, fErrString)
}
}()
err = cursor.All(ctx, &loadedNetworkElements)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, customerrs.CouldNotMarshallError{Type: loadedNetworkElements, Err: err}
}
return loadedNetworkElements, nil
}
// Add adds a network element to the network element store.
func (s *DatabaseNetworkElementStore) Add(ctx context.Context, networkElementToAdd networkelement.NetworkElement) (err error) {
_, err = s.collection.InsertOne(ctx, networkElementToAdd)
if err != nil {
log.Printf("Could not create NetworkElement: %v", err)
return customerrs.CouldNotCreateError{Identifier: networkElementToAdd.ID(), Type: networkElementToAdd, Err: err}
}
return nil
}
// Update updates a existing network element.
func (s *DatabaseNetworkElementStore) Update(ctx context.Context, networkElementToUpdate networkelement.NetworkElement) (err error) {
var updatedLoadedNetworkElement networkelement.LoadedNetworkElement
wc := writeconcern.Majority()
txnOptions := options.Transaction().SetWriteConcern(wc)
// Starts a session on the client
session, err := s.collection.Database().Client().StartSession()
if err != nil {
return err
// Defers ending the session after the transaction is committed or ended
defer session.EndSession(ctx)
// Transaction
callback := func(sessCtx mongo.SessionContext) (interface{}, error) {
// 1. Fetch exisiting Entity
existingNetworkElement, err := s.getByID(ctx, networkElementToUpdate.ID())
if err != nil {
return nil, err
}
// 2. Check if Entity.Metadata.ResourceVersion == UpdatedEntity.Metadata.ResourceVersion
if networkElementToUpdate.GetMetadata().ResourceVersion != existingNetworkElement.Metadata.ResourceVersion {
// 2.1 End transaction
// 2.2 If no -> return error
return nil, fmt.Errorf(
"resource version %d of provided network element %s is older or newer than %d in the store",
networkElementToUpdate.GetMetadata().ResourceVersion,
networkElementToUpdate.ID().String(), existingNetworkElement.Metadata.ResourceVersion,
)
}
// Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the
// transaction.
u, _ := networkElementToUpdate.(*CommonNetworkElement)
u.Metadata.ResourceVersion = u.Metadata.ResourceVersion + 1
update := bson.D{primitive.E{Key: "$set", Value: u}}
upsert := false
after := options.After
opt := options.FindOneAndUpdateOptions{
Upsert: &upsert,
ReturnDocument: &after,
}
FindOneAndUpdate(
ctx, bson.M{"_id": networkElementToUpdate.ID().String()}, update, &opt).
Decode(&updatedLoadedNetworkElement)
if err != nil {
log.Printf("Could not update network element: %v", err)
return nil, customerrs.CouldNotUpdateError{Identifier: networkElementToUpdate.ID(), Type: networkElementToUpdate, Err: err}
}
// 3.2.2 End transaction
return "", nil
}
_, err = session.WithTransaction(ctx, callback, txnOptions)
if err != nil {
return err
}
return nil
}
// Delete deletes a network element from the network element store.
func (s *DatabaseNetworkElementStore) Delete(ctx context.Context, networkElementToDelete networkelement.NetworkElement) (err error) {
_, err = s.collection.DeleteOne(ctx, bson.D{primitive.E{Key: "_id", Value: networkElementToDelete.ID().String()}})
if err != nil {
return customerrs.CouldNotDeleteError{Identifier: networkElementToDelete.ID(), Type: networkElementToDelete, Err: err}
}
return nil
}