Skip to content
Snippets Groups Projects
Commit 7ca03889 authored by André Sterba's avatar André Sterba
Browse files

Init application example

parent ab0968c6
Branches
Tags
2 merge requests!376Add additional example application hostname-checker,!343Add basic application framework and example application to show interaction between events an NBI
Pipeline #107324 passed
...@@ -52,7 +52,7 @@ generate-csbi-yang-models: install-tools ...@@ -52,7 +52,7 @@ generate-csbi-yang-models: install-tools
../../$(TOOLS_DIR)/go-ygot-generator-generator config.yaml gostructs.go &&\ ../../$(TOOLS_DIR)/go-ygot-generator-generator config.yaml gostructs.go &&\
go generate go generate
build: pre build-gosdn build-gosdnc build-orchestrator build: pre build-gosdn build-gosdnc build-orchestrator build-example-app
build-gosdn: pre build-gosdn: pre
$(GOBUILD) -trimpath -o $(BUILD_ARTIFACTS_PATH)/gosdn ./controller/cmd/gosdn $(GOBUILD) -trimpath -o $(BUILD_ARTIFACTS_PATH)/gosdn ./controller/cmd/gosdn
...@@ -63,6 +63,9 @@ build-gosdnc: pre ...@@ -63,6 +63,9 @@ build-gosdnc: pre
build-orchestrator: pre build-orchestrator: pre
CGO_ENABLED=0 $(GOBUILD) -o $(BUILD_ARTIFACTS_PATH)/orchestrator ./csbi/cmd/csbi/ CGO_ENABLED=0 $(GOBUILD) -o $(BUILD_ARTIFACTS_PATH)/orchestrator ./csbi/cmd/csbi/
build-example-app: pre
$(GOBUILD) -trimpath -o $(BUILD_ARTIFACTS_PATH)/example-application ./applications/example-application
containerize-all: containerize-gosdn containerize-gosdnc containerize-orchestrator containerize-target containerize-all: containerize-gosdn containerize-gosdnc containerize-orchestrator containerize-target
containerize-gosdn: containerize-gosdn:
......
package event
import (
"encoding/json"
"github.com/google/uuid"
)
// Event is a event that can be published via the event service as payload.
type Event struct {
ID uuid.UUID `json:"id,omitempty"`
EntityID uuid.UUID `json:"entity_id,omitempty"`
Type string `json:"type,omitempty"`
}
func getEventFromPayload(payload []byte) (*Event, error) {
event := &Event{}
err := json.Unmarshal(payload, event)
if err != nil {
return nil, err
}
return event, nil
}
package event
import (
"fmt"
"os"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
amqp "github.com/rabbitmq/amqp091-go"
)
type (
eventCallbackFunc func(*Event)
)
// AmqpURIBuilder builds a rabbitMQ connection string
func AmqpURIBuilder(prefix, user, pass, host, port string) string {
return fmt.Sprintf("%s%s:%s@%s:%s/", prefix, user, pass, host, port)
}
// ServiceInterface defines an event service
type ServiceInterface interface {
SubscribeToTopic(string, eventCallbackFunc)
SetupEventReciever(stopChan chan os.Signal)
CloseConnection()
}
// Service is used to setup a connection to a broker and publish events to topics.
type Service struct {
connection *amqp.Connection
channel *amqp.Channel
subscriber map[string]eventCallbackFunc
topics []string
}
// NewEventService creates a new connection to the broker and opens a channel for later usage.
func NewEventService(brokerConnectionString string, topics []string) (ServiceInterface, error) {
conn, err := amqp.Dial(brokerConnectionString)
if err != nil {
return nil, &errors.ErrAMQPInitFail{Action: "failed to connect to RabbitMQ", Err: err}
}
ch, err := conn.Channel()
if err != nil {
return nil, &errors.ErrAMQPInitFail{Action: "failed to open a channel", Err: err}
}
return &Service{
connection: conn,
channel: ch,
subscriber: map[string]eventCallbackFunc{},
topics: topics,
}, nil
}
// SubscribeToTopic subscribes a provided callback to an event
// TODO: Fix name
func (e *Service) SubscribeToTopic(eventName string, callback eventCallbackFunc) {
// TODO: Handle multiple subscribers per event
e.subscriber[eventName] = callback
}
// SetupEventReciever will be removed in the future
func (e *Service) SetupEventReciever(stopChan chan os.Signal) {
for _, topic := range e.topics {
go e.setupQueueConsume(topic, stopChan)
}
}
// setupQueueConsume sets up a connection to the broker, listens to messages on a specific topic
// and writes them into a channel.
func (e *Service) setupQueueConsume(topic string, stopChan chan os.Signal) error {
q, err := e.channel.QueueDeclare(
topic, // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return &errors.ErrAMQPMessageFail{Action: "failed declaring queue", Err: err}
}
msgs, err := e.channel.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return &errors.ErrAMQPInitFail{Action: "failed to register consumer", Err: err}
}
var forever chan struct{}
go func() {
for d := range msgs {
// fmt.Printf("Recieved message: %+v\n", string(d.Body))
event, err := getEventFromPayload(d.Body)
if err != nil {
fmt.Printf("Error: +%v\n", err)
}
val, ok := e.subscriber[event.Type]
if ok {
fmt.Print("Executing Callback: ")
val(event)
fmt.Println()
}
}
}()
<-forever
return nil
}
// CloseConnection closes an exisiting connection.
func (e *Service) CloseConnection() {
e.connection.Close()
}
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"code.fbi.h-da.de/danet/gosdn/api/go/gosdn/rbac"
"code.fbi.h-da.de/danet/gosdn/application-framework/event"
"google.golang.org/grpc"
)
// Application is an example for a sdn application
type Application struct {
eventService event.ServiceInterface
stopChannel chan os.Signal
grpcClientConn *grpc.ClientConn
}
// Run runs the application
func (a *Application) Run() {
signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM)
a.eventService.SubscribeToTopic("add", a.callback)
a.eventService.SetupEventReciever(a.stopChannel)
conn, err := grpc.Dial("localhost:55055", grpc.WithInsecure())
if err != nil {
panic(err)
}
a.grpcClientConn = conn
var forever chan struct{}
go func() {
for {
select {
case <-a.stopChannel:
close(forever)
a.grpcClientConn.Close()
return
}
}
}()
<-forever
}
func (a *Application) callback(event *event.Event) {
fmt.Printf("Event Callback: %+v", event)
ctx := context.Background()
userService := rbac.NewUserServiceClient(a.grpcClientConn)
request := &rbac.GetUserRequest{
Timestamp: time.Now().UnixNano(),
Name: event.ID.String(),
}
response, err := userService.GetUser(ctx, request)
if err != nil {
fmt.Printf("Error %+v\n ", err)
return
}
fmt.Printf("ID: %v, Name: %v \n", response.User.Id, response.User.Name)
// for key, elem := range resp.User.Roles {
// log.Infof("Role on PND: %v %v \n", key, elem)
// }
}
package main
import (
"os"
"code.fbi.h-da.de/danet/gosdn/application-framework/event"
)
func main() {
eventService, err := event.NewEventService(
event.AmqpURIBuilder("amqp://", "guest", "guest", "127.0.0.1", "5672"),
[]string{"user"},
)
if err != nil {
panic(err)
}
app := &Application{
eventService: eventService,
stopChannel: make(chan os.Signal, 1),
}
app.Run()
}
...@@ -17,7 +17,6 @@ services: ...@@ -17,7 +17,6 @@ services:
ME_CONFIG_MONGODB_ADMINUSERNAME: root ME_CONFIG_MONGODB_ADMINUSERNAME: root
ME_CONFIG_MONGODB_ADMINPASSWORD: example ME_CONFIG_MONGODB_ADMINPASSWORD: example
rabbitmq: rabbitmq:
image: rabbitmq:3-management image: rabbitmq:3-management
ports: ports:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment