Newer
Older
package nucleus
import (
"encoding/json"
"sync"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
plugin_registry "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/plugin-registry"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// FilesystemPndStore provides a filesystem implementation for a pnd store.
type FilesystemPndStore struct {
pendingChannels map[uuid.UUID]chan networkelement.Details
pluginService plugin.Service
csbiClient cpb.CsbiServiceClient
pluginRegistryClient plugin_registry.PluginRegistryServiceClient
pathToPndFile string
fileMutex sync.Mutex
}
// NewFilesystemPndStore returns a filesystem implementation for a pnd store.
func NewFilesystemPndStore(pluginService plugin.Service) FilesystemPndStore {
if err := store.EnsureFilesystemStorePathExists(store.PndFilename); err != nil {
log.Error(err)
}
return FilesystemPndStore{
pendingChannels: make(map[uuid.UUID]chan networkelement.Details),
pathToPndFile: store.GetCompletePathToFileStore(store.PndFilename),
fileMutex: sync.Mutex{},
pluginService: pluginService,
}
}
func (t *FilesystemPndStore) readAllPndsFromFile() ([]networkdomain.NetworkDomain, error) {
var loadedPnds []LoadedPnd
if err != nil {
return nil, err
}
err = json.Unmarshal(content, &loadedPnds)
if err != nil {
return nil, err
}
pnds := make([]networkdomain.NetworkDomain, len(loadedPnds))
csbiClient, err := t.getCsbiClient()
if err != nil {
return nil, err
}
pluginRegistryClient, err := t.getPluginRegistryClient()
if err != nil {
return nil, err
}
for i, loadedPND := range loadedPnds {
newPnd, err := NewPND(
loadedPND.Name,
loadedPND.Description,
uuid.MustParse(loadedPND.ID),
csbiClient,
pluginRegistryClient,
t.pluginService,
t.callback,
)
if err != nil {
return nil, err
}
pnds[i] = newPnd
}
return pnds, nil
}
func (t *FilesystemPndStore) writeAllPndsToFile(pnds []networkdomain.NetworkDomain) error {
serializedData, err := json.Marshal(pnds)
if err != nil {
return err
}
err = os.WriteFile(t.pathToPndFile, serializedData, 0600)
if err != nil {
return err
}
return nil
}
// Add a pnd to the store.
func (t *FilesystemPndStore) Add(pndToAdd networkdomain.NetworkDomain) error {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
if err != nil {
return err
}
pnds = append(pnds, pndToAdd)
err = t.writeAllPndsToFile(pnds)
if err != nil {
return err
}
return nil
}
// Delete deletes a pnd from the store.
func (t *FilesystemPndStore) Delete(pndToDelete networkdomain.NetworkDomain) error {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
if err != nil {
return err
}
for i, pnd := range pnds {
if pnd.ID() == pndToDelete.ID() {
//remove item from slice
pnds[i] = pnds[len(pnds)-1]
pnds = pnds[:len(pnds)-1]
err = t.writeAllPndsToFile(pnds)
if err != nil {
return err
}
return nil
}
}
Fabian Seidl
committed
return &customerrs.CouldNotDeleteError{Identifier: pndToDelete.ID(), Type: pndToDelete, Err: err}
}
// Get provides a the query interface to find a stored pnd.
func (t *FilesystemPndStore) Get(query store.Query) (networkdomain.NetworkDomain, error) {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
if err != nil {
return nil, err
}
for _, pnd := range pnds {
if pnd.ID() == query.ID || pnd.GetName() == query.Name {
return pnd, nil
}
}
Fabian Seidl
committed
return nil, &customerrs.CouldNotFindError{ID: query.ID, Name: query.Name}
}
// GetAll returns all pnds currently on the store.
func (t *FilesystemPndStore) GetAll() ([]networkdomain.NetworkDomain, error) {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
return pnds, err
}
// PendingChannels holds channels used communicate with pending
// cSBI deployments.
func (t *FilesystemPndStore) PendingChannels(id uuid.UUID, parseErrors ...error) (chan networkelement.Details, error) {
ch, ok := t.pendingChannels[id]
if !ok {
Fabian Seidl
committed
return nil, &customerrs.CouldNotFindError{ID: id}
}
return ch, nil
}
// AddPendingChannel adds a pending channel to the map.
func (t *FilesystemPndStore) AddPendingChannel(id uuid.UUID, ch chan networkelement.Details) {
t.pendingChannels[id] = ch
}
// RemovePendingChannel removes a pending channel from the map.
func (t *FilesystemPndStore) RemovePendingChannel(id uuid.UUID) {
delete(t.pendingChannels, id)
}
func (t *FilesystemPndStore) callback(id uuid.UUID, ch chan networkelement.Details) {
if ch != nil {
t.AddPendingChannel(id, ch)
log.Infof("pending channel %v added", id)
} else {
t.RemovePendingChannel(id)
log.Infof("pending channel %v removed", id)
}
}
func (t *FilesystemPndStore) getCsbiClient() (cpb.CsbiServiceClient, error) {
if t.csbiClient == nil {
orchestrator := viper.GetString("csbi-orchestrator")
conn, err := grpc.Dial(orchestrator, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
t.csbiClient = cpb.NewCsbiServiceClient(conn)
}
return t.csbiClient, nil
}
func (t *FilesystemPndStore) getPluginRegistryClient() (plugin_registry.PluginRegistryServiceClient, error) {
if t.pluginRegistryClient == nil {
pluginRegistry := viper.GetString("plugin-registry")
conn, err := grpc.Dial(pluginRegistry, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
t.pluginRegistryClient = plugin_registry.NewPluginRegistryServiceClient(conn)
}
return t.pluginRegistryClient, nil
}