diff --git a/goKMS/kms/akms/server/receiver.go b/goKMS/kms/akms/server/receiver.go index c5680ebb248c9b884e805ac103cbae9da2ba13ca..893b627ba343ce64ae477c3336b73bf05f252c9e 100644 --- a/goKMS/kms/akms/server/receiver.go +++ b/goKMS/kms/akms/server/receiver.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/google/uuid" + "github.com/sirupsen/logrus" ) type Receiver struct { @@ -22,12 +23,27 @@ func (r *Receiver) RequestReceiverChannel(pathId uuid.UUID) (<-chan struct{}, er return newSubChan, nil } +func (r *Receiver) RemoveReceiver(pathId uuid.UUID) error { + r.mu.Lock() + defer r.mu.Unlock() + + if channel, ok := r.receivers[pathId]; ok { + close(channel) + delete(r.receivers, pathId) + } else { + return fmt.Errorf("There are no active subscribers for pathId: %s", pathId) + } + + logrus.Debugf("Current active receivers after removing receiver for pathId: %s; Receivers: %v", pathId, r.receivers) + return nil +} + func (r *Receiver) InformReceiver(pathId uuid.UUID) error { r.mu.RLock() defer r.mu.RUnlock() if receiver, ok := r.receivers[pathId]; ok { - receiver <- struct{}{} + go func() { receiver <- struct{}{} }() } else { return fmt.Errorf("There are no active subscribers for pathId: %s", pathId) } diff --git a/goKMS/kms/akms/server/server.go b/goKMS/kms/akms/server/server.go index 87eea666af6bd36ecd25616a8c500a32b8b451c7..b3cb5dfaa5944e13b23ff9d47c16bf79f9cbb6ff 100644 --- a/goKMS/kms/akms/server/server.go +++ b/goKMS/kms/akms/server/server.go @@ -87,6 +87,9 @@ func ksaReqHandler(eventBus *event.EventBus, receiver *Receiver, generateAndSend select { case <-receiverChan: case <-time.After(20 * time.Second): + if err := receiver.RemoveReceiver(pathId); err != nil { + logrus.Errorf("Failed removing receiver for pathId: %s ; err: %v", pathId, err) + } http.Error(w, fmt.Sprintf("Platform Key exchange failed for RequestID: %s", kmsKeyRequest.RequestID), http.StatusInternalServerError) logrus.Errorf("Platform Key exchange failed for RequestID: %s", kmsKeyRequest.RequestID) return