Skip to content
Snippets Groups Projects
Commit 28e9451c authored by Andre Sterba's avatar Andre Sterba Committed by Fabian Seidl
Browse files

Cleanup stores to improve readability

See merge request !281
parent a9ccba47
Branches
Tags
2 merge requests!281Cleanup stores to improve readability,!264WIP: Develop
Pipeline #99111 passed
package nucleus
import (
"fmt"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
// DatabaseDeviceStore is used to store Devices
type DatabaseDeviceStore struct {
storeName string
sbiStore southbound.SbiStore
}
// NewDatabaseDeviceStore returns a DeviceStore
func NewDatabaseDeviceStore(pndUUID uuid.UUID, sbiStore southbound.SbiStore) device.Store {
return &DatabaseDeviceStore{
storeName: fmt.Sprintf("device-store-%s.json", pndUUID.String()),
sbiStore: sbiStore,
}
}
// Get takes a Device's UUID or name and returns the Device.
func (s *DatabaseDeviceStore) Get(query store.Query) (device.Device, error) {
var loadedDevice LoadedDevice
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.storeName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.ID}})
if result == nil {
return nil, errors.ErrCouldNotFind{StoreName: pndStoreName}
}
err := result.Decode(&loadedDevice)
if err != nil {
db := client.Database(database.DatabaseName)
collection := db.Collection(s.storeName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.Name}})
if result == nil {
return nil, errors.ErrCouldNotFind{StoreName: pndStoreName}
}
err := result.Decode(&loadedDevice)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotFind{StoreName: pndStoreName}
}
}
sbiForDevice, err := s.sbiStore.Get(store.Query{ID: uuid.MustParse(loadedDevice.SBI)})
if err != nil {
return nil, err
}
d, err := NewDevice(
loadedDevice.Name,
uuid.MustParse(loadedDevice.DeviceID),
&tpb.TransportOption{
Address: loadedDevice.TransportAddress,
Username: loadedDevice.TransportUsername,
Password: loadedDevice.TransportPassword,
TransportOption: &tpb.TransportOption_GnmiTransportOption{
GnmiTransportOption: &tpb.GnmiTransportOption{},
},
Type: spb.Type_TYPE_OPENCONFIG,
}, sbiForDevice)
if err != nil {
return nil, err
}
return d, nil
}
// GetAll returns all stored devices.
func (s *DatabaseDeviceStore) GetAll() ([]device.Device, error) {
var loadedDevices []LoadedDevice
var devices []device.Device
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.storeName)
cursor, err := collection.Find(ctx, bson.D{})
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
err = cursor.All(ctx, &loadedDevices)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: pndStoreName}
}
for _, device := range loadedDevices {
sbiForDevice, err := s.sbiStore.Get(store.Query{ID: uuid.MustParse(device.SBI)})
d, err := NewDevice(
device.Name,
uuid.MustParse(device.DeviceID),
&tpb.TransportOption{
Address: device.TransportAddress,
Username: device.TransportUsername,
Password: device.TransportPassword,
TransportOption: &tpb.TransportOption_GnmiTransportOption{
GnmiTransportOption: &tpb.GnmiTransportOption{},
},
Type: spb.Type_TYPE_OPENCONFIG,
}, sbiForDevice)
if err != nil {
return nil, err
}
devices = append(devices, d)
}
return devices, nil
}
// Add adds a device to the device store.
func (s *DatabaseDeviceStore) Add(deviceToAdd device.Device) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
_, err := client.Database(database.DatabaseName).
Collection(s.storeName).
InsertOne(ctx, deviceToAdd)
if err != nil {
log.Printf("Could not create Device: %v", err)
return errors.ErrCouldNotCreate{StoreName: pndStoreName}
}
return nil
}
// Update updates a existing device.
func (s *DatabaseDeviceStore) Update(deviceToUpdate device.Device) error {
var updatedDevice device.Device
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
update := bson.M{
"$set": deviceToUpdate,
}
upsert := false
after := options.After
opt := options.FindOneAndUpdateOptions{
Upsert: &upsert,
ReturnDocument: &after,
}
err := client.Database(database.DatabaseName).
Collection(s.storeName).
FindOneAndUpdate(
ctx, bson.M{"id": deviceToUpdate.ID}, update, &opt).
Decode(&updatedDevice)
if err != nil {
log.Printf("Could not update Device: %v", err)
return errors.ErrCouldNotUpdate{StoreName: pndStoreName}
}
return nil
}
// Delete deletes a device from the device store.
func (s *DatabaseDeviceStore) Delete(deviceToDelete device.Device) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.storeName)
_, err := collection.DeleteOne(ctx, bson.D{primitive.E{Key: deviceToDelete.ID().String()}})
if err != nil {
return err
}
return nil
}
package nucleus
import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
errors "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// DatabasePndStore is used to store PrincipalNetworkDomains
type DatabasePndStore struct {
pndStoreName string
pendingChannels map[uuid.UUID]chan device.Details
}
// Get takes a PrincipalNetworkDomain's UUID or name and returns the PrincipalNetworkDomain. If the requested
// PrincipalNetworkDomain does not exist an error is returned.
func (s *DatabasePndStore) Get(query store.Query) (networkdomain.NetworkDomain, error) {
var loadedPND LoadedPnd
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.pndStoreName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.ID.String()}})
if result == nil {
return nil, nil
}
err := result.Decode(&loadedPND)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: pndStoreName}
}
newPnd, err := NewPND(
loadedPND.Name,
loadedPND.Description,
uuid.MustParse(loadedPND.ID),
nil,
s.callback,
)
if err != nil {
return nil, err
}
return newPnd, nil
}
// GetAll returns all stored pnds.
func (s *DatabasePndStore) GetAll() ([]networkdomain.NetworkDomain, error) {
var loadedPnds []LoadedPnd
var pnds []networkdomain.NetworkDomain
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.pndStoreName)
cursor, err := collection.Find(ctx, bson.D{})
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
err = cursor.All(ctx, &loadedPnds)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: pndStoreName}
}
for _, loadedPND := range loadedPnds {
newPnd, err := NewPND(
loadedPND.Name,
loadedPND.Description,
uuid.MustParse(loadedPND.ID),
nil,
nil,
)
if err != nil {
return nil, err
}
pnds = append(pnds, newPnd)
}
return pnds, nil
}
// Add adds a pnd to the pnd store.
func (s *DatabasePndStore) Add(pnd networkdomain.NetworkDomain) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
_, err := client.Database(database.DatabaseName).
Collection(s.pndStoreName).
InsertOne(ctx, pnd)
if err != nil {
return errors.ErrCouldNotCreate{StoreName: pndStoreName}
}
return nil
}
// Delete deletes a pnd.
// It also deletes all assosicated devices and sbis.
func (s *DatabasePndStore) Delete(pnd networkdomain.NetworkDomain) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.pndStoreName)
_, err := collection.DeleteOne(ctx, bson.D{primitive.E{Key: pnd.ID().String()}})
if err != nil {
return err
}
// TODO: Delete all assosicated devices + SBIs
return nil
}
// PendingChannels holds channels used communicate with pending
// cSBI deployments
func (s *DatabasePndStore) PendingChannels(id uuid.UUID, parseErrors ...error) (chan device.Details, error) {
ch, ok := s.pendingChannels[id]
if !ok {
return nil, &errors.ErrNotFound{ID: id}
}
return ch, nil
}
// AddPendingChannel adds a pending channel to the map
func (s *DatabasePndStore) AddPendingChannel(id uuid.UUID, ch chan device.Details) {
s.pendingChannels[id] = ch
}
// RemovePendingChannel removes a pending channel from the map
func (s *DatabasePndStore) RemovePendingChannel(id uuid.UUID) {
delete(s.pendingChannels, id)
}
func (s *DatabasePndStore) callback(id uuid.UUID, ch chan device.Details) {
if ch != nil {
s.AddPendingChannel(id, ch)
log.Infof("pending channel %v added", id)
} else {
s.RemovePendingChannel(id)
log.Infof("pending channel %v removed", id)
}
}
package nucleus
import (
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)
// DatabaseSbiStore is used to store SouthboundInterfaces
type DatabaseSbiStore struct {
sbiStoreName string
}
// Add adds a SBI.
func (s *DatabaseSbiStore) Add(item southbound.SouthboundInterface) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
_, err := client.Database(database.DatabaseName).
Collection(s.sbiStoreName).
InsertOne(ctx, item)
if err != nil {
if mongo.IsDuplicateKeyError(err) {
return nil
}
return errors.ErrCouldNotCreate{StoreName: sbiStoreName}
}
return nil
}
// Delete deletes an SBI.
func (s *DatabaseSbiStore) Delete(item southbound.SouthboundInterface) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
_, err := client.Database(database.DatabaseName).
Collection(s.sbiStoreName).
DeleteOne(ctx, bson.D{primitive.E{Key: "_id", Value: item.ID().String()}})
if err != nil {
return errors.ErrCouldNotCreate{StoreName: sbiStoreName}
}
return nil
}
// Get takes a SouthboundInterface's UUID or name and returns the SouthboundInterface. If the requested
// SouthboundInterface does not exist an error is returned.
func (s *DatabaseSbiStore) Get(query store.Query) (southbound.SouthboundInterface, error) {
var loadedSbi *LoadedSbi
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
log.Debugf("SBI-Search-ID: %+v\n", query.ID.String())
db := client.Database(database.DatabaseName)
collection := db.Collection(s.sbiStoreName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.ID.String()}})
if result == nil {
return nil, nil
}
err := result.Decode(&loadedSbi)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: sbiStoreName}
}
newSbi, _ := NewSBI(loadedSbi.Type, uuid.MustParse(loadedSbi.ID))
return newSbi, nil
}
// GetAll returns all SBIs
func (s *DatabaseSbiStore) GetAll() ([]southbound.SouthboundInterface, error) {
var loadedSbis []LoadedSbi
var sbis []southbound.SouthboundInterface
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.sbiStoreName)
cursor, err := collection.Find(ctx, bson.D{})
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
err = cursor.All(ctx, &loadedSbis)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: sbiStoreName}
}
for _, sbi := range loadedSbis {
newSbi, _ := NewSBI(sbi.Type, uuid.MustParse(sbi.ID))
sbis = append(sbis, newSbi)
}
return sbis, nil
}
// GetSBI takes a SouthboundInterface's UUID or name and returns the SouthboundInterface. If the requested
// SouthboundInterface does not exist an error is returned.
func (s *DatabaseSbiStore) GetSBI(id uuid.UUID) (southbound.SouthboundInterface, error) {
var loadedSbi LoadedSbi
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.sbiStoreName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: id.String()}})
if result == nil {
return nil, nil
}
err := result.Decode(&loadedSbi)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: sbiStoreName}
}
newSbi, err := NewSBI(loadedSbi.Type, uuid.MustParse(loadedSbi.ID))
return newSbi, nil
}
func getSbiTypeFromString(sbiTypeAsString string) spb.Type {
sbiTypeInt := spb.Type_value[sbiTypeAsString]
return spb.Type(sbiTypeInt)
}
...@@ -3,16 +3,9 @@ package nucleus ...@@ -3,16 +3,9 @@ package nucleus
import ( import (
"fmt" "fmt"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
...@@ -64,7 +57,7 @@ func NewDeviceStore(pndUUID uuid.UUID, sbiStore southbound.SbiStore) device.Stor ...@@ -64,7 +57,7 @@ func NewDeviceStore(pndUUID uuid.UUID, sbiStore southbound.SbiStore) device.Stor
return &store return &store
case store.Database: case store.Database:
return &DeviceStore{ return &DatabaseDeviceStore{
storeName: fmt.Sprintf("device-store-%s.json", pndUUID.String()), storeName: fmt.Sprintf("device-store-%s.json", pndUUID.String()),
sbiStore: sbiStore, sbiStore: sbiStore,
} }
...@@ -76,173 +69,3 @@ func NewDeviceStore(pndUUID uuid.UUID, sbiStore southbound.SbiStore) device.Stor ...@@ -76,173 +69,3 @@ func NewDeviceStore(pndUUID uuid.UUID, sbiStore southbound.SbiStore) device.Stor
return nil return nil
} }
} }
// Get takes a Device's UUID or name and returns the Device.
func (s *DeviceStore) Get(query store.Query) (device.Device, error) {
var loadedDevice LoadedDevice
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.storeName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.ID}})
if result == nil {
return nil, errors.ErrCouldNotFind{StoreName: pndStoreName}
}
err := result.Decode(&loadedDevice)
if err != nil {
db := client.Database(database.DatabaseName)
collection := db.Collection(s.storeName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.Name}})
if result == nil {
return nil, errors.ErrCouldNotFind{StoreName: pndStoreName}
}
err := result.Decode(&loadedDevice)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotFind{StoreName: pndStoreName}
}
}
sbiForDevice, err := s.sbiStore.Get(store.Query{ID: uuid.MustParse(loadedDevice.SBI)})
if err != nil {
return nil, err
}
d, err := NewDevice(
loadedDevice.Name,
uuid.MustParse(loadedDevice.DeviceID),
&tpb.TransportOption{
Address: loadedDevice.TransportAddress,
Username: loadedDevice.TransportUsername,
Password: loadedDevice.TransportPassword,
TransportOption: &tpb.TransportOption_GnmiTransportOption{
GnmiTransportOption: &tpb.GnmiTransportOption{},
},
Type: spb.Type_TYPE_OPENCONFIG,
}, sbiForDevice)
if err != nil {
return nil, err
}
return d, nil
}
// GetAll returns all stored devices.
func (s *DeviceStore) GetAll() ([]device.Device, error) {
var loadedDevices []LoadedDevice
var devices []device.Device
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.storeName)
cursor, err := collection.Find(ctx, bson.D{})
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
err = cursor.All(ctx, &loadedDevices)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: pndStoreName}
}
for _, device := range loadedDevices {
sbiForDevice, err := s.sbiStore.Get(store.Query{ID: uuid.MustParse(device.SBI)})
d, err := NewDevice(
device.Name,
uuid.MustParse(device.DeviceID),
&tpb.TransportOption{
Address: device.TransportAddress,
Username: device.TransportUsername,
Password: device.TransportPassword,
TransportOption: &tpb.TransportOption_GnmiTransportOption{
GnmiTransportOption: &tpb.GnmiTransportOption{},
},
Type: spb.Type_TYPE_OPENCONFIG,
}, sbiForDevice)
if err != nil {
return nil, err
}
devices = append(devices, d)
}
return devices, nil
}
// Add adds a device to the device store.
func (s *DeviceStore) Add(deviceToAdd device.Device) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
_, err := client.Database(database.DatabaseName).
Collection(s.storeName).
InsertOne(ctx, deviceToAdd)
if err != nil {
log.Printf("Could not create Device: %v", err)
return errors.ErrCouldNotCreate{StoreName: pndStoreName}
}
return nil
}
// Update updates a existing device.
func (s *DeviceStore) Update(deviceToUpdate device.Device) error {
var updatedDevice device.Device
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
update := bson.M{
"$set": deviceToUpdate,
}
upsert := false
after := options.After
opt := options.FindOneAndUpdateOptions{
Upsert: &upsert,
ReturnDocument: &after,
}
err := client.Database(database.DatabaseName).
Collection(s.storeName).
FindOneAndUpdate(
ctx, bson.M{"id": deviceToUpdate.ID}, update, &opt).
Decode(&updatedDevice)
if err != nil {
log.Printf("Could not update Device: %v", err)
return errors.ErrCouldNotUpdate{StoreName: pndStoreName}
}
return nil
}
// Delete deletes a device from the device store.
func (s *DeviceStore) Delete(deviceToDelete device.Device) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.storeName)
_, err := collection.DeleteOne(ctx, bson.D{primitive.E{Key: deviceToDelete.ID().String()}})
if err != nil {
return err
}
return nil
}
...@@ -3,13 +3,9 @@ package nucleus ...@@ -3,13 +3,9 @@ package nucleus
import ( import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
errors "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
) )
const ( const (
...@@ -41,7 +37,7 @@ func NewPndStore() networkdomain.PndStore { ...@@ -41,7 +37,7 @@ func NewPndStore() networkdomain.PndStore {
return &store return &store
case store.Database: case store.Database:
return &PndStore{ return &DatabasePndStore{
pendingChannels: make(map[uuid.UUID]chan device.Details), pendingChannels: make(map[uuid.UUID]chan device.Details),
pndStoreName: "pnd-store.json"} pndStoreName: "pnd-store.json"}
case store.Memory: case store.Memory:
...@@ -52,147 +48,3 @@ func NewPndStore() networkdomain.PndStore { ...@@ -52,147 +48,3 @@ func NewPndStore() networkdomain.PndStore {
return nil return nil
} }
} }
// Get takes a PrincipalNetworkDomain's UUID or name and returns the PrincipalNetworkDomain. If the requested
// PrincipalNetworkDomain does not exist an error is returned.
func (s *PndStore) Get(query store.Query) (networkdomain.NetworkDomain, error) {
var loadedPND LoadedPnd
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.pndStoreName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.ID.String()}})
if result == nil {
return nil, nil
}
err := result.Decode(&loadedPND)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: pndStoreName}
}
newPnd, err := NewPND(
loadedPND.Name,
loadedPND.Description,
uuid.MustParse(loadedPND.ID),
nil,
s.callback,
)
if err != nil {
return nil, err
}
return newPnd, nil
}
// GetAll returns all stored pnds.
func (s *PndStore) GetAll() ([]networkdomain.NetworkDomain, error) {
var loadedPnds []LoadedPnd
var pnds []networkdomain.NetworkDomain
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.pndStoreName)
cursor, err := collection.Find(ctx, bson.D{})
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
err = cursor.All(ctx, &loadedPnds)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: pndStoreName}
}
for _, loadedPND := range loadedPnds {
newPnd, err := NewPND(
loadedPND.Name,
loadedPND.Description,
uuid.MustParse(loadedPND.ID),
nil,
nil,
)
if err != nil {
return nil, err
}
pnds = append(pnds, newPnd)
}
return pnds, nil
}
// Add adds a pnd to the pnd store.
func (s *PndStore) Add(pnd networkdomain.NetworkDomain) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
_, err := client.Database(database.DatabaseName).
Collection(s.pndStoreName).
InsertOne(ctx, pnd)
if err != nil {
return errors.ErrCouldNotCreate{StoreName: pndStoreName}
}
return nil
}
// Delete deletes a pnd.
// It also deletes all assosicated devices and sbis.
func (s *PndStore) Delete(pnd networkdomain.NetworkDomain) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.pndStoreName)
_, err := collection.DeleteOne(ctx, bson.D{primitive.E{Key: pnd.ID().String()}})
if err != nil {
return err
}
// TODO: Delete all assosicated devices + SBIs
return nil
}
// PendingChannels holds channels used communicate with pending
// cSBI deployments
func (s *PndStore) PendingChannels(id uuid.UUID, parseErrors ...error) (chan device.Details, error) {
ch, ok := s.pendingChannels[id]
if !ok {
return nil, &errors.ErrNotFound{ID: id}
}
return ch, nil
}
// AddPendingChannel adds a pending channel to the map
func (s *PndStore) AddPendingChannel(id uuid.UUID, ch chan device.Details) {
s.pendingChannels[id] = ch
}
// RemovePendingChannel removes a pending channel from the map
func (s *PndStore) RemovePendingChannel(id uuid.UUID) {
delete(s.pendingChannels, id)
}
func (s *PndStore) callback(id uuid.UUID, ch chan device.Details) {
if ch != nil {
s.AddPendingChannel(id, ch)
log.Infof("pending channel %v added", id)
} else {
s.RemovePendingChannel(id)
log.Infof("pending channel %v removed", id)
}
}
...@@ -195,8 +195,8 @@ func Test_pndImplementation_Destroy(t *testing.T) { ...@@ -195,8 +195,8 @@ func Test_pndImplementation_Destroy(t *testing.T) {
type fields struct { type fields struct {
name string name string
description string description string
sbi *SbiStore sbi southbound.SbiStore
devices *DeviceStore devices device.Store
} }
tests := []struct { tests := []struct {
name string name string
......
...@@ -5,15 +5,9 @@ import ( ...@@ -5,15 +5,9 @@ import (
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/database"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/sirupsen/logrus"
) )
const ( const (
...@@ -43,7 +37,7 @@ func NewSbiStore(pndUUID uuid.UUID) southbound.SbiStore { ...@@ -43,7 +37,7 @@ func NewSbiStore(pndUUID uuid.UUID) southbound.SbiStore {
return &store return &store
case store.Database: case store.Database:
return &SbiStore{ return &DatabaseSbiStore{
sbiStoreName: fmt.Sprintf("sbi-store-%s.json", pndUUID.String()), sbiStoreName: fmt.Sprintf("sbi-store-%s.json", pndUUID.String()),
} }
case store.Memory: case store.Memory:
...@@ -54,135 +48,3 @@ func NewSbiStore(pndUUID uuid.UUID) southbound.SbiStore { ...@@ -54,135 +48,3 @@ func NewSbiStore(pndUUID uuid.UUID) southbound.SbiStore {
return nil return nil
} }
} }
// Add adds a SBI.
func (s *SbiStore) Add(item southbound.SouthboundInterface) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
_, err := client.Database(database.DatabaseName).
Collection(s.sbiStoreName).
InsertOne(ctx, item)
if err != nil {
if mongo.IsDuplicateKeyError(err) {
return nil
}
return errors.ErrCouldNotCreate{StoreName: sbiStoreName}
}
return nil
}
// Delete deletes an SBI.
func (s *SbiStore) Delete(item southbound.SouthboundInterface) error {
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
_, err := client.Database(database.DatabaseName).
Collection(s.sbiStoreName).
DeleteOne(ctx, bson.D{primitive.E{Key: "_id", Value: item.ID().String()}})
if err != nil {
return errors.ErrCouldNotCreate{StoreName: sbiStoreName}
}
return nil
}
// Get takes a SouthboundInterface's UUID or name and returns the SouthboundInterface. If the requested
// SouthboundInterface does not exist an error is returned.
func (s *SbiStore) Get(query store.Query) (southbound.SouthboundInterface, error) {
var loadedSbi *LoadedSbi
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
log.Debugf("SBI-Search-ID: %+v\n", query.ID.String())
db := client.Database(database.DatabaseName)
collection := db.Collection(s.sbiStoreName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: query.ID.String()}})
if result == nil {
return nil, nil
}
err := result.Decode(&loadedSbi)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: sbiStoreName}
}
newSbi, _ := NewSBI(loadedSbi.Type, uuid.MustParse(loadedSbi.ID))
return newSbi, nil
}
// GetAll returns all SBIs
func (s *SbiStore) GetAll() ([]southbound.SouthboundInterface, error) {
var loadedSbis []LoadedSbi
var sbis []southbound.SouthboundInterface
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.sbiStoreName)
cursor, err := collection.Find(ctx, bson.D{})
if err != nil {
return nil, err
}
defer cursor.Close(ctx)
err = cursor.All(ctx, &loadedSbis)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: sbiStoreName}
}
for _, sbi := range loadedSbis {
newSbi, _ := NewSBI(sbi.Type, uuid.MustParse(sbi.ID))
sbis = append(sbis, newSbi)
}
return sbis, nil
}
// GetSBI takes a SouthboundInterface's UUID or name and returns the SouthboundInterface. If the requested
// SouthboundInterface does not exist an error is returned.
func (s *SbiStore) GetSBI(id uuid.UUID) (southbound.SouthboundInterface, error) {
var loadedSbi LoadedSbi
client, ctx, cancel := database.GetMongoConnection()
defer cancel()
defer client.Disconnect(ctx)
db := client.Database(database.DatabaseName)
collection := db.Collection(s.sbiStoreName)
result := collection.FindOne(ctx, bson.D{primitive.E{Key: "_id", Value: id.String()}})
if result == nil {
return nil, nil
}
err := result.Decode(&loadedSbi)
if err != nil {
log.Printf("Failed marshalling %v", err)
return nil, errors.ErrCouldNotMarshall{StoreName: sbiStoreName}
}
newSbi, err := NewSBI(loadedSbi.Type, uuid.MustParse(loadedSbi.ID))
return newSbi, nil
}
func getSbiTypeFromString(sbiTypeAsString string) spb.Type {
sbiTypeInt := spb.Type_value[sbiTypeAsString]
return spb.Type(sbiTypeInt)
}
package store
import (
"log"
"reflect"
"testing"
"code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
"code.fbi.h-da.de/danet/gosdn/controller/mocks"
"github.com/google/uuid"
)
func TestChangeStore_Pending(t *testing.T) {
changeMock := &mocks.Change{}
changeMock.On("ID").Return(cuid)
changeMock.On("State").Return(pnd.ChangeState_CHANGE_STATE_PENDING)
store := NewChangeStore()
pending := changeMock
if err := store.Add(pending); err != nil {
t.Error(err)
return
}
tests := []struct {
name string
want []uuid.UUID
}{
{
name: "default",
want: []uuid.UUID{cuid},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := store
if got := s.Pending(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Pending() = %v, want %v", got, tt.want)
}
})
}
}
func TestChangeStore_Committed(t *testing.T) {
changeMock := &mocks.Change{}
changeMock.On("ID").Return(cuid)
changeMock.On("State").Return(pnd.ChangeState_CHANGE_STATE_COMMITTED)
changeMock.On("Commit").Return(nil)
store := NewChangeStore()
committed := changeMock
if err := committed.Commit(); err != nil {
t.Error(err)
return
}
if err := store.Add(committed); err != nil {
t.Error(err)
return
}
tests := []struct {
name string
want []uuid.UUID
}{
{
name: "default",
want: []uuid.UUID{cuid},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := store
if got := s.Committed(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Committed() = %v, want %v", got, tt.want)
}
})
}
}
func TestChangeStore_Confirmed(t *testing.T) {
changeMock := &mocks.Change{}
changeMock.On("ID").Return(cuid)
changeMock.On("State").Return(pnd.ChangeState_CHANGE_STATE_CONFIRMED)
changeMock.On("Commit").Return(nil)
changeMock.On("Confirm").Return(nil)
store := NewChangeStore()
confirmed := changeMock
if err := confirmed.Commit(); err != nil {
t.Error(err)
return
}
if err := confirmed.Confirm(); err != nil {
t.Error(err)
return
}
if err := store.Add(confirmed); err != nil {
t.Error(err)
return
}
tests := []struct {
name string
want []uuid.UUID
}{
{
name: "default",
want: []uuid.UUID{cuid},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := store
if got := s.Confirmed(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Confirmed() = %v, want %v", got, tt.want)
}
})
}
}
func Test_filterChanges(t *testing.T) {
var pendingCUID uuid.UUID
var committedCUID uuid.UUID
var confirmedCUID uuid.UUID
var err error
pendingCUID, err = uuid.Parse("3e8219b0-e926-400d-8660-217f2a25a7c7")
if err != nil {
log.Fatal(err)
}
committedCUID, err = uuid.Parse("3e8219b0-e926-400d-8660-217f2a25a7c8")
if err != nil {
log.Fatal(err)
}
confirmedCUID, err = uuid.Parse("3e8219b0-e926-400d-8660-217f2a25a7c9")
if err != nil {
log.Fatal(err)
}
changeMockPending := &mocks.Change{}
changeMockPending.On("ID").Return(pendingCUID)
changeMockPending.On("State").Return(pnd.ChangeState_CHANGE_STATE_PENDING)
changeMockPending.On("Commit").Return(nil)
changeMockPending.On("Confirm").Return(nil)
changeMockCommited := &mocks.Change{}
changeMockCommited.On("ID").Return(committedCUID)
changeMockCommited.On("State").Return(pnd.ChangeState_CHANGE_STATE_COMMITTED)
changeMockCommited.On("Commit").Return(nil)
changeMockCommited.On("Confirm").Return(nil)
changeMockConfirmed := &mocks.Change{}
changeMockConfirmed.On("ID").Return(confirmedCUID)
changeMockConfirmed.On("State").Return(pnd.ChangeState_CHANGE_STATE_CONFIRMED)
changeMockConfirmed.On("Commit").Return(nil)
changeMockConfirmed.On("Confirm").Return(nil)
store := NewChangeStore()
pending := changeMockPending
committed := changeMockCommited
if err := committed.Commit(); err != nil {
t.Error(err)
return
}
confirmed := changeMockConfirmed
if err := confirmed.Commit(); err != nil {
t.Error(err)
return
}
if err := confirmed.Confirm(); err != nil {
t.Error(err)
return
}
if err := store.Add(pending); err != nil {
t.Error(err)
return
}
if err := store.Add(committed); err != nil {
t.Error(err)
return
}
if err := store.Add(confirmed); err != nil {
t.Error(err)
return
}
type args struct {
store *ChangeStore
state ppb.ChangeState
}
tests := []struct {
name string
args args
want []uuid.UUID
}{
{
name: "pending",
args: args{
store: store,
state: ppb.ChangeState_CHANGE_STATE_PENDING,
},
want: []uuid.UUID{pendingCUID},
},
{
name: "committed",
args: args{
store: store,
state: ppb.ChangeState_CHANGE_STATE_COMMITTED,
},
want: []uuid.UUID{committedCUID},
},
{
name: "confirmed",
args: args{
store: store,
state: ppb.ChangeState_CHANGE_STATE_CONFIRMED,
},
want: []uuid.UUID{confirmedCUID},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := filterChanges(tt.args.store, tt.args.state); !reflect.DeepEqual(got, tt.want) {
t.Errorf("filterChanges() = %v, want %v", got, tt.want)
}
})
}
}
package store
import (
"reflect"
"sort"
"testing"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/store"
"code.fbi.h-da.de/danet/gosdn/controller/mocks"
"github.com/google/uuid"
)
func Test_Store_add(t *testing.T) {
type args struct {
item store.Storable
}
tests := []struct {
name string
s *genericStore
args args
wantErr bool
}{
{
name: "default",
s: newGenericStore(),
args: args{
item: &mocks.Storable{},
},
},
{
name: "already exists",
s: &genericStore{
Store: map[uuid.UUID]store.Storable{
iid: &mocks.Storable{},
},
},
args: args{
item: &mocks.Storable{},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.args.item.(*mocks.Storable).On("ID").Return(iid)
switch tt.name {
case "already exixts":
_ = tt.s.Add(tt.args.item)
default:
}
if err := tt.s.Add(tt.args.item); (err != nil) != tt.wantErr {
t.Errorf("Add() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func Test_Store_delete(t *testing.T) {
type args struct {
id uuid.UUID
}
tests := []struct {
name string
s *genericStore
args args
wantErr bool
}{
{
name: "default",
s: &genericStore{
Store: map[uuid.UUID]store.Storable{
iid: &mocks.Storable{},
},
},
args: args{id: iid},
wantErr: false,
},
{
name: "not found empty",
s: newGenericStore(),
args: args{id: iid},
wantErr: true,
},
{
name: "not found",
s: &genericStore{
Store: map[uuid.UUID]store.Storable{
iid: &mocks.Storable{},
},
},
args: args{id: altIid},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := tt.s.Delete(tt.args.id); (err != nil) != tt.wantErr {
t.Errorf("delete() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.name == "default" {
item, ok := tt.s.Store[iid]
if ok {
t.Errorf("delete() item %v still in genericStore %v", item, tt.s)
}
}
})
}
}
func Test_Store_exists(t *testing.T) {
type args struct {
id uuid.UUID
}
tests := []struct {
name string
s *genericStore
args args
want bool
}{
{
name: "default",
s: &genericStore{
Store: map[uuid.UUID]store.Storable{
iid: &mocks.Storable{},
},
},
args: args{id: iid},
want: true,
},
{
name: "not found empty",
s: newGenericStore(),
args: args{id: iid},
want: false,
},
{
name: "not found",
s: &genericStore{
Store: map[uuid.UUID]store.Storable{
iid: &mocks.Storable{},
},
},
args: args{id: altIid},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.s.Exists(tt.args.id); got != tt.want {
t.Errorf("exists() = %v, want %v", got, tt.want)
}
})
}
}
func Test_Store_get(t *testing.T) {
type args struct {
id uuid.UUID
}
tests := []struct {
name string
s *genericStore
args args
want store.Storable
wantErr bool
}{
{
name: "exists",
s: &genericStore{
Store: map[uuid.UUID]store.Storable{
iid: &mocks.Storable{},
},
},
args: args{id: iid},
want: &mocks.Storable{},
wantErr: false,
},
{
name: "not found",
s: &genericStore{
Store: map[uuid.UUID]store.Storable{
iid: &mocks.Storable{},
},
},
args: args{id: altIid},
want: nil,
wantErr: true,
},
{
name: "not found empty",
s: newGenericStore(),
args: args{id: iid},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.s.Get(tt.args.id)
if (err != nil) != tt.wantErr {
t.Errorf("get() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("get() got = %v, want %v", got, tt.want)
}
})
}
}
func Test_Store_UUIDs(t *testing.T) {
tests := []struct {
name string
s *genericStore
want []uuid.UUID
}{
{
name: "default",
s: &genericStore{
Store: map[uuid.UUID]store.Storable{
iid: &mocks.Storable{},
altIid: &mocks.Storable{},
},
},
want: []uuid.UUID{iid, altIid},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sort.Slice(tt.want, func(i, j int) bool {
return tt.want[i].String() < tt.want[j].String()
})
got := tt.s.UUIDs()
sort.Slice(got, func(i, j int) bool {
return got[i].String() < got[j].String()
})
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("UUIDs() = %v, want %v", got, tt.want)
}
})
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment