Newer
Older
package nucleus
import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/util"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
SetNode func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
RespChan chan *gpb.SubscribeResponse
Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
// newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
// transport based on the values of it.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) {
if opts == nil || sbi == nil {
return nil, &errors.ErrInvalidParameters{
Func: newGnmiTransport,
Param: "'opts' and 'sbi' can not be nil",
}
} else if opts.TransportOption == nil {
return nil, &errors.ErrInvalidParameters{
Func: newGnmiTransport,
Param: "'opts.TransportOption' can not be nil",
}
gnmiConfig := &gnmi.Config{
Addr: opts.Address,
Password: opts.Password,
Username: opts.Username,
TLS: opts.Tls,
Malte Bauch
committed
Encoding: gpb.Encoding_JSON_IETF,
Compression: opts.GetGnmiTransportOption().GetCompression(),
"target": opts.Address,
"tls": opts.Tls,
}).Info("building new gNMI transport")
SetNode: sbi.SetNode,
RespChan: make(chan *gpb.SubscribeResponse),
Unmarshal: sbi.Unmarshal,
Options: opts,
client: c,
config: gnmiConfig,
// Get takes a slice of gnmi paths, splits them and calls get for each one of them.
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
ctx = gnmi.NewContext(ctx, g.config)
paths := gnmi.SplitPaths(params)
return g.get(ctx, paths, "")
}
// Set takes a change.Payload struct.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error {
p, err := ygot.StringToStructuredPath(path)
if err != nil {
return err
}
return g.applyDiff(ctx, payload, p, schema)
// isGNMINotificationEmpty checks if the given gnmi.Notification does not
// contain any updates or deletes.
func isGNMINotificationEmpty(n *gpb.Notification) bool {
if n.Update == nil || len(n.Update) == 0 {
if n.Delete == nil || len(n.Delete) == 0 {
return true
}
}
return false
}
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error {
diff, err := ygot.Diff(payload.Original, payload.Modified)
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
if isGNMINotificationEmpty(diff) {
return errors.ErrNoNewChanges{Original: payload.Original, Modified: payload.Modified}
}
var json []byte
if op := ctx.Value(types.CtxKeyOperation); op == ppb.ApiOperation_API_OPERATION_UPDATE || op == ppb.ApiOperation_API_OPERATION_REPLACE {
rootCopy, err := ygot.DeepCopy(schema.Root)
if err != nil {
return err
}
for _, u := range diff.Update {
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema.RootSchema(), rootCopy, u.GetPath(), u.GetVal(), opts...); err != nil {
return err
}
}
ygot.PruneEmptyBranches(rootCopy)
opts := []ytypes.GetNodeOpt{
&ytypes.GetHandleWildcards{},
}
nodes, err := ytypes.GetNode(schema.RootSchema(), rootCopy, path, opts...)
if err != nil {
return err
}
if len(nodes) == 0 || err != nil || util.IsValueNil(nodes[0].Data) {
return errors.ErrPathNotFound{Path: path, Err: err}
}
json, err = ygot.Marshal7951(nodes[0].Data, &ygot.RFC7951JSONConfig{AppendModuleName: true})
if err != nil {
return err
}
}
req, err := createSetRequest(ctx, diff, json, path)
if err != nil {
return err
}
resp, err := g.client.Set(ctx, req)
log.Info(resp)
return err
}
func createSetRequest(ctx context.Context, diff *gpb.Notification, json []byte, path *gpb.Path) (*gpb.SetRequest, error) {
op := ctx.Value(types.CtxKeyOperation)
req := &gpb.SetRequest{}
if diff.Update != nil {
switch op {
req.Update = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
req.Replace = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
return nil, &errors.ErrOperationNotSupported{Op: op}
}
} else if diff.Delete != nil {
req.Delete = diff.Delete
}
return req, nil
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
Fabian Seidl
committed
return g.subscribe(ctx)
}
// ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// the callback function handles the responses received from the subscription.
func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, subscriptionInfo *tpInterface.SubscriptionInformation) error {
Fabian Seidl
committed
if g.client == nil {
return &errors.ErrNilClient{}
}
return g.controlPlaneSubscribe(ctx, subscribeCallbackFunc, subscriptionInfo)
// Type returns the gNMI transport type
func (g *Gnmi) Type() string {
// ProcessResponse takes a gNMI response and serializes the contents to the
// root struct. It logs all errors and returns an error containing the number
// off errors encountered during the process.
func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error {
d, ok := root.(ygot.ValidatedGoStruct)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
r, ok := resp.(*gpb.GetResponse)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: resp,
Type: &gpb.GetResponse{},
}
if err := g.processResponseUpdates(msg.Update, d, root.(ygot.ValidatedGoStruct), s); err != nil {
return err
// ProcessControlPlaneSubscribeResponse processes the gNMI notification within the subscribe response, updating the provided device model.
func (g *Gnmi) ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update, root any, schema *ytypes.Schema) error {
dModel, ok := root.(ygot.ValidatedGoStruct)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
notification := resp.Update
if len(notification.Update) > 0 {
if err := g.processResponseUpdates(notification.Update, dModel, root.(ygot.ValidatedGoStruct), schema); err != nil {
return err
}
//TODO: maybe remove and combine into one function/method!
if len(notification.Delete) > 0 {
if err := g.processResponseDeletes(notification.Delete, dModel, schema); err != nil {
return err
}
}
return nil
}
func (g *Gnmi) processResponseUpdates(updates []*gpb.Update, deviceModel ygot.ValidatedGoStruct, root ygot.ValidatedGoStruct, s *ytypes.Schema) error {
errs := make([]error, 0)
for _, update := range updates {
path := update.Path
switch val := update.Val.Value.(type) {
case *gpb.TypedValue_JsonVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := g.Unmarshal(val.JsonVal, path, deviceModel, opts...); err != nil {
errs = append(errs, err)
}
case *gpb.TypedValue_JsonIetfVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
if err := g.Unmarshal(val.JsonIetfVal, path, deviceModel, opts...); err != nil {
errs = append(errs, err)
}
default:
schema := s.RootSchema()
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema, root, update.Path, update.Val, opts...); err != nil {
errs = append(errs, err)
}
}
}
if len(errs) != 0 {
return handleProcessResponseErrors(errs)
}
return nil
}
func handleProcessResponseErrors(errs []error) error {
for _, e := range errs {
log.Error(e)
}
return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs)
}
func (g *Gnmi) processResponseDeletes(deletes []*gpb.Path, root ygot.ValidatedGoStruct, s *ytypes.Schema) error {
errs := make([]error, 0)
for _, path := range deletes {
if err := ytypes.DeleteNode(s.RootSchema(), root, path); err != nil {
errs = append(errs, err)
}
modelAsString, _ := ygot.EmitJSON(root, &ygot.EmitJSONConfig{
Format: ygot.RFC7951,
Indent: "",
SkipValidation: true,
RFC7951Config: &ygot.RFC7951JSONConfig{
AppendModuleName: true,
}})
if err := g.Unmarshal([]byte(modelAsString), path, root); err != nil {
errs = append(errs, err)
}
log.Error(root)
if len(errs) != 0 {
return handleProcessResponseErrors(errs)
}
}
// Capabilities calls GNMI capabilities
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
}).Info("sending gNMI capabilities request")
ctx = gnmi.NewContext(ctx, g.config)
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
req, err := gnmi.NewGetRequest(ctx, paths, origin)
if err != nil {
return nil, err
}
return g.getWithRequest(ctx, req)
}
// getWithRequest takes a fully formed GetRequest, performs the Get,
// and returns any response.
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
"path": req.Path,
}).Info("sending gNMI get request")
resp, err := g.client.Get(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
func (g *Gnmi) subscribe(ctx context.Context) error {
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
return &errors.ErrInvalidTypeAssertion{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := gnmi.LogSubscribeResponse(resp); err != nil {
log.Fatal(err)
}
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional device information including
func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse,
*tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
Fabian Seidl
committed
// callback to trigger internal event handling process
Fabian Seidl
committed
go subcribeCallbackFunc(resp, subscriptionInfo)
}
select {
case <-subscriptionInfo.StopContext.Done():
if err := subscriptionInfo.StopContext.Err(); err != nil {
log.Error(err)
}
return
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close calls GNMI close
func (g *Gnmi) Close() error {
return nil
}
// CustomSet allows to build a custom set request.
func (g *Gnmi) CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
ctx = gnmi.NewContext(ctx, g.config)
return g.client.Set(ctx, req)
}
// SetPassthrough allows to pass an existing SetRequest. Used for cSBI
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
return g.client.Set(ctx, req)
}
// GetPassthrough allows to pass an existing GetRequest. Used for cSBI
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
return g.client.Get(ctx, req)
}