Newer
Older
Fabian Seidl
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package nucleus
import (
"time"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
)
// SubscriptionQueueWorker describes a worker that handles the gNMI
// subscription updates coming from the NetworkElementWatcher.
type SubscriptionQueueWorker interface {
	// HandleGnmiSubscriptionUpdates processes the subscription information
	// delivered on subInfoChan.
	HandleGnmiSubscriptionUpdates(subInfoChan chan transport.SubscriptionInformation)
}
// SubscriptionQueueWorkerImpl implements the SubscriptionQueueWorker interface.
// It is used as a worker of a worker pool that handles gNMI subscription updates.
type SubscriptionQueueWorkerImpl struct {
// WorkerName is handed to workFunc as its second argument for every update.
WorkerName string
// workFunc is called by HandleGnmiSubscriptionUpdates once per received update.
workFunc func(*transport.SubscriptionInformation, string)
}
// NewSubscriptionQueueWorker builds a SubscriptionQueueWorker with the given
// name. workFunc is the callback the worker invokes for each subscription
// update it receives.
func NewSubscriptionQueueWorker(name string, workFunc func(*transport.SubscriptionInformation, string)) SubscriptionQueueWorker {
	worker := new(SubscriptionQueueWorkerImpl)
	worker.WorkerName = name
	worker.workFunc = workFunc

	return worker
}
// HandleGnmiSubscriptionUpdates polls subInfoChan and passes every received
// update, together with the worker's name, to the worker's work function.
//
// The method blocks the calling goroutine. It returns only after subInfoChan
// has been closed; while the channel stays open (or is nil) it keeps polling.
func (s *SubscriptionQueueWorkerImpl) HandleGnmiSubscriptionUpdates(subInfoChan chan transport.SubscriptionInformation) {
	// Note: the sleeps were needed to prevent some issue with deadlocks.
	// Maybe this needs some additional investigation/improvements in the future.
	const pause = 2 * time.Millisecond

	for {
		select {
		case subInfo, ok := <-subInfoChan:
			if !ok {
				// A closed channel is always ready and yields zero values.
				// Stop instead of feeding empty updates to workFunc forever.
				return
			}

			s.workFunc(&subInfo, s.WorkerName)
			time.Sleep(pause)
		default:
			// Nothing queued right now; back off briefly before polling again.
			time.Sleep(pause)
		}
	}
}