diff --git a/cmd/ris-mirror/config/config.go b/cmd/ris-mirror/config/config.go new file mode 100644 index 0000000000000000000000000000000000000000..9d270b6913cb832c6b3510b385f57a43021be0be --- /dev/null +++ b/cmd/ris-mirror/config/config.go @@ -0,0 +1,55 @@ +package config + +import ( + "io/ioutil" + + "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +// RISMirrorConfig is the config of RISMirror instance +type RISMirrorConfig struct { + RIBConfigs []RIBConfig `yaml:"ribs"` +} + +// RIBConfig is a RIB configuration +type RIBConfig struct { + Router string `yaml:"router"` + VRFs []string `yaml:"vrfs"` + IPVersions []uint8 `yaml:"IPVersions"` + SrcRISInstances []string `yaml:"source_ris_instances"` +} + +// LoadConfig loads a RISMirror config +func LoadConfig(filepath string) (*RISMirrorConfig, error) { + f, err := ioutil.ReadFile(filepath) + if err != nil { + return nil, errors.Wrap(err, "Unable to read config file") + } + + cfg := &RISMirrorConfig{} + err = yaml.Unmarshal(f, cfg) + if err != nil { + return nil, errors.Wrap(err, "Unmarshal failed") + } + + return cfg, nil +} + +// GetRISInstances returns a list of all RIS instances in the config +func (rismc *RISMirrorConfig) GetRISInstances() []string { + instances := make(map[string]struct{}) + + for _, r := range rismc.RIBConfigs { + for _, s := range r.SrcRISInstances { + instances[s] = struct{}{} + } + } + + ret := make([]string, 0) + for instance := range instances { + ret = append(ret, instance) + } + + return ret +} diff --git a/cmd/ris-mirror/main.go b/cmd/ris-mirror/main.go new file mode 100644 index 0000000000000000000000000000000000000000..f7f44ace2f35679cf6c76553fc5df787d408c458 --- /dev/null +++ b/cmd/ris-mirror/main.go @@ -0,0 +1,109 @@ +package main + +import ( + "flag" + "fmt" + "net" + "os" + "time" + + "github.com/bio-routing/bio-rd/cmd/ris-mirror/config" + "github.com/bio-routing/bio-rd/cmd/ris-mirror/rismirror" + "github.com/bio-routing/bio-rd/cmd/ris/risserver" + "github.com/bio-routing/bio-rd/routingtable/vrf" + "github.com/bio-routing/bio-rd/util/servicewrapper" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + + pb "github.com/bio-routing/bio-rd/cmd/ris/api" +) + +var ( + grpcPort = flag.Uint("grpc_port", 4321, "gRPC server port") + httpPort = flag.Uint("http_port", 4320, "HTTP server port") + grpcKeepaliveMinTime = flag.Uint("grpc_keepalive_min_time", 1, "Minimum time (seconds) for a client to wait between GRPC keepalive pings") + risTimeout = flag.Uint("ris_timeout", 5, "RIS timeout in seconds") + configFilePath = flag.String("config.file", "ris_mirror.yml", "Configuration file") +) + +func main() { + flag.Parse() + + cfg, err := config.LoadConfig(*configFilePath) + if err != nil { + log.WithError(err).Fatal("Failed to load config") + } + + fmt.Printf("Config loaded\n") + + risInstances := connectAllRISInstances(cfg.GetRISInstances()) + + fmt.Printf("Foo: %v\n", risInstances) + m := rismirror.New() + + for _, rcfg := range cfg.RIBConfigs { + for _, vrfHumanReadable := range rcfg.VRFs { + addr := net.ParseIP(rcfg.Router) + if addr == nil { + panic("Invalid address") + } + + vrfID, err := vrf.ParseHumanReadableRouteDistinguisher(vrfHumanReadable) + if err != nil { + panic(err) + } + + srcs := make([]*grpc.ClientConn, 0) + for _, srcInstance := range rcfg.SrcRISInstances { + srcs = append(srcs, risInstances[srcInstance]) + } + + fmt.Printf("Adding Target %v\n", rcfg.Router) + m.AddTarget(rcfg.Router, addr, vrfID, srcs) + } + } + + s := risserver.NewServer(m) + unaryInterceptors := []grpc.UnaryServerInterceptor{} + streamInterceptors := []grpc.StreamServerInterceptor{} + srv, err := servicewrapper.New( + uint16(*grpcPort), + servicewrapper.HTTP(uint16(*httpPort)), + unaryInterceptors, + streamInterceptors, + keepalive.EnforcementPolicy{ + MinTime: time.Duration(*grpcKeepaliveMinTime) * time.Second, + PermitWithoutStream: true, + }, + ) + if err != nil { + log.Errorf("failed to listen: %v", err) + os.Exit(1) + } + + pb.RegisterRoutingInformationServiceServer(srv.GRPC(), s) + if err := srv.Serve(); err != nil { + log.Fatalf("failed to start server: %v", err) + } +} + +func connectAllRISInstances(addrs []string) map[string]*grpc.ClientConn { + res := make(map[string]*grpc.ClientConn) + + for _, a := range addrs { + log.Infof("grpc.Dialing %q", a) + cc, err := grpc.Dial(a, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Second * 10, + Timeout: time.Second * time.Duration(*risTimeout), + PermitWithoutStream: true, + })) + if err != nil { + log.WithError(err).Errorf("grpc.Dial failed for %q", a) + } + + res[a] = cc + } + + return res +} diff --git a/cmd/ris-mirror/ris_mirror.yml b/cmd/ris-mirror/ris_mirror.yml new file mode 100644 index 0000000000000000000000000000000000000000..63e768f8f192ef2b09f89c7e6439c4c2f8c981a5 --- /dev/null +++ b/cmd/ris-mirror/ris_mirror.yml @@ -0,0 +1,4 @@ +ribs: + - router: 10.200.0.1 + vrfs: [51324:65201] + source_ris_instances: ["ncs01.lej01.srv.exaring.net:4321", "ncs02.lej01.srv.exaring.net:4321"] \ No newline at end of file diff --git a/cmd/ris-mirror/rismirror/mirror.go b/cmd/ris-mirror/rismirror/mirror.go new file mode 100644 index 0000000000000000000000000000000000000000..a40d4dd8b57eb05944b8c35f113663b1cc07adb0 --- /dev/null +++ b/cmd/ris-mirror/rismirror/mirror.go @@ -0,0 +1,62 @@ +package rismirror + +import ( + "fmt" + "net" + "sync" + + "github.com/bio-routing/bio-rd/cmd/ris-mirror/rtmirror" + "github.com/bio-routing/bio-rd/protocols/bgp/server" + "google.golang.org/grpc" +) + +type RISMirror struct { + routers map[string]server.RouterInterface + routersMu sync.Mutex +} + +// New creates a new RISMirror +func New() *RISMirror { + return &RISMirror{ + routers: make(map[string]server.RouterInterface), + } +} + +func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrf uint64, sources []*grpc.ClientConn) { + rism.routersMu.Lock() + defer rism.routersMu.Unlock() + + if _, exists := rism.routers[rtrName]; !exists { + rism.routers[rtrName] = newRouter(rtrName, address) + } + + v := rism.routers[rtrName].(*Router).vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", vrf), vrf) + + rtmirror.New(sources, rtmirror.Config{ + Router: rtrName, + VRF: v, + }) +} + +// GetRouter gets a router +func (rism *RISMirror) GetRouter(rtr string) server.RouterInterface { + rism.routersMu.Lock() + defer rism.routersMu.Unlock() + + if _, exists := rism.routers[rtr]; !exists { + return nil + } + + return rism.routers[rtr] +} + +// GetRouters gets all routers +func (rism *RISMirror) GetRouters() []server.RouterInterface { + res := make([]server.RouterInterface, 0) + + for _, r := range rism.routers { + res = append(res, r) + } + + return res +} diff --git a/cmd/ris-mirror/rismirror/router.go b/cmd/ris-mirror/rismirror/router.go new file mode 100644 index 0000000000000000000000000000000000000000..4f1ded5c42f69ad71859c5e76ecc97235ddbfff4 --- /dev/null +++ b/cmd/ris-mirror/rismirror/router.go @@ -0,0 +1,42 @@ +package rismirror + +import ( + "net" + + "github.com/bio-routing/bio-rd/routingtable/vrf" +) + +// Router represents a router +type Router struct { + name string + address net.IP + vrfRegistry *vrf.VRFRegistry +} + +func newRouter(name string, address net.IP) *Router { + return &Router{ + name: name, + address: address, + vrfRegistry: vrf.NewVRFRegistry(), + } +} + +// Name gets the routers name +func (r *Router) Name() string { + return r.name +} + +// Address gets a routers address +func (r *Router) Address() net.IP { + return r.address +} + +// GetVRF gets a VRF by its ID +func (r *Router) GetVRF(vrfID uint64) *vrf.VRF { + return r.vrfRegistry.GetVRFByRD(vrfID) +} + +// GetVRFs gets all VRFs +func (r *Router) GetVRFs() []*vrf.VRF { + return r.vrfRegistry.List() +} diff --git a/cmd/ris-mirror/rtmirror/rtmirror.go b/cmd/ris-mirror/rtmirror/rtmirror.go index 810312d089989f613989bdacafcd2edd7878d40d..55ee1463738d7020725da27630013e0a103e0ec2 100644 --- a/cmd/ris-mirror/rtmirror/rtmirror.go +++ b/cmd/ris-mirror/rtmirror/rtmirror.go @@ -7,10 +7,12 @@ import ( "sync" risapi "github.com/bio-routing/bio-rd/cmd/ris/api" + bnet "github.com/bio-routing/bio-rd/net" "github.com/bio-routing/bio-rd/route" routeapi "github.com/bio-routing/bio-rd/route/api" - "github.com/bio-routing/bio-rd/routingtable" - "github.com/gogo/protobuf/proto" + "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/bio-routing/bio-rd/routingtable/vrf" + "github.com/golang/protobuf/proto" "github.com/pkg/errors" "google.golang.org/grpc" @@ -20,7 +22,7 @@ import ( // RTMirror provides an deduplicated mirror of a router/vrf/afi routing table from a multiple RIS instances type RTMirror struct { cfg Config - rt *routingtable.RoutingTable + vrf *vrf.VRF routes map[[20]byte]*routeContainer routesMu sync.Mutex grpcClients []*grpc.ClientConn @@ -30,9 +32,8 @@ type RTMirror struct { // Config is a route mirror config type Config struct { - Router string - VRF string - IPVersion uint8 + Router string + VRF *vrf.VRF } // New creates a new RTMirror and starts it @@ -40,14 +41,21 @@ func New(clientConns []*grpc.ClientConn, cfg Config) *RTMirror { rtm := &RTMirror{ cfg: cfg, routes: make(map[[20]byte]*routeContainer), - rt: routingtable.NewRoutingTable(), + vrf: cfg.VRF, grpcClients: clientConns, stop: make(chan struct{}), } - for _, ris := range rtm.grpcClients { - rtm.wg.Add(1) - go rtm.client(ris) + afis := []risapi.ObserveRIBRequest_AFISAFI{ + risapi.ObserveRIBRequest_IPv4Unicast, + risapi.ObserveRIBRequest_IPv6Unicast, + } + + for _, afi := range afis { + for _, ris := range rtm.grpcClients { + rtm.wg.Add(1) + go rtm.client(ris, afi) + } } return rtm @@ -75,19 +83,11 @@ func (rtm *RTMirror) Dispose() { rtm.wg.Wait() } -func (rtm *RTMirror) client(cc *grpc.ClientConn) { +func (rtm *RTMirror) client(cc *grpc.ClientConn, afi risapi.ObserveRIBRequest_AFISAFI) { defer rtm.wg.Done() risc := risapi.NewRoutingInformationServiceClient(cc) - var afisafi risapi.ObserveRIBRequest_AFISAFI - switch rtm.cfg.IPVersion { - case 4: - afisafi = risapi.ObserveRIBRequest_IPv4Unicast - case 6: - afisafi = risapi.ObserveRIBRequest_IPv6Unicast - } - for { if rtm.stopped() { return @@ -95,8 +95,8 @@ func (rtm *RTMirror) client(cc *grpc.ClientConn) { orc, err := risc.ObserveRIB(context.Background(), &risapi.ObserveRIBRequest{ Router: rtm.cfg.Router, - Vrf: rtm.cfg.VRF, - Afisafi: afisafi, + VrfId: rtm.cfg.VRF.RD(), + Afisafi: afi, }, grpc.WaitForReady(true)) if err != nil { log.WithError(err).Error("ObserveRIB call failed") @@ -165,15 +165,25 @@ func (rtm *RTMirror) addRoute(cc *grpc.ClientConn, r *routeapi.Route) { defer rtm.routesMu.Unlock() if _, exists := rtm.routes[h]; !exists { - rtm.routes[h] = newRouteContainer(r, cc) s := route.RouteFromProtoRoute(r, true) - rtm.rt.AddPath(s.Prefix(), s.Paths()[0]) + rib := rtm.getRIB(s.Prefix().Addr()) + + rtm.routes[h] = newRouteContainer(r, cc) + rib.AddPath(s.Prefix(), s.Paths()[0]) return } rtm.routes[h].addSource(cc) } +func (rtm *RTMirror) getRIB(addr *bnet.IP) *locRIB.LocRIB { + if addr.IsIPv4() { + return rtm.vrf.IPv4UnicastRIB() + } + + return rtm.vrf.IPv6UnicastRIB() +} + func (rtm *RTMirror) delRoute(cc *grpc.ClientConn, r *routeapi.Route) { h, err := hashRoute(r) if err != nil { @@ -199,13 +209,14 @@ func (rtm *RTMirror) _delRoute(h [20]byte, cc *grpc.ClientConn, r *routeapi.Rout } s := route.RouteFromProtoRoute(r, true) - rtm.rt.RemovePath(s.Prefix(), s.Paths()[0]) + rib := rtm.getRIB(s.Prefix().Addr()) + rib.RemovePath(s.Prefix(), s.Paths()[0]) delete(rtm.routes, h) } -// GetRoutingTable exposes the routing table mirrored -func (rtm *RTMirror) GetRoutingTable() *routingtable.RoutingTable { - return rtm.rt +// GetVRF exposes the mirrors VRF +func (rtm *RTMirror) GetVRF() *vrf.VRF { + return rtm.vrf } func hashRoute(route *routeapi.Route) ([20]byte, error) { diff --git a/cmd/ris/risserver/server.go b/cmd/ris/risserver/server.go index ba9230161def689ed94673d9e0a60d0c668581a4..07bd98d17f62e2ec81a8f947e829d322376dbc04 100644 --- a/cmd/ris/risserver/server.go +++ b/cmd/ris/risserver/server.go @@ -44,11 +44,11 @@ func init() { // Server represents an RoutingInformationService server type Server struct { - bmp *server.BMPServer + bmp server.BMPServerInterface } // NewServer creates a new server -func NewServer(b *server.BMPServer) *Server { +func NewServer(b server.BMPServerInterface) *Server { return &Server{ bmp: b, } diff --git a/protocols/bgp/server/bmp_router.go b/protocols/bgp/server/bmp_router.go index 5f93feb98a45e9a40c9340a825ffde5b0dfe7158..f4b9343d87a7cc48b906fbaf6d0399a968836791 100644 --- a/protocols/bgp/server/bmp_router.go +++ b/protocols/bgp/server/bmp_router.go @@ -20,6 +20,13 @@ import ( "github.com/bio-routing/tflow2/convert" ) +type RouterInterface interface { + Name() string + Address() net.IP + GetVRF(vrfID uint64) *vrf.VRF + GetVRFs() []*vrf.VRF +} + // Router represents a BMP enabled route in BMP context type Router struct { name string diff --git a/protocols/bgp/server/bmp_server.go b/protocols/bgp/server/bmp_server.go index 5fb1bb9b53be2c7c42056d91f7508efff46ac549..85caa197c2a6df0ca6cd9564dbefce86ad85c115 100644 --- a/protocols/bgp/server/bmp_server.go +++ b/protocols/bgp/server/bmp_server.go @@ -21,6 +21,11 @@ const ( defaultBufferLen = 4096 ) +type BMPServerInterface interface { + GetRouter(rtr string) RouterInterface + GetRouters() []RouterInterface +} + // BMPServer represents a BMP server type BMPServer struct { routers map[string]*Router @@ -170,11 +175,11 @@ func recvBMPMsg(c net.Conn) (msg []byte, err error) { } // GetRouters gets all routers -func (b *BMPServer) GetRouters() []*Router { +func (b *BMPServer) GetRouters() []RouterInterface { b.routersMu.RLock() defer b.routersMu.RUnlock() - r := make([]*Router, 0, len(b.routers)) + r := make([]RouterInterface, 0, len(b.routers)) for name := range b.routers { r = append(r, b.routers[name]) } @@ -183,7 +188,7 @@ func (b *BMPServer) GetRouters() []*Router { } // GetRouter gets a router -func (b *BMPServer) GetRouter(name string) *Router { +func (b *BMPServer) GetRouter(name string) RouterInterface { b.routersMu.RLock() defer b.routersMu.RUnlock()