Skip to content
Snippets Groups Projects

Resolve "Add an option to send gNMI Subscribe requests via SBI"

1 file
+ 66
0
Compare changes
  • Side-by-side
  • Inline
@@ -3,6 +3,8 @@ package nucleus
@@ -3,6 +3,8 @@ package nucleus
import (
import (
"context"
"context"
"fmt"
"fmt"
 
"path"
 
"time"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
@@ -327,6 +329,70 @@ func (g *Gnmi) subscribe(ctx context.Context) error {
@@ -327,6 +329,70 @@ func (g *Gnmi) subscribe(ctx context.Context) error {
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
}
 
// Subscribe calls GNMI subscribe
 
func (g *Gnmi) subscribe2(ctx context.Context, stringRespChan chan string) error {
 
ctx = gnmi.NewContext(ctx, g.config)
 
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
 
if !ok {
 
return &errors.ErrInvalidTypeAssertion{
 
Value: ctx.Value(types.CtxKeyOpts),
 
Type: &gnmi.SubscribeOptions{},
 
}
 
}
 
go func() {
 
log.WithFields(log.Fields{
 
"address": opts.Target,
 
"paths": opts.Paths,
 
"mode": opts.Mode,
 
"interval": opts.SampleInterval,
 
}).Info("subscribed to gNMI target")
 
for {
 
resp := <-g.RespChan
 
if resp != nil {
 
if err := LogSubscribeResponse(resp, stringRespChan); err != nil {
 
log.Fatal(err)
 
}
 
}
 
}
 
}()
 
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
 
}
 
 
// LogSubscribeResponse logs update responses to stderr.
 
func LogSubscribeResponse(response *gpb.SubscribeResponse, stringRespChan chan string) error {
 
switch resp := response.Response.(type) {
 
case *gpb.SubscribeResponse_Error:
 
//return errors.New(resp.Error.Message)
 
//TODO: fix error
 
return errors.ErrNotYetImplemented{}
 
case *gpb.SubscribeResponse_SyncResponse:
 
if !resp.SyncResponse {
 
//return errors.New("initial sync failed")
 
//TODO: fix error
 
return errors.ErrNotYetImplemented{}
 
}
 
case *gpb.SubscribeResponse_Update:
 
t := time.Unix(0, resp.Update.Timestamp).UTC()
 
prefix := resp.Update.Prefix
 
var target string
 
if t := resp.Update.Prefix.GetTarget(); t != "" {
 
target = "(" + t + ") "
 
}
 
for _, update := range resp.Update.Update {
 
stringRespChan <- fmt.Sprintf("[%s] %s%s = %s\n", t.Format(time.RFC3339Nano),
 
target,
 
path.Join(prefix.String(), update.Path.String()),
 
update.String())
 
}
 
for _, del := range resp.Update.Delete {
 
stringRespChan <- fmt.Sprintf("[%s] %sDeleted %s\n", t.Format(time.RFC3339Nano),
 
target,
 
path.Join(prefix.String(), del.String()))
 
}
 
}
 
return nil
 
}
 
// Close calls GNMI close
// Close calls GNMI close
func (g *Gnmi) Close() error {
func (g *Gnmi) Close() error {
return nil
return nil
Loading