diff --git a/config/peer.go b/config/peer.go index ded26d7bd89f5d3bde535148284c632cfef14284..245c1aa2c940a3e878c296748b3c79c62882d479 100644 --- a/config/peer.go +++ b/config/peer.go @@ -2,7 +2,6 @@ package config import ( "net" - "time" "github.com/bio-routing/bio-rd/routingtable" @@ -10,8 +9,9 @@ import ( type Peer struct { AdminEnabled bool - KeepAlive uint16 - HoldTimer uint16 + ReconnectInterval time.Duration + KeepAlive time.Duration + HoldTime time.Duration LocalAddress net.IP PeerAddress net.IP LocalAS uint32 @@ -20,5 +20,4 @@ type Peer struct { RouterID uint32 AddPathSend routingtable.ClientOptions AddPathRecv bool - ReconnectInterval time.Duration } diff --git a/main.go b/main.go index 1952d771d9713831e851f7c7388b532ba1f4f7f1..49d19c0f0f76019ae6f522b5f82e0262b5ec99a9 100644 --- a/main.go +++ b/main.go @@ -32,32 +32,34 @@ func main() { } b.AddPeer(config.Peer{ - AdminEnabled: true, - LocalAS: 65200, - PeerAS: 65300, - PeerAddress: net.IP([]byte{169, 254, 200, 1}), - LocalAddress: net.IP([]byte{169, 254, 200, 0}), - HoldTimer: 90, - KeepAlive: 30, - Passive: true, - RouterID: b.RouterID(), + AdminEnabled: true, + LocalAS: 65200, + PeerAS: 65300, + PeerAddress: net.IP([]byte{169, 254, 200, 1}), + LocalAddress: net.IP([]byte{169, 254, 200, 0}), + ReconnectInterval: time.Second * 15, + HoldTime: time.Second * 90, + KeepAlive: time.Second * 30, + Passive: true, + RouterID: b.RouterID(), AddPathSend: routingtable.ClientOptions{ MaxPaths: 10, }, }, rib) - time.Sleep(time.Second * 15) + //time.Sleep(time.Second * 15) b.AddPeer(config.Peer{ - AdminEnabled: true, - LocalAS: 65200, - PeerAS: 65100, - PeerAddress: net.IP([]byte{169, 254, 100, 0}), - LocalAddress: net.IP([]byte{169, 254, 100, 1}), - HoldTimer: 90, - KeepAlive: 30, - Passive: true, - RouterID: b.RouterID(), + AdminEnabled: true, + LocalAS: 65200, + PeerAS: 65100, + PeerAddress: net.IP([]byte{169, 254, 100, 0}), + LocalAddress: net.IP([]byte{169, 254, 100, 1}), + ReconnectInterval: time.Second * 15, + HoldTime: time.Second * 90, + KeepAlive: time.Second * 30, + Passive: true, + RouterID: b.RouterID(), AddPathSend: routingtable.ClientOptions{ MaxPaths: 10, }, diff --git a/protocols/bgp/packet/decoder_test.go b/protocols/bgp/packet/decoder_test.go index 19a83a960c956378415d2c1792abce344dbb76dc..a9ae237b9cb94a3e0ec6af0b988bc09f41d3f806 100644 --- a/protocols/bgp/packet/decoder_test.go +++ b/protocols/bgp/packet/decoder_test.go @@ -227,6 +227,26 @@ func TestDecode(t *testing.T) { }, wantFail: true, }, + { + testNum: 8, + input: []byte{ + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 0, 21, + 3, + 6, 4, + }, + wantFail: false, + expected: &BGPMessage{ + Header: &BGPHeader{ + Length: 21, + Type: 3, + }, + Body: &BGPNotification{ + ErrorCode: 6, + ErrorSubcode: 4, + }, + }, + }, } for _, test := range tests { @@ -252,7 +272,7 @@ func TestDecode(t *testing.T) { continue } - assert.Equal(t, test.expected, msg) + assert.Equalf(t, test.expected, msg, "Test: %d", test.testNum) } } diff --git a/protocols/bgp/server/fsm.go b/protocols/bgp/server/fsm.go deleted file mode 100644 index 20be5160eff16fb132ffe45ed1a2875b0d333fdb..0000000000000000000000000000000000000000 --- a/protocols/bgp/server/fsm.go +++ /dev/null @@ -1,968 +0,0 @@ -package server - -import ( - "bytes" - "fmt" - "math" - "net" - "time" - - "github.com/bio-routing/bio-rd/routingtable" - - "github.com/bio-routing/bio-rd/config" - tnet "github.com/bio-routing/bio-rd/net" - "github.com/bio-routing/bio-rd/protocols/bgp/packet" - "github.com/bio-routing/bio-rd/route" - "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" - "github.com/bio-routing/bio-rd/routingtable/adjRIBOut" - "github.com/bio-routing/bio-rd/routingtable/adjRIBOutAddPath" - log "github.com/sirupsen/logrus" - tomb "gopkg.in/tomb.v2" -) - -const ( - // Administrative events - ManualStart = 1 - ManualStop = 2 - AutomaticStart = 3 - ManualStartWithPassiveTcpEstablishment = 4 - AutomaticStartWithPassiveTcpEstablishment = 5 - AutomaticStop = 8 - - // Timer events - ConnectRetryTimerExpires = 9 - HoldTimerExpires = 10 - KeepaliveTimerExpires = 11 -) - -const ( - Cease = 0 - Idle = 1 - Connect = 2 - Active = 3 - OpenSent = 4 - OpenConfirm = 5 - Established = 6 -) - -type FSM struct { - peer *Peer - - t tomb.Tomb - stateReason string - state int - lastState int - eventCh chan int - - con *net.TCPConn - con2 *net.TCPConn - conCh chan *net.TCPConn - conErrCh chan error - initiateCon chan struct{} - passive bool - - local net.IP - remote net.IP - - localASN uint16 - remoteASN uint16 - - neighborID uint32 - routerID uint32 - - delayOpen bool - delayOpenTime time.Duration - delayOpenTimer *time.Timer - - connectRetryTime time.Duration - connectRetryTimer *time.Timer - connectRetryCounter int - - holdTimeConfigured time.Duration - holdTime time.Duration - holdTimer *time.Timer - - keepaliveTime time.Duration - keepaliveTimer *time.Ticker - - msgRecvCh chan msgRecvMsg - msgRecvFailCh chan msgRecvErr - stopMsgRecvCh chan struct{} - - adjRIBIn *adjRIBIn.AdjRIBIn - adjRIBOut routingtable.RouteTableClient - rib routingtable.RouteTableClient - updateSender routingtable.RouteTableClient - - capAddPathSend bool - capAddPathRecv bool -} - -type msgRecvMsg struct { - msg []byte - con *net.TCPConn -} - -type msgRecvErr struct { - err error - con *net.TCPConn -} - -func NewFSM(peer *Peer, c config.Peer, rib routingtable.RouteTableClient) *FSM { - fsm := &FSM{ - peer: peer, - state: Idle, - passive: true, - connectRetryTime: 5, - connectRetryTimer: time.NewTimer(time.Second * time.Duration(20)), - - msgRecvCh: make(chan msgRecvMsg), - msgRecvFailCh: make(chan msgRecvErr), - stopMsgRecvCh: make(chan struct{}), - - holdTimeConfigured: time.Duration(c.HoldTimer), - holdTimer: time.NewTimer(0), - - keepaliveTime: time.Duration(c.KeepAlive), - keepaliveTimer: time.NewTicker(time.Duration(c.KeepAlive)), - - routerID: c.RouterID, - remote: c.PeerAddress, - local: c.LocalAddress, - localASN: uint16(c.LocalAS), - eventCh: make(chan int), - conCh: make(chan *net.TCPConn), - conErrCh: make(chan error), initiateCon: make(chan struct{}), - - rib: rib, - } - - return fsm -} - -func (fsm *FSM) disconnect() { - if fsm.con != nil { - fsm.con.Close() - fsm.con = nil - } - if fsm.con2 != nil { - fsm.con2.Close() - fsm.con2 = nil - } -} - -func (fsm *FSM) changeState(new int, reason string) int { - state := map[int]string{ - Cease: "Cease", - Idle: "Idle", - Connect: "Connect", - Active: "Active", - OpenSent: "OpenSent", - OpenConfirm: "OpenConfirm", - Established: "Established", - } - - log.WithFields(log.Fields{ - "peer": fsm.remote.String(), - "last_state": state[fsm.state], - "new_state": state[new], - "reason": reason, - }).Info("FSM: Neighbor state change") - - fsm.lastState = fsm.state - fsm.state = new - fsm.stateReason = reason - - return fsm.state -} - -func (fsm *FSM) activate() { - fsm.eventCh <- ManualStart -} - -func (fsm *FSM) Stop() error { - fsm.eventCh <- ManualStop - fsm.t.Kill(nil) - return fsm.t.Wait() -} - -func (fsm *FSM) start() { - fsm.t.Go(fsm.main) - fsm.t.Go(fsm.tcpConnector) - return -} - -func (fsm *FSM) main() error { - next := fsm.idle() - for { - switch next { - case Cease: - fsm.t.Kill(fmt.Errorf("FSM is being stopped")) - return nil - case Idle: - next = fsm.idle() - case Connect: - next = fsm.connect() - case Active: - next = fsm.active() - case OpenSent: - next = fsm.openSent() - case OpenConfirm: - next = fsm.openConfirm() - case Established: - next = fsm.established() - } - } -} - -func (fsm *FSM) idle() int { - if fsm.adjRIBOut != nil { - fsm.rib.Unregister(fsm.adjRIBOut) - fsm.adjRIBOut.Unregister(fsm.updateSender) - } - fsm.adjRIBIn = nil - fsm.adjRIBOut = nil - for { - select { - case c := <-fsm.conCh: - c.Close() - continue - case e := <-fsm.eventCh: - reason := "" - switch e { - case ManualStart: - reason = "Received ManualStart event %d for %s peer" - case AutomaticStart: - reason = "Received AutomaticStart event %d for %s peer" - default: - continue - } - - fsm.connectRetryCounter = 0 - fsm.startConnectRetryTimer() - if fsm.passive { - return fsm.changeState(Active, fmt.Sprintf(reason, "passive")) - } - fsm.tcpConnect() - return fsm.changeState(Connect, fmt.Sprintf(reason, "active")) - } - - } -} - -func (fsm *FSM) tcpConnector() error { - for { - select { - case <-fsm.initiateCon: - c, err := net.DialTCP("tcp", &net.TCPAddr{IP: fsm.local}, &net.TCPAddr{IP: fsm.remote, Port: BGPPORT}) - if err != nil { - select { - case fsm.conErrCh <- err: - continue - case <-time.NewTimer(time.Second * 30).C: - continue - } - } - - select { - case fsm.conCh <- c: - continue - case <-time.NewTimer(time.Second * 30).C: - c.Close() - continue - } - case <-fsm.t.Dying(): - return nil - } - } -} - -func (fsm *FSM) tcpConnect() { - fsm.initiateCon <- struct{}{} -} - -// connect state waits for a TCP connection to be fully established. Either the active or passive one. -func (fsm *FSM) connect() int { - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { - fsm.connectRetryCounter = 0 - stopTimer(fsm.connectRetryTimer) - return fsm.changeState(Idle, "Manual stop event") - } - continue - case <-fsm.connectRetryTimer.C: - fsm.resetConnectRetryTimer() - fsm.tcpConnect() - continue - case c := <-fsm.conCh: - fsm.con = c - stopTimer(fsm.connectRetryTimer) - return fsm.connectSendOpen() - } - } -} - -func (fsm *FSM) connectSendOpen() int { - err := fsm.sendOpen(fsm.con) - if err != nil { - stopTimer(fsm.connectRetryTimer) - return fsm.changeState(Idle, fmt.Sprintf("Sending OPEN message failed: %v", err)) - } - fsm.holdTimer = time.NewTimer(time.Minute * 4) - return fsm.changeState(OpenSent, "Sent OPEN message") -} - -// in the active state we wait for a passive TCP connection to be established -func (fsm *FSM) active() int { - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { - fsm.disconnect() - fsm.connectRetryCounter = 0 - stopTimer(fsm.connectRetryTimer) - return fsm.changeState(Active, "Manual stop event") - } - continue - case <-fsm.connectRetryTimer.C: - fsm.resetConnectRetryTimer() - fsm.tcpConnect() - return fsm.changeState(Connect, "Connect retry timer expired") - case c := <-fsm.conCh: - fsm.con = c - stopTimer(fsm.connectRetryTimer) - return fsm.activeSendOpen() - } - } -} - -func (fsm *FSM) activeSendOpen() int { - err := fsm.sendOpen(fsm.con) - if err != nil { - fsm.resetConnectRetryTimer() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Sending OPEN message failed: %v", err)) - } - fsm.holdTimer = time.NewTimer(time.Minute * 4) - return fsm.changeState(OpenSent, "Sent OPEN message") -} - -func (fsm *FSM) msgReceiver(c *net.TCPConn) error { - for { - msg, err := recvMsg(c) - if err != nil { - fsm.msgRecvFailCh <- msgRecvErr{err: err, con: c} - return nil - - /*select { - case fsm.msgRecvFailCh <- msgRecvErr{err: err, con: c}: - continue - case <-time.NewTimer(time.Second * 60).C: - return nil - }*/ - } - fsm.msgRecvCh <- msgRecvMsg{msg: msg, con: c} - - select { - case <-fsm.stopMsgRecvCh: - return nil - default: - continue - } - } -} - -func (fsm *FSM) openSent() int { - go fsm.msgReceiver(fsm.con) - - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { - sendNotification(fsm.con, packet.Cease, 0) - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter = 0 - return fsm.changeState(Idle, "Manual stop event") - } - continue - case <-fsm.holdTimer.C: - sendNotification(fsm.con, packet.HoldTimeExpired, 0) - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Holdtimer expired") - case c := <-fsm.conCh: // 2nd connection coming in. Collision! - if fsm.con2 != nil { - log.WithFields(log.Fields{ - "peer": fsm.remote.String(), - "local": fsm.local.String(), - }).Warningf("Received third connection from peer. Dropping new connection") - c.Close() - continue - } - - err := fsm.sendOpen(c) // FIXME: Not sure if this is standard compliant - if err != nil { - c.Close() - continue - } - fsm.con2 = c - go fsm.msgReceiver(c) - continue - case recvMsg := <-fsm.msgRecvCh: - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) - if err != nil { - switch bgperr := err.(type) { - case packet.BGPError: - sendNotification(fsm.con, bgperr.ErrorCode, bgperr.ErrorSubCode) - sendNotification(fsm.con2, bgperr.ErrorCode, bgperr.ErrorSubCode) - } - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Failed to decode BGP message: %v", err)) - } - switch msg.Header.Type { - case packet.NotificationMsg: - nMsg := msg.Body.(*packet.BGPNotification) - if nMsg.ErrorCode == packet.UnsupportedVersionNumber { - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - return fsm.changeState(Idle, "Received NOTIFICATION") - } - - if nMsg.ErrorCode == packet.Cease { - // Was this connection to be closed anyways? - if fsm.dumpCon(recvMsg.con) { - continue - } - } - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Received NOTIFICATION") - case packet.OpenMsg: - openMsg := msg.Body.(*packet.BGPOpen) - fsm.neighborID = openMsg.BGPIdentifier - fsm.resolveCollision() - stopTimer(fsm.connectRetryTimer) - err := fsm.sendKeepalive() - if err != nil { - return fsm.openSentTCPFail(err) - } - fsm.holdTime = time.Duration(math.Min(float64(fsm.holdTimeConfigured), float64(openMsg.HoldTime))) - if fsm.holdTime != 0 { - fsm.holdTimer.Reset(time.Second * fsm.holdTime) - fsm.keepaliveTime = fsm.holdTime / 3 - if fsm.keepaliveTimer != nil { - fsm.keepaliveTimer.Stop() - } - fsm.keepaliveTimer = time.NewTicker(fsm.keepaliveTime * time.Second) - } - - fsm.processOpenOptions(openMsg.OptParams) - - return fsm.changeState(OpenConfirm, "Received OPEN message") - default: - sendNotification(fsm.con, packet.FiniteStateMachineError, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "FSM Error") - } - case err := <-fsm.msgRecvFailCh: - if err.con == fsm.con && fsm.con2 != nil { - fsm.con.Close() - fsm.con = fsm.con2 - fsm.con2 = nil - continue - } - - if err.con == fsm.con2 { - fsm.con2.Close() - fsm.con2 = nil - continue - } - return fsm.openSentTCPFail(err.err) - } - } -} - -func (fsm *FSM) processOpenOptions(optParams []packet.OptParam) { - for _, optParam := range optParams { - if optParam.Type != packet.CapabilitiesParamType { - continue - } - - fsm.processCapabilities(optParam.Value.(packet.Capabilities)) - } -} - -func (fsm *FSM) processCapabilities(caps packet.Capabilities) { - for _, cap := range caps { - fsm.processCapability(cap) - } -} - -func (fsm *FSM) processCapability(cap packet.Capability) { - switch cap.Code { - case packet.AddPathCapabilityCode: - fsm.processAddPathCapability(cap.Value.(packet.AddPathCapability)) - - } -} - -func (fsm *FSM) processAddPathCapability(addPathCap packet.AddPathCapability) { - if addPathCap.AFI != 1 { - return - } - if addPathCap.SAFI != 1 { - return - } - switch addPathCap.SendReceive { - case packet.AddPathReceive: - if !fsm.peer.addPathSend.BestOnly { - fsm.capAddPathSend = true - } - case packet.AddPathSend: - if fsm.peer.addPathRecv { - fsm.capAddPathRecv = true - } - case packet.AddPathSendReceive: - if !fsm.peer.addPathSend.BestOnly { - fsm.capAddPathSend = true - } - if fsm.peer.addPathRecv { - fsm.capAddPathRecv = true - } - } -} - -func (fsm *FSM) openSentTCPFail(err error) int { - fsm.con.Close() - fsm.resetConnectRetryTimer() - return fsm.changeState(Active, fmt.Sprintf("TCP failure: %v", err)) -} - -func (fsm *FSM) dumpCon(c *net.TCPConn) bool { - p := fsm.isPassive(c) - if fsm.routerID > fsm.neighborID { - return p - } - return !p -} - -func (fsm *FSM) resolveCollision() { - if fsm.con2 == nil { - return - } - - if fsm.routerID > fsm.neighborID { - // Terminate passive connection - if fsm.isPassive(fsm.con) { - dumpCon(fsm.con) - fsm.con = fsm.con2 - return - } - if fsm.isPassive(fsm.con2) { - dumpCon(fsm.con2) - return - } - return - } - - // Terminate active connection - if !fsm.isPassive(fsm.con) { - dumpCon(fsm.con) - fsm.con = fsm.con2 - return - } - if !fsm.isPassive(fsm.con2) { - dumpCon(fsm.con2) - fsm.con2.Close() - fsm.con2 = nil - return - } -} - -func dumpCon(c *net.TCPConn) { - sendNotification(c, packet.Cease, packet.ConnectionCollisionResolution) - c.Close() -} - -func (fsm *FSM) isPassive(c *net.TCPConn) bool { - if c.LocalAddr().String() == fmt.Sprintf("%s:179", fsm.local.String()) { - return true - } - return false -} - -func (fsm *FSM) openConfirm() int { - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { // Event 2 - sendNotification(fsm.con, packet.Cease, 0) - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter = 0 - return fsm.changeState(Idle, "Manual stop event") - } - continue - case <-fsm.holdTimer.C: - sendNotification(fsm.con, packet.HoldTimeExpired, 0) - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Holdtimer expired") - case <-fsm.keepaliveTimer.C: - err := fsm.sendKeepalive() - if err != nil { - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Failed to send keepalive: %v", err)) - } - continue - case c := <-fsm.conCh: - if fsm.con2 != nil { - log.WithFields(log.Fields{ - "peer": fsm.remote.String(), - "local": fsm.local.String(), - }).Warningf("Received third connection from peer. Dropping new connection") - c.Close() - continue - } - - err := fsm.sendOpen(c) // FIXME: Not sure if this is standard compliant - if err != nil { - c.Close() - continue - } - fsm.con2 = c - go fsm.msgReceiver(c) - continue - case recvMsg := <-fsm.msgRecvCh: - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) - if err != nil { - log.WithError(err).Errorf("Failed to decode BGP message %v\n", recvMsg.msg) - switch bgperr := err.(type) { - case packet.BGPError: - sendNotification(fsm.con, bgperr.ErrorCode, bgperr.ErrorSubCode) - sendNotification(fsm.con2, bgperr.ErrorCode, bgperr.ErrorSubCode) - } - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Failed to decode BGP message") - } - - switch msg.Header.Type { - case packet.NotificationMsg: - nMsg := msg.Body.(*packet.BGPNotification) - if nMsg.ErrorCode == packet.UnsupportedVersionNumber { - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - return fsm.changeState(Idle, "Received NOTIFICATION") - } - - if nMsg.ErrorCode == packet.Cease { - // Was this connection to be closed anyways? - if fsm.dumpCon(recvMsg.con) { - continue - } - } - - return fsm.openConfirmTCPFail(fmt.Errorf("NOTIFICATION received")) - case packet.KeepaliveMsg: - fsm.holdTimer.Reset(time.Second * fsm.holdTime) - return fsm.changeState(Established, "Received KEEPALIVE") - case packet.OpenMsg: - openMsg := msg.Body.(*packet.BGPOpen) - fsm.neighborID = openMsg.BGPIdentifier - fsm.resolveCollision() - default: - sendNotification(fsm.con, packet.FiniteStateMachineError, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "FSM Error") - } - case err := <-fsm.msgRecvFailCh: - if err.con == fsm.con && fsm.con2 != nil { - fsm.con.Close() - fsm.con = fsm.con2 - fsm.con2 = nil - continue - } - - if err.con == fsm.con2 { - fsm.con2.Close() - fsm.con2 = nil - continue - } - return fsm.openConfirmTCPFail(err.err) - } - } -} - -func (fsm *FSM) openConfirmTCPFail(err error) int { - fsm.con.Close() - fsm.resetConnectRetryTimer() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Failure: %v", err)) -} - -func (fsm *FSM) established() int { - fsm.adjRIBIn = adjRIBIn.New() - fsm.adjRIBIn.Register(fsm.rib) - - n := &routingtable.Neighbor{ - Type: route.BGPPathType, - Address: tnet.IPv4ToUint32(fsm.remote), - IBGP: fsm.remoteASN == fsm.localASN, - } - - clientOptions := routingtable.ClientOptions{ - BestOnly: true, - } - if fsm.capAddPathSend { - fsm.updateSender = newUpdateSenderAddPath(fsm) - fsm.adjRIBOut = adjRIBOutAddPath.New(n) - clientOptions = fsm.peer.addPathSend - } else { - fsm.updateSender = newUpdateSender(fsm) - fsm.adjRIBOut = adjRIBOut.New(n) - } - - fsm.adjRIBOut.Register(fsm.updateSender) - fsm.rib.RegisterWithOptions(fsm.adjRIBOut, clientOptions) - - /*go func() { - for { - if fsm.adjRIBOut == nil { - return - } - fmt.Printf("ADJ-RIB-OUT: %s\n", fsm.remote.String()) - fmt.Print(fsm.adjRIBOut.Print()) - time.Sleep(time.Second * 11) - } - }()*/ - - for { - log.Debug("Iterate established loop.") - select { - case e := <-fsm.eventCh: - if e == ManualStop { // Event 2 - sendNotification(fsm.con, packet.Cease, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter = 0 - return fsm.changeState(Idle, "Manual stop event") - } - if e == AutomaticStop { // Event 8 - sendNotification(fsm.con, packet.Cease, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Automatic stop event") - } - continue - case <-fsm.holdTimer.C: - sendNotification(fsm.con, packet.HoldTimeExpired, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Holdtimer expired") - case <-fsm.keepaliveTimer.C: - - err := fsm.sendKeepalive() - if err != nil { - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Failed to send keepalive: %v", err)) - } - continue - case c := <-fsm.conCh: - c.Close() - continue - case recvMsg := <-fsm.msgRecvCh: - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) - if err != nil { - log.WithError(err).Errorf("Failed to decode BGP message %v\n", recvMsg.msg) - switch bgperr := err.(type) { - case packet.BGPError: - sendNotification(fsm.con, bgperr.ErrorCode, bgperr.ErrorSubCode) - } - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Failed to decode BGP message") - } - switch msg.Header.Type { - case packet.NotificationMsg: - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Received NOTIFICATION") - case packet.UpdateMsg: - if fsm.holdTime != 0 { - fsm.holdTimer.Reset(time.Second * fsm.holdTime) - } - - u := msg.Body.(*packet.BGPUpdate) - - for r := u.WithdrawnRoutes; r != nil; r = r.Next { - pfx := tnet.NewPfx(r.IP, r.Pfxlen) - log.WithField("Prefix", pfx.String()).Debug("LPM: Removing prefix") - fsm.adjRIBIn.RemovePath(pfx, nil) - } - - for r := u.NLRI; r != nil; r = r.Next { - pfx := tnet.NewPfx(r.IP, r.Pfxlen) - log.WithField("Prefix", pfx.String()).Debug("LPM: Adding prefix") - path := &route.Path{ - Type: route.BGPPathType, - BGPPath: &route.BGPPath{ - Source: tnet.IPv4ToUint32(fsm.remote), - }, - } - - for pa := u.PathAttributes; pa != nil; pa = pa.Next { - switch pa.TypeCode { - case packet.OriginAttr: - path.BGPPath.Origin = pa.Value.(uint8) - case packet.LocalPrefAttr: - path.BGPPath.LocalPref = pa.Value.(uint32) - case packet.MEDAttr: - path.BGPPath.MED = pa.Value.(uint32) - case packet.NextHopAttr: - log.WithField("NextHop", pa.Value.(uint32)).Debug("RECEIVED NEXT_HOP") - path.BGPPath.NextHop = pa.Value.(uint32) - case packet.ASPathAttr: - path.BGPPath.ASPath = pa.ASPathString() - path.BGPPath.ASPathLen = pa.ASPathLen() - case packet.CommunitiesAttr: - path.BGPPath.Communities = pa.CommunityString() - case packet.LargeCommunitiesAttr: - path.BGPPath.LargeCommunities = pa.LargeCommunityString() - } - } - fsm.adjRIBIn.AddPath(pfx, path) - } - - continue - case packet.KeepaliveMsg: - if fsm.holdTime != 0 { - fsm.holdTimer.Reset(time.Second * fsm.holdTime) - } - continue - case packet.OpenMsg: - if fsm.con2 != nil { - sendNotification(fsm.con2, packet.Cease, packet.ConnectionCollisionResolution) - fsm.con2.Close() - fsm.con2 = nil - continue - } - sendNotification(fsm.con, packet.FiniteStateMachineError, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "FSM Error") - default: - sendNotification(fsm.con, packet.FiniteStateMachineError, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "FSM Error") - } - case err := <-fsm.msgRecvFailCh: - if err.con == fsm.con && fsm.con2 != nil { - fsm.con.Close() - fsm.con = fsm.con2 - fsm.con2 = nil - continue - } - - if err.con == fsm.con2 { - fsm.con2.Close() - fsm.con2 = nil - continue - } - return fsm.openConfirmTCPFail(err.err) - } - } -} - -func stopTimer(t *time.Timer) { - if !t.Stop() { - select { - case <-t.C: - default: - } - } -} - -func (fsm *FSM) startConnectRetryTimer() { - fsm.connectRetryTimer = time.NewTimer(time.Second * fsm.connectRetryTime) -} - -func (fsm *FSM) resetConnectRetryTimer() { - if !fsm.connectRetryTimer.Reset(time.Second * fsm.connectRetryTime) { - <-fsm.connectRetryTimer.C - } -} - -func (fsm *FSM) resetDelayOpenTimer() { - if !fsm.delayOpenTimer.Reset(time.Second * fsm.delayOpenTime) { - <-fsm.delayOpenTimer.C - } -} - -func (fsm *FSM) sendKeepalive() error { - - msg := packet.SerializeKeepaliveMsg() - _, err := fsm.con.Write(msg) - log.WithError(err).Debug("Send keepalive") - if err != nil { - return fmt.Errorf("Unable to send KEEPALIVE message: %v", err) - } - - return nil -} - -func (fsm *FSM) sendOpen(c *net.TCPConn) error { - msg := packet.SerializeOpenMsg(&packet.BGPOpen{ - Version: BGPVersion, - AS: fsm.localASN, - HoldTime: uint16(fsm.holdTimeConfigured), - BGPIdentifier: fsm.routerID, - OptParams: fsm.peer.optOpenParams, - }) - - _, err := c.Write(msg) - if err != nil { - return fmt.Errorf("Unable to send OPEN message: %v", err) - } - - return nil -} - -func sendNotification(c *net.TCPConn, errorCode uint8, errorSubCode uint8) error { - if c == nil { - return fmt.Errorf("connection is nil") - } - - msg := packet.SerializeNotificationMsg(&packet.BGPNotification{}) - - _, err := c.Write(msg) - if err != nil { - return fmt.Errorf("Unable to send NOTIFICATION message: %v", err) - } - - return nil -} diff --git a/protocols/bgp/server/fsm2.go b/protocols/bgp/server/fsm2.go new file mode 100644 index 0000000000000000000000000000000000000000..6e1682c71b7aaf47ff2858a30b199fc07cb43297 --- /dev/null +++ b/protocols/bgp/server/fsm2.go @@ -0,0 +1,267 @@ +package server + +import ( + "fmt" + "net" + "sync" + "time" + + "github.com/bio-routing/bio-rd/protocols/bgp/packet" + "github.com/bio-routing/bio-rd/routingtable" + log "github.com/sirupsen/logrus" +) + +const ( + // Administrative events + ManualStart = 1 + ManualStop = 2 + AutomaticStart = 3 + ManualStartWithPassiveTcpEstablishment = 4 + AutomaticStartWithPassiveTcpEstablishment = 5 + AutomaticStop = 8 + Cease = 100 +) + +type state interface { + run() (state, string) +} + +// FSM implements the BGP finite state machine (RFC4271) +type FSM struct { + peer *Peer + eventCh chan int + con net.Conn + conCh chan net.Conn + initiateCon chan struct{} + conErrCh chan error + + delayOpen bool + delayOpenTime time.Duration + delayOpenTimer *time.Timer + + connectRetryTime time.Duration + connectRetryTimer *time.Timer + connectRetryCounter int + + holdTime time.Duration + holdTimer *time.Timer + + keepaliveTime time.Duration + keepaliveTimer *time.Timer + + msgRecvCh chan []byte + msgRecvFailCh chan error + stopMsgRecvCh chan struct{} + + capAddPathSend bool + capAddPathRecv bool + + local net.IP + + ribsInitialized bool + adjRIBIn routingtable.RouteTableClient + adjRIBOut routingtable.RouteTableClient + rib routingtable.RouteTableClient + updateSender routingtable.RouteTableClient + + neighborID uint32 + state state + stateMu sync.RWMutex + reason string + active bool +} + +// NewPassiveFSM2 initiates a new passive FSM +func NewPassiveFSM2(peer *Peer, con *net.TCPConn) *FSM { + fsm := newFSM2(peer) + fsm.con = con + fsm.state = newIdleState(fsm) + return fsm +} + +// NewActiveFSM2 initiates a new passive FSM +func NewActiveFSM2(peer *Peer) *FSM { + fsm := newFSM2(peer) + fsm.active = true + fsm.state = newIdleState(fsm) + return fsm +} + +func newFSM2(peer *Peer) *FSM { + return &FSM{ + connectRetryTime: time.Minute, + peer: peer, + eventCh: make(chan int), + conCh: make(chan net.Conn), + conErrCh: make(chan error), + initiateCon: make(chan struct{}), + msgRecvCh: make(chan []byte), + msgRecvFailCh: make(chan error), + stopMsgRecvCh: make(chan struct{}), + rib: peer.rib, + } +} + +func (fsm *FSM) start() { + go fsm.run() + go fsm.tcpConnector() + return +} + +func (fsm *FSM) activate() { + fsm.eventCh <- AutomaticStart +} + +func (fsm *FSM) run() { + next, reason := fsm.state.run() + for { + newState := stateName(next) + oldState := stateName(fsm.state) + + if oldState != newState { + log.WithFields(log.Fields{ + "peer": fsm.peer.addr.String(), + "last_state": oldState, + "new_state": newState, + "reason": reason, + }).Info("FSM: Neighbor state change") + } + + if newState == "cease" { + return + } + + fsm.stateMu.Lock() + fsm.state = next + fsm.stateMu.Unlock() + + next, reason = fsm.state.run() + } +} + +func stateName(s state) string { + switch s.(type) { + case *idleState: + return "idle" + case *connectState: + return "connect" + case *activeState: + return "active" + case *openSentState: + return "openSent" + case *openConfirmState: + return "openConfirm" + case *establishedState: + return "established" + case *ceaseState: + return "cease" + default: + panic(fmt.Sprintf("Unknown state: %v", s)) + } +} + +func (fsm *FSM) cease() { + fsm.eventCh <- Cease +} + +func (fsm *FSM) tcpConnector() error { + for { + select { + case <-fsm.initiateCon: + c, err := net.DialTCP("tcp", &net.TCPAddr{IP: fsm.local}, &net.TCPAddr{IP: fsm.peer.addr, Port: BGPPORT}) + if err != nil { + select { + case fsm.conErrCh <- err: + continue + case <-time.NewTimer(time.Second * 30).C: + continue + } + } + + select { + case fsm.conCh <- c: + continue + case <-time.NewTimer(time.Second * 30).C: + c.Close() + continue + } + } + } +} + +func (fsm *FSM) tcpConnect() { + fsm.initiateCon <- struct{}{} +} + +func (fsm *FSM) msgReceiver() error { + for { + msg, err := recvMsg(fsm.con) + if err != nil { + fsm.msgRecvFailCh <- err + return nil + } + fsm.msgRecvCh <- msg + } +} + +func (fsm *FSM) startConnectRetryTimer() { + fsm.connectRetryTimer = time.NewTimer(fsm.connectRetryTime) +} + +func (fsm *FSM) resetConnectRetryTimer() { + if !fsm.connectRetryTimer.Reset(fsm.connectRetryTime) { + <-fsm.connectRetryTimer.C + } +} + +func (fsm *FSM) resetConnectRetryCounter() { + fsm.connectRetryCounter = 0 +} + +func (fsm *FSM) sendOpen() error { + msg := packet.SerializeOpenMsg(&packet.BGPOpen{ + Version: BGPVersion, + AS: uint16(fsm.peer.localASN), + HoldTime: uint16(fsm.peer.holdTime / time.Second), + BGPIdentifier: fsm.peer.server.routerID, + OptParams: fsm.peer.optOpenParams, + }) + + _, err := fsm.con.Write(msg) + if err != nil { + return fmt.Errorf("Unable to send OPEN message: %v", err) + } + + return nil +} + +func (fsm *FSM) sendNotification(errorCode uint8, errorSubCode uint8) error { + msg := packet.SerializeNotificationMsg(&packet.BGPNotification{}) + + _, err := fsm.con.Write(msg) + if err != nil { + return fmt.Errorf("Unable to send NOTIFICATION message: %v", err) + } + + return nil +} + +func (fsm *FSM) sendKeepalive() error { + msg := packet.SerializeKeepaliveMsg() + + _, err := fsm.con.Write(msg) + if err != nil { + return fmt.Errorf("Unable to send KEEPALIVE message: %v", err) + } + + return nil +} + +func stopTimer(t *time.Timer) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } +} diff --git a/protocols/bgp/server/fsm_active.go b/protocols/bgp/server/fsm_active.go new file mode 100644 index 0000000000000000000000000000000000000000..a84f910d945b473f655465239ae1b08c61d94ebc --- /dev/null +++ b/protocols/bgp/server/fsm_active.go @@ -0,0 +1,68 @@ +package server + +import ( + "fmt" + "net" + "time" +) + +type activeState struct { + fsm *FSM +} + +func newActiveState(fsm *FSM) *activeState { + return &activeState{ + fsm: fsm, + } +} + +func (s activeState) run() (state, string) { + for { + select { + case e := <-s.fsm.eventCh: + switch e { + case ManualStop: + return s.manualStop() + case Cease: + return s.cease() + default: + continue + } + case <-s.fsm.connectRetryTimer.C: + return s.connectRetryTimerExpired() + case c := <-s.fsm.conCh: + return s.connectionSuccess(c) + } + } +} + +func (s *activeState) manualStop() (state, string) { + s.fsm.con.Close() + s.fsm.resetConnectRetryCounter() + stopTimer(s.fsm.connectRetryTimer) + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *activeState) cease() (state, string) { + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + +func (s *activeState) connectRetryTimerExpired() (state, string) { + s.fsm.resetConnectRetryTimer() + s.fsm.tcpConnect() + return newConnectState(s.fsm), "Connect retry timer expired" +} + +func (s *activeState) connectionSuccess(con net.Conn) (state, string) { + s.fsm.con = con + stopTimer(s.fsm.connectRetryTimer) + err := s.fsm.sendOpen() + if err != nil { + s.fsm.resetConnectRetryTimer() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Sending OPEN message failed: %v", err) + } + s.fsm.holdTimer = time.NewTimer(time.Minute * 4) + return newOpenSentState(s.fsm), "Sent OPEN message" +} diff --git a/protocols/bgp/server/fsm_cease.go b/protocols/bgp/server/fsm_cease.go new file mode 100644 index 0000000000000000000000000000000000000000..7d239840bbda1e5fe16418f410cb413e5357b36f --- /dev/null +++ b/protocols/bgp/server/fsm_cease.go @@ -0,0 +1,12 @@ +package server + +type ceaseState struct { +} + +func newCeaseState() *ceaseState { + return &ceaseState{} +} + +func (s ceaseState) run() (state, string) { + return newCeaseState(), "Loop" +} diff --git a/protocols/bgp/server/fsm_connect.go b/protocols/bgp/server/fsm_connect.go new file mode 100644 index 0000000000000000000000000000000000000000..70142ea9702d2673f6c9a2450fc1ec2fb7f50c76 --- /dev/null +++ b/protocols/bgp/server/fsm_connect.go @@ -0,0 +1,60 @@ +package server + +import ( + "fmt" + "net" + "time" +) + +type connectState struct { + fsm *FSM +} + +func newConnectState(fsm *FSM) *connectState { + return &connectState{ + fsm: fsm, + } +} + +func (s connectState) run() (state, string) { + for { + select { + case e := <-s.fsm.eventCh: + switch e { + case ManualStop: + return s.manualStop() + case Cease: + return newCeaseState(), "Cease" + default: + continue + } + case <-s.fsm.connectRetryTimer.C: + s.connectRetryTimerExpired() + continue + case c := <-s.fsm.conCh: + return s.connectionSuccess(c) + } + } +} + +func (s *connectState) connectionSuccess(c net.Conn) (state, string) { + s.fsm.con = c + stopTimer(s.fsm.connectRetryTimer) + err := s.fsm.sendOpen() + if err != nil { + return newIdleState(s.fsm), fmt.Sprintf("Unable to send open: %v", err) + } + s.fsm.holdTimer = time.NewTimer(time.Minute * 4) + return newOpenSentState(s.fsm), "TCP connection succeeded" +} + +func (s *connectState) connectRetryTimerExpired() { + s.fsm.resetConnectRetryTimer() + s.fsm.tcpConnect() +} + +func (s *connectState) manualStop() (state, string) { + s.fsm.resetConnectRetryCounter() + stopTimer(s.fsm.connectRetryTimer) + return newIdleState(s.fsm), "Manual stop event" +} diff --git a/protocols/bgp/server/fsm_established.go b/protocols/bgp/server/fsm_established.go new file mode 100644 index 0000000000000000000000000000000000000000..87ef4ba1f935e8ae8068c2a63ce0789eb7db0bf7 --- /dev/null +++ b/protocols/bgp/server/fsm_established.go @@ -0,0 +1,232 @@ +package server + +import ( + "bytes" + "fmt" + + tnet "github.com/bio-routing/bio-rd/net" + "github.com/bio-routing/bio-rd/protocols/bgp/packet" + "github.com/bio-routing/bio-rd/route" + "github.com/bio-routing/bio-rd/routingtable" + "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" + "github.com/bio-routing/bio-rd/routingtable/adjRIBOut" + "github.com/bio-routing/bio-rd/routingtable/adjRIBOutAddPath" +) + +type establishedState struct { + fsm *FSM +} + +func newEstablishedState(fsm *FSM) *establishedState { + return &establishedState{ + fsm: fsm, + } +} + +func (s establishedState) run() (state, string) { + if !s.fsm.ribsInitialized { + s.init() + } + + for { + select { + case e := <-s.fsm.eventCh: + switch e { + case ManualStop: + return s.manualStop() + case AutomaticStop: + return s.automaticStop() + case Cease: + return s.cease() + default: + continue + } + case <-s.fsm.holdTimer.C: + return s.holdTimerExpired() + case <-s.fsm.keepaliveTimer.C: + return s.keepaliveTimerExpired() + case recvMsg := <-s.fsm.msgRecvCh: + return s.msgReceived(recvMsg) + } + } +} + +func (s *establishedState) init() { + s.fsm.adjRIBIn = adjRIBIn.New() + s.fsm.adjRIBIn.Register(s.fsm.rib) + + n := &routingtable.Neighbor{ + Type: route.BGPPathType, + Address: tnet.IPv4ToUint32(s.fsm.peer.addr), + } + + clientOptions := routingtable.ClientOptions{} + if s.fsm.capAddPathSend { + s.fsm.updateSender = newUpdateSenderAddPath(s.fsm) + s.fsm.adjRIBOut = adjRIBOutAddPath.New(n) + clientOptions = s.fsm.peer.addPathSend + } else { + s.fsm.updateSender = newUpdateSender(s.fsm) + s.fsm.adjRIBOut = adjRIBOut.New(n) + } + + s.fsm.adjRIBOut.Register(s.fsm.updateSender) + s.fsm.rib.RegisterWithOptions(s.fsm.adjRIBOut, clientOptions) + + s.fsm.ribsInitialized = true +} + +func (s *establishedState) uninit() { + s.fsm.adjRIBOut.Unregister(s.fsm.updateSender) + s.fsm.rib.Unregister(s.fsm.adjRIBOut) + s.fsm.adjRIBIn.Unregister(s.fsm.rib) + s.fsm.ribsInitialized = false +} + +func (s *establishedState) manualStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.uninit() + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter = 0 + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *establishedState) automaticStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.uninit() + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Automatic stop event" +} + +func (s *establishedState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.uninit() + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + +func (s *establishedState) holdTimerExpired() (state, string) { + s.fsm.sendNotification(packet.HoldTimeExpired, 0) + s.uninit() + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Holdtimer expired" +} + +func (s *establishedState) keepaliveTimerExpired() (state, string) { + err := s.fsm.sendKeepalive() + if err != nil { + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Failed to send keepalive: %v", err) + } + + s.fsm.keepaliveTimer.Reset(s.fsm.keepaliveTime) + return newEstablishedState(s.fsm), s.fsm.reason +} + +func (s *establishedState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) + if err != nil { + switch bgperr := err.(type) { + case packet.BGPError: + s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode) + } + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Failed to decode BGP message" + } + switch msg.Header.Type { + case packet.NotificationMsg: + return s.notification() + case packet.UpdateMsg: + return s.update(msg) + case packet.KeepaliveMsg: + return s.keepaliveReceived() + default: + return s.unexpectedMessage() + } +} + +func (s *establishedState) notification() (state, string) { + stopTimer(s.fsm.connectRetryTimer) + s.uninit() + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Received NOTIFICATION" +} + +func (s *establishedState) update(msg *packet.BGPMessage) (state, string) { + if s.fsm.holdTime != 0 { + s.fsm.holdTimer.Reset(s.fsm.holdTime) + } + + u := msg.Body.(*packet.BGPUpdate) + s.withdraws(u) + s.updates(u) + + return newEstablishedState(s.fsm), s.fsm.reason +} + +func (s *establishedState) withdraws(u *packet.BGPUpdate) { + for r := u.WithdrawnRoutes; r != nil; r = r.Next { + pfx := tnet.NewPfx(r.IP, r.Pfxlen) + s.fsm.adjRIBIn.RemovePath(pfx, nil) + } +} + +func (s *establishedState) updates(u *packet.BGPUpdate) { + for r := u.NLRI; r != nil; r = r.Next { + pfx := tnet.NewPfx(r.IP, r.Pfxlen) + + path := &route.Path{ + Type: route.BGPPathType, + BGPPath: &route.BGPPath{ + Source: tnet.IPv4ToUint32(s.fsm.peer.addr), + }, + } + + for pa := u.PathAttributes; pa != nil; pa = pa.Next { + switch pa.TypeCode { + case packet.OriginAttr: + path.BGPPath.Origin = pa.Value.(uint8) + case packet.LocalPrefAttr: + path.BGPPath.LocalPref = pa.Value.(uint32) + case packet.MEDAttr: + path.BGPPath.MED = pa.Value.(uint32) + case packet.NextHopAttr: + path.BGPPath.NextHop = pa.Value.(uint32) + case packet.ASPathAttr: + path.BGPPath.ASPath = pa.ASPathString() + path.BGPPath.ASPathLen = pa.ASPathLen() + case packet.CommunitiesAttr: + path.BGPPath.Communities = pa.CommunityString() + case packet.LargeCommunitiesAttr: + path.BGPPath.LargeCommunities = pa.LargeCommunityString() + } + } + s.fsm.adjRIBIn.AddPath(pfx, path) + } +} + +func (s *establishedState) keepaliveReceived() (state, string) { + if s.fsm.holdTime != 0 { + s.fsm.holdTimer.Reset(s.fsm.holdTime) + } + return newEstablishedState(s.fsm), s.fsm.reason +} + +func (s *establishedState) unexpectedMessage() (state, string) { + s.fsm.sendNotification(packet.FiniteStateMachineError, 0) + s.uninit() + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "FSM Error" +} diff --git a/protocols/bgp/server/fsm_idle.go b/protocols/bgp/server/fsm_idle.go new file mode 100644 index 0000000000000000000000000000000000000000..4480bb53579d46147a0a84e69cc17126ea6bf1ea --- /dev/null +++ b/protocols/bgp/server/fsm_idle.go @@ -0,0 +1,54 @@ +package server + +import ( + "time" +) + +type idleState struct { + fsm *FSM + newStateReason string +} + +func newIdleState(fsm *FSM) *idleState { + return &idleState{ + fsm: fsm, + } +} + +func (s idleState) run() (state, string) { + if s.fsm.peer.reconnectInterval != 0 { + time.Sleep(s.fsm.peer.reconnectInterval) + go s.fsm.activate() + } + for { + event := <-s.fsm.eventCh + switch event { + case ManualStart: + return s.manualStart() + case AutomaticStart: + return s.automaticStart() + case Cease: + return newCeaseState(), "Cease" + default: + continue + } + } +} + +func (s *idleState) manualStart() (state, string) { + s.newStateReason = "Received ManualStart event" + return s.start() +} + +func (s *idleState) automaticStart() (state, string) { + s.newStateReason = "Received AutomaticStart event" + return s.start() +} + +func (s *idleState) start() (state, string) { + s.fsm.resetConnectRetryCounter() + s.fsm.startConnectRetryTimer() + go s.fsm.tcpConnect() + + return newConnectState(s.fsm), s.newStateReason +} diff --git a/protocols/bgp/server/fsm_manager.go b/protocols/bgp/server/fsm_manager.go new file mode 100644 index 0000000000000000000000000000000000000000..2ed78f3958805e7a9b6eac6075adafb7d196aa26 --- /dev/null +++ b/protocols/bgp/server/fsm_manager.go @@ -0,0 +1,25 @@ +package server + +import "net" + +type fsmManager struct { + fsms map[string][]*FSM +} + +func newFSMManager() *fsmManager { + return &fsmManager{ + fsms: make(map[string][]*FSM, 0), + } +} + +func (m *fsmManager) resolveCollision(addr net.IP) { + +} + +func (m *fsmManager) newFSMPassive() *FSM { + return &FSM{} +} + +func (m *fsmManager) newFSMActive() *FSM { + return &FSM{} +} diff --git a/protocols/bgp/server/fsm_open_confirm.go b/protocols/bgp/server/fsm_open_confirm.go new file mode 100644 index 0000000000000000000000000000000000000000..d2d25e4f533ec30d9a5d15aec25504d055c45f54 --- /dev/null +++ b/protocols/bgp/server/fsm_open_confirm.go @@ -0,0 +1,128 @@ +package server + +import ( + "bytes" + "fmt" + + "github.com/bio-routing/bio-rd/protocols/bgp/packet" +) + +type openConfirmState struct { + fsm *FSM +} + +func newOpenConfirmState(fsm *FSM) *openConfirmState { + return &openConfirmState{ + fsm: fsm, + } +} + +func (s openConfirmState) run() (state, string) { + for { + select { + case e := <-s.fsm.eventCh: + switch e { + case ManualStop: + return s.manualStop() + case Cease: + return s.cease() + default: + continue + } + case <-s.fsm.holdTimer.C: + return s.holdTimerExpired() + case <-s.fsm.keepaliveTimer.C: + return s.keepaliveTimerExpired() + case recvMsg := <-s.fsm.msgRecvCh: + return s.msgReceived(recvMsg) + } + } +} + +func (s *openConfirmState) manualStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.resetConnectRetryCounter() + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *openConfirmState) automaticStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Automatic stop event" +} + +func (s *openConfirmState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + +func (s *openConfirmState) holdTimerExpired() (state, string) { + s.fsm.sendNotification(packet.HoldTimeExpired, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Holdtimer expired" +} + +func (s *openConfirmState) keepaliveTimerExpired() (state, string) { + err := s.fsm.sendKeepalive() + if err != nil { + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Failed to send keepalive: %v", err) + } + s.fsm.keepaliveTimer.Reset(s.fsm.keepaliveTime) + return newOpenConfirmState(s.fsm), s.fsm.reason +} + +func (s *openConfirmState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) + if err != nil { + switch bgperr := err.(type) { + case packet.BGPError: + s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode) + } + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Failed to decode BGP message: %v", err) + } + switch msg.Header.Type { + case packet.NotificationMsg: + return s.notification(msg) + case packet.KeepaliveMsg: + return s.keepaliveReceived() + default: + return s.unexpectedMessage() + } +} + +func (s *openConfirmState) notification(msg *packet.BGPMessage) (state, string) { + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + nMsg := msg.Body.(*packet.BGPNotification) + if nMsg.ErrorCode != packet.UnsupportedVersionNumber { + s.fsm.connectRetryCounter++ + } + + return newIdleState(s.fsm), "Received NOTIFICATION" +} + +func (s *openConfirmState) keepaliveReceived() (state, string) { + s.fsm.holdTimer.Reset(s.fsm.holdTime) + return newEstablishedState(s.fsm), "Received KEEPALIVE" +} + +func (s *openConfirmState) unexpectedMessage() (state, string) { + s.fsm.sendNotification(packet.FiniteStateMachineError, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "FSM Error" +} diff --git a/protocols/bgp/server/fsm_open_sent.go b/protocols/bgp/server/fsm_open_sent.go new file mode 100644 index 0000000000000000000000000000000000000000..63680636d8e0c65afe1dad9ec9d1280c34e9c37d --- /dev/null +++ b/protocols/bgp/server/fsm_open_sent.go @@ -0,0 +1,196 @@ +package server + +import ( + "bytes" + "fmt" + "math" + "time" + + "github.com/bio-routing/bio-rd/protocols/bgp/packet" +) + +type openSentState struct { + fsm *FSM +} + +func newOpenSentState(fsm *FSM) *openSentState { + return &openSentState{ + fsm: fsm, + } +} + +func (s openSentState) run() (state, string) { + go s.fsm.msgReceiver() + for { + select { + case e := <-s.fsm.eventCh: + switch e { + case ManualStop: + return s.manualStop() + case AutomaticStop: + return s.automaticStop() + case Cease: + return s.cease() + default: + continue + } + case <-s.fsm.holdTimer.C: + return s.holdTimerExpired() + case recvMsg := <-s.fsm.msgRecvCh: + return s.msgReceived(recvMsg) + } + } +} + +func (s *openSentState) manualStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.resetConnectRetryTimer() + s.fsm.con.Close() + s.fsm.resetConnectRetryCounter() + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *openSentState) automaticStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.resetConnectRetryTimer() + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Automatic stop event" +} + +func (s *openSentState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + +func (s *openSentState) holdTimerExpired() (state, string) { + s.fsm.sendNotification(packet.HoldTimeExpired, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Holdtimer expired" +} + +func (s *openSentState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) + if err != nil { + switch bgperr := err.(type) { + case packet.BGPError: + s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode) + } + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Failed to decode BGP message: %v", err) + } + switch msg.Header.Type { + case packet.NotificationMsg: + return s.notification(msg) + case packet.OpenMsg: + return s.openMsgReceived(msg) + default: + return s.unexpectedMessage() + } +} + +func (s *openSentState) unexpectedMessage() (state, string) { + s.fsm.sendNotification(packet.FiniteStateMachineError, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "FSM Error" +} + +func (s *openSentState) openMsgReceived(msg *packet.BGPMessage) (state, string) { + openMsg := msg.Body.(*packet.BGPOpen) + s.fsm.neighborID = openMsg.BGPIdentifier + stopTimer(s.fsm.connectRetryTimer) + if s.fsm.peer.collisionHandling(s.fsm) { + return s.cease() + } + err := s.fsm.sendKeepalive() + if err != nil { + return s.tcpFailure() + } + + s.fsm.holdTime = time.Duration(math.Min(float64(s.fsm.peer.holdTime), float64(time.Duration(openMsg.HoldTime)*time.Second))) + if s.fsm.holdTime != 0 { + if !s.fsm.holdTimer.Reset(s.fsm.holdTime) { + <-s.fsm.holdTimer.C + } + s.fsm.keepaliveTime = s.fsm.holdTime / 3 + s.fsm.keepaliveTimer = time.NewTimer(s.fsm.keepaliveTime) + } + + s.processOpenOptions(openMsg.OptParams) + return newOpenConfirmState(s.fsm), "Received OPEN message" +} + +func (s *openSentState) tcpFailure() (state, string) { + s.fsm.con.Close() + s.fsm.resetConnectRetryTimer() + return newActiveState(s.fsm), "TCP connection failure" +} + +func (s *openSentState) processOpenOptions(optParams []packet.OptParam) { + for _, optParam := range optParams { + if optParam.Type != packet.CapabilitiesParamType { + continue + } + + s.processCapabilities(optParam.Value.(packet.Capabilities)) + } +} + +func (s *openSentState) processCapabilities(caps packet.Capabilities) { + for _, cap := range caps { + s.processCapability(cap) + } +} + +func (s *openSentState) processCapability(cap packet.Capability) { + switch cap.Code { + case packet.AddPathCapabilityCode: + s.processAddPathCapability(cap.Value.(packet.AddPathCapability)) + + } +} + +func (s *openSentState) processAddPathCapability(addPathCap packet.AddPathCapability) { + if addPathCap.AFI != 1 { + return + } + if addPathCap.SAFI != 1 { + return + } + + switch addPathCap.SendReceive { + case packet.AddPathReceive: + if !s.fsm.peer.addPathSend.BestOnly { + s.fsm.capAddPathSend = true + } + case packet.AddPathSend: + if s.fsm.peer.addPathRecv { + s.fsm.capAddPathRecv = true + } + case packet.AddPathSendReceive: + if !s.fsm.peer.addPathSend.BestOnly { + s.fsm.capAddPathSend = true + } + if s.fsm.peer.addPathRecv { + s.fsm.capAddPathRecv = true + } + } +} + +func (s *openSentState) notification(msg *packet.BGPMessage) (state, string) { + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + nMsg := msg.Body.(*packet.BGPNotification) + if nMsg.ErrorCode != packet.UnsupportedVersionNumber { + s.fsm.connectRetryCounter++ + } + + return newIdleState(s.fsm), "Received NOTIFICATION" +} diff --git a/protocols/bgp/server/peer.go b/protocols/bgp/server/peer.go index 5077aaeff37e7634f0e2d12bb4988d30f1f52775..26f183ce7b7572f71a6edc6982122eccf88fccf2 100644 --- a/protocols/bgp/server/peer.go +++ b/protocols/bgp/server/peer.go @@ -2,39 +2,102 @@ package server import ( "net" + "sync" + "time" "github.com/bio-routing/bio-rd/config" "github.com/bio-routing/bio-rd/protocols/bgp/packet" "github.com/bio-routing/bio-rd/routingtable" - - "time" + "github.com/bio-routing/bio-rd/routingtable/filter" ) type Peer struct { + server *BGPServer addr net.IP - asn uint32 - fsm *FSM + peerASN uint32 + localASN uint32 + fsms []*FSM + fsmsMu sync.Mutex rib routingtable.RouteTableClient routerID uint32 addPathSend routingtable.ClientOptions addPathRecv bool - optOpenParams []packet.OptParam reconnectInterval time.Duration + keepaliveTime time.Duration + holdTime time.Duration + optOpenParams []packet.OptParam + importFilter *filter.Filter + exportFilter *filter.Filter +} + +func (p *Peer) collisionHandling(callingFSM *FSM) bool { + p.fsmsMu.Lock() + defer p.fsmsMu.Unlock() + + for _, fsm := range p.fsms { + if callingFSM == fsm { + continue + } + + fsm.stateMu.RLock() + isEstablished := isEstablishedState(fsm.state) + isOpenConfirm := isOpenConfirmState(fsm.state) + fsm.stateMu.RUnlock() + + if isEstablished { + return true + } + + if !isOpenConfirm { + continue + } + + if p.routerID < callingFSM.neighborID { + fsm.cease() + } else { + return true + } + } + + return false +} + +func isOpenConfirmState(s state) bool { + switch s.(type) { + case openConfirmState: + return true + } + + return false +} + +func isEstablishedState(s state) bool { + switch s.(type) { + case establishedState: + return true + } + + return false } // NewPeer creates a new peer with the given config. If an connection is established, the adjRIBIN of the peer is connected // to the given rib. To actually connect the peer, call Start() on the returned peer. -func NewPeer(c config.Peer, rib routingtable.RouteTableClient) (*Peer, error) { +func NewPeer(c config.Peer, rib routingtable.RouteTableClient, server *BGPServer) (*Peer, error) { p := &Peer{ + server: server, addr: c.PeerAddress, - asn: c.PeerAS, + peerASN: c.PeerAS, + localASN: c.LocalAS, + fsms: make([]*FSM, 0), rib: rib, addPathSend: c.AddPathSend, addPathRecv: c.AddPathRecv, - optOpenParams: make([]packet.OptParam, 0), reconnectInterval: c.ReconnectInterval, + keepaliveTime: c.KeepAlive, + holdTime: c.HoldTime, + optOpenParams: make([]packet.OptParam, 0), } - p.fsm = NewFSM(p, c, rib) + p.fsms = append(p.fsms, NewActiveFSM2(p)) caps := make([]packet.Capability, 0) @@ -72,24 +135,6 @@ func (p *Peer) GetAddr() net.IP { return p.addr } -// GetASN returns the configured AS number of the peer -func (p *Peer) GetASN() uint32 { - return p.asn -} - -// Start the peers fsm. It starts from the Idle state and will get an ManualStart event. To trigger -// reconnects if the fsm falls back into the Idle state, every reconnectInterval a ManualStart event is send. -// The default value for reconnectInterval is 30 seconds. func (p *Peer) Start() { - p.fsm.start() - if p.reconnectInterval == 0 { - p.reconnectInterval = 30 * time.Second - } - t := time.Tick(p.reconnectInterval) - go func() { - for { - <-t - p.fsm.activate() - } - }() + p.fsms[0].start() } diff --git a/protocols/bgp/server/server.go b/protocols/bgp/server/server.go index a6025140fd62f81d8fa503aeb0c31114a7589f2b..fef8b4e419d3b071ee9235620f85a0f0254d83f0 100644 --- a/protocols/bgp/server/server.go +++ b/protocols/bgp/server/server.go @@ -77,8 +77,16 @@ func (b *BGPServer) incomingConnectionWorker() { }).Info("Incoming TCP connection") log.WithField("Peer", peerAddr).Debug("Sending incoming TCP connection to fsm for peer") - b.peers[peerAddr].fsm.conCh <- c - log.Debug("Sending done") + fsm := NewActiveFSM2(b.peers[peerAddr]) + fsm.state = newActiveState(fsm) + fsm.startConnectRetryTimer() + + b.peers[peerAddr].fsmsMu.Lock() + b.peers[peerAddr].fsms = append(b.peers[peerAddr].fsms, fsm) + b.peers[peerAddr].fsmsMu.Unlock() + + go fsm.run() + fsm.conCh <- c } } @@ -87,7 +95,7 @@ func (b *BGPServer) AddPeer(c config.Peer, rib routingtable.RouteTableClient) er return fmt.Errorf("32bit ASNs are not supported yet") } - peer, err := NewPeer(c, rib) + peer, err := NewPeer(c, rib, b) if err != nil { return err } @@ -100,7 +108,7 @@ func (b *BGPServer) AddPeer(c config.Peer, rib routingtable.RouteTableClient) er return nil } -func recvMsg(c *net.TCPConn) (msg []byte, err error) { +func recvMsg(c net.Conn) (msg []byte, err error) { buffer := make([]byte, packet.MaxLen) _, err = io.ReadFull(c, buffer[0:packet.MinLen]) if err != nil { diff --git a/protocols/bgp/server/update_helper.go b/protocols/bgp/server/update_helper.go index 4a50f1d7ef6903c415a99108f8540fc5d0b40bcd..fdb73fee2aead719b76a7bef7ba9209e46891857 100644 --- a/protocols/bgp/server/update_helper.go +++ b/protocols/bgp/server/update_helper.go @@ -11,7 +11,7 @@ import ( ) func pathAttribues(p *route.Path, fsm *FSM) (*packet.PathAttribute, error) { - asPathPA, err := packet.ParseASPathStr(strings.TrimRight(fmt.Sprintf("%d %s", fsm.localASN, p.BGPPath.ASPath), " ")) + asPathPA, err := packet.ParseASPathStr(strings.TrimRight(fmt.Sprintf("%d %s", fsm.peer.localASN, p.BGPPath.ASPath), " ")) if err != nil { return nil, fmt.Errorf("Unable to parse AS path: %v", err) } diff --git a/protocols/bgp/server/update_sender.go b/protocols/bgp/server/update_sender.go index 6c6b7dfd152c0cf387055e8e86a5d9fc03078000..76c4eadc827ab809af23d09df9e809f1ed4b4a47 100644 --- a/protocols/bgp/server/update_sender.go +++ b/protocols/bgp/server/update_sender.go @@ -22,7 +22,7 @@ type UpdateSender struct { func newUpdateSender(fsm *FSM) *UpdateSender { return &UpdateSender{ fsm: fsm, - iBGP: fsm.localASN == fsm.remoteASN, + iBGP: fsm.peer.localASN == fsm.peer.peerASN, } } diff --git a/protocols/bgp/server/update_sender_add_path.go b/protocols/bgp/server/update_sender_add_path.go index 5c9f545720e797309c80039a8ce466be2e72878b..0dac801374e167c3bd5e8a38304ba12f8af74f3c 100644 --- a/protocols/bgp/server/update_sender_add_path.go +++ b/protocols/bgp/server/update_sender_add_path.go @@ -19,13 +19,14 @@ type UpdateSenderAddPath struct { func newUpdateSenderAddPath(fsm *FSM) *UpdateSenderAddPath { return &UpdateSenderAddPath{ fsm: fsm, - iBGP: fsm.localASN == fsm.remoteASN, + iBGP: fsm.peer.localASN == fsm.peer.peerASN, } } // AddPath serializes a new path and sends out a BGP update message func (u *UpdateSenderAddPath) AddPath(pfx net.Prefix, p *route.Path) error { pathAttrs, err := pathAttribues(p, u.fsm) + if err != nil { log.Errorf("Unable to create BGP Update: %v", err) return nil @@ -49,6 +50,6 @@ func (u *UpdateSenderAddPath) RemovePath(pfx net.Prefix, p *route.Path) bool { // UpdateNewClient does nothing func (u *UpdateSenderAddPath) UpdateNewClient(client routingtable.RouteTableClient) error { - log.Warningf("BGP Update Sender: RemovePath not implemented") + log.Warningf("BGP Update Sender: UpdateNewClient not implemented") return nil } diff --git a/routingtable/client_manager.go b/routingtable/client_manager.go index f29ae2dce47dc7e4a3d3ba725ff5d57fd27f74f6..0bedbb45519c88bcf8a44e4b4491a76b66e590c7 100644 --- a/routingtable/client_manager.go +++ b/routingtable/client_manager.go @@ -57,6 +57,7 @@ func (c *ClientManager) RegisterWithOptions(client RouteTableClient, opt ClientO c.mu.Lock() c.clients[client] = opt c.mu.Unlock() + c.master.UpdateNewClient(client) }