Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package nucleus
import (
"context"
"fmt"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/device"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/networkdomain"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/transport"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/errors"
"code.fbi.h-da.de/danet/gosdn/controller/nucleus/types"
"code.fbi.h-da.de/danet/gosdn/controller/store"
"code.fbi.h-da.de/danet/gosdn/forks/goarista/gnmi"
"github.com/google/uuid"
gpb "github.com/openconfig/gnmi/proto/gnmi"
log "github.com/sirupsen/logrus"
)
const (
subscribeSampleInterval uint64 = 1000000000 // 1 second in nanoseconds
// TODO: These gNMI options are adjusted to arista gNMI fork. Change when switching to native gNMI.
gNMISubscribeMode string = "stream"
gNMIStreamMode string = "on_change"
)
// DeviceWatcher is a component that subscribes to devices via gNMI from within the controller and handles
// responses by triggering the internal event process.
type DeviceWatcher struct {
pndStore networkdomain.PndStore
deviceSubcriptions map[uuid.UUID]*deviceSubscriptionHelper
}
// deviceSubscriptionHelper is used to store information to stop a running subscribe go routine.
type deviceSubscriptionHelper struct {
stopSubscribeCtx context.Context
stopFunc context.CancelFunc
}
// NewDeviceWatcher takes a pndStore to subscribe to device paths.
func NewDeviceWatcher(pndStore networkdomain.PndStore) *DeviceWatcher {
return &DeviceWatcher{
pndStore: pndStore,
deviceSubcriptions: make(map[uuid.UUID]*deviceSubscriptionHelper),
}
}
// SubToDevices subscribes to every available device in each network domain according to provided SubscribeOptions.
// Paths should be provided in the following format [][]string{{"system", "config", "hostname"}}
// SubscribeOptions can be nil. Use nil for a fixed, pre-defined set of gNMI subscription options (streaming in sample mode each second).
func (d *DeviceWatcher) SubToDevices(paths [][]string, opts *gnmi.SubscribeOptions) {
if opts == nil {
opts = &gnmi.SubscribeOptions{
Mode: gNMISubscribeMode,
StreamMode: gNMIStreamMode,
Paths: paths,
SampleInterval: subscribeSampleInterval,
}
}
pnds, err := d.pndStore.GetAll()
if err != nil {
log.Error(err)
}
for _, pnd := range pnds {
d.subscribeToPndDevices(pnd.ID().String(), pnd, opts)
}
}
func (d *DeviceWatcher) subscribeToPndDevices(pndID string, pnd networkdomain.NetworkDomain, opts *gnmi.SubscribeOptions) {
for _, loadedDevice := range pnd.Devices() {
device, _ := loadedDevice.ConvertToDevice()
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
subID := uuid.New()
stopContext, cancel := context.WithCancel(context.Background())
d.addToDeviceSubscriptions(subID, &deviceSubscriptionHelper{
stopSubscribeCtx: stopContext,
stopFunc: cancel,
})
go d.callSubscribe(stopContext, pndID, device, opts)
}
}
func (d *DeviceWatcher) callSubscribe(stopContext context.Context, pndID string, device device.Device, opts *gnmi.SubscribeOptions) {
gNMIOptionsCtx := context.Background()
gNMIOptionsCtx = context.WithValue(gNMIOptionsCtx, types.CtxKeyOpts, opts)
// SubscriptionInformation contains pnd ID, device ID and name to be used in the internal subscribe to check
// from which device a response was sent
if err := device.Transport().ControlPlaneSubscribe(gNMIOptionsCtx, d.handleSubscribeResponse, &transport.SubscriptionInformation{
PndID: pndID,
DeviceID: device.ID().String(),
DeviceName: device.Name(),
StopContext: stopContext,
}); err != nil {
log.Error(err)
}
}
func (d *DeviceWatcher) addToDeviceSubscriptions(subID uuid.UUID, devSub *deviceSubscriptionHelper) {
//TODO: improve handling of subscriptions, like be able to expose to apps so specific subscriptions instead of only all can be stopped in the future
d.deviceSubcriptions[subID] = devSub
}
// StopAndRemoveAllDeviceSubscriptions stops and removes all the available running subscriptions.
func (d *DeviceWatcher) StopAndRemoveAllDeviceSubscriptions() {
for key := range d.deviceSubcriptions {
d.StopAndRemoveDeviceSubscription(key)
}
}
// StopAndRemoveDeviceSubscription passes a subscription uuid to stop the running subscription go routing and removes the entry from the map
// of device subscriptions.
func (d *DeviceWatcher) StopAndRemoveDeviceSubscription(subID uuid.UUID) {
d.deviceSubcriptions[subID].stopFunc()
delete(d.deviceSubcriptions, subID)
}
// handleSubscribeResponse takes the subscribe response and additional information about the device to distinguish
// from which device a subscribe response was sent including improved error handling.
func (d *DeviceWatcher) handleSubscribeResponse(resp *gpb.SubscribeResponse, subscriptionInfo *transport.SubscriptionInformation) {
switch resp := resp.Response.(type) {
case *gpb.SubscribeResponse_Error:
log.Error(&errors.ErrSubscribeResponse{
PndID: subscriptionInfo.PndID,
DeviceID: subscriptionInfo.DeviceID,
DeviceName: subscriptionInfo.DeviceName,
Err: fmt.Sprint("SubscribeResponse_Error"),
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
})
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
log.Error(&errors.ErrSubscribeSyncResponse{
PndID: subscriptionInfo.PndID,
DeviceID: subscriptionInfo.DeviceID,
DeviceName: subscriptionInfo.DeviceName,
})
}
case *gpb.SubscribeResponse_Update:
d.handleSubscribeResponseUpdate(resp, subscriptionInfo)
default:
log.Infof("Invalid SubscribeResponse, %v", resp)
}
}
func (d *DeviceWatcher) handleSubscribeResponseUpdate(resp *gpb.SubscribeResponse_Update, subscriptionInfo *transport.SubscriptionInformation) {
pndID, err := uuid.Parse(subscriptionInfo.PndID)
if err != nil {
log.Error(err)
}
pnd, err := d.pndStore.Get(store.Query{ID: pndID})
if err != nil {
log.Error(err)
}
device, err := pnd.GetDevice(subscriptionInfo.DeviceID)
if err != nil {
log.Error(err)
}
err = device.Transport().ProcessControlPlaneSubscribeResponse(resp, device.GetModel(), device.SBI().Schema())
if err != nil {
log.Error(err)
} else {
if err := pnd.UpdateDeviceAfterSubscribeResponse(device); err != nil {
log.Error(err)
}
}
}