Skip to content
Snippets Groups Projects
Unverified Commit 673eafda authored by cedi's avatar cedi Committed by GitHub
Browse files

Merge branch 'master' into feature/netlink

parents 5bf3640a 8092e1d8
No related branches found
No related tags found
No related merge requests found
Showing
with 3756 additions and 58 deletions
......@@ -6,7 +6,7 @@ go_library(
importpath = "github.com/bio-routing/bio-rd/examples/bmp",
visibility = ["//visibility:private"],
deps = [
"//protocols/bmp/server:go_default_library",
"//protocols/bgp/server:go_default_library",
"//routingtable/locRIB:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
],
......
......
......@@ -5,7 +5,7 @@ import (
"net"
"time"
"github.com/bio-routing/bio-rd/protocols/bmp/server"
"github.com/bio-routing/bio-rd/protocols/bgp/server"
"github.com/bio-routing/bio-rd/routingtable/locRIB"
"github.com/sirupsen/logrus"
)
......@@ -13,13 +13,14 @@ import (
func main() {
logrus.Printf("This is a BMP speaker\n")
rib := locRIB.New()
rib4 := locRIB.New()
rib6 := locRIB.New()
b := server.NewServer()
b.AddRouter(net.IP{127, 0, 0, 1}, 1234, rib, nil)
b.AddRouter(net.IP{10, 0, 255, 0}, 30119, rib4, rib6)
go func() {
for {
fmt.Printf("LocRIB count: %d\n", rib.Count())
fmt.Printf("LocRIB4 count: %d\n", rib4.Count())
time.Sleep(time.Second * 10)
}
}()
......
......
......@@ -30,7 +30,7 @@ func Decode(buf *bytes.Buffer, opt *DecodeOptions) (*BGPMessage, error) {
func decodeMsgBody(buf *bytes.Buffer, msgType uint8, l uint16, opt *DecodeOptions) (interface{}, error) {
switch msgType {
case OpenMsg:
return decodeOpenMsg(buf)
return DecodeOpenMsg(buf)
case UpdateMsg:
return decodeUpdateMsg(buf, l, opt)
case KeepaliveMsg:
......@@ -128,7 +128,8 @@ func invalidErrCode(n *BGPNotification) (*BGPNotification, error) {
return n, fmt.Errorf("Invalid error sub code: %d/%d", n.ErrorCode, n.ErrorSubcode)
}
func decodeOpenMsg(buf *bytes.Buffer) (*BGPOpen, error) {
// DecodeOpenMsg decodes a BGP OPEN message
func DecodeOpenMsg(buf *bytes.Buffer) (*BGPOpen, error) {
msg, err := _decodeOpenMsg(buf)
if err != nil {
return nil, fmt.Errorf("Unable to decode OPEN message: %v", err)
......@@ -241,13 +242,13 @@ func decodeCapability(buf *bytes.Buffer) (Capability, error) {
case AddPathCapabilityCode:
addPathCap, err := decodeAddPathCapability(buf)
if err != nil {
return cap, fmt.Errorf("Unable to decode add path capability")
return cap, fmt.Errorf("Unable to decode add path capability: %v", err)
}
cap.Value = addPathCap
case ASN4CapabilityCode:
asn4Cap, err := decodeASN4Capability(buf)
if err != nil {
return cap, fmt.Errorf("Unable to decode 4 octet ASN capability")
return cap, fmt.Errorf("Unable to decode 4 octet ASN capability: %v", err)
}
cap.Value = asn4Cap
default:
......
......
......@@ -60,6 +60,42 @@ func TestDecodeNLRIs(t *testing.T) {
}
}
func TestDecodeNLRIv6(t *testing.T) {
tests := []struct {
name string
input []byte
addPath bool
wantFail bool
expected *NLRI
}{
{
name: "IPv6 default",
input: []byte{
0,
},
wantFail: false,
expected: &NLRI{
Prefix: bnet.NewPfx(bnet.IPv6FromBlocks(0, 0, 0, 0, 0, 0, 0, 0), 0),
},
},
}
for _, test := range tests {
buf := bytes.NewBuffer(test.input)
res, _, err := decodeNLRI(buf, IPv6AFI, test.addPath)
if test.wantFail && err == nil {
t.Errorf("Expected error did not happen for test %q", test.name)
}
if !test.wantFail && err != nil {
t.Errorf("Unexpected failure for test %q: %v", test.name, err)
}
assert.Equal(t, test.expected, res)
}
}
func TestDecodeNLRI(t *testing.T) {
tests := []struct {
name string
......
......
......@@ -3,6 +3,8 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"bmp_router.go",
"bmp_server.go",
"fake_conn.go",
"fsm.go",
"fsm_active.go",
......@@ -29,6 +31,7 @@ go_library(
"//net:go_default_library",
"//protocols/bgp/packet:go_default_library",
"//protocols/bgp/types:go_default_library",
"//protocols/bmp/packet:go_default_library",
"//route:go_default_library",
"//routingtable:go_default_library",
"//routingtable/adjRIBIn:go_default_library",
......@@ -36,12 +39,15 @@ go_library(
"//routingtable/filter:go_default_library",
"//routingtable/locRIB:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
"//vendor/github.com/taktv6/tflow2/convert:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"bmp_router_test.go",
"bmp_server_test.go",
"fsm_address_family_test.go",
"fsm_open_sent_test.go",
"fsm_test.go",
......@@ -55,11 +61,14 @@ go_test(
"//net:go_default_library",
"//protocols/bgp/packet:go_default_library",
"//protocols/bgp/types:go_default_library",
"//protocols/bmp/packet:go_default_library",
"//route:go_default_library",
"//routingtable:go_default_library",
"//routingtable/adjRIBIn:go_default_library",
"//routingtable/filter:go_default_library",
"//routingtable/locRIB:go_default_library",
"//testing:go_default_library",
"//vendor/github.com/sirupsen/logrus:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
package server
import (
"bytes"
"fmt"
"net"
"sync"
"time"
bnet "github.com/bio-routing/bio-rd/net"
"github.com/bio-routing/bio-rd/protocols/bgp/packet"
bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet"
"github.com/bio-routing/bio-rd/routingtable"
"github.com/bio-routing/bio-rd/routingtable/filter"
"github.com/bio-routing/bio-rd/routingtable/locRIB"
log "github.com/sirupsen/logrus"
"github.com/taktv6/tflow2/convert"
)
type router struct {
name string
address net.IP
port uint16
con net.Conn
reconnectTimeMin int
reconnectTimeMax int
reconnectTime int
reconnectTimer *time.Timer
rib4 *locRIB.LocRIB
rib6 *locRIB.LocRIB
neighbors map[[16]byte]*neighbor
neighborsMu sync.Mutex
logger *log.Logger
runMu sync.Mutex
stop chan struct{}
ribClients map[afiClient]struct{}
ribClientsMu sync.Mutex
}
type neighbor struct {
localAS uint32
peerAS uint32
peerAddress [16]byte
routerID uint32
fsm *FSM
opt *packet.DecodeOptions
}
func newRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRIB) *router {
return &router{
address: addr,
port: port,
reconnectTimeMin: 30, // Suggested by RFC 7854
reconnectTimeMax: 720, // Suggested by RFC 7854
reconnectTimer: time.NewTimer(time.Duration(0)),
rib4: rib4,
rib6: rib6,
neighbors: make(map[[16]byte]*neighbor),
logger: log.New(),
stop: make(chan struct{}),
ribClients: make(map[afiClient]struct{}),
}
}
func (r *router) subscribeRIBs(client routingtable.RouteTableClient, afi uint8) {
ac := afiClient{
afi: afi,
client: client,
}
r.ribClientsMu.Lock()
defer r.ribClientsMu.Unlock()
if _, ok := r.ribClients[ac]; ok {
return
}
r.ribClients[ac] = struct{}{}
r.neighborsMu.Lock()
defer r.neighborsMu.Unlock()
for _, n := range r.neighbors {
if afi == packet.IPv4AFI {
n.fsm.ipv4Unicast.adjRIBIn.Register(client)
}
if afi == packet.IPv6AFI {
n.fsm.ipv6Unicast.adjRIBIn.Register(client)
}
}
}
func (r *router) unsubscribeRIBs(client routingtable.RouteTableClient, afi uint8) {
ac := afiClient{
afi: afi,
client: client,
}
r.ribClientsMu.Lock()
defer r.ribClientsMu.Unlock()
if _, ok := r.ribClients[ac]; !ok {
return
}
delete(r.ribClients, ac)
r.neighborsMu.Lock()
defer r.neighborsMu.Unlock()
for _, n := range r.neighbors {
if !n.fsm.ribsInitialized {
continue
}
if afi == packet.IPv4AFI {
n.fsm.ipv4Unicast.adjRIBIn.Unregister(client)
}
if afi == packet.IPv6AFI {
n.fsm.ipv6Unicast.adjRIBIn.Unregister(client)
}
}
}
func (r *router) serve(con net.Conn) {
r.con = con
r.runMu.Lock()
defer r.con.Close()
defer r.runMu.Unlock()
for {
select {
case <-r.stop:
return
default:
}
msg, err := recvBMPMsg(r.con)
if err != nil {
r.logger.Errorf("Unable to get message: %v", err)
return
}
bmpMsg, err := bmppkt.Decode(msg)
if err != nil {
r.logger.Errorf("Unable to decode BMP message: %v", err)
return
}
switch bmpMsg.MsgType() {
case bmppkt.PeerUpNotificationType:
err = r.processPeerUpNotification(bmpMsg.(*bmppkt.PeerUpNotification))
if err != nil {
r.logger.Errorf("Unable to process peer up notification: %v", err)
}
case bmppkt.PeerDownNotificationType:
r.processPeerDownNotification(bmpMsg.(*bmppkt.PeerDownNotification))
case bmppkt.InitiationMessageType:
r.processInitiationMsg(bmpMsg.(*bmppkt.InitiationMessage))
case bmppkt.TerminationMessageType:
r.processTerminationMsg(bmpMsg.(*bmppkt.TerminationMessage))
return
case bmppkt.RouteMonitoringType:
r.processRouteMonitoringMsg(bmpMsg.(*bmppkt.RouteMonitoringMsg))
}
}
}
func (r *router) processRouteMonitoringMsg(msg *bmppkt.RouteMonitoringMsg) {
r.neighborsMu.Lock()
defer r.neighborsMu.Unlock()
if _, ok := r.neighbors[msg.PerPeerHeader.PeerAddress]; !ok {
r.logger.Errorf("Received route monitoring message for non-existent neighbor %v on %s", msg.PerPeerHeader.PeerAddress, r.address.String())
return
}
n := r.neighbors[msg.PerPeerHeader.PeerAddress]
s := n.fsm.state.(*establishedState)
s.msgReceived(msg.BGPUpdate, s.fsm.decodeOptions())
}
func (r *router) processInitiationMsg(msg *bmppkt.InitiationMessage) {
const (
stringType = 0
sysDescrType = 1
sysNameType = 2
)
logMsg := fmt.Sprintf("Received initiation message from %s:", r.address.String())
for _, tlv := range msg.TLVs {
switch tlv.InformationType {
case stringType:
logMsg += fmt.Sprintf(" Message: %q", string(tlv.Information))
case sysDescrType:
logMsg += fmt.Sprintf(" sysDescr.: %s", string(tlv.Information))
case sysNameType:
r.name = string(tlv.Information)
logMsg += fmt.Sprintf(" sysName.: %s", string(tlv.Information))
}
}
r.logger.Info(logMsg)
}
func (r *router) processTerminationMsg(msg *bmppkt.TerminationMessage) {
const (
stringType = 0
reasonType = 1
adminDown = 0
unspecReason = 1
outOfRes = 2
redundantCon = 3
permAdminDown = 4
)
logMsg := fmt.Sprintf("Received termination message from %s: ", r.address.String())
for _, tlv := range msg.TLVs {
switch tlv.InformationType {
case stringType:
logMsg += fmt.Sprintf("Message: %q", string(tlv.Information))
case reasonType:
reason := convert.Uint16b(tlv.Information[:2])
switch reason {
case adminDown:
logMsg += "Session administratively down"
case unspecReason:
logMsg += "Unespcified reason"
case outOfRes:
logMsg += "Out of resources"
case redundantCon:
logMsg += "Redundant connection"
case permAdminDown:
logMsg += "Session permanently administratively closed"
}
}
}
r.logger.Warning(logMsg)
r.con.Close()
for n := range r.neighbors {
r.peerDown(n)
}
}
func (r *router) processPeerDownNotification(msg *bmppkt.PeerDownNotification) {
r.neighborsMu.Lock()
defer r.neighborsMu.Unlock()
if _, ok := r.neighbors[msg.PerPeerHeader.PeerAddress]; !ok {
r.logger.Warningf("Received peer down notification for %v: Peer doesn't exist.", msg.PerPeerHeader.PeerAddress)
return
}
r.peerDown(msg.PerPeerHeader.PeerAddress)
}
func (r *router) peerDown(addr [16]byte) {
if r.neighbors[addr].fsm != nil {
if r.neighbors[addr].fsm.ipv4Unicast != nil {
r.neighbors[addr].fsm.ipv4Unicast.bmpDispose()
}
if r.neighbors[addr].fsm.ipv6Unicast != nil {
r.neighbors[addr].fsm.ipv6Unicast.bmpDispose()
}
}
delete(r.neighbors, addr)
}
func (r *router) processPeerUpNotification(msg *bmppkt.PeerUpNotification) error {
r.neighborsMu.Lock()
defer r.neighborsMu.Unlock()
if _, ok := r.neighbors[msg.PerPeerHeader.PeerAddress]; ok {
return fmt.Errorf("Received peer up notification for %v: Peer exists already", msg.PerPeerHeader.PeerAddress)
}
if len(msg.SentOpenMsg) < packet.MinOpenLen {
return fmt.Errorf("Received peer up notification for %v: Invalid sent open message: %v", msg.PerPeerHeader.PeerAddress, msg.SentOpenMsg)
}
sentOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.SentOpenMsg[packet.HeaderLen:]))
if err != nil {
return fmt.Errorf("Unable to decode sent open message sent from %v to %v: %v", r.address.String(), msg.PerPeerHeader.PeerAddress, err)
}
if len(msg.ReceivedOpenMsg) < packet.MinOpenLen {
return fmt.Errorf("Received peer up notification for %v: Invalid received open message: %v", msg.PerPeerHeader.PeerAddress, msg.ReceivedOpenMsg)
}
recvOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.ReceivedOpenMsg[packet.HeaderLen:]))
if err != nil {
return fmt.Errorf("Unable to decode received open message sent from %v to %v: %v", msg.PerPeerHeader.PeerAddress, r.address.String(), err)
}
addrLen := net.IPv4len
if msg.PerPeerHeader.GetIPVersion() == 6 {
addrLen = net.IPv6len
}
// bnet.IPFromBytes can only fail if length of argument is not 4 or 16. However, length is ensured here.
peerAddress, _ := bnet.IPFromBytes(msg.PerPeerHeader.PeerAddress[16-addrLen:])
localAddress, _ := bnet.IPFromBytes(msg.LocalAddress[16-addrLen:])
fsm := &FSM{
isBMP: true,
peer: &peer{
routerID: sentOpen.BGPIdentifier,
addr: peerAddress,
localAddr: localAddress,
peerASN: msg.PerPeerHeader.PeerAS,
localASN: uint32(sentOpen.ASN),
ipv4: &peerAddressFamily{},
ipv6: &peerAddressFamily{},
},
}
fsm.peer.configureBySentOpen(sentOpen)
fsm.ipv4Unicast = newFSMAddressFamily(packet.IPv4AFI, packet.UnicastSAFI, &peerAddressFamily{
rib: r.rib4,
importFilter: filter.NewAcceptAllFilter(),
}, fsm)
fsm.ipv4Unicast.bmpInit()
fsm.ipv6Unicast = newFSMAddressFamily(packet.IPv6AFI, packet.UnicastSAFI, &peerAddressFamily{
rib: r.rib6,
importFilter: filter.NewAcceptAllFilter(),
}, fsm)
fsm.ipv6Unicast.bmpInit()
fsm.state = newOpenSentState(fsm)
openSent := fsm.state.(*openSentState)
openSent.openMsgReceived(recvOpen)
fsm.state = newEstablishedState(fsm)
n := &neighbor{
localAS: fsm.peer.localASN,
peerAS: msg.PerPeerHeader.PeerAS,
peerAddress: msg.PerPeerHeader.PeerAddress,
routerID: recvOpen.BGPIdentifier,
fsm: fsm,
opt: fsm.decodeOptions(),
}
r.neighbors[msg.PerPeerHeader.PeerAddress] = n
r.ribClientsMu.Lock()
defer r.ribClientsMu.Unlock()
n.registerClients(r.ribClients)
return nil
}
func (n *neighbor) registerClients(clients map[afiClient]struct{}) {
for ac := range clients {
if ac.afi == packet.IPv4AFI {
n.fsm.ipv4Unicast.adjRIBIn.Register(ac.client)
}
if ac.afi == packet.IPv6AFI {
n.fsm.ipv6Unicast.adjRIBIn.Register(ac.client)
}
}
}
func (p *peer) configureBySentOpen(msg *packet.BGPOpen) {
caps := getCaps(msg.OptParams)
for _, cap := range caps {
switch cap.Code {
case packet.AddPathCapabilityCode:
addPathCap := cap.Value.(packet.AddPathCapability)
peerFamily := p.addressFamily(addPathCap.AFI, addPathCap.SAFI)
if peerFamily == nil {
continue
}
switch addPathCap.SendReceive {
case packet.AddPathSend:
peerFamily.addPathSend = routingtable.ClientOptions{
MaxPaths: 10,
}
case packet.AddPathReceive:
peerFamily.addPathReceive = true
case packet.AddPathSendReceive:
peerFamily.addPathReceive = true
peerFamily.addPathSend = routingtable.ClientOptions{
MaxPaths: 10,
}
}
case packet.ASN4CapabilityCode:
asn4Cap := cap.Value.(packet.ASN4Capability)
p.localASN = asn4Cap.ASN4
}
}
}
func getCaps(optParams []packet.OptParam) packet.Capabilities {
for _, optParam := range optParams {
if optParam.Type != packet.CapabilitiesParamType {
continue
}
return optParam.Value.(packet.Capabilities)
}
return nil
}
This diff is collapsed.
package server
import (
"fmt"
"io"
"net"
"sync"
"time"
bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet"
"github.com/bio-routing/bio-rd/routingtable"
"github.com/bio-routing/bio-rd/routingtable/locRIB"
log "github.com/sirupsen/logrus"
"github.com/taktv6/tflow2/convert"
)
const (
defaultBufferLen = 4096
)
// BMPServer represents a BMP server
type BMPServer struct {
routers map[string]*router
ribClients map[string]map[afiClient]struct{}
gloablMu sync.RWMutex
}
type afiClient struct {
afi uint8
client routingtable.RouteTableClient
}
// NewServer creates a new BMP server
func NewServer() *BMPServer {
return &BMPServer{
routers: make(map[string]*router),
ribClients: make(map[string]map[afiClient]struct{}),
}
}
// SubscribeRIBs subscribes c for all RIB updates of router rtr
func (b *BMPServer) SubscribeRIBs(client routingtable.RouteTableClient, rtr net.IP, afi uint8) {
b.gloablMu.Lock()
defer b.gloablMu.Unlock()
rtrStr := rtr.String()
if _, ok := b.ribClients[rtrStr]; !ok {
b.ribClients[rtrStr] = make(map[afiClient]struct{})
}
ac := afiClient{
afi: afi,
client: client,
}
if _, ok := b.ribClients[rtrStr][ac]; ok {
return
}
b.ribClients[rtrStr][ac] = struct{}{}
if _, ok := b.routers[rtrStr]; !ok {
return
}
b.routers[rtrStr].subscribeRIBs(client, afi)
}
// UnsubscribeRIBs unsubscribes client from RIBs of address family afi
func (b *BMPServer) UnsubscribeRIBs(client routingtable.RouteTableClient, rtr net.IP, afi uint8) {
b.gloablMu.Lock()
defer b.gloablMu.Unlock()
rtrStr := rtr.String()
if _, ok := b.ribClients[rtrStr]; !ok {
return
}
ac := afiClient{
afi: afi,
client: client,
}
if _, ok := b.ribClients[rtrStr][ac]; !ok {
return
}
delete(b.ribClients[rtrStr], ac)
b.routers[rtrStr].unsubscribeRIBs(client, afi)
}
// AddRouter adds a router to which we connect with BMP
func (b *BMPServer) AddRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRIB) {
b.gloablMu.Lock()
defer b.gloablMu.Unlock()
r := newRouter(addr, port, rib4, rib6)
b.routers[fmt.Sprintf("%s", r.address.String())] = r
go func(r *router) {
for {
<-r.reconnectTimer.C
c, err := net.Dial("tcp", fmt.Sprintf("%s:%d", r.address.String(), r.port))
if err != nil {
log.Infof("Unable to connect to BMP router: %v", err)
if r.reconnectTime == 0 {
r.reconnectTime = r.reconnectTimeMin
} else if r.reconnectTime < r.reconnectTimeMax {
r.reconnectTime *= 2
}
r.reconnectTimer = time.NewTimer(time.Second * time.Duration(r.reconnectTime))
continue
}
r.reconnectTime = 0
log.Infof("Connected to %s", r.address.String())
r.serve(c)
}
}(r)
}
// RemoveRouter removes a BMP monitored router
func (b *BMPServer) RemoveRouter(addr net.IP, port uint16) {
b.gloablMu.Lock()
defer b.gloablMu.Unlock()
id := addr.String()
r := b.routers[id]
r.stop <- struct{}{}
delete(b.routers, id)
}
func recvBMPMsg(c net.Conn) (msg []byte, err error) {
buffer := make([]byte, defaultBufferLen)
_, err = io.ReadFull(c, buffer[0:bmppkt.MinLen])
if err != nil {
return nil, fmt.Errorf("Read failed: %v", err)
}
l := convert.Uint32b(buffer[1:5])
if l > defaultBufferLen {
tmp := buffer
buffer = make([]byte, l)
copy(buffer, tmp)
}
toRead := l
_, err = io.ReadFull(c, buffer[bmppkt.MinLen:toRead])
if err != nil {
return nil, fmt.Errorf("Read failed: %v", err)
}
return buffer[0:toRead], nil
}
package server
import (
"net"
"testing"
"github.com/bio-routing/bio-rd/protocols/bgp/packet"
"github.com/stretchr/testify/assert"
)
func TestNewServer(t *testing.T) {
s := NewServer()
assert.Equal(t, &BMPServer{
routers: map[string]*router{},
ribClients: map[string]map[afiClient]struct{}{},
}, s)
}
func TestSubscribeRIBs(t *testing.T) {
tests := []struct {
name string
srv *BMPServer
expected *BMPServer
}{
{
name: "Test without routers with clients",
srv: &BMPServer{
routers: make(map[string]*router),
ribClients: map[string]map[afiClient]struct{}{
"20.30.40.50": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
expected: &BMPServer{
routers: make(map[string]*router),
ribClients: map[string]map[afiClient]struct{}{
"20.30.40.50": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
"10.20.30.40": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
},
{
name: "Test with routers no clients",
srv: &BMPServer{
routers: map[string]*router{
"10.20.30.40": &router{
ribClients: make(map[afiClient]struct{}),
},
},
ribClients: map[string]map[afiClient]struct{}{},
},
expected: &BMPServer{
routers: map[string]*router{
"10.20.30.40": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
ribClients: map[string]map[afiClient]struct{}{
"10.20.30.40": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
},
}
for _, test := range tests {
test.srv.SubscribeRIBs(nil, net.IP{10, 20, 30, 40}, packet.IPv4AFI)
assert.Equalf(t, test.expected, test.srv, "Test %q", test.name)
}
}
func TestUnsubscribeRIBs(t *testing.T) {
tests := []struct {
name string
srv *BMPServer
expected *BMPServer
}{
{
name: "Unsubscribe existing from router",
srv: &BMPServer{
routers: map[string]*router{
"10.20.30.40": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
"20.30.40.50": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
ribClients: map[string]map[afiClient]struct{}{
"20.30.40.50": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
"10.20.30.40": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
expected: &BMPServer{
routers: map[string]*router{
"10.20.30.40": &router{
ribClients: map[afiClient]struct{}{},
},
"20.30.40.50": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
ribClients: map[string]map[afiClient]struct{}{
"20.30.40.50": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
"10.20.30.40": map[afiClient]struct{}{
},
},
},
},
{
name: "Unsubscribe existing from non-router",
srv: &BMPServer{
routers: map[string]*router{
"10.20.30.60": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
"20.30.40.50": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
ribClients: map[string]map[afiClient]struct{}{
"20.30.40.50": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
"10.20.30.60": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
expected: &BMPServer{
routers: map[string]*router{
"10.20.30.60": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
"20.30.40.50": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
ribClients: map[string]map[afiClient]struct{}{
"20.30.40.50": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
"10.20.30.60": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
},
{
name: "Unsubscribe existing from non-existing client",
srv: &BMPServer{
routers: map[string]*router{
"10.20.30.40": &router{
ribClients: map[afiClient]struct{}{},
},
"20.30.40.50": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
ribClients: map[string]map[afiClient]struct{}{
"20.30.40.40": map[afiClient]struct{}{},
"10.20.30.60": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
expected: &BMPServer{
routers: map[string]*router{
"10.20.30.40": &router{
ribClients: map[afiClient]struct{}{},
},
"20.30.40.50": &router{
ribClients: map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
ribClients: map[string]map[afiClient]struct{}{
"20.30.40.40": map[afiClient]struct{}{},
"10.20.30.60": map[afiClient]struct{}{
afiClient{
afi: packet.IPv4AFI,
client: nil,
}: struct{}{},
},
},
},
},
}
for _, test := range tests {
test.srv.UnsubscribeRIBs(nil, net.IP{10, 20, 30, 40}, packet.IPv4AFI)
assert.Equalf(t, test.expected, test.srv, "Test %q", test.name)
}
}
......@@ -29,6 +29,7 @@ type state interface {
// FSM implements the BGP finite state machine (RFC4271)
type FSM struct {
isBMP bool
peer *peer
eventCh chan int
con net.Conn
......@@ -330,6 +331,10 @@ func recvMsg(c net.Conn) (msg []byte, err error) {
}
func stopTimer(t *time.Timer) {
if t == nil {
return
}
if !t.Stop() {
select {
case <-t.C:
......
......
......@@ -53,6 +53,7 @@ func (f *fsmAddressFamily) init(n *routingtable.Neighbor) {
f.adjRIBIn = adjRIBIn.New(f.importFilter, contributingASNs, f.fsm.peer.routerID, f.fsm.peer.clusterID, f.addPathRX)
contributingASNs.Add(f.fsm.peer.localASN)
f.adjRIBIn.Register(f.rib)
f.adjRIBOut = adjRIBOut.New(n, f.exportFilter, !f.addPathTX.BestOnly)
......@@ -65,6 +66,24 @@ func (f *fsmAddressFamily) init(n *routingtable.Neighbor) {
f.rib.RegisterWithOptions(f.adjRIBOut, f.addPathTX)
}
func (f *fsmAddressFamily) bmpInit() {
f.adjRIBIn = adjRIBIn.New(filter.NewAcceptAllFilter(), &routingtable.ContributingASNs{}, f.fsm.peer.routerID, f.fsm.peer.clusterID, f.addPathRX)
if f.rib != nil {
f.adjRIBIn.Register(f.rib)
}
}
func (f *fsmAddressFamily) bmpDispose() {
f.rib.GetContributingASNs().Remove(f.fsm.peer.localASN)
f.adjRIBIn.(*adjRIBIn.AdjRIBIn).Flush()
f.adjRIBIn.Unregister(f.rib)
f.adjRIBIn = nil
}
func (f *fsmAddressFamily) dispose() {
if !f.initialized {
return
......@@ -87,14 +106,12 @@ func (f *fsmAddressFamily) processUpdate(u *packet.BGPUpdate) {
return
}
if f.multiProtocol {
f.multiProtocolUpdates(u)
return
}
if f.afi == packet.IPv4AFI {
f.withdraws(u)
f.updates(u)
}
}
func (f *fsmAddressFamily) withdraws(u *packet.BGPUpdate) {
for r := u.WithdrawnRoutes; r != nil; r = r.Next {
......@@ -184,6 +201,8 @@ func (f *fsmAddressFamily) processAttributes(attrs *packet.PathAttribute, path *
path.BGPPath.OriginatorID = pa.Value.(uint32)
case packet.ClusterListAttr:
path.BGPPath.ClusterList = pa.Value.([]uint32)
case packet.MultiProtocolReachNLRICode:
case packet.MultiProtocolUnreachNLRICode:
default:
unknownAttr := f.processUnknownAttribute(pa)
if unknownAttr != nil {
......
......
......@@ -153,16 +153,19 @@ func (s *establishedState) msgReceived(data []byte, opt *packet.DecodeOptions) (
s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode)
}
stopTimer(s.fsm.connectRetryTimer)
if s.fsm.con != nil {
s.fsm.con.Close()
}
s.fsm.connectRetryCounter++
return newIdleState(s.fsm), "Failed to decode BGP message"
return newIdleState(s.fsm), fmt.Sprintf("Failed to decode BGP message: %v", err)
}
switch msg.Header.Type {
case packet.NotificationMsg:
fmt.Println(data)
return s.notification()
case packet.UpdateMsg:
return s.update(msg)
return s.update(msg.Body.(*packet.BGPUpdate))
case packet.KeepaliveMsg:
return s.keepaliveReceived()
default:
......@@ -178,13 +181,11 @@ func (s *establishedState) notification() (state, string) {
return newIdleState(s.fsm), "Received NOTIFICATION"
}
func (s *establishedState) update(msg *packet.BGPMessage) (state, string) {
func (s *establishedState) update(u *packet.BGPUpdate) (state, string) {
if s.fsm.holdTime != 0 {
s.fsm.holdTimer.Reset(s.fsm.holdTime)
}
u := msg.Body.(*packet.BGPUpdate)
if s.fsm.ipv4Unicast != nil {
s.fsm.ipv4Unicast.processUpdate(u)
}
......
......
......@@ -92,7 +92,7 @@ func (s *openSentState) msgReceived(data []byte, opt *packet.DecodeOptions) (sta
case packet.NotificationMsg:
return s.notification(msg)
case packet.OpenMsg:
return s.openMsgReceived(msg)
return s.openMsgReceived(msg.Body.(*packet.BGPOpen))
default:
return s.unexpectedMessage()
}
......@@ -106,11 +106,15 @@ func (s *openSentState) unexpectedMessage() (state, string) {
return newIdleState(s.fsm), "FSM Error"
}
func (s *openSentState) openMsgReceived(msg *packet.BGPMessage) (state, string) {
openMsg := msg.Body.(*packet.BGPOpen)
func (s *openSentState) openMsgReceived(openMsg *packet.BGPOpen) (state, string) {
s.peerASNRcvd = uint32(openMsg.ASN)
s.fsm.neighborID = openMsg.BGPIdentifier
if s.fsm.isBMP {
return s.handleOpenMessage(openMsg)
}
stopTimer(s.fsm.connectRetryTimer)
if s.fsm.peer.collisionHandling(s.fsm) {
return s.cease()
......
......
package server
import (
"net"
"testing"
"github.com/bio-routing/bio-rd/protocols/bgp/packet"
......@@ -69,7 +70,19 @@ func TestOpenMsgReceived(t *testing.T) {
fsm := newFSM(&peer{
peerASN: test.asn,
})
fsm.con = &btesting.MockConn{}
conA, conB := net.Pipe()
fsm.con = conB
go func() {
for {
buf := make([]byte, 1)
_, err := conA.Read(buf)
if err != nil {
return
}
}
}()
s := &openSentState{
fsm: fsm,
......
......
......@@ -22,6 +22,7 @@ type PeerInfo struct {
type peer struct {
server *bgpServer
addr bnet.IP
localAddr bnet.IP
peerASN uint32
localASN uint32
......
......
......@@ -61,7 +61,7 @@ func Decode(msg []byte) (Msg, error) {
return sr, nil
case PeerDownNotificationType:
pd, err := decodePeerUpNotification(buf, ch)
pd, err := decodePeerDownNotification(buf, ch)
if err != nil {
return nil, fmt.Errorf("Unable to decode peer down notification: %v", err)
}
......
......
......@@ -26,11 +26,11 @@ func TestDecode(t *testing.T) {
{
name: "Route monitoring ok",
input: []byte{
3, 0, 0, 0, 6 + 38 + 4, 0,
3, 0, 0, 0, 6 + PerPeerHeaderLen + 4, 0,
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -43,7 +43,7 @@ func TestDecode(t *testing.T) {
expected: &RouteMonitoringMsg{
CommonHeader: &CommonHeader{
Version: 3,
MsgLength: 6 + 38 + 4,
MsgLength: 6 + PerPeerHeaderLen + 4,
MsgType: 0,
},
PerPeerHeader: &PerPeerHeader{
......@@ -62,11 +62,11 @@ func TestDecode(t *testing.T) {
{
name: "Route monitoring nok",
input: []byte{
3, 0, 0, 0, 6 + 38 + 4, 0,
3, 0, 0, 0, 6 + PerPeerHeaderLen + 4, 0,
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -80,11 +80,11 @@ func TestDecode(t *testing.T) {
{
name: "Statistic report ok",
input: []byte{
3, 0, 0, 0, 6 + 9 + 38, 1,
3, 0, 0, 0, 6 + 9 + PerPeerHeaderLen, 1,
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -98,7 +98,7 @@ func TestDecode(t *testing.T) {
expected: &StatsReport{
CommonHeader: &CommonHeader{
Version: 3,
MsgLength: 6 + 9 + 38,
MsgLength: 6 + 9 + PerPeerHeaderLen,
MsgType: 1,
},
PerPeerHeader: &PerPeerHeader{
......@@ -124,11 +124,11 @@ func TestDecode(t *testing.T) {
{
name: "Statistic report nok",
input: []byte{
3, 0, 0, 0, 6 + 9 + 38, 1,
3, 0, 0, 0, 6 + 9 + PerPeerHeaderLen, 1,
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
},
......@@ -137,11 +137,11 @@ func TestDecode(t *testing.T) {
{
name: "peer down ok",
input: []byte{
3, 0, 0, 0, 6 + 9 + 38, 1,
3, 0, 0, 0, 6 + 9 + PerPeerHeaderLen, 1,
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -155,7 +155,7 @@ func TestDecode(t *testing.T) {
expected: &StatsReport{
CommonHeader: &CommonHeader{
Version: 3,
MsgLength: 6 + 9 + 38,
MsgLength: 6 + 9 + PerPeerHeaderLen,
MsgType: 1,
},
PerPeerHeader: &PerPeerHeader{
......@@ -181,11 +181,11 @@ func TestDecode(t *testing.T) {
{
name: "peer down nok",
input: []byte{
3, 0, 0, 0, 6 + 9 + 38, 1,
3, 0, 0, 0, 6 + 9 + PerPeerHeaderLen, 1,
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -204,7 +204,7 @@ func TestDecode(t *testing.T) {
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -216,6 +216,9 @@ func TestDecode(t *testing.T) {
0, 200,
// OPEN Sent
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
0, 34,
1,
4, // Version
1, 0, // ASN
2, 0, // Hold Time
......@@ -224,6 +227,9 @@ func TestDecode(t *testing.T) {
1, 2, 3, 4, 5,
// OPEN Recv
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
0, 29,
1,
4, // Version
1, 0, // ASN
2, 0, // Hold Time
......@@ -253,6 +259,9 @@ func TestDecode(t *testing.T) {
LocalPort: 100,
RemotePort: 200,
SentOpenMsg: []byte{
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
0, 34,
1,
4, // Version
1, 0, // ASN
2, 0, // Hold Time
......@@ -261,7 +270,9 @@ func TestDecode(t *testing.T) {
1, 2, 3, 4, 5,
},
ReceivedOpenMsg: []byte{
// OPEN Recv
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
0, 29,
1,
4, // Version
1, 0, // ASN
2, 0, // Hold Time
......@@ -280,7 +291,7 @@ func TestDecode(t *testing.T) {
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -374,7 +385,7 @@ func TestDecode(t *testing.T) {
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -416,7 +427,7 @@ func TestDecode(t *testing.T) {
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0,
......
......
......@@ -57,7 +57,7 @@ func decodePeerDownNotification(buf *bytes.Buffer, ch *CommonHeader) (*PeerDownN
err = decoder.Decode(buf, fields)
if err != nil {
return nil, err
return nil, fmt.Errorf("Unable to read Data: %v", err)
}
return p, nil
......
......
......@@ -32,7 +32,7 @@ func TestDecodePeerDownNotification(t *testing.T) {
input: []byte{
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -43,12 +43,12 @@ func TestDecodePeerDownNotification(t *testing.T) {
1, 2, 3,
},
ch: &CommonHeader{
MsgLength: CommonHeaderLen + 4 + 38,
MsgLength: CommonHeaderLen + 4 + PerPeerHeaderLen,
},
wantFail: false,
expected: &PeerDownNotification{
CommonHeader: &CommonHeader{
MsgLength: CommonHeaderLen + 4 + 38,
MsgLength: CommonHeaderLen + 4 + PerPeerHeaderLen,
},
PerPeerHeader: &PerPeerHeader{
PeerType: 1,
......@@ -71,7 +71,7 @@ func TestDecodePeerDownNotification(t *testing.T) {
input: []byte{
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......@@ -106,7 +106,7 @@ func TestDecodePeerDownNotification(t *testing.T) {
input: []byte{
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0,
......@@ -121,7 +121,7 @@ func TestDecodePeerDownNotification(t *testing.T) {
input: []byte{
1,
2,
0, 0, 0, 3,
0, 0, 0, 0, 0, 0, 0, 3,
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
0, 0, 200, 124,
0, 0, 0, 123,
......
......
......@@ -9,7 +9,7 @@ import (
const (
// OpenMsgMinLen is the minimal length of a BGP open message
OpenMsgMinLen = 10
OpenMsgMinLen = 29
)
// PeerUpNotification represents a peer up notification
......
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment