Newer
Older
package nucleus
import (
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/plugin/shared"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.

// SetNode is called for response updates whose value is neither JSON nor
// JSON_IETF encoded; it receives the update's path and typed value.
SetNode func(path *gpb.Path, value *gpb.TypedValue) error
// DeleteNode is called once for every path listed in a notification's deletes.
DeleteNode func(path *gpb.Path) error
// RespChan is the channel handed to gnmi.SubscribeErr and read by the
// subscription loops of this transport.
RespChan chan *gpb.SubscribeResponse
// Unmarshal is called with the raw JSON / JSON_IETF bytes of an update
// together with the update's path.
Unmarshal func(data []byte, path *gpb.Path) error
// Options holds the transport options the transport was created with.
Options *tpb.TransportOption
// client is the gNMI client used for the Get, Set, Capabilities and
// subscribe calls.
client gpb.GNMIClient
// config is passed to gnmi.NewContext and stored in request contexts under
// types.CtxKeyConfig.
config *gnmi.Config
// newGnmiTransport builds a Gnmi transport from the given transport options
// and device model.
//
// An InvalidParametersError is returned when opts or model is nil, or when
// opts.TransportOption is nil. The model's SetNode, DeleteNode and Unmarshal
// functions become the transport's callbacks.
//
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, model shared.DeviceModel) (*Gnmi, error) {
// Reject missing arguments before touching any of their fields.
if opts == nil || model == nil {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Func: newGnmiTransport,
// NOTE(review): the message names 'sbi' although the parameter is 'model'.
Param: "'opts' and 'sbi' can not be nil",
}
} else if opts.TransportOption == nil {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Func: newGnmiTransport,
Param: "'opts.TransportOption' can not be nil",
}
// Translate the transport options into the gNMI client configuration; the
// encoding is fixed to JSON_IETF.
gnmiConfig := &gnmi.Config{
Addr: opts.Address,
Password: opts.Password,
Username: opts.Username,
TLS: opts.Tls,
Malte Bauch
committed
Encoding: gpb.Encoding_JSON_IETF,
Compression: opts.GetGnmiTransportOption().GetCompression(),
"target": opts.Address,
"tls": opts.Tls,
}).Info("building new gNMI transport")
// Wire the device model's callbacks, a fresh unbuffered response channel,
// the client and the configuration into the transport.
SetNode: model.SetNode,
DeleteNode: model.DeleteNode,
RespChan: make(chan *gpb.SubscribeResponse),
Unmarshal: model.Unmarshal,
Options: opts,
client: c,
config: gnmiConfig,
// Get takes gNMI path strings, splits them with gnmi.SplitPaths and issues a
// single get call for all of them with an empty origin. The request context
// is derived from ctx and the transport's config via gnmi.NewContext.
// It can return a NilClientError (presumably when no client is set — confirm).
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
Fabian Seidl
committed
return nil, &customerrs.NilClientError{}
ctx = gnmi.NewContext(ctx, g.config)
paths := gnmi.SplitPaths(params)
return g.get(ctx, paths, "")
}
// Set takes a change.Payload struct and applies it through applyDiff.
// It can return a NilClientError (presumably when no client is set — confirm).
func (g *Gnmi) Set(ctx context.Context, payload change.Payload) error {
Fabian Seidl
committed
return &customerrs.NilClientError{}
return g.applyDiff(ctx, payload)
// applyDiff builds a gNMI SetRequest from the payload's diff (prefix, deletes
// and updates), sends it through the transport's client and logs the
// response. An error from the client's Set call is returned unchanged.
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload) error {
diff := payload.Diff
updates := diff.GetUpdate()
deletes := diff.GetDelete()
// The diff's prefix, deletes and updates map one-to-one onto the SetRequest.
setRequest := &gpb.SetRequest{
Prefix: diff.GetPrefix(),
Delete: deletes,
Update: updates,
}
resp, err := g.client.Set(ctx, setRequest)
if err != nil {
return err
}
log.Info(resp)
return nil
// Subscribe is the exported subscription entry point of the transport.
// It can return a NilClientError (presumably when no client is set, as in
// ControlPlaneSubscribe — confirm).
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
Fabian Seidl
committed
return &customerrs.NilClientError{}
// ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// the callback function handles the responses received from the subscription.
func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, subscriptionInfo *tpInterface.SubscriptionInformation) error {
if g.client == nil {
Fabian Seidl
committed
return &customerrs.NilClientError{}
}
return g.controlPlaneSubscribe(ctx, subscribeCallbackFunc, subscriptionInfo)
}
// Type returns the gNMI transport type.
func (g *Gnmi) Type() string {
// ProcessResponse takes a gNMI response and serializes the contents to the
// root struct. It logs all errors and returns an error containing the number
// of errors encountered during the process.
// An InvalidTypeAssertionError naming *gpb.GetResponse as the expected type
// can be returned (presumably when resp is of a different type — confirm).
func (g *Gnmi) ProcessResponse(resp interface{}) error {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: resp,
Type: &gpb.GetResponse{},
}
// Processing stops at the first message whose updates could not be applied.
if err := g.processResponseUpdates(msg.Update); err != nil {
return err
}
}
return nil
}
// ProcessControlPlaneSubscribeResponse processes the gNMI notification within
// the subscribe response, updating the provided device model.
//
// Updates are applied first, then deletes; processing stops at the first
// failing stage and that stage's error is returned. A nil response, or a
// response that carries no notification, yields a NilError instead of a
// nil-pointer panic.
func (g *Gnmi) ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error {
	// Guard both pointer hops before dereferencing the notification.
	if resp == nil || resp.Update == nil {
		return &customerrs.NilError{}
	}
	notification := resp.Update

	if len(notification.Update) > 0 {
		if err := g.processResponseUpdates(notification.Update); err != nil {
			return err
		}
	}

	if len(notification.Delete) > 0 {
		if err := g.processResponseDeletes(notification.Delete); err != nil {
			return err
		}
	}

	return nil
}
// processResponseUpdates applies every update to the device model through the
// transport's callbacks. Errors are collected rather than aborting the loop;
// if any occurred they are passed to handleProcessResponseErrors and its
// result is returned.
func (g *Gnmi) processResponseUpdates(updates []*gpb.Update) error {
errs := make([]error, 0)
for _, update := range updates {
path := update.Path
// JSON and JSON_IETF payloads are unmarshalled at the update's path; every
// other value type is written through SetNode.
switch val := update.Val.Value.(type) {
case *gpb.TypedValue_JsonVal:
if err := g.Unmarshal(val.JsonVal, path); err != nil {
errs = append(errs, err)
}
case *gpb.TypedValue_JsonIetfVal:
if err := g.Unmarshal(val.JsonIetfVal, path); err != nil {
errs = append(errs, err)
}
default:
if err := g.SetNode(update.Path, update.Val); err != nil {
errs = append(errs, err)
if len(errs) != 0 {
return handleProcessResponseErrors(errs)
}
return nil
}
// handleProcessResponseErrors logs every collected error individually and
// returns one summary error that states how many errors occurred and lists
// them.
func handleProcessResponseErrors(errs []error) error {
	for i := range errs {
		log.Error(errs[i])
	}

	count := len(errs)

	return fmt.Errorf("encountered %v errors during response processing\n%v", count, errs)
}
// processResponseDeletes calls the transport's DeleteNode callback for every
// given path. Errors are collected rather than aborting the loop; if any
// occurred they are passed to handleProcessResponseErrors and its result is
// returned.
func (g *Gnmi) processResponseDeletes(deletes []*gpb.Path) error {
errs := make([]error, 0)
for _, delete := range deletes {
if err := g.DeleteNode(delete); err != nil {
errs = append(errs, err)
}
if len(errs) != 0 {
return handleProcessResponseErrors(errs)
// Capabilities sends a gNMI CapabilityRequest through the transport's client
// and returns the response. An error from the client is returned unchanged.
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
}).Info("sending gNMI capabilities request")
// Derive the request context from the transport's config and additionally
// store the config itself under types.CtxKeyConfig.
ctx = gnmi.NewContext(ctx, g.config)
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
// get stores the transport's config in the context under types.CtxKeyConfig,
// builds a GetRequest for the given paths and origin with gnmi.NewGetRequest
// and hands it to getWithRequest. A request-construction error is returned
// unchanged.
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
	cfgCtx := context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint

	getReq, buildErr := gnmi.NewGetRequest(cfgCtx, paths, origin)
	if buildErr != nil {
		return nil, buildErr
	}

	return g.getWithRequest(cfgCtx, getReq)
}
// getWithRequest takes a fully formed GetRequest, performs the Get through
// the transport's client and returns the response. An error from the client
// is returned unchanged. It can return a NilError (presumably for a nil
// request — confirm).
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
Fabian Seidl
committed
return nil, &customerrs.NilError{}
"path": req.Path,
}).Info("sending gNMI get request")
resp, err := g.client.Get(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
// subscribe reads the gNMI SubscribeOptions from ctx (key types.CtxKeyOpts),
// logs the responses arriving on g.RespChan and returns the result of
// gnmi.SubscribeErr. An InvalidTypeAssertionError is produced for options of
// an unexpected type.
func (g *Gnmi) subscribe(ctx context.Context) error {
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
// Nil responses are skipped.
if resp != nil {
// NOTE(review): log.Fatal presumably terminates the whole process when a
// single response cannot be logged — confirm that this is intended.
if err := gnmi.LogSubscribeResponse(resp); err != nil {
log.Fatal(err)
}
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional network element information including
// an option to stop the subscription.
func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse,
*tpInterface.SubscriptionInformation), subscriptionInfo *tpInterface.SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
// callback to trigger internal event handling process
go subcribeCallbackFunc(resp, subscriptionInfo)
}
select {
case <-subscriptionInfo.StopContext.Done():
if err := subscriptionInfo.StopContext.Err(); err != nil {
log.Error(err)
}
return
default:
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close is a no-op for the gNMI transport: it performs no work and always
// returns nil.
func (g *Gnmi) Close() error {
return nil
}
// CustomSet sends a caller-built SetRequest through the transport's client.
// The request context is derived from ctx and the transport's config via
// gnmi.NewContext.
//
// It returns a NilClientError when the transport has no gNMI client;
// otherwise it returns the client's response and error unchanged.
func (g *Gnmi) CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
	// Same guard as the other exported entry points: calling a method on a
	// nil client interface would panic.
	if g.client == nil {
		return nil, &customerrs.NilClientError{}
	}

	ctx = gnmi.NewContext(ctx, g.config)

	return g.client.Set(ctx, req)
}
// SetPassthrough allows to pass an existing SetRequest. Used for cSBI.
// The request and the context are forwarded to the client as they are; unlike
// CustomSet the context is not passed through gnmi.NewContext.
//
// It returns a NilClientError when the transport has no gNMI client;
// otherwise it returns the client's response and error unchanged.
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
	// Calling a method on a nil client interface would panic.
	if g.client == nil {
		return nil, &customerrs.NilClientError{}
	}

	return g.client.Set(ctx, req)
}
// GetPassthrough allows to pass an existing GetRequest. Used for cSBI.
// The request and the context are forwarded to the client as they are.
//
// It returns a NilClientError when the transport has no gNMI client;
// otherwise it returns the client's response and error unchanged.
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
	// Calling a method on a nil client interface would panic.
	if g.client == nil {
		return nil, &customerrs.NilClientError{}
	}

	return g.client.Get(ctx, req)
}