From a1957517e86f370b0bbe9b7ae68835abb9de79bc Mon Sep 17 00:00:00 2001
From: Oliver Herms <oliver.herms@exaring.de>
Date: Wed, 24 Oct 2018 13:33:18 +0200
Subject: [PATCH] Addind RIB subscription service

---
 protocols/bgp/server/bmp_router.go      | 79 +++++++++++++++++++++++++
 protocols/bgp/server/bmp_server.go      | 73 ++++++++++++++++++++---
 protocols/bgp/server/bmp_server_test.go |  3 +-
 3 files changed, 145 insertions(+), 10 deletions(-)

diff --git a/protocols/bgp/server/bmp_router.go b/protocols/bgp/server/bmp_router.go
index 6749368b..d911a2a2 100644
--- a/protocols/bgp/server/bmp_router.go
+++ b/protocols/bgp/server/bmp_router.go
@@ -18,6 +18,7 @@ import (
 )
 
 type router struct {
+	name             string
 	address          net.IP
 	port             uint16
 	con              net.Conn
@@ -32,6 +33,9 @@ type router struct {
 	logger           *log.Logger
 	runMu            sync.Mutex
 	stop             chan struct{}
+
+	ribClients   map[afiClient]struct{}
+	ribClientsMu sync.Mutex
 }
 
 type neighbor struct {
@@ -55,6 +59,64 @@ func newRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRI
 		neighbors:        make(map[[16]byte]*neighbor),
 		logger:           log.New(),
 		stop:             make(chan struct{}),
+		ribClients:       make(map[afiClient]struct{}),
+	}
+}
+
+func (r *router) subscribeRIBs(client routingtable.RouteTableClient, afi uint8) {
+	ac := afiClient{
+		afi:    afi,
+		client: client,
+	}
+
+	r.ribClientsMu.Lock()
+	defer r.ribClientsMu.Unlock()
+	if _, ok := r.ribClients[ac]; ok {
+		return
+	}
+	r.ribClients[ac] = struct{}{}
+
+	r.neighborsMu.Lock()
+	defer r.neighborsMu.Unlock()
+	for _, n := range r.neighbors {
+		/*if !n.fsm.ribsInitialized {
+			fmt.Printf("Uninitialized\n")
+			continue
+		}*/
+		if afi == packet.IPv4AFI {
+			n.fsm.ipv4Unicast.adjRIBIn.Register(client)
+		}
+		if afi == packet.IPv6AFI {
+			n.fsm.ipv6Unicast.adjRIBIn.Register(client)
+		}
+	}
+}
+
+func (r *router) unsubscribeRIBs(client routingtable.RouteTableClient, afi uint8) {
+	ac := afiClient{
+		afi:    afi,
+		client: client,
+	}
+
+	r.ribClientsMu.Lock()
+	defer r.ribClientsMu.Unlock()
+	if _, ok := r.ribClients[ac]; !ok {
+		return
+	}
+	delete(r.ribClients, ac)
+
+	r.neighborsMu.Lock()
+	defer r.neighborsMu.Unlock()
+	for _, n := range r.neighbors {
+		if !n.fsm.ribsInitialized {
+			continue
+		}
+		if afi == packet.IPv4AFI {
+			n.fsm.ipv4Unicast.adjRIBIn.Unregister(client)
+		}
+		if afi == packet.IPv6AFI {
+			n.fsm.ipv6Unicast.adjRIBIn.Unregister(client)
+		}
 	}
 }
 
@@ -132,6 +194,7 @@ func (r *router) processInitiationMsg(msg *bmppkt.InitiationMessage) {
 		case sysDescrType:
 			logMsg += fmt.Sprintf(" sysDescr.: %s", string(tlv.Information))
 		case sysNameType:
+			r.name = string(tlv.Information)
 			logMsg += fmt.Sprintf(" sysName.: %s", string(tlv.Information))
 		}
 	}
@@ -284,9 +347,25 @@ func (r *router) processPeerUpNotification(msg *bmppkt.PeerUpNotification) error
 	}
 
 	r.neighbors[msg.PerPeerHeader.PeerAddress] = n
+
+	r.ribClientsMu.Lock()
+	defer r.ribClientsMu.Unlock()
+	n.registerClients(r.ribClients)
+
 	return nil
 }
 
+func (n *neighbor) registerClients(clients map[afiClient]struct{}) {
+	for ac := range clients {
+		if ac.afi == packet.IPv4AFI {
+			n.fsm.ipv4Unicast.adjRIBIn.Unregister(ac.client)
+		}
+		if ac.afi == packet.IPv6AFI {
+			n.fsm.ipv6Unicast.adjRIBIn.Unregister(ac.client)
+		}
+	}
+}
+
 func (p *peer) configureBySentOpen(msg *packet.BGPOpen) {
 	caps := getCaps(msg.OptParams)
 	for _, cap := range caps {
diff --git a/protocols/bgp/server/bmp_server.go b/protocols/bgp/server/bmp_server.go
index 100e514c..33273672 100644
--- a/protocols/bgp/server/bmp_server.go
+++ b/protocols/bgp/server/bmp_server.go
@@ -8,6 +8,7 @@ import (
 	"time"
 
 	bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet"
+	"github.com/bio-routing/bio-rd/routingtable"
 	"github.com/bio-routing/bio-rd/routingtable/locRIB"
 	log "github.com/sirupsen/logrus"
 	"github.com/taktv6/tflow2/convert"
@@ -19,25 +20,79 @@ const (
 
 // BMPServer represents a BMP server
 type BMPServer struct {
-	routers       map[string]*router
-	routersMu     sync.RWMutex
-	reconnectTime uint
+	routers    map[string]*router
+	ribClients map[string]map[afiClient]struct{}
+	gloablMu   sync.RWMutex
+}
+
+type afiClient struct {
+	afi    uint8
+	client routingtable.RouteTableClient
 }
 
 // NewServer creates a new BMP server
 func NewServer() *BMPServer {
 	return &BMPServer{
-		routers: make(map[string]*router),
+		routers:    make(map[string]*router),
+		ribClients: make(map[string]map[afiClient]struct{}),
+	}
+}
+
+// SubscribeRIBs subscribes c for all RIB updates of router rtr
+func (b *BMPServer) SubscribeRIBs(client routingtable.RouteTableClient, rtr net.IP, afi uint8) {
+	b.gloablMu.Lock()
+	defer b.gloablMu.Unlock()
+
+	rtrStr := rtr.String()
+	if _, ok := b.ribClients[rtrStr]; !ok {
+		b.ribClients[rtrStr] = make(map[afiClient]struct{})
 	}
+
+	ac := afiClient{
+		afi:    afi,
+		client: client,
+	}
+	if _, ok := b.ribClients[rtrStr][ac]; ok {
+		return
+	}
+
+	b.ribClients[rtrStr][ac] = struct{}{}
+
+	if _, ok := b.routers[rtrStr]; !ok {
+		return
+	}
+
+	b.routers[rtrStr].subscribeRIBs(client, afi)
+}
+
+// UnsubscribeRIBs unsubscribes client from RIBs of address family afi
+func (b *BMPServer) UnsubscribeRIBs(client routingtable.RouteTableClient, rtr net.IP, afi uint8) {
+	b.gloablMu.Lock()
+	defer b.gloablMu.Unlock()
+
+	rtrStr := rtr.String()
+	if _, ok := b.ribClients[rtrStr]; !ok {
+		return
+	}
+
+	ac := afiClient{
+		afi:    afi,
+		client: client,
+	}
+	if _, ok := b.ribClients[rtrStr][ac]; !ok {
+		return
+	}
+
+	b.routers[rtrStr].unsubscribeRIBs(client, afi)
 }
 
 // AddRouter adds a router to which we connect with BMP
 func (b *BMPServer) AddRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRIB) {
-	b.routersMu.Lock()
-	defer b.routersMu.Unlock()
+	b.gloablMu.Lock()
+	defer b.gloablMu.Unlock()
 
 	r := newRouter(addr, port, rib4, rib6)
-	b.routers[fmt.Sprintf("%s:%d", r.address.String(), r.port)] = r
+	b.routers[fmt.Sprintf("%s", r.address.String())] = r
 
 	go func(r *router) {
 		for {
@@ -63,8 +118,8 @@ func (b *BMPServer) AddRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib
 
 // RemoveRouter removes a BMP monitored router
 func (b *BMPServer) RemoveRouter(addr net.IP, port uint16) {
-	b.routersMu.Lock()
-	defer b.routersMu.Unlock()
+	b.gloablMu.Lock()
+	defer b.gloablMu.Unlock()
 
 	id := fmt.Sprintf("%s:%d", addr.String(), port)
 	r := b.routers[id]
diff --git a/protocols/bgp/server/bmp_server_test.go b/protocols/bgp/server/bmp_server_test.go
index 34a1e27b..7b00c615 100644
--- a/protocols/bgp/server/bmp_server_test.go
+++ b/protocols/bgp/server/bmp_server_test.go
@@ -13,7 +13,8 @@ import (
 func TestNewServer(t *testing.T) {
 	s := NewServer()
 	assert.Equal(t, &BMPServer{
-		routers: map[string]*router{},
+		routers:    map[string]*router{},
+		ribClients: map[string]map[afiClient]struct{}{},
 	}, s)
 }
 
-- 
GitLab