Newer
Older
package nucleus
import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/util"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
SetNode func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
RespChan chan *gpb.SubscribeResponse
Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
// newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
// transport based on the values of it.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) {
if opts == nil || sbi == nil {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Func: newGnmiTransport,
Param: "'opts' and 'sbi' can not be nil",
}
} else if opts.TransportOption == nil {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Func: newGnmiTransport,
Param: "'opts.TransportOption' can not be nil",
}
gnmiConfig := &gnmi.Config{
Addr: opts.Address,
Password: opts.Password,
Username: opts.Username,
TLS: opts.Tls,
Malte Bauch
committed
Encoding: gpb.Encoding_JSON_IETF,
Compression: opts.GetGnmiTransportOption().GetCompression(),
"target": opts.Address,
"tls": opts.Tls,
}).Info("building new gNMI transport")
SetNode: sbi.SetNode,
RespChan: make(chan *gpb.SubscribeResponse),
Unmarshal: sbi.Unmarshal,
Options: opts,
client: c,
config: gnmiConfig,
// Get takes a slice of gnmi paths, splits them and calls get for each one of them.
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
Fabian Seidl
committed
return nil, &customerrs.NilClientError{}
ctx = gnmi.NewContext(ctx, g.config)
paths := gnmi.SplitPaths(params)
return g.get(ctx, paths, "")
}
// Set takes a change.Payload struct.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error {
p, err := ygot.StringToStructuredPath(path)
if err != nil {
return err
}
Fabian Seidl
committed
return &customerrs.NilClientError{}
return g.applyDiff(ctx, payload, p, schema)
// isGNMINotificationEmpty reports whether the given gNMI notification
// contains neither updates nor deletes.
func isGNMINotificationEmpty(n *gpb.Notification) bool {
	// len of a nil slice is zero, so no separate nil checks are needed.
	return len(n.Update) == 0 && len(n.Delete) == 0
}
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error {
diff, err := ygot.Diff(payload.Original, payload.Modified)
if isGNMINotificationEmpty(diff) {
Fabian Seidl
committed
return customerrs.NoNewChangesError{Original: payload.Original, Modified: payload.Modified}
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
}
var json []byte
if op := ctx.Value(types.CtxKeyOperation); op == ppb.ApiOperation_API_OPERATION_UPDATE || op == ppb.ApiOperation_API_OPERATION_REPLACE {
rootCopy, err := ygot.DeepCopy(schema.Root)
if err != nil {
return err
}
for _, u := range diff.Update {
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema.RootSchema(), rootCopy, u.GetPath(), u.GetVal(), opts...); err != nil {
return err
}
}
ygot.PruneEmptyBranches(rootCopy)
opts := []ytypes.GetNodeOpt{
&ytypes.GetHandleWildcards{},
}
nodes, err := ytypes.GetNode(schema.RootSchema(), rootCopy, path, opts...)
if err != nil {
return err
}
if len(nodes) == 0 || err != nil || util.IsValueNil(nodes[0].Data) {
Fabian Seidl
committed
return customerrs.PathNotFoundError{Path: path, Err: err}
}
json, err = ygot.Marshal7951(nodes[0].Data, &ygot.RFC7951JSONConfig{AppendModuleName: true})
if err != nil {
return err
}
}
req, err := createSetRequest(ctx, diff, json, path)
if err != nil {
return err
}
resp, err := g.client.Set(ctx, req)
log.Info(resp)
return err
}
func createSetRequest(ctx context.Context, diff *gpb.Notification, json []byte, path *gpb.Path) (*gpb.SetRequest, error) {
op := ctx.Value(types.CtxKeyOperation)
req := &gpb.SetRequest{}
if diff.Update != nil {
switch op {
req.Update = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
req.Replace = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
Fabian Seidl
committed
return nil, &customerrs.OperationNotSupportedError{Op: op}
}
} else if diff.Delete != nil {
req.Delete = diff.Delete
}
return req, nil
// Subscribe subscribes to a gNMI target.
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
Fabian Seidl
committed
return &customerrs.NilClientError{}
// ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// the callback function handles the responses received from the subscription.
func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, subscriptionInfo *tpInterface.SubscriptionInformation) error {
if g.client == nil {
Fabian Seidl
committed
return &customerrs.NilClientError{}
}
return g.controlPlaneSubscribe(ctx, subscribeCallbackFunc, subscriptionInfo)
}
// Type returns the gNMI transport type.
func (g *Gnmi) Type() string {
// ProcessResponse takes a gNMI response and serializes the contents to the
// root struct. It logs all errors and returns an error containing the number
// of errors encountered during the process.
func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error {
d, ok := root.(ygot.ValidatedGoStruct)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
r, ok := resp.(*gpb.GetResponse)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: resp,
Type: &gpb.GetResponse{},
}
if err := g.processResponseUpdates(msg.Update, d, root.(ygot.ValidatedGoStruct), s); err != nil {
return err
}
}
return nil
}
// ProcessControlPlaneSubscribeResponse processes the gNMI notification within the subscribe response, updating the provided device model.
func (g *Gnmi) ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error {
dModel, ok := root.(ygot.ValidatedGoStruct)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
notification := resp.Update
if len(notification.Update) > 0 {
if err := g.processResponseUpdates(notification.Update, dModel, root.(ygot.ValidatedGoStruct), schema); err != nil {
return err
}
}
if len(notification.Delete) > 0 {
if err := g.processResponseDeletes(notification.Delete, dModel, schema); err != nil {
return err
}
}
return nil
}
func (g *Gnmi) processResponseUpdates(updates []*gpb.Update, deviceModel ygot.ValidatedGoStruct, root ygot.ValidatedGoStruct, s *ytypes.Schema) error {
errs := make([]error, 0)
for _, update := range updates {
path := update.Path
switch val := update.Val.Value.(type) {
case *gpb.TypedValue_JsonVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := g.Unmarshal(val.JsonVal, path, deviceModel, opts...); err != nil {
errs = append(errs, err)
}
case *gpb.TypedValue_JsonIetfVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := g.Unmarshal(val.JsonIetfVal, path, deviceModel, opts...); err != nil {
errs = append(errs, err)
}
default:
schema := s.RootSchema()
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema, root, update.Path, update.Val, opts...); err != nil {
errs = append(errs, err)
if len(errs) != 0 {
return handleProcessResponseErrors(errs)
}
return nil
}
// handleProcessResponseErrors logs every error in errs individually and
// returns a single error that states how many errors were collected, followed
// by the errors themselves.
func handleProcessResponseErrors(errs []error) error {
	for i := range errs {
		log.Error(errs[i])
	}

	count := len(errs)
	return fmt.Errorf("encountered %v errors during response processing\n%v", count, errs)
}
func (g *Gnmi) processResponseDeletes(deletes []*gpb.Path, deviceModel ygot.ValidatedGoStruct, rootSchema *ytypes.Schema) error {
if err := ytypes.DeleteNode(rootSchema.RootSchema(), deviceModel, deletes[0]); err != nil {
return err
}
modelAsString, _ := ygot.EmitJSON(deviceModel, &ygot.EmitJSONConfig{
Format: ygot.RFC7951,
Indent: "",
SkipValidation: true,
RFC7951Config: &ygot.RFC7951JSONConfig{
AppendModuleName: true,
}})
rootPath, err := ygot.StringToStructuredPath("/")
if err != nil {
return err
if err := g.Unmarshal([]byte(modelAsString), rootPath, deviceModel); err != nil {
return err
}
// Capabilities calls GNMI capabilities.
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
}).Info("sending gNMI capabilities request")
ctx = gnmi.NewContext(ctx, g.config)
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
// get builds a gNMI GetRequest for the given paths and origin and hands it to
// getWithRequest. The transport's config is stored in the context under
// types.CtxKeyConfig before the request is built.
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
	cfgCtx := context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint

	getReq, reqErr := gnmi.NewGetRequest(cfgCtx, paths, origin)
	if reqErr != nil {
		return nil, reqErr
	}

	return g.getWithRequest(cfgCtx, getReq)
}
// getWithRequest takes a fully formed GetRequest, performs the Get,
// and returns any response.
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
Fabian Seidl
committed
return nil, &customerrs.NilError{}
"path": req.Path,
}).Info("sending gNMI get request")
resp, err := g.client.Get(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
// subscribe calls GNMI subscribe.
func (g *Gnmi) subscribe(ctx context.Context) error {
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := gnmi.LogSubscribeResponse(resp); err != nil {
log.Fatal(err)
}
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional device information including
// an option to stop the subscription.
func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse,
*tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
// callback to trigger internal event handling process
go subcribeCallbackFunc(resp, subscriptionInfo)
}
select {
case <-subscriptionInfo.StopContext.Done():
if err := subscriptionInfo.StopContext.Err(); err != nil {
log.Error(err)
}
return
default:
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close performs no work for the gNMI transport and always returns nil.
func (g *Gnmi) Close() error {
return nil
}
// CustomSet sends a caller-built SetRequest to the target. The request
// context is derived from ctx via gnmi.NewContext using the transport's
// config.
//
// It returns a NilClientError if the transport has no client, otherwise the
// response and error of the client's Set call.
func (g *Gnmi) CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
	if g.client == nil {
		return nil, &customerrs.NilClientError{}
	}

	ctx = gnmi.NewContext(ctx, g.config)
	return g.client.Set(ctx, req)
}
// SetPassthrough allows to pass an existing SetRequest. Used for cSBI.
// The request and ctx are forwarded to the client unchanged.
//
// It returns a NilClientError if the transport has no client.
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
	if g.client == nil {
		return nil, &customerrs.NilClientError{}
	}

	return g.client.Set(ctx, req)
}
// GetPassthrough allows to pass an existing GetRequest. Used for cSBI.
// The request and ctx are forwarded to the client unchanged.
//
// It returns a NilClientError if the transport has no client.
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
	if g.client == nil {
		return nil, &customerrs.NilClientError{}
	}

	return g.client.Get(ctx, req)
}