diff --git a/Makefile b/Makefile index b21e5071db4cc659e76265740e152e1ebd7516ec..decc376be166c9160a066918ede5eb266528d3fc 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,7 @@ generate-csbi-yang-models: install-tools ../../$(TOOLS_DIR)/go-ygot-generator-generator config.yaml gostructs.go &&\ go generate -build: pre build-gosdn build-gosdnc build-orchestrator +build: pre build-gosdn build-gosdnc build-orchestrator build-example-app build-gosdn: pre $(GOBUILD) -trimpath -o $(BUILD_ARTIFACTS_PATH)/gosdn ./controller/cmd/gosdn @@ -63,6 +63,9 @@ build-gosdnc: pre build-orchestrator: pre CGO_ENABLED=0 $(GOBUILD) -o $(BUILD_ARTIFACTS_PATH)/orchestrator ./csbi/cmd/csbi/ +build-example-app: pre + $(GOBUILD) -trimpath -o $(BUILD_ARTIFACTS_PATH)/example-application ./applications/example-application + containerize-all: containerize-gosdn containerize-gosdnc containerize-orchestrator containerize-target containerize-gosdn: diff --git a/application-framework/event/event.go b/application-framework/event/event.go new file mode 100644 index 0000000000000000000000000000000000000000..abce5b0d6ac804bb41e6ae8021c19726ad80c582 --- /dev/null +++ b/application-framework/event/event.go @@ -0,0 +1,24 @@ +package event + +import ( + "encoding/json" + + "github.com/google/uuid" +) + +// Event is a event that can be published via the event service as payload. +type Event struct { + ID uuid.UUID `json:"id,omitempty"` + EntityID uuid.UUID `json:"entity_id,omitempty"` + Type string `json:"type,omitempty"` +} + +func getEventFromPayload(payload []byte) (*Event, error) { + event := &Event{} + err := json.Unmarshal(payload, event) + if err != nil { + return nil, err + } + + return event, nil +} diff --git a/application-framework/event/eventService.go b/application-framework/event/eventService.go new file mode 100644 index 0000000000000000000000000000000000000000..0bd15585439641fba9d096eae75f50076b0b2cc4 --- /dev/null +++ b/application-framework/event/eventService.go @@ -0,0 +1,127 @@ +package event + +import ( + "fmt" + "os" + + "code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors" + amqp "github.com/rabbitmq/amqp091-go" +) + +type ( + eventCallbackFunc func(*Event) +) + +// AmqpURIBuilder builds a rabbitMQ connection string +func AmqpURIBuilder(prefix, user, pass, host, port string) string { + return fmt.Sprintf("%s%s:%s@%s:%s/", prefix, user, pass, host, port) +} + +// ServiceInterface defines an event service +type ServiceInterface interface { + SubscribeToTopic(string, eventCallbackFunc) + SetupEventReciever(stopChan chan os.Signal) + CloseConnection() +} + +// Service is used to setup a connection to a broker and publish events to topics. +type Service struct { + connection *amqp.Connection + channel *amqp.Channel + subscriber map[string]eventCallbackFunc + topics []string +} + +// NewEventService creates a new connection to the broker and opens a channel for later usage. +func NewEventService(brokerConnectionString string, topics []string) (ServiceInterface, error) { + conn, err := amqp.Dial(brokerConnectionString) + if err != nil { + return nil, &errors.ErrAMQPInitFail{Action: "failed to connect to RabbitMQ", Err: err} + } + + ch, err := conn.Channel() + if err != nil { + return nil, &errors.ErrAMQPInitFail{Action: "failed to open a channel", Err: err} + } + + return &Service{ + connection: conn, + channel: ch, + subscriber: map[string]eventCallbackFunc{}, + topics: topics, + }, nil +} + +// SubscribeToTopic subscribes a provided callback to an event +// TODO: Fix name +func (e *Service) SubscribeToTopic(eventName string, callback eventCallbackFunc) { + // TODO: Handle multiple subscribers per event + e.subscriber[eventName] = callback +} + +// SetupEventReciever will be removed in the future +func (e *Service) SetupEventReciever(stopChan chan os.Signal) { + for _, topic := range e.topics { + go e.setupQueueConsume(topic, stopChan) + } +} + +// setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic +// and writes them into a channel. +func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error { + q, err := e.channel.QueueDeclare( + topic, // name + false, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + return &errors.ErrAMQPMessageFail{Action: "failed declaring queue", Err: err} + } + + msgs, err := e.channel.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + + if err != nil { + return &errors.ErrAMQPInitFail{Action: "failed to register consumer", Err: err} + } + + var forever chan struct{} + + go func() { + for d := range msgs { + // fmt.Printf("Recieved message: %+v\n", string(d.Body)) + + event, err := getEventFromPayload(d.Body) + if err != nil { + fmt.Printf("Error: +%v\n", err) + } + + val, ok := e.subscriber[event.Type] + if ok { + fmt.Print("Executing Callback: ") + val(event) + fmt.Println() + } + } + }() + + <-forever + + return nil +} + +// CloseConnection closes an exisiting connection. +func (e *Service) CloseConnection() { + e.connection.Close() +} diff --git a/applications/example-application/app.go b/applications/example-application/app.go new file mode 100644 index 0000000000000000000000000000000000000000..895c38405e689ba15e4f07007f10d5c1dbdb258d --- /dev/null +++ b/applications/example-application/app.go @@ -0,0 +1,75 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + "time" + + "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac" + + "code.fbi.h-da.de/danet/gosdn/application-framework/event" + "google.golang.org/grpc" +) + +// Application is an example for a sdn application +type Application struct { + eventService event.ServiceInterface + stopChannel chan os.Signal + grpcClientConn *grpc.ClientConn +} + +// Run runs the application +func (a *Application) Run() { + signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM) + + a.eventService.SubscribeToTopic("add", a.callback) + a.eventService.SetupEventReciever(a.stopChannel) + + conn, err := grpc.Dial("localhost:55055", grpc.WithInsecure()) + if err != nil { + panic(err) + } + + a.grpcClientConn = conn + + var forever chan struct{} + + go func() { + for { + select { + case <-a.stopChannel: + close(forever) + a.grpcClientConn.Close() + return + } + } + }() + + <-forever +} + +func (a *Application) callback(event *event.Event) { + fmt.Printf("Event Callback: %+v", event) + + ctx := context.Background() + userService := rbac.NewUserServiceClient(a.grpcClientConn) + + request := &rbac.GetUserRequest{ + Timestamp: time.Now().UnixNano(), + Name: event.ID.String(), + } + + response, err := userService.GetUser(ctx, request) + if err != nil { + fmt.Printf("Error %+v\n ", err) + return + } + + fmt.Printf("ID: %v, Name: %v \n", response.User.Id, response.User.Name) + // for key, elem := range resp.User.Roles { + // log.Infof("Role on PND: %v %v \n", key, elem) + // } +} diff --git a/applications/example-application/main.go b/applications/example-application/main.go new file mode 100644 index 0000000000000000000000000000000000000000..2eb9ada5b2a7e352680e8a7fb5c41de295683843 --- /dev/null +++ b/applications/example-application/main.go @@ -0,0 +1,24 @@ +package main + +import ( + "os" + + "code.fbi.h-da.de/danet/gosdn/application-framework/event" +) + +func main() { + eventService, err := event.NewEventService( + event.AmqpURIBuilder("amqp://", "guest", "guest", "127.0.0.1", "5672"), + []string{"user"}, + ) + if err != nil { + panic(err) + } + + app := &Application{ + eventService: eventService, + stopChannel: make(chan os.Signal, 1), + } + + app.Run() +} diff --git a/docker-compose.yml b/docker-compose.yml index 57be1632040f72bf1bd6982e446cae067c423109..9a357af4f6e955f425679438e0f481fb299a3015 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,7 +17,6 @@ services: ME_CONFIG_MONGODB_ADMINUSERNAME: root ME_CONFIG_MONGODB_ADMINPASSWORD: example - rabbitmq: image: rabbitmq:3-management ports: