Newer
Older
package nucleus
import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/util"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
SetNode func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
RespChan chan *gpb.SubscribeResponse
Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
// newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
// transport based on the values of it.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) {
if opts == nil || sbi == nil {
return nil, &errors.ErrInvalidParameters{
Func: newGnmiTransport,
Param: "'opts' and 'sbi' can not be nil",
}
} else if opts.TransportOption == nil {
return nil, &errors.ErrInvalidParameters{
Func: newGnmiTransport,
Param: "'opts.TransportOption' can not be nil",
}
gnmiConfig := &gnmi.Config{
Addr: opts.Address,
Password: opts.Password,
Username: opts.Username,
TLS: opts.Tls,
Malte Bauch
committed
Encoding: gpb.Encoding_JSON_IETF,
Compression: opts.GetGnmiTransportOption().GetCompression(),
"target": opts.Address,
"tls": opts.Tls,
}).Info("building new gNMI transport")
SetNode: sbi.SetNode,
RespChan: make(chan *gpb.SubscribeResponse),
Unmarshal: sbi.Unmarshal,
Options: opts,
client: c,
config: gnmiConfig,
// Get fetches the given gNMI paths from the target. The context is derived
// from the transport's config via gnmi.NewContext, the path strings are split
// with gnmi.SplitPaths, and the request is issued through g.get with an empty
// origin.
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
	return g.get(gnmi.NewContext(ctx, g.config), gnmi.SplitPaths(params), "")
}
// Set takes a change.Payload struct.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error {
p, err := ygot.StringToStructuredPath(path)
if err != nil {
return err
}
return g.applyDiff(ctx, payload, p, schema)
// isGNMINotificationEmpty reports whether the given gnmi.Notification contains
// neither updates nor deletes. A nil notification is treated as empty.
func isGNMINotificationEmpty(n *gpb.Notification) bool {
	// The generated getters tolerate a nil receiver and len of a nil slice
	// is 0, so no explicit nil checks are required.
	return len(n.GetUpdate()) == 0 && len(n.GetDelete()) == 0
}
// applyDiff computes the gNMI notification describing the difference between
// payload.Original and payload.Modified and sends it to the target as a
// SetRequest rooted at path.
//
// When the operation stored in ctx under types.CtxKeyOperation is an update
// or a replace, the diff's updates are applied to a deep copy of schema.Root,
// the node found at path in that copy is serialized as RFC 7951 JSON, and
// that JSON becomes the value of the request. For any other operation the
// JSON stays empty and createSetRequest decides what to send.
//
// It returns errors.ErrNoNewChanges if the diff contains no updates or
// deletes, errors.ErrPathNotFound if path does not resolve to a non-nil node
// in the modified copy, and otherwise any error reported by ygot, ytypes,
// createSetRequest or the gNMI client.
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error {
	diff, err := ygot.Diff(payload.Original, payload.Modified)
	if err != nil {
		// diff must not be used when the comparison itself failed.
		return err
	}
	if isGNMINotificationEmpty(diff) {
		return errors.ErrNoNewChanges{Original: payload.Original, Modified: payload.Modified}
	}

	var payloadJSON []byte
	if op := ctx.Value(types.CtxKeyOperation); op == ppb.ApiOperation_API_OPERATION_UPDATE || op == ppb.ApiOperation_API_OPERATION_REPLACE {
		// Work on a copy so the schema's root is left untouched.
		rootCopy, err := ygot.DeepCopy(schema.Root)
		if err != nil {
			return err
		}

		// The options are identical for every update, so build them once.
		setOpts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
		for _, u := range diff.Update {
			if err := g.SetNode(schema.RootSchema(), rootCopy, u.GetPath(), u.GetVal(), setOpts...); err != nil {
				return err
			}
		}

		ygot.PruneEmptyBranches(rootCopy)

		getOpts := []ytypes.GetNodeOpt{
			&ytypes.GetHandleWildcards{},
		}
		nodes, err := ytypes.GetNode(schema.RootSchema(), rootCopy, path, getOpts...)
		if err != nil {
			return err
		}
		// err is known to be nil here, so only the result itself is checked.
		if len(nodes) == 0 || util.IsValueNil(nodes[0].Data) {
			return errors.ErrPathNotFound{Path: path}
		}

		payloadJSON, err = ygot.Marshal7951(nodes[0].Data, &ygot.RFC7951JSONConfig{AppendModuleName: true})
		if err != nil {
			return err
		}
	}

	req, err := createSetRequest(ctx, diff, payloadJSON, path)
	if err != nil {
		return err
	}

	resp, err := g.client.Set(ctx, req)
	if err != nil {
		return err
	}
	// Only a successful response carries anything worth logging.
	log.Info(resp)
	return nil
}
func createSetRequest(ctx context.Context, diff *gpb.Notification, json []byte, path *gpb.Path) (*gpb.SetRequest, error) {
op := ctx.Value(types.CtxKeyOperation)
req := &gpb.SetRequest{}
if diff.Update != nil {
switch op {
req.Update = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
req.Replace = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
return nil, &errors.ErrOperationNotSupported{Op: op}
}
} else if diff.Delete != nil {
req.Delete = diff.Delete
}
return req, nil
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
// Type returns the gNMI transport type
func (g *Gnmi) Type() string {
// ProcessResponse takes a gNMI response and serializes the contents to the
// root struct. It logs all errors and returns an error containing the number
// of errors encountered during the process.
func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error {
d, ok := root.(ygot.ValidatedGoStruct)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
r, ok := resp.(*gpb.GetResponse)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: resp,
Type: &gpb.GetResponse{},
}
for _, update := range msg.Update {
path := update.Path
switch val := update.Val.Value.(type) {
case *gpb.TypedValue_JsonVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
Malte Bauch
committed
if err := g.Unmarshal(val.JsonVal, path, d, opts...); err != nil {
errs = append(errs, err)
}
case *gpb.TypedValue_JsonIetfVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
Malte Bauch
committed
if err := g.Unmarshal(val.JsonIetfVal, path, d, opts...); err != nil {
Malte Bauch
committed
schema := s.RootSchema()
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema, root, update.Path, update.Val, opts...); err != nil {
errs = append(errs, err)
}
for _, e := range errs {
log.Error(e)
}
if len(errs) != 0 {
return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs)
// Capabilities calls GNMI capabilities
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
}).Info("sending gNMI capabilities request")
ctx = gnmi.NewContext(ctx, g.config)
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
// get stores the transport's config in the context under types.CtxKeyConfig,
// builds a GetRequest for the given paths and origin with gnmi.NewGetRequest,
// and executes it through getWithRequest. An error from building the request
// is returned as is.
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
	cfgCtx := context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint

	getReq, buildErr := gnmi.NewGetRequest(cfgCtx, paths, origin)
	if buildErr != nil {
		return nil, buildErr
	}

	return g.getWithRequest(cfgCtx, getReq)
}
// getWithRequest takes a fully formed GetRequest, performs the Get,
// and returns any response.
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
"path": req.Path,
}).Info("sending gNMI get request")
resp, err := g.client.Get(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
// subscribe calls GNMI subscribe
func (g *Gnmi) subscribe(ctx context.Context) error {
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
return &errors.ErrInvalidTypeAssertion{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := gnmi.LogSubscribeResponse(resp); err != nil {
log.Fatal(err)
}
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close is currently a no-op for the gNMI transport: it performs no work and
// always returns nil.
func (g *Gnmi) Close() error {
return nil
}
// SetPassthrough forwards an already assembled SetRequest to the gNMI client
// without modifying it and returns the client's response and error unchanged.
// Used for cSBI.
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
	setResp, setErr := g.client.Set(ctx, req)
	return setResp, setErr
}
// GetPassthrough forwards an already assembled GetRequest to the gNMI client
// without modifying it and returns the client's response and error unchanged.
// Used for cSBI.
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
	getResp, getErr := g.client.Get(ctx, req)
	return getResp, getErr
}