diff --git a/config/peer.go b/config/peer.go index ded26d7bd89f5d3bde535148284c632cfef14284..af156695925cf2ac96bcaf0d5e871d7cc227ffda 100644 --- a/config/peer.go +++ b/config/peer.go @@ -2,7 +2,6 @@ package config import ( "net" - "time" "github.com/bio-routing/bio-rd/routingtable" @@ -10,8 +9,9 @@ import ( type Peer struct { AdminEnabled bool - KeepAlive uint16 - HoldTimer uint16 + ReconnectInterval time.Duration + KeepAlive time.Duration + HoldTimer time.Duration LocalAddress net.IP PeerAddress net.IP LocalAS uint32 @@ -20,5 +20,4 @@ type Peer struct { RouterID uint32 AddPathSend routingtable.ClientOptions AddPathRecv bool - ReconnectInterval time.Duration } diff --git a/main.go b/main.go index 1952d771d9713831e851f7c7388b532ba1f4f7f1..d717d0a29eed085cf3bb1228212bd5b62bbe4e32 100644 --- a/main.go +++ b/main.go @@ -32,32 +32,34 @@ func main() { } b.AddPeer(config.Peer{ - AdminEnabled: true, - LocalAS: 65200, - PeerAS: 65300, - PeerAddress: net.IP([]byte{169, 254, 200, 1}), - LocalAddress: net.IP([]byte{169, 254, 200, 0}), - HoldTimer: 90, - KeepAlive: 30, - Passive: true, - RouterID: b.RouterID(), + AdminEnabled: true, + LocalAS: 65200, + PeerAS: 65300, + PeerAddress: net.IP([]byte{169, 254, 200, 1}), + LocalAddress: net.IP([]byte{169, 254, 200, 0}), + ReconnectInterval: time.Second * 15, + HoldTimer: time.Second * 90, + KeepAlive: time.Second * 30, + Passive: true, + RouterID: b.RouterID(), AddPathSend: routingtable.ClientOptions{ MaxPaths: 10, }, }, rib) - time.Sleep(time.Second * 15) + //time.Sleep(time.Second * 15) b.AddPeer(config.Peer{ - AdminEnabled: true, - LocalAS: 65200, - PeerAS: 65100, - PeerAddress: net.IP([]byte{169, 254, 100, 0}), - LocalAddress: net.IP([]byte{169, 254, 100, 1}), - HoldTimer: 90, - KeepAlive: 30, - Passive: true, - RouterID: b.RouterID(), + AdminEnabled: true, + LocalAS: 65200, + PeerAS: 65100, + PeerAddress: net.IP([]byte{169, 254, 100, 0}), + LocalAddress: net.IP([]byte{169, 254, 100, 1}), + ReconnectInterval: time.Second * 15, + HoldTimer: time.Second * 90, + KeepAlive: time.Second * 30, + Passive: true, + RouterID: b.RouterID(), AddPathSend: routingtable.ClientOptions{ MaxPaths: 10, }, diff --git a/protocols/bgp/packet/decoder_test.go b/protocols/bgp/packet/decoder_test.go index 19a83a960c956378415d2c1792abce344dbb76dc..a9ae237b9cb94a3e0ec6af0b988bc09f41d3f806 100644 --- a/protocols/bgp/packet/decoder_test.go +++ b/protocols/bgp/packet/decoder_test.go @@ -227,6 +227,26 @@ func TestDecode(t *testing.T) { }, wantFail: true, }, + { + testNum: 8, + input: []byte{ + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 0, 21, + 3, + 6, 4, + }, + wantFail: false, + expected: &BGPMessage{ + Header: &BGPHeader{ + Length: 21, + Type: 3, + }, + Body: &BGPNotification{ + ErrorCode: 6, + ErrorSubcode: 4, + }, + }, + }, } for _, test := range tests { @@ -252,7 +272,7 @@ func TestDecode(t *testing.T) { continue } - assert.Equal(t, test.expected, msg) + assert.Equalf(t, test.expected, msg, "Test: %d", test.testNum) } } diff --git a/protocols/bgp/server/fsm2.go b/protocols/bgp/server/fsm2.go index 52d1b7ae60888f7419d4b902fda32619d4084d57..d0f4f15ff4d1fda7e6b1eb670ff85d51736ef794 100644 --- a/protocols/bgp/server/fsm2.go +++ b/protocols/bgp/server/fsm2.go @@ -3,12 +3,24 @@ package server import ( "fmt" "net" + "sync" "time" "github.com/bio-routing/bio-rd/protocols/bgp/packet" "github.com/bio-routing/bio-rd/routingtable" - "github.com/bio-routing/bio-rd/routingtable/adjRIBIn" "github.com/bio-routing/bio-rd/routingtable/locRIB" + log "github.com/sirupsen/logrus" +) + +const ( + // Administrative events + ManualStart = 1 + ManualStop = 2 + AutomaticStart = 3 + ManualStartWithPassiveTcpEstablishment = 4 + AutomaticStartWithPassiveTcpEstablishment = 5 + AutomaticStop = 8 + Cease = 100 ) type state interface { @@ -17,11 +29,12 @@ type state interface { // FSM2 implements the BGP finite state machine (RFC4271) type FSM2 struct { - server *BGPServer - peer *Peer - eventCh chan int - con net.Conn - conCh chan net.Conn + peer *Peer + eventCh chan int + con net.Conn + conCh chan net.Conn + initiateCon chan struct{} + conErrCh chan error delayOpen bool delayOpenTime time.Duration @@ -31,70 +44,193 @@ type FSM2 struct { connectRetryTimer *time.Timer connectRetryCounter int - holdTimeConfigured time.Duration - holdTime time.Duration - holdTimer *time.Timer + holdTime time.Duration + holdTimer *time.Timer keepaliveTime time.Duration keepaliveTimer *time.Timer - msgRecvCh chan msgRecvMsg - msgRecvFailCh chan msgRecvErr + msgRecvCh chan []byte + msgRecvFailCh chan error stopMsgRecvCh chan struct{} capAddPathSend bool capAddPathRecv bool - local net.IP - remote net.IP + local net.IP + //remote net.IP ribsInitialized bool - adjRIBIn *adjRIBIn.AdjRIBIn + adjRIBIn routingtable.RouteTableClient adjRIBOut routingtable.RouteTableClient rib *locRIB.LocRIB updateSender routingtable.RouteTableClient neighborID uint32 state state + stateMu sync.RWMutex reason string active bool } // NewPassiveFSM2 initiates a new passive FSM func NewPassiveFSM2(peer *Peer, con *net.TCPConn) *FSM2 { - fsm := &FSM2{ - peer: peer, - eventCh: make(chan int), - con: con, - conCh: make(chan net.Conn), - msgRecvCh: make(chan msgRecvMsg), - msgRecvFailCh: make(chan msgRecvErr), - stopMsgRecvCh: make(chan struct{}), - } - + fmt.Printf("NewPassiveFSM2\n") + fsm := newFSM2(peer) + fsm.con = con + fsm.state = newIdleState(fsm) return fsm } // NewActiveFSM2 initiates a new passive FSM func NewActiveFSM2(peer *Peer) *FSM2 { + fmt.Printf("NewActiveFSM2\n") + fsm := newFSM2(peer) + fsm.active = true + fsm.state = newIdleState(fsm) + return fsm +} + +func newFSM2(peer *Peer) *FSM2 { return &FSM2{ - peer: peer, - eventCh: make(chan int), - active: true, - conCh: make(chan net.Conn), + connectRetryTime: time.Minute, + peer: peer, + eventCh: make(chan int), + conCh: make(chan net.Conn), + conErrCh: make(chan error), + initiateCon: make(chan struct{}), + msgRecvCh: make(chan []byte), + msgRecvFailCh: make(chan error), + stopMsgRecvCh: make(chan struct{}), + rib: peer.rib, + } +} + +func (fsm *FSM2) start() { + go fsm.run() + go fsm.tcpConnector() + return +} + +func (fsm *FSM2) activate() { + fsm.eventCh <- AutomaticStart +} + +func (fsm *FSM2) run() { + //fmt.Printf("Starting FSM\n") + next, reason := fsm.state.run() + for { + newState := stateName(next) + oldState := stateName(fsm.state) + + if oldState != newState { + log.WithFields(log.Fields{ + "peer": fsm.peer.addr.String(), + "last_state": oldState, + "new_state": newState, + "reason": reason, + }).Info("FSM: Neighbor state change") + } + + if newState == "cease" { + return + } + + //fmt.Printf("Aquiring lock...\n") + fsm.stateMu.Lock() + fsm.state = next + //fmt.Printf("Releasing lock...\n") + fsm.stateMu.Unlock() + + //fmt.Printf("Running new state\n") + next, reason = fsm.state.run() } } -func (fsm *FSM2) Cease() { +func stateName(s state) string { + switch s.(type) { + case *idleState: + return "idle" + case *connectState: + return "connect" + case *activeState: + return "active" + case *openSentState: + return "openSent" + case *openConfirmState: + return "openConfirm" + case *establishedState: + return "established" + case *ceaseState: + return "cease" + default: + panic(fmt.Sprintf("Unknown state: %v", s)) + } +} + +func (fsm *FSM2) cease() { + fsm.eventCh <- Cease +} + +func (fsm *FSM2) tcpConnector() error { + fmt.Printf("TCP CONNECTOR STARTED\n") + for { + //fmt.Printf("READING FROM fsm.initiateCon\n") + select { + case <-fsm.initiateCon: + fmt.Printf("Initiating connection to %s\n", fsm.peer.addr.String()) + c, err := net.DialTCP("tcp", &net.TCPAddr{IP: fsm.local}, &net.TCPAddr{IP: fsm.peer.addr, Port: BGPPORT}) + if err != nil { + select { + case fsm.conErrCh <- err: + continue + case <-time.NewTimer(time.Second * 30).C: + continue + } + } + + //fmt.Printf("GOT CONNECTION!\n") + select { + case fsm.conCh <- c: + continue + case <-time.NewTimer(time.Second * 30).C: + c.Close() + continue + } + } + } +} +func (fsm *FSM2) tcpConnect() { + fsm.initiateCon <- struct{}{} +} + +func (fsm *FSM2) msgReceiver() error { + for { + msg, err := recvMsg(fsm.con) + if err != nil { + fsm.msgRecvFailCh <- err + return nil + + /*select { + case fsm.msgRecvFailCh <- msgRecvErr{err: err, con: c}: + continue + case <-time.NewTimer(time.Second * 60).C: + return nil + }*/ + } + fmt.Printf("Message received for %s: %v\n", fsm.con.RemoteAddr().String(), msg[18]) + fsm.msgRecvCh <- msg + } } func (fsm *FSM2) startConnectRetryTimer() { - fsm.connectRetryTimer = time.NewTimer(time.Second * fsm.connectRetryTime) + fmt.Printf("Initializing connectRetryTimer: %d\n", fsm.connectRetryTime) + fsm.connectRetryTimer = time.NewTimer(fsm.connectRetryTime) } func (fsm *FSM2) resetConnectRetryTimer() { - if !fsm.connectRetryTimer.Reset(time.Second * fsm.connectRetryTime) { + if !fsm.connectRetryTimer.Reset(fsm.connectRetryTime) { <-fsm.connectRetryTimer.C } } @@ -103,16 +239,14 @@ func (fsm *FSM2) resetConnectRetryCounter() { fsm.connectRetryCounter = 0 } -func (fsm *FSM2) tcpConnect() { - -} - func (fsm *FSM2) sendOpen() error { + fmt.Printf("Sending OPEN Message to %s\n", fsm.con.RemoteAddr().String()) + msg := packet.SerializeOpenMsg(&packet.BGPOpen{ Version: BGPVersion, - AS: uint16(fsm.peer.asn), - HoldTime: uint16(fsm.holdTimeConfigured), - BGPIdentifier: fsm.server.routerID, + AS: uint16(fsm.peer.localASN), + HoldTime: uint16(fsm.peer.holdTime / time.Second), + BGPIdentifier: fsm.peer.server.routerID, OptParams: fsm.peer.optOpenParams, }) @@ -125,6 +259,7 @@ func (fsm *FSM2) sendOpen() error { } func (fsm *FSM2) sendNotification(errorCode uint8, errorSubCode uint8) error { + fmt.Printf("Sending NOTIFICATION Message to %s\n", fsm.con.RemoteAddr().String()) msg := packet.SerializeNotificationMsg(&packet.BGPNotification{}) _, err := fsm.con.Write(msg) @@ -136,6 +271,7 @@ func (fsm *FSM2) sendNotification(errorCode uint8, errorSubCode uint8) error { } func (fsm *FSM2) sendKeepalive() error { + fmt.Printf("Sending KEEPALIVE to %s\n", fsm.con.RemoteAddr().String()) msg := packet.SerializeKeepaliveMsg() _, err := fsm.con.Write(msg) diff --git a/protocols/bgp/server/fsm_active.go b/protocols/bgp/server/fsm_active.go index e2884dc6be28ccb7fec65d263bf5a47cc3cd7827..2b269242f7017c10435a3a2a718d009f5a3c3b15 100644 --- a/protocols/bgp/server/fsm_active.go +++ b/protocols/bgp/server/fsm_active.go @@ -1,5 +1,11 @@ package server +import ( + "fmt" + "net" + "time" +) + type activeState struct { fsm *FSM2 } @@ -10,18 +16,56 @@ func newActiveState(fsm *FSM2) *activeState { } } -func (s *activeState) run() (state, string) { +func (s activeState) run() (state, string) { + fmt.Printf("THIS IS ACTIVE STATE\n") for { select { case e := <-s.fsm.eventCh: - if e == ManualStop { - + switch e { + case ManualStop: + return s.manualStop() + case Cease: + return s.cease() + default: + continue } - continue case <-s.fsm.connectRetryTimer.C: - + return s.connectRetryTimerExpired() case c := <-s.fsm.conCh: - + fmt.Printf("Received a connection in active state\n") + return s.connectionSuccess(c) } } } + +func (s *activeState) manualStop() (state, string) { + s.fsm.con.Close() + s.fsm.resetConnectRetryCounter() + stopTimer(s.fsm.connectRetryTimer) + return newIdleState(s.fsm), "Manual stop event" +} + +func (s *activeState) cease() (state, string) { + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + +func (s *activeState) connectRetryTimerExpired() (state, string) { + s.fsm.resetConnectRetryTimer() + s.fsm.tcpConnect() + return newConnectState(s.fsm), "Connect retry timer expired" +} + +func (s *activeState) connectionSuccess(con net.Conn) (state, string) { + s.fsm.con = con + stopTimer(s.fsm.connectRetryTimer) + err := s.fsm.sendOpen() + if err != nil { + s.fsm.resetConnectRetryTimer() + s.fsm.connectRetryCounter++ + return newIdleState(s.fsm), fmt.Sprintf("Sending OPEN message failed: %v", err) + } + s.fsm.holdTimer = time.NewTimer(time.Minute * 4) + fmt.Printf("Next state: OpenSent!\n") + return newOpenSentState(s.fsm), "Sent OPEN message" +} diff --git a/protocols/bgp/server/fsm_cease.go b/protocols/bgp/server/fsm_cease.go new file mode 100644 index 0000000000000000000000000000000000000000..7d239840bbda1e5fe16418f410cb413e5357b36f --- /dev/null +++ b/protocols/bgp/server/fsm_cease.go @@ -0,0 +1,12 @@ +package server + +type ceaseState struct { +} + +func newCeaseState() *ceaseState { + return &ceaseState{} +} + +func (s ceaseState) run() (state, string) { + return newCeaseState(), "Loop" +} diff --git a/protocols/bgp/server/fsm_connect.go b/protocols/bgp/server/fsm_connect.go index e25ff12f37412eb86e36287b41b020b6c3293d0c..eac1459425aa66a271851fed21034bf8cd42e006 100644 --- a/protocols/bgp/server/fsm_connect.go +++ b/protocols/bgp/server/fsm_connect.go @@ -16,36 +16,41 @@ func newConnectState(fsm *FSM2) *connectState { } } -func (s *connectState) run() (state, string) { +func (s connectState) run() (state, string) { for { select { case e := <-s.fsm.eventCh: - if e == ManualStop { + switch e { + case ManualStop: return s.manualStop() + case Cease: + return newCeaseState(), "Cease" + default: + continue } - continue case <-s.fsm.connectRetryTimer.C: s.connectRetryTimerExpired() continue case c := <-s.fsm.conCh: - s.connectionSuccess(c) + return s.connectionSuccess(c) } } } func (s *connectState) connectionSuccess(c net.Conn) (state, string) { + fmt.Printf("GOT A TCP CONNECTION!\n") s.fsm.con = c stopTimer(s.fsm.connectRetryTimer) err := s.fsm.sendOpen() if err != nil { return newIdleState(s.fsm), fmt.Sprintf("Unable to send open: %v", err) } - s.fsm.holdTimer = time.NewTimer(time.Minute * 4) return newOpenSentState(s.fsm), "TCP connection succeeded" } func (s *connectState) connectRetryTimerExpired() { + fmt.Printf("Connect retry timer expired\n") s.fsm.resetConnectRetryTimer() s.fsm.tcpConnect() } @@ -55,7 +60,3 @@ func (s *connectState) manualStop() (state, string) { stopTimer(s.fsm.connectRetryTimer) return newIdleState(s.fsm), "Manual stop event" } - -func (s *connectState) cease() { - -} diff --git a/protocols/bgp/server/fsm_established.go b/protocols/bgp/server/fsm_established.go index 18b968a355585ba5216e4bd3ca45f0be706ba088..faebad99f295507bf11b660f76071908a939bdc6 100644 --- a/protocols/bgp/server/fsm_established.go +++ b/protocols/bgp/server/fsm_established.go @@ -3,7 +3,6 @@ package server import ( "bytes" "fmt" - "time" tnet "github.com/bio-routing/bio-rd/net" "github.com/bio-routing/bio-rd/protocols/bgp/packet" @@ -24,7 +23,7 @@ func newEstablishedState(fsm *FSM2) *establishedState { } } -func (s *establishedState) run() (state, string) { +func (s establishedState) run() (state, string) { if !s.fsm.ribsInitialized { s.init() } @@ -32,13 +31,16 @@ func (s *establishedState) run() (state, string) { for { select { case e := <-s.fsm.eventCh: - if e == ManualStop { + switch e { + case ManualStop: return s.manualStop() - } - if e == AutomaticStop { + case AutomaticStop: return s.automaticStop() + case Cease: + return s.cease() + default: + continue } - continue case <-s.fsm.holdTimer.C: return s.holdTimerExpired() case <-s.fsm.keepaliveTimer.C: @@ -55,7 +57,7 @@ func (s *establishedState) init() { n := &routingtable.Neighbor{ Type: route.BGPPathType, - Address: tnet.IPv4ToUint32(s.fsm.remote), + Address: tnet.IPv4ToUint32(s.fsm.peer.addr), } clientOptions := routingtable.ClientOptions{} @@ -99,6 +101,13 @@ func (s *establishedState) automaticStop() (state, string) { return newIdleState(s.fsm), "Automatic stop event" } +func (s *establishedState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.uninit() + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + func (s *establishedState) holdTimerExpired() (state, string) { s.fsm.sendNotification(packet.HoldTimeExpired, 0) s.uninit() @@ -117,14 +126,13 @@ func (s *establishedState) keepaliveTimerExpired() (state, string) { return newIdleState(s.fsm), fmt.Sprintf("Failed to send keepalive: %v", err) } - s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + s.fsm.keepaliveTimer.Reset(s.fsm.keepaliveTime) return newEstablishedState(s.fsm), s.fsm.reason } -func (s *establishedState) msgReceived(recvMsg msgRecvMsg) (state, string) { - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) +func (s *establishedState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) if err != nil { - fmt.Printf("Failed to decode BGP message: %v\n", recvMsg.msg) switch bgperr := err.(type) { case packet.BGPError: s.fsm.sendNotification(bgperr.ErrorCode, bgperr.ErrorSubCode) @@ -139,6 +147,8 @@ func (s *establishedState) msgReceived(recvMsg msgRecvMsg) (state, string) { return s.notification() case packet.UpdateMsg: return s.update(msg) + case packet.KeepaliveMsg: + return s.keepaliveReceived() default: return s.unexpectedMessage() } @@ -154,7 +164,7 @@ func (s *establishedState) notification() (state, string) { func (s *establishedState) update(msg *packet.BGPMessage) (state, string) { if s.fsm.holdTime != 0 { - s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + s.fsm.holdTimer.Reset(s.fsm.holdTime) } u := msg.Body.(*packet.BGPUpdate) @@ -167,7 +177,6 @@ func (s *establishedState) update(msg *packet.BGPMessage) (state, string) { func (s *establishedState) withdraws(u *packet.BGPUpdate) { for r := u.WithdrawnRoutes; r != nil; r = r.Next { pfx := tnet.NewPfx(r.IP, r.Pfxlen) - fmt.Printf("LPM: Removing prefix %s\n", pfx.String()) s.fsm.adjRIBIn.RemovePath(pfx, nil) } } @@ -175,17 +184,15 @@ func (s *establishedState) withdraws(u *packet.BGPUpdate) { func (s *establishedState) updates(u *packet.BGPUpdate) { for r := u.NLRI; r != nil; r = r.Next { pfx := tnet.NewPfx(r.IP, r.Pfxlen) - fmt.Printf("LPM: Adding prefix %s\n", pfx.String()) path := &route.Path{ Type: route.BGPPathType, BGPPath: &route.BGPPath{ - Source: tnet.IPv4ToUint32(s.fsm.remote), + Source: tnet.IPv4ToUint32(s.fsm.peer.addr), }, } for pa := u.PathAttributes; pa != nil; pa = pa.Next { - fmt.Printf("TypeCode: %d\n", pa.TypeCode) switch pa.TypeCode { case packet.OriginAttr: path.BGPPath.Origin = pa.Value.(uint8) @@ -205,6 +212,13 @@ func (s *establishedState) updates(u *packet.BGPUpdate) { } } +func (s *establishedState) keepaliveReceived() (state, string) { + if s.fsm.holdTime != 0 { + s.fsm.holdTimer.Reset(s.fsm.holdTime) + } + return newEstablishedState(s.fsm), s.fsm.reason +} + func (s *establishedState) unexpectedMessage() (state, string) { s.fsm.sendNotification(packet.FiniteStateMachineError, 0) s.uninit() diff --git a/protocols/bgp/server/fsm_idle.go b/protocols/bgp/server/fsm_idle.go index 4b4697f69cbad6a70a4fb5902b16b3527ecb562b..87d57f79050bf00659ade688b22ed2afe090e838 100644 --- a/protocols/bgp/server/fsm_idle.go +++ b/protocols/bgp/server/fsm_idle.go @@ -1,5 +1,10 @@ package server +import ( + "fmt" + "time" +) + type idleState struct { fsm *FSM2 newStateReason string @@ -11,35 +16,41 @@ func newIdleState(fsm *FSM2) *idleState { } } -func (s *idleState) run() (state, string) { +func (s idleState) run() (state, string) { + if s.fsm.peer.reconnectInterval != 0 { + time.Sleep(s.fsm.peer.reconnectInterval) + go s.fsm.activate() + } for { - switch <-s.fsm.eventCh { + event := <-s.fsm.eventCh + fmt.Printf("Event: %d\n", event) + switch event { case ManualStart: - s.manualStart() + return s.manualStart() case AutomaticStart: - s.automaticStart() + return s.automaticStart() + case Cease: + return newCeaseState(), "Cease" default: continue } - - return newConnectState(s.fsm), s.newStateReason } } -func (s *idleState) manualStart() { +func (s *idleState) manualStart() (state, string) { s.newStateReason = "Received ManualStart event" - s.start() + return s.start() } -func (s *idleState) automaticStart() { +func (s *idleState) automaticStart() (state, string) { s.newStateReason = "Received AutomaticStart event" - s.start() + return s.start() } -func (s *idleState) start() { +func (s *idleState) start() (state, string) { s.fsm.resetConnectRetryCounter() s.fsm.startConnectRetryTimer() - if s.fsm.active { - s.fsm.tcpConnect() - } + go s.fsm.tcpConnect() + + return newConnectState(s.fsm), s.newStateReason } diff --git a/protocols/bgp/server/fsm_open_confirm.go b/protocols/bgp/server/fsm_open_confirm.go index 5c92cb0ee2f247f621684815dd89b7b7239c5459..1959c21e57e47da3a3c491683383110e23cc4f70 100644 --- a/protocols/bgp/server/fsm_open_confirm.go +++ b/protocols/bgp/server/fsm_open_confirm.go @@ -3,7 +3,6 @@ package server import ( "bytes" "fmt" - "time" "github.com/bio-routing/bio-rd/protocols/bgp/packet" ) @@ -18,14 +17,18 @@ func newOpenConfirmState(fsm *FSM2) *openConfirmState { } } -func (s *openConfirmState) run() (state, string) { +func (s openConfirmState) run() (state, string) { for { select { case e := <-s.fsm.eventCh: - if e == ManualStop { + switch e { + case ManualStop: return s.manualStop() + case Cease: + return s.cease() + default: + continue } - continue case <-s.fsm.holdTimer.C: return s.holdTimerExpired() case <-s.fsm.keepaliveTimer.C: @@ -52,6 +55,12 @@ func (s *openConfirmState) automaticStop() (state, string) { return newIdleState(s.fsm), "Automatic stop event" } +func (s *openConfirmState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + func (s *openConfirmState) holdTimerExpired() (state, string) { s.fsm.sendNotification(packet.HoldTimeExpired, 0) stopTimer(s.fsm.connectRetryTimer) @@ -68,12 +77,12 @@ func (s *openConfirmState) keepaliveTimerExpired() (state, string) { s.fsm.connectRetryCounter++ return newIdleState(s.fsm), fmt.Sprintf("Failed to send keepalive: %v", err) } - s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + s.fsm.keepaliveTimer.Reset(s.fsm.keepaliveTime) return newOpenConfirmState(s.fsm), s.fsm.reason } -func (s *openConfirmState) msgReceived(recvMsg msgRecvMsg) (state, string) { - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) +func (s *openConfirmState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) if err != nil { switch bgperr := err.(type) { case packet.BGPError: @@ -106,7 +115,7 @@ func (s *openConfirmState) notification(msg *packet.BGPMessage) (state, string) } func (s *openConfirmState) keepaliveReceived() (state, string) { - s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + s.fsm.holdTimer.Reset(s.fsm.holdTime) return newEstablishedState(s.fsm), "Received KEEPALIVE" } diff --git a/protocols/bgp/server/fsm_open_sent.go b/protocols/bgp/server/fsm_open_sent.go index c1ad7ff4bdbf0a627a9f4edb4e04ac960fd16039..bb29ef6f76b443eed8b7ea9085e80bc25ec7826b 100644 --- a/protocols/bgp/server/fsm_open_sent.go +++ b/protocols/bgp/server/fsm_open_sent.go @@ -14,12 +14,14 @@ type openSentState struct { } func newOpenSentState(fsm *FSM2) *openSentState { + fmt.Printf("newOpenSentState\n") return &openSentState{ fsm: fsm, } } -func (s *openSentState) run() (state, string) { +func (s openSentState) run() (state, string) { + go s.fsm.msgReceiver() for { select { case e := <-s.fsm.eventCh: @@ -28,6 +30,8 @@ func (s *openSentState) run() (state, string) { return s.manualStop() case AutomaticStop: return s.automaticStop() + case Cease: + return s.cease() default: continue } @@ -55,6 +59,12 @@ func (s *openSentState) automaticStop() (state, string) { return newIdleState(s.fsm), "Automatic stop event" } +func (s *openSentState) cease() (state, string) { + s.fsm.sendNotification(packet.Cease, 0) + s.fsm.con.Close() + return newCeaseState(), "Cease" +} + func (s *openSentState) holdTimerExpired() (state, string) { s.fsm.sendNotification(packet.HoldTimeExpired, 0) stopTimer(s.fsm.connectRetryTimer) @@ -63,8 +73,8 @@ func (s *openSentState) holdTimerExpired() (state, string) { return newIdleState(s.fsm), "Holdtimer expired" } -func (s *openSentState) msgReceived(recvMsg msgRecvMsg) (state, string) { - msg, err := packet.Decode(bytes.NewBuffer(recvMsg.msg)) +func (s *openSentState) msgReceived(data []byte) (state, string) { + msg, err := packet.Decode(bytes.NewBuffer(data)) if err != nil { switch bgperr := err.(type) { case packet.BGPError: @@ -97,16 +107,19 @@ func (s *openSentState) openMsgReceived(msg *packet.BGPMessage) (state, string) openMsg := msg.Body.(*packet.BGPOpen) s.fsm.neighborID = openMsg.BGPIdentifier stopTimer(s.fsm.connectRetryTimer) + s.fsm.peer.collisionHandling(s.fsm) err := s.fsm.sendKeepalive() if err != nil { return s.tcpFailure() } - s.fsm.holdTime = time.Duration(math.Min(float64(s.fsm.holdTimeConfigured), float64(openMsg.HoldTime))) + s.fsm.holdTime = time.Duration(math.Min(float64(s.fsm.peer.holdTime), float64(time.Duration(openMsg.HoldTime)*time.Second))) if s.fsm.holdTime != 0 { - s.fsm.holdTimer.Reset(time.Second * s.fsm.holdTime) + if !s.fsm.holdTimer.Reset(s.fsm.holdTime) { + <-s.fsm.holdTimer.C + } s.fsm.keepaliveTime = s.fsm.holdTime / 3 - s.fsm.keepaliveTimer.Reset(time.Second * s.fsm.keepaliveTime) + s.fsm.keepaliveTimer = time.NewTimer(s.fsm.keepaliveTime) } s.processOpenOptions(openMsg.OptParams) diff --git a/protocols/bgp/server/peer.go b/protocols/bgp/server/peer.go index 49d8baed1550f0e2cd4e7c2f5df855cdc54b88c1..8e9408a58fc0cdbb6f0c9f704877c9e145a0d449 100644 --- a/protocols/bgp/server/peer.go +++ b/protocols/bgp/server/peer.go @@ -2,41 +2,105 @@ package server import ( "net" + "sync" + "time" "github.com/bio-routing/bio-rd/config" "github.com/bio-routing/bio-rd/protocols/bgp/packet" "github.com/bio-routing/bio-rd/routingtable" + "github.com/bio-routing/bio-rd/routingtable/filter" + "github.com/bio-routing/bio-rd/routingtable/locRIB" "time" ) type Peer struct { + server *BGPServer addr net.IP - asn uint32 + peerASN uint32 localASN uint32 - fsm *FSM - rib routingtable.RouteTableClient + fsms []*FSM2 + fsmsMu sync.Mutex + rib *locRIB.LocRIB routerID uint32 addPathSend routingtable.ClientOptions addPathRecv bool - optOpenParams []packet.OptParam reconnectInterval time.Duration + keepaliveTime time.Duration + holdTime time.Duration + optOpenParams []packet.OptParam + importFilter *filter.Filter + exportFilter *filter.Filter +} + +func (p *Peer) collisionHandling(callingFSM *FSM2) bool { + p.fsmsMu.Lock() + defer p.fsmsMu.Unlock() + + for _, fsm := range p.fsms { + if callingFSM == fsm { + continue + } + + fsm.stateMu.RLock() + isEstablished := isEstablishedState(fsm.state) + isOpenConfirm := isOpenConfirmState(fsm.state) + fsm.stateMu.RUnlock() + + if isEstablished { + return true + } + + if !isOpenConfirm { + continue + } + + if p.routerID < callingFSM.neighborID { + fsm.cease() + } else { + return true + } + } + + return false +} + +func isOpenConfirmState(s state) bool { + switch s.(type) { + case openConfirmState: + return true + } + + return false +} + +func isEstablishedState(s state) bool { + switch s.(type) { + case establishedState: + return true + } + + return false } // NewPeer creates a new peer with the given config. If an connection is established, the adjRIBIN of the peer is connected // to the given rib. To actually connect the peer, call Start() on the returned peer. -func NewPeer(c config.Peer, rib routingtable.RouteTableClient) (*Peer, error) { +func NewPeer(c config.Peer, rib *locRIB.LocRIB, server *BGPServer) (*Peer, error) { p := &Peer{ + server: server, addr: c.PeerAddress, - asn: c.PeerAS, + peerASN: c.PeerAS, localASN: c.LocalAS, + fsms: make([]*FSM2, 0), rib: rib, addPathSend: c.AddPathSend, addPathRecv: c.AddPathRecv, - optOpenParams: make([]packet.OptParam, 0), reconnectInterval: c.ReconnectInterval, + keepaliveTime: c.KeepAlive, + holdTime: c.HoldTimer, + optOpenParams: make([]packet.OptParam, 0), } - p.fsm = NewFSM(p, c, rib) + p.fsms = append(p.fsms, NewActiveFSM2(p)) caps := make([]packet.Capability, 0) @@ -74,24 +138,6 @@ func (p *Peer) GetAddr() net.IP { return p.addr } -// GetASN returns the configured AS number of the peer -func (p *Peer) GetASN() uint32 { - return p.asn -} - -// Start the peers fsm. It starts from the Idle state and will get an ManualStart event. To trigger -// reconnects if the fsm falls back into the Idle state, every reconnectInterval a ManualStart event is send. -// The default value for reconnectInterval is 30 seconds. func (p *Peer) Start() { - p.fsm.start() - if p.reconnectInterval == 0 { - p.reconnectInterval = 30 * time.Second - } - t := time.Tick(p.reconnectInterval) - go func() { - for { - <-t - p.fsm.activate() - } - }() + p.fsms[0].start() } diff --git a/protocols/bgp/server/server.go b/protocols/bgp/server/server.go index a6025140fd62f81d8fa503aeb0c31114a7589f2b..4a1f28a7a45d199191d9e59a36090d31704019d8 100644 --- a/protocols/bgp/server/server.go +++ b/protocols/bgp/server/server.go @@ -4,12 +4,15 @@ import ( "fmt" "io" "net" - "strings" "github.com/bio-routing/bio-rd/config" "github.com/bio-routing/bio-rd/protocols/bgp/packet" +<<<<<<< HEAD "github.com/bio-routing/bio-rd/routingtable" log "github.com/sirupsen/logrus" +======= + "github.com/bio-routing/bio-rd/routingtable/locRIB" +>>>>>>> Replaced FSM ) const ( @@ -61,7 +64,9 @@ func (b *BGPServer) Start(c *config.Global) error { func (b *BGPServer) incomingConnectionWorker() { for { - c := <-b.acceptCh + /*c := <-b.acceptCh + fmt.Printf("Incoming connection!\n") + fmt.Printf("Connection from: %v\n", c.RemoteAddr()) peerAddr := strings.Split(c.RemoteAddr().String(), ":")[0] if _, ok := b.peers[peerAddr]; !ok { @@ -77,8 +82,19 @@ func (b *BGPServer) incomingConnectionWorker() { }).Info("Incoming TCP connection") log.WithField("Peer", peerAddr).Debug("Sending incoming TCP connection to fsm for peer") - b.peers[peerAddr].fsm.conCh <- c - log.Debug("Sending done") + fmt.Printf("Initiating new ActiveFSM due to incoming connection from peer %s\n", peerAddr) + fsm := NewActiveFSM2(b.peers[peerAddr]) + fsm.state = newActiveState(fsm) + fsm.startConnectRetryTimer() + + fmt.Printf("Getting lock...\n") + b.peers[peerAddr].fsmsMu.Lock() + b.peers[peerAddr].fsms = append(b.peers[peerAddr].fsms, fsm) + fmt.Printf("Releasing lock...\n") + b.peers[peerAddr].fsmsMu.Unlock() + + go fsm.run() + fsm.conCh <- c*/ } } @@ -87,7 +103,7 @@ func (b *BGPServer) AddPeer(c config.Peer, rib routingtable.RouteTableClient) er return fmt.Errorf("32bit ASNs are not supported yet") } - peer, err := NewPeer(c, rib) + peer, err := NewPeer(c, rib, b) if err != nil { return err } @@ -100,7 +116,7 @@ func (b *BGPServer) AddPeer(c config.Peer, rib routingtable.RouteTableClient) er return nil } -func recvMsg(c *net.TCPConn) (msg []byte, err error) { +func recvMsg(c net.Conn) (msg []byte, err error) { buffer := make([]byte, packet.MaxLen) _, err = io.ReadFull(c, buffer[0:packet.MinLen]) if err != nil { diff --git a/protocols/bgp/server/update_sender.go b/protocols/bgp/server/update_sender.go index d6e3ee40a08b974652cdb212683c54292d0b37f8..c394c47a50207eb794254283d1efc886123f82e7 100644 --- a/protocols/bgp/server/update_sender.go +++ b/protocols/bgp/server/update_sender.go @@ -28,7 +28,11 @@ func newUpdateSender(fsm *FSM2) *UpdateSender { // AddPath serializes a new path and sends out a BGP update message func (u *UpdateSender) AddPath(pfx net.Prefix, p *route.Path) error { +<<<<<<< HEAD pathAttrs, err := pathAttribues(p, u.fsm) +======= + asPathPA, err := packet.ParseASPathStr(fmt.Sprintf("%d %s", u.fsm.peer.localASN, p.BGPPath.ASPath)) +>>>>>>> Replaced FSM if err != nil { log.Errorf("Unable to create BGP Update: %v", err) return nil diff --git a/routingtable/client_manager.go b/routingtable/client_manager.go index f29ae2dce47dc7e4a3d3ba725ff5d57fd27f74f6..0bedbb45519c88bcf8a44e4b4491a76b66e590c7 100644 --- a/routingtable/client_manager.go +++ b/routingtable/client_manager.go @@ -57,6 +57,7 @@ func (c *ClientManager) RegisterWithOptions(client RouteTableClient, opt ClientO c.mu.Lock() c.clients[client] = opt c.mu.Unlock() + c.master.UpdateNewClient(client) }