package nucleus
import (
	"encoding/json"
	"io/ioutil"
	"sync"

	cpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/csbi"
	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
	"code.fbi.h-da.de/danet/gosdn/controller/store"
	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
	"github.com/spf13/viper"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)
// FilesystemPndStore provides a filesystem implementation for a pnd store.
//
// Pnds are persisted as JSON in a single file; the exported Add, Delete, Get
// and GetAll methods lock fileMutex for the duration of their file access.
type FilesystemPndStore struct {
	// pendingChannels maps an ID to the channel registered for it via
	// AddPendingChannel (and removed again via RemovePendingChannel).
	pendingChannels map[uuid.UUID]chan device.Details
	// csbiClient is created on first use by getCsbiClient and reused afterwards.
	csbiClient cpb.CsbiServiceClient
	// pathToPndFile is the file the pnds are read from and written to.
	pathToPndFile string
	// fileMutex is held by Add, Delete, Get and GetAll while they work on
	// the file at pathToPndFile.
	fileMutex sync.Mutex
}
// NewFilesystemPndStore returns a filesystem implementation for a pnd store.
func NewFilesystemPndStore() FilesystemPndStore {
if err := store.EnsureFilesystemStorePathExists(store.PndFilename); err != nil {
log.Error(err)
}
return FilesystemPndStore{
pendingChannels: make(map[uuid.UUID]chan device.Details),
pathToPndFile: store.GetCompletePathToFileStore(store.PndFilename),
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
fileMutex: sync.Mutex{},
}
}
// readAllPndsFromFile loads every pnd persisted in the file at t.pathToPndFile.
//
// The file content is decoded as a JSON array of LoadedPnd records, and each
// record is turned into a network domain via NewPND, using the client from
// getCsbiClient and t.callback. It does not lock t.fileMutex itself; the
// exported methods of the store take the lock before calling it.
//
// An error is returned if the file cannot be read or decoded, if the cSBI
// client cannot be created, if a stored ID is not a valid UUID, or if NewPND
// fails. No partial result is returned alongside an error.
func (t *FilesystemPndStore) readAllPndsFromFile() ([]networkdomain.NetworkDomain, error) {
	content, err := ioutil.ReadFile(t.pathToPndFile)
	if err != nil {
		return nil, err
	}

	var loadedPnds []LoadedPnd
	if err := json.Unmarshal(content, &loadedPnds); err != nil {
		return nil, err
	}

	csbiClient, err := t.getCsbiClient()
	if err != nil {
		return nil, err
	}

	pnds := make([]networkdomain.NetworkDomain, 0, len(loadedPnds))
	for _, loadedPND := range loadedPnds {
		// Parse rather than MustParse: a malformed ID in the file has to
		// surface as an error to the caller instead of panicking.
		id, err := uuid.Parse(loadedPND.ID)
		if err != nil {
			return nil, err
		}

		newPnd, err := NewPND(
			loadedPND.Name,
			loadedPND.Description,
			id,
			csbiClient,
			t.callback,
		)
		if err != nil {
			return nil, err
		}

		pnds = append(pnds, newPnd)
	}

	return pnds, nil
}
// writeAllPndsToFile serializes pnds to JSON and writes the result to the
// file at t.pathToPndFile with permission 0600. Any marshalling or write
// error is returned unchanged.
func (t *FilesystemPndStore) writeAllPndsToFile(pnds []networkdomain.NetworkDomain) error {
	payload, marshalErr := json.Marshal(pnds)
	if marshalErr != nil {
		return marshalErr
	}

	return ioutil.WriteFile(t.pathToPndFile, payload, 0600)
}
// Add appends a pnd to the store.
//
// While holding the file mutex it reads all stored pnds, appends pndToAdd
// and writes the complete list back. No check for an already stored pnd is
// made here. Read and write errors are returned unchanged.
func (t *FilesystemPndStore) Add(pndToAdd networkdomain.NetworkDomain) error {
	t.fileMutex.Lock()
	defer t.fileMutex.Unlock()

	stored, readErr := t.readAllPndsFromFile()
	if readErr != nil {
		return readErr
	}

	return t.writeAllPndsToFile(append(stored, pndToAdd))
}
// Delete deletes a pnd from the store.
func (t *FilesystemPndStore) Delete(pndToDelete networkdomain.NetworkDomain) error {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
if err != nil {
return err
}
for i, pnd := range pnds {
if pnd.ID() == pndToDelete.ID() {
//remove item from slice
pnds[i] = pnds[len(pnds)-1]
pnds = pnds[:len(pnds)-1]
err = t.writeAllPndsToFile(pnds)
if err != nil {
return err
}
return nil
}
}
Fabian Seidl
committed
return &customerrs.CouldNotDeleteError{Identifier: pndToDelete.ID(), Type: pndToDelete, Err: err}
}
// Get provides a the query interface to find a stored pnd.
func (t *FilesystemPndStore) Get(query store.Query) (networkdomain.NetworkDomain, error) {
t.fileMutex.Lock()
defer t.fileMutex.Unlock()
pnds, err := t.readAllPndsFromFile()
if err != nil {
return nil, err
}
for _, pnd := range pnds {
if pnd.ID() == query.ID || pnd.GetName() == query.Name {
return pnd, nil
}
}
Fabian Seidl
committed
return nil, &customerrs.CouldNotFindError{ID: query.ID, Name: query.Name}
}
// GetAll returns all pnds currently on the store.
//
// The file mutex is held while the pnds are read; the result and error of
// readAllPndsFromFile are passed through as they are.
func (t *FilesystemPndStore) GetAll() ([]networkdomain.NetworkDomain, error) {
	t.fileMutex.Lock()
	defer t.fileMutex.Unlock()

	return t.readAllPndsFromFile()
}
// PendingChannels holds channels used communicate with pending
// cSBI deployments.
func (t *FilesystemPndStore) PendingChannels(id uuid.UUID, parseErrors ...error) (chan device.Details, error) {
ch, ok := t.pendingChannels[id]
if !ok {
Fabian Seidl
committed
return nil, &customerrs.CouldNotFindError{ID: id}
}
return ch, nil
}
// AddPendingChannel adds a pending channel to the map, replacing any channel
// already registered under id.
func (t *FilesystemPndStore) AddPendingChannel(id uuid.UUID, ch chan device.Details) {
	// A store that was not built by NewFilesystemPndStore has a nil map, and
	// assigning into a nil map panics; allocate it on first use.
	if t.pendingChannels == nil {
		t.pendingChannels = make(map[uuid.UUID]chan device.Details)
	}

	t.pendingChannels[id] = ch
}
// RemovePendingChannel removes a pending channel from the map.
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
func (t *FilesystemPndStore) RemovePendingChannel(id uuid.UUID) {
delete(t.pendingChannels, id)
}
// callback registers or unregisters a pending channel and logs the outcome.
// A nil ch removes the entry for id; a non-nil ch is stored under id. It is
// the function handed to NewPND when pnds are loaded from the file.
func (t *FilesystemPndStore) callback(id uuid.UUID, ch chan device.Details) {
	if ch == nil {
		t.RemovePendingChannel(id)
		log.Infof("pending channel %v removed", id)
		return
	}

	t.AddPendingChannel(id, ch)
	log.Infof("pending channel %v added", id)
}
// getCsbiClient returns the store's cSBI service client, creating it on the
// first call.
//
// On first use it dials the address configured under the viper key
// "csbi-orchestrator" with insecure transport credentials and caches the
// resulting client on the store. A dial error is returned unchanged and
// nothing is cached in that case.
func (t *FilesystemPndStore) getCsbiClient() (cpb.CsbiServiceClient, error) {
	if t.csbiClient != nil {
		return t.csbiClient, nil
	}

	target := viper.GetString("csbi-orchestrator")
	conn, dialErr := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if dialErr != nil {
		return nil, dialErr
	}

	t.csbiClient = cpb.NewCsbiServiceClient(conn)
	return t.csbiClient, nil
}