diff --git a/protocols/bgp/server/fsm.go b/protocols/bgp/server/fsm.go index 20be5160eff16fb132ffe45ed1a2875b0d333fdb..a26a743d7d85245a431483c05d35a2d1d4d09f63 100644 --- a/protocols/bgp/server/fsm.go +++ b/protocols/bgp/server/fsm.go @@ -898,15 +898,6 @@ func (fsm *FSM) established() int { } } -func stopTimer(t *time.Timer) { - if !t.Stop() { - select { - case <-t.C: - default: - } - } -} - func (fsm *FSM) startConnectRetryTimer() { fsm.connectRetryTimer = time.NewTimer(time.Second * fsm.connectRetryTime) } diff --git a/protocols/bgp/server/fsm2.go b/protocols/bgp/server/fsm2.go new file mode 100644 index 0000000000000000000000000000000000000000..52d1b7ae60888f7419d4b902fda32619d4084d57 --- /dev/null +++ b/protocols/bgp/server/fsm2.go @@ -0,0 +1,156 @@ +package server + +import ( + "fmt" + "net" + "time" + + "github.com/bio-routing/bio-rd/protocols/bgp/packet" + "github.com/bio-routing/bio-rd/routingtable" + "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" + "github.com/bio-routing/bio-rd/routingtable/locRIB" +) + +type state interface { + run() (state, string) +} + +// FSM2 implements the BGP finite state machine (RFC4271) +type FSM2 struct { + server *BGPServer + peer *Peer + eventCh chan int + con net.Conn + conCh chan net.Conn + + delayOpen bool + delayOpenTime time.Duration + delayOpenTimer *time.Timer + + connectRetryTime time.Duration + connectRetryTimer *time.Timer + connectRetryCounter int + + holdTimeConfigured time.Duration + holdTime time.Duration + holdTimer *time.Timer + + keepaliveTime time.Duration + keepaliveTimer *time.Timer + + msgRecvCh chan msgRecvMsg + msgRecvFailCh chan msgRecvErr + stopMsgRecvCh chan struct{} + + capAddPathSend bool + capAddPathRecv bool + + local net.IP + remote net.IP + + ribsInitialized bool + adjRIBIn *adjRIBIn.AdjRIBIn + adjRIBOut routingtable.RouteTableClient + rib *locRIB.LocRIB + updateSender routingtable.RouteTableClient + + neighborID uint32 + state state + reason string + active bool +} + +// NewPassiveFSM2 initiates a new passive FSM +func NewPassiveFSM2(peer *Peer, con *net.TCPConn) *FSM2 { + fsm := &FSM2{ + peer: peer, + eventCh: make(chan int), + con: con, + conCh: make(chan net.Conn), + msgRecvCh: make(chan msgRecvMsg), + msgRecvFailCh: make(chan msgRecvErr), + stopMsgRecvCh: make(chan struct{}), + } + + return fsm +} + +// NewActiveFSM2 initiates a new passive FSM +func NewActiveFSM2(peer *Peer) *FSM2 { + return &FSM2{ + peer: peer, + eventCh: make(chan int), + active: true, + conCh: make(chan net.Conn), + } +} + +func (fsm *FSM2) Cease() { + +} + +func (fsm *FSM2) startConnectRetryTimer() { + fsm.connectRetryTimer = time.NewTimer(time.Second * fsm.connectRetryTime) +} + +func (fsm *FSM2) resetConnectRetryTimer() { + if !fsm.connectRetryTimer.Reset(time.Second * fsm.connectRetryTime) { + <-fsm.connectRetryTimer.C + } +} + +func (fsm *FSM2) resetConnectRetryCounter() { + fsm.connectRetryCounter = 0 +} + +func (fsm *FSM2) tcpConnect() { + +} + +func (fsm *FSM2) sendOpen() error { + msg := packet.SerializeOpenMsg(&packet.BGPOpen{ + Version: BGPVersion, + AS: uint16(fsm.peer.asn), + HoldTime: uint16(fsm.holdTimeConfigured), + BGPIdentifier: fsm.server.routerID, + OptParams: fsm.peer.optOpenParams, + }) + + _, err := fsm.con.Write(msg) + if err != nil { + return fmt.Errorf("Unable to send OPEN message: %v", err) + } + + return nil +} + +func (fsm *FSM2) sendNotification(errorCode uint8, errorSubCode uint8) error { + msg := packet.SerializeNotificationMsg(&packet.BGPNotification{}) + + _, err := fsm.con.Write(msg) + if err != nil { + return fmt.Errorf("Unable to send NOTIFICATION message: %v", err) + } + + return nil +} + +func (fsm *FSM2) sendKeepalive() error { + msg := packet.SerializeKeepaliveMsg() + + _, err := fsm.con.Write(msg) + if err != nil { + return fmt.Errorf("Unable to send KEEPALIVE message: %v", err) + } + + return nil +} + +func stopTimer(t *time.Timer) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } +} diff --git a/protocols/bgp/server/fsm_active.go b/protocols/bgp/server/fsm_active.go new file mode 100644 index 0000000000000000000000000000000000000000..e2884dc6be28ccb7fec65d263bf5a47cc3cd7827 --- /dev/null +++ b/protocols/bgp/server/fsm_active.go @@ -0,0 +1,27 @@ +package server + +type activeState struct { + fsm *FSM2 +} + +func newActiveState(fsm *FSM2) *activeState { + return &activeState{ + fsm: fsm, + } +} + +func (s *activeState) run() (state, string) { + for { + select { + case e := <-s.fsm.eventCh: + if e == ManualStop { + + } + continue + case <-s.fsm.connectRetryTimer.C: + + case c := <-s.fsm.conCh: + + } + } +} diff --git a/protocols/bgp/server/fsm_connect.go b/protocols/bgp/server/fsm_connect.go new file mode 100644 index 0000000000000000000000000000000000000000..e25ff12f37412eb86e36287b41b020b6c3293d0c --- /dev/null +++ b/protocols/bgp/server/fsm_connect.go @@ -0,0 +1,61 @@ +package server + +import ( + "fmt" + "net" + "time" +) + +type connectState struct { + fsm *FSM2 +} + +func newConnectState(fsm *FSM2) *connectState { + return &connectState{ + fsm: fsm, + } +} + +func (s *connectState) run() (state, string) { + for { + select { + case e := <-s.fsm.eventCh: + if e == ManualStop { + return s.manualStop() + } + continue + case <-s.fsm.connectRetryTimer.C: + s.connectRetryTimerExpired() + continue + case c := <-s.fsm.conCh: + s.connectionSuccess(c) + } + } +} + +func (s *connectState) connectionSuccess(c net.Conn) (state, string) { + s.fsm.con = c + stopTimer(s.fsm.connectRetryTimer) + err := s.fsm.sendOpen() + if err != nil { + return newIdleState(s.fsm), fmt.Sprintf("Unable to send open: %v", err) + } + + s.fsm.holdTimer = time.NewTimer(time.Minute * 4) + return newOpenSentState(s.fsm), "TCP connection succeeded" +} + +func (s *connectState) connectRetryTimerExpired() { + s.fsm.resetConnectRetryTimer() + s.fsm.tcpConnect() +} + +func (s *connectState) manualStop() (state, string) { + s.fsm.resetConnectRetryCounter() + stopTimer(s.fsm.connectRetryTimer) + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *connectState) cease() { + +} diff --git a/protocols/bgp/server/fsm_connect_test.go b/protocols/bgp/server/fsm_connect_test.go new file mode 100644 index 0000000000000000000000000000000000000000..abf2a66251d819a1c3574f2ae14575b1401652b0 --- /dev/null +++ b/protocols/bgp/server/fsm_connect_test.go @@ -0,0 +1,80 @@ +package server + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestConnectStateManualStop(t *testing.T) { + fsm := &FSM2{ + eventCh: make(chan int), + connectRetryCounter: 100, + connectRetryTimer: time.NewTimer(time.Second * 120), + } + fsm.startConnectRetryTimer() + fsm.state = newConnectState(fsm) + + var wg sync.WaitGroup + var nextState state + var reason string + wg.Add(1) + go func() { + nextState, reason = fsm.state.run() + wg.Done() + }() + + fsm.eventCh <- ManualStop + wg.Wait() + + assert.IsType(t, &idleState{}, nextState, "Unexpected state returned") + assert.Equalf(t, 0, fsm.connectRetryCounter, "Unexpected resetConnectRetryCounter: %d", fsm.connectRetryCounter) +} + +func TestConnectStateConnectRetryTimer(t *testing.T) { + fsm := &FSM2{ + eventCh: make(chan int), + connectRetryTimer: time.NewTimer(time.Second * 120), + } + fsm.startConnectRetryTimer() + fsm.state = newConnectState(fsm) + + var wg sync.WaitGroup + var nextState state + var reason string + wg.Add(1) + go func() { + fsm.connectRetryTimer = time.NewTimer(time.Duration(0)) + nextState, reason = fsm.state.run() + wg.Done() + }() + + wg.Wait() + + assert.IsType(t, &connectState{}, nextState, "Unexpected state returned") +} + +func TestConnectStateConEstablished(t *testing.T) { + fsm := &FSM2{ + eventCh: make(chan int), + connectRetryTimer: time.NewTimer(time.Second * 120), + } + fsm.startConnectRetryTimer() + fsm.state = newConnectState(fsm) + + var wg sync.WaitGroup + var nextState state + var reason string + wg.Add(1) + go func() { + fsm.connectRetryTimer = time.NewTimer(time.Duration(0)) + nextState, reason = fsm.state.run() + wg.Done() + }() + + wg.Wait() + + assert.IsType(t, &connectState{}, nextState, "Unexpected state returned") +} diff --git a/protocols/bgp/server/fsm_established.go b/protocols/bgp/server/fsm_established.go new file mode 100644 index 0000000000000000000000000000000000000000..18b968a355585ba5216e4bd3ca45f0be706ba088 --- /dev/null +++ b/protocols/bgp/server/fsm_established.go @@ -0,0 +1,215 @@ +package server + +import ( + "bytes" + "fmt" + "time" + + tnet "github.com/bio-routing/bio-rd/net" + "github.com/bio-routing/bio-rd/protocols/bgp/packet" + "github.com/bio-routing/bio-rd/route" + "github.com/bio-routing/bio-rd/routingtable" + "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" + "github.com/bio-routing/bio-rd/routingtable/adjRIBOut" + "github.com/bio-routing/bio-rd/routingtable/adjRIBOutAddPath" +) + +type establishedState struct { + fsm *FSM2 +} + +func newEstablishedState(fsm *FSM2) *establishedState { + return &establishedState{ + fsm: fsm, + } +} + +func (s *establishedState) run() (state, string) { + if !s.fsm.ribsInitialized { + s.init() + } + + for { + select { + case e := <-s.fsm.eventCh: + if e == ManualStop { + return s.manualStop() + } + if e == AutomaticStop { + return s.automaticStop() + } + continue + case <-s.fsm.holdTimer.C: + return s.holdTimerExpired() + case <-s.fsm.keepaliveTimer.C: + return s.keepaliveTimerExpired() + case recvMsg := <-s.fsm.msgRecvCh: + return s.msgReceived(recvMsg) + } + } +} + +func (s *establishedState) init() { + s.fsm.adjRIBIn = adjRIBIn.New() + s.fsm.adjRIBIn.Register(s.fsm.rib) + + n := &routingtable.Neighbor{ + Type: route.BGPPathType, + Address: tnet.IPv4ToUint32(s.fsm.remote), + } + + clientOptions := routingtable.ClientOptions{} + if s.fsm.capAddPathSend { + s.fsm.updateSender = newUpdateSenderAddPath(s.fsm) + s.fsm.adjRIBOut = adjRIBOutAddPath.New(n) + clientOptions = s.fsm.peer.addPathSend + } else { + s.fsm.updateSender = newUpdateSender(s.fsm) + s.fsm.adjRIBOut = adjRIBOut.New(n) + } + + s.fsm.adjRIBOut.Register(s.fsm.updateSender) + s.fsm.rib.RegisterWithOptions(s.fsm.adjRIBOut, clientOptions) + + s.fsm.ribsInitialized = true +} + +func (s *establishedState) uninit() { + s.fsm.adjRIBOut.Unregister(s.fsm.updateSender) + s.fsm.rib.Unregister(s.fsm.adjRIBOut) + s.fsm.adjRIBIn.Unregister(s.fsm.rib) + s.fsm.ribsInitialized = false +} + +func (s *establishedState) manualStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.uninit() + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter = 0 + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *establishedState) automaticStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.uninit() + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Automatic stop event" +} + +func (s *establishedState) holdTimerExpired() (state, string) { + s.fsm.sendNotification(packet.HoldTimeExpired, 0) + s.uninit() + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Holdtimer expired" +} + +func (s *establishedState) keepaliveTimerExpired() (state, string) { + err := s.fsm.sendKeepalive() + if err != nil { + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Failed to send keepalive: %v", err) + } + + s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + return newEstablishedState(s.fsm), s.fsm.reason +} + +func (s *establishedState) msgReceived(recvMsg msgRecvMsg) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) + if err != nil { + fmt.Printf("Failed to decode BGP message: %v\n", recvMsg.msg) + switch bgperr := err.(type) { + case packet.BGPError: + s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode) + } + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Failed to decode BGP message" + } + switch msg.Header.Type { + case packet.NotificationMsg: + return s.notification() + case packet.UpdateMsg: + return s.update(msg) + default: + return s.unexpectedMessage() + } +} + +func (s *establishedState) notification() (state, string) { + stopTimer(s.fsm.connectRetryTimer) + s.uninit() + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Received NOTIFICATION" +} + +func (s *establishedState) update(msg *packet.BGPMessage) (state, string) { + if s.fsm.holdTime != 0 { + s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + } + + u := msg.Body.(*packet.BGPUpdate) + s.withdraws(u) + s.updates(u) + + return newEstablishedState(s.fsm), s.fsm.reason +} + +func (s *establishedState) withdraws(u *packet.BGPUpdate) { + for r := u.WithdrawnRoutes; r != nil; r = r.Next { + pfx := tnet.NewPfx(r.IP, r.Pfxlen) + fmt.Printf("LPM: Removing prefix %s\n", pfx.String()) + s.fsm.adjRIBIn.RemovePath(pfx, nil) + } +} + +func (s *establishedState) updates(u *packet.BGPUpdate) { + for r := u.NLRI; r != nil; r = r.Next { + pfx := tnet.NewPfx(r.IP, r.Pfxlen) + fmt.Printf("LPM: Adding prefix %s\n", pfx.String()) + + path := &route.Path{ + Type: route.BGPPathType, + BGPPath: &route.BGPPath{ + Source: tnet.IPv4ToUint32(s.fsm.remote), + }, + } + + for pa := u.PathAttributes; pa != nil; pa = pa.Next { + fmt.Printf("TypeCode: %d\n", pa.TypeCode) + switch pa.TypeCode { + case packet.OriginAttr: + path.BGPPath.Origin = pa.Value.(uint8) + case packet.LocalPrefAttr: + path.BGPPath.LocalPref = pa.Value.(uint32) + case packet.MEDAttr: + path.BGPPath.MED = pa.Value.(uint32) + case packet.NextHopAttr: + fmt.Printf("RECEIVED NEXT_HOP: %d\n", pa.Value.(uint32)) + path.BGPPath.NextHop = pa.Value.(uint32) + case packet.ASPathAttr: + path.BGPPath.ASPath = pa.ASPathString() + path.BGPPath.ASPathLen = pa.ASPathLen() + } + } + s.fsm.adjRIBIn.AddPath(pfx, path) + } +} + +func (s *establishedState) unexpectedMessage() (state, string) { + s.fsm.sendNotification(packet.FiniteStateMachineError, 0) + s.uninit() + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "FSM Error" +} diff --git a/protocols/bgp/server/fsm_idle.go b/protocols/bgp/server/fsm_idle.go new file mode 100644 index 0000000000000000000000000000000000000000..4b4697f69cbad6a70a4fb5902b16b3527ecb562b --- /dev/null +++ b/protocols/bgp/server/fsm_idle.go @@ -0,0 +1,45 @@ +package server + +type idleState struct { + fsm *FSM2 + newStateReason string +} + +func newIdleState(fsm *FSM2) *idleState { + return &idleState{ + fsm: fsm, + } +} + +func (s *idleState) run() (state, string) { + for { + switch <-s.fsm.eventCh { + case ManualStart: + s.manualStart() + case AutomaticStart: + s.automaticStart() + default: + continue + } + + return newConnectState(s.fsm), s.newStateReason + } +} + +func (s *idleState) manualStart() { + s.newStateReason = "Received ManualStart event" + s.start() +} + +func (s *idleState) automaticStart() { + s.newStateReason = "Received AutomaticStart event" + s.start() +} + +func (s *idleState) start() { + s.fsm.resetConnectRetryCounter() + s.fsm.startConnectRetryTimer() + if s.fsm.active { + s.fsm.tcpConnect() + } +} diff --git a/protocols/bgp/server/fsm_idle_test.go b/protocols/bgp/server/fsm_idle_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4cb1dff09ae556207fd41616d61e10fccd6ba428 --- /dev/null +++ b/protocols/bgp/server/fsm_idle_test.go @@ -0,0 +1,63 @@ +package server + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewIdleState(t *testing.T) { + tests := []struct { + name string + fsm *FSM2 + expected *idleState + }{ + { + name: "Test #1", + fsm: &FSM2{}, + expected: &idleState{ + fsm: &FSM2{}, + }, + }, + } + + for _, test := range tests { + res := newIdleState(test.fsm) + assert.Equalf(t, test.expected, res, "Test: %s", test.name) + } +} + +func TestStart(t *testing.T) { + tests := []struct { + name string + state *idleState + expected *idleState + }{ + { + name: "Test #1", + state: &idleState{ + fsm: &FSM2{ + connectRetryCounter: 5, + connectRetryTimer: time.NewTimer(time.Second * 20), + }, + newStateReason: "Foo Bar", + }, + expected: &idleState{ + fsm: &FSM2{ + connectRetryCounter: 0, + connectRetryTimer: time.NewTimer(time.Second * 20), + }, + newStateReason: "Foo Bar", + }, + }, + } + + for _, test := range tests { + if !test.expected.fsm.connectRetryTimer.Stop() { + <-test.expected.fsm.connectRetryTimer.C + } + test.state.start() + assert.Equalf(t, test.expected, test.state, "Test: %s", test.name) + } +} diff --git a/protocols/bgp/server/fsm_manager.go b/protocols/bgp/server/fsm_manager.go new file mode 100644 index 0000000000000000000000000000000000000000..5511704c7950bf89a7eeb7ba5ff7d82843b8348b --- /dev/null +++ b/protocols/bgp/server/fsm_manager.go @@ -0,0 +1,25 @@ +package server + +import "net" + +type fsmManager struct { + fsms map[string][]*FSM2 +} + +func newFSMManager() *fsmManager { + return &fsmManager{ + fsms: make(map[string][]*FSM2, 0), + } +} + +func (m *fsmManager) resolveCollision(addr net.IP) { + +} + +func (m *fsmManager) newFSMPassive() *FSM2 { + return &FSM2{} +} + +func (m *fsmManager) newFSMActive() *FSM2 { + return &FSM2{} +} diff --git a/protocols/bgp/server/fsm_open_confirm.go b/protocols/bgp/server/fsm_open_confirm.go new file mode 100644 index 0000000000000000000000000000000000000000..5c92cb0ee2f247f621684815dd89b7b7239c5459 --- /dev/null +++ b/protocols/bgp/server/fsm_open_confirm.go @@ -0,0 +1,119 @@ +package server + +import ( + "bytes" + "fmt" + "time" + + "github.com/bio-routing/bio-rd/protocols/bgp/packet" +) + +type openConfirmState struct { + fsm *FSM2 +} + +func newOpenConfirmState(fsm *FSM2) *openConfirmState { + return &openConfirmState{ + fsm: fsm, + } +} + +func (s *openConfirmState) run() (state, string) { + for { + select { + case e := <-s.fsm.eventCh: + if e == ManualStop { + return s.manualStop() + } + continue + case <-s.fsm.holdTimer.C: + return s.holdTimerExpired() + case <-s.fsm.keepaliveTimer.C: + return s.keepaliveTimerExpired() + case recvMsg := <-s.fsm.msgRecvCh: + return s.msgReceived(recvMsg) + } + } +} + +func (s *openConfirmState) manualStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.resetConnectRetryCounter() + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *openConfirmState) automaticStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Automatic stop event" +} + +func (s *openConfirmState) holdTimerExpired() (state, string) { + s.fsm.sendNotification(packet.HoldTimeExpired, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Holdtimer expired" +} + +func (s *openConfirmState) keepaliveTimerExpired() (state, string) { + err := s.fsm.sendKeepalive() + if err != nil { + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Failed to send keepalive: %v", err) + } + s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + return newOpenConfirmState(s.fsm), s.fsm.reason +} + +func (s *openConfirmState) msgReceived(recvMsg msgRecvMsg) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) + if err != nil { + switch bgperr := err.(type) { + case packet.BGPError: + s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode) + } + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Failed to decode BGP message: %v", err) + } + switch msg.Header.Type { + case packet.NotificationMsg: + return s.notification(msg) + case packet.KeepaliveMsg: + return s.keepaliveReceived() + default: + return s.unexpectedMessage() + } +} + +func (s *openConfirmState) notification(msg *packet.BGPMessage) (state, string) { + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + nMsg := msg.Body.(*packet.BGPNotification) + if nMsg.ErrorCode != packet.UnsupportedVersionNumber { + s.fsm.connectRetryCounter++ + } + + return newIdleState(s.fsm), "Received NOTIFICATION" +} + +func (s *openConfirmState) keepaliveReceived() (state, string) { + s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + return newEstablishedState(s.fsm), "Received KEEPALIVE" +} + +func (s *openConfirmState) unexpectedMessage() (state, string) { + s.fsm.sendNotification(packet.FiniteStateMachineError, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "FSM Error" +} diff --git a/protocols/bgp/server/fsm_open_sent.go b/protocols/bgp/server/fsm_open_sent.go new file mode 100644 index 0000000000000000000000000000000000000000..c1ad7ff4bdbf0a627a9f4edb4e04ac960fd16039 --- /dev/null +++ b/protocols/bgp/server/fsm_open_sent.go @@ -0,0 +1,182 @@ +package server + +import ( + "bytes" + "fmt" + "math" + "time" + + "github.com/bio-routing/bio-rd/protocols/bgp/packet" +) + +type openSentState struct { + fsm *FSM2 +} + +func newOpenSentState(fsm *FSM2) *openSentState { + return &openSentState{ + fsm: fsm, + } +} + +func (s *openSentState) run() (state, string) { + for { + select { + case e := <-s.fsm.eventCh: + switch e { + case ManualStop: + return s.manualStop() + case AutomaticStop: + return s.automaticStop() + default: + continue + } + case <-s.fsm.holdTimer.C: + return s.holdTimerExpired() + case recvMsg := <-s.fsm.msgRecvCh: + return s.msgReceived(recvMsg) + } + } +} + +func (s *openSentState) manualStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.resetConnectRetryTimer() + s.fsm.con.Close() + s.fsm.resetConnectRetryCounter() + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *openSentState) automaticStop() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.resetConnectRetryTimer() + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Automatic stop event" +} + +func (s *openSentState) holdTimerExpired() (state, string) { + s.fsm.sendNotification(packet.HoldTimeExpired, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "Holdtimer expired" +} + +func (s *openSentState) msgReceived(recvMsg msgRecvMsg) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) + if err != nil { + switch bgperr := err.(type) { + case packet.BGPError: + s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode) + } + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Failed to decode BGP message: %v", err) + } + switch msg.Header.Type { + case packet.NotificationMsg: + return s.notification(msg) + case packet.OpenMsg: + return s.openMsgReceived(msg) + default: + return s.unexpectedMessage() + } +} + +func (s *openSentState) unexpectedMessage() (state, string) { + s.fsm.sendNotification(packet.FiniteStateMachineError, 0) + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), "FSM Error" +} + +func (s *openSentState) openMsgReceived(msg *packet.BGPMessage) (state, string) { + openMsg := msg.Body.(*packet.BGPOpen) + s.fsm.neighborID = openMsg.BGPIdentifier + stopTimer(s.fsm.connectRetryTimer) + err := s.fsm.sendKeepalive() + if err != nil { + return s.tcpFailure() + } + + s.fsm.holdTime = time.Duration(math.Min(float64(s.fsm.holdTimeConfigured), float64(openMsg.HoldTime))) + if s.fsm.holdTime != 0 { + s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + s.fsm.keepaliveTime = s.fsm.holdTime / 3 + s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + } + + s.processOpenOptions(openMsg.OptParams) + return newOpenConfirmState(s.fsm), "Received OPEN message" +} + +func (s *openSentState) tcpFailure() (state, string) { + s.fsm.con.Close() + s.fsm.resetConnectRetryTimer() + return newActiveState(s.fsm), "TCP connection failure" +} + +func (s *openSentState) processOpenOptions(optParams []packet.OptParam) { + for _, optParam := range optParams { + if optParam.Type != packet.CapabilitiesParamType { + continue + } + + s.processCapabilities(optParam.Value.(packet.Capabilities)) + } +} + +func (s *openSentState) processCapabilities(caps packet.Capabilities) { + for _, cap := range caps { + s.processCapability(cap) + } +} + +func (s *openSentState) processCapability(cap packet.Capability) { + switch cap.Code { + case packet.AddPathCapabilityCode: + s.processAddPathCapability(cap.Value.(packet.AddPathCapability)) + + } +} + +func (s *openSentState) processAddPathCapability(addPathCap packet.AddPathCapability) { + if addPathCap.AFI != 1 { + return + } + if addPathCap.SAFI != 1 { + return + } + + switch addPathCap.SendReceive { + case packet.AddPathReceive: + if !s.fsm.peer.addPathSend.BestOnly { + s.fsm.capAddPathSend = true + } + case packet.AddPathSend: + if s.fsm.peer.addPathRecv { + s.fsm.capAddPathRecv = true + } + case packet.AddPathSendReceive: + if !s.fsm.peer.addPathSend.BestOnly { + s.fsm.capAddPathSend = true + } + if s.fsm.peer.addPathRecv { + s.fsm.capAddPathRecv = true + } + } +} + +func (s *openSentState) notification(msg *packet.BGPMessage) (state, string) { + stopTimer(s.fsm.connectRetryTimer) + s.fsm.con.Close() + nMsg := msg.Body.(*packet.BGPNotification) + if nMsg.ErrorCode != packet.UnsupportedVersionNumber { + s.fsm.connectRetryCounter++ + } + + return newIdleState(s.fsm), "Received NOTIFICATION" +} diff --git a/protocols/bgp/server/mock_conn.go b/protocols/bgp/server/mock_conn.go new file mode 100644 index 0000000000000000000000000000000000000000..b07985f9ebc317812e0870d0166d3b599a867110 --- /dev/null +++ b/protocols/bgp/server/mock_conn.go @@ -0,0 +1,64 @@ +package server + +import ( + "bytes" + "fmt" + "net" + "time" +) + +type mockCon struct { + closed bool + localAddr net.Addr + remoteAddr net.Addr + buffer bytes.Buffer +} + +type mockAddr struct { +} + +func (m *mockAddr) Network() string { + return "" +} + +func (m *mockAddr) String() string { + return "" +} + +func newMockCon(localAddr net.Addr, remoteAddr net.Addr) *mockCon { + return &mockCon{} +} + +func (m *mockCon) Read(b []byte) (n int, err error) { + + return 0, nil +} + +func (m *mockCon) Write(b []byte) (n int, err error) { + return 0, nil +} + +func (m *mockCon) Close() error { + m.closed = true + return nil +} + +func (m *mockCon) LocalAddr() net.Addr { + return m.localAddr +} + +func (m *mockCon) RemoteAddr() net.Addr { + return m.remoteAddr +} + +func (m *mockCon) SetDeadline(t time.Time) error { + return fmt.Errorf("Not implemented") +} + +func (m *mockCon) SetReadDeadline(t time.Time) error { + return fmt.Errorf("Not implemented") +} + +func (m *mockCon) SetWriteDeadline(t time.Time) error { + return fmt.Errorf("Not implemented") +} diff --git a/protocols/bgp/server/peer.go b/protocols/bgp/server/peer.go index 5077aaeff37e7634f0e2d12bb4988d30f1f52775..49d8baed1550f0e2cd4e7c2f5df855cdc54b88c1 100644 --- a/protocols/bgp/server/peer.go +++ b/protocols/bgp/server/peer.go @@ -13,6 +13,7 @@ import ( type Peer struct { addr net.IP asn uint32 + localASN uint32 fsm *FSM rib routingtable.RouteTableClient routerID uint32 @@ -28,6 +29,7 @@ func NewPeer(c config.Peer, rib routingtable.RouteTableClient) (*Peer, error) { p := &Peer{ addr: c.PeerAddress, asn: c.PeerAS, + localASN: c.LocalAS, rib: rib, addPathSend: c.AddPathSend, addPathRecv: c.AddPathRecv, diff --git a/protocols/bgp/server/timer.go b/protocols/bgp/server/timer.go new file mode 100644 index 0000000000000000000000000000000000000000..d0834e35ad39cf9a05567af402de79664fa9a2c5 --- /dev/null +++ b/protocols/bgp/server/timer.go @@ -0,0 +1,8 @@ +package server + +import "time" + +type timer interface { + Stop() bool + Reset(d time.Duration) bool +} diff --git a/protocols/bgp/server/update_helper.go b/protocols/bgp/server/update_helper.go index 4a50f1d7ef6903c415a99108f8540fc5d0b40bcd..b749f1433ec7cea2a6d1b026161d1b6758f98a3b 100644 --- a/protocols/bgp/server/update_helper.go +++ b/protocols/bgp/server/update_helper.go @@ -10,8 +10,8 @@ import ( log "github.com/sirupsen/logrus" ) -func pathAttribues(p *route.Path, fsm *FSM) (*packet.PathAttribute, error) { - asPathPA, err := packet.ParseASPathStr(strings.TrimRight(fmt.Sprintf("%d %s", fsm.localASN, p.BGPPath.ASPath), " ")) +func pathAttribues(p *route.Path, fsm *FSM2) (*packet.PathAttribute, error) { + asPathPA, err := packet.ParseASPathStr(strings.TrimRight(fmt.Sprintf("%d %s", fsm.peer.localASN, p.BGPPath.ASPath), " ")) if err != nil { return nil, fmt.Errorf("Unable to parse AS path: %v", err) } diff --git a/protocols/bgp/server/update_sender.go b/protocols/bgp/server/update_sender.go index 6c6b7dfd152c0cf387055e8e86a5d9fc03078000..d6e3ee40a08b974652cdb212683c54292d0b37f8 100644 --- a/protocols/bgp/server/update_sender.go +++ b/protocols/bgp/server/update_sender.go @@ -15,14 +15,14 @@ import ( // UpdateSender converts table changes into BGP update messages type UpdateSender struct { routingtable.ClientManager - fsm *FSM + fsm *FSM2 iBGP bool } -func newUpdateSender(fsm *FSM) *UpdateSender { +func newUpdateSender(fsm *FSM2) *UpdateSender { return &UpdateSender{ fsm: fsm, - iBGP: fsm.localASN == fsm.remoteASN, + iBGP: fsm.peer.localASN == fsm.peer.asn, } } diff --git a/protocols/bgp/server/update_sender_add_path.go b/protocols/bgp/server/update_sender_add_path.go index 5c9f545720e797309c80039a8ce466be2e72878b..0eb187899a5cb6bf6210ff52a6f46ad3ed46671c 100644 --- a/protocols/bgp/server/update_sender_add_path.go +++ b/protocols/bgp/server/update_sender_add_path.go @@ -12,14 +12,14 @@ import ( // UpdateSenderAddPath converts table changes into BGP update messages with add path type UpdateSenderAddPath struct { routingtable.ClientManager - fsm *FSM + fsm *FSM2 iBGP bool } -func newUpdateSenderAddPath(fsm *FSM) *UpdateSenderAddPath { +func newUpdateSenderAddPath(fsm *FSM2) *UpdateSenderAddPath { return &UpdateSenderAddPath{ fsm: fsm, - iBGP: fsm.localASN == fsm.remoteASN, + iBGP: fsm.peer.localASN == fsm.peer.asn, } }