diff --git a/controller/nucleus/gnmi_transport.go b/controller/nucleus/gnmi_transport.go index 84b9e3b3d7e06d755de9cb45d733b512b11846d0..8b8e5a2c3d4ad64bbdc9f752106ded25dfcf3c04 100644 --- a/controller/nucleus/gnmi_transport.go +++ b/controller/nucleus/gnmi_transport.go @@ -3,6 +3,8 @@ package nucleus import ( "context" "fmt" + "path" + "time" "code.fbi.h-da.de/danet/gosdn/controller/interfaces/change" @@ -327,6 +329,70 @@ func (g *Gnmi) subscribe(ctx context.Context) error { return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) } +// Subscribe calls GNMI subscribe +func (g *Gnmi) subscribe2(ctx context.Context, stringRespChan chan string) error { + ctx = gnmi.NewContext(ctx, g.config) + opts, ok := ctx.Value(types.CtxKeyOpts).(*gnmi.SubscribeOptions) + if !ok { + return &errors.ErrInvalidTypeAssertion{ + Value: ctx.Value(types.CtxKeyOpts), + Type: &gnmi.SubscribeOptions{}, + } + } + go func() { + log.WithFields(log.Fields{ + "address": opts.Target, + "paths": opts.Paths, + "mode": opts.Mode, + "interval": opts.SampleInterval, + }).Info("subscribed to gNMI target") + for { + resp := <-g.RespChan + if resp != nil { + if err := LogSubscribeResponse(resp, stringRespChan); err != nil { + log.Fatal(err) + } + } + } + }() + return gnmi.SubscribeErr(ctx, g.client, opts, g.RespChan) +} + +// LogSubscribeResponse logs update responses to stderr. +func LogSubscribeResponse(response *gpb.SubscribeResponse, stringRespChan chan string) error { + switch resp := response.Response.(type) { + case *gpb.SubscribeResponse_Error: + //return errors.New(resp.Error.Message) + //TODO: fix error + return errors.ErrNotYetImplemented{} + case *gpb.SubscribeResponse_SyncResponse: + if !resp.SyncResponse { + //return errors.New("initial sync failed") + //TODO: fix error + return errors.ErrNotYetImplemented{} + } + case *gpb.SubscribeResponse_Update: + t := time.Unix(0, resp.Update.Timestamp).UTC() + prefix := resp.Update.Prefix + var target string + if t := resp.Update.Prefix.GetTarget(); t != "" { + target = "(" + t + ") " + } + for _, update := range resp.Update.Update { + stringRespChan <- fmt.Sprintf("[%s] %s%s = %s\n", t.Format(time.RFC3339Nano), + target, + path.Join(prefix.String(), update.Path.String()), + update.String()) + } + for _, del := range resp.Update.Delete { + stringRespChan <- fmt.Sprintf("[%s] %sDeleted %s\n", t.Format(time.RFC3339Nano), + target, + path.Join(prefix.String(), del.String())) + } + } + return nil +} + // Close calls GNMI close func (g *Gnmi) Close() error { return nil