From 453a853eeb0507c5e83706d9e3f251adba5b2484 Mon Sep 17 00:00:00 2001 From: Oliver Herms <oliver.herms@exaring.de> Date: Wed, 20 Jun 2018 12:53:18 +0200 Subject: [PATCH] Replaced FSM --- config/peer.go | 24 +- main.go | 40 +- protocols/bgp/packet/decoder.go | 2 +- protocols/bgp/packet/decoder_test.go | 22 +- protocols/bgp/server/fsm.go | 951 ------------------ protocols/bgp/server/fsm2.go | 212 +++- protocols/bgp/server/fsm_active.go | 56 +- protocols/bgp/server/fsm_cease.go | 12 + protocols/bgp/server/fsm_connect.go | 19 +- protocols/bgp/server/fsm_established.go | 46 +- protocols/bgp/server/fsm_idle.go | 39 +- protocols/bgp/server/fsm_open_confirm.go | 25 +- protocols/bgp/server/fsm_open_sent.go | 25 +- protocols/bgp/server/peer.go | 106 +- protocols/bgp/server/server.go | 24 +- protocols/bgp/server/update_sender.go | 2 +- .../bgp/server/update_sender_add_path.go | 2 +- routingtable/client_manager.go | 4 +- 18 files changed, 497 insertions(+), 1114 deletions(-) delete mode 100644 protocols/bgp/server/fsm.go create mode 100644 protocols/bgp/server/fsm_cease.go diff --git a/config/peer.go b/config/peer.go index 9e057b9d..af156695 100644 --- a/config/peer.go +++ b/config/peer.go @@ -2,20 +2,22 @@ package config import ( "net" + "time" "github.com/bio-routing/bio-rd/routingtable" ) type Peer struct { - AdminEnabled bool - KeepAlive uint16 - HoldTimer uint16 - LocalAddress net.IP - PeerAddress net.IP - LocalAS uint32 - PeerAS uint32 - Passive bool - RouterID uint32 - AddPathSend routingtable.ClientOptions - AddPathRecv bool + AdminEnabled bool + ReconnectInterval time.Duration + KeepAlive time.Duration + HoldTimer time.Duration + LocalAddress net.IP + PeerAddress net.IP + LocalAS uint32 + PeerAS uint32 + Passive bool + RouterID uint32 + AddPathSend routingtable.ClientOptions + AddPathRecv bool } diff --git a/main.go b/main.go index 7e19a5aa..7686374f 100644 --- a/main.go +++ b/main.go @@ -28,32 +28,34 @@ func main() { } b.AddPeer(config.Peer{ - AdminEnabled: true, - LocalAS: 65200, - PeerAS: 65300, - PeerAddress: net.IP([]byte{169, 254, 200, 1}), - LocalAddress: net.IP([]byte{169, 254, 200, 0}), - HoldTimer: 90, - KeepAlive: 30, - Passive: true, - RouterID: b.RouterID(), + AdminEnabled: true, + LocalAS: 65200, + PeerAS: 65300, + PeerAddress: net.IP([]byte{169, 254, 200, 1}), + LocalAddress: net.IP([]byte{169, 254, 200, 0}), + ReconnectInterval: time.Second * 15, + HoldTimer: time.Second * 90, + KeepAlive: time.Second * 30, + Passive: true, + RouterID: b.RouterID(), AddPathSend: routingtable.ClientOptions{ MaxPaths: 10, }, }, rib) - time.Sleep(time.Second * 15) + //time.Sleep(time.Second * 15) b.AddPeer(config.Peer{ - AdminEnabled: true, - LocalAS: 65200, - PeerAS: 65100, - PeerAddress: net.IP([]byte{169, 254, 100, 0}), - LocalAddress: net.IP([]byte{169, 254, 100, 1}), - HoldTimer: 90, - KeepAlive: 30, - Passive: true, - RouterID: b.RouterID(), + AdminEnabled: true, + LocalAS: 65200, + PeerAS: 65100, + PeerAddress: net.IP([]byte{169, 254, 100, 0}), + LocalAddress: net.IP([]byte{169, 254, 100, 1}), + ReconnectInterval: time.Second * 15, + HoldTimer: time.Second * 90, + KeepAlive: time.Second * 30, + Passive: true, + RouterID: b.RouterID(), AddPathSend: routingtable.ClientOptions{ MaxPaths: 10, }, diff --git a/protocols/bgp/packet/decoder.go b/protocols/bgp/packet/decoder.go index 961789b3..92535299 100644 --- a/protocols/bgp/packet/decoder.go +++ b/protocols/bgp/packet/decoder.go @@ -114,7 +114,7 @@ func decodeNotificationMsg(buf *bytes.Buffer) (*BGPNotification, error) { return invalidErrCode(msg) } case Cease: - if msg.ErrorSubcode != 0 { + if msg.ErrorSubcode == 0 || msg.ErrorSubcode > 8 { return invalidErrCode(msg) } default: diff --git a/protocols/bgp/packet/decoder_test.go b/protocols/bgp/packet/decoder_test.go index 01d4e015..4b414693 100644 --- a/protocols/bgp/packet/decoder_test.go +++ b/protocols/bgp/packet/decoder_test.go @@ -227,6 +227,26 @@ func TestDecode(t *testing.T) { }, wantFail: true, }, + { + testNum: 8, + input: []byte{ + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 0, 21, + 3, + 6, 4, + }, + wantFail: false, + expected: &BGPMessage{ + Header: &BGPHeader{ + Length: 21, + Type: 3, + }, + Body: &BGPNotification{ + ErrorCode: 6, + ErrorSubcode: 4, + }, + }, + }, } for _, test := range tests { @@ -252,7 +272,7 @@ func TestDecode(t *testing.T) { continue } - assert.Equal(t, test.expected, msg) + assert.Equalf(t, test.expected, msg, "Test: %d", test.testNum) } } diff --git a/protocols/bgp/server/fsm.go b/protocols/bgp/server/fsm.go deleted file mode 100644 index 48b9e6c3..00000000 --- a/protocols/bgp/server/fsm.go +++ /dev/null @@ -1,951 +0,0 @@ -package server - -import ( - "bytes" - "fmt" - "math" - "net" - "time" - - "github.com/bio-routing/bio-rd/routingtable" - - "github.com/bio-routing/bio-rd/config" - tnet "github.com/bio-routing/bio-rd/net" - "github.com/bio-routing/bio-rd/protocols/bgp/packet" - "github.com/bio-routing/bio-rd/route" - "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" - "github.com/bio-routing/bio-rd/routingtable/adjRIBOut" - "github.com/bio-routing/bio-rd/routingtable/adjRIBOutAddPath" - "github.com/bio-routing/bio-rd/routingtable/locRIB" - log "github.com/sirupsen/logrus" - tomb "gopkg.in/tomb.v2" -) - -const ( - // Administrative events - ManualStart = 1 - ManualStop = 2 - AutomaticStart = 3 - ManualStartWithPassiveTcpEstablishment = 4 - AutomaticStartWithPassiveTcpEstablishment = 5 - AutomaticStop = 8 - - // Timer events - ConnectRetryTimerExpires = 9 - HoldTimerExpires = 10 - KeepaliveTimerExpires = 11 -) - -const ( - Cease = 0 - Idle = 1 - Connect = 2 - Active = 3 - OpenSent = 4 - OpenConfirm = 5 - Established = 6 -) - -type FSM struct { - peer *Peer - - t tomb.Tomb - stateReason string - state int - lastState int - eventCh chan int - - con *net.TCPConn - con2 *net.TCPConn - conCh chan *net.TCPConn - conErrCh chan error - initiateCon chan struct{} - passive bool - - local net.IP - remote net.IP - - localASN uint16 - remoteASN uint16 - - neighborID uint32 - routerID uint32 - - delayOpen bool - delayOpenTime time.Duration - delayOpenTimer *time.Timer - - connectRetryTime time.Duration - connectRetryTimer *time.Timer - connectRetryCounter int - - holdTimeConfigured time.Duration - holdTime time.Duration - holdTimer *time.Timer - - keepaliveTime time.Duration - keepaliveTimer *time.Timer - - msgRecvCh chan msgRecvMsg - msgRecvFailCh chan msgRecvErr - stopMsgRecvCh chan struct{} - - adjRIBIn *adjRIBIn.AdjRIBIn - adjRIBOut routingtable.RouteTableClient - rib *locRIB.LocRIB - updateSender routingtable.RouteTableClient - - capAddPathSend bool - capAddPathRecv bool -} - -type msgRecvMsg struct { - msg []byte - con *net.TCPConn -} - -type msgRecvErr struct { - err error - con *net.TCPConn -} - -func NewFSM(peer *Peer, c config.Peer, rib *locRIB.LocRIB) *FSM { - fsm := &FSM{ - peer: peer, - state: Idle, - passive: true, - connectRetryTime: 5, - connectRetryTimer: time.NewTimer(time.Second * time.Duration(20)), - - msgRecvCh: make(chan msgRecvMsg), - msgRecvFailCh: make(chan msgRecvErr), - stopMsgRecvCh: make(chan struct{}), - - holdTimeConfigured: time.Duration(c.HoldTimer), - holdTimer: time.NewTimer(0), - - keepaliveTime: time.Duration(c.KeepAlive), - keepaliveTimer: time.NewTimer(0), - - routerID: c.RouterID, - remote: c.PeerAddress, - local: c.LocalAddress, - localASN: uint16(c.LocalAS), - eventCh: make(chan int), - conCh: make(chan *net.TCPConn), - conErrCh: make(chan error), initiateCon: make(chan struct{}), - - rib: rib, - } - - return fsm -} - -func (fsm *FSM) disconnect() { - if fsm.con != nil { - fsm.con.Close() - fsm.con = nil - } - if fsm.con2 != nil { - fsm.con2.Close() - fsm.con2 = nil - } -} - -func (fsm *FSM) changeState(new int, reason string) int { - state := map[int]string{ - Cease: "Cease", - Idle: "Idle", - Connect: "Connect", - Active: "Active", - OpenSent: "OpenSent", - OpenConfirm: "OpenConfirm", - Established: "Established", - } - - log.WithFields(log.Fields{ - "peer": fsm.remote.String(), - "last_state": state[fsm.state], - "new_state": state[new], - "reason": reason, - }).Info("FSM: Neighbor state change") - - fsm.lastState = fsm.state - fsm.state = new - fsm.stateReason = reason - - return fsm.state -} - -func (fsm *FSM) activate() { - fsm.eventCh <- ManualStart -} - -func (fsm *FSM) Stop() error { - fsm.eventCh <- ManualStop - fsm.t.Kill(nil) - return fsm.t.Wait() -} - -func (fsm *FSM) start() { - fsm.t.Go(fsm.main) - fsm.t.Go(fsm.tcpConnector) - return -} - -func (fsm *FSM) main() error { - next := fsm.idle() - for { - switch next { - case Cease: - fsm.t.Kill(fmt.Errorf("FSM is being stopped")) - return nil - case Idle: - next = fsm.idle() - case Connect: - next = fsm.connect() - case Active: - next = fsm.active() - case OpenSent: - next = fsm.openSent() - case OpenConfirm: - next = fsm.openConfirm() - case Established: - next = fsm.established() - } - } -} - -func (fsm *FSM) idle() int { - if fsm.adjRIBOut != nil { - fsm.rib.Unregister(fsm.adjRIBOut) - fsm.adjRIBOut.Unregister(fsm.updateSender) - } - fsm.adjRIBIn = nil - fsm.adjRIBOut = nil - for { - select { - case c := <-fsm.conCh: - c.Close() - continue - case e := <-fsm.eventCh: - reason := "" - switch e { - case ManualStart: - reason = "Received ManualStart event %d for %s peer" - case AutomaticStart: - reason = "Received AutomaticStart event %d for %s peer" - default: - continue - } - - fsm.connectRetryCounter = 0 - fsm.startConnectRetryTimer() - if fsm.passive { - return fsm.changeState(Active, fmt.Sprintf(reason, "passive")) - } - fsm.tcpConnect() - return fsm.changeState(Connect, fmt.Sprintf(reason, "active")) - } - - } -} - -func (fsm *FSM) tcpConnector() error { - for { - select { - case <-fsm.initiateCon: - c, err := net.DialTCP("tcp", &net.TCPAddr{IP: fsm.local}, &net.TCPAddr{IP: fsm.remote, Port: BGPPORT}) - if err != nil { - select { - case fsm.conErrCh <- err: - continue - case <-time.NewTimer(time.Second * 30).C: - continue - } - } - - select { - case fsm.conCh <- c: - continue - case <-time.NewTimer(time.Second * 30).C: - c.Close() - continue - } - case <-fsm.t.Dying(): - return nil - } - } -} - -func (fsm *FSM) tcpConnect() { - fsm.initiateCon <- struct{}{} -} - -// connect state waits for a TCP connection to be fully established. Either the active or passive one. -func (fsm *FSM) connect() int { - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { - fsm.connectRetryCounter = 0 - stopTimer(fsm.connectRetryTimer) - return fsm.changeState(Idle, "Manual stop event") - } - continue - case <-fsm.connectRetryTimer.C: - fsm.resetConnectRetryTimer() - fsm.tcpConnect() - continue - case c := <-fsm.conCh: - fsm.con = c - stopTimer(fsm.connectRetryTimer) - return fsm.connectSendOpen() - } - } -} - -func (fsm *FSM) connectSendOpen() int { - err := fsm.sendOpen(fsm.con) - if err != nil { - stopTimer(fsm.connectRetryTimer) - return fsm.changeState(Idle, fmt.Sprintf("Sending OPEN message failed: %v", err)) - } - fsm.holdTimer = time.NewTimer(time.Minute * 4) - return fsm.changeState(OpenSent, "Sent OPEN message") -} - -// in the active state we wait for a passive TCP connection to be established -func (fsm *FSM) active() int { - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { - fsm.disconnect() - fsm.connectRetryCounter = 0 - stopTimer(fsm.connectRetryTimer) - return fsm.changeState(Active, "Manual stop event") - } - continue - case <-fsm.connectRetryTimer.C: - fsm.resetConnectRetryTimer() - fsm.tcpConnect() - return fsm.changeState(Connect, "Connect retry timer expired") - case c := <-fsm.conCh: - fsm.con = c - stopTimer(fsm.connectRetryTimer) - return fsm.activeSendOpen() - } - } -} - -func (fsm *FSM) activeSendOpen() int { - err := fsm.sendOpen(fsm.con) - if err != nil { - fsm.resetConnectRetryTimer() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Sending OPEN message failed: %v", err)) - } - fsm.holdTimer = time.NewTimer(time.Minute * 4) - return fsm.changeState(OpenSent, "Sent OPEN message") -} - -func (fsm *FSM) msgReceiver(c *net.TCPConn) error { - for { - msg, err := recvMsg(c) - if err != nil { - fsm.msgRecvFailCh <- msgRecvErr{err: err, con: c} - return nil - - /*select { - case fsm.msgRecvFailCh <- msgRecvErr{err: err, con: c}: - continue - case <-time.NewTimer(time.Second * 60).C: - return nil - }*/ - } - fsm.msgRecvCh <- msgRecvMsg{msg: msg, con: c} - - select { - case <-fsm.stopMsgRecvCh: - return nil - default: - continue - } - } -} - -func (fsm *FSM) openSent() int { - go fsm.msgReceiver(fsm.con) - - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { - sendNotification(fsm.con, packet.Cease, 0) - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter = 0 - return fsm.changeState(Idle, "Manual stop event") - } - continue - case <-fsm.holdTimer.C: - sendNotification(fsm.con, packet.HoldTimeExpired, 0) - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Holdtimer expired") - case c := <-fsm.conCh: // 2nd connection coming in. Collision! - if fsm.con2 != nil { - log.WithFields(log.Fields{ - "peer": fsm.remote.String(), - "local": fsm.local.String(), - }).Warningf("Received third connection from peer. Dropping new connection") - c.Close() - continue - } - - err := fsm.sendOpen(c) // FIXME: Not sure if this is standard compliant - if err != nil { - c.Close() - continue - } - fsm.con2 = c - go fsm.msgReceiver(c) - continue - case recvMsg := <-fsm.msgRecvCh: - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) - if err != nil { - switch bgperr := err.(type) { - case packet.BGPError: - sendNotification(fsm.con, bgperr.ErrorCode, bgperr.ErrorSubCode) - sendNotification(fsm.con2, bgperr.ErrorCode, bgperr.ErrorSubCode) - } - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Failed to decode BGP message: %v", err)) - } - switch msg.Header.Type { - case packet.NotificationMsg: - nMsg := msg.Body.(*packet.BGPNotification) - if nMsg.ErrorCode == packet.UnsupportedVersionNumber { - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - return fsm.changeState(Idle, "Received NOTIFICATION") - } - - if nMsg.ErrorCode == packet.Cease { - // Was this connection to be closed anyways? - if fsm.dumpCon(recvMsg.con) { - continue - } - } - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Received NOTIFICATION") - case packet.OpenMsg: - openMsg := msg.Body.(*packet.BGPOpen) - fsm.neighborID = openMsg.BGPIdentifier - fsm.resolveCollision() - stopTimer(fsm.connectRetryTimer) - err := fsm.sendKeepalive() - if err != nil { - return fsm.openSentTCPFail(err) - } - fsm.holdTime = time.Duration(math.Min(float64(fsm.holdTimeConfigured), float64(openMsg.HoldTime))) - if fsm.holdTime != 0 { - fsm.holdTimer.Reset(time.Second * fsm.holdTime) - fsm.keepaliveTime = fsm.holdTime / 3 - fsm.keepaliveTimer.Reset(time.Second * fsm.keepaliveTime) - } - - fsm.processOpenOptions(openMsg.OptParams) - - return fsm.changeState(OpenConfirm, "Received OPEN message") - default: - sendNotification(fsm.con, packet.FiniteStateMachineError, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "FSM Error") - } - case err := <-fsm.msgRecvFailCh: - if err.con == fsm.con && fsm.con2 != nil { - fsm.con.Close() - fsm.con = fsm.con2 - fsm.con2 = nil - continue - } - - if err.con == fsm.con2 { - fsm.con2.Close() - fsm.con2 = nil - continue - } - return fsm.openSentTCPFail(err.err) - } - } -} - -func (fsm *FSM) processOpenOptions(optParams []packet.OptParam) { - for _, optParam := range optParams { - if optParam.Type != packet.CapabilitiesParamType { - continue - } - - fsm.processCapabilities(optParam.Value.(packet.Capabilities)) - } -} - -func (fsm *FSM) processCapabilities(caps packet.Capabilities) { - for _, cap := range caps { - fsm.processCapability(cap) - } -} - -func (fsm *FSM) processCapability(cap packet.Capability) { - switch cap.Code { - case packet.AddPathCapabilityCode: - fsm.processAddPathCapability(cap.Value.(packet.AddPathCapability)) - - } -} - -func (fsm *FSM) processAddPathCapability(addPathCap packet.AddPathCapability) { - if addPathCap.AFI != 1 { - return - } - if addPathCap.SAFI != 1 { - return - } - switch addPathCap.SendReceive { - case packet.AddPathReceive: - if !fsm.peer.addPathSend.BestOnly { - fsm.capAddPathSend = true - } - case packet.AddPathSend: - if fsm.peer.addPathRecv { - fsm.capAddPathRecv = true - } - case packet.AddPathSendReceive: - if !fsm.peer.addPathSend.BestOnly { - fsm.capAddPathSend = true - } - if fsm.peer.addPathRecv { - fsm.capAddPathRecv = true - } - } -} - -func (fsm *FSM) openSentTCPFail(err error) int { - fsm.con.Close() - fsm.resetConnectRetryTimer() - return fsm.changeState(Active, fmt.Sprintf("TCP failure: %v", err)) -} - -func (fsm *FSM) dumpCon(c *net.TCPConn) bool { - p := fsm.isPassive(c) - if fsm.routerID > fsm.neighborID { - return p - } - return !p -} - -func (fsm *FSM) resolveCollision() { - if fsm.con2 == nil { - return - } - - if fsm.routerID > fsm.neighborID { - // Terminate passive connection - if fsm.isPassive(fsm.con) { - dumpCon(fsm.con) - fsm.con = fsm.con2 - return - } - if fsm.isPassive(fsm.con2) { - dumpCon(fsm.con2) - return - } - return - } - - // Terminate active connection - if !fsm.isPassive(fsm.con) { - dumpCon(fsm.con) - fsm.con = fsm.con2 - return - } - if !fsm.isPassive(fsm.con2) { - dumpCon(fsm.con2) - fsm.con2.Close() - fsm.con2 = nil - return - } -} - -func dumpCon(c *net.TCPConn) { - sendNotification(c, packet.Cease, packet.ConnectionCollisionResolution) - c.Close() -} - -func (fsm *FSM) isPassive(c *net.TCPConn) bool { - if c.LocalAddr().String() == fmt.Sprintf("%s:179", fsm.local.String()) { - return true - } - return false -} - -func (fsm *FSM) openConfirm() int { - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { // Event 2 - sendNotification(fsm.con, packet.Cease, 0) - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter = 0 - return fsm.changeState(Idle, "Manual stop event") - } - continue - case <-fsm.holdTimer.C: - sendNotification(fsm.con, packet.HoldTimeExpired, 0) - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Holdtimer expired") - case <-fsm.keepaliveTimer.C: - err := fsm.sendKeepalive() - if err != nil { - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Failed to send keepalive: %v", err)) - } - fsm.keepaliveTimer.Reset(time.Second * fsm.keepaliveTime) - continue - case c := <-fsm.conCh: - if fsm.con2 != nil { - log.WithFields(log.Fields{ - "peer": fsm.remote.String(), - "local": fsm.local.String(), - }).Warningf("Received third connection from peer. Dropping new connection") - c.Close() - continue - } - - err := fsm.sendOpen(c) // FIXME: Not sure if this is standard compliant - if err != nil { - c.Close() - continue - } - fsm.con2 = c - go fsm.msgReceiver(c) - continue - case recvMsg := <-fsm.msgRecvCh: - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) - if err != nil { - fmt.Printf("Failed to decode message: %v\n", recvMsg.msg) - switch bgperr := err.(type) { - case packet.BGPError: - sendNotification(fsm.con, bgperr.ErrorCode, bgperr.ErrorSubCode) - sendNotification(fsm.con2, bgperr.ErrorCode, bgperr.ErrorSubCode) - } - stopTimer(fsm.connectRetryTimer) - fsm.disconnect() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Failed to decode BGP message") - } - - switch msg.Header.Type { - case packet.NotificationMsg: - nMsg := msg.Body.(*packet.BGPNotification) - if nMsg.ErrorCode == packet.UnsupportedVersionNumber { - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - return fsm.changeState(Idle, "Received NOTIFICATION") - } - - if nMsg.ErrorCode == packet.Cease { - // Was this connection to be closed anyways? - if fsm.dumpCon(recvMsg.con) { - continue - } - } - - return fsm.openConfirmTCPFail(fmt.Errorf("NOTIFICATION received")) - case packet.KeepaliveMsg: - fsm.holdTimer.Reset(time.Second * fsm.holdTime) - return fsm.changeState(Established, "Received KEEPALIVE") - case packet.OpenMsg: - openMsg := msg.Body.(*packet.BGPOpen) - fsm.neighborID = openMsg.BGPIdentifier - fsm.resolveCollision() - default: - sendNotification(fsm.con, packet.FiniteStateMachineError, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "FSM Error") - } - case err := <-fsm.msgRecvFailCh: - if err.con == fsm.con && fsm.con2 != nil { - fsm.con.Close() - fsm.con = fsm.con2 - fsm.con2 = nil - continue - } - - if err.con == fsm.con2 { - fsm.con2.Close() - fsm.con2 = nil - continue - } - return fsm.openConfirmTCPFail(err.err) - } - } -} - -func (fsm *FSM) openConfirmTCPFail(err error) int { - fsm.con.Close() - fsm.resetConnectRetryTimer() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Failure: %v", err)) -} - -func (fsm *FSM) established() int { - fsm.adjRIBIn = adjRIBIn.New() - fsm.adjRIBIn.Register(fsm.rib) - - n := &routingtable.Neighbor{ - Type: route.BGPPathType, - Address: tnet.IPv4ToUint32(fsm.remote), - } - - clientOptions := routingtable.ClientOptions{} - if fsm.capAddPathSend { - fsm.updateSender = newUpdateSenderAddPath(fsm) - fsm.adjRIBOut = adjRIBOutAddPath.New(n) - clientOptions = fsm.peer.addPathSend - } else { - fsm.updateSender = newUpdateSender(fsm) - fsm.adjRIBOut = adjRIBOut.New(n) - } - - fsm.adjRIBOut.Register(fsm.updateSender) - fsm.rib.RegisterWithOptions(fsm.adjRIBOut, clientOptions) - - /*go func() { - for { - if fsm.adjRIBOut == nil { - return - } - fmt.Printf("ADJ-RIB-OUT: %s\n", fsm.remote.String()) - fmt.Print(fsm.adjRIBOut.Print()) - time.Sleep(time.Second * 11) - } - }()*/ - - for { - select { - case e := <-fsm.eventCh: - if e == ManualStop { // Event 2 - sendNotification(fsm.con, packet.Cease, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter = 0 - return fsm.changeState(Idle, "Manual stop event") - } - if e == AutomaticStop { // Event 8 - sendNotification(fsm.con, packet.Cease, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Automatic stop event") - } - continue - case <-fsm.holdTimer.C: - sendNotification(fsm.con, packet.HoldTimeExpired, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Holdtimer expired") - case <-fsm.keepaliveTimer.C: - err := fsm.sendKeepalive() - if err != nil { - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, fmt.Sprintf("Failed to send keepalive: %v", err)) - } - fsm.keepaliveTimer.Reset(time.Second * fsm.keepaliveTime) - continue - case c := <-fsm.conCh: - c.Close() - continue - case recvMsg := <-fsm.msgRecvCh: - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) - if err != nil { - fmt.Printf("Failed to decode BGP message: %v\n", recvMsg.msg) - switch bgperr := err.(type) { - case packet.BGPError: - sendNotification(fsm.con, bgperr.ErrorCode, bgperr.ErrorSubCode) - } - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Failed to decode BGP message") - } - switch msg.Header.Type { - case packet.NotificationMsg: - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "Received NOTIFICATION") - case packet.UpdateMsg: - if fsm.holdTime != 0 { - fsm.holdTimer.Reset(time.Second * fsm.holdTime) - } - - u := msg.Body.(*packet.BGPUpdate) - - for r := u.WithdrawnRoutes; r != nil; r = r.Next { - pfx := tnet.NewPfx(r.IP, r.Pfxlen) - fmt.Printf("LPM: Removing prefix %s\n", pfx.String()) - fsm.adjRIBIn.RemovePath(pfx, nil) - } - - for r := u.NLRI; r != nil; r = r.Next { - pfx := tnet.NewPfx(r.IP, r.Pfxlen) - fmt.Printf("LPM: Adding prefix %s\n", pfx.String()) - - path := &route.Path{ - Type: route.BGPPathType, - BGPPath: &route.BGPPath{ - Source: tnet.IPv4ToUint32(fsm.remote), - }, - } - - for pa := u.PathAttributes; pa != nil; pa = pa.Next { - fmt.Printf("TypeCode: %d\n", pa.TypeCode) - switch pa.TypeCode { - case packet.OriginAttr: - path.BGPPath.Origin = pa.Value.(uint8) - case packet.LocalPrefAttr: - path.BGPPath.LocalPref = pa.Value.(uint32) - case packet.MEDAttr: - path.BGPPath.MED = pa.Value.(uint32) - case packet.NextHopAttr: - fmt.Printf("RECEIVED NEXT_HOP: %d\n", pa.Value.(uint32)) - path.BGPPath.NextHop = pa.Value.(uint32) - case packet.ASPathAttr: - path.BGPPath.ASPath = pa.ASPathString() - path.BGPPath.ASPathLen = pa.ASPathLen() - } - } - fsm.adjRIBIn.AddPath(pfx, path) - } - - continue - case packet.KeepaliveMsg: - if fsm.holdTime != 0 { - fsm.holdTimer.Reset(time.Second * fsm.holdTime) - } - continue - case packet.OpenMsg: - if fsm.con2 != nil { - sendNotification(fsm.con2, packet.Cease, packet.ConnectionCollisionResolution) - fsm.con2.Close() - fsm.con2 = nil - continue - } - sendNotification(fsm.con, packet.FiniteStateMachineError, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "FSM Error") - default: - sendNotification(fsm.con, packet.FiniteStateMachineError, 0) - stopTimer(fsm.connectRetryTimer) - fsm.con.Close() - fsm.connectRetryCounter++ - return fsm.changeState(Idle, "FSM Error") - } - case err := <-fsm.msgRecvFailCh: - if err.con == fsm.con && fsm.con2 != nil { - fsm.con.Close() - fsm.con = fsm.con2 - fsm.con2 = nil - continue - } - - if err.con == fsm.con2 { - fsm.con2.Close() - fsm.con2 = nil - continue - } - return fsm.openConfirmTCPFail(err.err) - } - } -} - -func (fsm *FSM) startConnectRetryTimer() { - fsm.connectRetryTimer = time.NewTimer(time.Second * fsm.connectRetryTime) -} - -func (fsm *FSM) resetConnectRetryTimer() { - if !fsm.connectRetryTimer.Reset(time.Second * fsm.connectRetryTime) { - <-fsm.connectRetryTimer.C - } -} - -func (fsm *FSM) resetDelayOpenTimer() { - if !fsm.delayOpenTimer.Reset(time.Second * fsm.delayOpenTime) { - <-fsm.delayOpenTimer.C - } -} - -func (fsm *FSM) sendKeepalive() error { - msg := packet.SerializeKeepaliveMsg() - - _, err := fsm.con.Write(msg) - if err != nil { - return fmt.Errorf("Unable to send KEEPALIVE message: %v", err) - } - - return nil -} - -func (fsm *FSM) sendOpen(c *net.TCPConn) error { - msg := packet.SerializeOpenMsg(&packet.BGPOpen{ - Version: BGPVersion, - AS: fsm.localASN, - HoldTime: uint16(fsm.holdTimeConfigured), - BGPIdentifier: fsm.routerID, - OptParams: fsm.peer.optOpenParams, - }) - - _, err := c.Write(msg) - if err != nil { - return fmt.Errorf("Unable to send OPEN message: %v", err) - } - - return nil -} - -func sendNotification(c *net.TCPConn, errorCode uint8, errorSubCode uint8) error { - if c == nil { - return fmt.Errorf("connection is nil") - } - - msg := packet.SerializeNotificationMsg(&packet.BGPNotification{}) - - _, err := c.Write(msg) - if err != nil { - return fmt.Errorf("Unable to send NOTIFICATION message: %v", err) - } - - return nil -} diff --git a/protocols/bgp/server/fsm2.go b/protocols/bgp/server/fsm2.go index 52d1b7ae..d0f4f15f 100644 --- a/protocols/bgp/server/fsm2.go +++ b/protocols/bgp/server/fsm2.go @@ -3,12 +3,24 @@ package server import ( "fmt" "net" + "sync" "time" "github.com/bio-routing/bio-rd/protocols/bgp/packet" "github.com/bio-routing/bio-rd/routingtable" - "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" "github.com/bio-routing/bio-rd/routingtable/locRIB" + log "github.com/sirupsen/logrus" +) + +const ( + // Administrative events + ManualStart = 1 + ManualStop = 2 + AutomaticStart = 3 + ManualStartWithPassiveTcpEstablishment = 4 + AutomaticStartWithPassiveTcpEstablishment = 5 + AutomaticStop = 8 + Cease = 100 ) type state interface { @@ -17,11 +29,12 @@ type state interface { // FSM2 implements the BGP finite state machine (RFC4271) type FSM2 struct { - server *BGPServer - peer *Peer - eventCh chan int - con net.Conn - conCh chan net.Conn + peer *Peer + eventCh chan int + con net.Conn + conCh chan net.Conn + initiateCon chan struct{} + conErrCh chan error delayOpen bool delayOpenTime time.Duration @@ -31,70 +44,193 @@ type FSM2 struct { connectRetryTimer *time.Timer connectRetryCounter int - holdTimeConfigured time.Duration - holdTime time.Duration - holdTimer *time.Timer + holdTime time.Duration + holdTimer *time.Timer keepaliveTime time.Duration keepaliveTimer *time.Timer - msgRecvCh chan msgRecvMsg - msgRecvFailCh chan msgRecvErr + msgRecvCh chan []byte + msgRecvFailCh chan error stopMsgRecvCh chan struct{} capAddPathSend bool capAddPathRecv bool - local net.IP - remote net.IP + local net.IP + //remote net.IP ribsInitialized bool - adjRIBIn *adjRIBIn.AdjRIBIn + adjRIBIn routingtable.RouteTableClient adjRIBOut routingtable.RouteTableClient rib *locRIB.LocRIB updateSender routingtable.RouteTableClient neighborID uint32 state state + stateMu sync.RWMutex reason string active bool } // NewPassiveFSM2 initiates a new passive FSM func NewPassiveFSM2(peer *Peer, con *net.TCPConn) *FSM2 { - fsm := &FSM2{ - peer: peer, - eventCh: make(chan int), - con: con, - conCh: make(chan net.Conn), - msgRecvCh: make(chan msgRecvMsg), - msgRecvFailCh: make(chan msgRecvErr), - stopMsgRecvCh: make(chan struct{}), - } - + fmt.Printf("NewPassiveFSM2\n") + fsm := newFSM2(peer) + fsm.con = con + fsm.state = newIdleState(fsm) return fsm } // NewActiveFSM2 initiates a new passive FSM func NewActiveFSM2(peer *Peer) *FSM2 { + fmt.Printf("NewActiveFSM2\n") + fsm := newFSM2(peer) + fsm.active = true + fsm.state = newIdleState(fsm) + return fsm +} + +func newFSM2(peer *Peer) *FSM2 { return &FSM2{ - peer: peer, - eventCh: make(chan int), - active: true, - conCh: make(chan net.Conn), + connectRetryTime: time.Minute, + peer: peer, + eventCh: make(chan int), + conCh: make(chan net.Conn), + conErrCh: make(chan error), + initiateCon: make(chan struct{}), + msgRecvCh: make(chan []byte), + msgRecvFailCh: make(chan error), + stopMsgRecvCh: make(chan struct{}), + rib: peer.rib, + } +} + +func (fsm *FSM2) start() { + go fsm.run() + go fsm.tcpConnector() + return +} + +func (fsm *FSM2) activate() { + fsm.eventCh <- AutomaticStart +} + +func (fsm *FSM2) run() { + //fmt.Printf("Starting FSM\n") + next, reason := fsm.state.run() + for { + newState := stateName(next) + oldState := stateName(fsm.state) + + if oldState != newState { + log.WithFields(log.Fields{ + "peer": fsm.peer.addr.String(), + "last_state": oldState, + "new_state": newState, + "reason": reason, + }).Info("FSM: Neighbor state change") + } + + if newState == "cease" { + return + } + + //fmt.Printf("Aquiring lock...\n") + fsm.stateMu.Lock() + fsm.state = next + //fmt.Printf("Releasing lock...\n") + fsm.stateMu.Unlock() + + //fmt.Printf("Running new state\n") + next, reason = fsm.state.run() } } -func (fsm *FSM2) Cease() { +func stateName(s state) string { + switch s.(type) { + case *idleState: + return "idle" + case *connectState: + return "connect" + case *activeState: + return "active" + case *openSentState: + return "openSent" + case *openConfirmState: + return "openConfirm" + case *establishedState: + return "established" + case *ceaseState: + return "cease" + default: + panic(fmt.Sprintf("Unknown state: %v", s)) + } +} + +func (fsm *FSM2) cease() { + fsm.eventCh <- Cease +} + +func (fsm *FSM2) tcpConnector() error { + fmt.Printf("TCP CONNECTOR STARTED\n") + for { + //fmt.Printf("READING FROM fsm.initiateCon\n") + select { + case <-fsm.initiateCon: + fmt.Printf("Initiating connection to %s\n", fsm.peer.addr.String()) + c, err := net.DialTCP("tcp", &net.TCPAddr{IP: fsm.local}, &net.TCPAddr{IP: fsm.peer.addr, Port: BGPPORT}) + if err != nil { + select { + case fsm.conErrCh <- err: + continue + case <-time.NewTimer(time.Second * 30).C: + continue + } + } + + //fmt.Printf("GOT CONNECTION!\n") + select { + case fsm.conCh <- c: + continue + case <-time.NewTimer(time.Second * 30).C: + c.Close() + continue + } + } + } +} +func (fsm *FSM2) tcpConnect() { + fsm.initiateCon <- struct{}{} +} + +func (fsm *FSM2) msgReceiver() error { + for { + msg, err := recvMsg(fsm.con) + if err != nil { + fsm.msgRecvFailCh <- err + return nil + + /*select { + case fsm.msgRecvFailCh <- msgRecvErr{err: err, con: c}: + continue + case <-time.NewTimer(time.Second * 60).C: + return nil + }*/ + } + fmt.Printf("Message received for %s: %v\n", fsm.con.RemoteAddr().String(), msg[18]) + fsm.msgRecvCh <- msg + } } func (fsm *FSM2) startConnectRetryTimer() { - fsm.connectRetryTimer = time.NewTimer(time.Second * fsm.connectRetryTime) + fmt.Printf("Initializing connectRetryTimer: %d\n", fsm.connectRetryTime) + fsm.connectRetryTimer = time.NewTimer(fsm.connectRetryTime) } func (fsm *FSM2) resetConnectRetryTimer() { - if !fsm.connectRetryTimer.Reset(time.Second * fsm.connectRetryTime) { + if !fsm.connectRetryTimer.Reset(fsm.connectRetryTime) { <-fsm.connectRetryTimer.C } } @@ -103,16 +239,14 @@ func (fsm *FSM2) resetConnectRetryCounter() { fsm.connectRetryCounter = 0 } -func (fsm *FSM2) tcpConnect() { - -} - func (fsm *FSM2) sendOpen() error { + fmt.Printf("Sending OPEN Message to %s\n", fsm.con.RemoteAddr().String()) + msg := packet.SerializeOpenMsg(&packet.BGPOpen{ Version: BGPVersion, - AS: uint16(fsm.peer.asn), - HoldTime: uint16(fsm.holdTimeConfigured), - BGPIdentifier: fsm.server.routerID, + AS: uint16(fsm.peer.localASN), + HoldTime: uint16(fsm.peer.holdTime / time.Second), + BGPIdentifier: fsm.peer.server.routerID, OptParams: fsm.peer.optOpenParams, }) @@ -125,6 +259,7 @@ func (fsm *FSM2) sendOpen() error { } func (fsm *FSM2) sendNotification(errorCode uint8, errorSubCode uint8) error { + fmt.Printf("Sending NOTIFICATION Message to %s\n", fsm.con.RemoteAddr().String()) msg := packet.SerializeNotificationMsg(&packet.BGPNotification{}) _, err := fsm.con.Write(msg) @@ -136,6 +271,7 @@ func (fsm *FSM2) sendNotification(errorCode uint8, errorSubCode uint8) error { } func (fsm *FSM2) sendKeepalive() error { + fmt.Printf("Sending KEEPALIVE to %s\n", fsm.con.RemoteAddr().String()) msg := packet.SerializeKeepaliveMsg() _, err := fsm.con.Write(msg) diff --git a/protocols/bgp/server/fsm_active.go b/protocols/bgp/server/fsm_active.go index e2884dc6..2b269242 100644 --- a/protocols/bgp/server/fsm_active.go +++ b/protocols/bgp/server/fsm_active.go @@ -1,5 +1,11 @@ package server +import ( + "fmt" + "net" + "time" +) + type activeState struct { fsm *FSM2 } @@ -10,18 +16,56 @@ func newActiveState(fsm *FSM2) *activeState { } } -func (s *activeState) run() (state, string) { +func (s activeState) run() (state, string) { + fmt.Printf("THIS IS ACTIVE STATE\n") for { select { case e := <-s.fsm.eventCh: - if e == ManualStop { - + switch e { + case ManualStop: + return s.manualStop() + case Cease: + return s.cease() + default: + continue } - continue case <-s.fsm.connectRetryTimer.C: - + return s.connectRetryTimerExpired() case c := <-s.fsm.conCh: - + fmt.Printf("Received a connection in active state\n") + return s.connectionSuccess(c) } } } + +func (s *activeState) manualStop() (state, string) { + s.fsm.con.Close() + s.fsm.resetConnectRetryCounter() + stopTimer(s.fsm.connectRetryTimer) + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *activeState) cease() (state, string) { + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + +func (s *activeState) connectRetryTimerExpired() (state, string) { + s.fsm.resetConnectRetryTimer() + s.fsm.tcpConnect() + return newConnectState(s.fsm), "Connect retry timer expired" +} + +func (s *activeState) connectionSuccess(con net.Conn) (state, string) { + s.fsm.con = con + stopTimer(s.fsm.connectRetryTimer) + err := s.fsm.sendOpen() + if err != nil { + s.fsm.resetConnectRetryTimer() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Sending OPEN message failed: %v", err) + } + s.fsm.holdTimer = time.NewTimer(time.Minute * 4) + fmt.Printf("Next state: OpenSent!\n") + return newOpenSentState(s.fsm), "Sent OPEN message" +} diff --git a/protocols/bgp/server/fsm_cease.go b/protocols/bgp/server/fsm_cease.go new file mode 100644 index 00000000..7d239840 --- /dev/null +++ b/protocols/bgp/server/fsm_cease.go @@ -0,0 +1,12 @@ +package server + +type ceaseState struct { +} + +func newCeaseState() *ceaseState { + return &ceaseState{} +} + +func (s ceaseState) run() (state, string) { + return newCeaseState(), "Loop" +} diff --git a/protocols/bgp/server/fsm_connect.go b/protocols/bgp/server/fsm_connect.go index e25ff12f..eac14594 100644 --- a/protocols/bgp/server/fsm_connect.go +++ b/protocols/bgp/server/fsm_connect.go @@ -16,36 +16,41 @@ func newConnectState(fsm *FSM2) *connectState { } } -func (s *connectState) run() (state, string) { +func (s connectState) run() (state, string) { for { select { case e := <-s.fsm.eventCh: - if e == ManualStop { + switch e { + case ManualStop: return s.manualStop() + case Cease: + return newCeaseState(), "Cease" + default: + continue } - continue case <-s.fsm.connectRetryTimer.C: s.connectRetryTimerExpired() continue case c := <-s.fsm.conCh: - s.connectionSuccess(c) + return s.connectionSuccess(c) } } } func (s *connectState) connectionSuccess(c net.Conn) (state, string) { + fmt.Printf("GOT A TCP CONNECTION!\n") s.fsm.con = c stopTimer(s.fsm.connectRetryTimer) err := s.fsm.sendOpen() if err != nil { return newIdleState(s.fsm), fmt.Sprintf("Unable to send open: %v", err) } - s.fsm.holdTimer = time.NewTimer(time.Minute * 4) return newOpenSentState(s.fsm), "TCP connection succeeded" } func (s *connectState) connectRetryTimerExpired() { + fmt.Printf("Connect retry timer expired\n") s.fsm.resetConnectRetryTimer() s.fsm.tcpConnect() } @@ -55,7 +60,3 @@ func (s *connectState) manualStop() (state, string) { stopTimer(s.fsm.connectRetryTimer) return newIdleState(s.fsm), "Manual stop event" } - -func (s *connectState) cease() { - -} diff --git a/protocols/bgp/server/fsm_established.go b/protocols/bgp/server/fsm_established.go index 18b968a3..faebad99 100644 --- a/protocols/bgp/server/fsm_established.go +++ b/protocols/bgp/server/fsm_established.go @@ -3,7 +3,6 @@ package server import ( "bytes" "fmt" - "time" tnet "github.com/bio-routing/bio-rd/net" "github.com/bio-routing/bio-rd/protocols/bgp/packet" @@ -24,7 +23,7 @@ func newEstablishedState(fsm *FSM2) *establishedState { } } -func (s *establishedState) run() (state, string) { +func (s establishedState) run() (state, string) { if !s.fsm.ribsInitialized { s.init() } @@ -32,13 +31,16 @@ func (s *establishedState) run() (state, string) { for { select { case e := <-s.fsm.eventCh: - if e == ManualStop { + switch e { + case ManualStop: return s.manualStop() - } - if e == AutomaticStop { + case AutomaticStop: return s.automaticStop() + case Cease: + return s.cease() + default: + continue } - continue case <-s.fsm.holdTimer.C: return s.holdTimerExpired() case <-s.fsm.keepaliveTimer.C: @@ -55,7 +57,7 @@ func (s *establishedState) init() { n := &routingtable.Neighbor{ Type: route.BGPPathType, - Address: tnet.IPv4ToUint32(s.fsm.remote), + Address: tnet.IPv4ToUint32(s.fsm.peer.addr), } clientOptions := routingtable.ClientOptions{} @@ -99,6 +101,13 @@ func (s *establishedState) automaticStop() (state, string) { return newIdleState(s.fsm), "Automatic stop event" } +func (s *establishedState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.uninit() + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + func (s *establishedState) holdTimerExpired() (state, string) { s.fsm.sendNotification(packet.HoldTimeExpired, 0) s.uninit() @@ -117,14 +126,13 @@ func (s *establishedState) keepaliveTimerExpired() (state, string) { return newIdleState(s.fsm), fmt.Sprintf("Failed to send keepalive: %v", err) } - s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + s.fsm.keepaliveTimer.Reset(s.fsm.keepaliveTime) return newEstablishedState(s.fsm), s.fsm.reason } -func (s *establishedState) msgReceived(recvMsg msgRecvMsg) (state, string) { - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) +func (s *establishedState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) if err != nil { - fmt.Printf("Failed to decode BGP message: %v\n", recvMsg.msg) switch bgperr := err.(type) { case packet.BGPError: s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode) @@ -139,6 +147,8 @@ func (s *establishedState) msgReceived(recvMsg msgRecvMsg) (state, string) { return s.notification() case packet.UpdateMsg: return s.update(msg) + case packet.KeepaliveMsg: + return s.keepaliveReceived() default: return s.unexpectedMessage() } @@ -154,7 +164,7 @@ func (s *establishedState) notification() (state, string) { func (s *establishedState) update(msg *packet.BGPMessage) (state, string) { if s.fsm.holdTime != 0 { - s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + s.fsm.holdTimer.Reset(s.fsm.holdTime) } u := msg.Body.(*packet.BGPUpdate) @@ -167,7 +177,6 @@ func (s *establishedState) update(msg *packet.BGPMessage) (state, string) { func (s *establishedState) withdraws(u *packet.BGPUpdate) { for r := u.WithdrawnRoutes; r != nil; r = r.Next { pfx := tnet.NewPfx(r.IP, r.Pfxlen) - fmt.Printf("LPM: Removing prefix %s\n", pfx.String()) s.fsm.adjRIBIn.RemovePath(pfx, nil) } } @@ -175,17 +184,15 @@ func (s *establishedState) withdraws(u *packet.BGPUpdate) { func (s *establishedState) updates(u *packet.BGPUpdate) { for r := u.NLRI; r != nil; r = r.Next { pfx := tnet.NewPfx(r.IP, r.Pfxlen) - fmt.Printf("LPM: Adding prefix %s\n", pfx.String()) path := &route.Path{ Type: route.BGPPathType, BGPPath: &route.BGPPath{ - Source: tnet.IPv4ToUint32(s.fsm.remote), + Source: tnet.IPv4ToUint32(s.fsm.peer.addr), }, } for pa := u.PathAttributes; pa != nil; pa = pa.Next { - fmt.Printf("TypeCode: %d\n", pa.TypeCode) switch pa.TypeCode { case packet.OriginAttr: path.BGPPath.Origin = pa.Value.(uint8) @@ -205,6 +212,13 @@ func (s *establishedState) updates(u *packet.BGPUpdate) { } } +func (s *establishedState) keepaliveReceived() (state, string) { + if s.fsm.holdTime != 0 { + s.fsm.holdTimer.Reset(s.fsm.holdTime) + } + return newEstablishedState(s.fsm), s.fsm.reason +} + func (s *establishedState) unexpectedMessage() (state, string) { s.fsm.sendNotification(packet.FiniteStateMachineError, 0) s.uninit() diff --git a/protocols/bgp/server/fsm_idle.go b/protocols/bgp/server/fsm_idle.go index 4b4697f6..87d57f79 100644 --- a/protocols/bgp/server/fsm_idle.go +++ b/protocols/bgp/server/fsm_idle.go @@ -1,5 +1,10 @@ package server +import ( + "fmt" + "time" +) + type idleState struct { fsm *FSM2 newStateReason string @@ -11,35 +16,41 @@ func newIdleState(fsm *FSM2) *idleState { } } -func (s *idleState) run() (state, string) { +func (s idleState) run() (state, string) { + if s.fsm.peer.reconnectInterval != 0 { + time.Sleep(s.fsm.peer.reconnectInterval) + go s.fsm.activate() + } for { - switch <-s.fsm.eventCh { + event := <-s.fsm.eventCh + fmt.Printf("Event: %d\n", event) + switch event { case ManualStart: - s.manualStart() + return s.manualStart() case AutomaticStart: - s.automaticStart() + return s.automaticStart() + case Cease: + return newCeaseState(), "Cease" default: continue } - - return newConnectState(s.fsm), s.newStateReason } } -func (s *idleState) manualStart() { +func (s *idleState) manualStart() (state, string) { s.newStateReason = "Received ManualStart event" - s.start() + return s.start() } -func (s *idleState) automaticStart() { +func (s *idleState) automaticStart() (state, string) { s.newStateReason = "Received AutomaticStart event" - s.start() + return s.start() } -func (s *idleState) start() { +func (s *idleState) start() (state, string) { s.fsm.resetConnectRetryCounter() s.fsm.startConnectRetryTimer() - if s.fsm.active { - s.fsm.tcpConnect() - } + go s.fsm.tcpConnect() + + return newConnectState(s.fsm), s.newStateReason } diff --git a/protocols/bgp/server/fsm_open_confirm.go b/protocols/bgp/server/fsm_open_confirm.go index 5c92cb0e..1959c21e 100644 --- a/protocols/bgp/server/fsm_open_confirm.go +++ b/protocols/bgp/server/fsm_open_confirm.go @@ -3,7 +3,6 @@ package server import ( "bytes" "fmt" - "time" "github.com/bio-routing/bio-rd/protocols/bgp/packet" ) @@ -18,14 +17,18 @@ func newOpenConfirmState(fsm *FSM2) *openConfirmState { } } -func (s *openConfirmState) run() (state, string) { +func (s openConfirmState) run() (state, string) { for { select { case e := <-s.fsm.eventCh: - if e == ManualStop { + switch e { + case ManualStop: return s.manualStop() + case Cease: + return s.cease() + default: + continue } - continue case <-s.fsm.holdTimer.C: return s.holdTimerExpired() case <-s.fsm.keepaliveTimer.C: @@ -52,6 +55,12 @@ func (s *openConfirmState) automaticStop() (state, string) { return newIdleState(s.fsm), "Automatic stop event" } +func (s *openConfirmState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + func (s *openConfirmState) holdTimerExpired() (state, string) { s.fsm.sendNotification(packet.HoldTimeExpired, 0) stopTimer(s.fsm.connectRetryTimer) @@ -68,12 +77,12 @@ func (s *openConfirmState) keepaliveTimerExpired() (state, string) { s.fsm.connectRetryCounter++ return newIdleState(s.fsm), fmt.Sprintf("Failed to send keepalive: %v", err) } - s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + s.fsm.keepaliveTimer.Reset(s.fsm.keepaliveTime) return newOpenConfirmState(s.fsm), s.fsm.reason } -func (s *openConfirmState) msgReceived(recvMsg msgRecvMsg) (state, string) { - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) +func (s *openConfirmState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) if err != nil { switch bgperr := err.(type) { case packet.BGPError: @@ -106,7 +115,7 @@ func (s *openConfirmState) notification(msg *packet.BGPMessage) (state, string) } func (s *openConfirmState) keepaliveReceived() (state, string) { - s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + s.fsm.holdTimer.Reset(s.fsm.holdTime) return newEstablishedState(s.fsm), "Received KEEPALIVE" } diff --git a/protocols/bgp/server/fsm_open_sent.go b/protocols/bgp/server/fsm_open_sent.go index c1ad7ff4..bb29ef6f 100644 --- a/protocols/bgp/server/fsm_open_sent.go +++ b/protocols/bgp/server/fsm_open_sent.go @@ -14,12 +14,14 @@ type openSentState struct { } func newOpenSentState(fsm *FSM2) *openSentState { + fmt.Printf("newOpenSentState\n") return &openSentState{ fsm: fsm, } } -func (s *openSentState) run() (state, string) { +func (s openSentState) run() (state, string) { + go s.fsm.msgReceiver() for { select { case e := <-s.fsm.eventCh: @@ -28,6 +30,8 @@ func (s *openSentState) run() (state, string) { return s.manualStop() case AutomaticStop: return s.automaticStop() + case Cease: + return s.cease() default: continue } @@ -55,6 +59,12 @@ func (s *openSentState) automaticStop() (state, string) { return newIdleState(s.fsm), "Automatic stop event" } +func (s *openSentState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + func (s *openSentState) holdTimerExpired() (state, string) { s.fsm.sendNotification(packet.HoldTimeExpired, 0) stopTimer(s.fsm.connectRetryTimer) @@ -63,8 +73,8 @@ func (s *openSentState) holdTimerExpired() (state, string) { return newIdleState(s.fsm), "Holdtimer expired" } -func (s *openSentState) msgReceived(recvMsg msgRecvMsg) (state, string) { - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) +func (s *openSentState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) if err != nil { switch bgperr := err.(type) { case packet.BGPError: @@ -97,16 +107,19 @@ func (s *openSentState) openMsgReceived(msg *packet.BGPMessage) (state, string) openMsg := msg.Body.(*packet.BGPOpen) s.fsm.neighborID = openMsg.BGPIdentifier stopTimer(s.fsm.connectRetryTimer) + s.fsm.peer.collisionHandling(s.fsm) err := s.fsm.sendKeepalive() if err != nil { return s.tcpFailure() } - s.fsm.holdTime = time.Duration(math.Min(float64(s.fsm.holdTimeConfigured), float64(openMsg.HoldTime))) + s.fsm.holdTime = time.Duration(math.Min(float64(s.fsm.peer.holdTime), float64(time.Duration(openMsg.HoldTime)*time.Second))) if s.fsm.holdTime != 0 { - s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + if !s.fsm.holdTimer.Reset(s.fsm.holdTime) { + <-s.fsm.holdTimer.C + } s.fsm.keepaliveTime = s.fsm.holdTime / 3 - s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + s.fsm.keepaliveTimer = time.NewTimer(s.fsm.keepaliveTime) } s.processOpenOptions(openMsg.OptParams) diff --git a/protocols/bgp/server/peer.go b/protocols/bgp/server/peer.go index 7bfcbc56..71bb9723 100644 --- a/protocols/bgp/server/peer.go +++ b/protocols/bgp/server/peer.go @@ -2,35 +2,102 @@ package server import ( "net" + "sync" + "time" "github.com/bio-routing/bio-rd/protocols/bgp/packet" "github.com/bio-routing/bio-rd/routingtable" + "github.com/bio-routing/bio-rd/routingtable/filter" "github.com/bio-routing/bio-rd/routingtable/locRIB" "github.com/bio-routing/bio-rd/config" ) type Peer struct { - addr net.IP - asn uint32 - fsm *FSM - rib *locRIB.LocRIB - routerID uint32 - addPathSend routingtable.ClientOptions - addPathRecv bool - optOpenParams []packet.OptParam + server *BGPServer + addr net.IP + peerASN uint32 + localASN uint32 + fsms []*FSM2 + fsmsMu sync.Mutex + rib *locRIB.LocRIB + routerID uint32 + addPathSend routingtable.ClientOptions + addPathRecv bool + reconnectInterval time.Duration + keepaliveTime time.Duration + holdTime time.Duration + optOpenParams []packet.OptParam + importFilter *filter.Filter + exportFilter *filter.Filter } -func NewPeer(c config.Peer, rib *locRIB.LocRIB) (*Peer, error) { +func (p *Peer) collisionHandling(callingFSM *FSM2) bool { + p.fsmsMu.Lock() + defer p.fsmsMu.Unlock() + + for _, fsm := range p.fsms { + if callingFSM == fsm { + continue + } + + fsm.stateMu.RLock() + isEstablished := isEstablishedState(fsm.state) + isOpenConfirm := isOpenConfirmState(fsm.state) + fsm.stateMu.RUnlock() + + if isEstablished { + return true + } + + if !isOpenConfirm { + continue + } + + if p.routerID < callingFSM.neighborID { + fsm.cease() + } else { + return true + } + } + + return false +} + +func isOpenConfirmState(s state) bool { + switch s.(type) { + case openConfirmState: + return true + } + + return false +} + +func isEstablishedState(s state) bool { + switch s.(type) { + case establishedState: + return true + } + + return false +} + +func NewPeer(c config.Peer, rib *locRIB.LocRIB, server *BGPServer) (*Peer, error) { p := &Peer{ - addr: c.PeerAddress, - asn: c.PeerAS, - rib: rib, - addPathSend: c.AddPathSend, - addPathRecv: c.AddPathRecv, - optOpenParams: make([]packet.OptParam, 0), + server: server, + addr: c.PeerAddress, + peerASN: c.PeerAS, + localASN: c.LocalAS, + fsms: make([]*FSM2, 0), + rib: rib, + addPathSend: c.AddPathSend, + addPathRecv: c.AddPathRecv, + reconnectInterval: c.ReconnectInterval, + keepaliveTime: c.KeepAlive, + holdTime: c.HoldTimer, + optOpenParams: make([]packet.OptParam, 0), } - p.fsm = NewFSM(p, c, rib) + p.fsms = append(p.fsms, NewActiveFSM2(p)) caps := make([]packet.Capability, 0) @@ -67,11 +134,6 @@ func (p *Peer) GetAddr() net.IP { return p.addr } -func (p *Peer) GetASN() uint32 { - return p.asn -} - func (p *Peer) Start() { - p.fsm.start() - p.fsm.activate() + p.fsms[0].start() } diff --git a/protocols/bgp/server/server.go b/protocols/bgp/server/server.go index 8a11cd3a..bd32c341 100644 --- a/protocols/bgp/server/server.go +++ b/protocols/bgp/server/server.go @@ -4,12 +4,10 @@ import ( "fmt" "io" "net" - "strings" "github.com/bio-routing/bio-rd/config" "github.com/bio-routing/bio-rd/protocols/bgp/packet" "github.com/bio-routing/bio-rd/routingtable/locRIB" - log "github.com/sirupsen/logrus" ) const ( @@ -61,7 +59,7 @@ func (b *BGPServer) Start(c *config.Global) error { func (b *BGPServer) incomingConnectionWorker() { for { - c := <-b.acceptCh + /*c := <-b.acceptCh fmt.Printf("Incoming connection!\n") fmt.Printf("Connection from: %v\n", c.RemoteAddr()) @@ -78,9 +76,19 @@ func (b *BGPServer) incomingConnectionWorker() { "source": c.RemoteAddr(), }).Info("Incoming TCP connection") - fmt.Printf("DEBUG: Sending incoming TCP connection to fsm for peer %s\n", peerAddr) - b.peers[peerAddr].fsm.conCh <- c - fmt.Printf("DEBUG: Sending done\n") + fmt.Printf("Initiating new ActiveFSM due to incoming connection from peer %s\n", peerAddr) + fsm := NewActiveFSM2(b.peers[peerAddr]) + fsm.state = newActiveState(fsm) + fsm.startConnectRetryTimer() + + fmt.Printf("Getting lock...\n") + b.peers[peerAddr].fsmsMu.Lock() + b.peers[peerAddr].fsms = append(b.peers[peerAddr].fsms, fsm) + fmt.Printf("Releasing lock...\n") + b.peers[peerAddr].fsmsMu.Unlock() + + go fsm.run() + fsm.conCh <- c*/ } } @@ -89,7 +97,7 @@ func (b *BGPServer) AddPeer(c config.Peer, rib *locRIB.LocRIB) error { return fmt.Errorf("32bit ASNs are not supported yet") } - peer, err := NewPeer(c, rib) + peer, err := NewPeer(c, rib, b) if err != nil { return err } @@ -102,7 +110,7 @@ func (b *BGPServer) AddPeer(c config.Peer, rib *locRIB.LocRIB) error { return nil } -func recvMsg(c *net.TCPConn) (msg []byte, err error) { +func recvMsg(c net.Conn) (msg []byte, err error) { buffer := make([]byte, packet.MaxLen) _, err = io.ReadFull(c, buffer[0:packet.MinLen]) if err != nil { diff --git a/protocols/bgp/server/update_sender.go b/protocols/bgp/server/update_sender.go index 490771f1..9a72da3a 100644 --- a/protocols/bgp/server/update_sender.go +++ b/protocols/bgp/server/update_sender.go @@ -25,7 +25,7 @@ func newUpdateSender(fsm *FSM2) *UpdateSender { // AddPath serializes a new path and sends out a BGP update message func (u *UpdateSender) AddPath(pfx net.Prefix, p *route.Path) error { - asPathPA, err := packet.ParseASPathStr(fmt.Sprintf("%d %s", u.fsm.localASN, p.BGPPath.ASPath)) + asPathPA, err := packet.ParseASPathStr(fmt.Sprintf("%d %s", u.fsm.peer.localASN, p.BGPPath.ASPath)) if err != nil { return fmt.Errorf("Unable to parse AS path: %v", err) } diff --git a/protocols/bgp/server/update_sender_add_path.go b/protocols/bgp/server/update_sender_add_path.go index 15c0bf69..440d5d73 100644 --- a/protocols/bgp/server/update_sender_add_path.go +++ b/protocols/bgp/server/update_sender_add_path.go @@ -25,7 +25,7 @@ func newUpdateSenderAddPath(fsm *FSM2) *UpdateSenderAddPath { // AddPath serializes a new path and sends out a BGP update message func (u *UpdateSenderAddPath) AddPath(pfx net.Prefix, p *route.Path) error { - asPathPA, err := packet.ParseASPathStr(fmt.Sprintf("%d %s", u.fsm.localASN, p.BGPPath.ASPath)) + asPathPA, err := packet.ParseASPathStr(fmt.Sprintf("%d %s", u.fsm.peer.localASN, p.BGPPath.ASPath)) if err != nil { return fmt.Errorf("Unable to parse AS path: %v", err) } diff --git a/routingtable/client_manager.go b/routingtable/client_manager.go index ee0275c3..483e4c26 100644 --- a/routingtable/client_manager.go +++ b/routingtable/client_manager.go @@ -55,9 +55,9 @@ func (c *ClientManager) Register(client RouteTableClient) { // RegisterWithOptions registers a client with options for updates func (c *ClientManager) RegisterWithOptions(client RouteTableClient, opt ClientOptions) { c.mu.Lock() - defer c.mu.Unlock() - c.clients[client] = opt + c.mu.Unlock() + c.master.UpdateNewClient(client) } -- GitLab