diff --git a/metrics/ris-mirror/adapter/prom/ris_mirror_prom_adapter.go b/metrics/ris-mirror/adapter/prom/ris_mirror_prom_adapter.go new file mode 100644 index 0000000000000000000000000000000000000000..95b6f79ce996f44a456d1e4ecf8b5879be093462 --- /dev/null +++ b/metrics/ris-mirror/adapter/prom/ris_mirror_prom_adapter.go @@ -0,0 +1,69 @@ +package prom + +import ( + "github.com/bio-routing/bio-rd/cmd/ris-mirror/rismirror" + "github.com/bio-routing/bio-rd/protocols/ris/metrics" + "github.com/prometheus/client_golang/prometheus" + + vrf_prom "github.com/bio-routing/bio-rd/metrics/vrf/adapter/prom" +) + +const ( + prefix = "bio_rismirror_" +) + +var ( + risMirrorSessionEstablishedDesc *prometheus.Desc + risMirrorObserveRIBMessages *prometheus.Desc +) + +func init() { + labels := []string{"sys_name", "agent_address"} + + risMirrorSessionEstablishedDesc = prometheus.NewDesc(prefix+"session_established", "Indicates if a RIS session is established", labels, nil) + risMirrorObserveRIBMessages = prometheus.NewDesc(prefix+"observe_rib_messages", "Returns number of received rib monitoring messages", labels, nil) +} + +// NewCollector creates a new collector instance for the given RIS mirror server +func NewCollector(risMirror *rismirror.RISMirror) prometheus.Collector { + return &risCollector{ + risMirror: risMirror, + } +} + +// risCollector provides a collector for RIS metrics of BIO to use with Prometheus +type risCollector struct { + risMirror *rismirror.RISMirror +} + +// Describe conforms to the prometheus collector interface +func (c *risCollector) Describe(ch chan<- *prometheus.Desc) { + /*ch <- risMirrorSessionEstablishedDesc + ch <- risMirrorObserveRIBMessages*/ + + vrf_prom.DescribeRouter(ch) +} + +// Collect conforms to the prometheus collector interface +func (c *risCollector) Collect(ch chan<- prometheus.Metric) { + for _, rtr := range c.risMirror.Metrics().Routers { + c.collectForRouter(ch, rtr) + } +} + +func (c *risCollector) collectForRouter(ch chan<- prometheus.Metric, rtr *metrics.RISMirrorRouterMetrics) { + + /*l := []string{rtr.SysName, rtr.Address.String()} + + ch <- prometheus.MustNewConstMetric(routeMonitoringMessagesDesc, prometheus.CounterValue, float64(rtr.RouteMonitoringMessages), l...) + ch <- prometheus.MustNewConstMetric(statisticsReportMessages, prometheus.CounterValue, float64(rtr.StatisticsReportMessages), l...) + ch <- prometheus.MustNewConstMetric(peerDownNotificationMessages, prometheus.CounterValue, float64(rtr.PeerDownNotificationMessages), l...) + ch <- prometheus.MustNewConstMetric(peerUpNotificationMessages, prometheus.CounterValue, float64(rtr.PeerUpNotificationMessages), l...) + ch <- prometheus.MustNewConstMetric(initiationMessages, prometheus.CounterValue, float64(rtr.InitiationMessages), l...) + ch <- prometheus.MustNewConstMetric(terminationMessages, prometheus.CounterValue, float64(rtr.TerminationMessages), l...) + ch <- prometheus.MustNewConstMetric(routeMirroringMessages, prometheus.CounterValue, float64(rtr.RouteMirroringMessages), l...)*/ + + for _, vrfMetric := range rtr.VRFMetrics { + vrf_prom.CollectForVRFRouter(ch, rtr.SysName, rtr.Address.String(), vrfMetric) + } +} diff --git a/protocols/ris/metrics/ris_mirror_metrics.go b/protocols/ris/metrics/ris_mirror_metrics.go new file mode 100644 index 0000000000000000000000000000000000000000..33f1867bec8ea8ea98f97808bbb9550fb368ea2e --- /dev/null +++ b/protocols/ris/metrics/ris_mirror_metrics.go @@ -0,0 +1,43 @@ +package metrics + +import ( + "net" + + vrf_metrics "github.com/bio-routing/bio-rd/routingtable/vrf/metrics" +) + +// RISMirrorMetrics contains per router BMP metrics +type RISMirrorMetrics struct { + Routers []*RISMirrorRouterMetrics +} + +// RISMirrorRouterMetrics contains a routers RIS mirror metrics +type RISMirrorRouterMetrics struct { + // Routers IP Address + Address net.IP + + // SysName of the monitored router + SysName string + + // VRFMetrics represent per VRF metrics + VRFMetrics []*vrf_metrics.VRFMetrics + + RTMirrorMetrics []*RTMirrorMetrics +} + +type RTMirrorMetrics struct { + RTMirrorRISStates []*RTMirrorRISState + UniqueRoutes uint64 + RoutesWithSingleSource uint64 +} + +type RTMirrorRISState struct { + Target string + ConnectionState string + RTMirrorRISAFIStates []*RTMirrorRISAFIState +} + +type RTMirrorRISAFIState struct { + AFI uint8 + Operational bool +} diff --git a/risclient/risclient.go b/risclient/risclient.go new file mode 100644 index 0000000000000000000000000000000000000000..14ae89f41996448d18b2d4c634f920a31f7a917b --- /dev/null +++ b/risclient/risclient.go @@ -0,0 +1,156 @@ +package risclient + +import ( + "context" + "io" + "sync" + + risapi "github.com/bio-routing/bio-rd/cmd/ris/api" + routeapi "github.com/bio-routing/bio-rd/route/api" + "google.golang.org/grpc" + + log "github.com/sirupsen/logrus" +) + +// Client is a client interface +type Client interface { + AddRoute(src interface{}, r *routeapi.Route) error + RemoveRoute(src interface{}, r *routeapi.Route) error + DropAllBySrc(src interface{}) +} + +// RISClient represents a RIS client +type RISClient struct { + req *Request + cc *grpc.ClientConn + c Client + stopCh chan struct{} + wg sync.WaitGroup +} + +// Request is a RISClient config +type Request struct { + Router string + VRFRD uint64 + AFI risapi.ObserveRIBRequest_AFISAFI +} + +func (r *Request) toProtoRequest() *risapi.ObserveRIBRequest { + return &risapi.ObserveRIBRequest{ + Router: r.Router, + VrfId: r.VRFRD, + Afisafi: r.AFI, + } +} + +// New creates a new RISClient +func New(req *Request, cc *grpc.ClientConn, c Client) *RISClient { + return &RISClient{ + req: req, + cc: cc, + c: c, + stopCh: make(chan struct{}), + } +} + +// Stop stops the client +func (r *RISClient) Stop() { + close(r.stopCh) +} + +// Start starts the client +func (r *RISClient) Start() { + r.wg.Add(1) + + go r.run() +} + +// Wait blocks until the client is fully stopped +func (r *RISClient) Wait() { + r.wg.Wait() +} + +func (r *RISClient) stopped() bool { + select { + case <-r.stopCh: + return true + default: + return false + } +} + +func (r *RISClient) run() { + for { + if r.stopped() { + return + } + + risc := risapi.NewRoutingInformationServiceClient(r.cc) + + orc, err := risc.ObserveRIB(context.Background(), r.req.toProtoRequest(), grpc.WaitForReady(true)) + if err != nil { + log.WithError(err).Error("ObserveRIB call failed") + continue + } + + err = r.serviceLoop(orc) + if err == nil { + return + } + + r.serviceLoopLogging(err) + } +} + +func (r *RISClient) serviceLoopLogging(err error) { + if err == io.EOF { + log.WithError(err).WithFields(log.Fields{ + "component": "RISClient", + "function": "run", + }).Info("ObserveRIB ended") + return + } + + log.WithError(err).WithFields(log.Fields{ + "component": "RISClient", + "function": "run", + }).Error("ObserveRIB ended") +} + +func (r *RISClient) serviceLoop(orc risapi.RoutingInformationService_ObserveRIBClient) error { + defer r.processDownEvent() + + for { + if r.stopped() { + return nil + } + + u, err := orc.Recv() + if err != nil { + return err + } + + r.processUpdate(u) + } +} + +func (r *RISClient) processUpdate(u *risapi.RIBUpdate) { + if u.Advertisement { + r.processAdvertisement(u) + return + } + + r.processWithdraw(u) +} + +func (r *RISClient) processAdvertisement(u *risapi.RIBUpdate) { + r.c.AddRoute(r.cc, u.Route) +} + +func (r *RISClient) processWithdraw(u *risapi.RIBUpdate) { + r.c.RemoveRoute(r.cc, u.Route) +} + +func (r *RISClient) processDownEvent() { + r.c.DropAllBySrc(r.cc) +} diff --git a/routingtable/mergedlocrib/mergedlocrib.go b/routingtable/mergedlocrib/mergedlocrib.go new file mode 100644 index 0000000000000000000000000000000000000000..efcc9023a363b54f5a9a3b8205063849d15eb5d7 --- /dev/null +++ b/routingtable/mergedlocrib/mergedlocrib.go @@ -0,0 +1,106 @@ +package mergedlocrib + +import ( + "crypto/sha1" + "sync" + + "github.com/bio-routing/bio-rd/route" + routeapi "github.com/bio-routing/bio-rd/route/api" + "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" +) + +// MergedLocRIB provides an deduplicated routing table +type MergedLocRIB struct { + routes map[[20]byte]*routeContainer + routesMu sync.Mutex + locRIB *locRIB.LocRIB +} + +// New creates a new MergedLocRIB and starts it +func New(locRIB *locRIB.LocRIB) *MergedLocRIB { + return &MergedLocRIB{ + routes: make(map[[20]byte]*routeContainer), + locRIB: locRIB, + } +} + +// DropAllBySrc drops all routes learned from a source +func (rtm *MergedLocRIB) DropAllBySrc(src interface{}) { + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + for h, rc := range rtm.routes { + rtm._delRoute(h, src, rc.route) + } +} + +// AddRoute adds a route +func (rtm *MergedLocRIB) AddRoute(cc interface{}, r *routeapi.Route) error { + h, err := hashRoute(r) + if err != nil { + return errors.Wrap(err, "Hashing failed") + } + + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + if _, exists := rtm.routes[h]; !exists { + s := route.RouteFromProtoRoute(r, true) + rtm.routes[h] = newRouteContainer(r, cc) + rtm.locRIB.AddPath(s.Prefix(), s.Paths()[0]) + return nil + } + + rtm.routes[h].addSource(cc) + return nil +} + +// RemoveRoute deletes a route +func (rtm *MergedLocRIB) RemoveRoute(cc interface{}, r *routeapi.Route) error { + h, err := hashRoute(r) + if err != nil { + return errors.Wrap(err, "Hashing failed") + } + + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + if _, exists := rtm.routes[h]; !exists { + return nil + } + + rtm._delRoute(h, cc, r) + return nil +} + +func (rtm *MergedLocRIB) _delRoute(h [20]byte, src interface{}, r *routeapi.Route) { + rtm.routes[h].removeSource(src) + + if rtm.routes[h].srcCount() > 0 { + return + } + + s := route.RouteFromProtoRoute(r, true) + rtm.locRIB.RemovePath(s.Prefix(), s.Paths()[0]) + delete(rtm.routes, h) +} + +func hashRoute(route *routeapi.Route) ([20]byte, error) { + m, err := proto.Marshal(route) + if err != nil { + return [20]byte{}, errors.Wrap(err, "Proto marshal failed") + } + + h := sha1.New() + _, err = h.Write(m) + if err != nil { + return [20]byte{}, errors.Wrap(err, "Write failed") + } + res := [20]byte{} + x := h.Sum(nil) + copy(res[:], x) + + return res, nil +} diff --git a/routingtable/mergedlocrib/mergedlocrib_test.go b/routingtable/mergedlocrib/mergedlocrib_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c0e3adc3983669011c5dfab7a77fe6e3fcb903a4 --- /dev/null +++ b/routingtable/mergedlocrib/mergedlocrib_test.go @@ -0,0 +1,234 @@ +package mergedlocrib + +import ( + "testing" + + "github.com/bio-routing/bio-rd/route" + routeapi "github.com/bio-routing/bio-rd/route/api" + "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/stretchr/testify/assert" + + bnet "github.com/bio-routing/bio-rd/net" +) + +type srcRouteTuple struct { + src interface{} + route *routeapi.Route +} + +func TestMergedLocRIB(t *testing.T) { + tests := []struct { + name string + add []*srcRouteTuple + expectedAfterAdd []*route.Route + remove []*srcRouteTuple + expectedAfterRemove []*route.Route + }{ + { + name: "Test #1: Single source", + add: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterAdd: []*route.Route{ + route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ + Type: route.StaticPathType, + StaticPath: &route.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), + }, + }), + }, + remove: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterRemove: []*route.Route{}, + }, + { + name: "Test #2: Multiple source, single delete", + add: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + { + src: "b", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterAdd: []*route.Route{ + route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ + Type: route.StaticPathType, + StaticPath: &route.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), + }, + }), + }, + remove: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterRemove: []*route.Route{ + route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ + Type: route.StaticPathType, + StaticPath: &route.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), + }, + }), + }, + }, + { + name: "Test #3: Multiple source, double delete", + add: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + { + src: "b", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterAdd: []*route.Route{ + route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ + Type: route.StaticPathType, + StaticPath: &route.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), + }, + }), + }, + remove: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + { + src: "b", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterRemove: []*route.Route{}, + }, + } + + for _, test := range tests { + lr := locRIB.New("test") + rtm := New(lr) + + for _, a := range test.add { + rtm.AddRoute(a.src, a.route) + } + + selectPaths(test.expectedAfterAdd) + assert.Equal(t, test.expectedAfterAdd, lr.Dump(), test.name) + + for _, r := range test.remove { + rtm.RemoveRoute(r.src, r.route) + } + + selectPaths(test.expectedAfterRemove) + assert.Equal(t, test.expectedAfterRemove, lr.Dump(), test.name) + } +} + +func selectPaths(routes []*route.Route) { + for _, r := range routes { + r.PathSelection() + } +} diff --git a/routingtable/mergedlocrib/routecontainer.go b/routingtable/mergedlocrib/routecontainer.go new file mode 100644 index 0000000000000000000000000000000000000000..51b42ac20c2b2530645ec21a39ad5f007ae3a88e --- /dev/null +++ b/routingtable/mergedlocrib/routecontainer.go @@ -0,0 +1,46 @@ +package mergedlocrib + +import ( + routeapi "github.com/bio-routing/bio-rd/route/api" +) + +// routeContainer groups a route with one ore multiple source the route was received from +type routeContainer struct { + route *routeapi.Route + sources []interface{} +} + +func newRouteContainer(route *routeapi.Route, source interface{}) *routeContainer { + return &routeContainer{ + route: route, + sources: []interface{}{source}, + } +} + +func (rc *routeContainer) addSource(src interface{}) { + rc.sources = append(rc.sources, src) +} + +func (rc *routeContainer) removeSource(src interface{}) { + i := rc.getSourceIndex(src) + if i < 0 { + return + } + + rc.sources[i] = rc.sources[len(rc.sources)-1] + rc.sources = rc.sources[:len(rc.sources)-1] +} + +func (rc *routeContainer) getSourceIndex(src interface{}) int { + for i := range rc.sources { + if rc.sources[i] == src { + return i + } + } + + return -1 +} + +func (rc *routeContainer) srcCount() int { + return len(rc.sources) +} diff --git a/util/grpc/clientmanager/clientmanager.go b/util/grpc/clientmanager/clientmanager.go new file mode 100644 index 0000000000000000000000000000000000000000..e70a6b6d6061afcf9eac9a7f0204052c2542adcf --- /dev/null +++ b/util/grpc/clientmanager/clientmanager.go @@ -0,0 +1,52 @@ +package clientmanager + +import ( + "fmt" + "sync" + + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +// ClientManager manages GRPC client connections +type ClientManager struct { + connections map[string]*grpc.ClientConn + connectionsMu sync.RWMutex +} + +// New creates a new ClientManager +func New() *ClientManager { + return &ClientManager{ + connections: make(map[string]*grpc.ClientConn), + } +} + +// Get gets a target connection +func (cm *ClientManager) Get(target string) *grpc.ClientConn { + cm.connectionsMu.RLock() + defer cm.connectionsMu.RUnlock() + + if _, exists := cm.connections[target]; !exists { + return nil + } + + return cm.connections[target] +} + +// Add adds a target +func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error { + cm.connectionsMu.Lock() + defer cm.connectionsMu.Unlock() + + if _, exists := cm.connections[target]; exists { + return fmt.Errorf("Target exists already") + } + + cc, err := grpc.Dial(target, opts...) + if err != nil { + return errors.Wrap(err, "grpc.Dial failed") + } + + cm.connections[target] = cc + return nil +}