Skip to content
Snippets Groups Projects
gnmi_transport.go 10.5 KiB
Newer Older
  • Learn to ignore specific revisions
  • Andre Sterba's avatar
    Andre Sterba committed
    	"context"
    
    	"fmt"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/config"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
    
    	tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/plugin/shared"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
    
    	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
    
    	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
    
    	gpb "github.com/openconfig/gnmi/proto/gnmi"
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	log "github.com/sirupsen/logrus"
    
    	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
    
    var gnmiClients = make(map[string]gpb.GNMIClient, 0)
    
    
    // Gnmi implements the Transport interface and provides an SBI with the
    // possibility to access a gNMI endpoint.
    
    type Gnmi struct {
    
    	SetNode    func(path *gpb.Path, value *gpb.TypedValue) error
    	DeleteNode func(path *gpb.Path) error
    	RespChan   chan *gpb.SubscribeResponse
    	Unmarshal  func(data []byte, path *gpb.Path) error
    	Options    *tpb.TransportOption
    	client     gpb.GNMIClient
    	config     *gnmi.Config
    
    // newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
    
    // transport based on the values of it.
    
    // Do not call directly. Use NewTransport() instead.
    
    func newGnmiTransport(opts *tpb.TransportOption, model shared.DeviceModel) (*Gnmi, error) {
    	if opts == nil || model == nil {
    
    			Func:  newGnmiTransport,
    			Param: "'opts' and 'sbi' can not be nil",
    		}
    
    	} else if opts.TransportOption == nil {
    
    			Func:  newGnmiTransport,
    			Param: "'opts.TransportOption' can not be nil",
    		}
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	gnmiConfig := &gnmi.Config{
    		Addr:        opts.Address,
    		Password:    opts.Password,
    		Username:    opts.Username,
    		TLS:         opts.Tls,
    
    		Compression: opts.GetGnmiTransportOption().GetCompression(),
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	}
    
    
    	if opts.Tls {
    		if err := enableTLSInGnmiConfig(gnmiConfig); err != nil {
    			log.Error(err)
    			return nil, err
    		}
    	}
    
    
    	var err error
    	c, ok := gnmiClients[opts.GetAddress()]
    	if !ok {
    		c, err = gnmi.Dial(gnmiConfig)
    		if err != nil {
    			return nil, err
    		}
    		gnmiClients[opts.GetAddress()] = c
    
    	log.WithFields(log.Fields{
    
    		"target": opts.Address,
    		"tls":    opts.Tls,
    
    	}).Info("building new gNMI transport")
    
    	return &Gnmi{
    
    		SetNode:    model.SetNode,
    		DeleteNode: model.DeleteNode,
    		RespChan:   make(chan *gpb.SubscribeResponse),
    		Unmarshal:  model.Unmarshal,
    		Options:    opts,
    		client:     c,
    		config:     gnmiConfig,
    
    	}, nil
    
    Malte Bauch's avatar
    Malte Bauch committed
    // Get takes a slice of gnmi paths, splits them and calls get for each one of them.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
    
    	if g.client == nil {
    
    	ctx = gnmi.NewContext(ctx, g.config)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	paths := gnmi.SplitPaths(params)
    	return g.get(ctx, paths, "")
    }
    
    // Set takes a change.Payload struct.
    
    func (g *Gnmi) Set(ctx context.Context, payload change.Payload) error {
    
    	if g.client == nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx = gnmi.NewContext(ctx, g.config)
    
    	return g.applyDiff(ctx, payload)
    
    func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload) error {
    	diff := payload.Diff
    
    	updates := diff.GetUpdate()
    	deletes := diff.GetDelete()
    
    	setRequest := &gpb.SetRequest{
    		Prefix: diff.GetPrefix(),
    		Delete: deletes,
    		Update: updates,
    	}
    
    	resp, err := g.client.Set(ctx, setRequest)
    
    Andre Sterba's avatar
    Andre Sterba committed
    // Subscribe subscribes to a gNMI target.
    
    func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
    
    	if g.client == nil {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return g.subscribe(ctx)
    }
    
    // ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
    // the callback function handles the responses received from the subscription.
    
    func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error {
    
    	return g.controlPlaneSubscribe(ctx, subscriptionInfo, subInfoChannel)
    
    // Type returns the gNMI transport type.
    
    func (g *Gnmi) Type() string {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return "gnmi"
    }
    
    
    // ProcessResponse takes a gNMI response and serializes the contents to the
    // root struct. It logs all errors and returns an error containing the number
    // off errors encountered during the process.
    
    func (g *Gnmi) ProcessResponse(resp interface{}) error {
    
    	r, ok := resp.(*gpb.GetResponse)
    	if !ok {
    
    			Value: resp,
    			Type:  &gpb.GetResponse{},
    		}
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	rn := r.Notification
    	for _, msg := range rn {
    
    		if err := g.processResponseUpdates(msg.Update); err != nil {
    
    // ProcessControlPlaneSubscribeResponse processes the gNMI notification within the subscribe response, updating the provided device model.
    func (g *Gnmi) ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error {
    
    	notification := resp.Update
    
    	if len(notification.Update) > 0 {
    
    		if err := g.processResponseUpdates(notification.Update); err != nil {
    
    			return err
    		}
    	}
    
    	if len(notification.Delete) > 0 {
    
    		if err := g.processResponseDeletes(notification.Delete); err != nil {
    
    func (g *Gnmi) processResponseUpdates(updates []*gpb.Update) error {
    
    	errs := make([]error, 0)
    
    	for _, update := range updates {
    
    		if err := g.SetNode(update.Path, update.Val); err != nil {
    			errs = append(errs, err)
    
    
    	if len(errs) != 0 {
    		return handleProcessResponseErrors(errs)
    	}
    
    	return nil
    }
    
    func handleProcessResponseErrors(errs []error) error {
    
    	for _, e := range errs {
    		log.Error(e)
    	}
    
    
    	return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs)
    }
    
    
    func (g *Gnmi) processResponseDeletes(deletes []*gpb.Path) error {
    	errs := make([]error, 0)
    	for _, delete := range deletes {
    		if err := g.DeleteNode(delete); err != nil {
    			errs = append(errs, err)
    		}
    
    	if len(errs) != 0 {
    		return handleProcessResponseErrors(errs)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	return nil
    }
    
    
    // Capabilities calls GNMI capabilities.
    
    func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
    
    	log.WithFields(log.Fields{
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		"target": g.Options.Address,
    
    	}).Info("sending gNMI capabilities request")
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx = gnmi.NewContext(ctx, g.config)
    	ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
    
    	resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
    
    	req, err := gnmi.NewGetRequest(ctx, paths, origin)
    
    	return g.getWithRequest(ctx, req)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // getWithRequest takes a fully formed GetRequest, performs the Get,
    // and returns any response.
    
    func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	if req == nil {
    
    	log.WithFields(log.Fields{
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		"target": g.Options.Address,
    
    		"path":   req.Path,
    	}).Info("sending gNMI get request")
    
    
    	resp, err := g.client.Get(ctx, req)
    
    // subscribe calls GNMI subscribe.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) subscribe(ctx context.Context) error {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	ctx = gnmi.NewContext(ctx, g.config)
    
    	opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	if !ok {
    
    			Value: ctx.Value(types.CtxKeyOpts),
    			Type:  &gnmi.SubscribeOptions{},
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	go func() {
    
    		log.WithFields(log.Fields{
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			"address":  opts.Target,
    			"paths":    opts.Paths,
    			"mode":     opts.Mode,
    
    			"interval": opts.SampleInterval,
    		}).Info("subscribed to gNMI target")
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			if resp != nil {
    				if err := gnmi.LogSubscribeResponse(resp); err != nil {
    					log.Fatal(err)
    				}
    
    	return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
    
    // controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional network element information including
    
    // an option to stop the subscription.
    
    func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error {
    
    	ctx = gnmi.NewContext(ctx, g.config)
    	opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
    	if !ok {
    
    			Value: ctx.Value(types.CtxKeyOpts),
    			Type:  &gnmi.SubscribeOptions{},
    		}
    	}
    
    	go func() {
    		log.WithFields(log.Fields{
    			"address":  opts.Target,
    			"paths":    opts.Paths,
    			"mode":     opts.Mode,
    			"interval": opts.SampleInterval,
    		}).Info("subscribed to gNMI target")
    		for {
    			resp := <-g.RespChan
    
    			subscriptionInfo.SubResponse = resp
    			subInfoChannel <- subscriptionInfo
    
    
    			select {
    			case <-subscriptionInfo.StopContext.Done():
    				if err := subscriptionInfo.StopContext.Err(); err != nil {
    					log.Error(err)
    				}
    				return
    			default:
    
    	return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
    }
    
    
    // CustomSet allows to build a custom set request.
    func (g *Gnmi) CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
    	ctx = gnmi.NewContext(ctx, g.config)
    
    	return g.client.Set(ctx, req)
    }
    
    
    // SetPassthrough allows to pass an existing SetRequest. Used for cSBI.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
    	return g.client.Set(ctx, req)
    }
    
    
    // GetPassthrough allows to pass an existing GetRequest. Used for cSBI.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
    	return g.client.Get(ctx, req)
    }
    
    
    func enableTLSInGnmiConfig(gnmiConfig *gnmi.Config) error {
    	wd, err := os.Getwd()
    	if err != nil {
    		return err
    	}
    
    	if config.CAFilePath != "" {
    		gnmiConfig.CertFile = wd + config.CAFilePath
    	} else {
    		return fmt.Errorf("Error setting up client with mTLS, required CA file not provided in config")
    	}
    
    	if config.CertFilePath != "" && config.KeyFilePath != "" {
    		gnmiConfig.CertFile = wd + config.CertFilePath
    		gnmiConfig.KeyFile = wd + config.KeyFilePath
    	} else {
    		return fmt.Errorf("Error setting up client with mTLS, required files not provided in config")
    	}
    
    	return nil
    }