Newer
Older
package nucleus
import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/southbound"
ppb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/pnd"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
"github.com/openconfig/ygot/util"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
SetNode func(schema *yang.Entry, root interface{}, path *gpb.Path, val interface{}, opts ...ytypes.SetNodeOpt) error
RespChan chan *gpb.SubscribeResponse
Unmarshal func([]byte, *gpb.Path, ygot.GoStruct, ...ytypes.UnmarshalOpt) error
// newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
// transport based on the values of it.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, sbi southbound.SouthboundInterface) (*Gnmi, error) {
if opts == nil || sbi == nil {
return nil, &errors.ErrInvalidParameters{
Func: newGnmiTransport,
Param: "'opts' and 'sbi' can not be nil",
}
} else if opts.TransportOption == nil {
return nil, &errors.ErrInvalidParameters{
Func: newGnmiTransport,
Param: "'opts.TransportOption' can not be nil",
}
gnmiConfig := &gnmi.Config{
Addr: opts.Address,
Password: opts.Password,
Username: opts.Username,
TLS: opts.Tls,
Malte Bauch
committed
Encoding: gpb.Encoding_JSON_IETF,
Compression: opts.GetGnmiTransportOption().GetCompression(),
"target": opts.Address,
"tls": opts.Tls,
}).Info("building new gNMI transport")
SetNode: sbi.SetNode,
RespChan: make(chan *gpb.SubscribeResponse),
Unmarshal: sbi.Unmarshal,
Options: opts,
client: c,
config: gnmiConfig,
// Get takes a slice of gnmi paths, splits them and calls get for each one of them.
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
ctx = gnmi.NewContext(ctx, g.config)
paths := gnmi.SplitPaths(params)
return g.get(ctx, paths, "")
}
// Set takes a change.Payload struct.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error {
p, err := ygot.StringToStructuredPath(path)
if err != nil {
return err
}
return g.applyDiff(ctx, payload, p, schema)
// isGNMINotificationEmpty checks if the given gnmi.Notification does not
// contain any updates or deletes.
func isGNMINotificationEmpty(n *gpb.Notification) bool {
if n.Update == nil || len(n.Update) == 0 {
if n.Delete == nil || len(n.Delete) == 0 {
return true
}
}
return false
}
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload, path *gpb.Path, schema *ytypes.Schema) error {
diff, err := ygot.Diff(payload.Original, payload.Modified)
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
if isGNMINotificationEmpty(diff) {
return errors.ErrNoNewChanges{Original: payload.Original, Modified: payload.Modified}
}
var json []byte
if op := ctx.Value(types.CtxKeyOperation); op == ppb.ApiOperation_API_OPERATION_UPDATE || op == ppb.ApiOperation_API_OPERATION_REPLACE {
rootCopy, err := ygot.DeepCopy(schema.Root)
if err != nil {
return err
}
for _, u := range diff.Update {
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema.RootSchema(), rootCopy, u.GetPath(), u.GetVal(), opts...); err != nil {
return err
}
}
ygot.PruneEmptyBranches(rootCopy)
opts := []ytypes.GetNodeOpt{
&ytypes.GetHandleWildcards{},
}
nodes, err := ytypes.GetNode(schema.RootSchema(), rootCopy, path, opts...)
if err != nil {
return err
}
if len(nodes) == 0 || err != nil || util.IsValueNil(nodes[0].Data) {
return errors.ErrPathNotFound{Path: path, Err: err}
}
json, err = ygot.Marshal7951(nodes[0].Data, &ygot.RFC7951JSONConfig{AppendModuleName: true})
if err != nil {
return err
}
}
req, err := createSetRequest(ctx, diff, json, path)
if err != nil {
return err
}
resp, err := g.client.Set(ctx, req)
log.Info(resp)
return err
}
func createSetRequest(ctx context.Context, diff *gpb.Notification, json []byte, path *gpb.Path) (*gpb.SetRequest, error) {
op := ctx.Value(types.CtxKeyOperation)
req := &gpb.SetRequest{}
if diff.Update != nil {
switch op {
req.Update = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
req.Replace = []*gpb.Update{{
Path: path,
Val: &gpb.TypedValue{
Value: &gpb.TypedValue_JsonIetfVal{JsonIetfVal: json},
},
}}
return nil, &errors.ErrOperationNotSupported{Op: op}
}
} else if diff.Delete != nil {
req.Delete = diff.Delete
}
return req, nil
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
// Type returns the gNMI transport type
func (g *Gnmi) Type() string {
// ProcessResponse takes a gNMI response and serializes the contents to the
// root struct. It logs all errors and returns an error containing the number
// off errors encountered during the process.
func (g *Gnmi) ProcessResponse(resp interface{}, root interface{}, s *ytypes.Schema) error {
d, ok := root.(ygot.ValidatedGoStruct)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: root,
Type: (*ygot.ValidatedGoStruct)(nil),
}
}
r, ok := resp.(*gpb.GetResponse)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: resp,
Type: &gpb.GetResponse{},
}
for _, update := range msg.Update {
path := update.Path
switch val := update.Val.Value.(type) {
case *gpb.TypedValue_JsonVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
Malte Bauch
committed
if err := g.Unmarshal(val.JsonVal, path, d, opts...); err != nil {
errs = append(errs, err)
}
case *gpb.TypedValue_JsonIetfVal:
opts := []ytypes.UnmarshalOpt{&ytypes.IgnoreExtraFields{}}
Malte Bauch
committed
if err := g.Unmarshal(val.JsonIetfVal, path, d, opts...); err != nil {
Malte Bauch
committed
schema := s.RootSchema()
opts := []ytypes.SetNodeOpt{&ytypes.InitMissingElements{}, &ytypes.TolerateJSONInconsistencies{}}
if err := g.SetNode(schema, root, update.Path, update.Val, opts...); err != nil {
errs = append(errs, err)
}
for _, e := range errs {
log.Error(e)
}
if len(errs) != 0 {
return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs)
// Capabilities calls GNMI capabilities
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
}).Info("sending gNMI capabilities request")
ctx = gnmi.NewContext(ctx, g.config)
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
req, err := gnmi.NewGetRequest(ctx, paths, origin)
if err != nil {
return nil, err
}
return g.getWithRequest(ctx, req)
}
// getWithRequest takes a fully formed GetRequest, performs the Get,
// and returns any response.
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
"path": req.Path,
}).Info("sending gNMI get request")
resp, err := g.client.Get(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
// Subscribe calls GNMI subscribe
func (g *Gnmi) subscribe(ctx context.Context) error {
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
return &errors.ErrInvalidTypeAssertion{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := gnmi.LogSubscribeResponse(resp); err != nil {
log.Fatal(err)
}
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
// Subscribe calls GNMI subscribe
func (g *Gnmi) subscribe2(ctx context.Context, stringRespChan chan string) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := LogSubscribeResponse(resp, stringRespChan); err != nil {
log.Fatal(err)
}
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// LogSubscribeResponse logs update responses to stderr.
func LogSubscribeResponse(response *gpb.SubscribeResponse, stringRespChan chan string) error {
switch resp := response.Response.(type) {
case *gpb.SubscribeResponse_Error:
//return errors.New(resp.Error.Message)
//TODO: fix error
return errors.ErrNotYetImplemented{}
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
//return errors.New("initial sync failed")
//TODO: fix error
return errors.ErrNotYetImplemented{}
}
case *gpb.SubscribeResponse_Update:
t := time.Unix(0, resp.Update.Timestamp).UTC()
prefix := resp.Update.Prefix
var target string
if t := resp.Update.Prefix.GetTarget(); t != "" {
target = "(" + t + ") "
}
for _, update := range resp.Update.Update {
stringRespChan <- fmt.Sprintf("[%s] %s%s = %s\n", t.Format(time.RFC3339Nano),
target,
path.Join(prefix.String(), update.Path.String()),
update.String())
}
for _, del := range resp.Update.Delete {
stringRespChan <- fmt.Sprintf("[%s] %sDeleted %s\n", t.Format(time.RFC3339Nano),
target,
path.Join(prefix.String(), del.String()))
}
}
return nil
}
// Close calls GNMI close
func (g *Gnmi) Close() error {
return nil
}
// SetPassthrough allows to pass an existing SetRequest. Used for cSBI
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
return g.client.Set(ctx, req)
}
// GetPassthrough allows to pass an existing GetRequest. Used for cSBI
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
return g.client.Get(ctx, req)
}