Skip to content
Snippets Groups Projects
Verified Commit 85f6b5e2 authored by André Sterba's avatar André Sterba
Browse files

Remove transport from interfaces folder

parent 2df8d4de
No related branches found
No related tags found
1 merge request!557Draft: Refactor nucleus to remove hard dependencies on structs
Pipeline #163287 failed
......@@ -24,11 +24,3 @@ type Change interface {
Diff() *gnmi.Notification
AssociatedDeviceID() uuid.UUID
}
// Payload contains two ygot.GoStructs, the first represents the original state
// before the change was applied and the second repesents the modified state.
type Payload struct {
Original []byte
Modified []byte
Diff *gnmi.Notification
}
......@@ -3,17 +3,16 @@ package networkelement
import (
"context"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/conflict"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/plugin"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/model"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
// NetworkElement represents an Managed Network Element (MNE) which is managed by
......@@ -23,7 +22,7 @@ type NetworkElement interface {
GetModel() ([]byte, error)
GetPlugin() plugin.Plugin
GetModelAsFilteredCopy() ([]byte, error)
Transport() transport.Transport
Transport() model.Transport
Name() string
ProcessResponse(proto.Message) error
IsTransportValid() bool
......@@ -72,7 +71,7 @@ type LoadedNetworkElement struct {
// EnsureIntendedConfigurationIsAppliedOnNetworkElement pushes the stored
// configuration to a network element.
// TODO: find a better place for this function.
func EnsureIntendedConfigurationIsAppliedOnNetworkElement(mne model.NetworkElement) error {
func EnsureIntendedConfigurationIsAppliedOnNetworkElement(mne NetworkElement) error {
model, err := mne.GetModelAsFilteredCopy()
if err != nil {
return err
......
package transport
import (
"context"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
gpb "github.com/openconfig/gnmi/proto/gnmi"
)
// Transport provides an interface for Transport implementations
// like RESTCONF or gnmi.
type Transport interface {
Get(ctx context.Context, params ...string) (any, error)
Set(ctx context.Context, payload change.Payload) error
CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error)
Subscribe(ctx context.Context, params ...string) error
ControlPlaneSubscribe(ctx context.Context, subscriptionInfo SubscriptionInformation, subInfoChannel chan SubscriptionInformation) error
Type() string
ProcessResponse(resp interface{}) error
ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error
}
type (
// HandleSubscribeResponse is the callback function to handle subcription responses.
HandleSubscribeResponse func(*gpb.SubscribeResponse, *SubscriptionInformation)
)
// SubscriptionInformation contains additional information used for internal subscriptions
// for distinguishing from which network element the information is from, to stop subscriptions and
// error handling.
type SubscriptionInformation struct {
PndID string
NetworkElementID string
NetworkElementName string
StopContext context.Context
SubResponse *gpb.SubscribeResponse
}
......@@ -11,7 +11,6 @@ import (
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/conflict"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/metrics"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/model"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/ports"
......@@ -946,7 +945,7 @@ func (n *NetworkElementServer) ChangeMNE(duid uuid.UUID, operation mnepb.ApiOper
callback := func(original, modified []byte) error {
ctx := context.WithValue(context.Background(), types.CtxKeyOperation, operation) // nolint
payload := change.Payload{Original: original, Modified: modified, Diff: diff}
payload := model.Payload{Original: original, Modified: modified, Diff: diff}
return mne.Transport().Set(ctx, payload)
}
......
......@@ -223,3 +223,11 @@ func stateManager(ctx context.Context, ch *Change, timeout time.Duration) (chan<
}()
return stateIn, stateOut, errChan
}
// Payload contains two ygot.GoStructs, the first represents the original state
// before the change was applied and the second repesents the modified state.
type Payload struct {
Original []byte
Modified []byte
Diff *gnmi.Notification
}
......@@ -7,8 +7,6 @@ import (
"time"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/plugin/shared"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
......@@ -96,7 +94,7 @@ func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
}
// Set takes a change.Payload struct.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload) error {
func (g *Gnmi) Set(ctx context.Context, payload Payload) error {
if g.client == nil {
return &customerrs.NilClientError{}
}
......@@ -105,7 +103,7 @@ func (g *Gnmi) Set(ctx context.Context, payload change.Payload) error {
return g.applyDiff(ctx, payload)
}
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload) error {
func (g *Gnmi) applyDiff(ctx context.Context, payload Payload) error {
diff := payload.Diff
updates := diff.GetUpdate()
......@@ -136,7 +134,10 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
// ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// the callback function handles the responses received from the subscription.
func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error {
func (g *Gnmi) ControlPlaneSubscribe(
ctx context.Context,
subscriptionInfo SubscriptionInformation,
subInfoChannel chan SubscriptionInformation) error {
if g.client == nil {
return &customerrs.NilClientError{}
}
......@@ -300,7 +301,10 @@ func (g *Gnmi) subscribe(ctx context.Context) error {
// controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional network element information including
// an option to stop the subscription.
func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error {
func (g *Gnmi) controlPlaneSubscribe(
ctx context.Context,
subscriptionInfo SubscriptionInformation,
subInfoChannel chan SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
......
......@@ -6,7 +6,6 @@ import (
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/conflict"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/google/uuid"
"github.com/openconfig/ygot/ygot"
......@@ -24,7 +23,7 @@ type NetworkElement interface {
GetModel() ([]byte, error)
GetPlugin() PluginIface
GetModelAsFilteredCopy() ([]byte, error)
Transport() transport.Transport
Transport() Transport
Name() string
ProcessResponse(proto.Message) error
IsTransportValid() bool
......@@ -120,7 +119,7 @@ type CommonNetworkElement struct {
Plugin PluginIface
// Transport is the network element's Transport implementation
transport transport.Transport
transport Transport
// Name is the network element's human readable name
name string
......@@ -150,7 +149,7 @@ func (n *CommonNetworkElement) GetModelAsFilteredCopy() ([]byte, error) {
}
// Transport returns the Transport of the network element.
func (n *CommonNetworkElement) Transport() transport.Transport {
func (n *CommonNetworkElement) Transport() Transport {
return n.transport
}
......@@ -170,7 +169,7 @@ func (n *CommonNetworkElement) Name() string {
}
// SetTransport sets the Network Element's Transport.
func (n *CommonNetworkElement) SetTransport(t transport.Transport) {
func (n *CommonNetworkElement) SetTransport(t Transport) {
n.transport = t
}
......@@ -230,7 +229,7 @@ func (n *CsbiNetworkElement) GetModelAsFilteredCopy() ([]byte, error) {
}
// Transport returns the Transport of the network element.
func (n *CsbiNetworkElement) Transport() transport.Transport {
func (n *CsbiNetworkElement) Transport() Transport {
return n.transport
}
......
package model
import (
"context"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/plugin/shared"
gpb "github.com/openconfig/gnmi/proto/gnmi"
)
// Transport provides an interface for Transport implementations
// like RESTCONF or gnmi.
type Transport interface {
Get(ctx context.Context, params ...string) (any, error)
Set(ctx context.Context, payload Payload) error
CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error)
Subscribe(ctx context.Context, params ...string) error
ControlPlaneSubscribe(ctx context.Context, subscriptionInfo SubscriptionInformation, subInfoChannel chan SubscriptionInformation) error
Type() string
ProcessResponse(resp interface{}) error
ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error
}
type (
// HandleSubscribeResponse is the callback function to handle subcription responses.
HandleSubscribeResponse func(*gpb.SubscribeResponse, *SubscriptionInformation)
)
// SubscriptionInformation contains additional information used for internal subscriptions
// for distinguishing from which network element the information is from, to stop subscriptions and
// error handling.
type SubscriptionInformation struct {
PndID string
NetworkElementID string
NetworkElementName string
StopContext context.Context
SubResponse *gpb.SubscribeResponse
}
// NewTransport receives TransportOptions and returns an appropriate Transport
// implementation.
func NewTransport(opts *tpb.TransportOption, model shared.DeviceModel) (transport.Transport, error) {
func NewTransport(opts *tpb.TransportOption, model shared.DeviceModel) (Transport, error) {
if opts == nil {
return nil, &customerrs.InvalidParametersError{
Func: NewTransport,
......
......@@ -9,7 +9,6 @@ import (
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/event"
eventInterfaces "code.fbi.h-da.de/danet/gosdn/controller/interfaces/event"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/model"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/ports"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
......@@ -93,7 +92,7 @@ func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne m
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
subInfoChan := make(chan transport.SubscriptionInformation, workerQueueSize)
subInfoChan := make(chan model.SubscriptionInformation, workerQueueSize)
for i := 1; i <= numberWorkers; i++ {
name := "Worker " + strconv.Itoa(i)
......@@ -104,7 +103,7 @@ func (n *NetworkElementWatcher) callSubscribe(stopContext context.Context, mne m
// SubscriptionInformation contains pnd ID, network element ID and name to be used in the internal subscribe to check
// from which network element a response was sent
if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, transport.SubscriptionInformation{
if err := mne.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, model.SubscriptionInformation{
PndID: mne.PndID().String(),
NetworkElementID: mne.ID().String(),
NetworkElementName: mne.Name(),
......@@ -136,7 +135,7 @@ func (n *NetworkElementWatcher) StopAndRemoveNetworkElementSubscription(subID uu
// handleSubscribeResponse takes the subscribe response and additional information about the network element to distinguish
// from which network element a subscribe response was sent including improved error handling.
func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transport.SubscriptionInformation, workerName string) {
func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *model.SubscriptionInformation, workerName string) {
if subscriptionInfo.SubResponse == nil {
// Note: This needs proper error handling, no idea how yet. Simply logging would lead to spam in the console
// if the target that was subscribed to is not reachable anymore.
......@@ -166,7 +165,7 @@ func (n *NetworkElementWatcher) handleSubscribeResponse(subscriptionInfo *transp
}
}
func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
func (n *NetworkElementWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *model.SubscriptionInformation) {
pathsAndValues := make(map[string]string, len(resp.Update.Update))
for _, update := range resp.Update.Update {
......
......@@ -3,22 +3,22 @@ package nucleus
import (
"time"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/domain/model"
)
// SubscriptionQueueWorker is an interface to define a worker to handle the updates from gNMI Subscription from the NetWorkElementWatcher.
type SubscriptionQueueWorker interface {
HandleGnmiSubscriptionUpdates(chan transport.SubscriptionInformation)
HandleGnmiSubscriptionUpdates(chan model.SubscriptionInformation)
}
// SubscriptionQueueWorkerImpl implements the SubscriptionQueueWorker interface. This is used as a worker pool to handle gNMI subscription updates.
type SubscriptionQueueWorkerImpl struct {
WorkerName string
workFunc func(*transport.SubscriptionInformation, string)
workFunc func(*model.SubscriptionInformation, string)
}
// NewSubscriptionQueueWorker creates a new SubscriptionQueueWorker.
func NewSubscriptionQueueWorker(name string, workFunc func(*transport.SubscriptionInformation, string)) SubscriptionQueueWorker {
func NewSubscriptionQueueWorker(name string, workFunc func(*model.SubscriptionInformation, string)) SubscriptionQueueWorker {
return &SubscriptionQueueWorkerImpl{
WorkerName: name,
workFunc: workFunc,
......@@ -26,7 +26,7 @@ func NewSubscriptionQueueWorker(name string, workFunc func(*transport.Subscripti
}
// HandleGnmiSubscriptionUpdates handles assignment of tasks to free workers of the worker pool from SubscriptionQueueWorkerImpl.
func (s *SubscriptionQueueWorkerImpl) HandleGnmiSubscriptionUpdates(subInfoChan chan transport.SubscriptionInformation) {
func (s *SubscriptionQueueWorkerImpl) HandleGnmiSubscriptionUpdates(subInfoChan chan model.SubscriptionInformation) {
// Note: Sleep was needed to prevent some issue with deadlocks.
// Maybe this needs some additional investigation/improvements in the future
for {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment