Skip to content
Snippets Groups Projects
Commit b8329e16 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

added improved error handling with option to see which device from which pnd has send a response

parent 954a867b
No related branches found
No related tags found
3 merge requests!376Add additional example application hostname-checker,!343Add basic application framework and example application to show interaction between events an NBI,!342Resolve "Add an option to send gNMI Subscribe requests via SBI"
Pipeline #108240 failed
This commit is part of merge request !342. Comments created here will be created in the context of that merge request.
......@@ -15,12 +15,12 @@ type Transport interface {
Get(ctx context.Context, params ...string) (interface{}, error)
Set(ctx context.Context, payload change.Payload, path string, schema *ytypes.Schema) error
Subscribe(ctx context.Context, params ...string) error
SubscribeInternal(ctx context.Context, subscribeCallbackFunc HandleSubscribeResponse) error
SubscribeInternal(ctx context.Context, subscribeCallbackFunc HandleSubscribeResponse, pndID, deviceID, deviceName string) error
Type() string
ProcessResponse(resp interface{}, root interface{}, models *ytypes.Schema) error
}
type (
// HandleSubscribeResponse is the callback function to handle subcription responses
HandleSubscribeResponse func(*gpb.SubscribeResponse)
HandleSubscribeResponse func(*gpb.SubscribeResponse, string, string, string)
)
......@@ -100,13 +100,13 @@ func (_m *Transport) Subscribe(ctx context.Context, params ...string) error {
return r0
}
// SubscribeInternal provides a mock function with given fields: ctx, subscribeCallbackFunc
func (_m *Transport) SubscribeInternal(ctx context.Context, subscribeCallbackFunc transport.HandleSubscribeResponse) error {
ret := _m.Called(ctx, subscribeCallbackFunc)
// SubscribeInternal provides a mock function with given fields: ctx, subscribeCallbackFunc, pndID, deviceID, deviceName
func (_m *Transport) SubscribeInternal(ctx context.Context, subscribeCallbackFunc transport.HandleSubscribeResponse, pndID string, deviceID string, deviceName string) error {
ret := _m.Called(ctx, subscribeCallbackFunc, pndID, deviceID, deviceName)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, transport.HandleSubscribeResponse) error); ok {
r0 = rf(ctx, subscribeCallbackFunc)
if rf, ok := ret.Get(0).(func(context.Context, transport.HandleSubscribeResponse, string, string, string) error); ok {
r0 = rf(ctx, subscribeCallbackFunc, pndID, deviceID, deviceName)
} else {
r0 = ret.Error(0)
}
......
......
......@@ -2,10 +2,10 @@ package nucleus
import (
"context"
"fmt"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
gpb "github.com/openconfig/gnmi/proto/gnmi"
......@@ -49,25 +49,46 @@ func (d *DeviceWatcher) SubToDevices(paths [][]string) {
}
for _, pnd := range pnds {
d.subscribeToPndDevices(pnd, opts)
d.subscribeToPndDevices(pnd.ID().String(), pnd.Devices(), opts)
}
}
func (d *DeviceWatcher) subscribeToPndDevices(pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) {
for _, device := range pnd.Devices() {
go d.callSubscribe(device, opts)
func (d *DeviceWatcher) subscribeToPndDevices(pndID string, devices []device.Device, opts *gnmi.SubscribeOptions) {
for _, device := range devices {
go d.callSubscribe(pndID, device, opts)
}
}
func (d *DeviceWatcher) callSubscribe(device device.Device, opts *gnmi.SubscribeOptions) {
func (d *DeviceWatcher) callSubscribe(pndID string, device device.Device, opts *gnmi.SubscribeOptions) {
ctx := context.Background()
ctx = context.WithValue(ctx, types.CtxKeyOpts, opts)
if err := device.Transport().SubscribeInternal(ctx, handleSubscribeRepsonse); err != nil {
// device ID and name are passed to the internal subscribe to check in which device
// a problem occurs in case of an error
if err := device.Transport().SubscribeInternal(ctx, handleSubscribeResponse, pndID, device.ID().String(), device.Name()); err != nil {
logrus.Error(err)
}
}
func handleSubscribeRepsonse(resp *gpb.SubscribeResponse) {
fmt.Printf("YEP HANDLER CALLED Type:%T ExampleMessage:%s\n", resp, resp.String())
func handleSubscribeResponse(resp *gpb.SubscribeResponse, pndID, deviceID, deviceName string) {
switch resp := resp.Response.(type) {
case *gpb.SubscribeResponse_Error:
logrus.Error(&errors.ErrSubscribeResponse{
PndID: pndID,
DeviceID: deviceID,
DeviceName: deviceName,
Err: resp.Error.Message,
})
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
logrus.Error(&errors.ErrSubscribeSyncResponse{
PndID: pndID,
DeviceID: deviceID,
DeviceName: deviceName,
})
}
case *gpb.SubscribeResponse_Update:
// parse here!
//ExampleMessage:update:{timestamp:1657200848272415469 update:{path:{elem:{name:"system"} elem:{name:"config"} elem:{name:"hostname"}} val:{string_val:"ceos0"}}}
}
}
......@@ -251,3 +251,28 @@ type ErrAMQPMessageFail struct {
func (e ErrAMQPMessageFail) Error() string {
return fmt.Sprintf("Action: %s, Internal error: %v", e.Action, e.Err)
}
// ErrSubscribeResponse implements the Error interface and is called if there is an issue during a ongoing
// gNMI Subscription.
type ErrSubscribeResponse struct {
PndID string
DeviceID string
DeviceName string
Err string
}
func (e ErrSubscribeResponse) Error() string {
return fmt.Sprintf("PndID: %s, DeviceID: %s, DeviceName: %s, Internal error: %s", e.PndID, e.DeviceID, e.DeviceName, e.Err)
}
// ErrSubscribeSyncResponse implements the Error interface and is called if there is an issue syncing a
// gNMI Subscription.
type ErrSubscribeSyncResponse struct {
PndID string
DeviceID string
DeviceName string
}
func (e ErrSubscribeSyncResponse) Error() string {
return fmt.Sprintf("Sync failed, PndID: %s, DeviceID: %s, DeviceName: %s", e.PndID, e.DeviceID, e.DeviceName)
}
......@@ -3,8 +3,6 @@ package nucleus
import (
"context"
"fmt"
"path"
"time"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
tpInterface "code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
......@@ -204,12 +202,12 @@ func (g *Gnmi) Subscribe(ctx context.Context, params ...string) error {
// SubscribeInternal is used to subscribe to devices from within the controller. gNMI SubscribeOptions need to be provided in the context,
// the callback function handles the responses received from the subscription.
func (g *Gnmi) SubscribeInternal(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse) error {
func (g *Gnmi) SubscribeInternal(ctx context.Context, subscribeCallbackFunc tpInterface.HandleSubscribeResponse, pndID, deviceID, deviceName string) error {
if g.client == nil {
return &errors.ErrNilClient{}
}
return g.subscribeInternal(ctx, subscribeCallbackFunc)
return g.subscribeInternal(ctx, subscribeCallbackFunc, pndID, deviceID, deviceName)
}
// Type returns the gNMI transport type
......@@ -341,7 +339,7 @@ func (g *Gnmi) subscribe(ctx context.Context) error {
}
// Subscribe calls GNMI subscribe
func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse)) error { // add callback function instead of chan string as parameter
func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(*gpb.SubscribeResponse, string, string, string), pndID, deviceID, deviceName string) error { // add callback function instead of chan string as parameter
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
......@@ -361,49 +359,14 @@ func (g *Gnmi) subscribeInternal(ctx context.Context, subcribeCallbackFunc func(
for {
resp := <-g.RespChan
if resp != nil {
// do callback(respMessage)
go subcribeCallbackFunc(resp)
// callback to trigger internal event handling process
go subcribeCallbackFunc(resp, pndID, deviceID, deviceName)
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// LogSubscribeResponse logs update responses to stderr.
func LogSubscribeResponse(response *gpb.SubscribeResponse, stringRespChan chan string) error {
switch resp := response.Response.(type) {
case *gpb.SubscribeResponse_Error:
//return errors.New(resp.Error.Message)
//TODO: fix error
return errors.ErrNotYetImplemented{}
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
//return errors.New("initial sync failed")
//TODO: fix error
return errors.ErrNotYetImplemented{}
}
case *gpb.SubscribeResponse_Update:
t := time.Unix(0, resp.Update.Timestamp).UTC()
prefix := resp.Update.Prefix
var target string
if t := resp.Update.Prefix.GetTarget(); t != "" {
target = "(" + t + ") "
}
for _, update := range resp.Update.Update {
stringRespChan <- fmt.Sprintf("[%s] %s%s = %s\n", t.Format(time.RFC3339Nano),
target,
path.Join(prefix.String(), update.Path.String()),
update.String())
}
for _, del := range resp.Update.Delete {
stringRespChan <- fmt.Sprintf("[%s] %sDeleted %s\n", t.Format(time.RFC3339Nano),
target,
path.Join(prefix.String(), del.String()))
}
}
return nil
}
// Close calls GNMI close
func (g *Gnmi) Close() error {
return nil
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment