Skip to content
Snippets Groups Projects
Unverified Commit a252acec authored by takt's avatar takt Committed by GitHub
Browse files

Merge pull request #94 from bio-routing/fix/leak_of_connection_goroutine

Fix for reconnect goroutine leak at FSM stop 
parents 424edb17 a0914a38
No related branches found
No related tags found
No related merge requests found
package server package server
import ( import (
"context"
"fmt" "fmt"
"io"
"net" "net"
"sync" "sync"
"time" "time"
...@@ -66,6 +68,8 @@ type FSM struct { ...@@ -66,6 +68,8 @@ type FSM struct {
stateMu sync.RWMutex stateMu sync.RWMutex
reason string reason string
active bool active bool
connectionCancelFunc context.CancelFunc
} }
// NewPassiveFSM2 initiates a new passive FSM // NewPassiveFSM2 initiates a new passive FSM
...@@ -110,8 +114,11 @@ func newFSM2(peer *peer) *FSM { ...@@ -110,8 +114,11 @@ func newFSM2(peer *peer) *FSM {
} }
func (fsm *FSM) start() { func (fsm *FSM) start() {
ctx, cancel := context.WithCancel(context.Background())
fsm.connectionCancelFunc = cancel
go fsm.run() go fsm.run()
go fsm.tcpConnector() go fsm.tcpConnector(ctx)
return return
} }
...@@ -120,6 +127,8 @@ func (fsm *FSM) activate() { ...@@ -120,6 +127,8 @@ func (fsm *FSM) activate() {
} }
func (fsm *FSM) run() { func (fsm *FSM) run() {
defer fsm.cancelRunningGoRoutines()
next, reason := fsm.state.run() next, reason := fsm.state.run()
for { for {
newState := stateName(next) newState := stateName(next)
...@@ -146,6 +155,12 @@ func (fsm *FSM) run() { ...@@ -146,6 +155,12 @@ func (fsm *FSM) run() {
} }
} }
func (fsm *FSM) cancelRunningGoRoutines() {
if fsm.connectionCancelFunc != nil {
fsm.connectionCancelFunc()
}
}
func stateName(s state) string { func stateName(s state) string {
switch s.(type) { switch s.(type) {
case *idleState: case *idleState:
...@@ -171,7 +186,7 @@ func (fsm *FSM) cease() { ...@@ -171,7 +186,7 @@ func (fsm *FSM) cease() {
fsm.eventCh <- Cease fsm.eventCh <- Cease
} }
func (fsm *FSM) tcpConnector() error { func (fsm *FSM) tcpConnector(ctx context.Context) {
for { for {
select { select {
case <-fsm.initiateCon: case <-fsm.initiateCon:
...@@ -191,6 +206,8 @@ func (fsm *FSM) tcpConnector() error { ...@@ -191,6 +206,8 @@ func (fsm *FSM) tcpConnector() error {
case <-time.NewTimer(time.Second * 30).C: case <-time.NewTimer(time.Second * 30).C:
c.Close() c.Close()
continue continue
case <-ctx.Done():
return
} }
} }
} }
...@@ -276,6 +293,23 @@ func (fsm *FSM) sendKeepalive() error { ...@@ -276,6 +293,23 @@ func (fsm *FSM) sendKeepalive() error {
return nil return nil
} }
func recvMsg(c net.Conn) (msg []byte, err error) {
buffer := make([]byte, packet.MaxLen)
_, err = io.ReadFull(c, buffer[0:packet.MinLen])
if err != nil {
return nil, fmt.Errorf("Read failed: %v", err)
}
l := int(buffer[16])*256 + int(buffer[17])
toRead := l
_, err = io.ReadFull(c, buffer[packet.MinLen:toRead])
if err != nil {
return nil, fmt.Errorf("Read failed: %v", err)
}
return buffer, nil
}
func stopTimer(t *time.Timer) { func stopTimer(t *time.Timer) {
if !t.Stop() { if !t.Stop() {
select { select {
......
...@@ -2,13 +2,11 @@ package server ...@@ -2,13 +2,11 @@ package server
import ( import (
"fmt" "fmt"
"io"
"net" "net"
"strings" "strings"
"sync" "sync"
"github.com/bio-routing/bio-rd/config" "github.com/bio-routing/bio-rd/config"
"github.com/bio-routing/bio-rd/protocols/bgp/packet"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
...@@ -124,20 +122,3 @@ func (b *bgpServer) AddPeer(c config.Peer) error { ...@@ -124,20 +122,3 @@ func (b *bgpServer) AddPeer(c config.Peer) error {
return nil return nil
} }
func recvMsg(c net.Conn) (msg []byte, err error) {
buffer := make([]byte, packet.MaxLen)
_, err = io.ReadFull(c, buffer[0:packet.MinLen])
if err != nil {
return nil, fmt.Errorf("Read failed: %v", err)
}
l := int(buffer[16])*256 + int(buffer[17])
toRead := l
_, err = io.ReadFull(c, buffer[packet.MinLen:toRead])
if err != nil {
return nil, fmt.Errorf("Read failed: %v", err)
}
return buffer, nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment