diff --git a/cmd/ris-mirror/rtmirror/routecontainer.go b/cmd/ris-mirror/rtmirror/routecontainer.go new file mode 100644 index 0000000000000000000000000000000000000000..866db553d613c42392e13b627da100ec0af46a77 --- /dev/null +++ b/cmd/ris-mirror/rtmirror/routecontainer.go @@ -0,0 +1,47 @@ +package rtmirror + +import ( + routeapi "github.com/bio-routing/bio-rd/route/api" + "google.golang.org/grpc" +) + +// routeContainer groups a route with one ore multiple source the route was received from +type routeContainer struct { + route *routeapi.Route + sources []*grpc.ClientConn +} + +func newRouteContainer(route *routeapi.Route, source *grpc.ClientConn) *routeContainer { + return &routeContainer{ + route: route, + sources: []*grpc.ClientConn{source}, + } +} + +func (rc *routeContainer) addSource(cc *grpc.ClientConn) { + rc.sources = append(rc.sources, cc) +} + +func (rc *routeContainer) removeSource(cc *grpc.ClientConn) { + i := rc.getSourceIndex(cc) + if i < 0 { + return + } + + rc.sources[i] = rc.sources[len(rc.sources)-1] + rc.sources = rc.sources[:len(rc.sources)-1] +} + +func (rc *routeContainer) getSourceIndex(cc *grpc.ClientConn) int { + for i := range rc.sources { + if rc.sources[i] == cc { + return i + } + } + + return -1 +} + +func (rc *routeContainer) srcCount() int { + return len(rc.sources) +} diff --git a/cmd/ris-mirror/rtmirror/rtmirror.go b/cmd/ris-mirror/rtmirror/rtmirror.go new file mode 100644 index 0000000000000000000000000000000000000000..810312d089989f613989bdacafcd2edd7878d40d --- /dev/null +++ b/cmd/ris-mirror/rtmirror/rtmirror.go @@ -0,0 +1,227 @@ +package rtmirror + +import ( + "context" + "crypto/sha1" + "io" + "sync" + + risapi "github.com/bio-routing/bio-rd/cmd/ris/api" + "github.com/bio-routing/bio-rd/route" + routeapi "github.com/bio-routing/bio-rd/route/api" + "github.com/bio-routing/bio-rd/routingtable" + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + "google.golang.org/grpc" + + log "github.com/sirupsen/logrus" +) + +// RTMirror provides an deduplicated mirror of a router/vrf/afi routing table from a multiple RIS instances +type RTMirror struct { + cfg Config + rt *routingtable.RoutingTable + routes map[[20]byte]*routeContainer + routesMu sync.Mutex + grpcClients []*grpc.ClientConn + stop chan struct{} + wg sync.WaitGroup +} + +// Config is a route mirror config +type Config struct { + Router string + VRF string + IPVersion uint8 +} + +// New creates a new RTMirror and starts it +func New(clientConns []*grpc.ClientConn, cfg Config) *RTMirror { + rtm := &RTMirror{ + cfg: cfg, + routes: make(map[[20]byte]*routeContainer), + rt: routingtable.NewRoutingTable(), + grpcClients: clientConns, + stop: make(chan struct{}), + } + + for _, ris := range rtm.grpcClients { + rtm.wg.Add(1) + go rtm.client(ris) + } + + return rtm +} + +func (rtm *RTMirror) addRIS(addr string) error { + cc, err := grpc.Dial(addr, grpc.WithInsecure()) + if err != nil { + return errors.Wrap(err, "grpc dial failed") + } + + rtm.grpcClients = append(rtm.grpcClients, cc) + + return nil +} + +// Dispose stops the RTMirror +func (rtm *RTMirror) Dispose() { + close(rtm.stop) + + for _, cc := range rtm.grpcClients { + cc.Close() + } + + rtm.wg.Wait() +} + +func (rtm *RTMirror) client(cc *grpc.ClientConn) { + defer rtm.wg.Done() + + risc := risapi.NewRoutingInformationServiceClient(cc) + + var afisafi risapi.ObserveRIBRequest_AFISAFI + switch rtm.cfg.IPVersion { + case 4: + afisafi = risapi.ObserveRIBRequest_IPv4Unicast + case 6: + afisafi = risapi.ObserveRIBRequest_IPv6Unicast + } + + for { + if rtm.stopped() { + return + } + + orc, err := risc.ObserveRIB(context.Background(), &risapi.ObserveRIBRequest{ + Router: rtm.cfg.Router, + Vrf: rtm.cfg.VRF, + Afisafi: afisafi, + }, grpc.WaitForReady(true)) + if err != nil { + log.WithError(err).Error("ObserveRIB call failed") + continue + } + + err = rtm.clientServiceLoop(cc, orc) + if err != nil { + log.WithError(err).Error("client service loop failed") + } + + rtm.dropRoutesFromRIS(cc) + } +} + +func (rtm *RTMirror) dropRoutesFromRIS(cc *grpc.ClientConn) { + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + for h, rc := range rtm.routes { + rtm._delRoute(h, cc, rc.route) + } +} + +func (rtm *RTMirror) stopped() bool { + select { + case <-rtm.stop: + return true + default: + return false + } +} + +func (rtm *RTMirror) clientServiceLoop(cc *grpc.ClientConn, orc risapi.RoutingInformationService_ObserveRIBClient) error { + for { + if rtm.stopped() { + return nil + } + + u, err := orc.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + return errors.Wrap(err, "recv failed") + } + + if u.Advertisement { + rtm.addRoute(cc, u.Route) + continue + } + + rtm.delRoute(cc, u.Route) + } +} + +func (rtm *RTMirror) addRoute(cc *grpc.ClientConn, r *routeapi.Route) { + h, err := hashRoute(r) + if err != nil { + log.WithError(err).Error("Hashing failed") + return + } + + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + if _, exists := rtm.routes[h]; !exists { + rtm.routes[h] = newRouteContainer(r, cc) + s := route.RouteFromProtoRoute(r, true) + rtm.rt.AddPath(s.Prefix(), s.Paths()[0]) + return + } + + rtm.routes[h].addSource(cc) +} + +func (rtm *RTMirror) delRoute(cc *grpc.ClientConn, r *routeapi.Route) { + h, err := hashRoute(r) + if err != nil { + log.WithError(err).Error("Hashing failed") + return + } + + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + if _, exists := rtm.routes[h]; !exists { + return + } + + rtm._delRoute(h, cc, r) +} + +func (rtm *RTMirror) _delRoute(h [20]byte, cc *grpc.ClientConn, r *routeapi.Route) { + rtm.routes[h].removeSource(cc) + + if rtm.routes[h].srcCount() > 0 { + return + } + + s := route.RouteFromProtoRoute(r, true) + rtm.rt.RemovePath(s.Prefix(), s.Paths()[0]) + delete(rtm.routes, h) +} + +// GetRoutingTable exposes the routing table mirrored +func (rtm *RTMirror) GetRoutingTable() *routingtable.RoutingTable { + return rtm.rt +} + +func hashRoute(route *routeapi.Route) ([20]byte, error) { + m, err := proto.Marshal(route) + if err != nil { + return [20]byte{}, errors.Wrap(err, "Proto marshal failed") + } + + h := sha1.New() + _, err = h.Write(m) + if err != nil { + return [20]byte{}, errors.Wrap(err, "Write failed") + } + res := [20]byte{} + x := h.Sum(nil) + copy(res[:], x) + + return res, nil +} diff --git a/go.mod b/go.mod index e1077e5360c86f3f2c4173eb87a5a0ecde29cef2..90b88bc37c927d4540d990270c533e295f73d1fa 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ require ( github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 // indirect github.com/bio-routing/tflow2 v0.0.0-20181230153523-2e308a4a3c3a + github.com/gogo/protobuf v1.1.1 github.com/golang/protobuf v1.4.0 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0