Newer
Older
package nucleus
import (
"encoding/json"
"sync"
cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkelement"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// FilesystemPndStore provides a filesystem implementation for a pnd store.
type FilesystemPndStore struct {
pendingChannels map[uuid.UUID]chan networkelement.Details
csbiClient cpb.CsbiServiceClient
pathToPndFile string
fileMutex sync.Mutex
}
// NewFilesystemPndStore returns a filesystem implementation for a pnd store.
func NewFilesystemPndStore() FilesystemPndStore {
if err := store.EnsureFilesystemStorePathExists(store.PndFilename); err != nil {
log.Error(err)
}
return FilesystemPndStore{
pendingChannels: make(map[uuid.UUID]chan networkelement.Details),
pathToPndFile: store.GetCompletePathToFileStore(store.PndFilename),
fileMutex: sync.Mutex{},
}
}
func (t *FilesystemPndStore) readAllPndsFromFile() ([]networkdomain.NetworkDomain, error) {
var loadedPnds []LoadedPnd
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
if err != nil {
return nil, err
}
err = json.Unmarshal(content, &loadedPnds)
if err != nil {
return nil, err
}
pnds := make([]networkdomain.NetworkDomain, len(loadedPnds))
csbiClient, err := t.getCsbiClient()
if err != nil {
return nil, err
}
for i, loadedPND := range loadedPnds {
newPnd, err := NewPND(
loadedPND.Name,
loadedPND.Description,
uuid.MustParse(loadedPND.ID),
csbiClient,
t.callback,
)
if err != nil {
return nil, err
}
pnds[i] = newPnd
}
return pnds, nil
}
func (t *FilesystemPndStore) writeAllPndsToFile(pnds []networkdomain.NetworkDomain) error {
serializedData, err := json.Marshal(pnds)
if err != nil {
return err
}
err = os.WriteFile(t.pathToPndFile, serializedData, 0600)
if err != nil {
return err
}
return nil
}
// Add a pnd to the store.
func (t *FilesystemPndStore) Add(pndToAdd networkdomain.NetworkDomain) error {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
if err != nil {
return err
}
pnds = append(pnds, pndToAdd)
err = t.writeAllPndsToFile(pnds)
if err != nil {
return err
}
return nil
}
// Delete deletes a pnd from the store.
func (t *FilesystemPndStore) Delete(pndToDelete networkdomain.NetworkDomain) error {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
if err != nil {
return err
}
for i, pnd := range pnds {
if pnd.ID() == pndToDelete.ID() {
//remove item from slice
pnds[i] = pnds[len(pnds)-1]
pnds = pnds[:len(pnds)-1]
err = t.writeAllPndsToFile(pnds)
if err != nil {
return err
}
return nil
}
}
Fabian Seidl
committed
return &customerrs.CouldNotDeleteError{Identifier: pndToDelete.ID(), Type: pndToDelete, Err: err}
}
// Get provides a the query interface to find a stored pnd.
func (t *FilesystemPndStore) Get(query store.Query) (networkdomain.NetworkDomain, error) {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
if err != nil {
return nil, err
}
for _, pnd := range pnds {
if pnd.ID() == query.ID || pnd.GetName() == query.Name {
return pnd, nil
}
}
Fabian Seidl
committed
return nil, &customerrs.CouldNotFindError{ID: query.ID, Name: query.Name}
}
// GetAll returns all pnds currently on the store.
func (t *FilesystemPndStore) GetAll() ([]networkdomain.NetworkDomain, error) {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
return pnds, err
}
// PendingChannels holds channels used communicate with pending
// cSBI deployments.
func (t *FilesystemPndStore) PendingChannels(id uuid.UUID, parseErrors ...error) (chan networkelement.Details, error) {
ch, ok := t.pendingChannels[id]
if !ok {
Fabian Seidl
committed
return nil, &customerrs.CouldNotFindError{ID: id}
}
return ch, nil
}
// AddPendingChannel adds a pending channel to the map.
func (t *FilesystemPndStore) AddPendingChannel(id uuid.UUID, ch chan networkelement.Details) {
t.pendingChannels[id] = ch
}
// RemovePendingChannel removes a pending channel from the map.
func (t *FilesystemPndStore) RemovePendingChannel(id uuid.UUID) {
delete(t.pendingChannels, id)
}
func (t *FilesystemPndStore) callback(id uuid.UUID, ch chan networkelement.Details) {
if ch != nil {
t.AddPendingChannel(id, ch)
log.Infof("pending channel %v added", id)
} else {
t.RemovePendingChannel(id)
log.Infof("pending channel %v removed", id)
}
}
func (t *FilesystemPndStore) getCsbiClient() (cpb.CsbiServiceClient, error) {
if t.csbiClient == nil {
orchestrator := viper.GetString("csbi-orchestrator")
conn, err := grpc.Dial(orchestrator, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
t.csbiClient = cpb.NewCsbiServiceClient(conn)
}
return t.csbiClient, nil
}