Skip to content
Snippets Groups Projects
subscriptionQueueHandler.go 1.46 KiB
Newer Older
  • Learn to ignore specific revisions
  • package nucleus
    
    import (
    	"time"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
    )
    
    // SubscriptionQueueWorker is an interface to define a worker to handle the updates from gNMI Subscription from the NetWorkElementWatcher.
    type SubscriptionQueueWorker interface {
    	HandleGnmiSubscriptionUpdates(chan transport.SubscriptionInformation)
    }
    
    // SubscriptionQueueWorkerImpl implements the SubscriptionQueueWorker interface. This is used as a worker pool to handle gNMI subscription updates.
    type SubscriptionQueueWorkerImpl struct {
    	WorkerName string
    	workFunc   func(*transport.SubscriptionInformation, string)
    }
    
    // NewSubscriptionQueueWorker creates a new SubscriptionQueueWorker.
    func NewSubscriptionQueueWorker(name string, workFunc func(*transport.SubscriptionInformation, string)) SubscriptionQueueWorker {
    	return &SubscriptionQueueWorkerImpl{
    		WorkerName: name,
    		workFunc:   workFunc,
    	}
    }
    
    // HandleGnmiSubscriptionUpdates handles assignment of tasks to free workers of the worker pool from SubscriptionQueueWorkerImpl.
    func (s *SubscriptionQueueWorkerImpl) HandleGnmiSubscriptionUpdates(subInfoChan chan transport.SubscriptionInformation) {
    	// Note: Sleep was needed to prevent some issue with deadlocks.
    	// Maybe this needs some additional investigation/improvements in the future
    	for {
    		select {
    		case subInfo := <-subInfoChan:
    			s.workFunc(&subInfo, s.WorkerName)
    			time.Sleep(time.Millisecond * 2)
    		default:
    			time.Sleep(time.Millisecond * 2)
    		}
    	}
    }