-
Andre Sterba authored
See merge request !365
Andre Sterba authoredSee merge request !365
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
pndFilesystemStore.go 4.98 KiB
package nucleus
import (
"encoding/json"
"os"
"sync"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// FilesystemPndStore provides a filesystem implementation for a pnd store.
// Pnds are persisted in a single file, while pending channels are only kept
// in memory.
type FilesystemPndStore struct {
// pendingChannels maps an ID to the channel registered for it through
// AddPendingChannel; entries are dropped by RemovePendingChannel.
pendingChannels map[uuid.UUID]chan device.Details
// csbiClient is created on first use by getCsbiClient and reused afterwards.
csbiClient cpb.CsbiServiceClient
// pathToPndFile is the path of the file the pnds are read from and written to.
pathToPndFile string
// fileMutex is held by Add, Delete, Get and GetAll while they access the file.
fileMutex sync.Mutex
}
// NewFilesystemPndStore returns a filesystem implementation for a pnd store.
//
// The store works on the file path reported by
// store.GetCompletePathToFileStore for store.PndFilename. An error from
// store.EnsureFilesystemStorePathExists is logged and does not prevent the
// store from being returned.
func NewFilesystemPndStore() FilesystemPndStore {
	err := store.EnsureFilesystemStorePathExists(store.PndFilename)
	if err != nil {
		log.Error(err)
	}

	// fileMutex is left at its zero value, which is an unlocked mutex.
	return FilesystemPndStore{
		pendingChannels: make(map[uuid.UUID]chan device.Details),
		pathToPndFile:   store.GetCompletePathToFileStore(store.PndFilename),
	}
}
// readAllPndsFromFile loads every pnd persisted in the pnd file.
//
// The file content is decoded as a JSON array of LoadedPnd entries, and each
// entry is turned into a networkdomain.NetworkDomain through NewPND, using
// the store's cSBI client and its pending-channel callback. The function
// does no locking itself; the exported store methods hold fileMutex while
// calling it.
//
// An error is returned when the file cannot be read, the JSON cannot be
// decoded, the cSBI client cannot be obtained, a stored ID is not a valid
// UUID, or NewPND fails. No partial result is returned in those cases.
func (t *FilesystemPndStore) readAllPndsFromFile() ([]networkdomain.NetworkDomain, error) {
	content, err := os.ReadFile(t.pathToPndFile)
	if err != nil {
		return nil, err
	}

	var loadedPnds []LoadedPnd
	if err := json.Unmarshal(content, &loadedPnds); err != nil {
		return nil, err
	}

	csbiClient, err := t.getCsbiClient()
	if err != nil {
		return nil, err
	}

	pnds := make([]networkdomain.NetworkDomain, len(loadedPnds))
	for i, loadedPND := range loadedPnds {
		// Use Parse rather than MustParse: the ID comes from a file on disk,
		// so a malformed value must be reported as an error, not a panic.
		id, err := uuid.Parse(loadedPND.ID)
		if err != nil {
			return nil, err
		}

		newPnd, err := NewPND(
			loadedPND.Name,
			loadedPND.Description,
			id,
			csbiClient,
			t.callback,
		)
		if err != nil {
			return nil, err
		}

		pnds[i] = newPnd
	}

	return pnds, nil
}
// writeAllPndsToFile replaces the content of the pnd file with the JSON
// encoding of pnds. If the file has to be created, it is created with
// permission 0600. The function does no locking itself.
//
// An error is returned when pnds cannot be marshalled or the file cannot be
// written.
func (t *FilesystemPndStore) writeAllPndsToFile(pnds []networkdomain.NetworkDomain) error {
	encoded, marshalErr := json.Marshal(pnds)
	if marshalErr != nil {
		return marshalErr
	}

	return os.WriteFile(t.pathToPndFile, encoded, 0600)
}
// Add appends pndToAdd to the pnds stored in the file.
//
// The whole file is read, the new pnd is appended and the result is written
// back, all while holding fileMutex. Any read or write error is returned
// unchanged.
func (t *FilesystemPndStore) Add(pndToAdd networkdomain.NetworkDomain) error {
	t.fileMutex.Lock()
	defer t.fileMutex.Unlock()

	stored, readErr := t.readAllPndsFromFile()
	if readErr != nil {
		return readErr
	}

	return t.writeAllPndsToFile(append(stored, pndToAdd))
}
// Delete removes the pnd whose ID equals pndToDelete.ID() from the file.
//
// The file is read, the first matching pnd is removed and the remaining
// pnds are written back, all while holding fileMutex. The order of the
// remaining pnds is not preserved: the last pnd takes the place of the
// removed one.
//
// Read and write errors are returned unchanged. If no stored pnd has the
// requested ID, a *customerrs.CouldNotDeleteError is returned whose Err field
// holds a *customerrs.CouldNotFindError naming that ID.
func (t *FilesystemPndStore) Delete(pndToDelete networkdomain.NetworkDomain) error {
	t.fileMutex.Lock()
	defer t.fileMutex.Unlock()

	pnds, err := t.readAllPndsFromFile()
	if err != nil {
		return err
	}

	for i, pnd := range pnds {
		if pnd.ID() != pndToDelete.ID() {
			continue
		}

		// Remove the item by overwriting it with the last one and shrinking
		// the slice by one element.
		last := len(pnds) - 1
		pnds[i] = pnds[last]

		return t.writeAllPndsToFile(pnds[:last])
	}

	// Reaching this point means the read succeeded (err is nil) but no pnd
	// matched, so report the missing pnd as the cause instead of a nil error.
	return &customerrs.CouldNotDeleteError{
		Identifier: pndToDelete.ID(),
		Type:       pndToDelete,
		Err:        &customerrs.CouldNotFindError{ID: pndToDelete.ID()},
	}
}
// Get provides the query interface to find a stored pnd.
//
// The pnds are read from the file while holding fileMutex, and the first pnd
// whose ID equals query.ID or whose name equals query.Name is returned. If
// the file cannot be read the read error is returned; if nothing matches a
// *customerrs.CouldNotFindError carrying the query's ID and name is returned.
func (t *FilesystemPndStore) Get(query store.Query) (networkdomain.NetworkDomain, error) {
	t.fileMutex.Lock()
	defer t.fileMutex.Unlock()

	stored, readErr := t.readAllPndsFromFile()
	if readErr != nil {
		return nil, readErr
	}

	for idx := range stored {
		candidate := stored[idx]
		// Skip entries that match neither by ID nor by name.
		if candidate.ID() != query.ID && candidate.GetName() != query.Name {
			continue
		}

		return candidate, nil
	}

	return nil, &customerrs.CouldNotFindError{ID: query.ID, Name: query.Name}
}
// GetAll returns all pnds currently on the store.
//
// The pnds are read from the file while holding fileMutex; the result and
// error of that read are returned as they are.
func (t *FilesystemPndStore) GetAll() ([]networkdomain.NetworkDomain, error) {
	t.fileMutex.Lock()
	defer t.fileMutex.Unlock()

	return t.readAllPndsFromFile()
}
// PendingChannels returns the pending channel registered under id. These
// channels are used to communicate with pending cSBI deployments.
//
// If no channel is registered for id, a *customerrs.CouldNotFindError
// carrying that id is returned. The parseErrors arguments are not used.
func (t *FilesystemPndStore) PendingChannels(id uuid.UUID, parseErrors ...error) (chan device.Details, error) {
	if pending, found := t.pendingChannels[id]; found {
		return pending, nil
	}

	return nil, &customerrs.CouldNotFindError{ID: id}
}
// AddPendingChannel adds a pending channel to the map. The channel ch is
// stored under id, replacing any channel already stored for that id.
func (t *FilesystemPndStore) AddPendingChannel(id uuid.UUID, ch chan device.Details) {
t.pendingChannels[id] = ch
}
// RemovePendingChannel removes a pending channel from the map. Removing an
// id that has no entry is a no-op.
func (t *FilesystemPndStore) RemovePendingChannel(id uuid.UUID) {
delete(t.pendingChannels, id)
}
// callback registers or unregisters a pending channel and logs the outcome.
// A non-nil ch is added under id; a nil ch removes the entry for id. It is
// the function handed to NewPND when pnds are loaded from the file.
func (t *FilesystemPndStore) callback(id uuid.UUID, ch chan device.Details) {
	if ch == nil {
		t.RemovePendingChannel(id)
		log.Infof("pending channel %v removed", id)
		return
	}

	t.AddPendingChannel(id, ch)
	log.Infof("pending channel %v added", id)
}
// getCsbiClient returns the store's cSBI client, creating it on first use.
//
// When no client is cached yet, the target is read from the viper setting
// "csbi-orchestrator" and dialled over gRPC with insecure transport
// credentials; the resulting client is stored on the receiver for later
// calls. A dial error is returned unchanged and nothing is cached.
func (t *FilesystemPndStore) getCsbiClient() (cpb.CsbiServiceClient, error) {
	if t.csbiClient != nil {
		return t.csbiClient, nil
	}

	target := viper.GetString("csbi-orchestrator")
	conn, dialErr := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if dialErr != nil {
		return nil, dialErr
	}

	t.csbiClient = cpb.NewCsbiServiceClient(conn)

	return t.csbiClient, nil
}