Skip to content
Snippets Groups Projects
Commit ca0da1f0 authored by Fabian Seidl's avatar Fabian Seidl
Browse files

test for subscribe component, ugly code, WIP

parent ee56ce6b
Branches
No related tags found
3 merge requests!376Add additional example application hostname-checker,!343Add basic application framework and example application to show interaction between events an NBI,!342Resolve "Add an option to send gNMI Subscribe requests via SBI"
Pipeline #107924 failed
This commit is part of merge request !342. Comments created here will be created in the context of that merge request.
...@@ -3,6 +3,8 @@ package nucleus ...@@ -3,6 +3,8 @@ package nucleus
import ( import (
"context" "context"
"fmt" "fmt"
"path"
"time"
"code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change"
...@@ -327,6 +329,70 @@ func (g *Gnmi) subscribe(ctx context.Context) error { ...@@ -327,6 +329,70 @@ func (g *Gnmi) subscribe(ctx context.Context) error {
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
} }
// Subscribe calls GNMI subscribe
func (g *Gnmi) subscribe2(ctx context.Context, stringRespChan chan string) error {
ctx = gnmi.NewContext(ctx, g.config)
opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions)
if !ok {
return &errors.ErrInvalidTypeAssertion{
Value: ctx.Value(types.CtxKeyOpts),
Type: &gnmi.SubscribeOptions{},
}
}
go func() {
log.WithFields(log.Fields{
"address": opts.Target,
"paths": opts.Paths,
"mode": opts.Mode,
"interval": opts.SampleInterval,
}).Info("subscribed to gNMI target")
for {
resp := <-g.RespChan
if resp != nil {
if err := LogSubscribeResponse(resp, stringRespChan); err != nil {
log.Fatal(err)
}
}
}
}()
return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan)
}
// LogSubscribeResponse logs update responses to stderr.
func LogSubscribeResponse(response *gpb.SubscribeResponse, stringRespChan chan string) error {
switch resp := response.Response.(type) {
case *gpb.SubscribeResponse_Error:
//return errors.New(resp.Error.Message)
//TODO: fix error
return errors.ErrNotYetImplemented{}
case *gpb.SubscribeResponse_SyncResponse:
if !resp.SyncResponse {
//return errors.New("initial sync failed")
//TODO: fix error
return errors.ErrNotYetImplemented{}
}
case *gpb.SubscribeResponse_Update:
t := time.Unix(0, resp.Update.Timestamp).UTC()
prefix := resp.Update.Prefix
var target string
if t := resp.Update.Prefix.GetTarget(); t != "" {
target = "(" + t + ") "
}
for _, update := range resp.Update.Update {
stringRespChan <- fmt.Sprintf("[%s] %s%s = %s\n", t.Format(time.RFC3339Nano),
target,
path.Join(prefix.String(), update.Path.String()),
update.String())
}
for _, del := range resp.Update.Delete {
stringRespChan <- fmt.Sprintf("[%s] %sDeleted %s\n", t.Format(time.RFC3339Nano),
target,
path.Join(prefix.String(), del.String()))
}
}
return nil
}
// Close calls GNMI close // Close calls GNMI close
func (g *Gnmi) Close() error { func (g *Gnmi) Close() error {
return nil return nil
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment