Newer
Older
package integration_test_application
import (
"context"
"os"
"os/signal"
"strings"
"syscall"
"code.fbi.h-da.de/danet/gosdn/application-framework/event"
"code.fbi.h-da.de/danet/gosdn/application-framework/registration"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
const localhost = "127.0.0.1"
Fabian Seidl
committed
const rabbitmq = "rabbitmq"
// Application is an example for a sdn application.
type Application struct {
eventService event.ServiceInterface
stopChannel chan os.Signal
grpcClientConn *grpc.ClientConn
addEventChannel chan event.Event
updateEventChannel chan event.Event
deleteEventChannel chan event.Event
subscribeEventChannel chan event.Event
}
func NewApplication(ctx context.Context, grpcClientConn *grpc.ClientConn, controllerAddress string, topics []event.Topic, rabbitMQAddress string) *Application {
queueCredentials, err := registration.Register(ctx, controllerAddress, "integration-test-application", "SecurePresharedToken")
if err != nil {
logrus.Errorf("failed to register application on control plane. %v", err)
os.Exit(1)
}
Fabian Seidl
committed
if rabbitMQAddress == "" {
queueCredentials = strings.ReplaceAll(queueCredentials, rabbitmq, localhost)
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
}
eventService, err := event.NewEventService(
queueCredentials,
topics,
)
if err != nil {
logrus.Errorf("failed to create event service. %v", err)
os.Exit(1)
}
return &Application{
eventService: eventService,
stopChannel: make(chan os.Signal, 1),
grpcClientConn: grpcClientConn,
addEventChannel: make(chan event.Event, 1),
updateEventChannel: make(chan event.Event, 1),
deleteEventChannel: make(chan event.Event, 1),
subscribeEventChannel: make(chan event.Event, 1),
}
}
// Run runs the application.
func (a *Application) Run(eventTypeCallbackTuples []event.TypeToCallbackTuple) {
signal.Notify(a.stopChannel, os.Interrupt, syscall.SIGTERM)
a.eventService.SubscribeToEventType(eventTypeCallbackTuples)
a.eventService.SetupEventReciever(a.stopChannel)
var forever chan struct{}
go func() {
for {
select {
case <-a.stopChannel:
close(forever)
_ = a.grpcClientConn.Close()
return
}
}
}()
<-forever
}
func (a *Application) addCallback(event *event.Event) {
logrus.Infof("Incoming Event: EntityID: %v, ID: %v, PathsAndValues: %v, Type: %v", event.EntityID, event.ID, event.PathsAndValuesMap, event.Type)
a.addEventChannel <- *event
}
func (a *Application) updateCallback(event *event.Event) {
logrus.Infof("Incoming Event: EntityID: %v, ID: %v, PathsAndValues: %v, Type: %v", event.EntityID, event.ID, event.PathsAndValuesMap, event.Type)
a.updateEventChannel <- *event
}
func (a *Application) deleteCallback(event *event.Event) {
logrus.Infof("Incoming Event: EntityID: %v, ID: %v, PathsAndValues: %v, Type: %v", event.EntityID, event.ID, event.PathsAndValuesMap, event.Type)
a.deleteEventChannel <- *event
}
func (a *Application) subscribeCallback(event *event.Event) {
logrus.Infof("Incoming Event: EntityID: %v, ID: %v, PathsAndValues: %v, Type: %v", event.EntityID, event.ID, event.PathsAndValuesMap, event.Type)
a.subscribeEventChannel <- *event
}