Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
gnmi_transport.go 12.06 KiB
package nucleus

import (
	"context"
	"fmt"
	"path"
	"time"

	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"

	"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"

	ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"

	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
	"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
	"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
	gpb "github.com/openconfig/gnmi/proto/gnmi"
	"github.com/openconfig/goyang/pkg/yang"
	"github.com/openconfig/ygot/util"
	"github.com/openconfig/ygot/ygot"
	"github.com/openconfig/ygot/ytypes"
	log "github.com/sirupsen/logrus"

	tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)

// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
type Gnmi struct {
	// SetNode writes val at path into root. It is taken from the southbound
	// interface and used when building Set payloads and when processing
	// non-JSON values of a Get response.
	SetNode   func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
	// RespChan is the channel handed to gnmi.SubscribeErr; subscribe responses
	// are read from it by the subscribe goroutines.
	RespChan  chan *gpb.SubscribeResponse
	// Unmarshal decodes a JSON or JSON_IETF encoded value into a GoStruct. It
	// is taken from the southbound interface.
	Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
	// Options holds the transport options the transport was created with.
	Options   *tpb.TransportOption
	// client is the gNMI client returned by gnmi.Dial; several methods return
	// ErrNilClient when it is nil.
	client    gpb.GNMIClient
	// config is the gnmi.Config derived from Options; it is attached to the
	// context of outgoing requests.
	config    *gnmi.Config
}

// newGnmiTransport builds a Gnmi transport from the given transport options
// and southbound interface. It derives a gnmi.Config from opts (always using
// JSON_IETF encoding), calls gnmi.Dial with it and wires the SBI's SetNode and
// Unmarshal functions into the returned transport.
//
// It returns ErrInvalidParameters if opts, sbi or opts.TransportOption is nil,
// and the error of gnmi.Dial if dialing fails.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) {
	// Validate the arguments before touching any of their fields.
	switch {
	case opts == nil || sbi == nil:
		return nil, &errors.ErrInvalidParameters{
			Func:  newGnmiTransport,
			Param: "'opts' and 'sbi' can not be nil",
		}
	case opts.TransportOption == nil:
		return nil, &errors.ErrInvalidParameters{
			Func:  newGnmiTransport,
			Param: "'opts.TransportOption' can not be nil",
		}
	}

	cfg := &gnmi.Config{
		Addr:        opts.Address,
		Password:    opts.Password,
		Username:    opts.Username,
		TLS:         opts.Tls,
		Encoding:    gpb.Encoding_JSON_IETF,
		Compression: opts.GetGnmiTransportOption().GetCompression(),
	}

	client, err := gnmi.Dial(cfg)
	if err != nil {
		return nil, err
	}

	logFields := log.Fields{
		"target": opts.Address,
		"tls":    opts.Tls,
	}
	log.WithFields(logFields).Info("building new gNMI transport")

	transport := &Gnmi{
		SetNode:   sbi.SetNode,
		RespChan:  make(chan *gpb.SubscribeResponse),
		Unmarshal: sbi.Unmarshal,
		Options:   opts,
		client:    client,
		config:    cfg,
	}
	return transport, nil
}

// Get splits the given gNMI path strings and issues one gNMI Get request
// covering all of them, using an empty origin. It returns ErrNilClient if the
// transport has no client.
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
	if g.client == nil {
		return nil, &errors.ErrNilClient{}
	}

	const noOrigin = ""
	gnmiCtx := gnmi.NewContext(ctx, g.config)
	return g.get(gnmiCtx, gnmi.SplitPaths(params), noOrigin)
}

// Set converts path into a structured gNMI path and applies the difference
// between payload.Original and payload.Modified via applyDiff. It returns the
// path parsing error first, and ErrNilClient if the transport has no client.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error {
	structuredPath, err := ygot.StringToStructuredPath(path)
	switch {
	case err != nil:
		return err
	case g.client == nil:
		return &errors.ErrNilClient{}
	}

	return g.applyDiff(gnmi.NewContext(ctx, g.config), payload, structuredPath, schema)
}

// isGNMINotificationEmpty reports whether the given gnmi.Notification carries
// neither updates nor deletes.
func isGNMINotificationEmpty(n *gpb.Notification) bool {
	// len of a nil slice is 0, so the length checks cover both nil and empty.
	return len(n.Update) == 0 && len(n.Delete) == 0
}

// applyDiff computes the ygot diff between payload.Original and
// payload.Modified and sends it to the target as a gNMI SetRequest.
//
// For the UPDATE and REPLACE operations (read from ctx under
// types.CtxKeyOperation) the diff's updates are applied to a deep copy of
// schema.Root, the node at path is looked up in that copy and marshalled as
// RFC 7951 JSON, which becomes the request value. It returns ErrNoNewChanges
// when the diff is empty and ErrPathNotFound when no non-nil node exists at
// path. The Set response is logged and the Set error returned.
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error {
	diff, err := ygot.Diff(payload.Original, payload.Modified)
	if err != nil {
		return err
	}
	if isGNMINotificationEmpty(diff) {
		return errors.ErrNoNewChanges{Original: payload.Original, Modified: payload.Modified}
	}

	var json []byte
	switch ctx.Value(types.CtxKeyOperation) {
	case ppb.ApiOperation_API_OPERATION_UPDATE, ppb.ApiOperation_API_OPERATION_REPLACE:
		// Work on a copy so the schema's root is never modified.
		workingRoot, err := ygot.DeepCopy(schema.Root)
		if err != nil {
			return err
		}

		setOpts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
		for _, upd := range diff.Update {
			if err := g.SetNode(schema.RootSchema(), workingRoot, upd.GetPath(), upd.GetVal(), setOpts...); err != nil {
				return err
			}
		}

		ygot.PruneEmptyBranches(workingRoot)

		matches, err := ytypes.GetNode(schema.RootSchema(), workingRoot, path, &ytypes.GetHandleWildcards{})
		if err != nil {
			return err
		}
		// err is nil from here on; only the first match is used.
		if len(matches) == 0 || util.IsValueNil(matches[0].Data) {
			return errors.ErrPathNotFound{Path: path, Err: err}
		}

		json, err = ygot.Marshal7951(matches[0].Data, &ygot.RFC7951JSONConfig{AppendModuleName: true})
		if err != nil {
			return err
		}
	}

	req, err := createSetRequest(ctx, diff, json, path)
	if err != nil {
		return err
	}

	resp, setErr := g.client.Set(ctx, req)
	log.Info(resp)
	return setErr
}

// createSetRequest builds the gNMI SetRequest for the given diff.
//
// If the diff has updates, the request carries a single JSON_IETF value (json)
// at path, placed in Update or Replace depending on the operation stored in
// ctx under types.CtxKeyOperation; any other operation yields
// ErrOperationNotSupported. If the diff has no updates but has deletes, the
// deletes are copied into the request. Otherwise an empty request is returned.
func createSetRequest(ctx context.Context, diff *gpb.Notification, json []byte, path *gpb.Path) (*gpb.SetRequest, error) {
	req := &gpb.SetRequest{}

	// Without updates only the deletes (if any) are forwarded.
	if diff.Update == nil {
		if diff.Delete != nil {
			req.Delete = diff.Delete
		}
		return req, nil
	}

	payload := []*gpb.Update{{
		Path: path,
		Val: &gpb.TypedValue{
			Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
		},
	}}

	switch op := ctx.Value(types.CtxKeyOperation); op {
	case ppb.ApiOperation_API_OPERATION_UPDATE:
		req.Update = payload
	case ppb.ApiOperation_API_OPERATION_REPLACE:
		req.Replace = payload
	default:
		return nil, &errors.ErrOperationNotSupported{Op: op}
	}
	return req, nil
}

// Subscribe subscribes to a gNMI target. The subscription options are read
// from ctx by subscribe; params is not used. It returns ErrNilClient if the
// transport has no client.
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
	if g.client != nil {
		return g.subscribe(ctx)
	}
	return &errors.ErrNilClient{}
}

// SubscribeInternal subscribes to a gNMI target and passes the callback to
// subscribeInternal. handleSubscribeRepsonse must be a
// func(*gpb.SubscribeResponse); any other type yields ErrNotYetImplemented.
// It returns ErrNilClient if the transport has no client.
func (g *Gnmi) SubscribeInternal(ctx context.Context, handleSubscribeRepsonse any) error {
	if g.client == nil {
		return &errors.ErrNilClient{}
	}

	if callback, ok := handleSubscribeRepsonse.(func(*gpb.SubscribeResponse)); ok {
		return g.subscribeInternal(ctx, callback)
	}

	// TODO: return a more descriptive error for an unsupported callback type.
	return &errors.ErrNotYetImplemented{}
}

// Type returns the identifier of this transport, the string "gnmi".
func (g *Gnmi) Type() string {
	return "gnmi"
}

// ProcessResponse takes a gNMI Get response and serializes its contents into
// the root struct. JSON and JSON_IETF encoded values are decoded with
// g.Unmarshal; every other value is written with g.SetNode using the root
// schema of s.
//
// root must be a ygot.ValidatedGoStruct and resp a *gpb.GetResponse, otherwise
// ErrInvalidTypeAssertion is returned. Processing continues after a failed
// update; all errors are logged and a single error containing the number of
// errors encountered is returned.
func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error {
	goStruct, ok := root.(ygot.ValidatedGoStruct)
	if !ok {
		return &errors.ErrInvalidTypeAssertion{
			Value: root,
			Type:  (*ygot.ValidatedGoStruct)(nil),
		}
	}
	getResp, ok := resp.(*gpb.GetResponse)
	if !ok {
		return &errors.ErrInvalidTypeAssertion{
			Value: resp,
			Type:  &gpb.GetResponse{},
		}
	}

	// Both JSON encodings are decoded the same way, only the payload differs.
	unmarshalJSON := func(raw []byte, p *gpb.Path) error {
		return g.Unmarshal(raw, p, goStruct, &ytypes.IgnoreExtraFields{})
	}

	var errs []error
	for _, notification := range getResp.Notification {
		for _, update := range notification.Update {
			var err error
			switch val := update.Val.Value.(type) {
			case *gpb.TypedValue_JsonVal:
				err = unmarshalJSON(val.JsonVal, update.Path)
			case *gpb.TypedValue_JsonIetfVal:
				err = unmarshalJSON(val.JsonIetfVal, update.Path)
			default:
				err = g.SetNode(s.RootSchema(), root, update.Path, update.Val,
					&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{})
			}
			if err != nil {
				errs = append(errs, err)
			}
		}
	}

	if len(errs) == 0 {
		return nil
	}
	for _, e := range errs {
		log.Error(e)
	}
	return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs)
}

// Capabilities sends a gNMI Capabilities request to the target and returns
// the response. It returns ErrNilClient if the transport has no client, in
// line with the other request methods of the transport.
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
	// Guard against a missing client instead of panicking on the call below.
	if g.client == nil {
		return nil, &errors.ErrNilClient{}
	}
	log.WithFields(log.Fields{
		"target": g.Options.Address,
	}).Info("sending gNMI capabilities request")
	ctx = gnmi.NewContext(ctx, g.config)
	ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
	resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
	if err != nil {
		// Return an untyped nil so callers do not receive a non-nil interface
		// wrapping a nil response pointer.
		return nil, err
	}
	return resp, nil
}

// get stores the transport's config in the context, builds a GetRequest for
// the given split paths and origin, and performs it via getWithRequest.
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
	cfgCtx := context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint

	getReq, err := gnmi.NewGetRequest(cfgCtx, paths, origin)
	if err != nil {
		return nil, err
	}
	return g.getWithRequest(cfgCtx, getReq)
}

// getWithRequest performs the Get for a fully formed GetRequest and returns
// the response. A nil request yields ErrNil.
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
	if req == nil {
		return nil, &errors.ErrNil{}
	}

	logFields := log.Fields{
		"target": g.Options.Address,
		"path":   req.Path,
	}
	log.WithFields(logFields).Info("sending gNMI get request")

	getResp, err := g.client.Get(ctx, req)
	if err == nil {
		return getResp, nil
	}
	// Return an untyped nil rather than the (nil) response pointer.
	return nil, err
}

// subscribe starts a gNMI subscription with the *gnmi.SubscribeOptions stored
// in ctx under types.CtxKeyOpts and logs every non-nil response received on
// g.RespChan. It returns ErrInvalidTypeAssertion if the options are missing or
// of the wrong type, otherwise the result of gnmi.SubscribeErr.
//
// The goroutine consuming g.RespChan stops once gnmi.SubscribeErr has returned
// or the channel has been closed, so it neither leaks nor spins on a closed
// channel. A response that cannot be logged is reported with log.Error rather
// than terminating the whole process.
func (g *Gnmi) subscribe(ctx context.Context) error {
	ctx = gnmi.NewContext(ctx, g.config)
	opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
	if !ok {
		return &errors.ErrInvalidTypeAssertion{
			Value: ctx.Value(types.CtxKeyOpts),
			Type:  &gnmi.SubscribeOptions{},
		}
	}

	// done is closed when this function returns, i.e. after SubscribeErr is
	// finished and can no longer send on g.RespChan.
	done := make(chan struct{})
	defer close(done)

	go func() {
		log.WithFields(log.Fields{
			"address":  opts.Target,
			"paths":    opts.Paths,
			"mode":     opts.Mode,
			"interval": opts.SampleInterval,
		}).Info("subscribed to gNMI target")
		for {
			select {
			case <-done:
				return
			case resp, open := <-g.RespChan:
				if !open {
					// A closed channel delivers nil forever; stop reading.
					return
				}
				if resp == nil {
					continue
				}
				if err := gnmi.LogSubscribeResponse(resp); err != nil {
					log.Error(err)
				}
			}
		}
	}()
	return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}

// subscribeInternal starts a gNMI subscription with the
// *gnmi.SubscribeOptions stored in ctx under types.CtxKeyOpts and invokes
// subcribeCallbackFunc, in its own goroutine, for every non-nil response
// received on g.RespChan. It returns ErrInvalidTypeAssertion if the options
// are missing or of the wrong type, otherwise the result of gnmi.SubscribeErr.
//
// The goroutine consuming g.RespChan stops once gnmi.SubscribeErr has returned
// or the channel has been closed, so it neither leaks nor spins on a closed
// channel.
func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse)) error {
	ctx = gnmi.NewContext(ctx, g.config)
	opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
	if !ok {
		return &errors.ErrInvalidTypeAssertion{
			Value: ctx.Value(types.CtxKeyOpts),
			Type:  &gnmi.SubscribeOptions{},
		}
	}

	// done is closed when this function returns, i.e. after SubscribeErr is
	// finished and can no longer send on g.RespChan.
	done := make(chan struct{})
	defer close(done)

	go func() {
		log.WithFields(log.Fields{
			"address":  opts.Target,
			"paths":    opts.Paths,
			"mode":     opts.Mode,
			"interval": opts.SampleInterval,
		}).Info("subscribed to gNMI target")
		for {
			select {
			case <-done:
				return
			case resp, open := <-g.RespChan:
				if !open {
					// A closed channel delivers nil forever; stop reading.
					return
				}
				if resp != nil {
					// Run the callback concurrently so a slow handler does
					// not block the subscription.
					go subcribeCallbackFunc(resp)
				}
			}
		}
	}()
	return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}

// LogSubscribeResponse formats the contents of an update response and sends
// one line per update and per delete to stringRespChan. Each line starts with
// the notification timestamp (RFC 3339, UTC) and, if the prefix names a
// target, that target in parentheses.
//
// Error responses and sync responses with a false value currently yield
// ErrNotYetImplemented; all other responses return nil.
func LogSubscribeResponse(response *gpb.SubscribeResponse, stringRespChan chan string) error {
	switch resp := response.Response.(type) {
	case *gpb.SubscribeResponse_Error:
		// TODO: return the error message carried by the response instead.
		return errors.ErrNotYetImplemented{}
	case *gpb.SubscribeResponse_SyncResponse:
		if resp.SyncResponse {
			return nil
		}
		// TODO: return a dedicated "initial sync failed" error instead.
		return errors.ErrNotYetImplemented{}
	case *gpb.SubscribeResponse_Update:
		notification := resp.Update
		// The timestamp is identical for every line of this notification.
		stamp := time.Unix(0, notification.Timestamp).UTC().Format(time.RFC3339Nano)
		prefix := notification.Prefix

		targetLabel := ""
		if name := notification.Prefix.GetTarget(); name != "" {
			targetLabel = "(" + name + ") "
		}

		for _, update := range notification.Update {
			fullPath := path.Join(prefix.String(), update.Path.String())
			stringRespChan <- fmt.Sprintf("[%s] %s%s = %s\n", stamp, targetLabel, fullPath, update.String())
		}
		for _, del := range notification.Delete {
			fullPath := path.Join(prefix.String(), del.String())
			stringRespChan <- fmt.Sprintf("[%s] %sDeleted %s\n", stamp, targetLabel, fullPath)
		}
	}
	return nil
}

// Close is currently a no-op for the gNMI transport: it releases nothing and
// always returns nil.
func (g *Gnmi) Close() error {
	return nil
}

// SetPassthrough forwards an existing SetRequest unchanged to the gNMI client
// and returns its response. Used for cSBI. It returns ErrNilClient if the
// transport has no client, in line with the other request methods.
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
	// Guard against a missing client instead of panicking on the call below.
	if g.client == nil {
		return nil, &errors.ErrNilClient{}
	}
	return g.client.Set(ctx, req)
}

// GetPassthrough forwards an existing GetRequest unchanged to the gNMI client
// and returns its response. Used for cSBI. It returns ErrNilClient if the
// transport has no client, in line with the other request methods.
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
	// Guard against a missing client instead of panicking on the call below.
	if g.client == nil {
		return nil, &errors.ErrNilClient{}
	}
	return g.client.Get(ctx, req)
}