Skip to content
Snippets Groups Projects
Unverified Commit ac791fdc authored by takt's avatar takt Committed by GitHub
Browse files

Merge branch 'master' into bgp/ribspeed

parents 685e1993 f7cd2d04
Branches
Tags
No related merge requests found
......@@ -11,12 +11,35 @@ import (
"github.com/bio-routing/bio-rd/routingtable/filter"
"github.com/bio-routing/bio-rd/routingtable/locRIB"
"github.com/prometheus/client_golang/prometheus"
pb "github.com/bio-routing/bio-rd/cmd/ris/api"
bnet "github.com/bio-routing/bio-rd/net"
netapi "github.com/bio-routing/bio-rd/net/api"
routeapi "github.com/bio-routing/bio-rd/route/api"
)
var (
risObserveFIBClients *prometheus.GaugeVec
)
func init() {
risObserveFIBClients = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "bio",
Subsystem: "ris",
Name: "observe_fib_clients",
Help: "number of observe FIB clients per router/vrf/afisafi",
},
[]string{
"router",
"vrf",
"afisafi",
},
)
prometheus.MustRegister(risObserveFIBClients)
}
// Server represents an RoutingInformationService server
type Server struct {
bmp *server.BMPServer
......@@ -114,6 +137,7 @@ func (s *Server) GetLonger(ctx context.Context, req *pb.GetLongerRequest) (*pb.G
return res, nil
}
// ObserveRIB implements the ObserveRIB RPC
func (s *Server) ObserveRIB(req *pb.ObserveRIBRequest, stream pb.RoutingInformationService_ObserveRIBServer) error {
ipVersion := netapi.IP_IPv4
switch req.Afisafi {
......@@ -130,6 +154,9 @@ func (s *Server) ObserveRIB(req *pb.ObserveRIBRequest, stream pb.RoutingInformat
return err
}
risObserveFIBClients.WithLabelValues(req.Router, fmt.Sprintf("%d", req.VrfId), fmt.Sprintf("%d", req.Afisafi)).Inc()
defer risObserveFIBClients.WithLabelValues(req.Router, fmt.Sprintf("%d", req.VrfId), fmt.Sprintf("%d", req.Afisafi)).Dec()
fifo := newUpdateFIFO()
rc := newRIBClient(fifo)
ret := make(chan error)
......@@ -162,6 +189,7 @@ func (s *Server) ObserveRIB(req *pb.ObserveRIBRequest, stream pb.RoutingInformat
return nil
}
// DumpRIB implements the DumpRIB RPC
func (s *Server) DumpRIB(req *pb.DumpRIBRequest, stream pb.RoutingInformationService_DumpRIBServer) error {
ipVersion := netapi.IP_IPv4
switch req.Afisafi {
......
......@@ -16,6 +16,7 @@ const (
)
var (
bmpSessionEstablishedDesc *prometheus.Desc
routeMonitoringMessagesDesc *prometheus.Desc
statisticsReportMessages *prometheus.Desc
peerDownNotificationMessages *prometheus.Desc
......@@ -28,6 +29,7 @@ var (
func init() {
labels := []string{"router"}
bmpSessionEstablishedDesc = prometheus.NewDesc(prefix+"session_established", "Indicates if a BMP session is established", labels, nil)
routeMonitoringMessagesDesc = prometheus.NewDesc(prefix+"route_monitoring_messages", "Returns number of received route monitoring messages", labels, nil)
statisticsReportMessages = prometheus.NewDesc(prefix+"statistics_report_messages", "Returns number of received statistics report messages", labels, nil)
peerDownNotificationMessages = prometheus.NewDesc(prefix+"peer_down_messages", "Returns number of received peer down notification messages", labels, nil)
......@@ -49,6 +51,7 @@ type bmpCollector struct {
// Describe conforms to the prometheus collector interface
func (c *bmpCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- bmpSessionEstablishedDesc
ch <- routeMonitoringMessagesDesc
ch <- statisticsReportMessages
ch <- peerDownNotificationMessages
......@@ -77,6 +80,12 @@ func (c *bmpCollector) Collect(ch chan<- prometheus.Metric) {
func (c *bmpCollector) collectForRouter(ch chan<- prometheus.Metric, rtr *metrics.BMPRouterMetrics) {
l := []string{rtr.Name}
established := 0
if rtr.Established {
established = 1
}
ch <- prometheus.MustNewConstMetric(bmpSessionEstablishedDesc, prometheus.GaugeValue, float64(established), l...)
ch <- prometheus.MustNewConstMetric(routeMonitoringMessagesDesc, prometheus.CounterValue, float64(rtr.RouteMonitoringMessages), l...)
ch <- prometheus.MustNewConstMetric(statisticsReportMessages, prometheus.CounterValue, float64(rtr.StatisticsReportMessages), l...)
ch <- prometheus.MustNewConstMetric(peerDownNotificationMessages, prometheus.CounterValue, float64(rtr.PeerDownNotificationMessages), l...)
......
......@@ -36,8 +36,9 @@ func (ipc *ipCache) get(addr IP) *IP {
}
ipc._set(addr)
res := ipc.cache[addr]
ipc.cacheMu.Unlock()
return ipc.cache[addr]
return res
}
func (ipc *ipCache) _set(addr IP) {
......
......@@ -37,8 +37,9 @@ func (pfxc *pfxCache) get(pfx Prefix) *Prefix {
}
pfxc._set(pfx)
ret := pfxc.cache[pfx]
pfxc.cacheMu.Unlock()
return pfxc.cache[pfx]
return ret
}
func (pfxc *pfxCache) _set(pfx Prefix) {
......
......@@ -14,6 +14,9 @@ type BMPRouterMetrics struct {
// Name of the monitored routers
Name string
// Status of TCP session
Established bool
// Count of received RouteMonitoringMessages
RouteMonitoringMessages uint64
......
......@@ -30,8 +30,11 @@ func (b *bmpMetricsService) routerMetrics() []*metrics.BMPRouterMetrics {
}
func (b *bmpMetricsService) metricsForRouter(rtr *Router) *metrics.BMPRouterMetrics {
established := atomic.LoadUint32(&rtr.established)
rm := &metrics.BMPRouterMetrics{
Name: rtr.name,
Established: established == 1,
RouteMonitoringMessages: atomic.LoadUint64(&rtr.counters.routeMonitoringMessages),
StatisticsReportMessages: atomic.LoadUint64(&rtr.counters.statisticsReportMessages),
PeerDownNotificationMessages: atomic.LoadUint64(&rtr.counters.peerDownNotificationMessages),
......
......@@ -20,15 +20,18 @@ import (
"github.com/bio-routing/tflow2/convert"
)
// Router represents a BMP enabled route in BMP context
type Router struct {
name string
nameMu sync.RWMutex
address net.IP
port uint16
con net.Conn
established uint32
reconnectTimeMin int
reconnectTimeMax int
reconnectTime int
dialTimeout time.Duration
reconnectTimer *time.Timer
vrfRegistry *vrf.VRFRegistry
neighborManager *neighborManager
......@@ -69,6 +72,7 @@ func newRouter(addr net.IP, port uint16) *Router {
reconnectTimeMin: 30, // Suggested by RFC 7854
reconnectTimeMax: 720, // Suggested by RFC 7854
reconnectTimer: time.NewTimer(time.Duration(0)),
dialTimeout: time.Second * 5,
vrfRegistry: vrf.NewVRFRegistry(),
neighborManager: newNeighborManager(),
logger: log.New(),
......@@ -77,14 +81,17 @@ func newRouter(addr net.IP, port uint16) *Router {
}
}
// GetVRF get's a VRF
func (r *Router) GetVRF(rd uint64) *vrf.VRF {
return r.vrfRegistry.GetVRFByRD(rd)
}
// GetVRFs gets all VRFs
func (r *Router) GetVRFs() []*vrf.VRF {
return r.vrfRegistry.List()
}
// Name gets a routers name
func (r *Router) Name() string {
r.nameMu.RLock()
defer r.nameMu.RUnlock()
......
......@@ -5,6 +5,7 @@ import (
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/bio-routing/bio-rd/protocols/bgp/metrics"
......@@ -56,7 +57,7 @@ func (b *BMPServer) AddRouter(addr net.IP, port uint16) {
go func(r *Router) {
for {
<-r.reconnectTimer.C
c, err := net.Dial("tcp", fmt.Sprintf("%s:%d", r.address.String(), r.port))
c, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", r.address.String(), r.port), r.dialTimeout)
if err != nil {
log.Infof("Unable to connect to BMP router: %v", err)
if r.reconnectTime == 0 {
......@@ -68,9 +69,12 @@ func (b *BMPServer) AddRouter(addr net.IP, port uint16) {
continue
}
r.reconnectTime = 0
atomic.StoreUint32(&r.established, 1)
r.reconnectTime = r.reconnectTimeMin
r.reconnectTimer = time.NewTimer(time.Second * time.Duration(r.reconnectTime))
log.Infof("Connected to %s", r.address.String())
r.serve(c)
atomic.StoreUint32(&r.established, 0)
}
}(r)
}
......@@ -125,6 +129,7 @@ func recvBMPMsg(c net.Conn) (msg []byte, err error) {
return buffer[0:toRead], nil
}
// GetRouters gets all routers
func (b *BMPServer) GetRouters() []*Router {
b.routersMu.RLock()
defer b.routersMu.RUnlock()
......@@ -137,6 +142,7 @@ func (b *BMPServer) GetRouters() []*Router {
return r
}
// GetRouter gets a router
func (b *BMPServer) GetRouter(name string) *Router {
b.routersMu.RLock()
defer b.routersMu.RUnlock()
......@@ -152,6 +158,7 @@ func (b *BMPServer) GetRouter(name string) *Router {
return nil
}
// Metrics gets BMP server metrics
func (b *BMPServer) Metrics() (*metrics.BMPMetrics, error) {
if b.metrics == nil {
return nil, fmt.Errorf("Server not started yet")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment