diff --git a/cmd/ris/risserver/server.go b/cmd/ris/risserver/server.go index 211d614a5ee1f4839a25989bf6101fb60242458e..de8a9621ae5377ffa5d2532f7965c070afa354b5 100644 --- a/cmd/ris/risserver/server.go +++ b/cmd/ris/risserver/server.go @@ -11,12 +11,35 @@ import ( "github.com/bio-routing/bio-rd/routingtable/filter" "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/prometheus/client_golang/prometheus" + pb "github.com/bio-routing/bio-rd/cmd/ris/api" bnet "github.com/bio-routing/bio-rd/net" netapi "github.com/bio-routing/bio-rd/net/api" routeapi "github.com/bio-routing/bio-rd/route/api" ) +var ( + risObserveFIBClients *prometheus.GaugeVec +) + +func init() { + risObserveFIBClients = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "bio", + Subsystem: "ris", + Name: "observe_fib_clients", + Help: "number of observe FIB clients per router/vrf/afisafi", + }, + []string{ + "router", + "vrf", + "afisafi", + }, + ) + prometheus.MustRegister(risObserveFIBClients) +} + // Server represents an RoutingInformationService server type Server struct { bmp *server.BMPServer @@ -114,6 +137,7 @@ func (s *Server) GetLonger(ctx context.Context, req *pb.GetLongerRequest) (*pb.G return res, nil } +// ObserveRIB implements the ObserveRIB RPC func (s *Server) ObserveRIB(req *pb.ObserveRIBRequest, stream pb.RoutingInformationService_ObserveRIBServer) error { ipVersion := netapi.IP_IPv4 switch req.Afisafi { @@ -130,6 +154,9 @@ func (s *Server) ObserveRIB(req *pb.ObserveRIBRequest, stream pb.RoutingInformat return err } + risObserveFIBClients.WithLabelValues(req.Router, fmt.Sprintf("%d", req.VrfId), fmt.Sprintf("%d", req.Afisafi)).Inc() + defer risObserveFIBClients.WithLabelValues(req.Router, fmt.Sprintf("%d", req.VrfId), fmt.Sprintf("%d", req.Afisafi)).Dec() + fifo := newUpdateFIFO() rc := newRIBClient(fifo) ret := make(chan error) @@ -162,6 +189,7 @@ func (s *Server) ObserveRIB(req *pb.ObserveRIBRequest, stream pb.RoutingInformat return nil } +// DumpRIB 
implements the DumpRIB RPC func (s *Server) DumpRIB(req *pb.DumpRIBRequest, stream pb.RoutingInformationService_DumpRIBServer) error { ipVersion := netapi.IP_IPv4 switch req.Afisafi { diff --git a/go.sum b/go.sum index 772a6cbf53b8136c1e8d673fbe977120dd47fc13..d55f9d381c1bb7cbdeaf1f01de6a5d385ef6be71 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= +github.com/prometheus/client_golang v1.3.0 h1:miYCvYqFXtl/J9FIy8eNpBfYthAEFg+Ys0XyUVEcDsc= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= diff --git a/metrics/bmp/adapter/prom/bmp_prom_adapter.go b/metrics/bmp/adapter/prom/bmp_prom_adapter.go index 0b31429ce98391300b614199113fc206e5ace84c..76c1dcb48d9eabb5e2b8c7e8ede2699355126257 100644 --- a/metrics/bmp/adapter/prom/bmp_prom_adapter.go +++ b/metrics/bmp/adapter/prom/bmp_prom_adapter.go @@ -16,6 +16,7 @@ const ( ) var ( + bmpSessionEstablishedDesc *prometheus.Desc routeMonitoringMessagesDesc *prometheus.Desc statisticsReportMessages *prometheus.Desc peerDownNotificationMessages *prometheus.Desc @@ -28,6 +29,7 @@ var ( func init() { labels := []string{"router"} + bmpSessionEstablishedDesc = prometheus.NewDesc(prefix+"session_established", "Indicates if a BMP session is established", labels, nil) routeMonitoringMessagesDesc = prometheus.NewDesc(prefix+"route_monitoring_messages", "Returns 
number of received route monitoring messages", labels, nil) statisticsReportMessages = prometheus.NewDesc(prefix+"statistics_report_messages", "Returns number of received statistics report messages", labels, nil) peerDownNotificationMessages = prometheus.NewDesc(prefix+"peer_down_messages", "Returns number of received peer down notification messages", labels, nil) @@ -49,6 +51,7 @@ type bmpCollector struct { // Describe conforms to the prometheus collector interface func (c *bmpCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- bmpSessionEstablishedDesc ch <- routeMonitoringMessagesDesc ch <- statisticsReportMessages ch <- peerDownNotificationMessages @@ -77,6 +80,12 @@ func (c *bmpCollector) Collect(ch chan<- prometheus.Metric) { func (c *bmpCollector) collectForRouter(ch chan<- prometheus.Metric, rtr *metrics.BMPRouterMetrics) { l := []string{rtr.Name} + established := 0 + if rtr.Established { + established = 1 + } + + ch <- prometheus.MustNewConstMetric(bmpSessionEstablishedDesc, prometheus.GaugeValue, float64(established), l...) ch <- prometheus.MustNewConstMetric(routeMonitoringMessagesDesc, prometheus.CounterValue, float64(rtr.RouteMonitoringMessages), l...) ch <- prometheus.MustNewConstMetric(statisticsReportMessages, prometheus.CounterValue, float64(rtr.StatisticsReportMessages), l...) ch <- prometheus.MustNewConstMetric(peerDownNotificationMessages, prometheus.CounterValue, float64(rtr.PeerDownNotificationMessages), l...) 
diff --git a/net/ip_cache.go b/net/ip_cache.go index 8936f32d44a9d31a4ae6194e083a1731a126ecc4..8c6cf7a4c8682e0503828803889784adf0c691a2 100644 --- a/net/ip_cache.go +++ b/net/ip_cache.go @@ -36,8 +36,9 @@ func (ipc *ipCache) get(addr IP) *IP { } ipc._set(addr) + res := ipc.cache[addr] ipc.cacheMu.Unlock() - return ipc.cache[addr] + return res } func (ipc *ipCache) _set(addr IP) { diff --git a/net/prefix_cache.go b/net/prefix_cache.go index 3b6795730e3bedad8ddd4472c7ec75c9077e0dd2..695df9a46a986004edf4dc3ffc22958f56ca10de 100644 --- a/net/prefix_cache.go +++ b/net/prefix_cache.go @@ -37,8 +37,9 @@ func (pfxc *pfxCache) get(pfx Prefix) *Prefix { } pfxc._set(pfx) + ret := pfxc.cache[pfx] pfxc.cacheMu.Unlock() - return pfxc.cache[pfx] + return ret } func (pfxc *pfxCache) _set(pfx Prefix) { diff --git a/protocols/bgp/metrics/bmp_metrics.go b/protocols/bgp/metrics/bmp_metrics.go index c0541203fc9b360b651b3adac99bfa46e8382f3c..ccc76fa8f297a8c32259202e9e459dfedd2b7af3 100644 --- a/protocols/bgp/metrics/bmp_metrics.go +++ b/protocols/bgp/metrics/bmp_metrics.go @@ -14,6 +14,9 @@ type BMPRouterMetrics struct { // Name of the monitored routers Name string + // Status of TCP session + Established bool + // Count of received RouteMonitoringMessages RouteMonitoringMessages uint64 diff --git a/protocols/bgp/server/bmp_metrics_service.go b/protocols/bgp/server/bmp_metrics_service.go index d678bd12d76a3ac3626ad97b1049e09dd7e0d832..9e385d008f6498c731eb9b374fba1597949a592a 100644 --- a/protocols/bgp/server/bmp_metrics_service.go +++ b/protocols/bgp/server/bmp_metrics_service.go @@ -30,8 +30,11 @@ func (b *bmpMetricsService) routerMetrics() []*metrics.BMPRouterMetrics { } func (b *bmpMetricsService) metricsForRouter(rtr *Router) *metrics.BMPRouterMetrics { + established := atomic.LoadUint32(&rtr.established) + rm := &metrics.BMPRouterMetrics{ Name: rtr.name, + Established: established == 1, RouteMonitoringMessages: atomic.LoadUint64(&rtr.counters.routeMonitoringMessages), 
StatisticsReportMessages: atomic.LoadUint64(&rtr.counters.statisticsReportMessages), PeerDownNotificationMessages: atomic.LoadUint64(&rtr.counters.peerDownNotificationMessages), diff --git a/protocols/bgp/server/bmp_router.go b/protocols/bgp/server/bmp_router.go index 01a5952813210b610b4cc85213a3e954225469fc..b98aacb23ca9da415ef3c9edd78b3735ab151f4e 100644 --- a/protocols/bgp/server/bmp_router.go +++ b/protocols/bgp/server/bmp_router.go @@ -20,15 +20,18 @@ import ( "github.com/bio-routing/tflow2/convert" ) +// Router represents a BMP enabled router in BMP context type Router struct { name string nameMu sync.RWMutex address net.IP port uint16 con net.Conn + established uint32 reconnectTimeMin int reconnectTimeMax int reconnectTime int + dialTimeout time.Duration reconnectTimer *time.Timer vrfRegistry *vrf.VRFRegistry neighborManager *neighborManager @@ -69,6 +72,7 @@ func newRouter(addr net.IP, port uint16) *Router { reconnectTimeMin: 30, // Suggested by RFC 7854 reconnectTimeMax: 720, // Suggested by RFC 7854 reconnectTimer: time.NewTimer(time.Duration(0)), + dialTimeout: time.Second * 5, vrfRegistry: vrf.NewVRFRegistry(), neighborManager: newNeighborManager(), logger: log.New(), @@ -77,14 +81,17 @@ func newRouter(addr net.IP, port uint16) *Router { } } +// GetVRF gets a VRF func (r *Router) GetVRF(rd uint64) *vrf.VRF { return r.vrfRegistry.GetVRFByRD(rd) } +// GetVRFs gets all VRFs func (r *Router) GetVRFs() []*vrf.VRF { return r.vrfRegistry.List() } +// Name gets a router's name func (r *Router) Name() string { r.nameMu.RLock() defer r.nameMu.RUnlock() diff --git a/protocols/bgp/server/bmp_server.go b/protocols/bgp/server/bmp_server.go index cc387a2c6e18399d62d3087f3cb882f06e312e46..d7a92a795fa524cb3e0c436da8182b950aae3163 100644 --- a/protocols/bgp/server/bmp_server.go +++ b/protocols/bgp/server/bmp_server.go @@ -5,6 +5,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "github.com/bio-routing/bio-rd/protocols/bgp/metrics" @@ -56,7 +57,7 @@ func (b 
*BMPServer) AddRouter(addr net.IP, port uint16) { go func(r *Router) { for { <-r.reconnectTimer.C - c, err := net.Dial("tcp", fmt.Sprintf("%s:%d", r.address.String(), r.port)) + c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", r.address.String(), r.port), r.dialTimeout) if err != nil { log.Infof("Unable to connect to BMP router: %v", err) if r.reconnectTime == 0 { @@ -68,9 +69,12 @@ func (b *BMPServer) AddRouter(addr net.IP, port uint16) { continue } - r.reconnectTime = 0 + atomic.StoreUint32(&r.established, 1) + r.reconnectTime = r.reconnectTimeMin + r.reconnectTimer = time.NewTimer(time.Second * time.Duration(r.reconnectTime)) log.Infof("Connected to %s", r.address.String()) r.serve(c) + atomic.StoreUint32(&r.established, 0) } }(r) } @@ -125,6 +129,7 @@ func recvBMPMsg(c net.Conn) (msg []byte, err error) { return buffer[0:toRead], nil } +// GetRouters gets all routers func (b *BMPServer) GetRouters() []*Router { b.routersMu.RLock() defer b.routersMu.RUnlock() @@ -137,6 +142,7 @@ func (b *BMPServer) GetRouters() []*Router { return r } +// GetRouter gets a router func (b *BMPServer) GetRouter(name string) *Router { b.routersMu.RLock() defer b.routersMu.RUnlock() @@ -152,6 +158,7 @@ func (b *BMPServer) GetRouter(name string) *Router { return nil } +// Metrics gets BMP server metrics func (b *BMPServer) Metrics() (*metrics.BMPMetrics, error) { if b.metrics == nil { return nil, fmt.Errorf("Server not started yet")