Newer
Older
/* Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package gnmiserver implements a gnmi server.
package gnmiserver
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"reflect"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
log "github.com/golang/glog"
"github.com/openconfig/gnmi/value"
"github.com/openconfig/ygot/util"
"github.com/openconfig/ygot/ygot"
"github.com/openconfig/ygot/ytypes"
"google.golang.org/protobuf/proto"
Malte Bauch
committed
"code.fbi.h-da.de/danet/gnmi-target/handler"
not "code.fbi.h-da.de/danet/gnmi-target/internal/notifications"
dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
"github.com/openconfig/gnmi/proto/gnmi"
pb "github.com/openconfig/gnmi/proto/gnmi"
)
// ConfigCallback is the signature of the function to apply a validated config to the physical device.
Malte Bauch
committed
//type ConfigCallback func(ygot.ValidatedGoStruct, ygot.ValidatedGoStruct) error
supportedEncodings = []pb.Encoding{pb.Encoding_JSON, pb.Encoding_JSON_IETF}
)
// Server struct maintains the data structure for device config and implements the interface of gnmi server. It supports Capabilities, Get, and Set APIs.
// Typical usage:
// g := grpc.NewServer()
// s, err := gnmiserver.NewServer(model, config, notifications, handlers...)
// pb.NewServer(g, s)
// reflection.Register(g)
// listen, err := net.Listen("tcp", ":8080")
// g.Serve(listen)
//
// For a real device, apply the config changes to the hardware in the callback function.
// Arguments:
//
// newConfig: new root config to be applied on the device.
//
// func callback(newConfig ygot.ValidatedGoStruct) error {
// // Apply the config to your device and return nil if success. return error if fails.
// //
// // Do something ...
// }
Malte Bauch
committed
model *Model
config *handler.Config
Malte Bauch
committed
handlers []handler.PathHandler
}
// NewServer creates an instance of Server with given json config.
func NewServer(model *Model, config *handler.Config, notifications *not.Dispatcher, handlers ...handler.PathHandler) (*Server, error) {
/*rootStruct, err := model.NewConfigStruct(config)
model: model,
config: config,
YangModelChangeDispatcher: notifications,
Malte Bauch
committed
handlers: handlers,
/*if config != nil && s.callback != nil {
if err := s.callback(config); err != nil {
Malte Bauch
committed
// callback is used to apply a validated new config to the physical device. The
// changed values are then sent to YangHandlers to apply the new config values
// to the physical device.
func (s *Server) callback(newConfig ygot.ValidatedGoStruct, existingConf ygot.ValidatedGoStruct) ([]*gnmi.Notification, error) {
Malte Bauch
committed
// All applied successfully, so time for finding the diff and report this
// Generate gnmi notifications for subscribe
configDiff, err := ygot.DiffWithAtomic(existingConf, newConfig)
if err != nil {
//TODO error logging as we cannot do anything in this case
}
// NOTE: This is a test implementation and not particularly sophisticated,
// but for now the general idea is this: For every registered module we
// check if a changed value is one that the registered module is
// responsible for. If this is the case, then the handlers update method is
// called.
// The update method of a handler will receive the new config with all the
// changes and from there on the handler updates the given values within
// the os.
for _, handler := range s.handlers {
handlerJobs, err := checkHandlerPaths(handler, configDiff)
if err != nil {
return nil, err
Malte Bauch
committed
}
if len(handlerJobs) != 0 {
for _, handlerJob := range handlerJobs {
// NOTE: add delete
//if err := handler.Delete(newConfig, handlerJob.Updates); err != nil {
// return err
//}
if err := handler.Update(newConfig, handlerJob.Updates); err != nil {
return nil, err
Malte Bauch
committed
}
}
}
}
return configDiff, nil
}
// TODO: This will be moved
func (s *Server) PublishNotificationsToSubscribers(notifications []*gnmi.Notification) error {
for _, specificDiff := range notifications {
Malte Bauch
committed
// First for gnmi Updates
updates := specificDiff.GetUpdate()
for _, specificUpdate := range updates {
pathString, err := ygot.PathToString(specificUpdate.Path)
if err != nil {
return err
}
Malte Bauch
committed
log.Infof("specificDiff update %s with value of %s", pathString, specificUpdate.Val.String())
// Wrap Update into a notification and ship it off
updateNotification := createUpdateNotification(specificUpdate)
s.YangModelChangeDispatcher.Publish(pathString, updateNotification)
}
// Second for gnmi Deletes
deletes := specificDiff.GetDelete()
for _, specificDelete := range deletes {
pathString, err := ygot.PathToString(specificDelete)
if err != nil {
return err
}
Malte Bauch
committed
log.Infof("specificDiff delete %s ", pathString)
// Wrap Update into a notification and ship it off
updateNotification := createDeleteNotification(specificDelete)
s.YangModelChangeDispatcher.Publish(pathString, updateNotification)
}
}
return nil
}
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
// checkEncodingAndModel checks whether the requested encoding and all of the
// requested models are supported by the server.
//
// The encoding has to be one of supportedEncodings and every entry of models
// has to be equal to one of the models in s.model.modelData. An error naming
// the first unsupported encoding or model is returned; nil means everything
// is supported. An empty models list is always accepted.
func (s *Server) checkEncodingAndModel(encoding pb.Encoding, models []*pb.ModelData) error {
	encodingSupported := false
	for _, candidate := range supportedEncodings {
		if candidate == encoding {
			encodingSupported = true
			break
		}
	}
	if !encodingSupported {
		return fmt.Errorf("unsupported encoding: %s", pb.Encoding_name[int32(encoding)])
	}

	for _, requested := range models {
		modelSupported := false
		for _, known := range s.model.modelData {
			// Protobuf messages carry internal runtime state, so they are
			// compared with proto.Equal instead of reflect.DeepEqual, which
			// could report semantically identical messages as different.
			if proto.Equal(requested, known) {
				modelSupported = true
				break
			}
		}
		if !modelSupported {
			return fmt.Errorf("unsupported model: %v", requested)
		}
	}

	return nil
}
// getGNMIServiceVersion returns a pointer to the gNMI service version string.
// The method is non-trivial because of the way it is defined in the proto file.
//
// The version is stored as the gnmi_service extension on the file options of
// the gNMI proto file. The gzipped file descriptor is obtained from a
// pb.Update message, decompressed, unmarshaled into a FileDescriptorProto and
// the extension is read from its options.
//
// An error is returned if decompressing or unmarshaling the descriptor fails,
// or if the extension value is neither a string nor a *string.
func getGNMIServiceVersion() (*string, error) {
	// The second return value holds the index path of the message inside the
	// file descriptor; only the descriptor bytes are needed here.
	gzB, _ := (&pb.Update{}).Descriptor()

	r, err := gzip.NewReader(bytes.NewReader(gzB))
	if err != nil {
		return nil, fmt.Errorf("error in initializing gzip reader: %w", err)
	}
	defer r.Close()

	b, err := io.ReadAll(r)
	if err != nil {
		return nil, fmt.Errorf("error in reading gzip data: %w", err)
	}

	desc := &dpb.FileDescriptorProto{}
	if err := proto.Unmarshal(b, desc); err != nil {
		return nil, fmt.Errorf("error in unmarshaling proto: %w", err)
	}

	// Depending on the protobuf runtime a scalar extension is handed out
	// either by value or as a pointer. Both shapes are accepted instead of
	// blindly asserting one of them, which would panic on a mismatch.
	switch ver := proto.GetExtension(desc.Options, pb.E_GnmiService).(type) {
	case *string:
		if ver == nil {
			return nil, fmt.Errorf("gnmi service version extension is not set")
		}
		return ver, nil
	case string:
		return &ver, nil
	default:
		return nil, fmt.Errorf("unexpected type %T for gnmi service version extension", ver)
	}
}
// gnmiFullPath builds the full path from the prefix and path.
//
// The origin of the result is taken from path. The elements of prefix are
// placed in front of the elements of path, separately for the deprecated
// Element field and for the Elem field. A field of the result is only
// populated if path itself carries elements of that kind.
//
// Neither prefix nor path is modified, and both may be nil.
func gnmiFullPath(prefix, path *pb.Path) *pb.Path {
	// The getter is nil-safe, unlike a direct field access on path.
	fullPath := &pb.Path{Origin: path.GetOrigin()}

	if elements := path.GetElement(); elements != nil {
		prefixElements := prefix.GetElement()
		// Capping the capacity forces append to allocate a new backing array
		// instead of writing into spare capacity of the prefix's slice, which
		// is shared between all paths of a request.
		fullPath.Element = append(prefixElements[:len(prefixElements):len(prefixElements)], elements...)
	}

	if elems := path.GetElem(); elems != nil {
		prefixElems := prefix.GetElem()
		// Same capacity cap as above, for the same reason.
		fullPath.Elem = append(prefixElems[:len(prefixElems):len(prefixElems)], elems...)
	}

	return fullPath
}
// Capabilities implements the Capabilities RPC. The response lists the models
// held by the server's model, the supported encodings and the gNMI service
// version. If the version cannot be determined, an Internal status error is
// returned.
func (s *Server) Capabilities(ctx context.Context, req *pb.CapabilityRequest) (*pb.CapabilityResponse, error) {
	version, err := getGNMIServiceVersion()
	if err != nil {
		return nil, status.Errorf(codes.Internal, "error in getting gnmi service version: %v", err)
	}

	response := &pb.CapabilityResponse{}
	response.SupportedModels = s.model.modelData
	response.SupportedEncodings = supportedEncodings
	response.GNMIVersion = *version

	return response, nil
}
// Get implements the Get RPC in gNMI spec.
func (s *Server) Get(ctx context.Context, req *pb.GetRequest) (*pb.GetResponse, error) {
if req.GetType() != pb.GetRequest_ALL {
return nil, status.Errorf(codes.Unimplemented, "unsupported request type: %s", pb.GetRequest_DataType_name[int32(req.GetType())])
}
if err := s.checkEncodingAndModel(req.GetEncoding(), req.GetUseModels()); err != nil {
return nil, status.Error(codes.Unimplemented, err.Error())
}
prefix := req.GetPrefix()
paths := req.GetPath()
notifications := make([]*pb.Notification, len(paths))
s.config.RLock()
defer s.config.RUnlock()
for i, path := range paths {
// Get schema node for path from config struct.
fullPath := path
if prefix != nil {
fullPath = gnmiFullPath(prefix, path)
}
if fullPath.GetElem() == nil && fullPath.GetElement() != nil {
return nil, status.Error(codes.Unimplemented, "deprecated path element type is unsupported")
}
nodes, err := ytypes.GetNode(s.model.schemaTreeRoot, s.config.Data, fullPath)
if len(nodes) == 0 || err != nil || util.IsValueNil(nodes[0].Data) {
return nil, status.Errorf(codes.NotFound, "path %v not found: %v", fullPath, err)
}
node := nodes[0].Data
ts := time.Now().UnixNano()
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// Return leaf node.
if !ok {
var val *pb.TypedValue
switch kind := reflect.ValueOf(node).Kind(); kind {
case reflect.Ptr, reflect.Interface:
var err error
val, err = value.FromScalar(reflect.ValueOf(node).Elem().Interface())
if err != nil {
msg := fmt.Sprintf("leaf node %v does not contain a scalar type value: %v", path, err)
log.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
case reflect.Int64:
enumMap, ok := s.model.enumData[reflect.TypeOf(node).Name()]
if !ok {
return nil, status.Error(codes.Internal, "not a GoStruct enumeration type")
}
val = &pb.TypedValue{
Value: &pb.TypedValue_StringVal{
StringVal: enumMap[reflect.ValueOf(node).Int()].Name,
},
}
default:
return nil, status.Errorf(codes.Internal, "unexpected kind of leaf node type: %v %v", node, kind)
}
update := &pb.Update{Path: path, Val: val}
notifications[i] = &pb.Notification{
Timestamp: ts,
Prefix: prefix,
Update: []*pb.Update{update},
}
continue
}
if req.GetUseModels() != nil {
return nil, status.Errorf(codes.Unimplemented, "filtering Get using use_models is unsupported, got: %v", req.GetUseModels())
}
// Return IETF JSON by default.
jsonEncoder := func() (map[string]interface{}, error) {
return ygot.ConstructIETFJSON(nodeStruct, &ygot.RFC7951JSONConfig{AppendModuleName: true})
}
jsonType := "IETF"
buildUpdate := func(b []byte) *pb.Update {
return &pb.Update{Path: path, Val: &pb.TypedValue{Value: &pb.TypedValue_JsonIetfVal{JsonIetfVal: b}}}
}
if req.GetEncoding() == pb.Encoding_JSON {
jsonEncoder = func() (map[string]interface{}, error) {
return ygot.ConstructInternalJSON(nodeStruct)
}
jsonType = "Internal"
buildUpdate = func(b []byte) *pb.Update {
return &pb.Update{Path: path, Val: &pb.TypedValue{Value: &pb.TypedValue_JsonVal{JsonVal: b}}}
}
}
jsonTree, err := jsonEncoder()
if err != nil {
msg := fmt.Sprintf("error in constructing %s JSON tree from requested node: %v", jsonType, err)
log.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
jsonDump, err := json.Marshal(jsonTree)
if err != nil {
msg := fmt.Sprintf("error in marshaling %s JSON tree to bytes: %v", jsonType, err)
log.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
update := buildUpdate(jsonDump)
notifications[i] = &pb.Notification{
Timestamp: ts,
Prefix: prefix,
Update: []*pb.Update{update},
}
}
return &pb.GetResponse{Notification: notifications}, nil
}
// Set implements the Set RPC in gNMI spec.
func (s *Server) Set(ctx context.Context, req *pb.SetRequest) (*pb.SetResponse, error) {
s.config.Lock()
defer s.config.Unlock()
logrus.Debug("Incoming Set Request: ", req)
newConfig, err := ygot.DeepCopy(s.config.Data)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
newSchema := &ytypes.Schema{
Root: newConfig,
SchemaTree: s.model.schemaTree,
Unmarshal: s.model.jsonUnmarshaler,
err = ytypes.UnmarshalSetRequest(newSchema, req)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
valNewConfig, ok := newConfig.(ygot.ValidatedGoStruct)
if !ok {
return nil, status.Error(codes.Internal, fmt.Sprintf("wrong type for new config, expected: %T; got: %T", (ygot.ValidatedGoStruct)(nil), newConfig))
}
currentConfig, err := ygot.DeepCopy(s.config.Data)
Malte Bauch
committed
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// Apply the validated operations to the device.
diff, applyErr := s.callback(valNewConfig, currentConfig.(ygot.ValidatedGoStruct))
if applyErr != nil {
if _, rollbackErr := s.callback(currentConfig.(ygot.ValidatedGoStruct), s.config.Data); rollbackErr != nil {
return nil, status.Errorf(codes.Internal, "error in rollback the failed operation (%v): %v", applyErr, rollbackErr)
return nil, status.Errorf(codes.Aborted, "error in applying operation to device: %v", applyErr)
s.config.Data = valNewConfig
// notify subscribers about the changes
err = s.PublishNotificationsToSubscribers(diff)
if err != nil {
msg := fmt.Sprintf("error while publishing config changes to subscribers: %v", err)
log.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
Prefix: req.GetPrefix(),
Response: results,
Timestamp: time.Now().UnixNano(),
}, nil
}
// InternalUpdate is an experimental feature to let the server update its
// internal states. Use it at your own risk.
func (s *Server) InternalUpdate(fp func(config *ygot.ValidatedGoStruct) error) error {
s.config.Lock()
defer s.config.Unlock()
return fp(&s.config.Data)