Skip to content
Snippets Groups Projects
Commit 2a3355c4 authored by Oliver Herms's avatar Oliver Herms
Browse files

More refactoring

parent 5a4c0a62
No related branches found
No related tags found
No related merge requests found
package config
import (
"fmt"
"io/ioutil"
"net"
"github.com/bio-routing/bio-rd/routingtable/vrf"
"github.com/pkg/errors"
"gopkg.in/yaml.v2"
)
// RISMirrorConfig is the config of RISMirror instance
type RISMirrorConfig struct {
RIBConfigs []RIBConfig `yaml:"ribs"`
RIBConfigs []*RIBConfig `yaml:"ribs"`
}
// RIBConfig is a RIB configuration
type RIBConfig struct {
Router string `yaml:"router"`
Router string `yaml:"router"`
router net.IP
VRFs []string `yaml:"vrfs"`
vrfs []uint64
IPVersions []uint8 `yaml:"IPVersions"`
SrcRISInstances []string `yaml:"source_ris_instances"`
}
// GetRouter gets a routers IP address
func (rc *RIBConfig) GetRouter() net.IP {
return rc.router
}
// GetVRFs gets a routers VRFs
func (rc *RIBConfig) GetVRFs() []uint64 {
return rc.vrfs
}
// LoadConfig loads a RISMirror config
func LoadConfig(filepath string) (*RISMirrorConfig, error) {
f, err := ioutil.ReadFile(filepath)
......@@ -33,9 +48,44 @@ func LoadConfig(filepath string) (*RISMirrorConfig, error) {
return nil, errors.Wrap(err, "Unmarshal failed")
}
for _, rc := range cfg.RIBConfigs {
err := rc.loadRouter()
if err != nil {
return nil, errors.Wrap(err, "Unable to load router config")
}
err = rc.loadVRFs()
if err != nil {
return nil, errors.Wrap(err, "Unable to load VRFs")
}
}
return cfg, nil
}
func (r *RIBConfig) loadRouter() error {
addr := net.ParseIP(r.Router)
if addr == nil {
return fmt.Errorf("Unable to parse routers IP: %q", r.Router)
}
r.router = addr
return nil
}
func (r *RIBConfig) loadVRFs() error {
for _, vrfHuman := range r.VRFs {
vrfRD, err := vrf.ParseHumanReadableRouteDistinguisher(vrfHuman)
if err != nil {
return errors.Wrap(err, "Unable to parse VRF identifier")
}
r.vrfs = append(r.vrfs, vrfRD)
}
return nil
}
// GetRISInstances returns a list of all RIS instances in the config
func (rismc *RISMirrorConfig) GetRISInstances() []string {
instances := make(map[string]struct{})
......
......@@ -2,7 +2,6 @@ package main
import (
"flag"
"net"
"os"
"time"
......@@ -11,7 +10,7 @@ import (
pb "github.com/bio-routing/bio-rd/cmd/ris/api"
"github.com/bio-routing/bio-rd/cmd/ris/risserver"
prom_ris_mirror "github.com/bio-routing/bio-rd/metrics/ris-mirror/adapter/prom"
"github.com/bio-routing/bio-rd/routingtable/vrf"
"github.com/bio-routing/bio-rd/util/grpc/clientmanager"
"github.com/bio-routing/bio-rd/util/servicewrapper"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
......@@ -35,28 +34,30 @@ func main() {
log.WithError(err).Fatal("Failed to load config")
}
risInstances := connectAllRISInstances(cfg.GetRISInstances())
grpcClientManager := clientmanager.New()
for _, instance := range cfg.GetRISInstances() {
err := grpcClientManager.Add(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 10,
Timeout: time.Second * time.Duration(*risTimeout),
PermitWithoutStream: true,
}))
if err != nil {
log.WithError(err).Fatal("GRPC clientmanager add failed")
}
}
m := rismirror.New()
prometheus.MustRegister(prom_ris_mirror.NewCollector(m))
for _, rcfg := range cfg.RIBConfigs {
for _, vrfHumanReadable := range rcfg.VRFs {
addr := net.ParseIP(rcfg.Router)
if addr == nil {
panic("Invalid address")
}
vrfID, err := vrf.ParseHumanReadableRouteDistinguisher(vrfHumanReadable)
if err != nil {
panic(err)
}
for _, vrdRD := range rcfg.GetVRFs() {
srcs := make([]*grpc.ClientConn, 0)
for _, srcInstance := range rcfg.SrcRISInstances {
srcs = append(srcs, risInstances[srcInstance])
srcs = append(srcs, grpcClientManager.Get(srcInstance))
}
m.AddTarget(rcfg.Router, addr, vrfID, srcs)
m.AddTarget(rcfg.Router, rcfg.GetRouter(), vrdRD, srcs)
}
}
......@@ -83,23 +84,3 @@ func main() {
log.Fatalf("failed to start server: %v", err)
}
}
func connectAllRISInstances(addrs []string) map[string]*grpc.ClientConn {
res := make(map[string]*grpc.ClientConn)
for _, a := range addrs {
log.Infof("grpc.Dialing %q", a)
cc, err := grpc.Dial(a, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 10,
Timeout: time.Second * time.Duration(*risTimeout),
PermitWithoutStream: true,
}))
if err != nil {
log.WithError(err).Errorf("grpc.Dial failed for %q", a)
}
res[a] = cc
}
return res
}
package rismirror
import (
"fmt"
"net"
"sync"
"github.com/bio-routing/bio-rd/cmd/ris-mirror/rtmirror"
"github.com/bio-routing/bio-rd/protocols/bgp/server"
"github.com/bio-routing/bio-rd/protocols/ris/metrics"
"github.com/bio-routing/bio-rd/routingtable/vrf"
......@@ -24,6 +22,7 @@ func New() *RISMirror {
}
}
// AddTarget adds a target to the RISMirror
func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrfRD uint64, sources []*grpc.ClientConn) {
rism.routersMu.Lock()
defer rism.routersMu.Unlock()
......@@ -32,16 +31,8 @@ func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrfRD uint64, s
rism.routers[rtrName] = newRouter(rtrName, address)
}
v := rism.routers[rtrName].(*Router).vrfRegistry.GetVRFByRD(vrfRD)
if v == nil {
v = rism.routers[rtrName].(*Router).vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", vrfRD), vrfRD)
rtm := rtmirror.New(rtmirror.Config{
Router: rtrName,
VRF: v,
})
rism.routers[rtrName].(*Router).rtMirrors[vrfRD] = rtm
}
r := rism.routers[rtrName].(*Router)
r.addVRF(vrfRD, sources)
}
// GetRouter gets a router
......@@ -80,7 +71,6 @@ func (rism *RISMirror) Metrics() *metrics.RISMirrorMetrics {
Address: r.Address(),
SysName: r.Name(),
VRFMetrics: vrf.Metrics(r.(*Router).vrfRegistry),
// TODO: RISUpstreamStatus: Fill In,
}
res.Routers = append(res.Routers, rm)
......
package rismirror
import (
"fmt"
"net"
"github.com/bio-routing/bio-rd/cmd/ris-mirror/rtmirror"
"github.com/bio-routing/bio-rd/risclient"
"github.com/bio-routing/bio-rd/routingtable/vrf"
"google.golang.org/grpc"
"github.com/bio-routing/bio-rd/cmd/ris/api"
)
// Router represents a router
......@@ -12,9 +16,7 @@ type Router struct {
name string
address net.IP
vrfRegistry *vrf.VRFRegistry
// rtMirrors contains RTMirrors organized by VRF route distinguisher
rtMirrors map[uint64]*rtmirror.RTMirror
vrfs map[uint64]*_vrf
}
func newRouter(name string, address net.IP) *Router {
......@@ -22,7 +24,7 @@ func newRouter(name string, address net.IP) *Router {
name: name,
address: address,
vrfRegistry: vrf.NewVRFRegistry(),
rtMirrors: make(map[uint64]*rtmirror.RTMirror),
vrfs: make(map[uint64]*_vrf),
}
}
......@@ -45,3 +47,30 @@ func (r *Router) GetVRF(vrfID uint64) *vrf.VRF {
func (r *Router) GetVRFs() []*vrf.VRF {
return r.vrfRegistry.List()
}
func (r *Router) addVRF(rd uint64, sources []*grpc.ClientConn) {
v := r.vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", rd), rd)
r.vrfs[rd] = newVRF(v.IPv4UnicastRIB(), v.IPv6UnicastRIB())
for _, src := range sources {
r.connectVRF(rd, src, 4)
r.connectVRF(rd, src, 6)
}
}
func (r *Router) connectVRF(rd uint64, src *grpc.ClientConn, afi uint8) {
risclient.New(&risclient.Request{
Router: r.name,
VRFRD: rd,
AFI: apiAFI(afi),
}, src, r.vrfs[rd].getRIB(afi))
}
func apiAFI(afi uint8) api.ObserveRIBRequest_AFISAFI {
if afi == 6 {
return api.ObserveRIBRequest_IPv6Unicast
}
return api.ObserveRIBRequest_IPv4Unicast
}
package rismirror
import (
"github.com/bio-routing/bio-rd/routingtable/locRIB"
"github.com/bio-routing/bio-rd/routingtable/mergedlocrib"
)
type _vrf struct {
ipv4Unicast *mergedlocrib.MergedLocRIB
ipv6Unicast *mergedlocrib.MergedLocRIB
}
func newVRF(locRIBIPv4Unicast, locRIBIPv6Unicast *locRIB.LocRIB) *_vrf {
return &_vrf{
ipv4Unicast: mergedlocrib.New(locRIBIPv4Unicast),
ipv6Unicast: mergedlocrib.New(locRIBIPv6Unicast),
}
}
func (v *_vrf) getRIB(afi uint8) *mergedlocrib.MergedLocRIB {
if afi == 6 {
return v.ipv6Unicast
}
return v.ipv4Unicast
}
......@@ -291,6 +291,11 @@ func RouteFromProtoRoute(ar *api.Route, dedup bool) *Route {
}
func (r *Route) updateEqualPathCount() {
if len(r.paths) == 0 {
r.ecmpPaths = 0
return
}
count := uint(1)
for i := 0; i < len(r.paths)-1; i++ {
if !r.paths[i].ECMP(r.paths[i+1]) {
......
package rtmirror
import (
routeapi "github.com/bio-routing/bio-rd/route/api"
)
// routeContainer groups a route with one ore multiple source the route was received from
type routeContainer struct {
route *routeapi.Route
sources []interface{}
}
func newRouteContainer(route *routeapi.Route, source interface{}) *routeContainer {
return &routeContainer{
route: route,
sources: []interface{}{source},
}
}
func (rc *routeContainer) addSource(cc interface{}) {
rc.sources = append(rc.sources, cc)
}
func (rc *routeContainer) removeSource(cc interface{}) {
i := rc.getSourceIndex(cc)
if i < 0 {
return
}
rc.sources[i] = rc.sources[len(rc.sources)-1]
rc.sources = rc.sources[:len(rc.sources)-1]
}
func (rc *routeContainer) getSourceIndex(cc interface{}) int {
for i := range rc.sources {
if rc.sources[i] == cc {
return i
}
}
return -1
}
func (rc *routeContainer) srcCount() int {
return len(rc.sources)
}
package rtmirror
import (
"crypto/sha1"
"sync"
"github.com/bio-routing/bio-rd/route"
routeapi "github.com/bio-routing/bio-rd/route/api"
"github.com/bio-routing/bio-rd/routingtable"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
)
// RTMirror provides an deduplicated routing table
type RTMirror struct {
routes map[[20]byte]*routeContainer
routesMu sync.Mutex
rt *routingtable.RoutingTable
}
// New creates a new RTMirror and starts it
func New(rt *routingtable.RoutingTable) *RTMirror {
return &RTMirror{
routes: make(map[[20]byte]*routeContainer),
rt: rt,
}
}
// DropRIS drops all routes learned from a RIS
func (rtm *RTMirror) DropRIS(cc interface{}) {
rtm.routesMu.Lock()
defer rtm.routesMu.Unlock()
for h, rc := range rtm.routes {
rtm._delRoute(h, cc, rc.route)
}
}
// AddRoute adds a route
func (rtm *RTMirror) AddRoute(cc interface{}, r *routeapi.Route) error {
h, err := hashRoute(r)
if err != nil {
return errors.Wrap(err, "Hashing failed")
}
rtm.routesMu.Lock()
defer rtm.routesMu.Unlock()
if _, exists := rtm.routes[h]; !exists {
s := route.RouteFromProtoRoute(r, true)
rtm.routes[h] = newRouteContainer(r, cc)
rtm.rt.AddPath(s.Prefix(), s.Paths()[0])
return nil
}
rtm.routes[h].addSource(cc)
return nil
}
// RemoveRoute deletes a route
func (rtm *RTMirror) RemoveRoute(cc interface{}, r *routeapi.Route) error {
h, err := hashRoute(r)
if err != nil {
return errors.Wrap(err, "Hashing failed")
}
rtm.routesMu.Lock()
defer rtm.routesMu.Unlock()
if _, exists := rtm.routes[h]; !exists {
return nil
}
rtm._delRoute(h, cc, r)
return nil
}
func (rtm *RTMirror) _delRoute(h [20]byte, src interface{}, r *routeapi.Route) {
rtm.routes[h].removeSource(src)
if rtm.routes[h].srcCount() > 0 {
return
}
s := route.RouteFromProtoRoute(r, true)
rtm.rt.RemovePath(s.Prefix(), s.Paths()[0])
delete(rtm.routes, h)
}
func hashRoute(route *routeapi.Route) ([20]byte, error) {
m, err := proto.Marshal(route)
if err != nil {
return [20]byte{}, errors.Wrap(err, "Proto marshal failed")
}
h := sha1.New()
_, err = h.Write(m)
if err != nil {
return [20]byte{}, errors.Wrap(err, "Write failed")
}
res := [20]byte{}
x := h.Sum(nil)
copy(res[:], x)
return res, nil
}
package rtmirror
import (
"testing"
"github.com/bio-routing/bio-rd/route"
routeapi "github.com/bio-routing/bio-rd/route/api"
"github.com/bio-routing/bio-rd/routingtable"
"github.com/stretchr/testify/assert"
bnet "github.com/bio-routing/bio-rd/net"
)
type srcRouteTuple struct {
src interface{}
route *routeapi.Route
}
func TestRTMirror(t *testing.T) {
tests := []struct {
name string
add []*srcRouteTuple
expectedAfterAdd []*route.Route
remove []*srcRouteTuple
expectedAfterRemove []*route.Route
}{
{
name: "Test #1: Single source",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{},
},
{
name: "Test #2: Multiple source, single delete",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
},
{
name: "Test #3: Multiple source, double delete",
add: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterAdd: []*route.Route{
route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{
Type: route.StaticPathType,
StaticPath: &route.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(),
},
}),
},
remove: []*srcRouteTuple{
{
src: "a",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
{
src: "b",
route: &routeapi.Route{
Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(),
Paths: []*routeapi.Path{
{
Type: routeapi.Path_Static,
StaticPath: &routeapi.StaticPath{
NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(),
},
},
},
},
},
},
expectedAfterRemove: []*route.Route{},
},
}
for _, test := range tests {
rt := routingtable.NewRoutingTable()
rtm := New(rt)
for _, a := range test.add {
rtm.AddRoute(a.src, a.route)
}
assert.Equal(t, test.expectedAfterAdd, rt.Dump(), test.name)
/*for _, r := range test.add {
rtm.RemoveRoute(r.src, r.route)
}
assert.Equal(t, test.expectedAfterAdd, rt.Dump(), test.name)*/
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment