Newer
Older
package nucleus
import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
SetNode func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
RespChan chan *gpb.SubscribeResponse
Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
// newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
// transport based on the values of it.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) {
if opts == nil || sbi == nil {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Func: newGnmiTransport,
Param: "'opts' and 'sbi' can not be nil",
}
} else if opts.TransportOption == nil {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Func: newGnmiTransport,
Param: "'opts.TransportOption' can not be nil",
}
gnmiConfig := &gnmi.Config{
Addr: opts.Address,
Password: opts.Password,
Username: opts.Username,
TLS: opts.Tls,
Malte Bauch
committed
Encoding: gpb.Encoding_JSON_IETF,
Compression: opts.GetGnmiTransportOption().GetCompression(),
"target": opts.Address,
"tls": opts.Tls,
}).Info("building new gNMI transport")
SetNode: sbi.SetNode,
RespChan: make(chan *gpb.SubscribeResponse),
Unmarshal: sbi.Unmarshal,
Options: opts,
client: c,
config: gnmiConfig,
// Get takes a slice of gnmi paths, splits them and calls get for each one of them.
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
Fabian Seidl
committed
return nil, &customerrs.NilClientError{}
ctx = gnmi.NewContext(ctx, g.config)
paths := gnmi.SplitPaths(params)
return g.get(ctx, paths, "")
}
// Set takes a change.Payload struct.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error {
p, err := ygot.StringToStructuredPath(path)
if err != nil {
return err
}
Fabian Seidl
committed
return &customerrs.NilClientError{}
return g.applyDiff(ctx, payload, p, schema)
// isGNMINotificationEmpty checks if the given gnmi.Notification does not
// contain any updates or deletes.
func isGNMINotificationEmpty(n *gpb.Notification) bool {
if n.Update == nil || len(n.Update) == 0 {
if n.Delete == nil || len(n.Delete) == 0 {
return true
}
}
return false
}
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error {
diff, err := ygot.Diff(payload.Original, payload.Modified)
if isGNMINotificationEmpty(diff) {
Fabian Seidl
committed
return customerrs.NoNewChangesError{Original: payload.Original, Modified: payload.Modified}
req, err := createSetRequest(ctx, diff)
if err != nil {
return err
}
resp, err := g.client.Set(ctx, req)
log.Info(resp)
return err
}
func createSetRequest(ctx context.Context, diff *gpb.Notification) (*gpb.SetRequest, error) {
op := ctx.Value(types.CtxKeyOperation)
req := &gpb.SetRequest{}
if diff.Update != nil {
switch op {
req.Update = diff.Update
req.Replace = diff.Update
Fabian Seidl
committed
return nil, &customerrs.OperationNotSupportedError{Op: op}
}
} else if diff.Delete != nil {
req.Delete = diff.Delete
}
return req, nil
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
Fabian Seidl
committed
return &customerrs.NilClientError{}
// ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// the callback function handles the responses received from the subscription.
func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, subscriptionInfo *tpInterface.SubscriptionInformation) error {
if g.client == nil {
Fabian Seidl
committed
return &customerrs.NilClientError{}
}
return g.controlPlaneSubscribe(ctx, subscribeCallbackFunc, subscriptionInfo)
}
// Type returns the gNMI transport type.
func (g *Gnmi) Type() string {
// ProcessResponse takes a gNMI response and serializes the contents to the
// root struct. It logs all errors and returns an error containing the number
// off errors encountered during the process.
func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error {
d, ok := root.(ygot.ValidatedGoStruct)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
r, ok := resp.(*gpb.GetResponse)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: resp,
Type: &gpb.GetResponse{},
}
if err := g.processResponseUpdates(msg.Update, d, root.(ygot.ValidatedGoStruct), s); err != nil {
return err
}
}
return nil
}
// ProcessControlPlaneSubscribeResponse processes the gNMI notification within the subscribe response, updating the provided network element model.
func (g *Gnmi) ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error {
dModel, ok := root.(ygot.ValidatedGoStruct)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
notification := resp.Update
if len(notification.Update) > 0 {
if err := g.processResponseUpdates(notification.Update, dModel, root.(ygot.ValidatedGoStruct), schema); err != nil {
return err
}
}
if len(notification.Delete) > 0 {
if err := g.processResponseDeletes(notification.Delete, dModel, schema); err != nil {
return err
}
}
return nil
}
func (g *Gnmi) processResponseUpdates(updates []*gpb.Update, deviceModel ygot.ValidatedGoStruct, root ygot.ValidatedGoStruct, s *ytypes.Schema) error {
errs := make([]error, 0)
for _, update := range updates {
path := update.Path
switch val := update.Val.Value.(type) {
case *gpb.TypedValue_JsonVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := g.Unmarshal(val.JsonVal, path, deviceModel, opts...); err != nil {
errs = append(errs, err)
}
case *gpb.TypedValue_JsonIetfVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := g.Unmarshal(val.JsonIetfVal, path, deviceModel, opts...); err != nil {
errs = append(errs, err)
}
default:
schema := s.RootSchema()
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema, root, update.Path, update.Val, opts...); err != nil {
errs = append(errs, err)
if len(errs) != 0 {
return handleProcessResponseErrors(errs)
}
return nil
}
func handleProcessResponseErrors(errs []error) error {
for _, e := range errs {
log.Error(e)
}
return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs)
}
func (g *Gnmi) processResponseDeletes(deletes []*gpb.Path, deviceModel ygot.ValidatedGoStruct, rootSchema *ytypes.Schema) error {
if err := ytypes.DeleteNode(rootSchema.RootSchema(), deviceModel, deletes[0]); err != nil {
return err
}
modelAsString, _ := ygot.EmitJSON(deviceModel, &ygot.EmitJSONConfig{
Format: ygot.RFC7951,
Indent: "",
SkipValidation: true,
RFC7951Config: &ygot.RFC7951JSONConfig{
AppendModuleName: true,
}})
rootPath, err := ygot.StringToStructuredPath("/")
if err != nil {
return err
if err := g.Unmarshal([]byte(modelAsString), rootPath, deviceModel); err != nil {
return err
}
// Capabilities calls GNMI capabilities.
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
}).Info("sending gNMI capabilities request")
ctx = gnmi.NewContext(ctx, g.config)
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
// get calls GNMI get.
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
req, err := gnmi.NewGetRequest(ctx, paths, origin)
if err != nil {
return nil, err
}
return g.getWithRequest(ctx, req)
}
// getWithRequest takes a fully formed GetRequest, performs the Get,
// and returns any response.
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
Fabian Seidl
committed
return nil, &customerrs.NilError{}
"path": req.Path,
}).Info("sending gNMI get request")
resp, err := g.client.Get(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
// subscribe calls GNMI subscribe.
func (g *Gnmi) subscribe(ctx context.Context) error {
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := gnmi.LogSubscribeResponse(resp); err != nil {
log.Fatal(err)
}
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional network element information including
// an option to stop the subscription.
func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse,
*tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
// callback to trigger internal event handling process
go subcribeCallbackFunc(resp, subscriptionInfo)
}
select {
case <-subscriptionInfo.StopContext.Done():
if err := subscriptionInfo.StopContext.Err(); err != nil {
log.Error(err)
}
return
default:
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close calls GNMI close.
func (g *Gnmi) Close() error {
return nil
}
// CustomSet allows to build a custom set request.
func (g *Gnmi) CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
ctx = gnmi.NewContext(ctx, g.config)
return g.client.Set(ctx, req)
}
// SetPassthrough allows to pass an existing SetRequest. Used for cSBI.
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
return g.client.Set(ctx, req)
}
// GetPassthrough allows to pass an existing GetRequest. Used for cSBI.
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
return g.client.Get(ctx, req)
}