Skip to content
Snippets Groups Projects
gnmi_transport.go 12.4 KiB
Newer Older
  • Learn to ignore specific revisions
  • Andre Sterba's avatar
    Andre Sterba committed
    	"context"
    
    	"fmt"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
    
    	tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
    
    	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
    
    	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
    
    	gpb "github.com/openconfig/gnmi/proto/gnmi"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	"github.com/openconfig/goyang/pkg/yang"
    
    	"github.com/openconfig/ygot/ygot"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	"github.com/openconfig/ygot/ytypes"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	log "github.com/sirupsen/logrus"
    
    	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
    
    // Gnmi implements the Transport interface and provides an SBI with the
    // possibility to access a gNMI endpoint.
    
    type Gnmi struct {
    
    	SetNode   func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
    	RespChan  chan *gpb.SubscribeResponse
    
    	Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	Options   *tpb.TransportOption
    
    	client    gpb.GNMIClient
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	config    *gnmi.Config
    
    // newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
    
    // transport based on the values of it.
    
    // Do not call directly. Use NewTransport() instead.
    func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) {
    	if opts == nil || sbi == nil {
    
    			Func:  newGnmiTransport,
    			Param: "'opts' and 'sbi' can not be nil",
    		}
    
    	} else if opts.TransportOption == nil {
    
    			Func:  newGnmiTransport,
    			Param: "'opts.TransportOption' can not be nil",
    		}
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	gnmiConfig := &gnmi.Config{
    		Addr:        opts.Address,
    		Password:    opts.Password,
    		Username:    opts.Username,
    		TLS:         opts.Tls,
    
    		Compression: opts.GetGnmiTransportOption().GetCompression(),
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	}
    	c, err := gnmi.Dial(gnmiConfig)
    
    	if err != nil {
    		return nil, err
    	}
    
    	log.WithFields(log.Fields{
    
    		"target": opts.Address,
    		"tls":    opts.Tls,
    
    	}).Info("building new gNMI transport")
    
    	return &Gnmi{
    
    		SetNode:   sbi.SetNode,
    		RespChan:  make(chan *gpb.SubscribeResponse),
    		Unmarshal: sbi.Unmarshal,
    		Options:   opts,
    		client:    c,
    		config:    gnmiConfig,
    
    	}, nil
    
    Malte Bauch's avatar
    Malte Bauch committed
    // Get takes a slice of gnmi paths, splits them and calls get for each one of them.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
    
    	if g.client == nil {
    
    	ctx = gnmi.NewContext(ctx, g.config)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	paths := gnmi.SplitPaths(params)
    	return g.get(ctx, paths, "")
    }
    
    // Set takes a change.Payload struct.
    
    func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error {
    	p, err := ygot.StringToStructuredPath(path)
    	if err != nil {
    		return err
    	}
    
    	if g.client == nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx = gnmi.NewContext(ctx, g.config)
    
    	return g.applyDiff(ctx, payload, p, schema)
    
    // isGNMINotificationEmpty reports whether the given gNMI notification
    // carries neither updates nor deletes.
    //
    // The generated protobuf getters are used instead of direct field access so
    // that a nil notification is treated as empty rather than triggering a nil
    // pointer dereference. The length of a nil slice is zero, so no separate nil
    // comparison is required.
    func isGNMINotificationEmpty(n *gpb.Notification) bool {
    	return len(n.GetUpdate()) == 0 && len(n.GetDelete()) == 0
    }
    
    func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error {
    
    	diff, err := ygot.Diff(payload.Original, payload.Modified)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	if err != nil {
    		return err
    	}
    
    
    	if isGNMINotificationEmpty(diff) {
    
    		return customerrs.NoNewChangesError{Original: payload.Original, Modified: payload.Modified}
    
    	req, err := createSetRequest(ctx, diff)
    
    	if err != nil {
    		return err
    	}
    
    	resp, err := g.client.Set(ctx, req)
    	log.Info(resp)
    	return err
    }
    
    
    func createSetRequest(ctx context.Context, diff *gpb.Notification) (*gpb.SetRequest, error) {
    
    	op := ctx.Value(types.CtxKeyOperation)
    
    	req := &gpb.SetRequest{}
    	if diff.Update != nil {
    		switch op {
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    		case ppb.ApiOperation_API_OPERATION_UPDATE:
    
    			req.Update = diff.Update
    
    Fabian Seidl's avatar
    Fabian Seidl committed
    		case ppb.ApiOperation_API_OPERATION_REPLACE:
    
    			req.Replace = diff.Update
    
    		default:
    
    			return nil, &customerrs.OperationNotSupportedError{Op: op}
    
    		}
    	} else if diff.Delete != nil {
    		req.Delete = diff.Delete
    	}
    
    Andre Sterba's avatar
    Andre Sterba committed
    // Subscribe subscribes to a gNMI target.
    
    func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
    
    	if g.client == nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return g.subscribe(ctx)
    }
    
    // ControlPlaneSubscribe is used to subscribe to devices from within the
    // controller. gNMI SubscribeOptions need to be provided in the context; the
    // callback function handles the responses received from the subscription.
    // The work itself is delegated to controlPlaneSubscribe and its error is
    // returned unchanged.
    func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, subscriptionInfo *tpInterface.SubscriptionInformation) error {
    	if g.client == nil {
    		// NOTE(review): this branch is empty in the visible source, so a nil
    		// client is not rejected here and the call below still runs.
    		// Presumably an early error return was intended — confirm and restore.
    	}
    
    	return g.controlPlaneSubscribe(ctx, subscribeCallbackFunc, subscriptionInfo)
    }
    
    
    // Type returns the gNMI transport type.
    
    func (g *Gnmi) Type() string {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return "gnmi"
    }
    
    
    // ProcessResponse takes a gNMI response and serializes the contents to the
    // root struct. It logs all errors and returns an error containing the number
    // of errors encountered during the process.
    
    func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error {
    
    	d, ok := root.(ygot.ValidatedGoStruct)
    	if !ok {
    
    			Value: root,
    			Type:  (*ygot.ValidatedGoStruct)(nil),
    		}
    
    	}
    	r, ok := resp.(*gpb.GetResponse)
    	if !ok {
    
    			Value: resp,
    			Type:  &gpb.GetResponse{},
    		}
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	rn := r.Notification
    	for _, msg := range rn {
    
    		if err := g.processResponseUpdates(msg.Update, d, root.(ygot.ValidatedGoStruct), s); err != nil {
    			return err
    		}
    	}
    
    	return nil
    }
    
    
    // ProcessControlPlaneSubscribeResponse processes the gNMI notification within the subscribe response, updating the provided network element model.
    
    func (g *Gnmi) ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error {
    	dModel, ok := root.(ygot.ValidatedGoStruct)
    	if !ok {
    
    			Value: root,
    			Type:  (*ygot.ValidatedGoStruct)(nil),
    		}
    	}
    
    	notification := resp.Update
    
    	if len(notification.Update) > 0 {
    		if err := g.processResponseUpdates(notification.Update, dModel, root.(ygot.ValidatedGoStruct), schema); err != nil {
    			return err
    		}
    	}
    
    	if len(notification.Delete) > 0 {
    		if err := g.processResponseDeletes(notification.Delete, dModel, schema); err != nil {
    			return err
    		}
    	}
    
    	return nil
    }
    
    func (g *Gnmi) processResponseUpdates(updates []*gpb.Update, deviceModel ygot.ValidatedGoStruct, root ygot.ValidatedGoStruct, s *ytypes.Schema) error {
    	errs := make([]error, 0)
    
    	for _, update := range updates {
    		path := update.Path
    		switch val := update.Val.Value.(type) {
    		case *gpb.TypedValue_JsonVal:
    			opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
    			if err := g.Unmarshal(val.JsonVal, path, deviceModel, opts...); err != nil {
    				errs = append(errs, err)
    			}
    		case *gpb.TypedValue_JsonIetfVal:
    			opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
    			if err := g.Unmarshal(val.JsonIetfVal, path, deviceModel, opts...); err != nil {
    				errs = append(errs, err)
    			}
    		default:
    			schema := s.RootSchema()
    			opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
    			if err := g.SetNode(schema, root, update.Path, update.Val, opts...); err != nil {
    				errs = append(errs, err)
    
    
    	if len(errs) != 0 {
    		return handleProcessResponseErrors(errs)
    	}
    
    	return nil
    }
    
    // handleProcessResponseErrors logs each of the given errors individually and
    // folds them into a single error whose message states how many errors
    // occurred, followed by the errors themselves.
    func handleProcessResponseErrors(errs []error) error {
    	for i := range errs {
    		log.Error(errs[i])
    	}

    	count := len(errs)

    	return fmt.Errorf("encountered %v errors during response processing\n%v", count, errs)
    }
    
    func (g *Gnmi) processResponseDeletes(deletes []*gpb.Path, deviceModel ygot.ValidatedGoStruct, rootSchema *ytypes.Schema) error {
    	if err := ytypes.DeleteNode(rootSchema.RootSchema(), deviceModel, deletes[0]); err != nil {
    		return err
    	}
    	modelAsString, _ := ygot.EmitJSON(deviceModel, &ygot.EmitJSONConfig{
    		Format:         ygot.RFC7951,
    		Indent:         "",
    		SkipValidation: true,
    		RFC7951Config: &ygot.RFC7951JSONConfig{
    			AppendModuleName: true,
    		}})
    
    	rootPath, err := ygot.StringToStructuredPath("/")
    	if err != nil {
    		return err
    
    
    	if err := g.Unmarshal([]byte(modelAsString), rootPath, deviceModel); err != nil {
    		return err
    	}
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return nil
    }
    
    
    // Capabilities calls GNMI capabilities.
    
    func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
    
    	log.WithFields(log.Fields{
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		"target": g.Options.Address,
    
    	}).Info("sending gNMI capabilities request")
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx = gnmi.NewContext(ctx, g.config)
    	ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
    
    	resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
    
    	req, err := gnmi.NewGetRequest(ctx, paths, origin)
    
    	return g.getWithRequest(ctx, req)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // getWithRequest takes a fully formed GetRequest, performs the Get,
    // and returns any response.
    
    func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	if req == nil {
    
    	log.WithFields(log.Fields{
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		"target": g.Options.Address,
    
    		"path":   req.Path,
    	}).Info("sending gNMI get request")
    
    
    	resp, err := g.client.Get(ctx, req)
    
    // subscribe calls GNMI subscribe.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) subscribe(ctx context.Context) error {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx = gnmi.NewContext(ctx, g.config)
    
    	opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	if !ok {
    
    			Value: ctx.Value(types.CtxKeyOpts),
    			Type:  &gnmi.SubscribeOptions{},
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	go func() {
    
    		log.WithFields(log.Fields{
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			"address":  opts.Target,
    			"paths":    opts.Paths,
    			"mode":     opts.Mode,
    
    			"interval": opts.SampleInterval,
    		}).Info("subscribed to gNMI target")
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			if resp != nil {
    				if err := gnmi.LogSubscribeResponse(resp); err != nil {
    					log.Fatal(err)
    				}
    
    	return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
    
    // controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional network element information including
    
    // an option to stop the subscription.
    func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse,
    	*tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error {
    	ctx = gnmi.NewContext(ctx, g.config)
    	opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
    	if !ok {
    
    			Value: ctx.Value(types.CtxKeyOpts),
    			Type:  &gnmi.SubscribeOptions{},
    		}
    	}
    
    	go func() {
    		log.WithFields(log.Fields{
    			"address":  opts.Target,
    			"paths":    opts.Paths,
    			"mode":     opts.Mode,
    			"interval": opts.SampleInterval,
    		}).Info("subscribed to gNMI target")
    		for {
    			resp := <-g.RespChan
    			if resp != nil {
    				// callback to trigger internal event handling process
    				go subcribeCallbackFunc(resp, subscriptionInfo)
    			}
    
    			select {
    			case <-subscriptionInfo.StopContext.Done():
    				if err := subscriptionInfo.StopContext.Err(); err != nil {
    					log.Error(err)
    				}
    				return
    			default:
    			}
    		}
    	}()
    	return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
    }
    
    
    // CustomSet sends a caller-built SetRequest to the target. The context is
    // first passed through gnmi.NewContext together with the transport's gNMI
    // config; the client's response and error are returned as they are.
    func (g *Gnmi) CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
    	gnmiCtx := gnmi.NewContext(ctx, g.config)
    	resp, err := g.client.Set(gnmiCtx, req)

    	return resp, err
    }
    
    
    // SetPassthrough allows to pass an existing SetRequest. Used for cSBI.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
    	// The request is forwarded untouched and with the caller's context as-is;
    	// unlike CustomSet, the context is not passed through gnmi.NewContext.
    	resp, err := g.client.Set(ctx, req)

    	return resp, err
    }
    
    
    // GetPassthrough allows to pass an existing GetRequest. Used for cSBI.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
    	// The request is forwarded untouched and with the caller's context as-is;
    	// the client's response and error are handed back unchanged.
    	resp, err := g.client.Get(ctx, req)

    	return resp, err
    }