Skip to content
Snippets Groups Projects
Commit e7f66125 authored by Oliver Herms's avatar Oliver Herms
Browse files

Add untracked files

parent 2a3355c4
No related branches found
No related tags found
No related merge requests found
package prom
import (
"github.com/bio-routing/bio-rd/cmd/ris-mirror/rismirror"
"github.com/bio-routing/bio-rd/protocols/ris/metrics"
"github.com/prometheus/client_golang/prometheus"
vrf_prom "github.com/bio-routing/bio-rd/metrics/vrf/adapter/prom"
)
const (
prefix = "bio_rismirror_"
)
var (
risMirrorSessionEstablishedDesc *prometheus.Desc
risMirrorObserveRIBMessages *prometheus.Desc
)
func init() {
labels := []string{"sys_name", "agent_address"}
risMirrorSessionEstablishedDesc = prometheus.NewDesc(prefix+"session_established", "Indicates if a RIS session is established", labels, nil)
risMirrorObserveRIBMessages = prometheus.NewDesc(prefix+"observe_rib_messages", "Returns number of received rib monitoring messages", labels, nil)
}
// NewCollector creates a new collector instance for the given RIS mirror server
func NewCollector(risMirror *rismirror.RISMirror) prometheus.Collector {
return &risCollector{
risMirror: risMirror,
}
}
// risCollector provides a collector for RIS metrics of BIO to use with Prometheus
type risCollector struct {
risMirror *rismirror.RISMirror
}
// Describe conforms to the prometheus collector interface
func (c *risCollector) Describe(ch chan<- *prometheus.Desc) {
/*ch <- risMirrorSessionEstablishedDesc
ch <- risMirrorObserveRIBMessages*/
vrf_prom.DescribeRouter(ch)
}
// Collect conforms to the prometheus collector interface
func (c *risCollector) Collect(ch chan<- prometheus.Metric) {
for _, rtr := range c.risMirror.Metrics().Routers {
c.collectForRouter(ch, rtr)
}
}
func (c *risCollector) collectForRouter(ch chan<- prometheus.Metric, rtr *metrics.RISMirrorRouterMetrics) {
/*l := []string{rtr.SysName, rtr.Address.String()}
ch <- prometheus.MustNewConstMetric(routeMonitoringMessagesDesc, prometheus.CounterValue, float64(rtr.RouteMonitoringMessages), l...)
ch <- prometheus.MustNewConstMetric(statisticsReportMessages, prometheus.CounterValue, float64(rtr.StatisticsReportMessages), l...)
ch <- prometheus.MustNewConstMetric(peerDownNotificationMessages, prometheus.CounterValue, float64(rtr.PeerDownNotificationMessages), l...)
ch <- prometheus.MustNewConstMetric(peerUpNotificationMessages, prometheus.CounterValue, float64(rtr.PeerUpNotificationMessages), l...)
ch <- prometheus.MustNewConstMetric(initiationMessages, prometheus.CounterValue, float64(rtr.InitiationMessages), l...)
ch <- prometheus.MustNewConstMetric(terminationMessages, prometheus.CounterValue, float64(rtr.TerminationMessages), l...)
ch <- prometheus.MustNewConstMetric(routeMirroringMessages, prometheus.CounterValue, float64(rtr.RouteMirroringMessages), l...)*/
for _, vrfMetric := range rtr.VRFMetrics {
vrf_prom.CollectForVRFRouter(ch, rtr.SysName, rtr.Address.String(), vrfMetric)
}
}
package metrics
import (
"net"
vrf_metrics "github.com/bio-routing/bio-rd/routingtable/vrf/metrics"
)
// RISMirrorMetrics contains per router BMP metrics
type RISMirrorMetrics struct {
Routers []*RISMirrorRouterMetrics
}
// RISMirrorRouterMetrics contains a routers RIS mirror metrics
type RISMirrorRouterMetrics struct {
// Routers IP Address
Address net.IP
// SysName of the monitored router
SysName string
// VRFMetrics represent per VRF metrics
VRFMetrics []*vrf_metrics.VRFMetrics
RTMirrorMetrics []*RTMirrorMetrics
}
type RTMirrorMetrics struct {
RTMirrorRISStates []*RTMirrorRISState
UniqueRoutes uint64
RoutesWithSingleSource uint64
}
type RTMirrorRISState struct {
Target string
ConnectionState string
RTMirrorRISAFIStates []*RTMirrorRISAFIState
}
type RTMirrorRISAFIState struct {
AFI uint8
Operational bool
}
package risclient
import (
"context"
"io"
"sync"
risapi "github.com/bio-routing/bio-rd/cmd/ris/api"
routeapi "github.com/bio-routing/bio-rd/route/api"
"google.golang.org/grpc"
log "github.com/sirupsen/logrus"
)
// Client is a client interface
type Client interface {
AddRoute(src interface{}, r *routeapi.Route) error
RemoveRoute(src interface{}, r *routeapi.Route) error
DropAllBySrc(src interface{})
}
// RISClient represents a RIS client
type RISClient struct {
req *Request
cc *grpc.ClientConn
c Client
stopCh chan struct{}
wg sync.WaitGroup
}
// Request is a RISClient config
type Request struct {
Router string
VRFRD uint64
AFI risapi.ObserveRIBRequest_AFISAFI
}
func (r *Request) toProtoRequest() *risapi.ObserveRIBRequest {
return &risapi.ObserveRIBRequest{
Router: r.Router,
VrfId: r.VRFRD,
Afisafi: r.AFI,
}
}
// New creates a new RISClient
func New(req *Request, cc *grpc.ClientConn, c Client) *RISClient {
return &RISClient{
req: req,
cc: cc,
c: c,
stopCh: make(chan struct{}),
}
}
// Stop stops the client
func (r *RISClient) Stop() {
close(r.stopCh)
}
// Start starts the client
func (r *RISClient) Start() {
r.wg.Add(1)
go r.run()
}
// Wait blocks until the client is fully stopped
func (r *RISClient) Wait() {
r.wg.Wait()
}
func (r *RISClient) stopped() bool {
select {
case <-r.stopCh:
return true
default:
return false
}
}
func (r *RISClient) run() {
for {
if r.stopped() {
return
}
risc := risapi.NewRoutingInformationServiceClient(r.cc)
orc, err := risc.ObserveRIB(context.Background(), r.req.toProtoRequest(), grpc.WaitForReady(true))
if err != nil {
log.WithError(err).Error("ObserveRIB call failed")
continue
}
err = r.serviceLoop(orc)
if err == nil {
return
}
r.serviceLoopLogging(err)
}
}
func (r *RISClient) serviceLoopLogging(err error) {
if err == io.EOF {
log.WithError(err).WithFields(log.Fields{
"component": "RISClient",
"function": "run",
}).Info("ObserveRIB ended")
return
}
log.WithError(err).WithFields(log.Fields{
"component": "RISClient",
"function": "run",
}).Error("ObserveRIB ended")
}
func (r *RISClient) serviceLoop(orc risapi.RoutingInformationService_ObserveRIBClient) error {
defer r.processDownEvent()
for {
if r.stopped() {
return nil
}
u, err := orc.Recv()
if err != nil {
return err
}
r.processUpdate(u)
}
}
func (r *RISClient) processUpdate(u *risapi.RIBUpdate) {
if u.Advertisement {
r.processAdvertisement(u)
return
}
r.processWithdraw(u)
}
func (r *RISClient) processAdvertisement(u *risapi.RIBUpdate) {
r.c.AddRoute(r.cc, u.Route)
}
func (r *RISClient) processWithdraw(u *risapi.RIBUpdate) {
r.c.RemoveRoute(r.cc, u.Route)
}
func (r *RISClient) processDownEvent() {
r.c.DropAllBySrc(r.cc)
}
package mergedlocrib
import (
"crypto/sha1"
"sync"
"github.com/bio-routing/bio-rd/route"
routeapi "github.com/bio-routing/bio-rd/route/api"
"github.com/bio-routing/bio-rd/routingtable/locRIB"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)
// MergedLocRIB provides an deduplicated routing table
type MergedLocRIB struct {
routes map[[20]byte]*routeContainer
routesMu sync.Mutex
locRIB *locRIB.LocRIB
}
// New creates a new MergedLocRIB and starts it
func New(locRIB *locRIB.LocRIB) *MergedLocRIB {
return &MergedLocRIB{
routes: make(map[[20]byte]*routeContainer),
locRIB: locRIB,
}
}
// DropAllBySrc drops all routes learned from a source
func (rtm *MergedLocRIB) DropAllBySrc(src interface{}) {
rtm.routesMu.Lock()
defer rtm.routesMu.Unlock()
for h, rc := range rtm.routes {
rtm._delRoute(h, src, rc.route)
}
}
// AddRoute adds a route
func (rtm *MergedLocRIB) AddRoute(cc interface{}, r *routeapi.Route) error {
h, err := hashRoute(r)
if err != nil {
return errors.Wrap(err, "Hashing failed")
}
rtm.routesMu.Lock()
defer rtm.routesMu.Unlock()
if _, exists := rtm.routes[h]; !exists {
s := route.RouteFromProtoRoute(r, true)
rtm.routes[h] = newRouteContainer(r, cc)
rtm.locRIB.AddPath(s.Prefix(), s.Paths()[0])
return nil
}
rtm.routes[h].addSource(cc)
return nil
}
// RemoveRoute deletes a route
func (rtm *MergedLocRIB) RemoveRoute(cc interface{}, r *routeapi.Route) error {
h, err := hashRoute(r)
if err != nil {
return errors.Wrap(err, "Hashing failed")
}
rtm.routesMu.Lock()
defer rtm.routesMu.Unlock()
if _, exists := rtm.routes[h]; !exists {
return nil
}
rtm._delRoute(h, cc, r)
return nil
}
func (rtm *MergedLocRIB) _delRoute(h [20]byte, src interface{}, r *routeapi.Route) {
rtm.routes[h].removeSource(src)
if rtm.routes[h].srcCount() > 0 {
return
}
s := route.RouteFromProtoRoute(r, true)
rtm.locRIB.RemovePath(s.Prefix(), s.Paths()[0])
delete(rtm.routes, h)
}
func hashRoute(route *routeapi.Route) ([20]byte, error) {
m, err := proto.Marshal(route)
if err != nil {
return [20]byte{}, errors.Wrap(err, "Proto marshal failed")
}
h := sha1.New()
_, err = h.Write(m)
if err != nil {
return [20]byte{}, errors.Wrap(err, "Write failed")
}
res := [20]byte{}
x := h.Sum(nil)
copy(res[:], x)
return res, nil
}
package mergedlocrib
import (
"testing"
"github.com/bio-routing/bio-rd/route"
routeapi "github.com/bio-routing/bio-rd/route/api"
"github.com/bio-routing/bio-rd/routingtable/locRIB"
"github.com/stretchr/testify/assert"
bnet "github.com/bio-routing/bio-rd/net"
)
type srcRouteTuple struct {
src interface{}
route *routeapi.Route
}
func TestMergedLocRIB(t *testing.T) {
tests := []struct {
name string
add []*srcRouteTuple
expectedAfterAdd []*route.Route
remove []*srcRouteTuple
expectedAfterRemove []*route.Route
}{
{
name: "Test #1: Single source",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{},
},
{
name: "Test #2: Multiple source, single delete",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
},
{
name: "Test #3: Multiple source, double delete",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{},
},
}
for _, test := range tests {
lr := locRIB.New("test")
rtm := New(lr)
for _, a := range test.add {
rtm.AddRoute(a.src, a.route)
}
selectPaths(test.expectedAfterAdd)
assert.Equal(t, test.expectedAfterAdd, lr.Dump(), test.name)
for _, r := range test.remove {
rtm.RemoveRoute(r.src, r.route)
}
selectPaths(test.expectedAfterRemove)
assert.Equal(t, test.expectedAfterRemove, lr.Dump(), test.name)
}
}
func selectPaths(routes []*route.Route) {
for _, r := range routes {
r.PathSelection()
}
}
package mergedlocrib
import (
routeapi "github.com/bio-routing/bio-rd/route/api"
)
// routeContainer groups a route with one ore multiple source the route was received from
type routeContainer struct {
route *routeapi.Route
sources []interface{}
}
func newRouteContainer(route *routeapi.Route, source interface{}) *routeContainer {
return &routeContainer{
route: route,
sources: []interface{}{source},
}
}
func (rc *routeContainer) addSource(src interface{}) {
rc.sources = append(rc.sources, src)
}
func (rc *routeContainer) removeSource(src interface{}) {
i := rc.getSourceIndex(src)
if i < 0 {
return
}
rc.sources[i] = rc.sources[len(rc.sources)-1]
rc.sources = rc.sources[:len(rc.sources)-1]
}
func (rc *routeContainer) getSourceIndex(src interface{}) int {
for i := range rc.sources {
if rc.sources[i] == src {
return i
}
}
return -1
}
func (rc *routeContainer) srcCount() int {
return len(rc.sources)
}
package clientmanager
import (
"fmt"
"sync"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// ClientManager manages GRPC client connections
type ClientManager struct {
connections map[string]*grpc.ClientConn
connectionsMu sync.RWMutex
}
// New creates a new ClientManager
func New() *ClientManager {
return &ClientManager{
connections: make(map[string]*grpc.ClientConn),
}
}
// Get gets a target connection
func (cm *ClientManager) Get(target string) *grpc.ClientConn {
cm.connectionsMu.RLock()
defer cm.connectionsMu.RUnlock()
if _, exists := cm.connections[target]; !exists {
return nil
}
return cm.connections[target]
}
// Add adds a target
func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error {
cm.connectionsMu.Lock()
defer cm.connectionsMu.Unlock()
if _, exists := cm.connections[target]; exists {
return fmt.Errorf("Target exists already")
}
cc, err := grpc.Dial(target, opts...)
if err != nil {
return errors.Wrap(err, "grpc.Dial failed")
}
cm.connections[target] = cc
return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment