Skip to content
Snippets Groups Projects
Commit a1957517 authored by Oliver Herms's avatar Oliver Herms
Browse files

Addind RIB subscription service

parent a95c1982
No related branches found
No related tags found
No related merge requests found
...@@ -18,6 +18,7 @@ import ( ...@@ -18,6 +18,7 @@ import (
) )
type router struct { type router struct {
name string
address net.IP address net.IP
port uint16 port uint16
con net.Conn con net.Conn
...@@ -32,6 +33,9 @@ type router struct { ...@@ -32,6 +33,9 @@ type router struct {
logger *log.Logger logger *log.Logger
runMu sync.Mutex runMu sync.Mutex
stop chan struct{} stop chan struct{}
ribClients map[afiClient]struct{}
ribClientsMu sync.Mutex
} }
type neighbor struct { type neighbor struct {
...@@ -55,6 +59,64 @@ func newRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRI ...@@ -55,6 +59,64 @@ func newRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRI
neighbors: make(map[[16]byte]*neighbor), neighbors: make(map[[16]byte]*neighbor),
logger: log.New(), logger: log.New(),
stop: make(chan struct{}), stop: make(chan struct{}),
ribClients: make(map[afiClient]struct{}),
}
}
func (r *router) subscribeRIBs(client routingtable.RouteTableClient, afi uint8) {
ac := afiClient{
afi: afi,
client: client,
}
r.ribClientsMu.Lock()
defer r.ribClientsMu.Unlock()
if _, ok := r.ribClients[ac]; ok {
return
}
r.ribClients[ac] = struct{}{}
r.neighborsMu.Lock()
defer r.neighborsMu.Unlock()
for _, n := range r.neighbors {
/*if !n.fsm.ribsInitialized {
fmt.Printf("Uninitialized\n")
continue
}*/
if afi == packet.IPv4AFI {
n.fsm.ipv4Unicast.adjRIBIn.Register(client)
}
if afi == packet.IPv6AFI {
n.fsm.ipv6Unicast.adjRIBIn.Register(client)
}
}
}
func (r *router) unsubscribeRIBs(client routingtable.RouteTableClient, afi uint8) {
ac := afiClient{
afi: afi,
client: client,
}
r.ribClientsMu.Lock()
defer r.ribClientsMu.Unlock()
if _, ok := r.ribClients[ac]; !ok {
return
}
delete(r.ribClients, ac)
r.neighborsMu.Lock()
defer r.neighborsMu.Unlock()
for _, n := range r.neighbors {
if !n.fsm.ribsInitialized {
continue
}
if afi == packet.IPv4AFI {
n.fsm.ipv4Unicast.adjRIBIn.Unregister(client)
}
if afi == packet.IPv6AFI {
n.fsm.ipv6Unicast.adjRIBIn.Unregister(client)
}
} }
} }
...@@ -132,6 +194,7 @@ func (r *router) processInitiationMsg(msg *bmppkt.InitiationMessage) { ...@@ -132,6 +194,7 @@ func (r *router) processInitiationMsg(msg *bmppkt.InitiationMessage) {
case sysDescrType: case sysDescrType:
logMsg += fmt.Sprintf(" sysDescr.: %s", string(tlv.Information)) logMsg += fmt.Sprintf(" sysDescr.: %s", string(tlv.Information))
case sysNameType: case sysNameType:
r.name = string(tlv.Information)
logMsg += fmt.Sprintf(" sysName.: %s", string(tlv.Information)) logMsg += fmt.Sprintf(" sysName.: %s", string(tlv.Information))
} }
} }
...@@ -284,9 +347,25 @@ func (r *router) processPeerUpNotification(msg *bmppkt.PeerUpNotification) error ...@@ -284,9 +347,25 @@ func (r *router) processPeerUpNotification(msg *bmppkt.PeerUpNotification) error
} }
r.neighbors[msg.PerPeerHeader.PeerAddress] = n r.neighbors[msg.PerPeerHeader.PeerAddress] = n
r.ribClientsMu.Lock()
defer r.ribClientsMu.Unlock()
n.registerClients(r.ribClients)
return nil return nil
} }
func (n *neighbor) registerClients(clients map[afiClient]struct{}) {
for ac := range clients {
if ac.afi == packet.IPv4AFI {
n.fsm.ipv4Unicast.adjRIBIn.Unregister(ac.client)
}
if ac.afi == packet.IPv6AFI {
n.fsm.ipv6Unicast.adjRIBIn.Unregister(ac.client)
}
}
}
func (p *peer) configureBySentOpen(msg *packet.BGPOpen) { func (p *peer) configureBySentOpen(msg *packet.BGPOpen) {
caps := getCaps(msg.OptParams) caps := getCaps(msg.OptParams)
for _, cap := range caps { for _, cap := range caps {
......
...@@ -8,6 +8,7 @@ import ( ...@@ -8,6 +8,7 @@ import (
"time" "time"
bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet" bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet"
"github.com/bio-routing/bio-rd/routingtable"
"github.com/bio-routing/bio-rd/routingtable/locRIB" "github.com/bio-routing/bio-rd/routingtable/locRIB"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/taktv6/tflow2/convert" "github.com/taktv6/tflow2/convert"
...@@ -19,25 +20,79 @@ const ( ...@@ -19,25 +20,79 @@ const (
// BMPServer represents a BMP server // BMPServer represents a BMP server
type BMPServer struct { type BMPServer struct {
routers map[string]*router routers map[string]*router
routersMu sync.RWMutex ribClients map[string]map[afiClient]struct{}
reconnectTime uint gloablMu sync.RWMutex
}
type afiClient struct {
afi uint8
client routingtable.RouteTableClient
} }
// NewServer creates a new BMP server // NewServer creates a new BMP server
func NewServer() *BMPServer { func NewServer() *BMPServer {
return &BMPServer{ return &BMPServer{
routers: make(map[string]*router), routers: make(map[string]*router),
ribClients: make(map[string]map[afiClient]struct{}),
}
}
// SubscribeRIBs subscribes c for all RIB updates of router rtr
func (b *BMPServer) SubscribeRIBs(client routingtable.RouteTableClient, rtr net.IP, afi uint8) {
b.gloablMu.Lock()
defer b.gloablMu.Unlock()
rtrStr := rtr.String()
if _, ok := b.ribClients[rtrStr]; !ok {
b.ribClients[rtrStr] = make(map[afiClient]struct{})
} }
ac := afiClient{
afi: afi,
client: client,
}
if _, ok := b.ribClients[rtrStr][ac]; ok {
return
}
b.ribClients[rtrStr][ac] = struct{}{}
if _, ok := b.routers[rtrStr]; !ok {
return
}
b.routers[rtrStr].subscribeRIBs(client, afi)
}
// UnsubscribeRIBs unsubscribes client from RIBs of address family afi
func (b *BMPServer) UnsubscribeRIBs(client routingtable.RouteTableClient, rtr net.IP, afi uint8) {
b.gloablMu.Lock()
defer b.gloablMu.Unlock()
rtrStr := rtr.String()
if _, ok := b.ribClients[rtrStr]; !ok {
return
}
ac := afiClient{
afi: afi,
client: client,
}
if _, ok := b.ribClients[rtrStr][ac]; !ok {
return
}
b.routers[rtrStr].unsubscribeRIBs(client, afi)
} }
// AddRouter adds a router to which we connect with BMP // AddRouter adds a router to which we connect with BMP
func (b *BMPServer) AddRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRIB) { func (b *BMPServer) AddRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRIB) {
b.routersMu.Lock() b.gloablMu.Lock()
defer b.routersMu.Unlock() defer b.gloablMu.Unlock()
r := newRouter(addr, port, rib4, rib6) r := newRouter(addr, port, rib4, rib6)
b.routers[fmt.Sprintf("%s:%d", r.address.String(), r.port)] = r b.routers[fmt.Sprintf("%s", r.address.String())] = r
go func(r *router) { go func(r *router) {
for { for {
...@@ -63,8 +118,8 @@ func (b *BMPServer) AddRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib ...@@ -63,8 +118,8 @@ func (b *BMPServer) AddRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib
// RemoveRouter removes a BMP monitored router // RemoveRouter removes a BMP monitored router
func (b *BMPServer) RemoveRouter(addr net.IP, port uint16) { func (b *BMPServer) RemoveRouter(addr net.IP, port uint16) {
b.routersMu.Lock() b.gloablMu.Lock()
defer b.routersMu.Unlock() defer b.gloablMu.Unlock()
id := fmt.Sprintf("%s:%d", addr.String(), port) id := fmt.Sprintf("%s:%d", addr.String(), port)
r := b.routers[id] r := b.routers[id]
......
...@@ -13,7 +13,8 @@ import ( ...@@ -13,7 +13,8 @@ import (
func TestNewServer(t *testing.T) { func TestNewServer(t *testing.T) {
s := NewServer() s := NewServer()
assert.Equal(t, &BMPServer{ assert.Equal(t, &BMPServer{
routers: map[string]*router{}, routers: map[string]*router{},
ribClients: map[string]map[afiClient]struct{}{},
}, s) }, s)
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment