Skip to content
Snippets Groups Projects
Commit 81d7d8ce authored by André Sterba's avatar André Sterba
Browse files

Merge branch...

Merge branch '226-implement-m2m-protocol-for-event-management-of-sbi-data-plane-communication' into istaester/init-topology
parents 68b64e12 191a7d3b
Branches
No related tags found
5 merge requests!376Add additional example application hostname-checker,!349Northbound refactoring to implement NIB concept for devices,!343Add basic application framework and example application to show interaction between events an NBI,!339Create basic venv-manager for use with arista,!324Provide prototype implementation for topology handling
Showing
with 339 additions and 31 deletions
...@@ -30,6 +30,7 @@ controller/config/*_test.toml ...@@ -30,6 +30,7 @@ controller/config/*_test.toml
controller/configs/ci-testing-gosdn.toml controller/configs/ci-testing-gosdn.toml
controller/stores_testing controller/stores_testing
controller/stores/** controller/stores/**
controller/cmd/gosdn/stores/
controller/plugins controller/plugins
controller/config/.gosdnc.toml controller/config/.gosdnc.toml
controller/debug.test controller/debug.test
......
...@@ -13,6 +13,7 @@ import ( ...@@ -13,6 +13,7 @@ import (
apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/config" "code.fbi.h-da.de/danet/gosdn/controller/config"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac"
...@@ -86,10 +87,12 @@ func bootstrapUnitTest() { ...@@ -86,10 +87,12 @@ func bootstrapUnitTest() {
log.Fatal(err) log.Fatal(err)
} }
eventService := eventservice.NewMockEventService()
pndStore = nucleus.NewMemoryPndStore() pndStore = nucleus.NewMemoryPndStore()
sbiStore = nucleus.NewMemorySbiStore() sbiStore = nucleus.NewMemorySbiStore()
userService = rbacImpl.NewUserService(rbacImpl.NewMemoryUserStore()) userService = rbacImpl.NewUserService(rbacImpl.NewMemoryUserStore(), eventService)
roleService = rbacImpl.NewRoleService(rbacImpl.NewMemoryRoleStore()) roleService = rbacImpl.NewRoleService(rbacImpl.NewMemoryRoleStore(), eventService)
clearAndCreateAuthTestSetup() clearAndCreateAuthTestSetup()
previousHostname := "previousHostname" previousHostname := "previousHostname"
......
...@@ -21,6 +21,13 @@ const ( ...@@ -21,6 +21,13 @@ const (
jwtDurationKey = "defaultJWTDuration" jwtDurationKey = "defaultJWTDuration"
defaultJWTDuration = time.Hour * 24 defaultJWTDuration = time.Hour * 24
jwtSecretKey = "jwtSecret" jwtSecretKey = "jwtSecret"
// RabbitMQ Broker
amqpPrefixKey = "amqpPrefix"
amqpUserKey = "amqpUser"
amqpPasswordKey = "amqpPassword"
amqpHostKey = "amqpHost"
amqpPortKey = "amqpPort"
) )
// BasePndUUID is an uuid for the base PND // BasePndUUID is an uuid for the base PND
...@@ -50,6 +57,21 @@ var JWTDuration time.Duration ...@@ -50,6 +57,21 @@ var JWTDuration time.Duration
// JWTSecret determines the scret that is used to sign tokens // JWTSecret determines the scret that is used to sign tokens
var JWTSecret string var JWTSecret string
// AMQPPrefix is the amqp prefix
var AMQPPrefix string
// AMQPUser is the amqp user
var AMQPUser string
// AMQPPassword is the amqp user password
var AMQPPassword string
// AMQPHost is the amqp host
var AMQPHost string
// AMQPPort is the amqp port
var AMQPPort string
// Init gets called on module import // Init gets called on module import
func Init() { func Init() {
err := InitializeConfig() err := InitializeConfig()
...@@ -103,6 +125,8 @@ func InitializeConfig() error { ...@@ -103,6 +125,8 @@ func InitializeConfig() error {
JWTSecret = viper.GetString(jwtSecretKey) JWTSecret = viper.GetString(jwtSecretKey)
loadAMQPConfig()
if err := viper.WriteConfig(); err != nil { if err := viper.WriteConfig(); err != nil {
return err return err
} }
...@@ -171,3 +195,11 @@ func getDurationFromViper(viperKey string, unit time.Duration) (time.Duration, e ...@@ -171,3 +195,11 @@ func getDurationFromViper(viperKey string, unit time.Duration) (time.Duration, e
return 0, viper.ConfigParseError{} return 0, viper.ConfigParseError{}
} }
func loadAMQPConfig() {
AMQPPrefix = getStringFromViper(amqpPrefixKey)
AMQPUser = getStringFromViper(amqpUserKey)
AMQPPassword = getStringFromViper(amqpPasswordKey)
AMQPHost = getStringFromViper(amqpHostKey)
AMQPPort = getStringFromViper(amqpPortKey)
}
...@@ -12,3 +12,8 @@ socket = ":55055" ...@@ -12,3 +12,8 @@ socket = ":55055"
databaseConnection = "mongodb://root:example@clab-gosdn_csbi_arista_base-mongodb:27017" databaseConnection = "mongodb://root:example@clab-gosdn_csbi_arista_base-mongodb:27017"
filesystemPathToStores = "stores" filesystemPathToStores = "stores"
amqpPrefix = "amqp://"
amqpUser = "guest"
amqpPassword = "guest"
amqpHost = "localhost"
amqpPort = "5672"
...@@ -12,3 +12,9 @@ socket = ":55055" ...@@ -12,3 +12,9 @@ socket = ":55055"
databaseConnection = "mongodb://root:example@localhost:27017" databaseConnection = "mongodb://root:example@localhost:27017"
filesystemPathToStores = "stores" filesystemPathToStores = "stores"
amqpPrefix = "amqp://"
amqpUser = "guest"
amqpPassword = "guest"
amqpHost = "localhost"
amqpPort = "5672"
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/topology" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/topology"
"code.fbi.h-da.de/danet/gosdn/controller/config" "code.fbi.h-da.de/danet/gosdn/controller/config"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac"
...@@ -37,6 +38,8 @@ import ( ...@@ -37,6 +38,8 @@ import (
"code.fbi.h-da.de/danet/gosdn/controller/topology/nodes" "code.fbi.h-da.de/danet/gosdn/controller/topology/nodes"
"code.fbi.h-da.de/danet/gosdn/controller/topology/ports" "code.fbi.h-da.de/danet/gosdn/controller/topology/ports"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus" "code.fbi.h-da.de/danet/gosdn/controller/nucleus"
) )
...@@ -54,6 +57,7 @@ type Core struct { ...@@ -54,6 +57,7 @@ type Core struct {
httpServer *http.Server httpServer *http.Server
grpcServer *grpc.Server grpcServer *grpc.Server
nbi *nbi.NorthboundInterface nbi *nbi.NorthboundInterface
eventService eventInterfaces.Service
stopChan chan os.Signal stopChan chan os.Signal
csbiClient cpb.CsbiServiceClient csbiClient cpb.CsbiServiceClient
...@@ -68,21 +72,27 @@ func initialize() error { ...@@ -68,21 +72,27 @@ func initialize() error {
return err return err
} }
eventService, err := eventservice.NewEventService()
if err != nil {
return err
}
nodeService := nodes.NewNodeService(nodes.NewDatabaseNodeStore()) nodeService := nodes.NewNodeService(nodes.NewDatabaseNodeStore())
portService := ports.NewPortService(ports.NewDatabasePortStore()) portService := ports.NewPortService(ports.NewDatabasePortStore())
c = &Core{ c = &Core{
pndStore: nucleus.NewPndStore(), pndStore: nucleus.NewPndStore(),
userService: rbacImpl.NewUserService(rbacImpl.NewUserStore()), userService: rbacImpl.NewUserService(rbacImpl.NewUserStore(), eventService),
roleService: rbacImpl.NewRoleService(rbacImpl.NewRoleStore()), roleService: rbacImpl.NewRoleService(rbacImpl.NewRoleStore(), eventService),
topologyService: topology.NewTopologyService( topologyService: topology.NewTopologyService(
topology.NewDatabaseTopologyStore(), topology.NewDatabaseTopologyStore(),
nodeService, nodeService,
portService, portService,
), ),
nodeService: nodeService, nodeService: nodeService,
portService: portService, portService: portService,
stopChan: make(chan os.Signal, 1), eventService: eventService,
stopChan: make(chan os.Signal, 1),
} }
// Setting up signal capturing // Setting up signal capturing
...@@ -292,6 +302,7 @@ func shutdown() error { ...@@ -292,6 +302,7 @@ func shutdown() error {
coreLock.Lock() coreLock.Lock()
defer coreLock.Unlock() defer coreLock.Unlock()
c.grpcServer.GracefulStop() c.grpcServer.GracefulStop()
c.eventService.CloseConnection()
return stopHttpServer() return stopHttpServer()
} }
......
package event
import "github.com/google/uuid"
// Event is a event that can be published via the event service as payload.
type Event struct {
ID uuid.UUID
EntityID uuid.UUID
Type string
}
// TypeAdd is an add event.
const TypeAdd = "add"
// TypeUpdate is an update event.
const TypeUpdate = "update"
// TypeDelete is a delete event.
const TypeDelete = "delete"
// NewAddEvent creates a new add event.
func NewAddEvent(entityID uuid.UUID) Event {
return Event{
ID: uuid.New(),
EntityID: entityID,
Type: TypeAdd,
}
}
// NewDeleteEvent creates a new delete event.
func NewDeleteEvent(entityID uuid.UUID) Event {
return Event{
ID: uuid.New(),
EntityID: entityID,
Type: TypeDelete,
}
}
// NewUpdateEvent creates a new update event.
func NewUpdateEvent(entityID uuid.UUID) Event {
return Event{
ID: uuid.New(),
EntityID: entityID,
Type: TypeUpdate,
}
}
package eventservice
import (
"encoding/json"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/event"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
amqp "github.com/rabbitmq/amqp091-go"
)
// EventService is used to setup a connection to a broker and publish events to topics.
type EventService struct {
connection *amqp.Connection
channel *amqp.Channel
}
// NewEventService creates a new connection to the broker and opens a channel for later usage.
func NewEventService() (interfaces.Service, error) {
// TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester)
if config.AMQPPrefix == "" {
return NewMockEventService(), nil
}
conn, err := amqp.Dial(amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort))
if err != nil {
return nil, &errors.ErrAMQPInitFail{Action: "failed to connect to RabbitMQ", Err: err}
}
ch, err := conn.Channel()
if err != nil {
return nil, &errors.ErrAMQPInitFail{Action: "failed to open a channel", Err: err}
}
return &EventService{
connection: conn,
channel: ch,
}, nil
}
// PublishEvent declares a queue and publishes events.
func (e *EventService) PublishEvent(topic string, event event.Event) error {
q, err := e.channel.QueueDeclare(
topic, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return &errors.ErrAMQPInitFail{Action: "failed declaring queue", Err: err}
}
eventBody, err := json.Marshal(event)
if err != nil {
return &errors.ErrCouldNotMarshall{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err}
}
err = e.channel.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: eventBody,
})
if err != nil {
return &errors.ErrAMQPMessageFail{Action: "failed to publish message", Err: err}
}
return nil
}
// CloseConnection closes an exisiting connection.
func (e *EventService) CloseConnection() {
e.connection.Close()
}
package eventservice
import (
"fmt"
"code.fbi.h-da.de/danet/gosdn/controller/event"
interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
)
func amqpURIBuilder(prefix, user, pass, host, port string) string {
return fmt.Sprintf("%s%s:%s@%s:%s/", prefix, user, pass, host, port)
}
// Service is the event service
type Service interface {
PublishEvent(topic string, event event.Event) error
CloseConnection()
}
// MockEventService is used to setup a connection to a broker and publish events to topics.
type MockEventService struct {
Queue map[string][]event.Event
}
// NewMockEventService creates a new connection to the broker and opens a channel for later usage.
func NewMockEventService() interfaces.Service {
return &MockEventService{
Queue: make(map[string][]event.Event),
}
}
// PublishEvent declares a queue and publishes events.
func (e *MockEventService) PublishEvent(topic string, event event.Event) error {
e.Queue[topic] = append(e.Queue[topic], event)
return nil
}
// CloseConnection closes an exisiting connection.
func (e *MockEventService) CloseConnection() {
}
package event
import "code.fbi.h-da.de/danet/gosdn/controller/event"
// Service is the event service
type Service interface {
PublishEvent(topic string, event event.Event) error
CloseConnection()
}
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus" "code.fbi.h-da.de/danet/gosdn/controller/nucleus"
"code.fbi.h-da.de/danet/gosdn/controller/rbac" "code.fbi.h-da.de/danet/gosdn/controller/rbac"
"google.golang.org/grpc" "google.golang.org/grpc"
...@@ -20,11 +21,13 @@ import ( ...@@ -20,11 +21,13 @@ import (
func getTestAuthInterceptorServer(t *testing.T) (*AuthInterceptor, *User, *Role, *SbiServer) { func getTestAuthInterceptorServer(t *testing.T) (*AuthInterceptor, *User, *Role, *SbiServer) {
initUUIDs(t) initUUIDs(t)
jwtManager := rbac.NewJWTManager("test", time.Minute) jwtManager := rbac.NewJWTManager("test", time.Minute)
eventService := eventservice.NewMockEventService()
userStore := rbac.NewMemoryUserStore() userStore := rbac.NewMemoryUserStore()
userService := rbac.NewUserService(userStore) userService := rbac.NewUserService(userStore, eventService)
roleStore := rbac.NewMemoryRoleStore() roleStore := rbac.NewMemoryRoleStore()
roleService := rbac.NewRoleService(roleStore) roleService := rbac.NewRoleService(roleStore, eventService)
mockPnd := getMockPnd(t) mockPnd := getMockPnd(t)
......
...@@ -7,18 +7,20 @@ import ( ...@@ -7,18 +7,20 @@ import (
"time" "time"
apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/rbac" "code.fbi.h-da.de/danet/gosdn/controller/rbac"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
) )
func getTestAuthServer(t *testing.T) *Auth { func getTestAuthServer(t *testing.T) *Auth {
jwtManager := rbac.NewJWTManager("test", time.Minute) jwtManager := rbac.NewJWTManager("test", time.Minute)
eventService := eventservice.NewMockEventService()
userStore := rbac.NewMemoryUserStore() userStore := rbac.NewMemoryUserStore()
userService := rbac.NewUserService(userStore) userService := rbac.NewUserService(userStore, eventService)
roleStore := rbac.NewMemoryRoleStore() roleStore := rbac.NewMemoryRoleStore()
roleService := rbac.NewRoleService(roleStore) roleService := rbac.NewRoleService(roleStore, eventService)
s := NewAuthServer(jwtManager, userService) s := NewAuthServer(jwtManager, userService)
err := clearAndCreateAuthTestSetup(s.userService, roleService) err := clearAndCreateAuthTestSetup(s.userService, roleService)
......
...@@ -9,16 +9,19 @@ import ( ...@@ -9,16 +9,19 @@ import (
apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
"code.fbi.h-da.de/danet/gosdn/controller/rbac" "code.fbi.h-da.de/danet/gosdn/controller/rbac"
"github.com/google/uuid" "github.com/google/uuid"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
) )
func getTestRoleServer(t *testing.T) *Role { func getTestRoleServer(t *testing.T) *Role {
jwtManager := rbac.NewJWTManager("test", time.Second) jwtManager := rbac.NewJWTManager("test", time.Second)
eventService := eventservice.NewMockEventService()
userStore := rbac.NewMemoryUserStore() userStore := rbac.NewMemoryUserStore()
userService := rbac.NewUserService(userStore) userService := rbac.NewUserService(userStore, eventService)
roleStore := rbac.NewMemoryRoleStore() roleStore := rbac.NewMemoryRoleStore()
roleService := rbac.NewRoleService(roleStore) roleService := rbac.NewRoleService(roleStore, eventService)
s := NewRoleServer(jwtManager, roleService) s := NewRoleServer(jwtManager, roleService)
err := clearAndCreateAuthTestSetup(userService, roleService) err := clearAndCreateAuthTestSetup(userService, roleService)
......
...@@ -7,18 +7,20 @@ import ( ...@@ -7,18 +7,20 @@ import (
"time" "time"
apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/rbac" "code.fbi.h-da.de/danet/gosdn/controller/rbac"
"github.com/google/uuid" "github.com/google/uuid"
) )
func getTestUserServer(t *testing.T) *User { func getTestUserServer(t *testing.T) *User {
jwtManager := rbac.NewJWTManager("test", time.Second) jwtManager := rbac.NewJWTManager("test", time.Second)
eventService := eventservice.NewMockEventService()
userStore := rbac.NewMemoryUserStore() userStore := rbac.NewMemoryUserStore()
userService := rbac.NewUserService(userStore) userService := rbac.NewUserService(userStore, eventService)
roleStore := rbac.NewMemoryRoleStore() roleStore := rbac.NewMemoryRoleStore()
roleService := rbac.NewRoleService(roleStore) roleService := rbac.NewRoleService(roleStore, eventService)
s := NewUserServer(jwtManager, userService) s := NewUserServer(jwtManager, userService)
err := clearAndCreateAuthTestSetup(s.userService, roleService) err := clearAndCreateAuthTestSetup(s.userService, roleService)
......
...@@ -4,7 +4,9 @@ import ( ...@@ -4,7 +4,9 @@ import (
"fmt" "fmt"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
"github.com/google/uuid" "github.com/google/uuid"
...@@ -13,18 +15,29 @@ import ( ...@@ -13,18 +15,29 @@ import (
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
) )
const (
// DeviceEventTopic is the used topic for device related entity changes.
DeviceEventTopic = "device"
)
// DeviceService provides a device service implementation. // DeviceService provides a device service implementation.
// This services provides abstraction between the user (e.g a PND) and the matching store (e.g. deviceStore) // This services provides abstraction between the user (e.g a PND) and the matching store (e.g. deviceStore)
type DeviceService struct { type DeviceService struct {
deviceStore device.Store deviceStore device.Store
sbiService southbound.Service sbiService southbound.Service
eventService eventInterfaces.Service
} }
// NewDeviceService creates a device service. // NewDeviceService creates a device service.
func NewDeviceService(deviceStore device.Store, sbiService southbound.Service) device.Service { func NewDeviceService(
deviceStore device.Store,
sbiService southbound.Service,
eventService eventInterfaces.Service,
) device.Service {
return &DeviceService{ return &DeviceService{
deviceStore: deviceStore, deviceStore: deviceStore,
sbiService: sbiService, sbiService: sbiService,
eventService: eventService,
} }
} }
...@@ -71,6 +84,8 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error { ...@@ -71,6 +84,8 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error {
return err return err
} }
s.eventService.PublishEvent(DeviceEventTopic, event.NewAddEvent(deviceToAdd.ID()))
return nil return nil
} }
...@@ -81,6 +96,8 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error { ...@@ -81,6 +96,8 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error {
return err return err
} }
s.eventService.PublishEvent(DeviceEventTopic, event.NewUpdateEvent(deviceToUpdate.ID()))
return nil return nil
} }
...@@ -96,6 +113,9 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error { ...@@ -96,6 +113,9 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error {
return err return err
} }
} }
s.eventService.PublishEvent(DeviceEventTopic, event.NewDeleteEvent(deviceToDelete.ID()))
return nil return nil
} }
......
...@@ -3,6 +3,7 @@ package nucleus ...@@ -3,6 +3,7 @@ package nucleus
import ( import (
"testing" "testing"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/mocks" "code.fbi.h-da.de/danet/gosdn/controller/mocks"
...@@ -22,12 +23,14 @@ func getMockDevice(deviceID uuid.UUID, sbi southbound.SouthboundInterface) devic ...@@ -22,12 +23,14 @@ func getMockDevice(deviceID uuid.UUID, sbi southbound.SouthboundInterface) devic
} }
func getDeviceTestStores(t *testing.T, deviceID uuid.UUID) (device.Service, southbound.Service, device.Device, southbound.SouthboundInterface) { func getDeviceTestStores(t *testing.T, deviceID uuid.UUID) (device.Service, southbound.Service, device.Device, southbound.SouthboundInterface) {
eventService := eventservice.NewMockEventService()
sbiStore := NewMemorySbiStore() sbiStore := NewMemorySbiStore()
deviceStore := NewMemoryDeviceStore() deviceStore := NewMemoryDeviceStore()
sbiService := NewSbiService(sbiStore) sbiService := NewSbiService(sbiStore, eventService)
deviceService := NewDeviceService( deviceService := NewDeviceService(
deviceStore, deviceStore,
sbiService, sbiService,
eventService,
) )
sbi, err := NewSBI(spb.Type_TYPE_OPENCONFIG) sbi, err := NewSBI(spb.Type_TYPE_OPENCONFIG)
......
...@@ -229,3 +229,25 @@ type ErrNoNewChanges struct { ...@@ -229,3 +229,25 @@ type ErrNoNewChanges struct {
func (e ErrNoNewChanges) Error() string { func (e ErrNoNewChanges) Error() string {
return fmt.Sprintf("There are no changes between %v and %v", e.Original, e.Modified) return fmt.Sprintf("There are no changes between %v and %v", e.Original, e.Modified)
} }
// ErrAMQPInitFail implements the Error interface and is called if there is any issue related to
// the setup of the event management.
type ErrAMQPInitFail struct {
Action string
Err error
}
func (e ErrAMQPInitFail) Error() string {
return fmt.Sprintf("Action: %s, Internal error: %v", e.Action, e.Err)
}
// ErrAMQPMessageFail implements the Error interface and is called if there is any issue with sending
// or receiving messages.
type ErrAMQPMessageFail struct {
Action string
Err error
}
func (e ErrAMQPMessageFail) Error() string {
return fmt.Sprintf("Action: %s, Internal error: %v", e.Action, e.Err)
}
...@@ -9,6 +9,7 @@ import ( ...@@ -9,6 +9,7 @@ import (
"testing" "testing"
"time" "time"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/store" "code.fbi.h-da.de/danet/gosdn/controller/store"
...@@ -153,12 +154,15 @@ func mockDevice() device.Device { ...@@ -153,12 +154,15 @@ func mockDevice() device.Device {
} }
func newPnd() pndImplementation { func newPnd() pndImplementation {
eventService := eventservice.NewMockEventService()
sbiStore := NewMemorySbiStore() sbiStore := NewMemorySbiStore()
deviceStore := NewMemoryDeviceStore() deviceStore := NewMemoryDeviceStore()
sbiService := NewSbiService(sbiStore) sbiService := NewSbiService(sbiStore, eventService)
deviceService := NewDeviceService( deviceService := NewDeviceService(
deviceStore, deviceStore,
sbiService, sbiService,
eventService,
) )
return pndImplementation{ return pndImplementation{
......
...@@ -20,11 +20,13 @@ import ( ...@@ -20,11 +20,13 @@ import (
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd" ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
...@@ -52,14 +54,20 @@ func NewPND( ...@@ -52,14 +54,20 @@ func NewPND(
c cpb.CsbiServiceClient, c cpb.CsbiServiceClient,
callback func(uuid.UUID, chan device.Details), callback func(uuid.UUID, chan device.Details),
) (networkdomain.NetworkDomain, error) { ) (networkdomain.NetworkDomain, error) {
eventService, err := eventservice.NewEventService()
if err != nil {
return nil, err
}
sbiStore := NewSbiStore(id) sbiStore := NewSbiStore(id)
deviceStore := NewDeviceStore(id) deviceStore := NewDeviceStore(id)
changeStore := store.NewChangeStore() changeStore := store.NewChangeStore()
sbiService := NewSbiService(sbiStore) sbiService := NewSbiService(sbiStore, eventService)
deviceService := NewDeviceService( deviceService := NewDeviceService(
deviceStore, deviceStore,
sbiService, sbiService,
eventService,
) )
changeStore, ok := changeStoreMap[id] changeStore, ok := changeStoreMap[id]
...@@ -76,8 +84,9 @@ func NewPND( ...@@ -76,8 +84,9 @@ func NewPND(
changes: changeStore, changes: changeStore,
Id: id, Id: id,
csbiClient: c, csbiClient: c,
callback: callback, callback: callback,
eventService: eventService,
} }
existingSBIs, err := sbiStore.GetAll() existingSBIs, err := sbiStore.GetAll()
...@@ -105,8 +114,9 @@ type pndImplementation struct { ...@@ -105,8 +114,9 @@ type pndImplementation struct {
//nolint //nolint
Id uuid.UUID `json:"id,omitempty"` Id uuid.UUID `json:"id,omitempty"`
csbiClient cpb.CsbiServiceClient csbiClient cpb.CsbiServiceClient
callback func(uuid.UUID, chan device.Details) callback func(uuid.UUID, chan device.Details)
eventService eventInterfaces.Service
} }
func (pnd *pndImplementation) PendingChanges() []uuid.UUID { func (pnd *pndImplementation) PendingChanges() []uuid.UUID {
......
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd" ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
...@@ -366,10 +367,11 @@ func Test_pndImplementation_RemoveSbi(t *testing.T) { ...@@ -366,10 +367,11 @@ func Test_pndImplementation_RemoveSbi(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
eventService := eventservice.NewMockEventService()
sbiStore := NewMemorySbiStore() sbiStore := NewMemorySbiStore()
deviceStore := NewMemoryDeviceStore() deviceStore := NewMemoryDeviceStore()
sbiService := NewSbiService(sbiStore) sbiService := NewSbiService(sbiStore, eventService)
deviceService := NewDeviceService(deviceStore, sbiService) deviceService := NewDeviceService(deviceStore, sbiService, eventService)
pnd := &pndImplementation{ pnd := &pndImplementation{
Name: "test-remove-sbi", Name: "test-remove-sbi",
...@@ -431,10 +433,11 @@ func Test_pndImplementation_RemoveSbiWithAssociatedDevices(t *testing.T) { ...@@ -431,10 +433,11 @@ func Test_pndImplementation_RemoveSbiWithAssociatedDevices(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
eventService := eventservice.NewMockEventService()
sbiStore := NewMemorySbiStore() sbiStore := NewMemorySbiStore()
deviceStore := NewMemoryDeviceStore() deviceStore := NewMemoryDeviceStore()
sbiService := NewSbiService(sbiStore) sbiService := NewSbiService(sbiStore, eventService)
deviceService := NewDeviceService(deviceStore, sbiService) deviceService := NewDeviceService(deviceStore, sbiService, eventService)
pnd := &pndImplementation{ pnd := &pndImplementation{
Name: "test-remove-sbi", Name: "test-remove-sbi",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment