diff --git a/.gitignore b/.gitignore index 5fa2ffa0ffd521c3989ebc87ca0a4b36679083d2..00dfa3bc67467b7267c2815c8f6645be0759b6bf 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ controller/config/*_test.toml controller/configs/ci-testing-gosdn.toml controller/stores_testing controller/stores/** +controller/cmd/gosdn/stores/ controller/plugins controller/config/.gosdnc.toml controller/debug.test diff --git a/controller/api/initialise_test.go b/controller/api/initialise_test.go index 21fc210175207bc3ccd3a766c040aada357f9184..46726014e7acdd5493f6de98a6caea0e3b71f714 100644 --- a/controller/api/initialise_test.go +++ b/controller/api/initialise_test.go @@ -13,6 +13,7 @@ import ( apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" "code.fbi.h-da.de/danet/gosdn/controller/config" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac" @@ -81,10 +82,12 @@ func bootstrapUnitTest() { log.Fatal(err) } + eventService := eventservice.NewMockEventService() + pndStore = nucleus.NewMemoryPndStore() sbiStore = nucleus.NewMemorySbiStore() - userService = rbacImpl.NewUserService(rbacImpl.NewMemoryUserStore()) - roleService = rbacImpl.NewRoleService(rbacImpl.NewMemoryRoleStore()) + userService = rbacImpl.NewUserService(rbacImpl.NewMemoryUserStore(), eventService) + roleService = rbacImpl.NewRoleService(rbacImpl.NewMemoryRoleStore(), eventService) clearAndCreateAuthTestSetup() previousHostname := "previousHostname" diff --git a/controller/config/config.go b/controller/config/config.go index 059821cf767b683eeb40d7c7054ee2307ccead50..dded01cadf31958c52211ffb211c6c19aab5d9dc 100644 --- a/controller/config/config.go +++ b/controller/config/config.go @@ -21,6 +21,13 @@ const ( jwtDurationKey = "defaultJWTDuration" defaultJWTDuration = time.Hour * 24 jwtSecretKey = "jwtSecret" + + // RabbitMQ Broker + amqpPrefixKey = "amqpPrefix" + amqpUserKey = "amqpUser" + amqpPasswordKey = "amqpPassword" + amqpHostKey = "amqpHost" + amqpPortKey = "amqpPort" ) // BasePndUUID is an uuid for the base PND @@ -50,6 +57,21 @@ var JWTDuration time.Duration // JWTSecret determines the scret that is used to sign tokens var JWTSecret string +// AMQPPrefix is the amqp prefix +var AMQPPrefix string + +// AMQPUser is the amqp user +var AMQPUser string + +// AMQPPassword is the amqp user password +var AMQPPassword string + +// AMQPHost is the amqp host +var AMQPHost string + +// AMQPPort is the amqp port +var AMQPPort string + // Init gets called on module import func Init() { err := InitializeConfig() @@ -103,6 +125,8 @@ func InitializeConfig() error { JWTSecret = viper.GetString(jwtSecretKey) + loadAMQPConfig() + if err := viper.WriteConfig(); err != nil { return err } @@ -171,3 +195,11 @@ func getDurationFromViper(viperKey string, unit time.Duration) (time.Duration, e return 0, viper.ConfigParseError{} } + +func loadAMQPConfig() { + AMQPPrefix = getStringFromViper(amqpPrefixKey) + AMQPUser = getStringFromViper(amqpUserKey) + AMQPPassword = getStringFromViper(amqpPasswordKey) + AMQPHost = getStringFromViper(amqpHostKey) + AMQPPort = getStringFromViper(amqpPortKey) +} diff --git a/controller/configs/containerlab-gosdn.toml.example b/controller/configs/containerlab-gosdn.toml.example index 2c5e7f28d8a88179317e17d4528afc108a5eed45..3fffe360b55fc3f9d7909890e1a2bd844d05dc35 100644 --- a/controller/configs/containerlab-gosdn.toml.example +++ b/controller/configs/containerlab-gosdn.toml.example @@ -12,3 +12,8 @@ socket = ":55055" databaseConnection = "mongodb://root:example@clab-gosdn_csbi_arista_base-mongodb:27017" filesystemPathToStores = "stores" +amqpPrefix = "amqp://" +amqpUser = "guest" +amqpPassword = "guest" +amqpHost = "localhost" +amqpPort = "5672" diff --git a/controller/configs/development-gosdn.toml.example b/controller/configs/development-gosdn.toml.example index 6ae6a6a9450b2c2637cbec9366b0b8e8bcdfa9d1..b1cf52c3a1548b582a5ff48a6d836c51880c3bb9 100644 --- a/controller/configs/development-gosdn.toml.example +++ b/controller/configs/development-gosdn.toml.example @@ -12,3 +12,9 @@ socket = ":55055" databaseConnection = "mongodb://root:example@localhost:27017" filesystemPathToStores = "stores" + +amqpPrefix = "amqp://" +amqpUser = "guest" +amqpPassword = "guest" +amqpHost = "localhost" +amqpPort = "5672" diff --git a/controller/controller.go b/controller/controller.go index cb853c45d6b94fc97552340672830dfad5706de3..a1a8333c6a01fc30d5709ca918d05114423cbe0b 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -25,6 +25,7 @@ import ( apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" "code.fbi.h-da.de/danet/gosdn/controller/config" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac" @@ -33,6 +34,8 @@ import ( rbacImpl "code.fbi.h-da.de/danet/gosdn/controller/rbac" "code.fbi.h-da.de/danet/gosdn/controller/store" + eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" + "code.fbi.h-da.de/danet/gosdn/controller/nucleus" ) @@ -41,13 +44,14 @@ var coreOnce sync.Once // Core is the representation of the controller's core type Core struct { - pndStore networkdomain.PndStore - userService rbac.UserService - roleService rbac.RoleService - httpServer *http.Server - grpcServer *grpc.Server - nbi *nbi.NorthboundInterface - stopChan chan os.Signal + pndStore networkdomain.PndStore + userService rbac.UserService + roleService rbac.RoleService + httpServer *http.Server + grpcServer *grpc.Server + nbi *nbi.NorthboundInterface + eventService eventInterfaces.Service + stopChan chan os.Signal csbiClient cpb.CsbiServiceClient } @@ -61,11 +65,17 @@ func initialize() error { return err } + eventService, err := eventservice.NewEventService() + if err != nil { + return err + } + c = &Core{ - pndStore: nucleus.NewPndStore(), - userService: rbacImpl.NewUserService(rbacImpl.NewUserStore()), - roleService: rbacImpl.NewRoleService(rbacImpl.NewRoleStore()), - stopChan: make(chan os.Signal, 1), + pndStore: nucleus.NewPndStore(), + userService: rbacImpl.NewUserService(rbacImpl.NewUserStore(), eventService), + roleService: rbacImpl.NewRoleService(rbacImpl.NewRoleStore(), eventService), + eventService: eventService, + stopChan: make(chan os.Signal, 1), } // Setting up signal capturing @@ -265,6 +275,7 @@ func shutdown() error { coreLock.Lock() defer coreLock.Unlock() c.grpcServer.GracefulStop() + c.eventService.CloseConnection() return stopHttpServer() } diff --git a/controller/event/event.go b/controller/event/event.go new file mode 100644 index 0000000000000000000000000000000000000000..4f9aaef8db2bc5a702fac335c62d4bb44030acd4 --- /dev/null +++ b/controller/event/event.go @@ -0,0 +1,48 @@ +package event + +import "github.com/google/uuid" + +// Event is a event that can be published via the event service as payload. +type Event struct { + ID uuid.UUID `json:"id,omitempty"` + EntityID uuid.UUID `json:"entity_id,omitempty"` + Type string `json:"type,omitempty"` +} + +const ( + // TypeAdd is an add event. + TypeAdd = "add" + + // TypeUpdate is an update event. + TypeUpdate = "update" + + // TypeDelete is a delete event. + TypeDelete = "delete" +) + +// NewAddEvent creates a new add event. +func NewAddEvent(entityID uuid.UUID) Event { + return Event{ + ID: uuid.New(), + EntityID: entityID, + Type: TypeAdd, + } +} + +// NewDeleteEvent creates a new delete event. +func NewDeleteEvent(entityID uuid.UUID) Event { + return Event{ + ID: uuid.New(), + EntityID: entityID, + Type: TypeDelete, + } +} + +// NewUpdateEvent creates a new update event. +func NewUpdateEvent(entityID uuid.UUID) Event { + return Event{ + ID: uuid.New(), + EntityID: entityID, + Type: TypeUpdate, + } +} diff --git a/controller/eventService/Service.go b/controller/eventService/Service.go new file mode 100644 index 0000000000000000000000000000000000000000..5c13102d4926f0246cf2c250fe009fa72de3ca9b --- /dev/null +++ b/controller/eventService/Service.go @@ -0,0 +1,82 @@ +package eventservice + +import ( + "encoding/json" + + "code.fbi.h-da.de/danet/gosdn/controller/config" + "code.fbi.h-da.de/danet/gosdn/controller/event" + "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors" + + interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// EventService is used to setup a connection to a broker and publish events to topics. +type EventService struct { + connection *amqp.Connection + channel *amqp.Channel +} + +// NewEventService creates a new connection to the broker and opens a channel for later usage. +func NewEventService() (interfaces.Service, error) { + // TODO: This is an fugly hack to mitigate that some tests are trying to connect to an actual broker. (staester) + if config.AMQPPrefix == "" { + return NewMockEventService(), nil + } + + conn, err := amqp.Dial(amqpURIBuilder(config.AMQPPrefix, config.AMQPUser, config.AMQPPassword, config.AMQPHost, config.AMQPPort)) + if err != nil { + return nil, &errors.ErrAMQPInitFail{Action: "failed to connect to RabbitMQ", Err: err} + } + + ch, err := conn.Channel() + if err != nil { + return nil, &errors.ErrAMQPInitFail{Action: "failed to open a channel", Err: err} + } + + return &EventService{ + connection: conn, + channel: ch, + }, nil +} + +// PublishEvent declares a queue and publishes events. +func (e *EventService) PublishEvent(topic string, event event.Event) error { + q, err := e.channel.QueueDeclare( + topic, // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return &errors.ErrAMQPInitFail{Action: "failed declaring queue", Err: err} + } + + eventBody, err := json.Marshal(event) + if err != nil { + return &errors.ErrCouldNotMarshall{Identifier: topic + " " + event.EntityID.String(), Type: event.Type, Err: err} + } + + err = e.channel.Publish( + "", // exchange + q.Name, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: eventBody, + }) + if err != nil { + return &errors.ErrAMQPMessageFail{Action: "failed to publish message", Err: err} + } + + return nil +} + +// CloseConnection closes an exisiting connection. +func (e *EventService) CloseConnection() { + e.connection.Close() +} diff --git a/controller/eventService/utils.go b/controller/eventService/utils.go new file mode 100644 index 0000000000000000000000000000000000000000..891142fef2e7bf809daca4885fb58921d4e4f9e3 --- /dev/null +++ b/controller/eventService/utils.go @@ -0,0 +1,35 @@ +package eventservice + +import ( + "fmt" + + "code.fbi.h-da.de/danet/gosdn/controller/event" + interfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" +) + +func amqpURIBuilder(prefix, user, pass, host, port string) string { + return fmt.Sprintf("%s%s:%s@%s:%s/", prefix, user, pass, host, port) +} + +// MockEventService is used to setup a connection to a broker and publish events to topics. +type MockEventService struct { + Queue map[string][]event.Event +} + +// NewMockEventService creates a new connection to the broker and opens a channel for later usage. +func NewMockEventService() interfaces.Service { + return &MockEventService{ + Queue: make(map[string][]event.Event), + } +} + +// PublishEvent declares a queue and publishes events. +func (e *MockEventService) PublishEvent(topic string, event event.Event) error { + e.Queue[topic] = append(e.Queue[topic], event) + + return nil +} + +// CloseConnection closes an exisiting connection. +func (e *MockEventService) CloseConnection() { +} diff --git a/controller/interfaces/event/service.go b/controller/interfaces/event/service.go new file mode 100644 index 0000000000000000000000000000000000000000..49cf8f6bc09711ecbfbd382b703a156bed675c6c --- /dev/null +++ b/controller/interfaces/event/service.go @@ -0,0 +1,9 @@ +package event + +import "code.fbi.h-da.de/danet/gosdn/controller/event" + +// Service is the event service +type Service interface { + PublishEvent(topic string, event event.Event) error + CloseConnection() +} diff --git a/controller/northbound/server/auth_interceptor_test.go b/controller/northbound/server/auth_interceptor_test.go index 0b4bb3757e4e00fbb5694ea1ccf0b90bd6a0e3d2..a1863957682e90d9068aee629233ea54e67f2bf7 100644 --- a/controller/northbound/server/auth_interceptor_test.go +++ b/controller/northbound/server/auth_interceptor_test.go @@ -9,6 +9,7 @@ import ( apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/nucleus" "code.fbi.h-da.de/danet/gosdn/controller/rbac" "google.golang.org/grpc" @@ -20,11 +21,13 @@ import ( func getTestAuthInterceptorServer(t *testing.T) (*AuthInterceptor, *User, *Role, *SbiServer) { initUUIDs(t) jwtManager := rbac.NewJWTManager("test", time.Minute) + eventService := eventservice.NewMockEventService() + userStore := rbac.NewMemoryUserStore() - userService := rbac.NewUserService(userStore) + userService := rbac.NewUserService(userStore, eventService) roleStore := rbac.NewMemoryRoleStore() - roleService := rbac.NewRoleService(roleStore) + roleService := rbac.NewRoleService(roleStore, eventService) mockPnd := getMockPnd(t) diff --git a/controller/northbound/server/auth_test.go b/controller/northbound/server/auth_test.go index fa3e0d9ef891be7ff9d4f961b7501fb0d3c931c6..8d916d80c92cc0e8eacc8149cf9d1bcbedcb4d82 100644 --- a/controller/northbound/server/auth_test.go +++ b/controller/northbound/server/auth_test.go @@ -7,18 +7,20 @@ import ( "time" apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/rbac" "google.golang.org/grpc/metadata" ) func getTestAuthServer(t *testing.T) *Auth { jwtManager := rbac.NewJWTManager("test", time.Minute) + eventService := eventservice.NewMockEventService() userStore := rbac.NewMemoryUserStore() - userService := rbac.NewUserService(userStore) + userService := rbac.NewUserService(userStore, eventService) roleStore := rbac.NewMemoryRoleStore() - roleService := rbac.NewRoleService(roleStore) + roleService := rbac.NewRoleService(roleStore, eventService) s := NewAuthServer(jwtManager, userService) err := clearAndCreateAuthTestSetup(s.userService, roleService) diff --git a/controller/northbound/server/role_test.go b/controller/northbound/server/role_test.go index 75582e3c2e43f099168963c92db538e74e7880ae..b9d0a62bbd992f585e08197930c61573eb440a0f 100644 --- a/controller/northbound/server/role_test.go +++ b/controller/northbound/server/role_test.go @@ -9,16 +9,19 @@ import ( apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" "code.fbi.h-da.de/danet/gosdn/controller/rbac" "github.com/google/uuid" + + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" ) func getTestRoleServer(t *testing.T) *Role { jwtManager := rbac.NewJWTManager("test", time.Second) + eventService := eventservice.NewMockEventService() userStore := rbac.NewMemoryUserStore() - userService := rbac.NewUserService(userStore) + userService := rbac.NewUserService(userStore, eventService) roleStore := rbac.NewMemoryRoleStore() - roleService := rbac.NewRoleService(roleStore) + roleService := rbac.NewRoleService(roleStore, eventService) s := NewRoleServer(jwtManager, roleService) err := clearAndCreateAuthTestSetup(userService, roleService) diff --git a/controller/northbound/server/user_test.go b/controller/northbound/server/user_test.go index eb8f27fb3d06fcfe849ea0f5be104cdc19788232..83a9c081390a16e961aa7c6b7185f4e0c9de07bd 100644 --- a/controller/northbound/server/user_test.go +++ b/controller/northbound/server/user_test.go @@ -7,18 +7,20 @@ import ( "time" apb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/rbac" "github.com/google/uuid" ) func getTestUserServer(t *testing.T) *User { jwtManager := rbac.NewJWTManager("test", time.Second) + eventService := eventservice.NewMockEventService() userStore := rbac.NewMemoryUserStore() - userService := rbac.NewUserService(userStore) + userService := rbac.NewUserService(userStore, eventService) roleStore := rbac.NewMemoryRoleStore() - roleService := rbac.NewRoleService(roleStore) + roleService := rbac.NewRoleService(roleStore, eventService) s := NewUserServer(jwtManager, userService) err := clearAndCreateAuthTestSetup(s.userService, roleService) diff --git a/controller/nucleus/deviceService.go b/controller/nucleus/deviceService.go index c565274e85199fa1e0a383b395d7c7bd8f75cce4..618963bd7fff2988f169498975e029ffc03ec402 100644 --- a/controller/nucleus/deviceService.go +++ b/controller/nucleus/deviceService.go @@ -4,7 +4,9 @@ import ( "fmt" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" + "code.fbi.h-da.de/danet/gosdn/controller/event" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" + eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/store" "github.com/google/uuid" @@ -13,18 +15,29 @@ import ( tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" ) +const ( + // DeviceEventTopic is the used topic for device related entity changes. + DeviceEventTopic = "device" +) + // DeviceService provides a device service implementation. // This services provides abstraction between the user (e.g a PND) and the matching store (e.g. deviceStore) type DeviceService struct { - deviceStore device.Store - sbiService southbound.Service + deviceStore device.Store + sbiService southbound.Service + eventService eventInterfaces.Service } // NewDeviceService creates a device service. -func NewDeviceService(deviceStore device.Store, sbiService southbound.Service) device.Service { +func NewDeviceService( + deviceStore device.Store, + sbiService southbound.Service, + eventService eventInterfaces.Service, +) device.Service { return &DeviceService{ - deviceStore: deviceStore, - sbiService: sbiService, + deviceStore: deviceStore, + sbiService: sbiService, + eventService: eventService, } } @@ -71,6 +84,8 @@ func (s *DeviceService) Add(deviceToAdd device.Device) error { return err } + s.eventService.PublishEvent(DeviceEventTopic, event.NewAddEvent(deviceToAdd.ID())) + return nil } @@ -81,6 +96,8 @@ func (s *DeviceService) Update(deviceToUpdate device.Device) error { return err } + s.eventService.PublishEvent(DeviceEventTopic, event.NewUpdateEvent(deviceToUpdate.ID())) + return nil } @@ -96,6 +113,9 @@ func (s *DeviceService) Delete(deviceToDelete device.Device) error { return err } } + + s.eventService.PublishEvent(DeviceEventTopic, event.NewDeleteEvent(deviceToDelete.ID())) + return nil } diff --git a/controller/nucleus/deviceService_test.go b/controller/nucleus/deviceService_test.go index b4ee228dd0d2475a84f47077fb9ad6627a95e5b5..0364bdf6601ec8229ddea8fe5c808043970a67fc 100644 --- a/controller/nucleus/deviceService_test.go +++ b/controller/nucleus/deviceService_test.go @@ -3,6 +3,7 @@ package nucleus import ( "testing" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/mocks" @@ -22,12 +23,14 @@ func getMockDevice(deviceID uuid.UUID, sbi southbound.SouthboundInterface) devic } func getDeviceTestStores(t *testing.T, deviceID uuid.UUID) (device.Service, southbound.Service, device.Device, southbound.SouthboundInterface) { + eventService := eventservice.NewMockEventService() sbiStore := NewMemorySbiStore() deviceStore := NewMemoryDeviceStore() - sbiService := NewSbiService(sbiStore) + sbiService := NewSbiService(sbiStore, eventService) deviceService := NewDeviceService( deviceStore, sbiService, + eventService, ) sbi, err := NewSBI(spb.Type_TYPE_OPENCONFIG) diff --git a/controller/nucleus/errors/errors.go b/controller/nucleus/errors/errors.go index db262595f4543e6c17108dc3953760a0b1b37d16..d155b6950024a187d6b97a467c3f282acc627de0 100644 --- a/controller/nucleus/errors/errors.go +++ b/controller/nucleus/errors/errors.go @@ -229,3 +229,25 @@ type ErrNoNewChanges struct { func (e ErrNoNewChanges) Error() string { return fmt.Sprintf("There are no changes between %v and %v", e.Original, e.Modified) } + +// ErrAMQPInitFail implements the Error interface and is called if there is any issue related to +// the setup of the event management. +type ErrAMQPInitFail struct { + Action string + Err error +} + +func (e ErrAMQPInitFail) Error() string { + return fmt.Sprintf("Action: %s, Internal error: %v", e.Action, e.Err) +} + +// ErrAMQPMessageFail implements the Error interface and is called if there is any issue with sending +// or receiving messages. +type ErrAMQPMessageFail struct { + Action string + Err error +} + +func (e ErrAMQPMessageFail) Error() string { + return fmt.Sprintf("Action: %s, Internal error: %v", e.Action, e.Err) +} diff --git a/controller/nucleus/initialise_test.go b/controller/nucleus/initialise_test.go index 958991f27435204cc0f977126d6195929f6e4fca..98bbe72e9ffec75644f03e9ebf3ae2061146bdd6 100644 --- a/controller/nucleus/initialise_test.go +++ b/controller/nucleus/initialise_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/store" @@ -153,12 +154,15 @@ func mockDevice() device.Device { } func newPnd() pndImplementation { + eventService := eventservice.NewMockEventService() + sbiStore := NewMemorySbiStore() deviceStore := NewMemoryDeviceStore() - sbiService := NewSbiService(sbiStore) + sbiService := NewSbiService(sbiStore, eventService) deviceService := NewDeviceService( deviceStore, sbiService, + eventService, ) return pndImplementation{ diff --git a/controller/nucleus/principalNetworkDomain.go b/controller/nucleus/principalNetworkDomain.go index ad53c9694b1c85e87f9702cbaed130e2edd278c4..21dff969692182794f922c4af510fcf9494c02af 100644 --- a/controller/nucleus/principalNetworkDomain.go +++ b/controller/nucleus/principalNetworkDomain.go @@ -20,11 +20,13 @@ import ( ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "google.golang.org/grpc" "google.golang.org/protobuf/proto" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" + eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors" @@ -52,14 +54,20 @@ func NewPND( c cpb.CsbiServiceClient, callback func(uuid.UUID, chan device.Details), ) (networkdomain.NetworkDomain, error) { + eventService, err := eventservice.NewEventService() + if err != nil { + return nil, err + } + sbiStore := NewSbiStore(id) deviceStore := NewDeviceStore(id) changeStore := store.NewChangeStore() - sbiService := NewSbiService(sbiStore) + sbiService := NewSbiService(sbiStore, eventService) deviceService := NewDeviceService( deviceStore, sbiService, + eventService, ) changeStore, ok := changeStoreMap[id] @@ -76,8 +84,9 @@ func NewPND( changes: changeStore, Id: id, - csbiClient: c, - callback: callback, + csbiClient: c, + callback: callback, + eventService: eventService, } existingSBIs, err := sbiStore.GetAll() @@ -105,8 +114,9 @@ type pndImplementation struct { //nolint Id uuid.UUID `json:"id,omitempty"` - csbiClient cpb.CsbiServiceClient - callback func(uuid.UUID, chan device.Details) + csbiClient cpb.CsbiServiceClient + callback func(uuid.UUID, chan device.Details) + eventService eventInterfaces.Service } func (pnd *pndImplementation) PendingChanges() []uuid.UUID { diff --git a/controller/nucleus/principalNetworkDomain_test.go b/controller/nucleus/principalNetworkDomain_test.go index ac28816d94daad4aaa19d070a72e79fdfb73af71..0cd9c3b14a2eb3e9089cf672d53300d0dcd3e0d1 100644 --- a/controller/nucleus/principalNetworkDomain_test.go +++ b/controller/nucleus/principalNetworkDomain_test.go @@ -10,6 +10,7 @@ import ( ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd" spb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/southbound" tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/device" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" @@ -366,10 +367,11 @@ func Test_pndImplementation_RemoveSbi(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + eventService := eventservice.NewMockEventService() sbiStore := NewMemorySbiStore() deviceStore := NewMemoryDeviceStore() - sbiService := NewSbiService(sbiStore) - deviceService := NewDeviceService(deviceStore, sbiService) + sbiService := NewSbiService(sbiStore, eventService) + deviceService := NewDeviceService(deviceStore, sbiService, eventService) pnd := &pndImplementation{ Name: "test-remove-sbi", @@ -431,10 +433,11 @@ func Test_pndImplementation_RemoveSbiWithAssociatedDevices(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + eventService := eventservice.NewMockEventService() sbiStore := NewMemorySbiStore() deviceStore := NewMemoryDeviceStore() - sbiService := NewSbiService(sbiStore) - deviceService := NewDeviceService(deviceStore, sbiService) + sbiService := NewSbiService(sbiStore, eventService) + deviceService := NewDeviceService(deviceStore, sbiService, eventService) pnd := &pndImplementation{ Name: "test-remove-sbi", diff --git a/controller/nucleus/sbiService.go b/controller/nucleus/sbiService.go index f00247f790a23add96cc00b0abc34008fe184326..c52a8f882138141c129aff11ae97d41fc9f84285 100644 --- a/controller/nucleus/sbiService.go +++ b/controller/nucleus/sbiService.go @@ -3,20 +3,29 @@ package nucleus import ( "os" + "code.fbi.h-da.de/danet/gosdn/controller/event" + eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/store" "github.com/google/uuid" ) +const ( + // SbiEventTopic is the used topic for sbi related entity changes. + SbiEventTopic = "sbi" +) + // SbiService provides a sbi service implementation. type SbiService struct { - sbiStore southbound.Store + sbiStore southbound.Store + eventService eventInterfaces.Service } // NewSbiService creates a sbi service. -func NewSbiService(sbiStore southbound.Store) southbound.Service { +func NewSbiService(sbiStore southbound.Store, eventService eventInterfaces.Service) southbound.Service { return &SbiService{ - sbiStore: sbiStore, + sbiStore: sbiStore, + eventService: eventService, } } @@ -63,6 +72,8 @@ func (s *SbiService) Add(sbiToAdd southbound.SouthboundInterface) error { return err } + s.eventService.PublishEvent(SbiEventTopic, event.NewAddEvent(sbiToAdd.ID())) + return nil } @@ -80,6 +91,8 @@ func (s *SbiService) Delete(sbiToDelete southbound.SouthboundInterface) error { } } + s.eventService.PublishEvent(SbiEventTopic, event.NewDeleteEvent(sbiToDelete.ID())) + return nil } diff --git a/controller/nucleus/sbiService_test.go b/controller/nucleus/sbiService_test.go index d26d6d72e1096326926d1033594645e14f09f603..13436109465a1b69bad2ee602b35c79970dc263f 100644 --- a/controller/nucleus/sbiService_test.go +++ b/controller/nucleus/sbiService_test.go @@ -3,6 +3,7 @@ package nucleus import ( "testing" + eventservice "code.fbi.h-da.de/danet/gosdn/controller/eventService" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound" "code.fbi.h-da.de/danet/gosdn/controller/store" "github.com/google/uuid" @@ -13,8 +14,9 @@ func getMockSbi(sbiID uuid.UUID) southbound.SouthboundInterface { } func getSbiTestStores(t *testing.T, sbiID uuid.UUID) (southbound.Service, southbound.SouthboundInterface) { + eventService := eventservice.NewMockEventService() sbiStore := NewMemorySbiStore() - sbiService := NewSbiService(sbiStore) + sbiService := NewSbiService(sbiStore, eventService) mockSbi := getMockSbi(sbiID) diff --git a/controller/rbac/rbacService.go b/controller/rbac/rbacService.go index 1cbb6f94d25638e0ab337ed06c8eeee1c60b5b26..72f9f9127563757ae4ff1c102a6cb72dd6b0bae9 100644 --- a/controller/rbac/rbacService.go +++ b/controller/rbac/rbacService.go @@ -1,21 +1,35 @@ package rbac import ( + "code.fbi.h-da.de/danet/gosdn/controller/event" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/rbac" "code.fbi.h-da.de/danet/gosdn/controller/store" "github.com/google/uuid" + + eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" +) + +const ( + // UserEventTopic is the used topic for user related entity changes. + UserEventTopic = "user" + // RoleEventTopic is the used topic for role related entity changes. + RoleEventTopic = "role" ) -//UserService provides a user service implementation. +// UserService provides a user service implementation. type UserService struct { - userStore rbac.UserStore + userStore rbac.UserStore + eventService eventInterfaces.Service } // NewUserService creates a user service. -func NewUserService(userStore rbac.UserStore) rbac.UserService { - return &UserService{ - userStore: userStore, +func NewUserService(userStore rbac.UserStore, eventService eventInterfaces.Service) rbac.UserService { + userService := &UserService{ + userStore: userStore, + eventService: eventService, } + + return userService } // Add adds a user to the user store. @@ -25,6 +39,8 @@ func (s *UserService) Add(userToAdd rbac.User) error { return err } + s.eventService.PublishEvent(UserEventTopic, event.NewAddEvent(userToAdd.ID())) + return nil } @@ -35,6 +51,8 @@ func (s *UserService) Delete(userToDelete rbac.User) error { return err } + s.eventService.PublishEvent(UserEventTopic, event.NewDeleteEvent(userToDelete.ID())) + return nil } @@ -45,6 +63,8 @@ func (s *UserService) Update(userToUpdate rbac.User) error { return err } + s.eventService.PublishEvent(UserEventTopic, event.NewUpdateEvent(userToUpdate.ID())) + return nil } @@ -78,15 +98,17 @@ func (s *UserService) createUserFromStore(loadedUser rbac.LoadedUser) rbac.User return NewUser(uuid.MustParse(loadedUser.ID), loadedUser.UserName, loadedUser.Roles, loadedUser.Password, loadedUser.Token, loadedUser.Salt) } -//RoleService provides a role service implementation. +// RoleService provides a role service implementation. type RoleService struct { - roleStore rbac.RoleStore + roleStore rbac.RoleStore + eventService eventInterfaces.Service } // NewRoleService creates a role service. -func NewRoleService(roleStore rbac.RoleStore) rbac.RoleService { +func NewRoleService(roleStore rbac.RoleStore, eventService eventInterfaces.Service) rbac.RoleService { return &RoleService{ - roleStore: roleStore, + roleStore: roleStore, + eventService: eventService, } } @@ -97,6 +119,8 @@ func (s *RoleService) Add(roleToAdd rbac.Role) error { return err } + s.eventService.PublishEvent(RoleEventTopic, event.NewAddEvent(roleToAdd.ID())) + return nil } @@ -107,6 +131,8 @@ func (s *RoleService) Delete(roleToDelete rbac.Role) error { return err } + s.eventService.PublishEvent(RoleEventTopic, event.NewDeleteEvent(roleToDelete.ID())) + return nil } @@ -117,6 +143,8 @@ func (s *RoleService) Update(roleToUpdate rbac.Role) error { return err } + s.eventService.PublishEvent(RoleEventTopic, event.NewUpdateEvent(roleToUpdate.ID())) + return nil } diff --git a/docker-compose.yml b/docker-compose.yml index 1dedf6ff2bc85191d4165e89297a075759baf707..57be1632040f72bf1bd6982e446cae067c423109 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -16,3 +16,10 @@ services: environment: ME_CONFIG_MONGODB_ADMINUSERNAME: root ME_CONFIG_MONGODB_ADMINPASSWORD: example + + + rabbitmq: + image: rabbitmq:3-management + ports: + - 127.0.0.1:5672:5672 + - 127.0.0.1:15672:15672 diff --git a/go.mod b/go.mod index 0b0117e5f1f7ede6ddca58d2840cb053b961eac1..0686446d582f931ff515eb83301c4b85c6ee9c76 100644 --- a/go.mod +++ b/go.mod @@ -89,6 +89,7 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.30.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/rabbitmq/amqp091-go v1.3.4 github.com/spf13/afero v1.8.2 // indirect github.com/spf13/cast v1.4.1 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect @@ -98,7 +99,7 @@ require ( github.com/xdg-go/stringprep v1.0.2 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.opencensus.io v0.23.0 // indirect - golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect + golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4 golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/go.sum b/go.sum index 770a1e02db29e35de2b863a66ff3546716d7da91..593d67fdefb443b18522fe10c1dacd3b0ccb5048 100644 --- a/go.sum +++ b/go.sum @@ -837,6 +837,8 @@ github.com/pterm/pterm v0.12.36/go.mod h1:NjiL09hFhT/vWjQHSj1athJpx6H8cjpHXNAK5b github.com/pterm/pterm v0.12.40/go.mod h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkGTYf8s= github.com/pterm/pterm v0.12.41 h1:e2BRfFo1H9nL8GY0S3ImbZqfZ/YimOk9XtkhoobKJVs= github.com/pterm/pterm v0.12.41/go.mod h1:LW/G4J2A42XlTaPTAGRPvbBfF4UXvHWhC6SN7ueU4jU= +github.com/rabbitmq/amqp091-go v1.3.4 h1:tXuIslN1nhDqs2t6Jrz3BAoqvt4qIZzxvdbdcxWtHYU= +github.com/rabbitmq/amqp091-go v1.3.4/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=