Skip to content
Snippets Groups Projects
databaseNodeStore.go 5.52 KiB
Newer Older
  • Learn to ignore specific revisions
  • package nodes
    
    import (
    	"context"
    	"fmt"
    	"time"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    	query "code.fbi.h-da.de/danet/gosdn/controller/store"
    
    	"github.com/google/uuid"
    	"go.mongodb.org/mongo-driver/bson"
    	"go.mongodb.org/mongo-driver/bson/primitive"
    	"go.mongodb.org/mongo-driver/mongo"
    	"go.mongodb.org/mongo-driver/mongo/options"
    	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    )
    
    // storeName is the name handed to db.Collection when the node store is
    // created, i.e. the MongoDB collection that holds the nodes.
    const storeName = "node-store.json"
    
    // Store describes the persistence operations available for nodes.
    type Store interface {
    	// Add persists the given node.
    	Add(ctx context.Context, node Node) error
    	// Update writes changes of the given node.
    	Update(ctx context.Context, node Node) error
    	// Delete removes the given node.
    	Delete(ctx context.Context, node Node) error
    	// Get returns the single node selected by the query.
    	Get(ctx context.Context, q query.Query) (Node, error)
    	// GetAll returns every node known to the store.
    	GetAll(ctx context.Context) ([]Node, error)
    }
    
    // DatabaseNodeStore is a database store for nodes. All of its methods
    // operate on a single MongoDB collection.
    type DatabaseNodeStore struct {
    	// collection is the MongoDB collection every operation reads from and writes to.
    	collection *mongo.Collection
    }
    
    // NewDatabaseNodeStore builds a Store that keeps its nodes in the
    // storeName collection of the given database.
    func NewDatabaseNodeStore(db *mongo.Database) Store {
    	return &DatabaseNodeStore{collection: db.Collection(storeName)}
    }
    
    // Get takes a node's UUID or name and returns the node.
    //
    // When query.ID is set (not the zero UUID) the node is looked up by ID,
    // otherwise it is looked up by query.Name. Any lookup failure is reported
    // as a customerrs.CouldNotFindError carrying both the ID and the name of
    // the query.
    func (s *DatabaseNodeStore) Get(ctx context.Context, query query.Query) (Node, error) {
    	// uuid.UUID.String() never yields an empty string (the zero UUID prints
    	// as all zeroes), so "is an ID given" has to be checked against uuid.Nil.
    	// Otherwise the lookup by name below can never be reached.
    	if query.ID != uuid.Nil {
    		loadedNode, err := s.getByID(ctx, query.ID)
    		if err != nil {
    			return loadedNode, customerrs.CouldNotFindError{ID: query.ID, Name: query.Name}
    		}
    
    		return loadedNode, nil
    	}
    
    	loadedNode, err := s.getByName(ctx, query.Name)
    	if err != nil {
    		return loadedNode, customerrs.CouldNotFindError{ID: query.ID, Name: query.Name}
    	}
    
    	return loadedNode, nil
    }
    
    // getByID loads the node whose "_id" equals the binary form of idOfNode.
    //
    // It returns a customerrs.CouldNotFindError when no document matches and a
    // customerrs.CouldNotMarshallError when the ID cannot be encoded or the
    // query result cannot be decoded into a Node.
    func (s *DatabaseNodeStore) getByID(ctx context.Context, idOfNode uuid.UUID) (loadedNode Node, err error) {
    	idAsByteArray, err := idOfNode.MarshalBinary()
    	if err != nil {
    		return loadedNode, customerrs.CouldNotMarshallError{Identifier: idOfNode, Type: loadedNode, Err: err}
    	}
    
    	// FindOne never returns nil; a missing document is reported through the
    	// result's error as mongo.ErrNoDocuments, so that is what has to be
    	// checked to tell "not found" apart from a decoding problem.
    	result := s.collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: idAsByteArray}})
    	if result.Err() == mongo.ErrNoDocuments {
    		return loadedNode, customerrs.CouldNotFindError{ID: idOfNode}
    	}
    
    	// Decode also surfaces any other error the query itself produced.
    	err = result.Decode(&loadedNode)
    	if err != nil {
    		return loadedNode, customerrs.CouldNotMarshallError{Identifier: idOfNode, Type: loadedNode, Err: err}
    	}
    
    	return loadedNode, nil
    }
    
    // getByName loads the node whose "name" field equals nameOfNode.
    //
    // It returns a customerrs.CouldNotFindError when no document matches and a
    // customerrs.CouldNotMarshallError when the query result cannot be decoded
    // into a Node.
    func (s *DatabaseNodeStore) getByName(ctx context.Context, nameOfNode string) (loadedNode Node, err error) {
    	// FindOne never returns nil; a missing document is reported through the
    	// result's error as mongo.ErrNoDocuments.
    	result := s.collection.FindOne(ctx, bson.D{primitive.E{Key: "name", Value: nameOfNode}})
    	if result.Err() == mongo.ErrNoDocuments {
    		return loadedNode, customerrs.CouldNotFindError{Name: nameOfNode}
    	}
    
    	// Decode also surfaces any other error the query itself produced.
    	err = result.Decode(&loadedNode)
    	if err != nil {
    		return loadedNode, customerrs.CouldNotMarshallError{Identifier: nameOfNode, Type: loadedNode, Err: err}
    	}
    
    	return loadedNode, nil
    }
    
    // GetAll returns all stored nodes.
    //
    // An error from the query itself is returned unchanged together with an
    // empty slice. A failure while reading the cursor into the result slice is
    // reported as a customerrs.CouldNotMarshallError. An error from closing the
    // cursor is returned as well: on its own when nothing else failed, or
    // combined with the earlier error otherwise.
    func (s *DatabaseNodeStore) GetAll(ctx context.Context) (loadedNode []Node, err error) {
    	cursor, err := s.collection.Find(ctx, bson.D{})
    	if err != nil {
    		return []Node{}, err
    	}
    	defer func() {
    		ferr := cursor.Close(ctx)
    		if ferr == nil {
    			return
    		}
    		// Wrapping a nil error with %w would yield a "%!w(<nil>)" message, so
    		// the close error is returned as is when it is the only failure.
    		if err == nil {
    			err = ferr
    			return
    		}
    		err = fmt.Errorf("InternalError=%w DeferError=%+s", err, ferr.Error())
    	}()
    
    	err = cursor.All(ctx, &loadedNode)
    	if err != nil {
    		return loadedNode, customerrs.CouldNotMarshallError{Type: loadedNode, Err: err}
    	}
    
    	return loadedNode, nil
    }
    
    // Add adds a node to the node store.
    //
    // Before inserting, the metadata of the (by-value) node is initialised:
    // the resource version is reset to 0 and both timestamps are set to the
    // same instant. An insert failure is reported as a
    // customerrs.CouldNotCreateError.
    func (s *DatabaseNodeStore) Add(ctx context.Context, node Node) (err error) {
    	// Take the time once so that CreatedAt and LastUpdated are identical for
    	// a freshly created node.
    	now := time.Now()
    
    	node.Metadata.ResourceVersion = 0
    	node.Metadata.CreatedAt = now
    	node.Metadata.LastUpdated = now
    
    	_, err = s.collection.InsertOne(ctx, node)
    	if err != nil {
    		return customerrs.CouldNotCreateError{Identifier: node.ID, Type: node, Err: err}
    	}
    
    	return nil
    }
    
    // Update updates an existing node.
    //
    // The work runs in a transaction with majority write concern: the stored
    // node is fetched by ID, its Metadata.ResourceVersion is compared with the
    // one of the provided node, and only if both match the provided node is
    // written with "$set". A version mismatch, a failed lookup, a failed
    // session start and a failed write (customerrs.CouldNotUpdateError) are
    // all returned to the caller.
    func (s *DatabaseNodeStore) Update(ctx context.Context, node Node) (err error) {
    	wc := writeconcern.Majority()
    	txnOptions := options.Transaction().SetWriteConcern(wc)
    
    	// Starts a session on the client.
    	session, err := s.collection.Database().Client().StartSession()
    	if err != nil {
    		return err
    	}
    	// Defers ending the session after the transaction is committed or ended.
    	defer session.EndSession(ctx)
    
    	// Transaction. Every database operation in here must receive sessCtx,
    	// otherwise it is executed outside of the transaction.
    	callback := func(sessCtx mongo.SessionContext) (interface{}, error) {
    		var updatedLoadedNodes Node
    
    		// 1. Fetch existing entity.
    		existingNode, err := s.getByID(sessCtx, node.ID)
    		if err != nil {
    			return nil, err
    		}
    
    		// 2. Reject the update if the caller did not work on the stored version.
    		// NOTE(review): the resource version is not incremented anywhere in
    		// this method — confirm that callers bump it before calling Update.
    		if node.Metadata.ResourceVersion != existingNode.Metadata.ResourceVersion {
    			return nil, fmt.Errorf(
    				"resource version %d of provided node %s is older or newer than %d in the store",
    				node.Metadata.ResourceVersion,
    				node.ID.String(), existingNode.Metadata.ResourceVersion,
    			)
    		}
    
    		// The filter must use the same "_id" representation as getByID (the
    		// binary form of the UUID); a string filter cannot match the
    		// document that was just fetched above.
    		idAsByteArray, err := node.ID.MarshalBinary()
    		if err != nil {
    			return nil, customerrs.CouldNotUpdateError{Identifier: node.ID, Type: node, Err: err}
    		}
    		filter := bson.D{primitive.E{Key: "_id", Value: idAsByteArray}}
    		update := bson.D{primitive.E{Key: "$set", Value: node}}
    
    		upsert := false
    		after := options.After
    		opt := options.FindOneAndUpdateOptions{
    			Upsert:         &upsert,
    			ReturnDocument: &after,
    		}
    
    		err = s.collection.
    			FindOneAndUpdate(sessCtx, filter, update, &opt).
    			Decode(&updatedLoadedNodes)
    		if err != nil {
    			return nil, customerrs.CouldNotUpdateError{Identifier: node.ID, Type: node, Err: err}
    		}
    
    		// 3. End transaction.
    		return "", nil
    	}
    
    	_, err = session.WithTransaction(ctx, callback, txnOptions)
    	if err != nil {
    		return err
    	}
    
    	return nil
    }
    
    // Delete deletes a node from the node store.
    //
    // The node is selected by its ID only. Errors from encoding the ID or from
    // the delete operation are returned unchanged; deleting a node that is not
    // stored is not treated as an error.
    func (s *DatabaseNodeStore) Delete(ctx context.Context, node Node) (err error) {
    	// Filter on "_id" using the same binary representation getByID uses.
    	// A filter whose key is the UUID string and whose value is nil matches
    	// every document lacking such a field, i.e. an arbitrary node.
    	idAsByteArray, err := node.ID.MarshalBinary()
    	if err != nil {
    		return err
    	}
    
    	_, err = s.collection.DeleteOne(ctx, bson.D{primitive.E{Key: "_id", Value: idAsByteArray}})
    	if err != nil {
    		return err
    	}
    
    	return nil
    }