Newer
Older
package nucleus
import (
Fabian Seidl
committed
"time"
"code.fbi.h-da.de/danet/gosdn/controller/config"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/plugin/shared"
Fabian Seidl
committed
"code.fbi.h-da.de/danet/gosdn/controller/customerrs"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
tpb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/transport"
)
var gnmiClients = make(map[string]gpb.GNMIClient, 0)
// Gnmi implements the Transport interface and provides an SBI with the
// possibility to access a gNMI endpoint.
SetNode func(path *gpb.Path, value *gpb.TypedValue) error
DeleteNode func(path *gpb.Path) error
RespChan chan *gpb.SubscribeResponse
Unmarshal func(data []byte, path *gpb.Path) error
Options *tpb.TransportOption
client gpb.GNMIClient
config *gnmi.Config
// newGnmiTransport takes a struct of GnmiTransportOptions and returns a Gnmi
// transport based on the values of it.
// Do not call directly. Use NewTransport() instead.
func newGnmiTransport(opts *tpb.TransportOption, model shared.DeviceModel) (*Gnmi, error) {
if opts == nil || model == nil {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Func: newGnmiTransport,
Param: "'opts' and 'sbi' can not be nil",
}
} else if opts.TransportOption == nil {
Fabian Seidl
committed
return nil, &customerrs.InvalidParametersError{
Func: newGnmiTransport,
Param: "'opts.TransportOption' can not be nil",
}
gnmiConfig := &gnmi.Config{
Addr: opts.Address,
Password: opts.Password,
Username: opts.Username,
TLS: opts.Tls,
Malte Bauch
committed
Encoding: gpb.Encoding_JSON_IETF,
Compression: opts.GetGnmiTransportOption().GetCompression(),
if opts.Tls {
if err := enableTLSInGnmiConfig(gnmiConfig); err != nil {
log.Error(err)
return nil, err
}
}
var err error
c, ok := gnmiClients[opts.GetAddress()]
if !ok {
c, err = gnmi.Dial(gnmiConfig)
if err != nil {
return nil, err
}
gnmiClients[opts.GetAddress()] = c
"target": opts.Address,
"tls": opts.Tls,
}).Info("building new gNMI transport")
SetNode: model.SetNode,
DeleteNode: model.DeleteNode,
RespChan: make(chan *gpb.SubscribeResponse),
Unmarshal: model.Unmarshal,
Options: opts,
client: c,
config: gnmiConfig,
// Get takes a slice of gnmi paths, splits them and calls get for each one of them.
func (g *Gnmi) Get(ctx context.Context, params ...string) (interface{}, error) {
Fabian Seidl
committed
return nil, &customerrs.NilClientError{}
ctx = gnmi.NewContext(ctx, g.config)
paths := gnmi.SplitPaths(params)
return g.get(ctx, paths, "")
}
// Set takes a change.Payload struct.
func (g *Gnmi) Set(ctx context.Context, payload change.Payload) error {
Fabian Seidl
committed
return &customerrs.NilClientError{}
return g.applyDiff(ctx, payload)
func (g *Gnmi) applyDiff(ctx context.Context, payload change.Payload) error {
diff := payload.Diff
updates := diff.GetUpdate()
deletes := diff.GetDelete()
setRequest := &gpb.SetRequest{
Prefix: diff.GetPrefix(),
Delete: deletes,
Update: updates,
}
resp, err := g.client.Set(ctx, setRequest)
if err != nil {
return err
}
log.Info(resp)
return nil
func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
Fabian Seidl
committed
return &customerrs.NilClientError{}
// ControlPlaneSubscribe is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// the callback function handles the responses received from the subscription.
Fabian Seidl
committed
func (g *Gnmi) ControlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error {
if g.client == nil {
Fabian Seidl
committed
return &customerrs.NilClientError{}
Fabian Seidl
committed
return g.controlPlaneSubscribe(ctx, subscriptionInfo, subInfoChannel)
// Type returns the gNMI transport type.
func (g *Gnmi) Type() string {
// ProcessResponse takes a gNMI response and serializes the contents to the
// root struct. It logs all errors and returns an error containing the number
// off errors encountered during the process.
func (g *Gnmi) ProcessResponse(resp interface{}) error {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: resp,
Type: &gpb.GetResponse{},
}
if err := g.processResponseUpdates(msg.Update); err != nil {
return err
}
}
return nil
}
// ProcessControlPlaneSubscribeResponse processes the gNMI notification within the subscribe response, updating the provided device model.
func (g *Gnmi) ProcessControlPlaneSubscribeResponse(resp *gpb.SubscribeResponse_Update) error {
notification := resp.Update
if len(notification.Update) > 0 {
if err := g.processResponseUpdates(notification.Update); err != nil {
return err
}
}
if len(notification.Delete) > 0 {
if err := g.processResponseDeletes(notification.Delete); err != nil {
return err
}
}
return nil
}
func (g *Gnmi) processResponseUpdates(updates []*gpb.Update) error {
errs := make([]error, 0)
for _, update := range updates {
if err := g.SetNode(update.Path, update.Val); err != nil {
errs = append(errs, err)
if len(errs) != 0 {
return handleProcessResponseErrors(errs)
}
return nil
}
func handleProcessResponseErrors(errs []error) error {
for _, e := range errs {
log.Error(e)
}
return fmt.Errorf("encountered %v errors during response processing\n%v", len(errs), errs)
}
func (g *Gnmi) processResponseDeletes(deletes []*gpb.Path) error {
errs := make([]error, 0)
for _, delete := range deletes {
if err := g.DeleteNode(delete); err != nil {
errs = append(errs, err)
}
if len(errs) != 0 {
return handleProcessResponseErrors(errs)
// Capabilities calls GNMI capabilities.
func (g *Gnmi) Capabilities(ctx context.Context) (interface{}, error) {
}).Info("sending gNMI capabilities request")
ctx = gnmi.NewContext(ctx, g.config)
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
resp, err := g.client.Capabilities(ctx, &gpb.CapabilityRequest{})
if err != nil {
return nil, err
}
return resp, nil
}
// get calls GNMI get.
func (g *Gnmi) get(ctx context.Context, paths [][]string, origin string) (interface{}, error) {
ctx = context.WithValue(ctx, types.CtxKeyConfig, g.config) //nolint
req, err := gnmi.NewGetRequest(ctx, paths, origin)
if err != nil {
return nil, err
}
return g.getWithRequest(ctx, req)
}
// getWithRequest takes a fully formed GetRequest, performs the Get,
// and returns any response.
func (g *Gnmi) getWithRequest(ctx context.Context, req *gpb.GetRequest) (interface{}, error) {
Fabian Seidl
committed
return nil, &customerrs.NilError{}
"path": req.Path,
}).Info("sending gNMI get request")
resp, err := g.client.Get(ctx, req)
if err != nil {
return nil, err
}
return resp, nil
}
// subscribe calls GNMI subscribe.
func (g *Gnmi) subscribe(ctx context.Context) error {
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := gnmi.LogSubscribeResponse(resp); err != nil {
log.Fatal(err)
}
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// controlPlaneSubscribe calls gNMI subscribe with a callback for responses and additional network element information including
// an option to stop the subscription.
Fabian Seidl
committed
func (g *Gnmi) controlPlaneSubscribe(ctx context.Context, subscriptionInfo tpInterface.SubscriptionInformation, subInfoChannel chan tpInterface.SubscriptionInformation) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
Fabian Seidl
committed
return &customerrs.InvalidTypeAssertionError{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
Fabian Seidl
committed
subscriptionInfo.SubResponse = resp
subInfoChannel <- subscriptionInfo
select {
case <-subscriptionInfo.StopContext.Done():
if err := subscriptionInfo.StopContext.Err(); err != nil {
log.Error(err)
}
return
default:
Fabian Seidl
committed
time.Sleep(time.Millisecond * 2)
}
}
}()
Fabian Seidl
committed
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// Close calls GNMI close.
func (g *Gnmi) Close() error {
return nil
}
// CustomSet allows to build a custom set request.
func (g *Gnmi) CustomSet(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
ctx = gnmi.NewContext(ctx, g.config)
return g.client.Set(ctx, req)
}
// SetPassthrough allows to pass an existing SetRequest. Used for cSBI.
func (g *Gnmi) SetPassthrough(ctx context.Context, req *gpb.SetRequest) (*gpb.SetResponse, error) {
return g.client.Set(ctx, req)
}
// GetPassthrough allows to pass an existing GetRequest. Used for cSBI.
func (g *Gnmi) GetPassthrough(ctx context.Context, req *gpb.GetRequest) (*gpb.GetResponse, error) {
return g.client.Get(ctx, req)
}
func enableTLSInGnmiConfig(gnmiConfig *gnmi.Config) error {
wd, err := os.Getwd()
if err != nil {
return err
}
if config.CAFilePath != "" {
gnmiConfig.CertFile = wd + config.CAFilePath
} else {
return fmt.Errorf("Error setting up client with mTLS, required CA file not provided in config")
}
if config.CertFilePath != "" && config.KeyFilePath != "" {
gnmiConfig.CertFile = wd + config.CertFilePath
gnmiConfig.KeyFile = wd + config.KeyFilePath
} else {
return fmt.Errorf("Error setting up client with mTLS, required files not provided in config")
}
return nil
}