diff --git a/cmd/ris-mirror/config/config.go b/cmd/ris-mirror/config/config.go index 9d270b6913cb832c6b3510b385f57a43021be0be..6aa781f39c299a91abf9128e5a31f39d067857cc 100644 --- a/cmd/ris-mirror/config/config.go +++ b/cmd/ris-mirror/config/config.go @@ -1,25 +1,40 @@ package config import ( + "fmt" "io/ioutil" + "net" + "github.com/bio-routing/bio-rd/routingtable/vrf" "github.com/pkg/errors" "gopkg.in/yaml.v2" ) // RISMirrorConfig is the config of RISMirror instance type RISMirrorConfig struct { - RIBConfigs []RIBConfig `yaml:"ribs"` + RIBConfigs []*RIBConfig `yaml:"ribs"` } // RIBConfig is a RIB configuration type RIBConfig struct { - Router string `yaml:"router"` + Router string `yaml:"router"` + router net.IP VRFs []string `yaml:"vrfs"` + vrfs []uint64 IPVersions []uint8 `yaml:"IPVersions"` SrcRISInstances []string `yaml:"source_ris_instances"` } +// GetRouter gets a routers IP address +func (rc *RIBConfig) GetRouter() net.IP { + return rc.router +} + +// GetVRFs gets a routers VRFs +func (rc *RIBConfig) GetVRFs() []uint64 { + return rc.vrfs +} + // LoadConfig loads a RISMirror config func LoadConfig(filepath string) (*RISMirrorConfig, error) { f, err := ioutil.ReadFile(filepath) @@ -33,9 +48,44 @@ func LoadConfig(filepath string) (*RISMirrorConfig, error) { return nil, errors.Wrap(err, "Unmarshal failed") } + for _, rc := range cfg.RIBConfigs { + err := rc.loadRouter() + if err != nil { + return nil, errors.Wrap(err, "Unable to load router config") + } + + err = rc.loadVRFs() + if err != nil { + return nil, errors.Wrap(err, "Unable to load VRFs") + } + } + return cfg, nil } +func (r *RIBConfig) loadRouter() error { + addr := net.ParseIP(r.Router) + if addr == nil { + return fmt.Errorf("Unable to parse routers IP: %q", r.Router) + } + + r.router = addr + return nil +} + +func (r *RIBConfig) loadVRFs() error { + for _, vrfHuman := range r.VRFs { + vrfRD, err := vrf.ParseHumanReadableRouteDistinguisher(vrfHuman) + if err != nil { + return errors.Wrap(err, "Unable to parse VRF identifier") + } + + r.vrfs = append(r.vrfs, vrfRD) + } + + return nil +} + // GetRISInstances returns a list of all RIS instances in the config func (rismc *RISMirrorConfig) GetRISInstances() []string { instances := make(map[string]struct{}) diff --git a/cmd/ris-mirror/main.go b/cmd/ris-mirror/main.go index d7ff7ceee7d6b126e5db804811d8461bb71a62dc..7857d05683c13f337aabaf0e621c29163fce83db 100644 --- a/cmd/ris-mirror/main.go +++ b/cmd/ris-mirror/main.go @@ -2,7 +2,6 @@ package main import ( "flag" - "net" "os" "time" @@ -11,7 +10,7 @@ import ( pb "github.com/bio-routing/bio-rd/cmd/ris/api" "github.com/bio-routing/bio-rd/cmd/ris/risserver" prom_ris_mirror "github.com/bio-routing/bio-rd/metrics/ris-mirror/adapter/prom" - "github.com/bio-routing/bio-rd/routingtable/vrf" + "github.com/bio-routing/bio-rd/util/grpc/clientmanager" "github.com/bio-routing/bio-rd/util/servicewrapper" "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" @@ -35,28 +34,30 @@ func main() { log.WithError(err).Fatal("Failed to load config") } - risInstances := connectAllRISInstances(cfg.GetRISInstances()) + grpcClientManager := clientmanager.New() + for _, instance := range cfg.GetRISInstances() { + err := grpcClientManager.Add(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Second * 10, + Timeout: time.Second * time.Duration(*risTimeout), + PermitWithoutStream: true, + })) + + if err != nil { + log.WithError(err).Fatal("GRPC clientmanager add failed") + } + } + m := rismirror.New() prometheus.MustRegister(prom_ris_mirror.NewCollector(m)) for _, rcfg := range cfg.RIBConfigs { - for _, vrfHumanReadable := range rcfg.VRFs { - addr := net.ParseIP(rcfg.Router) - if addr == nil { - panic("Invalid address") - } - - vrfID, err := vrf.ParseHumanReadableRouteDistinguisher(vrfHumanReadable) - if err != nil { - panic(err) - } - + for _, vrdRD := range rcfg.GetVRFs() { srcs := make([]*grpc.ClientConn, 0) for _, srcInstance := range rcfg.SrcRISInstances { - srcs = append(srcs, risInstances[srcInstance]) + srcs = append(srcs, grpcClientManager.Get(srcInstance)) } - m.AddTarget(rcfg.Router, addr, vrfID, srcs) + m.AddTarget(rcfg.Router, rcfg.GetRouter(), vrdRD, srcs) } } @@ -83,23 +84,3 @@ func main() { log.Fatalf("failed to start server: %v", err) } } - -func connectAllRISInstances(addrs []string) map[string]*grpc.ClientConn { - res := make(map[string]*grpc.ClientConn) - - for _, a := range addrs { - log.Infof("grpc.Dialing %q", a) - cc, err := grpc.Dial(a, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: time.Second * 10, - Timeout: time.Second * time.Duration(*risTimeout), - PermitWithoutStream: true, - })) - if err != nil { - log.WithError(err).Errorf("grpc.Dial failed for %q", a) - } - - res[a] = cc - } - - return res -} diff --git a/cmd/ris-mirror/rismirror/mirror.go b/cmd/ris-mirror/rismirror/rismirror.go similarity index 79% rename from cmd/ris-mirror/rismirror/mirror.go rename to cmd/ris-mirror/rismirror/rismirror.go index dcdca7399117b0abc24f9ff1c43af388fb0adc5a..9da635da6fa3110a086ca521951e34f152791d1e 100644 --- a/cmd/ris-mirror/rismirror/mirror.go +++ b/cmd/ris-mirror/rismirror/rismirror.go @@ -1,11 +1,9 @@ package rismirror import ( - "fmt" "net" "sync" - "github.com/bio-routing/bio-rd/cmd/ris-mirror/rtmirror" "github.com/bio-routing/bio-rd/protocols/bgp/server" "github.com/bio-routing/bio-rd/protocols/ris/metrics" "github.com/bio-routing/bio-rd/routingtable/vrf" @@ -24,6 +22,7 @@ func New() *RISMirror { } } +// AddTarget adds a target to the RISMirror func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrfRD uint64, sources []*grpc.ClientConn) { rism.routersMu.Lock() defer rism.routersMu.Unlock() @@ -32,16 +31,8 @@ func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrfRD uint64, s rism.routers[rtrName] = newRouter(rtrName, address) } - v := rism.routers[rtrName].(*Router).vrfRegistry.GetVRFByRD(vrfRD) - if v == nil { - v = rism.routers[rtrName].(*Router).vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", vrfRD), vrfRD) - rtm := rtmirror.New(rtmirror.Config{ - Router: rtrName, - VRF: v, - }) - - rism.routers[rtrName].(*Router).rtMirrors[vrfRD] = rtm - } + r := rism.routers[rtrName].(*Router) + r.addVRF(vrfRD, sources) } // GetRouter gets a router @@ -80,7 +71,6 @@ func (rism *RISMirror) Metrics() *metrics.RISMirrorMetrics { Address: r.Address(), SysName: r.Name(), VRFMetrics: vrf.Metrics(r.(*Router).vrfRegistry), - // TODO: RISUpstreamStatus: Fill In, } res.Routers = append(res.Routers, rm) diff --git a/cmd/ris-mirror/rismirror/router.go b/cmd/ris-mirror/rismirror/router.go index c8b4913fa2a8496d59ce2af6591d3f23b4428507..38a6fec83cd98a1c1e80510ae13d8ab981cdc9a1 100644 --- a/cmd/ris-mirror/rismirror/router.go +++ b/cmd/ris-mirror/rismirror/router.go @@ -1,10 +1,14 @@ package rismirror import ( + "fmt" "net" - "github.com/bio-routing/bio-rd/cmd/ris-mirror/rtmirror" + "github.com/bio-routing/bio-rd/risclient" "github.com/bio-routing/bio-rd/routingtable/vrf" + "google.golang.org/grpc" + + "github.com/bio-routing/bio-rd/cmd/ris/api" ) // Router represents a router @@ -12,9 +16,7 @@ type Router struct { name string address net.IP vrfRegistry *vrf.VRFRegistry - - // rtMirrors contains RTMirrors organized by VRF route distinguisher - rtMirrors map[uint64]*rtmirror.RTMirror + vrfs map[uint64]*_vrf } func newRouter(name string, address net.IP) *Router { @@ -22,7 +24,7 @@ func newRouter(name string, address net.IP) *Router { name: name, address: address, vrfRegistry: vrf.NewVRFRegistry(), - rtMirrors: make(map[uint64]*rtmirror.RTMirror), + vrfs: make(map[uint64]*_vrf), } } @@ -45,3 +47,30 @@ func (r *Router) GetVRF(vrfID uint64) *vrf.VRF { func (r *Router) GetVRFs() []*vrf.VRF { return r.vrfRegistry.List() } + +func (r *Router) addVRF(rd uint64, sources []*grpc.ClientConn) { + v := r.vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", rd), rd) + + r.vrfs[rd] = newVRF(v.IPv4UnicastRIB(), v.IPv6UnicastRIB()) + + for _, src := range sources { + r.connectVRF(rd, src, 4) + r.connectVRF(rd, src, 6) + } +} + +func (r *Router) connectVRF(rd uint64, src *grpc.ClientConn, afi uint8) { + risclient.New(&risclient.Request{ + Router: r.name, + VRFRD: rd, + AFI: apiAFI(afi), + }, src, r.vrfs[rd].getRIB(afi)) +} + +func apiAFI(afi uint8) api.ObserveRIBRequest_AFISAFI { + if afi == 6 { + return api.ObserveRIBRequest_IPv6Unicast + } + + return api.ObserveRIBRequest_IPv4Unicast +} diff --git a/cmd/ris-mirror/rismirror/vrf.go b/cmd/ris-mirror/rismirror/vrf.go new file mode 100644 index 0000000000000000000000000000000000000000..40ddfb16d81249bb6b988a308877ef5bfead0bb7 --- /dev/null +++ b/cmd/ris-mirror/rismirror/vrf.go @@ -0,0 +1,26 @@ +package rismirror + +import ( + "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/bio-routing/bio-rd/routingtable/mergedlocrib" +) + +type _vrf struct { + ipv4Unicast *mergedlocrib.MergedLocRIB + ipv6Unicast *mergedlocrib.MergedLocRIB +} + +func newVRF(locRIBIPv4Unicast, locRIBIPv6Unicast *locRIB.LocRIB) *_vrf { + return &_vrf{ + ipv4Unicast: mergedlocrib.New(locRIBIPv4Unicast), + ipv6Unicast: mergedlocrib.New(locRIBIPv6Unicast), + } +} + +func (v *_vrf) getRIB(afi uint8) *mergedlocrib.MergedLocRIB { + if afi == 6 { + return v.ipv6Unicast + } + + return v.ipv4Unicast +} diff --git a/route/route.go b/route/route.go index 5c2b08ab2cdd671f10fd024d5a45db82285a71b7..8b3966f230220d0bffce8d90fe69757c2c41b82e 100644 --- a/route/route.go +++ b/route/route.go @@ -291,6 +291,11 @@ func RouteFromProtoRoute(ar *api.Route, dedup bool) *Route { } func (r *Route) updateEqualPathCount() { + if len(r.paths) == 0 { + r.ecmpPaths = 0 + return + } + count := uint(1) for i := 0; i < len(r.paths)-1; i++ { if !r.paths[i].ECMP(r.paths[i+1]) { diff --git a/routingtable/mergedrt/routecontainer.go b/routingtable/mergedrt/routecontainer.go deleted file mode 100644 index 281bfabb5b78f2fea0f5e3933ecfe99e6e3bcca5..0000000000000000000000000000000000000000 --- a/routingtable/mergedrt/routecontainer.go +++ /dev/null @@ -1,46 +0,0 @@ -package rtmirror - -import ( - routeapi "github.com/bio-routing/bio-rd/route/api" -) - -// routeContainer groups a route with one ore multiple source the route was received from -type routeContainer struct { - route *routeapi.Route - sources []interface{} -} - -func newRouteContainer(route *routeapi.Route, source interface{}) *routeContainer { - return &routeContainer{ - route: route, - sources: []interface{}{source}, - } -} - -func (rc *routeContainer) addSource(cc interface{}) { - rc.sources = append(rc.sources, cc) -} - -func (rc *routeContainer) removeSource(cc interface{}) { - i := rc.getSourceIndex(cc) - if i < 0 { - return - } - - rc.sources[i] = rc.sources[len(rc.sources)-1] - rc.sources = rc.sources[:len(rc.sources)-1] -} - -func (rc *routeContainer) getSourceIndex(cc interface{}) int { - for i := range rc.sources { - if rc.sources[i] == cc { - return i - } - } - - return -1 -} - -func (rc *routeContainer) srcCount() int { - return len(rc.sources) -} diff --git a/routingtable/mergedrt/rtmirror.go b/routingtable/mergedrt/rtmirror.go deleted file mode 100644 index fcf1a6400eea84cb4861e41e01d96869c7bee4b5..0000000000000000000000000000000000000000 --- a/routingtable/mergedrt/rtmirror.go +++ /dev/null @@ -1,106 +0,0 @@ -package rtmirror - -import ( - "crypto/sha1" - "sync" - - "github.com/bio-routing/bio-rd/route" - routeapi "github.com/bio-routing/bio-rd/route/api" - "github.com/bio-routing/bio-rd/routingtable" - "github.com/golang/protobuf/proto" - "github.com/pkg/errors" -) - -// RTMirror provides an deduplicated routing table -type RTMirror struct { - routes map[[20]byte]*routeContainer - routesMu sync.Mutex - rt *routingtable.RoutingTable -} - -// New creates a new RTMirror and starts it -func New(rt *routingtable.RoutingTable) *RTMirror { - return &RTMirror{ - routes: make(map[[20]byte]*routeContainer), - rt: rt, - } -} - -// DropRIS drops all routes learned from a RIS -func (rtm *RTMirror) DropRIS(cc interface{}) { - rtm.routesMu.Lock() - defer rtm.routesMu.Unlock() - - for h, rc := range rtm.routes { - rtm._delRoute(h, cc, rc.route) - } -} - -// AddRoute adds a route -func (rtm *RTMirror) AddRoute(cc interface{}, r *routeapi.Route) error { - h, err := hashRoute(r) - if err != nil { - return errors.Wrap(err, "Hashing failed") - } - - rtm.routesMu.Lock() - defer rtm.routesMu.Unlock() - - if _, exists := rtm.routes[h]; !exists { - s := route.RouteFromProtoRoute(r, true) - rtm.routes[h] = newRouteContainer(r, cc) - rtm.rt.AddPath(s.Prefix(), s.Paths()[0]) - return nil - } - - rtm.routes[h].addSource(cc) - return nil -} - -// RemoveRoute deletes a route -func (rtm *RTMirror) RemoveRoute(cc interface{}, r *routeapi.Route) error { - h, err := hashRoute(r) - if err != nil { - return errors.Wrap(err, "Hashing failed") - } - - rtm.routesMu.Lock() - defer rtm.routesMu.Unlock() - - if _, exists := rtm.routes[h]; !exists { - return nil - } - - rtm._delRoute(h, cc, r) - return nil -} - -func (rtm *RTMirror) _delRoute(h [20]byte, src interface{}, r *routeapi.Route) { - rtm.routes[h].removeSource(src) - - if rtm.routes[h].srcCount() > 0 { - return - } - - s := route.RouteFromProtoRoute(r, true) - rtm.rt.RemovePath(s.Prefix(), s.Paths()[0]) - delete(rtm.routes, h) -} - -func hashRoute(route *routeapi.Route) ([20]byte, error) { - m, err := proto.Marshal(route) - if err != nil { - return [20]byte{}, errors.Wrap(err, "Proto marshal failed") - } - - h := sha1.New() - _, err = h.Write(m) - if err != nil { - return [20]byte{}, errors.Wrap(err, "Write failed") - } - res := [20]byte{} - x := h.Sum(nil) - copy(res[:], x) - - return res, nil -} diff --git a/routingtable/mergedrt/rtmirror_test.go b/routingtable/mergedrt/rtmirror_test.go deleted file mode 100644 index 4280f12ac52f1ce2ce67228acc1e04f9ce51289a..0000000000000000000000000000000000000000 --- a/routingtable/mergedrt/rtmirror_test.go +++ /dev/null @@ -1,226 +0,0 @@ -package rtmirror - -import ( - "testing" - - "github.com/bio-routing/bio-rd/route" - routeapi "github.com/bio-routing/bio-rd/route/api" - "github.com/bio-routing/bio-rd/routingtable" - "github.com/stretchr/testify/assert" - - bnet "github.com/bio-routing/bio-rd/net" -) - -type srcRouteTuple struct { - src interface{} - route *routeapi.Route -} - -func TestRTMirror(t *testing.T) { - tests := []struct { - name string - add []*srcRouteTuple - expectedAfterAdd []*route.Route - remove []*srcRouteTuple - expectedAfterRemove []*route.Route - }{ - { - name: "Test #1: Single source", - add: []*srcRouteTuple{ - { - src: "a", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - }, - expectedAfterAdd: []*route.Route{ - route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ - Type: route.StaticPathType, - StaticPath: &route.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), - }, - }), - }, - remove: []*srcRouteTuple{ - { - src: "a", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - }, - expectedAfterRemove: []*route.Route{}, - }, - { - name: "Test #2: Multiple source, single delete", - add: []*srcRouteTuple{ - { - src: "a", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - { - src: "b", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - }, - expectedAfterAdd: []*route.Route{ - route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ - Type: route.StaticPathType, - StaticPath: &route.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), - }, - }), - }, - remove: []*srcRouteTuple{ - { - src: "a", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - }, - expectedAfterRemove: []*route.Route{ - route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ - Type: route.StaticPathType, - StaticPath: &route.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), - }, - }), - }, - }, - { - name: "Test #3: Multiple source, double delete", - add: []*srcRouteTuple{ - { - src: "a", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - { - src: "b", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - }, - expectedAfterAdd: []*route.Route{ - route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ - Type: route.StaticPathType, - StaticPath: &route.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), - }, - }), - }, - remove: []*srcRouteTuple{ - { - src: "a", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - { - src: "b", - route: &routeapi.Route{ - Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), - Paths: []*routeapi.Path{ - { - Type: routeapi.Path_Static, - StaticPath: &routeapi.StaticPath{ - NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), - }, - }, - }, - }, - }, - }, - expectedAfterRemove: []*route.Route{}, - }, - } - - for _, test := range tests { - rt := routingtable.NewRoutingTable() - rtm := New(rt) - - for _, a := range test.add { - rtm.AddRoute(a.src, a.route) - } - - assert.Equal(t, test.expectedAfterAdd, rt.Dump(), test.name) - - /*for _, r := range test.add { - rtm.RemoveRoute(r.src, r.route) - } - - assert.Equal(t, test.expectedAfterAdd, rt.Dump(), test.name)*/ - } -}