Skip to content
Snippets Groups Projects
Commit bef0d114 authored by Oliver Herms's avatar Oliver Herms
Browse files

Refactoring. Add Tests.

parent d7793cd7
No related branches found
No related tags found
No related merge requests found
......@@ -8,14 +8,15 @@ import (
"github.com/bio-routing/bio-rd/cmd/ris-mirror/config"
"github.com/bio-routing/bio-rd/cmd/ris-mirror/rismirror"
pb "github.com/bio-routing/bio-rd/cmd/ris/api"
"github.com/bio-routing/bio-rd/cmd/ris/risserver"
prom_ris_mirror "github.com/bio-routing/bio-rd/metrics/ris-mirror/adapter/prom"
"github.com/bio-routing/bio-rd/routingtable/vrf"
"github.com/bio-routing/bio-rd/util/servicewrapper"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
pb "github.com/bio-routing/bio-rd/cmd/ris/api"
)
var (
......@@ -36,6 +37,7 @@ func main() {
risInstances := connectAllRISInstances(cfg.GetRISInstances())
m := rismirror.New()
prometheus.MustRegister(prom_ris_mirror.NewCollector(m))
for _, rcfg := range cfg.RIBConfigs {
for _, vrfHumanReadable := range rcfg.VRFs {
......
......@@ -7,6 +7,8 @@ import (
"github.com/bio-routing/bio-rd/cmd/ris-mirror/rtmirror"
"github.com/bio-routing/bio-rd/protocols/bgp/server"
"github.com/bio-routing/bio-rd/protocols/ris/metrics"
"github.com/bio-routing/bio-rd/routingtable/vrf"
"google.golang.org/grpc"
)
......@@ -22,7 +24,7 @@ func New() *RISMirror {
}
}
func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrf uint64, sources []*grpc.ClientConn) {
func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrfRD uint64, sources []*grpc.ClientConn) {
rism.routersMu.Lock()
defer rism.routersMu.Unlock()
......@@ -30,12 +32,16 @@ func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrf uint64, sou
rism.routers[rtrName] = newRouter(rtrName, address)
}
v := rism.routers[rtrName].(*Router).vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", vrf), vrf)
v := rism.routers[rtrName].(*Router).vrfRegistry.GetVRFByRD(vrfRD)
if v == nil {
v = rism.routers[rtrName].(*Router).vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", vrfRD), vrfRD)
rtm := rtmirror.New(rtmirror.Config{
Router: rtrName,
VRF: v,
})
rtmirror.New(sources, rtmirror.Config{
Router: rtrName,
VRF: v,
})
rism.routers[rtrName].(*Router).rtMirrors[vrfRD] = rtm
}
}
// GetRouter gets a router
......@@ -60,3 +66,25 @@ func (rism *RISMirror) GetRouters() []server.RouterInterface {
return res
}
func (rism *RISMirror) Metrics() *metrics.RISMirrorMetrics {
res := &metrics.RISMirrorMetrics{
Routers: make([]*metrics.RISMirrorRouterMetrics, 0),
}
rism.routersMu.Lock()
defer rism.routersMu.Unlock()
for _, r := range rism.routers {
rm := &metrics.RISMirrorRouterMetrics{
Address: r.Address(),
SysName: r.Name(),
VRFMetrics: vrf.Metrics(r.(*Router).vrfRegistry),
// TODO: RISUpstreamStatus: Fill In,
}
res.Routers = append(res.Routers, rm)
}
return res
}
......@@ -3,6 +3,7 @@ package rismirror
import (
"net"
"github.com/bio-routing/bio-rd/cmd/ris-mirror/rtmirror"
"github.com/bio-routing/bio-rd/routingtable/vrf"
)
......@@ -11,6 +12,9 @@ type Router struct {
name string
address net.IP
vrfRegistry *vrf.VRFRegistry
// rtMirrors contains RTMirrors organized by VRF route distinguisher
rtMirrors map[uint64]*rtmirror.RTMirror
}
func newRouter(name string, address net.IP) *Router {
......@@ -18,6 +22,7 @@ func newRouter(name string, address net.IP) *Router {
name: name,
address: address,
vrfRegistry: vrf.NewVRFRegistry(),
rtMirrors: make(map[uint64]*rtmirror.RTMirror),
}
}
......
......@@ -2,27 +2,26 @@ package rtmirror
import (
routeapi "github.com/bio-routing/bio-rd/route/api"
"google.golang.org/grpc"
)
// routeContainer groups a route with one ore multiple source the route was received from
type routeContainer struct {
route *routeapi.Route
sources []*grpc.ClientConn
sources []interface{}
}
func newRouteContainer(route *routeapi.Route, source *grpc.ClientConn) *routeContainer {
func newRouteContainer(route *routeapi.Route, source interface{}) *routeContainer {
return &routeContainer{
route: route,
sources: []*grpc.ClientConn{source},
sources: []interface{}{source},
}
}
func (rc *routeContainer) addSource(cc *grpc.ClientConn) {
func (rc *routeContainer) addSource(cc interface{}) {
rc.sources = append(rc.sources, cc)
}
func (rc *routeContainer) removeSource(cc *grpc.ClientConn) {
func (rc *routeContainer) removeSource(cc interface{}) {
i := rc.getSourceIndex(cc)
if i < 0 {
return
......@@ -32,7 +31,7 @@ func (rc *routeContainer) removeSource(cc *grpc.ClientConn) {
rc.sources = rc.sources[:len(rc.sources)-1]
}
func (rc *routeContainer) getSourceIndex(cc *grpc.ClientConn) int {
func (rc *routeContainer) getSourceIndex(cc interface{}) int {
for i := range rc.sources {
if rc.sources[i] == cc {
return i
......
package rtmirror
import (
"context"
"crypto/sha1"
"io"
"sync"
risapi "github.com/bio-routing/bio-rd/cmd/ris/api"
bnet "github.com/bio-routing/bio-rd/net"
"github.com/bio-routing/bio-rd/route"
routeapi "github.com/bio-routing/bio-rd/route/api"
"github.com/bio-routing/bio-rd/routingtable/locRIB"
"github.com/bio-routing/bio-rd/routingtable/vrf"
"github.com/bio-routing/bio-rd/routingtable"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"google.golang.org/grpc"
log "github.com/sirupsen/logrus"
)
// RTMirror provides an deduplicated mirror of a router/vrf/afi routing table from a multiple RIS instances
// RTMirror provides an deduplicated routing table
type RTMirror struct {
cfg Config
vrf *vrf.VRF
routes map[[20]byte]*routeContainer
routesMu sync.Mutex
grpcClients []*grpc.ClientConn
stop chan struct{}
wg sync.WaitGroup
}
// Config is a route mirror config
type Config struct {
Router string
VRF *vrf.VRF
routes map[[20]byte]*routeContainer
routesMu sync.Mutex
rt *routingtable.RoutingTable
}
// New creates a new RTMirror and starts it
func New(clientConns []*grpc.ClientConn, cfg Config) *RTMirror {
rtm := &RTMirror{
cfg: cfg,
routes: make(map[[20]byte]*routeContainer),
vrf: cfg.VRF,
grpcClients: clientConns,
stop: make(chan struct{}),
}
afis := []risapi.ObserveRIBRequest_AFISAFI{
risapi.ObserveRIBRequest_IPv4Unicast,
risapi.ObserveRIBRequest_IPv6Unicast,
}
for _, afi := range afis {
for _, ris := range rtm.grpcClients {
rtm.wg.Add(1)
go rtm.client(ris, afi)
}
}
return rtm
}
func (rtm *RTMirror) addRIS(addr string) error {
cc, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return errors.Wrap(err, "grpc dial failed")
}
rtm.grpcClients = append(rtm.grpcClients, cc)
return nil
}
// Dispose stops the RTMirror
func (rtm *RTMirror) Dispose() {
close(rtm.stop)
for _, cc := range rtm.grpcClients {
cc.Close()
func New(rt *routingtable.RoutingTable) *RTMirror {
return &RTMirror{
routes: make(map[[20]byte]*routeContainer),
rt: rt,
}
rtm.wg.Wait()
}
func (rtm *RTMirror) client(cc *grpc.ClientConn, afi risapi.ObserveRIBRequest_AFISAFI) {
defer rtm.wg.Done()
risc := risapi.NewRoutingInformationServiceClient(cc)
for {
if rtm.stopped() {
return
}
orc, err := risc.ObserveRIB(context.Background(), &risapi.ObserveRIBRequest{
Router: rtm.cfg.Router,
VrfId: rtm.cfg.VRF.RD(),
Afisafi: afi,
}, grpc.WaitForReady(true))
if err != nil {
log.WithError(err).Error("ObserveRIB call failed")
continue
}
err = rtm.clientServiceLoop(cc, orc)
if err != nil {
log.WithError(err).Error("client service loop failed")
}
rtm.dropRoutesFromRIS(cc)
}
}
func (rtm *RTMirror) dropRoutesFromRIS(cc *grpc.ClientConn) {
// DropRIS drops all routes learned from a RIS
func (rtm *RTMirror) DropRIS(cc interface{}) {
rtm.routesMu.Lock()
defer rtm.routesMu.Unlock()
......@@ -121,44 +36,11 @@ func (rtm *RTMirror) dropRoutesFromRIS(cc *grpc.ClientConn) {
}
}
func (rtm *RTMirror) stopped() bool {
select {
case <-rtm.stop:
return true
default:
return false
}
}
func (rtm *RTMirror) clientServiceLoop(cc *grpc.ClientConn, orc risapi.RoutingInformationService_ObserveRIBClient) error {
for {
if rtm.stopped() {
return nil
}
u, err := orc.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return errors.Wrap(err, "recv failed")
}
if u.Advertisement {
rtm.addRoute(cc, u.Route)
continue
}
rtm.delRoute(cc, u.Route)
}
}
func (rtm *RTMirror) addRoute(cc *grpc.ClientConn, r *routeapi.Route) {
// AddRoute adds a route
func (rtm *RTMirror) AddRoute(cc interface{}, r *routeapi.Route) error {
h, err := hashRoute(r)
if err != nil {
log.WithError(err).Error("Hashing failed")
return
return errors.Wrap(err, "Hashing failed")
}
rtm.routesMu.Lock()
......@@ -166,59 +48,45 @@ func (rtm *RTMirror) addRoute(cc *grpc.ClientConn, r *routeapi.Route) {
if _, exists := rtm.routes[h]; !exists {
s := route.RouteFromProtoRoute(r, true)
rib := rtm.getRIB(s.Prefix().Addr())
rtm.routes[h] = newRouteContainer(r, cc)
rib.AddPath(s.Prefix(), s.Paths()[0])
return
rtm.rt.AddPath(s.Prefix(), s.Paths()[0])
return nil
}
rtm.routes[h].addSource(cc)
return nil
}
func (rtm *RTMirror) getRIB(addr *bnet.IP) *locRIB.LocRIB {
if addr.IsIPv4() {
return rtm.vrf.IPv4UnicastRIB()
}
return rtm.vrf.IPv6UnicastRIB()
}
func (rtm *RTMirror) delRoute(cc *grpc.ClientConn, r *routeapi.Route) {
// RemoveRoute deletes a route
func (rtm *RTMirror) RemoveRoute(cc interface{}, r *routeapi.Route) error {
h, err := hashRoute(r)
if err != nil {
log.WithError(err).Error("Hashing failed")
return
return errors.Wrap(err, "Hashing failed")
}
rtm.routesMu.Lock()
defer rtm.routesMu.Unlock()
if _, exists := rtm.routes[h]; !exists {
return
return nil
}
rtm._delRoute(h, cc, r)
return nil
}
func (rtm *RTMirror) _delRoute(h [20]byte, cc *grpc.ClientConn, r *routeapi.Route) {
rtm.routes[h].removeSource(cc)
func (rtm *RTMirror) _delRoute(h [20]byte, src interface{}, r *routeapi.Route) {
rtm.routes[h].removeSource(src)
if rtm.routes[h].srcCount() > 0 {
return
}
s := route.RouteFromProtoRoute(r, true)
rib := rtm.getRIB(s.Prefix().Addr())
rib.RemovePath(s.Prefix(), s.Paths()[0])
rtm.rt.RemovePath(s.Prefix(), s.Paths()[0])
delete(rtm.routes, h)
}
// GetVRF exposes the mirrors VRF
func (rtm *RTMirror) GetVRF() *vrf.VRF {
return rtm.vrf
}
func hashRoute(route *routeapi.Route) ([20]byte, error) {
m, err := proto.Marshal(route)
if err != nil {
......
package rtmirror
import (
"testing"
"github.com/bio-routing/bio-rd/route"
routeapi "github.com/bio-routing/bio-rd/route/api"
"github.com/bio-routing/bio-rd/routingtable"
"github.com/stretchr/testify/assert"
bnet "github.com/bio-routing/bio-rd/net"
)
type srcRouteTuple struct {
src interface{}
route *routeapi.Route
}
func TestRTMirror(t *testing.T) {
tests := []struct {
name string
add []*srcRouteTuple
expectedAfterAdd []*route.Route
remove []*srcRouteTuple
expectedAfterRemove []*route.Route
}{
{
name: "Test #1: Single source",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{},
},
{
name: "Test #2: Multiple source, single delete",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
},
{
name: "Test #3: Multiple source, double delete",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{},
},
}
for _, test := range tests {
rt := routingtable.NewRoutingTable()
rtm := New(rt)
for _, a := range test.add {
rtm.AddRoute(a.src, a.route)
}
assert.Equal(t, test.expectedAfterAdd, rt.Dump(), test.name)
/*for _, r := range test.add {
rtm.RemoveRoute(r.src, r.route)
}
assert.Equal(t, test.expectedAfterAdd, rt.Dump(), test.name)*/
}
}
......@@ -18,7 +18,7 @@ var (
)
func init() {
labels := []string{"vrf", "rib", "afi", "safi"}
labels := []string{"vrf_name", "vrf_rd", "rib", "afi", "safi"}
routeCountDesc = prometheus.NewDesc(prefix+"route_count", "Number of routes in the RIB", labels, nil)
routeCountDescRouter = prometheus.NewDesc(prefix+"route_count", "Number of routes in the RIB", append([]string{"sys_name", "agent_address"}, labels...), nil)
}
......@@ -55,7 +55,7 @@ func (c *vrfCollector) Collect(ch chan<- prometheus.Metric) {
func (c *vrfCollector) collectForVRF(ch chan<- prometheus.Metric, v *metrics.VRFMetrics) {
for _, rib := range v.RIBs {
ch <- prometheus.MustNewConstMetric(routeCountDesc, prometheus.GaugeValue, float64(rib.RouteCount),
v.Name, rib.Name, strconv.Itoa(int(rib.AFI)), strconv.Itoa(int(rib.SAFI)))
v.Name, vrf.RouteDistinguisherHumanReadable(v.RD), rib.Name, strconv.Itoa(int(rib.AFI)), strconv.Itoa(int(rib.SAFI)))
}
}
......@@ -63,6 +63,6 @@ func (c *vrfCollector) collectForVRF(ch chan<- prometheus.Metric, v *metrics.VRF
func CollectForVRFRouter(ch chan<- prometheus.Metric, sysName string, agentAddress string, v *metrics.VRFMetrics) {
for _, rib := range v.RIBs {
ch <- prometheus.MustNewConstMetric(routeCountDescRouter, prometheus.GaugeValue, float64(rib.RouteCount),
sysName, agentAddress, v.Name, rib.Name, strconv.Itoa(int(rib.AFI)), strconv.Itoa(int(rib.SAFI)))
sysName, agentAddress, v.Name, vrf.RouteDistinguisherHumanReadable(v.RD), rib.Name, strconv.Itoa(int(rib.AFI)), strconv.Itoa(int(rib.SAFI)))
}
}
......@@ -151,7 +151,7 @@ func (p *Path) String() string {
case FIBPathType:
return p.FIBPath.String()
default:
return "Unknown paty type. Probably not implemented yet"
return fmt.Sprintf("Unknown path type. Probably not implemented yet (%d)", p.Type)
}
}
......
......@@ -279,6 +279,9 @@ func RouteFromProtoRoute(ar *api.Route, dedup bool) *Route {
case api.Path_BGP:
p.Type = BGPPathType
p.BGPPath = BGPPathFromProtoBGPPath(ar.Paths[i].BgpPath, dedup)
case api.Path_Static:
p.Type = StaticPathType
p.StaticPath = StaticPathFromProtoStaticPath(ar.Paths[i].StaticPath, dedup)
}
r.paths = append(r.paths, p)
......
......@@ -58,3 +58,10 @@ func (s *StaticPath) ToProto() *api.StaticPath {
NextHop: s.NextHop.ToProto(),
}
}
// StaticPathFromProtoStaticPath converts a proto StaticPath to StaticPath
func StaticPathFromProtoStaticPath(pb *api.StaticPath, dedup bool) *StaticPath {
return &StaticPath{
NextHop: bnet.IPFromProtoIP(pb.NextHop),
}
}
......@@ -21,6 +21,7 @@ func Metrics(r *VRFRegistry) []*metrics.VRFMetrics {
func MetricsForVRF(v *VRF) *metrics.VRFMetrics {
m := &metrics.VRFMetrics{
Name: v.Name(),
RD: v.RD(),
RIBs: make([]*metrics.RIBMetrics, 0),
}
......
......@@ -5,6 +5,9 @@ type VRFMetrics struct {
// Name of the VRF
Name string
// RD is the route distinguisher
RD uint64
// RIBs returns the RIB specific metrics
RIBs []*RIBMetrics
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment