-
Fabian Seidl authoredFabian Seidl authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gnmi_transport.go 11.51 KiB
package nucleus
import (
"context"
"fmt"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/goyang/pkg/yang"
"github.com/openconfig/ygot/util"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
log "github.com/sirupsen/logrus"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
type Gnmi struct {
SetNode func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
RespChan chan *gpb.SubscribeResponse
Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
Options *tpb.TransportOption
client gpb.GNMIClient
config *gnmi.Config
}
// newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
// transport based on the values of it.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) {
if opts == nil || sbi == nil {
return nil, &errors.ErrInvalidParameters{
Func: newGnmiTransport,
Param: "'opts' and 'sbi' can not be nil",
}
} else if opts.TransportOption == nil {
return nil, &errors.ErrInvalidParameters{
Func: newGnmiTransport,
Param: "'opts.TransportOption' can not be nil",
}
}
gnmiConfig := &gnmi.Config{
Addr: opts.Address,
Password: opts.Password,
Username: opts.Username,
TLS: opts.Tls,
Encoding: gpb.Encoding_JSON_IETF,
Compression: opts.GetGnmiTransportOption().GetCompression(),
}
c, err := gnmi.Dial(gnmiConfig)
if err != nil {
return nil, err
}
log.WithFields(log.Fields{
"target": opts.Address,
"tls": opts.Tls,
}).Info("building new gNMI transport")
return &Gnmi{
SetNode: sbi.SetNode,
RespChan: make(chan *gpb.SubscribeResponse),
Unmarshal: sbi.Unmarshal,
Options: opts,
client: c,
config: gnmiConfig,
}, nil
}
// Get takes a slice of gnmi paths, splits them and calls get for each one of them.
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
if g.client == nil {
return nil, &errors.ErrNilClient{}
}
ctx = gnmi.NewContext(ctx, g.config)
paths := gnmi.SplitPaths(params)
return g.get(ctx, paths, "")
}
// Set takes a change.Payload struct.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error {
p, err := ygot.StringToStructuredPath(path)
if err != nil {
return err
}
if g.client == nil {
return &errors.ErrNilClient{}
}
ctx = gnmi.NewContext(ctx, g.config)
return g.applyDiff(ctx, payload, p, schema)
}
// isGNMINotificationEmpty checks if the given gnmi.Notification does not
// contain any updates or deletes.
func isGNMINotificationEmpty(n *gpb.Notification) bool {
if n.Update == nil || len(n.Update) == 0 {
if n.Delete == nil || len(n.Delete) == 0 {
return true
}
}
return false
}
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error {
diff, err := ygot.Diff(payload.Original, payload.Modified)
if err != nil {
return err
}
if isGNMINotificationEmpty(diff) {
return errors.ErrNoNewChanges{Original: payload.Original, Modified: payload.Modified}
}
var json []byte
if op := ctx.Value(types.CtxKeyOperation); op == ppb.ApiOperation_API_OPERATION_UPDATE || op == ppb.ApiOperation_API_OPERATION_REPLACE {
rootCopy, err := ygot.DeepCopy(schema.Root)
if err != nil {
return err
}
for _, u := range diff.Update {
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema.RootSchema(), rootCopy, u.GetPath(), u.GetVal(), opts...); err != nil {
return err
}
}
ygot.PruneEmptyBranches(rootCopy)
opts := []ytypes.GetNodeOpt{
&ytypes.GetHandleWildcards{},
}
nodes, err := ytypes.GetNode(schema.RootSchema(), rootCopy, path, opts...)
if err != nil {
return err
}
if len(nodes) == 0 || err != nil || util.IsValueNil(nodes[0].Data) {
return errors.ErrPathNotFound{Path: path, Err: err}
}
json, err = ygot.Marshal7951(nodes[0].Data, &ygot.RFC7951JSONConfig{AppendModuleName: true})
if err != nil {
return err
}
}
req, err := createSetRequest(ctx, diff, json, path)
if err != nil {
return err
}
resp, err := g.client.Set(ctx, req)
log.Info(resp)
return err
}
func createSetRequest(ctx context.Context, diff *gpb.Notification, json []byte, path *gpb.Path) (*gpb.SetRequest, error) {
op := ctx.Value(types.CtxKeyOperation)
req := &gpb.SetRequest{}
if diff.Update != nil {
switch op {
case ppb.ApiOperation_API_OPERATION_UPDATE:
req.Update = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
case ppb.ApiOperation_API_OPERATION_REPLACE:
req.Replace = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
default:
return nil, &errors.ErrOperationNotSupported{Op: op}
}
} else if diff.Delete != nil {
req.Delete = diff.Delete
}
return req, nil
}
//Subscribe subscribes to a gNMI target
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
if g.client == nil {
return &errors.ErrNilClient{}
}
return g.subscribe(ctx)
}
// SubscribeInternal is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// the callback function handles the responses received from the subscription.
func (g *Gnmi) SubscribeInternal(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, subscriptionInfo *tpInterface.SubscriptionInformation) error {
if g.client == nil {
return &errors.ErrNilClient{}
}
return g.subscribeInternal(ctx, subscribeCallbackFunc, subscriptionInfo)
}
// Type returns the gNMI transport type
func (g *Gnmi) Type() string {
return "gnmi"
}
// ProcessResponse takes a gNMI response and serializes the contents to the
// root struct. It logs all errors and returns an error containing the number
// off errors encountered during the process.
func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error {
d, ok := root.(ygot.ValidatedGoStruct)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
r, ok := resp.(*gpb.GetResponse)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: resp,
Type: &gpb.GetResponse{},
}
}
rn := r.Notification
errs := make([]error, 0)
for _, msg := range rn {
for _, update := range msg.Update {
path := update.Path
switch val := update.Val.Value.(type) {
case *gpb.TypedValue_JsonVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := g.Unmarshal(val.JsonVal, path, d, opts...); err != nil {
errs = append(errs, err)
}
case *gpb.TypedValue_JsonIetfVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := g.Unmarshal(val.JsonIetfVal, path, d, opts...); err != nil {
errs = append(errs, err)
}
default:
schema := s.RootSchema()
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema, root, update.Path, update.Val, opts...); err != nil {
errs = append(errs, err)
}
}
}
}
for _, e := range errs {
log.Error(e)
}
if len(errs) != 0 {
return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs)
}
return nil
}
// Capabilities calls GNMI capabilities
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
log.WithFields(log.Fields{
"target": g.Options.Address,
}).Info("sending gNMI capabilities request")
ctx = gnmi.NewContext(ctx, g.config)
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
// get calls GNMI get
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
req, err := gnmi.NewGetRequest(ctx, paths, origin)
if err != nil {
return nil, err
}
return g.getWithRequest(ctx, req)
}
// getWithRequest takes a fully formed GetRequest, performs the Get,
// and returns any response.
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
if req == nil {
return nil, &errors.ErrNil{}
}
log.WithFields(log.Fields{
"target": g.Options.Address,
"path": req.Path,
}).Info("sending gNMI get request")
resp, err := g.client.Get(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
// subscribe calls GNMI subscribe.
func (g *Gnmi) subscribe(ctx context.Context) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := gnmi.LogSubscribeResponse(resp); err != nil {
log.Fatal(err)
}
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// subscribeInternal calls gNMI subscribe with a callback for responses and additional device information including
// an option to stop the subscription.
func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse,
*tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
// callback to trigger internal event handling process
go subcribeCallbackFunc(resp, subscriptionInfo)
}
select {
case <-subscriptionInfo.StopContext.Done():
if err := subscriptionInfo.StopContext.Err(); err != nil {
log.Error(err)
}
return
default:
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close calls GNMI close
func (g *Gnmi) Close() error {
return nil
}
// SetPassthrough allows to pass an existing SetRequest. Used for cSBI
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
return g.client.Set(ctx, req)
}
// GetPassthrough allows to pass an existing GetRequest. Used for cSBI
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
return g.client.Get(ctx, req)
}