Skip to content
Snippets Groups Projects
Commit 86e90455 authored by Malte Bauch's avatar Malte Bauch Committed by Fabian Seidl
Browse files

Add method which allows to remove a receiver

See merge request !142
parent 3b31727f
Branches
No related tags found
1 merge request!142Add method which allows to remove a receiver
Pipeline #201952 passed
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/sirupsen/logrus"
) )
type Receiver struct { type Receiver struct {
...@@ -22,12 +23,27 @@ func (r *Receiver) RequestReceiverChannel(pathId uuid.UUID) (<-chan struct{}, er ...@@ -22,12 +23,27 @@ func (r *Receiver) RequestReceiverChannel(pathId uuid.UUID) (<-chan struct{}, er
return newSubChan, nil return newSubChan, nil
} }
func (r *Receiver) RemoveReceiver(pathId uuid.UUID) error {
r.mu.Lock()
defer r.mu.Unlock()
if channel, ok := r.receivers[pathId]; ok {
close(channel)
delete(r.receivers, pathId)
} else {
return fmt.Errorf("There are no active subscribers for pathId: %s", pathId)
}
logrus.Debugf("Current active receivers after removing receiver for pathId: %s; Receivers: %v", pathId, r.receivers)
return nil
}
func (r *Receiver) InformReceiver(pathId uuid.UUID) error { func (r *Receiver) InformReceiver(pathId uuid.UUID) error {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
if receiver, ok := r.receivers[pathId]; ok { if receiver, ok := r.receivers[pathId]; ok {
receiver <- struct{}{} go func() { receiver <- struct{}{} }()
} else { } else {
return fmt.Errorf("There are no active subscribers for pathId: %s", pathId) return fmt.Errorf("There are no active subscribers for pathId: %s", pathId)
} }
......
...@@ -87,6 +87,9 @@ func ksaReqHandler(eventBus *event.EventBus, receiver *Receiver, generateAndSend ...@@ -87,6 +87,9 @@ func ksaReqHandler(eventBus *event.EventBus, receiver *Receiver, generateAndSend
select { select {
case <-receiverChan: case <-receiverChan:
case <-time.After(20 * time.Second): case <-time.After(20 * time.Second):
if err := receiver.RemoveReceiver(pathId); err != nil {
logrus.Errorf("Failed removing receiver for pathId: %s ; err: %v", pathId, err)
}
http.Error(w, fmt.Sprintf("Platform Key exchange failed for RequestID: %s", kmsKeyRequest.RequestID), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Platform Key exchange failed for RequestID: %s", kmsKeyRequest.RequestID), http.StatusInternalServerError)
logrus.Errorf("Platform Key exchange failed for RequestID: %s", kmsKeyRequest.RequestID) logrus.Errorf("Platform Key exchange failed for RequestID: %s", kmsKeyRequest.RequestID)
return return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment