diff --git a/cmd/start.go b/cmd/start.go index 910ec8b84362110ba483b494a698c697bf4246f9..20eeb5232f96864b1b376ecf2e55907d9057ebb6 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -115,6 +115,7 @@ var startCmd = &cobra.Command{ system.NewSystemHandler(), danet.NewKmsHandler(ekmsClient), danet.NewPeerHandler(ekmsClient), + danet.NewKeyRoutingSessionHandler(ekmsClient), danet.NewAssignForwardingHandler(ekmsClient), } diff --git a/etsiqkdnclient/etsi-qkdn-client.go b/etsiqkdnclient/etsi-qkdn-client.go index 43cd5238af67f51540336d28a5893748ca399413..a816338273eff9e63ae242a39d42b98a758281ec 100644 --- a/etsiqkdnclient/etsi-qkdn-client.go +++ b/etsiqkdnclient/etsi-qkdn-client.go @@ -67,7 +67,6 @@ type EkmsClient interface { Version() *ekmsVersionInformation ID() uuid.UUID Ekms() *kms.EKMS - Subscribe() chan string } // TODO: change this in the future @@ -127,7 +126,6 @@ func NewEkmsClient(bootInfo *Config) (myInfo *ekmsInfo) { func emulatedKMS(config *Config, id uuid.UUID, peerChannel chan string) *kms.EKMS { // Attach to eKMS emuKMS := kms.NewEKMS(config.Name, id, os.Stdout, log.TraceLevel, false, config.InterComAddr) - emuKMS.AddExternalNotifierKMSPeer(peerChannel) var qm kms.QuantumModule var err error @@ -184,7 +182,3 @@ func (qkdnInfo *ekmsInfo) ID() uuid.UUID { func (qkdnInfo *ekmsInfo) Ekms() *kms.EKMS { return qkdnInfo.kms } - -func (qkdnInfo *ekmsInfo) Subscribe() chan string { - return qkdnInfo.KmsPeerUpdateChannel -} diff --git a/handlers/danet/keyRoutingSessionsHandler.go b/handlers/danet/keyRoutingSessionsHandler.go new file mode 100644 index 0000000000000000000000000000000000000000..6508e4de3d018d4222fdd1aa4a8c9455c839331f --- /dev/null +++ b/handlers/danet/keyRoutingSessionsHandler.go @@ -0,0 +1,138 @@ +package danet + +import ( + "fmt" + + "code.fbi.h-da.de/danet/ekms/etsiqkdnclient" + "code.fbi.h-da.de/danet/ekms/internal/kms/event" + gnmitargetygot "code.fbi.h-da.de/danet/ekms/model" + "github.com/openconfig/gnmi/proto/gnmi" + "github.com/openconfig/ygot/ygot" + log "github.com/sirupsen/logrus" +) + +type KeyRoutingSessionHandler struct { + name string + paths map[string]struct{} + ekmsClient etsiqkdnclient.EkmsClient + events <-chan event.Event +} + +func NewKeyRoutingSessionHandler(client etsiqkdnclient.EkmsClient) *KeyRoutingSessionHandler { + return &KeyRoutingSessionHandler{ + name: "kms-handler", + paths: map[string]struct{}{ + // TODO:change to right path + "/key-routing-sessions": struct{}{}, + }, + ekmsClient: client, + } +} + +func (yh *KeyRoutingSessionHandler) Name() string { + return yh.name +} + +func (yh *KeyRoutingSessionHandler) Paths() map[string]struct{} { + return yh.paths +} + +func (yh *KeyRoutingSessionHandler) Init(c ygot.ValidatedGoStruct) error { + config, ok := c.(*gnmitargetygot.Gnmitarget) + if !ok { + return fmt.Errorf("failed type assertion for newConfig %T", (*gnmitargetygot.Gnmitarget)(nil)) + } + + var err error + yh.events, err = yh.ekmsClient.Ekms().EventBus().Subscribe(event.ROUTE) + if err != nil { + return err + } + + // Create ygot structs for the kms + confKeyRoutingSessions := config.GetOrCreateKeyRoutingSessions() + + // Fill out the known fields in the ygot structs + err = updateOrCreateKeyRoutingSessions(confKeyRoutingSessions, yh.ekmsClient) + if err != nil { + return err + } + + // Start the go routine that takes care of any update from the kms + go func() { + for { + select { + case <-yh.events: + log.Println("Update for Routes.") + if err := updateOrCreateKeyRoutingSessions(confKeyRoutingSessions, yh.ekmsClient); err != nil { + log.Println("Error within route update goroutine.") + } + } + } + }() + + return nil +} + +func (yh *KeyRoutingSessionHandler) Update(c ygot.ValidatedGoStruct, jobs []*gnmi.Update) error { + fmt.Println("Update request received for ", yh.name) + config, ok := c.(*gnmitargetygot.Gnmitarget) + if !ok { + return fmt.Errorf("failed type assertion for config %T", (*gnmitargetygot.Gnmitarget)(nil)) + } + + routingSessions := config.GetOrCreateKeyRoutingSessions() + for _, routingSession := range routingSessions.RoutingSessions { + pathId := routingSession.GetPathId() + nextHop := routingSession.GetNextHop() + prevHop := routingSession.GetPrevHop() + + var nextHopString string + var prevHopString string + if nextHop != nil { + nextHopString = fmt.Sprintf("%s:%d", nextHop.GetIpAddress(), nextHop.GetPort()) + } + if prevHop != nil { + prevHopString = fmt.Sprintf("%s:%d", prevHop.GetIpAddress(), prevHop.GetPort()) + } + + err := yh.ekmsClient.Ekms().AssignForwardingRoute(pathId, prevHopString, nextHopString) + if err != nil { + return err + } + } + + return nil +} + +func updateOrCreateKeyRoutingSessions(confKMS *gnmitargetygot.Temp_KeyRoutingSessions, ekmsClient etsiqkdnclient.EkmsClient) error { + ekmsRoutingTable := ekmsClient.Ekms().RoutingTableDeepCopy() + + for _, route := range ekmsRoutingTable { + confTempRoutingSession := confKMS.GetOrCreateRoutingSessions(route.PathId.String()) + + if route.Previous != nil { + confPrevHop := confTempRoutingSession.GetOrCreatePrevHop() + confPrevHop.NodeId = ygot.String(route.Previous.GetKmsPeerId().String()) + confPrevHop.IpAddress = ygot.String(route.Previous.Address().IP.String()) + confPrevHop.Port = ygot.Uint16(route.Previous.Address().AddrPort().Port()) + } + + if route.Next != nil { + confNextHop := confTempRoutingSession.GetOrCreateNextHop() + confNextHop.NodeId = ygot.String(route.Next.GetKmsPeerId().String()) + confNextHop.IpAddress = ygot.String(route.Next.Address().IP.String()) + confNextHop.Port = ygot.Uint16(route.Next.Address().AddrPort().Port()) + } + + // TODO: add key properties + //confKeyProperties := confTempRoutingSession.GetOrCreateKeyProperties() + + // TODO: add amount key forwarded + //confTempRoutingSession.AmountKeysForwarded = ygot.Int64(0) + + // TODO: add operation status + } + + return nil +} diff --git a/handlers/danet/kmsHandler.go b/handlers/danet/kmsHandler.go index fb2c62df181465c8e93c04e35f9b43762f9f9aad..f53554101516409e3e4cc3a638a89e950529a431 100644 --- a/handlers/danet/kmsHandler.go +++ b/handlers/danet/kmsHandler.go @@ -39,9 +39,6 @@ func (yh *KmsHandler) Init(c ygot.ValidatedGoStruct) error { return fmt.Errorf("failed type assertion for newConfig %T", (*gnmitargetygot.Gnmitarget)(nil)) } - //Prepare the eKMS to send KMSPeer notifications - yh.ekmsClient.Ekms().AddExternalNotifierKMSPeer(yh.ekmsClient.Subscribe()) - // Create ygot structs for the kms confEkms := config.GetOrCreateKms() diff --git a/handlers/danet/peerHandler.go b/handlers/danet/peerHandler.go index 13d396c0a82c22cab7577b50beeeb8031964e02e..ddefca7d9b01d52bd83c434af8db33409b0ceb8d 100644 --- a/handlers/danet/peerHandler.go +++ b/handlers/danet/peerHandler.go @@ -5,6 +5,7 @@ import ( "code.fbi.h-da.de/danet/ekms/etsiqkdnclient" "code.fbi.h-da.de/danet/ekms/internal/kms" + "code.fbi.h-da.de/danet/ekms/internal/kms/event" gnmitargetygot "code.fbi.h-da.de/danet/ekms/model" "github.com/openconfig/gnmi/proto/gnmi" "github.com/openconfig/ygot/ygot" @@ -15,6 +16,7 @@ type PeerHandler struct { name string paths map[string]struct{} ekmsClient etsiqkdnclient.EkmsClient + events <-chan event.Event } func NewPeerHandler(client etsiqkdnclient.EkmsClient) *PeerHandler { @@ -42,24 +44,26 @@ func (yh *PeerHandler) Init(c ygot.ValidatedGoStruct) error { return fmt.Errorf("failed type assertion for newConfig %T", (*gnmitargetygot.Gnmitarget)(nil)) } - //Prepare the eKMS to send KMSPeer notifications - yh.ekmsClient.Ekms().AddExternalNotifierKMSPeer(yh.ekmsClient.Subscribe()) + var err error + yh.events, err = yh.ekmsClient.Ekms().EventBus().Subscribe(event.PEER) + if err != nil { + return err + } // Create ygot structs for the kms confPeerTable := config.GetOrCreateKmsPeerTable() // Fill out the known fields in the ygot structs - err := updateOrCreatePeerTable(confPeerTable, yh.ekmsClient) + err = updateOrCreatePeerTable(confPeerTable, yh.ekmsClient) if err != nil { return err } // Start the go routine that takes care of any update from the kms go func() { - kmsChan := yh.ekmsClient.Subscribe() for { select { - case <-kmsChan: + case <-yh.events: log.Println("Update for KMSPeer.") if err := updateOrCreatePeerTable(confPeerTable, yh.ekmsClient); err != nil { log.Println("Error within kmspeer update goroutine.") diff --git a/internal/kms/event/bus.go b/internal/kms/event/bus.go new file mode 100644 index 0000000000000000000000000000000000000000..562e4a1e3750cbc07161fe28afbc32fb0bbb7ead --- /dev/null +++ b/internal/kms/event/bus.go @@ -0,0 +1,42 @@ +package event + +import "fmt" + +type EventBus struct { + subscribers map[Topic]map[chan<- Event]struct{} +} + +func NewEventBus() *EventBus { + return &EventBus{ + subscribers: make(map[Topic]map[chan<- Event]struct{}), + } +} + +func (b *EventBus) Subscribe(topic Topic) (<-chan Event, error) { + newSubChan := make(chan Event) + + subs, ok := b.subscribers[topic] + if !ok { + initialSub := map[chan<- Event]struct{}{ + newSubChan: struct{}{}, + } + b.subscribers[topic] = initialSub + } else if subs != nil { + subs[newSubChan] = struct{}{} + } else { + return nil, fmt.Errorf("could not subscribe to topic: %d", topic) + } + + return newSubChan, nil +} + +func (b *EventBus) Publish(event Event) error { + subs, ok := b.subscribers[event.Topic()] + if !ok { + return fmt.Errorf("There are no active subscribers for topic: %d", event.Topic()) + } + for sub, _ := range subs { + sub <- event + } + return nil +} diff --git a/internal/kms/event/event.go b/internal/kms/event/event.go new file mode 100644 index 0000000000000000000000000000000000000000..43b3da591ca433d7931a70679965a56b29384a1d --- /dev/null +++ b/internal/kms/event/event.go @@ -0,0 +1,63 @@ +package event + +import "time" + +type Topic int + +const ( + PEER Topic = iota + ROUTE + QUANTUM_MODULE +) + +// Event ... +type Event interface { + Topic() Topic + Time() time.Time +} + +type PeerEvent struct { + EventTopic Topic + Timestamp time.Time + SocketStr string +} + +func NewPeerEvent(socket string) *PeerEvent { + return &PeerEvent{ + EventTopic: PEER, + Timestamp: time.Now(), + SocketStr: socket, + } +} + +func (e *PeerEvent) Topic() Topic { + return e.EventTopic +} + +func (e *PeerEvent) Time() time.Time { + return e.Timestamp +} + +func (e *PeerEvent) Socket() string { + return e.SocketStr +} + +type RouteEvent struct { + EventTopic Topic + Timestamp time.Time +} + +func NewRouteEvent() *RouteEvent { + return &RouteEvent{ + EventTopic: ROUTE, + Timestamp: time.Now(), + } +} + +func (e *RouteEvent) Topic() Topic { + return e.EventTopic +} + +func (e *RouteEvent) Time() time.Time { + return e.Timestamp +} diff --git a/internal/kms/kms.go b/internal/kms/kms.go index b63f2d47e77106471680e8575b112844f2157b18..86cc72e4c6e465caa604dd6edc1ce3d6f91d54ca 100644 --- a/internal/kms/kms.go +++ b/internal/kms/kms.go @@ -17,22 +17,13 @@ import ( pbETSI "code.fbi.h-da.de/danet/ekms/internal/api/gen/proto/go/kmsetsi" pbIC "code.fbi.h-da.de/danet/ekms/internal/api/gen/proto/go/kmsintercom" + "code.fbi.h-da.de/danet/ekms/internal/kms/event" pbQS "code.fbi.h-da.de/danet/quipsec/gen/go/quipsec" "github.com/google/uuid" ) -//type Qkdnkms interface { -// //AddExternalNotifierGeneral(chan bool) // used to indicate unspecific changes -// AddExternalNotifierQLE(chan uint32) // used to indicate changes to specific Quantum Link Element (QLE) -// AddExternalNotifierKMSPeer(chan string) // used to indicate changes to specific KMSPeer -// AddQuantumElement() *EmulatedQuantumModule -// GlobalKeyHandler(time.Duration) error -// AddPeer(kmsPeerSocket string, servingQLE *EmulatedQuantumModule) -// RemovePeer(kmsPeerSocket string) -// FindPeerUuid(uuid.UUID) *kmsPeer -//} - type Route struct { + PathId uuid.UUID Previous *kmsPeer Next *kmsPeer } @@ -50,21 +41,20 @@ type EKMS struct { kmsName string kmsUUID uuid.UUID interComAddr string - qleMapMutex sync.Mutex // TODO create a mapping between ids and address quantumModules map[string]QuantumModule quantumModulesMutex sync.RWMutex - externalNotifierQLE chan uint32 kmsPeersMutex sync.Mutex // TODO(maba): find a better name for this // TODO: add mutex - keysForPathId map[uuid.UUID]string - routingTable map[uuid.UUID]*Route - KmsPeers map[string]*kmsPeer - externalNotifierKMSPeer chan string + keysForPathId map[uuid.UUID]string + routingTable map[uuid.UUID]*Route + routingTableMutex sync.RWMutex + KmsPeers map[string]*kmsPeer pbETSI.UnimplementedKmsETSIServer pbIC.UnimplementedKmsTalkerServer supportedKeyLengths map[BitKeyLength]bool + eventBus *event.EventBus } // Will keep information about the quantum elements that this EKMS is talking to @@ -97,15 +87,14 @@ func NewEKMS(kmsName string, kmsUUID uuid.UUID, logOutput io.Writer, logLevel lo } createdEKMS := &EKMS{ - kmsName: kmsName, - kmsUUID: kmsUUID, - interComAddr: interComAddr, - quantumModules: make(map[string]QuantumModule), - routingTable: make(map[uuid.UUID]*Route), - KmsPeers: make(map[string]*kmsPeer), - externalNotifierQLE: nil, // just be surely set to nil! - externalNotifierKMSPeer: nil, // just be surely set to nil! - supportedKeyLengths: make(map[BitKeyLength]bool), + kmsName: kmsName, + kmsUUID: kmsUUID, + interComAddr: interComAddr, + quantumModules: make(map[string]QuantumModule), + routingTable: make(map[uuid.UUID]*Route), + KmsPeers: make(map[string]*kmsPeer), + supportedKeyLengths: make(map[BitKeyLength]bool), + eventBus: event.NewEventBus(), } createdEKMS.supportedKeyLengths[BitKeyLen256] = true @@ -161,7 +150,7 @@ func (kms *EKMS) AddPeer(peerKmsId string, kmsPeerSocket string, servingQLE Quan log.Errorf("Trying to add existing peer %s", kmsPeerSocket) return nil, fmt.Errorf("Trying to add existing peer %s", kmsPeerSocket) } - peer, err := NewKmsPeer(peerKmsId, servingQLE, kmsPeerSocket, kms.interComAddr, kms.externalNotifierKMSPeer) + peer, err := NewKmsPeer(peerKmsId, servingQLE, kmsPeerSocket, kms.interComAddr, kms.eventBus) if err != nil { return nil, err } @@ -197,15 +186,27 @@ func (kms *EKMS) AssignForwardingRoute(pId, pHop, nHop string) error { } } + kms.routingTableMutex.Lock() // set the route within routing table kms.routingTable[pathId] = &Route{ + PathId: pathId, Previous: previousHop, Next: nextHop, } + kms.routingTableMutex.Unlock() + + err = kms.eventBus.Publish(event.NewRouteEvent()) + if err != nil { + log.Error(err) + } return nil } +func (kms *EKMS) EventBus() *event.EventBus { + return kms.eventBus +} + // TODO/XXX error handling func (kms *EKMS) RemovePeer(kmsPeerSocket string) { if _, there := kms.KmsPeers[kmsPeerSocket]; there { @@ -217,14 +218,6 @@ func (kms *EKMS) RemovePeer(kmsPeerSocket string) { return } -func (kms *EKMS) AddExternalNotifierQLE(in chan uint32) { - kms.externalNotifierQLE = in -} - -func (kms *EKMS) AddExternalNotifierKMSPeer(in chan string) { - kms.externalNotifierKMSPeer = in -} - func (kms *EKMS) FindPeerUuid(lookup uuid.UUID) (peer *kmsPeer) { if kms.KmsPeers != nil { for _, peer = range kms.KmsPeers { @@ -237,6 +230,18 @@ func (kms *EKMS) FindPeerUuid(lookup uuid.UUID) (peer *kmsPeer) { return nil } +func (kms *EKMS) RoutingTableDeepCopy() map[uuid.UUID]*Route { + copy := make(map[uuid.UUID]*Route, len(kms.KmsPeers)) + + kms.routingTableMutex.Lock() + for k, v := range kms.routingTable { + copy[k] = v + } + kms.routingTableMutex.Unlock() + + return copy +} + func (kms *EKMS) PeersDeepCopy() map[string]*kmsPeer { copy := make(map[string]*kmsPeer, len(kms.KmsPeers)) diff --git a/internal/kms/kmsetsi.go b/internal/kms/kmsetsi.go index 4a491da07a78d4ee25ece9cc71e9d2b5f10ec235..7e26e469ac7046cafccc96807679aa979d5cee8d 100644 --- a/internal/kms/kmsetsi.go +++ b/internal/kms/kmsetsi.go @@ -12,6 +12,7 @@ import ( log "github.com/sirupsen/logrus" pb "code.fbi.h-da.de/danet/ekms/internal/api/gen/proto/go/kmsetsi" + "code.fbi.h-da.de/danet/ekms/internal/kms/event" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -124,12 +125,18 @@ func (es *etsiServer) ETSIAssignForwarding(ctx context.Context, in *pb.ETSIAssig // set the route within routing table es.handlingEkms.routingTable[pathId] = &Route{ + PathId: pathId, Previous: previousHop, Next: nextHop, } log.Infof("%s added a routing table entry for path id: %s", es.handlingEkms.kmsName, pathId.String()) + err = es.handlingEkms.eventBus.Publish(event.NewRouteEvent()) + if err != nil { + log.Error(err) + } + return &pb.ETSIAssignForwardingReply{ Timestamp: time.Now().Unix(), }, nil diff --git a/internal/kms/kmsintercom.go b/internal/kms/kmsintercom.go index 64fac9bad92e296a2a06de57d84fbd2f9a2a59fb..dfe4161d78778089bd08df89520cff06f14e833f 100644 --- a/internal/kms/kmsintercom.go +++ b/internal/kms/kmsintercom.go @@ -11,6 +11,7 @@ import ( etsi14 "code.fbi.h-da.de/danet/ekms/api/go/rest/etsi/client" pb "code.fbi.h-da.de/danet/ekms/internal/api/gen/proto/go/kmsintercom" + "code.fbi.h-da.de/danet/ekms/internal/kms/event" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -145,8 +146,11 @@ func (s *kmsTalkerServer) SyncKeyIdsForBulk(ctx context.Context, in *pb.SyncKeyI // update the peer status to up peer.peerStatus = KmsPeerUp // Send notification about change - if peer.externalNotifierKMSPeer != nil { - peer.externalNotifierKMSPeer <- peer.tcpSocketStr + if peer.eventBus != nil { + err := peer.eventBus.Publish(event.NewPeerEvent(peer.tcpSocketStr)) + if err != nil { + log.Error(err) + } } return &pb.SyncKeyIdsForBulkResponse{ diff --git a/internal/kms/kmspeers.go b/internal/kms/kmspeers.go index ab2cec0c303d0d0bc460aa9bdd0e8cf07b6224b7..301c0aec6839f9922c46e2fa6b8af18de6f705a8 100644 --- a/internal/kms/kmspeers.go +++ b/internal/kms/kmspeers.go @@ -8,6 +8,7 @@ import ( "time" pbIC "code.fbi.h-da.de/danet/ekms/internal/api/gen/proto/go/kmsintercom" + "code.fbi.h-da.de/danet/ekms/internal/kms/event" "github.com/google/uuid" log "github.com/sirupsen/logrus" "google.golang.org/grpc" @@ -31,21 +32,21 @@ type kmsPeerInfo interface { } type kmsPeer struct { - externalNotifierKMSPeer chan string - peerClient pbIC.KmsTalkerClient - peerStatus KmsPeerStatus - peerKmsId uuid.UUID // NOTE: might be changed in the future - interComAddr string - servingQuantumModul QuantumModule - tcpSocket *net.TCPAddr // the IP address and TCP port (aka socket) of the kms peer - tcpSocketStr string // string rep. of tcpSocket - et CryptoAlgorithm - name string // the name of the kms peer - id uuid.UUID // uuid of the peer - quit chan bool // cancel the peer goroutine + peerClient pbIC.KmsTalkerClient + peerStatus KmsPeerStatus + peerKmsId uuid.UUID // NOTE: might be changed in the future + interComAddr string + servingQuantumModul QuantumModule + tcpSocket *net.TCPAddr // the IP address and TCP port (aka socket) of the kms peer + tcpSocketStr string // string rep. of tcpSocket + et CryptoAlgorithm + name string // the name of the kms peer + id uuid.UUID // uuid of the peer + quit chan bool // cancel the peer goroutine + eventBus *event.EventBus } -func NewKmsPeer(peerKmsId string, servQM QuantumModule, tcpSocketStr string, interComAddr string, in chan string) (*kmsPeer, error) { +func NewKmsPeer(peerKmsId string, servQM QuantumModule, tcpSocketStr string, interComAddr string, eventBus *event.EventBus) (*kmsPeer, error) { if servQM.Peer() != nil { return nil, fmt.Errorf("QuantumModule with ID: , already has a peer", servQM.ID()) } @@ -67,7 +68,6 @@ func NewKmsPeer(peerKmsId string, servQM QuantumModule, tcpSocketStr string, int } peer := &kmsPeer{ - externalNotifierKMSPeer: in, // TODO: rename to client peerClient: peerClient, // TODO: change this, only for demo purposes @@ -81,6 +81,7 @@ func NewKmsPeer(peerKmsId string, servQM QuantumModule, tcpSocketStr string, int et: NewAES(), id: uuid.New(), quit: make(chan bool), + eventBus: eventBus, } servQM.SetPeer(peer) diff --git a/internal/kms/module.go b/internal/kms/module.go index 59437b0f27dcc67826a641df3cd79b9822324fd5..93b91d7d4a4ded05560a1a64117feff0a8b509a1 100644 --- a/internal/kms/module.go +++ b/internal/kms/module.go @@ -10,6 +10,7 @@ import ( etsi14 "code.fbi.h-da.de/danet/ekms/api/go/rest/etsi/client" pbIC "code.fbi.h-da.de/danet/ekms/internal/api/gen/proto/go/kmsintercom" + "code.fbi.h-da.de/danet/ekms/internal/kms/event" restclient "code.fbi.h-da.de/danet/ekms/restclient" "code.fbi.h-da.de/danet/quantumlayer" "github.com/google/uuid" @@ -117,8 +118,11 @@ func (eqe *EmulatedQuantumModule) Sync() error { // update the peer status to up eqe.peer.peerStatus = KmsPeerUp // Send notification about change - if eqe.peer.externalNotifierKMSPeer != nil { - eqe.peer.externalNotifierKMSPeer <- eqe.peer.tcpSocketStr + if eqe.peer.eventBus != nil { + err := eqe.peer.eventBus.Publish(event.NewPeerEvent(eqe.peer.tcpSocketStr)) + if err != nil { + log.Error(err) + } } return nil