diff --git a/protocols/bgp/server/bmp_router.go b/protocols/bgp/server/bmp_router.go index 51a91eea531ff0e1fde79f6e09f634af482967c2..019a2b7ad082aeea2b79e0efc512b317f5a7598f 100644 --- a/protocols/bgp/server/bmp_router.go +++ b/protocols/bgp/server/bmp_router.go @@ -103,7 +103,7 @@ func (r *Router) Address() net.IP { return r.address } -func (r *Router) serve(con net.Conn) { +func (r *Router) serve(con net.Conn) error { r.con = con r.runMu.Lock() defer r.con.Close() @@ -112,14 +112,13 @@ func (r *Router) serve(con net.Conn) { for { select { case <-r.stop: - return + return nil default: } msg, err := recvBMPMsg(r.con) if err != nil { - r.logger.Errorf("Unable to get message: %v", err) - return + return errors.Wrap(err, "Unable to get message") } r.processMsg(msg) diff --git a/protocols/bgp/server/bmp_server.go b/protocols/bgp/server/bmp_server.go index d7a92a795fa524cb3e0c436da8182b950aae3163..d0bb0c20c44052f7ef30e09502f2472651d0e86e 100644 --- a/protocols/bgp/server/bmp_server.go +++ b/protocols/bgp/server/bmp_server.go @@ -46,6 +46,10 @@ func NewServer() *BMPServer { return b } +func conString(host string, port uint16) string { + return fmt.Sprintf("%s:%d", host, port) +} + // AddRouter adds a router to which we connect with BMP func (b *BMPServer) AddRouter(addr net.IP, port uint16) { b.gloablMu.Lock() @@ -56,10 +60,26 @@ func (b *BMPServer) AddRouter(addr net.IP, port uint16) { go func(r *Router) { for { - <-r.reconnectTimer.C + select { + case <-r.stop: + log.WithFields(log.Fields{ + "component": "bmp_server", + "address": conString(r.address.String(), r.port), + }).Info("Stop event: Stopping reconnect routine") + return + case <-r.reconnectTimer.C: + log.WithFields(log.Fields{ + "component": "bmp_server", + "address": conString(r.address.String(), r.port), + }).Info("Reconnect timer expired: Establishing connection") + } + c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", r.address.String(), r.port), r.dialTimeout) if err != nil { - log.Infof("Unable to connect to BMP router: %v", err) + log.WithError(err).WithFields(log.Fields{ + "component": "bmp_server", + "address": conString(r.address.String(), r.port), + }).Info("Unable to connect to BMP router") if r.reconnectTime == 0 { r.reconnectTime = r.reconnectTimeMin } else if r.reconnectTime < r.reconnectTimeMax { @@ -72,9 +92,22 @@ func (b *BMPServer) AddRouter(addr net.IP, port uint16) { atomic.StoreUint32(&r.established, 1) r.reconnectTime = r.reconnectTimeMin r.reconnectTimer = time.NewTimer(time.Second * time.Duration(r.reconnectTime)) - log.Infof("Connected to %s", r.address.String()) - r.serve(c) + log.WithFields(log.Fields{ + "component": "bmp_server", + "address": conString(r.address.String(), r.port), + }).Info("Connected") + + err = r.serve(c) atomic.StoreUint32(&r.established, 0) + if err != nil { + r.logger.WithError(err).Error("r.serve() failed") + } else { + log.WithFields(log.Fields{ + "component": "bmp_server", + "address": conString(r.address.String(), r.port), + }).Info("r.Serve returned without error. Stopping reconnect routine") + return + } } }(r) }