diff --git a/controller/interfaces/change/change.go b/controller/interfaces/change/change.go index 489b42303323759316675953d934b259cd5f7760..3ec9de55589e7a450502610c2a4fa15f54f47678 100644 --- a/controller/interfaces/change/change.go +++ b/controller/interfaces/change/change.go @@ -24,11 +24,3 @@ type Change interface { Diff() *gnmi.Notification AssociatedDeviceID() uuid.UUID } - -// Payload contains two ygot.GoStructs, the first represents the original state -// before the change was applied and the second repesents the modified state. -type Payload struct { - Original []byte - Modified []byte - Diff *gnmi.Notification -} diff --git a/controller/interfaces/networkelement/networkElement.go b/controller/interfaces/networkelement/networkElement.go index 3d32fca88fcb129cd6e23df01a5e4847d3644255..295e7b784af48b6d66636024685c12763cbbfc21 100644 --- a/controller/interfaces/networkelement/networkElement.go +++ b/controller/interfaces/networkelement/networkElement.go @@ -3,17 +3,16 @@ package networkelement import ( "context" + tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" "code.fbi.h-da.de/danet/gosdn/controller/conflict" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin" - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/model" + "github.com/google/uuid" gpb "github.com/openconfig/gnmi/proto/gnmi" "github.com/openconfig/ygot/ygot" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/proto" - - tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" ) // NetworkElement represents an Managed Network Element (MNE) which is managed by @@ -23,7 +22,7 @@ type NetworkElement interface { GetModel() ([]byte, error) GetPlugin() plugin.Plugin GetModelAsFilteredCopy() ([]byte, error) - Transport() transport.Transport + Transport() model.Transport Name() string ProcessResponse(proto.Message) error IsTransportValid() bool @@ -72,7 +71,7 @@ type LoadedNetworkElement struct { // EnsureIntendedConfigurationIsAppliedOnNetworkElement pushes the stored // configuration to a network element. // TODO: find a better place for this function. -func EnsureIntendedConfigurationIsAppliedOnNetworkElement(mne model.NetworkElement) error { +func EnsureIntendedConfigurationIsAppliedOnNetworkElement(mne NetworkElement) error { model, err := mne.GetModelAsFilteredCopy() if err != nil { return err diff --git a/controller/interfaces/transport/transport.go b/controller/interfaces/transport/transport.go deleted file mode 100644 index 90fbb3ccbcbcc31b082bdfbd038c3034566128ad..0000000000000000000000000000000000000000 --- a/controller/interfaces/transport/transport.go +++ /dev/null @@ -1,38 +0,0 @@ -package transport - -import ( - "context" - - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" - - gpb "github.com/openconfig/gnmi/proto/gnmi" -) - -// Transport provides an interface for Transport implementations -// like RESTCONF or gnmi. -type Transport interface { - Get(ctx context.Context, params ...string) (any, error) - Set(ctx context.Context, payload change.Payload) error - CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) - Subscribe(ctx context.Context, params ...string) error - ControlPlaneSubscribe(ctx context.Context, subscriptionInfo SubscriptionInformation, subInfoChannel chan SubscriptionInformation) error - Type() string - ProcessResponse(resp interface{}) error - ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error -} - -type ( - // HandleSubscribeResponse is the callback function to handle subcription responses. - HandleSubscribeResponse func(*gpb.SubscribeResponse, *SubscriptionInformation) -) - -// SubscriptionInformation contains additional information used for internal subscriptions -// for distinguishing from which network element the information is from, to stop subscriptions and -// error handling. -type SubscriptionInformation struct { - PndID string - NetworkElementID string - NetworkElementName string - StopContext context.Context - SubResponse *gpb.SubscribeResponse -} diff --git a/controller/northbound/server/networkElement.go b/controller/northbound/server/networkElement.go index ce6b9ba38d01bf00d82731b5559b6e972914e925..023f33226df93c0dce2b1857209d3789e9ccb90f 100644 --- a/controller/northbound/server/networkElement.go +++ b/controller/northbound/server/networkElement.go @@ -11,7 +11,6 @@ import ( tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" "code.fbi.h-da.de/danet/gosdn/controller/conflict" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" "code.fbi.h-da.de/danet/gosdn/controller/metrics" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/model" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/ports" @@ -946,7 +945,7 @@ func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOper callback := func(original, modified []byte) error { ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint - payload := change.Payload{Original: original, Modified: modified, Diff: diff} + payload := model.Payload{Original: original, Modified: modified, Diff: diff} return mne.Transport().Set(ctx, payload) } diff --git a/controller/nucleus/domain/model/change.go b/controller/nucleus/domain/model/change.go index 41f548287ebc359a12fbf26d9a4668c5ca386664..35dd320ffa7c74d0379cee1b97bcb553b81725cb 100644 --- a/controller/nucleus/domain/model/change.go +++ b/controller/nucleus/domain/model/change.go @@ -223,3 +223,11 @@ func stateManager(ctx context.Context, ch *Change, timeout time.Duration) (chan< }() return stateIn, stateOut, errChan } + +// Payload contains two ygot.GoStructs, the first represents the original state +// before the change was applied and the second repesents the modified state. +type Payload struct { + Original []byte + Modified []byte + Diff *gnmi.Notification +} diff --git a/controller/nucleus/domain/model/gnmi_transport.go b/controller/nucleus/domain/model/gnmi_transport.go index 5676c6a501ba3629d46defb51652e503d73e7205..9d6a9827dc5b85fac1679ec04bd82b88c0a51e8e 100644 --- a/controller/nucleus/domain/model/gnmi_transport.go +++ b/controller/nucleus/domain/model/gnmi_transport.go @@ -7,8 +7,6 @@ import ( "time" "code.fbi.h-da.de/danet/gosdn/controller/config" - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" - tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "code.fbi.h-da.de/danet/gosdn/controller/plugin/shared" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" @@ -96,7 +94,7 @@ func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) { } // Set takes a change.Payload struct. -func (g *Gnmi) Set(ctx context.Context, payload change.Payload) error { +func (g *Gnmi) Set(ctx context.Context, payload Payload) error { if g.client == nil { return &customerrs.NilClientError{} } @@ -105,7 +103,7 @@ func (g *Gnmi) Set(ctx context.Context, payload change.Payload) error { return g.applyDiff(ctx, payload) } -func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload) error { +func (g *Gnmi) applyDiff(ctx context.Context, payload Payload) error { diff := payload.Diff updates := diff.GetUpdate() @@ -136,7 +134,10 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error { // ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context, // the callback function handles the responses received from the subscription. -func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error { +func (g *Gnmi) ControlPlaneSubscribe( + ctx context.Context, + subscriptionInfo SubscriptionInformation, + subInfoChannel chan SubscriptionInformation) error { if g.client == nil { return &customerrs.NilClientError{} } @@ -300,7 +301,10 @@ func (g *Gnmi) subscribe(ctx context.Context) error { // controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional network element information including // an option to stop the subscription. -func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error { +func (g *Gnmi) controlPlaneSubscribe( + ctx context.Context, + subscriptionInfo SubscriptionInformation, + subInfoChannel chan SubscriptionInformation) error { ctx = gnmi.NewContext(ctx, g.config) opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) if !ok { diff --git a/controller/nucleus/domain/model/networkElement.go b/controller/nucleus/domain/model/networkElement.go index 40552cc46f5656420ac8b99e64aa46adc0bbe934..ec4164815d43f029dac2b15e4718b1fb3e81fd1b 100644 --- a/controller/nucleus/domain/model/networkElement.go +++ b/controller/nucleus/domain/model/networkElement.go @@ -6,7 +6,6 @@ import ( tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" "code.fbi.h-da.de/danet/gosdn/controller/conflict" - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "github.com/docker/docker/pkg/namesgenerator" "github.com/google/uuid" "github.com/openconfig/ygot/ygot" @@ -24,7 +23,7 @@ type NetworkElement interface { GetModel() ([]byte, error) GetPlugin() PluginIface GetModelAsFilteredCopy() ([]byte, error) - Transport() transport.Transport + Transport() Transport Name() string ProcessResponse(proto.Message) error IsTransportValid() bool @@ -120,7 +119,7 @@ type CommonNetworkElement struct { Plugin PluginIface // Transport is the network element's Transport implementation - transport transport.Transport + transport Transport // Name is the network element's human readable name name string @@ -150,7 +149,7 @@ func (n *CommonNetworkElement) GetModelAsFilteredCopy() ([]byte, error) { } // Transport returns the Transport of the network element. -func (n *CommonNetworkElement) Transport() transport.Transport { +func (n *CommonNetworkElement) Transport() Transport { return n.transport } @@ -170,7 +169,7 @@ func (n *CommonNetworkElement) Name() string { } // SetTransport sets the Network Element's Transport. -func (n *CommonNetworkElement) SetTransport(t transport.Transport) { +func (n *CommonNetworkElement) SetTransport(t Transport) { n.transport = t } @@ -230,7 +229,7 @@ func (n *CsbiNetworkElement) GetModelAsFilteredCopy() ([]byte, error) { } // Transport returns the Transport of the network element. -func (n *CsbiNetworkElement) Transport() transport.Transport { +func (n *CsbiNetworkElement) Transport() Transport { return n.transport } diff --git a/controller/nucleus/domain/model/transport.go b/controller/nucleus/domain/model/transport.go index 699289309755f350d0438bc0c1f3a6eb28ceb2c0..c8d776c3d0bb641e3d8221842bd932517ba3e182 100644 --- a/controller/nucleus/domain/model/transport.go +++ b/controller/nucleus/domain/model/transport.go @@ -1,15 +1,47 @@ package model import ( + "context" + tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport" "code.fbi.h-da.de/danet/gosdn/controller/customerrs" - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "code.fbi.h-da.de/danet/gosdn/controller/plugin/shared" + + gpb "github.com/openconfig/gnmi/proto/gnmi" +) + +// Transport provides an interface for Transport implementations +// like RESTCONF or gnmi. +type Transport interface { + Get(ctx context.Context, params ...string) (any, error) + Set(ctx context.Context, payload Payload) error + CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) + Subscribe(ctx context.Context, params ...string) error + ControlPlaneSubscribe(ctx context.Context, subscriptionInfo SubscriptionInformation, subInfoChannel chan SubscriptionInformation) error + Type() string + ProcessResponse(resp interface{}) error + ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error +} + +type ( + // HandleSubscribeResponse is the callback function to handle subcription responses. + HandleSubscribeResponse func(*gpb.SubscribeResponse, *SubscriptionInformation) ) +// SubscriptionInformation contains additional information used for internal subscriptions +// for distinguishing from which network element the information is from, to stop subscriptions and +// error handling. +type SubscriptionInformation struct { + PndID string + NetworkElementID string + NetworkElementName string + StopContext context.Context + SubResponse *gpb.SubscribeResponse +} + // NewTransport receives TransportOptions and returns an appropriate Transport // implementation. -func NewTransport(opts *tpb.TransportOption, model shared.DeviceModel) (transport.Transport, error) { +func NewTransport(opts *tpb.TransportOption, model shared.DeviceModel) (Transport, error) { if opts == nil { return nil, &customerrs.InvalidParametersError{ Func: NewTransport, diff --git a/controller/nucleus/networkElementWatcher.go b/controller/nucleus/networkElementWatcher.go index f2822883e0d2c0c9e8459c3e366f3489850c2ab3..2f33e56e871ebbf81c59c70621a9775093532b90 100644 --- a/controller/nucleus/networkElementWatcher.go +++ b/controller/nucleus/networkElementWatcher.go @@ -9,7 +9,6 @@ import ( "code.fbi.h-da.de/danet/gosdn/controller/customerrs" "code.fbi.h-da.de/danet/gosdn/controller/event" eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event" - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/model" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/ports" "code.fbi.h-da.de/danet/gosdn/controller/nucleus/types" @@ -93,7 +92,7 @@ func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne m gNMIOptionsCtx := context.Background() gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts) - subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize) + subInfoChan := make(chan model.SubscriptionInformation, workerQueueSize) for i := 1; i <= numberWorkers; i++ { name := "Worker " + strconv.Itoa(i) @@ -104,7 +103,7 @@ func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne m // SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check // from which network element a response was sent - if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{ + if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, model.SubscriptionInformation{ PndID: mne.PndID().String(), NetworkElementID: mne.ID().String(), NetworkElementName: mne.Name(), @@ -136,7 +135,7 @@ func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uu // handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish // from which network element a subscribe response was sent including improved error handling. -func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) { +func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *model.SubscriptionInformation, workerName string) { if subscriptionInfo.SubResponse == nil { // Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console // if the target that was subscribed to is not reachable anymore. @@ -166,7 +165,7 @@ func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transp } } -func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) { +func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *model.SubscriptionInformation) { pathsAndValues := make(map[string]string, len(resp.Update.Update)) for _, update := range resp.Update.Update { diff --git a/controller/nucleus/subscriptionQueueHandler.go b/controller/nucleus/subscriptionQueueHandler.go index 28bcd73f13dfb9b5892c84edcd8793b25f7555cb..f634d627f8ff6eebb99826991f321aee4254d849 100644 --- a/controller/nucleus/subscriptionQueueHandler.go +++ b/controller/nucleus/subscriptionQueueHandler.go @@ -3,22 +3,22 @@ package nucleus import ( "time" - "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport" + "code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/model" ) // SubscriptionQueueWorker is an interface to define a worker to handle the updates from gNMI Subscription from the NetWorkElementWatcher. type SubscriptionQueueWorker interface { - HandleGnmiSubscriptionUpdates(chan transport.SubscriptionInformation) + HandleGnmiSubscriptionUpdates(chan model.SubscriptionInformation) } // SubscriptionQueueWorkerImpl implements the SubscriptionQueueWorker interface. This is used as a worker pool to handle gNMI subscription updates. type SubscriptionQueueWorkerImpl struct { WorkerName string - workFunc func(*transport.SubscriptionInformation, string) + workFunc func(*model.SubscriptionInformation, string) } // NewSubscriptionQueueWorker creates a new SubscriptionQueueWorker. -func NewSubscriptionQueueWorker(name string, workFunc func(*transport.SubscriptionInformation, string)) SubscriptionQueueWorker { +func NewSubscriptionQueueWorker(name string, workFunc func(*model.SubscriptionInformation, string)) SubscriptionQueueWorker { return &SubscriptionQueueWorkerImpl{ WorkerName: name, workFunc: workFunc, @@ -26,7 +26,7 @@ func NewSubscriptionQueueWorker(name string, workFunc func(*transport.Subscripti } // HandleGnmiSubscriptionUpdates handles assignment of tasks to free workers of the worker pool from SubscriptionQueueWorkerImpl. -func (s *SubscriptionQueueWorkerImpl) HandleGnmiSubscriptionUpdates(subInfoChan chan transport.SubscriptionInformation) { +func (s *SubscriptionQueueWorkerImpl) HandleGnmiSubscriptionUpdates(subInfoChan chan model.SubscriptionInformation) { // Note: Sleep was needed to prevent some issue with deadlocks. // Maybe this needs some additional investigation/improvements in the future for {