package nucleus

import (
	"context"
	"fmt"

	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"

	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	gpb "github.com/openconfig/gnmi/proto/gnmi"
	"github.com/openconfig/goyang/pkg/yang"
	"github.com/openconfig/ygot/util"
	"github.com/openconfig/ygot/ygot"
	"github.com/openconfig/ygot/ytypes"
	log "github.com/sirupsen/logrus"

	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)

// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
type Gnmi struct {
	// SetNode writes a value into a ygot tree at the given path. It is taken
	// from the southbound interface when the transport is constructed and is
	// used while building set requests and while processing responses.
	SetNode func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
	// RespChan is the channel handed to gnmi.SubscribeErr; the goroutine
	// started by subscribe receives from it and logs each response.
	RespChan chan *gpb.SubscribeResponse
	// Unmarshal decodes a JSON payload into a GoStruct at the given path. It
	// is taken from the southbound interface when the transport is
	// constructed and is used for JSON and JSON_IETF values in responses.
	Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
	// Options holds the transport options the transport was created from;
	// its Address is used as the "target" field in log entries.
	Options *tpb.TransportOption
	// client is the gNMI client returned by gnmi.Dial.
	client gpb.GNMIClient
	// config is the dial configuration; it is also attached to request
	// contexts via gnmi.NewContext.
	config *gnmi.Config
}

// newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
// transport based on the values of it.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) { if opts == nil || sbi == nil { return nil, &errors.ErrInvalidParameters{ Func: newGnmiTransport, Param: "'opts' and 'sbi' can not be nil", } } else if opts.TransportOption == nil { return nil, &errors.ErrInvalidParameters{ Func: newGnmiTransport, Param: "'opts.TransportOption' can not be nil", } } gnmiConfig := &gnmi.Config{ Addr: opts.Address, Password: opts.Password, Username: opts.Username, TLS: opts.Tls, Encoding: gpb.Encoding_JSON_IETF, Compression: opts.GetGnmiTransportOption().GetCompression(), } c, err := gnmi.Dial(gnmiConfig) if err != nil { return nil, err } log.WithFields(log.Fields{ "target": opts.Address, "tls": opts.Tls, }).Info("building new gNMI transport") return &Gnmi{ SetNode: sbi.SetNode, RespChan: make(chan *gpb.SubscribeResponse), Unmarshal: sbi.Unmarshal, Options: opts, client: c, config: gnmiConfig, }, nil } // Get takes a slice of gnmi paths, splits them and calls get for each one of them. func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) { if g.client == nil { return nil, &errors.ErrNilClient{} } ctx = gnmi.NewContext(ctx, g.config) paths := gnmi.SplitPaths(params) return g.get(ctx, paths, "") } // Set takes a change.Payload struct. func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error { p, err := ygot.StringToStructuredPath(path) if err != nil { return err } if g.client == nil { return &errors.ErrNilClient{} } ctx = gnmi.NewContext(ctx, g.config) return g.applyDiff(ctx, payload, p, schema) } // isGNMINotificationEmpty checks if the given gnmi.Notification does not // contain any updates or deletes. 
func isGNMINotificationEmpty(n *gpb.Notification) bool { if n.Update == nil || len(n.Update) == 0 { if n.Delete == nil || len(n.Delete) == 0 { return true } } return false } func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error { diff, err := ygot.Diff(payload.Original, payload.Modified) if err != nil { return err } if isGNMINotificationEmpty(diff) { return errors.ErrNoNewChanges{Original: payload.Original, Modified: payload.Modified} } var json []byte if op := ctx.Value(types.CtxKeyOperation); op == ppb.ApiOperation_API_OPERATION_UPDATE || op == ppb.ApiOperation_API_OPERATION_REPLACE { rootCopy, err := ygot.DeepCopy(schema.Root) if err != nil { return err } for _, u := range diff.Update { opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}} if err := g.SetNode(schema.RootSchema(), rootCopy, u.GetPath(), u.GetVal(), opts...); err != nil { return err } } ygot.PruneEmptyBranches(rootCopy) opts := []ytypes.GetNodeOpt{ &ytypes.GetHandleWildcards{}, } nodes, err := ytypes.GetNode(schema.RootSchema(), rootCopy, path, opts...) 
if err != nil { return err } if len(nodes) == 0 || err != nil || util.IsValueNil(nodes[0].Data) { return errors.ErrPathNotFound{Path: path, Err: err} } json, err = ygot.Marshal7951(nodes[0].Data, &ygot.RFC7951JSONConfig{AppendModuleName: true}) if err != nil { return err } } req, err := createSetRequest(ctx, diff, json, path) if err != nil { return err } resp, err := g.client.Set(ctx, req) log.Info(resp) return err } func createSetRequest(ctx context.Context, diff *gpb.Notification, json []byte, path *gpb.Path) (*gpb.SetRequest, error) { op := ctx.Value(types.CtxKeyOperation) req := &gpb.SetRequest{} if diff.Update != nil { switch op { case ppb.ApiOperation_API_OPERATION_UPDATE: req.Update = []*gpb.Update{{ Path: path, Val: &gpb.TypedValue{ Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json}, }, }} case ppb.ApiOperation_API_OPERATION_REPLACE: req.Replace = []*gpb.Update{{ Path: path, Val: &gpb.TypedValue{ Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json}, }, }} default: return nil, &errors.ErrOperationNotSupported{Op: op} } } else if diff.Delete != nil { req.Delete = diff.Delete } return req, nil } //Subscribe subscribes to a gNMI target func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error { if g.client == nil { return &errors.ErrNilClient{} } return g.subscribe(ctx) } // Type returns the gNMI transport type func (g *Gnmi) Type() string { return "gnmi" } // ProcessResponse takes a gNMI response and serializes the contents to the // root struct. It logs all errors and returns an error containing the number // off errors encountered during the process. 
func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error { d, ok := root.(ygot.ValidatedGoStruct) if !ok { return &errors.ErrInvalidTypeAssertion{ Value: root, Type: (*ygot.ValidatedGoStruct)(nil), } } r, ok := resp.(*gpb.GetResponse) if !ok { return &errors.ErrInvalidTypeAssertion{ Value: resp, Type: &gpb.GetResponse{}, } } rn := r.Notification errs := make([]error, 0) for _, msg := range rn { for _, update := range msg.Update { path := update.Path switch val := update.Val.Value.(type) { case *gpb.TypedValue_JsonVal: opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}} if err := g.Unmarshal(val.JsonVal, path, d, opts...); err != nil { errs = append(errs, err) } case *gpb.TypedValue_JsonIetfVal: opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}} if err := g.Unmarshal(val.JsonIetfVal, path, d, opts...); err != nil { errs = append(errs, err) } default: schema := s.RootSchema() opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}} if err := g.SetNode(schema, root, update.Path, update.Val, opts...); err != nil { errs = append(errs, err) } } } } for _, e := range errs { log.Error(e) } if len(errs) != 0 { return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs) } return nil } // Capabilities calls GNMI capabilities func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) { log.WithFields(log.Fields{ "target": g.Options.Address, }).Info("sending gNMI capabilities request") ctx = gnmi.NewContext(ctx, g.config) ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{}) if err != nil { return nil, err } return resp, nil } // get calls GNMI get func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) { ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint req, err := gnmi.NewGetRequest(ctx, paths, origin) if err != 
nil { return nil, err } return g.getWithRequest(ctx, req) } // getWithRequest takes a fully formed GetRequest, performs the Get, // and returns any response. func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) { if req == nil { return nil, &errors.ErrNil{} } log.WithFields(log.Fields{ "target": g.Options.Address, "path": req.Path, }).Info("sending gNMI get request") resp, err := g.client.Get(ctx, req) if err != nil { return nil, err } return resp, nil } // Subscribe calls GNMI subscribe func (g *Gnmi) subscribe(ctx context.Context) error { ctx = gnmi.NewContext(ctx, g.config) opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) if !ok { return &errors.ErrInvalidTypeAssertion{ Value: ctx.Value(types.CtxKeyOpts), Type: &gnmi.SubscribeOptions{}, } } go func() { log.WithFields(log.Fields{ "address": opts.Target, "paths": opts.Paths, "mode": opts.Mode, "interval": opts.SampleInterval, }).Info("subscribed to gNMI target") for { resp := <-g.RespChan if resp != nil { if err := gnmi.LogSubscribeResponse(resp); err != nil { log.Fatal(err) } } } }() return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) } // Close calls GNMI close func (g *Gnmi) Close() error { return nil } // SetPassthrough allows to pass an existing SetRequest. Used for cSBI func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) { return g.client.Set(ctx, req) } // GetPassthrough allows to pass an existing GetRequest. Used for cSBI func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) { return g.client.Get(ctx, req) }