From 71bf52556f4f05f0540b1e6cf09c5f6e49b9e2c4 Mon Sep 17 00:00:00 2001 From: Oliver Herms <oliver.herms@exaring.de> Date: Fri, 19 Oct 2018 22:23:22 +0200 Subject: [PATCH] Adding fsm/peer initialization --- examples/bmp/main_bmp.go | 9 +- protocols/bgp/packet/decoder.go | 9 +- protocols/bgp/server/bmp_router.go | 294 +++++++++++++++++++ protocols/bgp/server/bmp_router_test.go | 319 +++++++++++++++++++++ protocols/bgp/server/bmp_server.go | 95 ++++++ protocols/bgp/server/fsm.go | 1 + protocols/bgp/server/fsm_address_family.go | 6 + protocols/bgp/server/fsm_open_sent.go | 10 +- protocols/bgp/server/peer.go | 9 +- 9 files changed, 737 insertions(+), 15 deletions(-) create mode 100644 protocols/bgp/server/bmp_router.go create mode 100644 protocols/bgp/server/bmp_router_test.go create mode 100644 protocols/bgp/server/bmp_server.go diff --git a/examples/bmp/main_bmp.go b/examples/bmp/main_bmp.go index 146fa424..63de954f 100644 --- a/examples/bmp/main_bmp.go +++ b/examples/bmp/main_bmp.go @@ -5,7 +5,7 @@ import ( "net" "time" - "github.com/bio-routing/bio-rd/protocols/bmp/server" + "github.com/bio-routing/bio-rd/protocols/bgp/server" "github.com/bio-routing/bio-rd/routingtable/locRIB" "github.com/sirupsen/logrus" ) @@ -13,13 +13,14 @@ import ( func main() { logrus.Printf("This is a BMP speaker\n") - rib := locRIB.New() + rib4 := locRIB.New() + rib6 := locRIB.New() b := server.NewServer() - b.AddRouter(net.IP{127, 0, 0, 1}, 1234, rib, nil) + b.AddRouter(net.IP{10, 0, 255, 0}, 30119, rib4, rib6) go func() { for { - fmt.Printf("LocRIB count: %d\n", rib.Count()) + fmt.Printf("LocRIB4 count: %d\n", rib4.Count()) time.Sleep(time.Second * 10) } }() diff --git a/protocols/bgp/packet/decoder.go b/protocols/bgp/packet/decoder.go index 73613758..5ad26c87 100644 --- a/protocols/bgp/packet/decoder.go +++ b/protocols/bgp/packet/decoder.go @@ -30,7 +30,7 @@ func Decode(buf *bytes.Buffer, opt *DecodeOptions) (*BGPMessage, error) { func decodeMsgBody(buf *bytes.Buffer, msgType uint8, l uint16, opt *DecodeOptions) (interface{}, error) { switch msgType { case OpenMsg: - return decodeOpenMsg(buf) + return DecodeOpenMsg(buf) case UpdateMsg: return decodeUpdateMsg(buf, l, opt) case KeepaliveMsg: @@ -128,7 +128,8 @@ func invalidErrCode(n *BGPNotification) (*BGPNotification, error) { return n, fmt.Errorf("Invalid error sub code: %d/%d", n.ErrorCode, n.ErrorSubcode) } -func decodeOpenMsg(buf *bytes.Buffer) (*BGPOpen, error) { +// DecodeOpenMsg decodes a BGP OPEN message +func DecodeOpenMsg(buf *bytes.Buffer) (*BGPOpen, error) { msg, err := _decodeOpenMsg(buf) if err != nil { return nil, fmt.Errorf("Unable to decode OPEN message: %v", err) @@ -241,13 +242,13 @@ func decodeCapability(buf *bytes.Buffer) (Capability, error) { case AddPathCapabilityCode: addPathCap, err := decodeAddPathCapability(buf) if err != nil { - return cap, fmt.Errorf("Unable to decode add path capability") + return cap, fmt.Errorf("Unable to decode add path capability: %v", err) } cap.Value = addPathCap case ASN4CapabilityCode: asn4Cap, err := decodeASN4Capability(buf) if err != nil { - return cap, fmt.Errorf("Unable to decode 4 octet ASN capability") + return cap, fmt.Errorf("Unable to decode 4 octet ASN capability: %v", err) } cap.Value = asn4Cap default: diff --git a/protocols/bgp/server/bmp_router.go b/protocols/bgp/server/bmp_router.go new file mode 100644 index 00000000..8667a42e --- /dev/null +++ b/protocols/bgp/server/bmp_router.go @@ -0,0 +1,294 @@ +package server + +import ( + "bytes" + "fmt" + "net" + "sync" + "time" + + bnet "github.com/bio-routing/bio-rd/net" + "github.com/bio-routing/bio-rd/protocols/bgp/packet" + bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet" + "github.com/bio-routing/bio-rd/routingtable" + "github.com/bio-routing/bio-rd/routingtable/filter" + "github.com/bio-routing/bio-rd/routingtable/locRIB" + log "github.com/sirupsen/logrus" + "github.com/taktv6/tflow2/convert" +) + +type router struct { + address net.IP + port uint16 + con net.Conn + reconnectTimeMin int + reconnectTimeMax int + reconnectTime int + reconnectTimer *time.Timer + rib4 *locRIB.LocRIB + rib6 *locRIB.LocRIB + neighbors map[[16]byte]*neighbor + neighborsMu sync.Mutex +} + +type neighbor struct { + localAS uint32 + peerAS uint32 + address [16]byte + routerID uint32 + fsm *FSM + opt *packet.DecodeOptions +} + +func (r *router) serve() { + for { + msg, err := recvBMPMsg(r.con) + if err != nil { + log.Errorf("Unable to get message: %v", err) + return + } + + bmpMsg, err := bmppkt.Decode(msg) + if err != nil { + log.Errorf("Unable to decode BMP message: %v", err) + fmt.Printf("msg: %v\n", msg) + return + } + + switch bmpMsg.MsgType() { + case bmppkt.PeerUpNotificationType: + r.processPeerUpNotification(bmpMsg.(*bmppkt.PeerUpNotification)) + case bmppkt.PeerDownNotificationType: + r.processPeerDownNotification(bmpMsg.(*bmppkt.PeerDownNotification)) + case bmppkt.InitiationMessageType: + r.processInitiationMsg(bmpMsg.(*bmppkt.InitiationMessage)) + case bmppkt.TerminationMessageType: + r.processTerminationMsg(bmpMsg.(*bmppkt.TerminationMessage)) + return + case bmppkt.RouteMonitoringType: + r.processRouteMonitoringMsg(bmpMsg.(*bmppkt.RouteMonitoringMsg)) + } + } +} + +func (r *router) processRouteMonitoringMsg(msg *bmppkt.RouteMonitoringMsg) { + r.neighborsMu.Lock() + defer r.neighborsMu.Unlock() + + if _, ok := r.neighbors[msg.PerPeerHeader.PeerAddress]; !ok { + log.Errorf("Received route monitoring message for non-existent neighbor %v on %s", msg.PerPeerHeader.PeerAddress, r.address.String()) + return + } + + n := r.neighbors[msg.PerPeerHeader.PeerAddress] + s := n.fsm.state.(*establishedState) + s.msgReceived(msg.BGPUpdate, s.fsm.decodeOptions()) +} + +func (r *router) processInitiationMsg(msg *bmppkt.InitiationMessage) { + const ( + stringType = 0 + sysDescrType = 1 + sysNameType = 2 + ) + + logMsg := fmt.Sprintf("Received initiation message from %s:", r.address.String()) + + for _, tlv := range msg.TLVs { + switch tlv.InformationType { + case stringType: + logMsg += fmt.Sprintf(" Message: %q", string(tlv.Information)) + case sysDescrType: + logMsg += fmt.Sprintf(" sysDescr.: %s", string(tlv.Information)) + case sysNameType: + logMsg += fmt.Sprintf(" sysName.: %s", string(tlv.Information)) + } + } + + log.Info(logMsg) +} + +func (r *router) processTerminationMsg(msg *bmppkt.TerminationMessage) { + const ( + stringType = 0 + reasonType = 1 + + adminDown = 0 + unspecReason = 1 + outOfRes = 2 + redundantCon = 3 + permAdminDown = 4 + ) + + logMsg := fmt.Sprintf("Received termination message from %s: ", r.address.String()) + for _, tlv := range msg.TLVs { + switch tlv.InformationType { + case stringType: + logMsg += fmt.Sprintf("Message: %q", string(tlv.Information)) + case reasonType: + reason := convert.Uint16b(tlv.Information[:2]) + switch reason { + case adminDown: + logMsg += "Session administratively down" + case unspecReason: + logMsg += "Unespcified reason" + case outOfRes: + logMsg += "Out of resources" + case redundantCon: + logMsg += "Redundant connection" + case permAdminDown: + logMsg += "Session permanently administratively closed" + } + } + } + + log.Warning(logMsg) + + r.con.Close() + for n := range r.neighbors { + delete(r.neighbors, n) + } +} + +func (r *router) processPeerDownNotification(msg *bmppkt.PeerDownNotification) { + r.neighborsMu.Lock() + defer r.neighborsMu.Unlock() + + if _, ok := r.neighbors[msg.PerPeerHeader.PeerAddress]; !ok { + log.Warningf("Received peer down notification for %v: Peer doesn't exist.", msg.PerPeerHeader.PeerAddress) + return + } + + delete(r.neighbors, msg.PerPeerHeader.PeerAddress) +} + +func (r *router) processPeerUpNotification(msg *bmppkt.PeerUpNotification) error { + r.neighborsMu.Lock() + defer r.neighborsMu.Unlock() + + if _, ok := r.neighbors[msg.PerPeerHeader.PeerAddress]; ok { + return fmt.Errorf("Received peer up notification for %v: Peer exists already", msg.PerPeerHeader.PeerAddress) + } + + if len(msg.SentOpenMsg) < packet.MinOpenLen { + return fmt.Errorf("Received peer up notification for %v: Invalid sent open message: %v", msg.PerPeerHeader.PeerAddress, msg.SentOpenMsg) + } + + sentOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.SentOpenMsg[19:])) + if err != nil { + return fmt.Errorf("Unable to decode sent open message sent from %v to %v: %v", r.address.String(), msg.PerPeerHeader.PeerAddress, err) + } + + if len(msg.ReceivedOpenMsg) < packet.MinOpenLen { + return fmt.Errorf("Received peer up notification for %v: Invalid received open message: %v", msg.PerPeerHeader.PeerAddress, msg.ReceivedOpenMsg) + } + + recvOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.ReceivedOpenMsg[19:])) + if err != nil { + return fmt.Errorf("Unable to decode received open message sent from %v to %v: %v", msg.PerPeerHeader.PeerAddress, r.address.String(), err) + } + + addrLen := net.IPv4len + for i := 0; i < net.IPv6len-net.IPv4len; i++ { + if msg.PerPeerHeader.PeerAddress[i] == 0 { + continue + } + addrLen = net.IPv6len + break + } + + // bnet.IPFromBytes can only fail if length of argument is not 4 or 16. However, length is ensured here. + peerAddress, _ := bnet.IPFromBytes(msg.PerPeerHeader.PeerAddress[16-addrLen:]) + localAddress, _ := bnet.IPFromBytes(msg.LocalAddress[16-addrLen:]) + + fsm := &FSM{ + isBMP: true, + peer: &peer{ + addr: peerAddress, + localAddr: localAddress, + peerASN: msg.PerPeerHeader.PeerAS, + localASN: uint32(sentOpen.ASN), + ipv4: &peerAddressFamily{}, + ipv6: &peerAddressFamily{}, + }, + } + + fsm.peer.configureBySentOpen(sentOpen) + + fsm.ipv4Unicast = newFSMAddressFamily(packet.IPv4AFI, packet.UnicastSAFI, &peerAddressFamily{ + rib: r.rib4, + importFilter: filter.NewAcceptAllFilter(), + }, fsm) + fsm.ipv4Unicast.bmpInit() + + /*fsm.ipv6Unicast = newFSMAddressFamily(packet.IPv6AFI, packet.UnicastSAFI, &peerAddressFamily{ + rib: r.rib6, + importFilter: filter.NewAcceptAllFilter(), + //exportFilter: filter.NewDrainFilter(), + }, fsm) + fsm.ipv6Unicast.bmpInit()*/ + + fsm.state = newOpenSentState(fsm) + openSent := fsm.state.(*openSentState) + openSent.openMsgReceived(recvOpen) + + fsm.state = newEstablishedState(fsm) + n := &neighbor{ + localAS: fsm.peer.localASN, + peerAS: msg.PerPeerHeader.PeerAS, + address: msg.PerPeerHeader.PeerAddress, + routerID: recvOpen.BGPIdentifier, + fsm: fsm, + opt: fsm.decodeOptions(), + } + + r.neighbors[msg.PerPeerHeader.PeerAddress] = n + return nil +} + +func (p *peer) configureBySentOpen(msg *packet.BGPOpen) { + caps := getCaps(msg.OptParams) + for _, cap := range caps { + switch cap.Code { + case packet.AddPathCapabilityCode: + addPathCap := cap.Value.(packet.AddPathCapability) + peerFamily := p.addressFamily(addPathCap.AFI, addPathCap.SAFI) + if peerFamily == nil { + continue + } + switch addPathCap.SendReceive { + case packet.AddPathSend: + peerFamily.addPathSend = routingtable.ClientOptions{ + MaxPaths: 10, + } + case packet.AddPathReceive: + peerFamily.addPathReceive = true + case packet.AddPathSendReceive: + peerFamily.addPathReceive = true + peerFamily.addPathSend = routingtable.ClientOptions{ + MaxPaths: 10, + } + } + case packet.ASN4CapabilityCode: + asn4Cap := cap.Value.(packet.ASN4Capability) + p.localASN = asn4Cap.ASN4 + // TODO: Make 4Byte ASN configurable + case packet.MultiProtocolCapabilityCode: + /*mpCap := cap.Value.(packet.MultiProtocolCapability) + p := fsm.peer.addressFamily(mpCap.AFI, mpCap.SAFI) + p.multiProtocol = true*/ + // FIXME! + } + } +} + +func getCaps(optParams []packet.OptParam) packet.Capabilities { + for _, optParam := range optParams { + if optParam.Type != packet.CapabilitiesParamType { + continue + } + + return optParam.Value.(packet.Capabilities) + } + return nil +} diff --git a/protocols/bgp/server/bmp_router_test.go b/protocols/bgp/server/bmp_router_test.go new file mode 100644 index 00000000..9cf1060e --- /dev/null +++ b/protocols/bgp/server/bmp_router_test.go @@ -0,0 +1,319 @@ +package server + +import ( + "testing" + + bnet "github.com/bio-routing/bio-rd/net" + "github.com/bio-routing/bio-rd/protocols/bgp/packet" + bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet" + "github.com/bio-routing/bio-rd/routingtable" + "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" + "github.com/bio-routing/bio-rd/routingtable/filter" + "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/stretchr/testify/assert" +) + +func TestConfigureBySentOpen(t *testing.T) { + tests := []struct { + name string + p *peer + openMsg *packet.BGPOpen + expected *peer + }{ + { + name: "Test 32bit ASN", + p: &peer{}, + openMsg: &packet.BGPOpen{ + OptParams: []packet.OptParam{ + { + Type: 2, + Value: packet.Capabilities{ + { + Code: packet.ASN4CapabilityCode, + Value: packet.ASN4Capability{ + ASN4: 201701, + }, + }, + }, + }, + }, + }, + expected: &peer{ + localASN: 201701, + }, + }, + { + name: "Test Add Path TX", + p: &peer{ + ipv4: &peerAddressFamily{}, + }, + openMsg: &packet.BGPOpen{ + OptParams: []packet.OptParam{ + { + Type: 2, + Value: packet.Capabilities{ + { + Code: packet.AddPathCapabilityCode, + Value: packet.AddPathCapability{ + AFI: 1, + SAFI: 1, + SendReceive: 2, + }, + }, + }, + }, + }, + }, + expected: &peer{ + ipv4: &peerAddressFamily{ + addPathSend: routingtable.ClientOptions{ + MaxPaths: 10, + }, + }, + }, + }, + { + name: "Test Add Path RX", + p: &peer{ + ipv4: &peerAddressFamily{}, + }, + openMsg: &packet.BGPOpen{ + OptParams: []packet.OptParam{ + { + Type: 2, + Value: packet.Capabilities{ + { + Code: packet.AddPathCapabilityCode, + Value: packet.AddPathCapability{ + AFI: 1, + SAFI: 1, + SendReceive: 1, + }, + }, + }, + }, + }, + }, + expected: &peer{ + ipv4: &peerAddressFamily{ + addPathReceive: true, + }, + }, + }, + { + name: "Test Add Path RX/TX", + p: &peer{ + ipv4: &peerAddressFamily{}, + }, + openMsg: &packet.BGPOpen{ + OptParams: []packet.OptParam{ + { + Type: 2, + Value: packet.Capabilities{ + { + Code: packet.AddPathCapabilityCode, + Value: packet.AddPathCapability{ + AFI: 1, + SAFI: 1, + SendReceive: 3, + }, + }, + }, + }, + }, + }, + expected: &peer{ + ipv4: &peerAddressFamily{ + addPathSend: routingtable.ClientOptions{ + MaxPaths: 10, + }, + addPathReceive: true, + }, + }, + }, + } + + for _, test := range tests { + test.p.configureBySentOpen(test.openMsg) + + assert.Equalf(t, test.expected, test.p, "Test %q", test.name) + } +} + +func TestProcessPeerUpNotification(t *testing.T) { + tests := []struct { + name string + router *router + pkt *bmppkt.PeerUpNotification + wantFail bool + expected *router + }{ + { + name: "Invalid sent open message", + router: &router{ + neighbors: make(map[[16]byte]*neighbor), + }, + pkt: &bmppkt.PeerUpNotification{ + PerPeerHeader: &bmppkt.PerPeerHeader{ + PeerType: 0, + PeerFlags: 0, + PeerDistinguisher: 0, + PeerAddress: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 255, 1}, + PeerAS: 51324, + PeerBGPID: 100, + }, + LocalAddress: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 255, 0}, + LocalPort: 179, + RemotePort: 34542, + SentOpenMsg: []byte{}, + ReceivedOpenMsg: []byte{}, + Information: []byte{}, + }, + wantFail: true, + }, + { + name: "Invalid received open message", + router: &router{ + neighbors: make(map[[16]byte]*neighbor), + }, + pkt: &bmppkt.PeerUpNotification{ + PerPeerHeader: &bmppkt.PerPeerHeader{ + PeerType: 0, + PeerFlags: 0, + PeerDistinguisher: 0, + PeerAddress: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 255, 1}, + PeerAS: 51324, + PeerBGPID: 100, + }, + LocalAddress: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 255, 0}, + LocalPort: 179, + RemotePort: 34542, + SentOpenMsg: []byte{ + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 0, 29, + 1, + 4, + 100, 200, + 20, 0, + 10, 20, 30, 40, + 0, + }, + ReceivedOpenMsg: []byte{}, + Information: []byte{}, + }, + wantFail: true, + }, + { + name: "Regular BGP by RFC4271", + router: &router{ + rib4: locRIB.New(), + rib6: locRIB.New(), + neighbors: make(map[[16]byte]*neighbor), + }, + pkt: &bmppkt.PeerUpNotification{ + PerPeerHeader: &bmppkt.PerPeerHeader{ + PeerType: 0, + PeerFlags: 0, + PeerDistinguisher: 0, + PeerAddress: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 255, 1}, + PeerAS: 100, + PeerBGPID: 100, + }, + LocalAddress: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 255, 0}, + LocalPort: 179, + RemotePort: 34542, + SentOpenMsg: []byte{ + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 0, 29, + 1, + 4, + 0, 200, + 20, 0, + 10, 20, 30, 40, + 0, + }, + ReceivedOpenMsg: []byte{ + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 0, 29, + 1, + 4, + 0, 100, + 20, 0, + 10, 20, 30, 50, + 0, + }, + Information: []byte{}, + }, + wantFail: false, + expected: &router{ + rib4: locRIB.New(), + rib6: locRIB.New(), + neighbors: map[[16]byte]*neighbor{ + [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 255, 1}: { + localAS: 200, + peerAS: 100, + address: [16]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 0, 255, 1}, + routerID: 169090610, + opt: &packet.DecodeOptions{ + AddPath: false, + Use32BitASN: false, + }, + fsm: &FSM{ + isBMP: true, + neighborID: 169090610, + state: &establishedState{}, + peer: &peer{ + addr: bnet.IPv4FromOctets(10, 0, 255, 1), + localAddr: bnet.IPv4FromOctets(10, 0, 255, 0), + peerASN: 100, + localASN: 200, + ipv4: &peerAddressFamily{}, + ipv6: &peerAddressFamily{}, + }, + ipv4Unicast: &fsmAddressFamily{ + afi: 1, + safi: 1, + adjRIBIn: adjRIBIn.New(filter.NewAcceptAllFilter(), &routingtable.ContributingASNs{}, 0, 0, false), + importFilter: filter.NewAcceptAllFilter(), + }, + }, + }, + }, + }, + }, + } + + for _, test := range tests { + err := test.router.processPeerUpNotification(test.pkt) + if err != nil { + if test.wantFail { + continue + } + + t.Errorf("Unexpected failure for test %q: %v", test.name, err) + continue + } + + if test.wantFail { + t.Errorf("Unexpected success for test %q", test.name) + continue + } + + test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.state = &establishedState{fsm: test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm} + + if test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.ipv4Unicast != nil { + test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.ipv4Unicast.rib = test.router.rib4 + test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.ipv4Unicast.fsm = test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm + test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.ipv4Unicast.adjRIBIn.Register(test.router.rib4) + } + + if test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.ipv6Unicast != nil { + test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.ipv6Unicast.rib = test.router.rib6 + test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.ipv6Unicast.fsm = test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm + test.expected.neighbors[test.pkt.PerPeerHeader.PeerAddress].fsm.ipv6Unicast.adjRIBIn.Register(test.router.rib6) + } + + assert.Equalf(t, test.expected, test.router, "Test %q", test.name) + } + +} diff --git a/protocols/bgp/server/bmp_server.go b/protocols/bgp/server/bmp_server.go new file mode 100644 index 00000000..74f2ce77 --- /dev/null +++ b/protocols/bgp/server/bmp_server.go @@ -0,0 +1,95 @@ +package server + +import ( + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/bio-routing/bio-rd/protocols/bgp/packet" + "github.com/bio-routing/bio-rd/routingtable/locRIB" + log "github.com/sirupsen/logrus" + "github.com/taktv6/tflow2/convert" +) + +const ( + defaultBufferLen = 4096 +) + +// BMPServer represents a BMP server +type BMPServer struct { + routers map[string]*router + routersMu sync.RWMutex + reconnectTime uint +} + +// NewServer creates a new BMP server +func NewServer() *BMPServer { + return &BMPServer{ + routers: make(map[string]*router), + } +} + +// AddRouter adds a router to which we connect with BMP +func (b *BMPServer) AddRouter(addr net.IP, port uint16, rib4 *locRIB.LocRIB, rib6 *locRIB.LocRIB) { + r := &router{ + address: addr, + port: port, + reconnectTimeMin: 30, // Suggested by RFC 7854 + reconnectTimeMax: 720, // Suggested by RFC 7854 + reconnectTimer: time.NewTimer(time.Duration(0)), + rib4: rib4, + rib6: rib6, + neighbors: make(map[[16]byte]*neighbor), + } + + b.routersMu.Lock() + b.routers[fmt.Sprintf("%s:%d", r.address.String(), r.port)] = r + b.routersMu.Unlock() + + go func(r *router) { + for { + <-r.reconnectTimer.C + c, err := net.Dial("tcp", fmt.Sprintf("%s:%d", r.address.String(), r.port)) + if err != nil { + log.Infof("Unable to connect to BMP router: %v", err) + if r.reconnectTime == 0 { + r.reconnectTime = r.reconnectTimeMin + } else if r.reconnectTime < r.reconnectTimeMax { + r.reconnectTime *= 2 + } + r.reconnectTimer = time.NewTimer(time.Second * time.Duration(r.reconnectTime)) + continue + } + + r.reconnectTime = 0 + r.con = c + log.Infof("Connected to %s", r.address.String()) + r.serve() + } + }(r) +} + +func recvBMPMsg(c net.Conn) (msg []byte, err error) { + buffer := make([]byte, defaultBufferLen) + _, err = io.ReadFull(c, buffer[0:packet.MinLen]) + if err != nil { + return nil, fmt.Errorf("Read failed: %v", err) + } + + l := convert.Uint32b(buffer[1:5]) + if l > defaultBufferLen { + tmp := buffer + buffer = make([]byte, l) + copy(buffer, tmp) + } + + toRead := l + _, err = io.ReadFull(c, buffer[packet.MinLen:toRead]) + if err != nil { + return nil, fmt.Errorf("Read failed: %v", err) + } + + return buffer[0:toRead], nil +} diff --git a/protocols/bgp/server/fsm.go b/protocols/bgp/server/fsm.go index c0697f1a..157657fc 100644 --- a/protocols/bgp/server/fsm.go +++ b/protocols/bgp/server/fsm.go @@ -29,6 +29,7 @@ type state interface { // FSM implements the BGP finite state machine (RFC4271) type FSM struct { + isBMP bool peer *peer eventCh chan int con net.Conn diff --git a/protocols/bgp/server/fsm_address_family.go b/protocols/bgp/server/fsm_address_family.go index 0b241ecc..01ee06b7 100644 --- a/protocols/bgp/server/fsm_address_family.go +++ b/protocols/bgp/server/fsm_address_family.go @@ -65,6 +65,12 @@ func (f *fsmAddressFamily) init(n *routingtable.Neighbor) { f.rib.RegisterWithOptions(f.adjRIBOut, f.addPathTX) } +func (f *fsmAddressFamily) bmpInit() { + f.adjRIBIn = adjRIBIn.New(filter.NewAcceptAllFilter(), &routingtable.ContributingASNs{}, f.fsm.peer.routerID, f.fsm.peer.clusterID, f.addPathRX) + + f.adjRIBIn.Register(f.rib) +} + func (f *fsmAddressFamily) dispose() { if !f.initialized { return diff --git a/protocols/bgp/server/fsm_open_sent.go b/protocols/bgp/server/fsm_open_sent.go index a3aac48a..a6689854 100644 --- a/protocols/bgp/server/fsm_open_sent.go +++ b/protocols/bgp/server/fsm_open_sent.go @@ -92,7 +92,7 @@ func (s *openSentState) msgReceived(data []byte, opt *packet.DecodeOptions) (sta case packet.NotificationMsg: return s.notification(msg) case packet.OpenMsg: - return s.openMsgReceived(msg) + return s.openMsgReceived(msg.Body.(*packet.BGPOpen)) default: return s.unexpectedMessage() } @@ -106,11 +106,15 @@ func (s *openSentState) unexpectedMessage() (state, string) { return newIdleState(s.fsm), "FSM Error" } -func (s *openSentState) openMsgReceived(msg *packet.BGPMessage) (state, string) { - openMsg := msg.Body.(*packet.BGPOpen) +func (s *openSentState) openMsgReceived(openMsg *packet.BGPOpen) (state, string) { s.peerASNRcvd = uint32(openMsg.ASN) s.fsm.neighborID = openMsg.BGPIdentifier + + if s.fsm.isBMP { + return s.handleOpenMessage(openMsg) + } + stopTimer(s.fsm.connectRetryTimer) if s.fsm.peer.collisionHandling(s.fsm) { return s.cease() diff --git a/protocols/bgp/server/peer.go b/protocols/bgp/server/peer.go index 9a5b128a..8e0a674a 100644 --- a/protocols/bgp/server/peer.go +++ b/protocols/bgp/server/peer.go @@ -20,10 +20,11 @@ type PeerInfo struct { } type peer struct { - server *bgpServer - addr bnet.IP - peerASN uint32 - localASN uint32 + server *bgpServer + addr bnet.IP + localAddr bnet.IP + peerASN uint32 + localASN uint32 // guarded by fsmsMu fsms []*FSM -- GitLab