diff --git a/cmd/ris-mirror/config/config.go b/cmd/ris-mirror/config/config.go new file mode 100644 index 0000000000000000000000000000000000000000..6aa781f39c299a91abf9128e5a31f39d067857cc --- /dev/null +++ b/cmd/ris-mirror/config/config.go @@ -0,0 +1,105 @@ +package config + +import ( + "fmt" + "io/ioutil" + "net" + + "github.com/bio-routing/bio-rd/routingtable/vrf" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +// RISMirrorConfig is the config of RISMirror instance +type RISMirrorConfig struct { + RIBConfigs []*RIBConfig `yaml:"ribs"` +} + +// RIBConfig is a RIB configuration +type RIBConfig struct { + Router string `yaml:"router"` + router net.IP + VRFs []string `yaml:"vrfs"` + vrfs []uint64 + IPVersions []uint8 `yaml:"IPVersions"` + SrcRISInstances []string `yaml:"source_ris_instances"` +} + +// GetRouter gets a routers IP address +func (rc *RIBConfig) GetRouter() net.IP { + return rc.router +} + +// GetVRFs gets a routers VRFs +func (rc *RIBConfig) GetVRFs() []uint64 { + return rc.vrfs +} + +// LoadConfig loads a RISMirror config +func LoadConfig(filepath string) (*RISMirrorConfig, error) { + f, err := ioutil.ReadFile(filepath) + if err != nil { + return nil, errors.Wrap(err, "Unable to read config file") + } + + cfg := &RISMirrorConfig{} + err = yaml.Unmarshal(f, cfg) + if err != nil { + return nil, errors.Wrap(err, "Unmarshal failed") + } + + for _, rc := range cfg.RIBConfigs { + err := rc.loadRouter() + if err != nil { + return nil, errors.Wrap(err, "Unable to load router config") + } + + err = rc.loadVRFs() + if err != nil { + return nil, errors.Wrap(err, "Unable to load VRFs") + } + } + + return cfg, nil +} + +func (r *RIBConfig) loadRouter() error { + addr := net.ParseIP(r.Router) + if addr == nil { + return fmt.Errorf("Unable to parse routers IP: %q", r.Router) + } + + r.router = addr + return nil +} + +func (r *RIBConfig) loadVRFs() error { + for _, vrfHuman := range r.VRFs { + vrfRD, err := vrf.ParseHumanReadableRouteDistinguisher(vrfHuman) + if err != nil { + return errors.Wrap(err, "Unable to parse VRF identifier") + } + + r.vrfs = append(r.vrfs, vrfRD) + } + + return nil +} + +// GetRISInstances returns a list of all RIS instances in the config +func (rismc *RISMirrorConfig) GetRISInstances() []string { + instances := make(map[string]struct{}) + + for _, r := range rismc.RIBConfigs { + for _, s := range r.SrcRISInstances { + instances[s] = struct{}{} + } + } + + ret := make([]string, 0) + for instance := range instances { + ret = append(ret, instance) + } + + return ret +} diff --git a/cmd/ris-mirror/main.go b/cmd/ris-mirror/main.go new file mode 100644 index 0000000000000000000000000000000000000000..25540961a26a892954494e67619c1c3a0e773b04 --- /dev/null +++ b/cmd/ris-mirror/main.go @@ -0,0 +1,88 @@ +package main + +import ( + "flag" + "os" + "time" + + "github.com/bio-routing/bio-rd/cmd/ris-mirror/config" + "github.com/bio-routing/bio-rd/cmd/ris-mirror/rismirror" + pb "github.com/bio-routing/bio-rd/cmd/ris/api" + "github.com/bio-routing/bio-rd/cmd/ris/risserver" + prom_grpc_cm "github.com/bio-routing/bio-rd/metrics/grpc/clientmanager/adapter/prom" + prom_ris_mirror "github.com/bio-routing/bio-rd/metrics/ris-mirror/adapter/prom" + "github.com/bio-routing/bio-rd/util/grpc/clientmanager" + "github.com/bio-routing/bio-rd/util/servicewrapper" + "github.com/prometheus/client_golang/prometheus" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +var ( + grpcPort = flag.Uint("grpc_port", 4321, "gRPC server port") + httpPort = flag.Uint("http_port", 4320, "HTTP server port") + grpcKeepaliveMinTime = flag.Uint("grpc_keepalive_min_time", 1, "Minimum time (seconds) for a client to wait between GRPC keepalive pings") + risTimeout = flag.Uint("ris_timeout", 5, "RIS timeout in seconds") + configFilePath = flag.String("config.file", "ris_mirror.yml", "Configuration file") +) + +func main() { + flag.Parse() + + cfg, err := config.LoadConfig(*configFilePath) + if err != nil { + log.WithError(err).Fatal("Failed to load config") + } + + grpcClientManager := clientmanager.New() + for _, instance := range cfg.GetRISInstances() { + err := grpcClientManager.Add(instance, grpc.WithInsecure(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: time.Second * 10, + Timeout: time.Second * time.Duration(*risTimeout), + PermitWithoutStream: true, + })) + + if err != nil { + log.WithError(err).Fatal("GRPC clientmanager add failed") + } + } + + m := rismirror.New() + prometheus.MustRegister(prom_ris_mirror.NewCollector(m)) + prometheus.MustRegister(prom_grpc_cm.NewCollector(grpcClientManager)) + + for _, rcfg := range cfg.RIBConfigs { + for _, vrdRD := range rcfg.GetVRFs() { + srcs := make([]*grpc.ClientConn, 0) + for _, srcInstance := range rcfg.SrcRISInstances { + srcs = append(srcs, grpcClientManager.Get(srcInstance)) + } + + m.AddTarget(rcfg.Router, rcfg.GetRouter(), vrdRD, srcs) + } + } + + s := risserver.NewServer(m) + unaryInterceptors := []grpc.UnaryServerInterceptor{} + streamInterceptors := []grpc.StreamServerInterceptor{} + srv, err := servicewrapper.New( + uint16(*grpcPort), + servicewrapper.HTTP(uint16(*httpPort)), + unaryInterceptors, + streamInterceptors, + keepalive.EnforcementPolicy{ + MinTime: time.Duration(*grpcKeepaliveMinTime) * time.Second, + PermitWithoutStream: true, + }, + ) + if err != nil { + log.Errorf("failed to listen: %v", err) + os.Exit(1) + } + + pb.RegisterRoutingInformationServiceServer(srv.GRPC(), s) + if err := srv.Serve(); err != nil { + log.Fatalf("failed to start server: %v", err) + } +} diff --git a/cmd/ris-mirror/ris_mirror.yml b/cmd/ris-mirror/ris_mirror.yml new file mode 100644 index 0000000000000000000000000000000000000000..63e768f8f192ef2b09f89c7e6439c4c2f8c981a5 --- /dev/null +++ b/cmd/ris-mirror/ris_mirror.yml @@ -0,0 +1,4 @@ +ribs: + - router: 10.200.0.1 + vrfs: [51324:65201] + source_ris_instances: ["ncs01.lej01.srv.exaring.net:4321", "ncs02.lej01.srv.exaring.net:4321"] \ No newline at end of file diff --git a/cmd/ris-mirror/rismirror/metrics/ris_mirror_metrics.go b/cmd/ris-mirror/rismirror/metrics/ris_mirror_metrics.go new file mode 100644 index 0000000000000000000000000000000000000000..3d8948cb7c22582a52d60c687886b20d3396e746 --- /dev/null +++ b/cmd/ris-mirror/rismirror/metrics/ris_mirror_metrics.go @@ -0,0 +1,28 @@ +package metrics + +import ( + "net" + + mlrib_metrics "github.com/bio-routing/bio-rd/routingtable/mergedlocrib/metrics" + vrf_metrics "github.com/bio-routing/bio-rd/routingtable/vrf/metrics" +) + +// RISMirrorMetrics contains per router BMP metrics +type RISMirrorMetrics struct { + Routers []*RISMirrorRouterMetrics +} + +// RISMirrorRouterMetrics contains a routers RIS mirror metrics +type RISMirrorRouterMetrics struct { + Address net.IP + SysName string + VRFMetrics []*vrf_metrics.VRFMetrics + InternalVRFMetrics []*InternalVRFMetrics +} + +// InternalVRFMetrics represents internal VRF metrics (_vrf) +type InternalVRFMetrics struct { + RD uint64 + MergedLocRIBMetricsIPv4Unicast *mlrib_metrics.MergedLocRIBMetrics + MergedLocRIBMetricsIPv6Unicast *mlrib_metrics.MergedLocRIBMetrics +} diff --git a/cmd/ris-mirror/rismirror/rismirror.go b/cmd/ris-mirror/rismirror/rismirror.go new file mode 100644 index 0000000000000000000000000000000000000000..3fed598fbbeb48ae89ff6ceee9f2d90a91285766 --- /dev/null +++ b/cmd/ris-mirror/rismirror/rismirror.go @@ -0,0 +1,90 @@ +package rismirror + +import ( + "net" + "sync" + + "github.com/bio-routing/bio-rd/cmd/ris-mirror/rismirror/metrics" + "github.com/bio-routing/bio-rd/protocols/bgp/server" + "github.com/bio-routing/bio-rd/routingtable/vrf" + "google.golang.org/grpc" +) + +type RISMirror struct { + routers map[string]server.RouterInterface + routersMu sync.Mutex +} + +// New creates a new RISMirror +func New() *RISMirror { + return &RISMirror{ + routers: make(map[string]server.RouterInterface), + } +} + +// AddTarget adds a target to the RISMirror +func (rism *RISMirror) AddTarget(rtrName string, address net.IP, vrfRD uint64, sources []*grpc.ClientConn) { + rism.routersMu.Lock() + defer rism.routersMu.Unlock() + + if _, exists := rism.routers[rtrName]; !exists { + rism.routers[rtrName] = newRouter(rtrName, address) + } + + r := rism.routers[rtrName].(*Router) + r.addVRF(vrfRD, sources) +} + +// GetRouter gets a router +func (rism *RISMirror) GetRouter(rtr string) server.RouterInterface { + rism.routersMu.Lock() + defer rism.routersMu.Unlock() + + if _, exists := rism.routers[rtr]; !exists { + return nil + } + + return rism.routers[rtr] +} + +// GetRouters gets all routers +func (rism *RISMirror) GetRouters() []server.RouterInterface { + res := make([]server.RouterInterface, 0) + + for _, r := range rism.routers { + res = append(res, r) + } + + return res +} + +// Metrics gets a RISMirrors metrics +func (rism *RISMirror) Metrics() *metrics.RISMirrorMetrics { + res := &metrics.RISMirrorMetrics{ + Routers: make([]*metrics.RISMirrorRouterMetrics, 0), + } + + rism.routersMu.Lock() + defer rism.routersMu.Unlock() + + for _, r := range rism.routers { + rm := &metrics.RISMirrorRouterMetrics{ + Address: r.Address(), + SysName: r.Name(), + VRFMetrics: vrf.Metrics(r.(*Router).vrfRegistry), + InternalVRFMetrics: make([]*metrics.InternalVRFMetrics, 0), + } + + for rd, v := range r.(*Router).vrfs { + rm.InternalVRFMetrics = append(rm.InternalVRFMetrics, &metrics.InternalVRFMetrics{ + RD: rd, + MergedLocRIBMetricsIPv4Unicast: v.ipv4Unicast.Metrics(), + MergedLocRIBMetricsIPv6Unicast: v.ipv6Unicast.Metrics(), + }) + } + + res.Routers = append(res.Routers, rm) + } + + return res +} diff --git a/cmd/ris-mirror/rismirror/router.go b/cmd/ris-mirror/rismirror/router.go new file mode 100644 index 0000000000000000000000000000000000000000..eb8cb8061c19994160ee33dec07af0523ae4356c --- /dev/null +++ b/cmd/ris-mirror/rismirror/router.go @@ -0,0 +1,78 @@ +package rismirror + +import ( + "fmt" + "net" + + "github.com/bio-routing/bio-rd/risclient" + "github.com/bio-routing/bio-rd/routingtable/vrf" + "google.golang.org/grpc" + + "github.com/bio-routing/bio-rd/cmd/ris/api" +) + +// Router represents a router +type Router struct { + name string + address net.IP + vrfRegistry *vrf.VRFRegistry + vrfs map[uint64]*_vrf +} + +func newRouter(name string, address net.IP) *Router { + return &Router{ + name: name, + address: address, + vrfRegistry: vrf.NewVRFRegistry(), + vrfs: make(map[uint64]*_vrf), + } +} + +// Name gets the routers name +func (r *Router) Name() string { + return r.name +} + +// Address gets a routers address +func (r *Router) Address() net.IP { + return r.address +} + +// GetVRF gets a VRF by its ID +func (r *Router) GetVRF(vrfID uint64) *vrf.VRF { + return r.vrfRegistry.GetVRFByRD(vrfID) +} + +// GetVRFs gets all VRFs +func (r *Router) GetVRFs() []*vrf.VRF { + return r.vrfRegistry.List() +} + +func (r *Router) addVRF(rd uint64, sources []*grpc.ClientConn) { + v := r.vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", rd), rd) + + r.vrfs[rd] = newVRF(v.IPv4UnicastRIB(), v.IPv6UnicastRIB()) + + for _, src := range sources { + r.connectVRF(rd, src, 4) + r.connectVRF(rd, src, 6) + } +} + +func (r *Router) connectVRF(rd uint64, src *grpc.ClientConn, afi uint8) { + rc := risclient.New(&risclient.Request{ + Router: r.name, + VRFRD: rd, + AFI: apiAFI(afi), + }, src, r.vrfs[rd].getRIB(afi)) + + rc.Start() +} + +func apiAFI(afi uint8) api.ObserveRIBRequest_AFISAFI { + if afi == 6 { + return api.ObserveRIBRequest_IPv6Unicast + } + + return api.ObserveRIBRequest_IPv4Unicast +} diff --git a/cmd/ris-mirror/rismirror/vrf.go b/cmd/ris-mirror/rismirror/vrf.go new file mode 100644 index 0000000000000000000000000000000000000000..40ddfb16d81249bb6b988a308877ef5bfead0bb7 --- /dev/null +++ b/cmd/ris-mirror/rismirror/vrf.go @@ -0,0 +1,26 @@ +package rismirror + +import ( + "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/bio-routing/bio-rd/routingtable/mergedlocrib" +) + +type _vrf struct { + ipv4Unicast *mergedlocrib.MergedLocRIB + ipv6Unicast *mergedlocrib.MergedLocRIB +} + +func newVRF(locRIBIPv4Unicast, locRIBIPv6Unicast *locRIB.LocRIB) *_vrf { + return &_vrf{ + ipv4Unicast: mergedlocrib.New(locRIBIPv4Unicast), + ipv6Unicast: mergedlocrib.New(locRIBIPv6Unicast), + } +} + +func (v *_vrf) getRIB(afi uint8) *mergedlocrib.MergedLocRIB { + if afi == 6 { + return v.ipv6Unicast + } + + return v.ipv4Unicast +} diff --git a/cmd/ris/risserver/server.go b/cmd/ris/risserver/server.go index ba9230161def689ed94673d9e0a60d0c668581a4..07bd98d17f62e2ec81a8f947e829d322376dbc04 100644 --- a/cmd/ris/risserver/server.go +++ b/cmd/ris/risserver/server.go @@ -44,11 +44,11 @@ func init() { // Server represents an RoutingInformationService server type Server struct { - bmp *server.BMPServer + bmp server.BMPServerInterface } // NewServer creates a new server -func NewServer(b *server.BMPServer) *Server { +func NewServer(b server.BMPServerInterface) *Server { return &Server{ bmp: b, } diff --git a/metrics/grpc/clientmanager/adapter/prom/clientmanager_prom.go b/metrics/grpc/clientmanager/adapter/prom/clientmanager_prom.go new file mode 100644 index 0000000000000000000000000000000000000000..7473284c123b16212339bcc1f75e352321435083 --- /dev/null +++ b/metrics/grpc/clientmanager/adapter/prom/clientmanager_prom.go @@ -0,0 +1,44 @@ +package prom + +import ( + "github.com/bio-routing/bio-rd/util/grpc/clientmanager" + "github.com/prometheus/client_golang/prometheus" +) + +const ( + prefix = "bio_grpc_clientmanager_" +) + +var ( + connectionStateDesc *prometheus.Desc +) + +func init() { + labels := []string{"target"} + connectionStateDesc = prometheus.NewDesc(prefix+"connection_state", "Connection state, 0=IDLE,1=CONNECTING,2=READY,3=TRANSIENT_FAILURE,4=SHUTDOWN", labels, nil) +} + +// NewCollector creates a new collector instance for the given clientmanager +func NewCollector(cm *clientmanager.ClientManager) prometheus.Collector { + return &grpcClientManagerCollector{ + cm: cm, + } +} + +// grpcClientManagerCollector provides a collector for RIS metrics of BIO to use with Prometheus +type grpcClientManagerCollector struct { + cm *clientmanager.ClientManager +} + +// Describe conforms to the prometheus collector interface +func (c *grpcClientManagerCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- connectionStateDesc +} + +// Collect conforms to the prometheus collector interface +func (c *grpcClientManagerCollector) Collect(ch chan<- prometheus.Metric) { + for _, con := range c.cm.Metrics().Connections { + l := []string{con.Target} + ch <- prometheus.MustNewConstMetric(connectionStateDesc, prometheus.GaugeValue, float64(con.State), l...) + } +} diff --git a/metrics/ris-mirror/adapter/prom/ris_mirror_prom_adapter.go b/metrics/ris-mirror/adapter/prom/ris_mirror_prom_adapter.go new file mode 100644 index 0000000000000000000000000000000000000000..da40d8df8274eec72abdbaa2c15dc669fa020ea6 --- /dev/null +++ b/metrics/ris-mirror/adapter/prom/ris_mirror_prom_adapter.go @@ -0,0 +1,89 @@ +package prom + +import ( + "fmt" + + "github.com/bio-routing/bio-rd/cmd/ris-mirror/rismirror" + "github.com/bio-routing/bio-rd/cmd/ris-mirror/rismirror/metrics" + "github.com/bio-routing/bio-rd/protocols/bgp/packet" + "github.com/bio-routing/bio-rd/routingtable/vrf" + "github.com/prometheus/client_golang/prometheus" + + vrf_prom "github.com/bio-routing/bio-rd/metrics/vrf/adapter/prom" +) + +const ( + prefix = "bio_rismirror_" +) + +var ( + mergedLocalRIBRouteCount *prometheus.Desc + mergedLocalRIBSingleSourceRouteCount *prometheus.Desc +) + +func init() { + labels := []string{"sys_name", "agent_address", "vrf", "afi", "rib"} + mergedLocalRIBRouteCount = prometheus.NewDesc(prefix+"merged_locrib_route_count", "Number of unique routes", labels, nil) + mergedLocalRIBSingleSourceRouteCount = prometheus.NewDesc(prefix+"merged_locrib_single_source_route_count", "Number of routes seen from single source", labels, nil) +} + +// NewCollector creates a new collector instance for the given RIS mirror server +func NewCollector(risMirror *rismirror.RISMirror) prometheus.Collector { + return &risCollector{ + risMirror: risMirror, + } +} + +// risCollector provides a collector for RIS metrics of BIO to use with Prometheus +type risCollector struct { + risMirror *rismirror.RISMirror +} + +// Describe conforms to the prometheus collector interface +func (c *risCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- mergedLocalRIBRouteCount + ch <- mergedLocalRIBSingleSourceRouteCount + + vrf_prom.DescribeRouter(ch) +} + +// Collect conforms to the prometheus collector interface +func (c *risCollector) Collect(ch chan<- prometheus.Metric) { + for _, rtr := range c.risMirror.Metrics().Routers { + c.collectForRouter(ch, rtr) + } +} + +func (c *risCollector) collectForRouter(ch chan<- prometheus.Metric, rtr *metrics.RISMirrorRouterMetrics) { + for _, vrfMetric := range rtr.VRFMetrics { + vrf_prom.CollectForVRFRouter(ch, rtr.SysName, rtr.Address.String(), vrfMetric) + } + + for _, x := range rtr.InternalVRFMetrics { + c.collectMergedLocRIBMetrics(ch, rtr, x) + } +} + +func (c *risCollector) collectMergedLocRIBMetrics(ch chan<- prometheus.Metric, rtr *metrics.RISMirrorRouterMetrics, v *metrics.InternalVRFMetrics) { + ch <- prometheus.MustNewConstMetric(mergedLocalRIBRouteCount, prometheus.GaugeValue, float64(v.MergedLocRIBMetricsIPv4Unicast.UniqueRouteCount), + getMergedLocRIBMetricsLabels(rtr, v, packet.IPv4AFI)...) + + ch <- prometheus.MustNewConstMetric(mergedLocalRIBRouteCount, prometheus.GaugeValue, float64(v.MergedLocRIBMetricsIPv6Unicast.UniqueRouteCount), + getMergedLocRIBMetricsLabels(rtr, v, packet.IPv6AFI)...) + + ch <- prometheus.MustNewConstMetric(mergedLocalRIBSingleSourceRouteCount, prometheus.GaugeValue, float64(v.MergedLocRIBMetricsIPv4Unicast.RoutesWithSingleSourceCount), + getMergedLocRIBMetricsLabels(rtr, v, packet.IPv4AFI)...) + + ch <- prometheus.MustNewConstMetric(mergedLocalRIBSingleSourceRouteCount, prometheus.GaugeValue, float64(v.MergedLocRIBMetricsIPv6Unicast.RoutesWithSingleSourceCount), + getMergedLocRIBMetricsLabels(rtr, v, packet.IPv6AFI)...) +} + +func getMergedLocRIBMetricsLabels(rtr *metrics.RISMirrorRouterMetrics, v *metrics.InternalVRFMetrics, afi uint8) []string { + ret := []string{rtr.SysName, rtr.Address.String(), vrf.RouteDistinguisherHumanReadable(v.RD), fmt.Sprintf("%d", afi)} + + if afi == packet.IPv4AFI { + return append(ret, v.MergedLocRIBMetricsIPv4Unicast.RIBName) + } + + return append(ret, v.MergedLocRIBMetricsIPv6Unicast.RIBName) +} diff --git a/metrics/vrf/adapter/prom/vrf_prom_adapter.go b/metrics/vrf/adapter/prom/vrf_prom_adapter.go index d55e706ec5856d8d5d907dd89e3c3919701466f6..0342801880d18a2cd21abf9c6c786ba13f6383e3 100644 --- a/metrics/vrf/adapter/prom/vrf_prom_adapter.go +++ b/metrics/vrf/adapter/prom/vrf_prom_adapter.go @@ -18,7 +18,7 @@ var ( ) func init() { - labels := []string{"vrf", "rib", "afi", "safi"} + labels := []string{"vrf_name", "vrf_rd", "rib", "afi", "safi"} routeCountDesc = prometheus.NewDesc(prefix+"route_count", "Number of routes in the RIB", labels, nil) routeCountDescRouter = prometheus.NewDesc(prefix+"route_count", "Number of routes in the RIB", append([]string{"sys_name", "agent_address"}, labels...), nil) } @@ -55,7 +55,7 @@ func (c *vrfCollector) Collect(ch chan<- prometheus.Metric) { func (c *vrfCollector) collectForVRF(ch chan<- prometheus.Metric, v *metrics.VRFMetrics) { for _, rib := range v.RIBs { ch <- prometheus.MustNewConstMetric(routeCountDesc, prometheus.GaugeValue, float64(rib.RouteCount), - v.Name, rib.Name, strconv.Itoa(int(rib.AFI)), strconv.Itoa(int(rib.SAFI))) + v.Name, vrf.RouteDistinguisherHumanReadable(v.RD), rib.Name, strconv.Itoa(int(rib.AFI)), strconv.Itoa(int(rib.SAFI))) } } @@ -63,6 +63,6 @@ func (c *vrfCollector) collectForVRF(ch chan<- prometheus.Metric, v *metrics.VRF func CollectForVRFRouter(ch chan<- prometheus.Metric, sysName string, agentAddress string, v *metrics.VRFMetrics) { for _, rib := range v.RIBs { ch <- prometheus.MustNewConstMetric(routeCountDescRouter, prometheus.GaugeValue, float64(rib.RouteCount), - sysName, agentAddress, v.Name, rib.Name, strconv.Itoa(int(rib.AFI)), strconv.Itoa(int(rib.SAFI))) + sysName, agentAddress, v.Name, vrf.RouteDistinguisherHumanReadable(v.RD), rib.Name, strconv.Itoa(int(rib.AFI)), strconv.Itoa(int(rib.SAFI))) } } diff --git a/protocols/bgp/server/bmp_router.go b/protocols/bgp/server/bmp_router.go index 5f93feb98a45e9a40c9340a825ffde5b0dfe7158..f4b9343d87a7cc48b906fbaf6d0399a968836791 100644 --- a/protocols/bgp/server/bmp_router.go +++ b/protocols/bgp/server/bmp_router.go @@ -20,6 +20,13 @@ import ( "github.com/bio-routing/tflow2/convert" ) +type RouterInterface interface { + Name() string + Address() net.IP + GetVRF(vrfID uint64) *vrf.VRF + GetVRFs() []*vrf.VRF +} + // Router represents a BMP enabled route in BMP context type Router struct { name string diff --git a/protocols/bgp/server/bmp_server.go b/protocols/bgp/server/bmp_server.go index 5fb1bb9b53be2c7c42056d91f7508efff46ac549..85caa197c2a6df0ca6cd9564dbefce86ad85c115 100644 --- a/protocols/bgp/server/bmp_server.go +++ b/protocols/bgp/server/bmp_server.go @@ -21,6 +21,11 @@ const ( defaultBufferLen = 4096 ) +type BMPServerInterface interface { + GetRouter(rtr string) RouterInterface + GetRouters() []RouterInterface +} + // BMPServer represents a BMP server type BMPServer struct { routers map[string]*Router @@ -170,11 +175,11 @@ func recvBMPMsg(c net.Conn) (msg []byte, err error) { } // GetRouters gets all routers -func (b *BMPServer) GetRouters() []*Router { +func (b *BMPServer) GetRouters() []RouterInterface { b.routersMu.RLock() defer b.routersMu.RUnlock() - r := make([]*Router, 0, len(b.routers)) + r := make([]RouterInterface, 0, len(b.routers)) for name := range b.routers { r = append(r, b.routers[name]) } @@ -183,7 +188,7 @@ func (b *BMPServer) GetRouters() []*Router { } // GetRouter gets a router -func (b *BMPServer) GetRouter(name string) *Router { +func (b *BMPServer) GetRouter(name string) RouterInterface { b.routersMu.RLock() defer b.routersMu.RUnlock() diff --git a/protocols/bgp/server/bmp_server_test.go b/protocols/bgp/server/bmp_server_test.go index fef31b5c9822d4e5fdad754283adfb9da0d87ee7..9ff9d124f327db36e40607e2c27c5baa58c582f7 100644 --- a/protocols/bgp/server/bmp_server_test.go +++ b/protocols/bgp/server/bmp_server_test.go @@ -75,7 +75,7 @@ func TestBMPServer(t *testing.T) { return } - aaaa := srv.GetRouter("10.0.255.1") + aaaa := srv.GetRouter("10.0.255.1").(*Router) if aaaa == nil { t.Errorf("Router AAAA not found") return diff --git a/risclient/risclient.go b/risclient/risclient.go new file mode 100644 index 0000000000000000000000000000000000000000..14ae89f41996448d18b2d4c634f920a31f7a917b --- /dev/null +++ b/risclient/risclient.go @@ -0,0 +1,156 @@ +package risclient + +import ( + "context" + "io" + "sync" + + risapi "github.com/bio-routing/bio-rd/cmd/ris/api" + routeapi "github.com/bio-routing/bio-rd/route/api" + "google.golang.org/grpc" + + log "github.com/sirupsen/logrus" +) + +// Client is a client interface +type Client interface { + AddRoute(src interface{}, r *routeapi.Route) error + RemoveRoute(src interface{}, r *routeapi.Route) error + DropAllBySrc(src interface{}) +} + +// RISClient represents a RIS client +type RISClient struct { + req *Request + cc *grpc.ClientConn + c Client + stopCh chan struct{} + wg sync.WaitGroup +} + +// Request is a RISClient config +type Request struct { + Router string + VRFRD uint64 + AFI risapi.ObserveRIBRequest_AFISAFI +} + +func (r *Request) toProtoRequest() *risapi.ObserveRIBRequest { + return &risapi.ObserveRIBRequest{ + Router: r.Router, + VrfId: r.VRFRD, + Afisafi: r.AFI, + } +} + +// New creates a new RISClient +func New(req *Request, cc *grpc.ClientConn, c Client) *RISClient { + return &RISClient{ + req: req, + cc: cc, + c: c, + stopCh: make(chan struct{}), + } +} + +// Stop stops the client +func (r *RISClient) Stop() { + close(r.stopCh) +} + +// Start starts the client +func (r *RISClient) Start() { + r.wg.Add(1) + + go r.run() +} + +// Wait blocks until the client is fully stopped +func (r *RISClient) Wait() { + r.wg.Wait() +} + +func (r *RISClient) stopped() bool { + select { + case <-r.stopCh: + return true + default: + return false + } +} + +func (r *RISClient) run() { + for { + if r.stopped() { + return + } + + risc := risapi.NewRoutingInformationServiceClient(r.cc) + + orc, err := risc.ObserveRIB(context.Background(), r.req.toProtoRequest(), grpc.WaitForReady(true)) + if err != nil { + log.WithError(err).Error("ObserveRIB call failed") + continue + } + + err = r.serviceLoop(orc) + if err == nil { + return + } + + r.serviceLoopLogging(err) + } +} + +func (r *RISClient) serviceLoopLogging(err error) { + if err == io.EOF { + log.WithError(err).WithFields(log.Fields{ + "component": "RISClient", + "function": "run", + }).Info("ObserveRIB ended") + return + } + + log.WithError(err).WithFields(log.Fields{ + "component": "RISClient", + "function": "run", + }).Error("ObserveRIB ended") +} + +func (r *RISClient) serviceLoop(orc risapi.RoutingInformationService_ObserveRIBClient) error { + defer r.processDownEvent() + + for { + if r.stopped() { + return nil + } + + u, err := orc.Recv() + if err != nil { + return err + } + + r.processUpdate(u) + } +} + +func (r *RISClient) processUpdate(u *risapi.RIBUpdate) { + if u.Advertisement { + r.processAdvertisement(u) + return + } + + r.processWithdraw(u) +} + +func (r *RISClient) processAdvertisement(u *risapi.RIBUpdate) { + r.c.AddRoute(r.cc, u.Route) +} + +func (r *RISClient) processWithdraw(u *risapi.RIBUpdate) { + r.c.RemoveRoute(r.cc, u.Route) +} + +func (r *RISClient) processDownEvent() { + r.c.DropAllBySrc(r.cc) +} diff --git a/route/path.go b/route/path.go index 4d36544331c35493e8f41b2d3843e8a4d4b56a68..3a7d5165a234d392666bee792d2b18291af3d4b3 100644 --- a/route/path.go +++ b/route/path.go @@ -151,7 +151,7 @@ func (p *Path) String() string { case FIBPathType: return p.FIBPath.String() default: - return "Unknown paty type. Probably not implemented yet" + return fmt.Sprintf("Unknown path type. Probably not implemented yet (%d)", p.Type) } } diff --git a/route/route.go b/route/route.go index fd92772e652f2986e73a25366493331488663652..8b3966f230220d0bffce8d90fe69757c2c41b82e 100644 --- a/route/route.go +++ b/route/route.go @@ -279,6 +279,9 @@ func RouteFromProtoRoute(ar *api.Route, dedup bool) *Route { case api.Path_BGP: p.Type = BGPPathType p.BGPPath = BGPPathFromProtoBGPPath(ar.Paths[i].BgpPath, dedup) + case api.Path_Static: + p.Type = StaticPathType + p.StaticPath = StaticPathFromProtoStaticPath(ar.Paths[i].StaticPath, dedup) } r.paths = append(r.paths, p) @@ -288,6 +291,11 @@ func RouteFromProtoRoute(ar *api.Route, dedup bool) *Route { } func (r *Route) updateEqualPathCount() { + if len(r.paths) == 0 { + r.ecmpPaths = 0 + return + } + count := uint(1) for i := 0; i < len(r.paths)-1; i++ { if !r.paths[i].ECMP(r.paths[i+1]) { diff --git a/route/static.go b/route/static.go index a4285223ea111944842da2eed544bf108eace7ac..b056fd34e9d096102e2c0512ca755525f3381ad6 100644 --- a/route/static.go +++ b/route/static.go @@ -58,3 +58,10 @@ func (s *StaticPath) ToProto() *api.StaticPath { NextHop: s.NextHop.ToProto(), } } + +// StaticPathFromProtoStaticPath converts a proto StaticPath to StaticPath +func StaticPathFromProtoStaticPath(pb *api.StaticPath, dedup bool) *StaticPath { + return &StaticPath{ + NextHop: bnet.IPFromProtoIP(pb.NextHop), + } +} diff --git a/routingtable/mergedlocrib/mergedlocrib.go b/routingtable/mergedlocrib/mergedlocrib.go new file mode 100644 index 0000000000000000000000000000000000000000..ab97c0fe0cb54d7560d2472d4de217628bc3352c --- /dev/null +++ b/routingtable/mergedlocrib/mergedlocrib.go @@ -0,0 +1,131 @@ +package mergedlocrib + +import ( + "crypto/sha1" + "sync" + + "github.com/bio-routing/bio-rd/route" + routeapi "github.com/bio-routing/bio-rd/route/api" + "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/bio-routing/bio-rd/routingtable/mergedlocrib/metrics" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" +) + +// MergedLocRIB provides an deduplicated routing table +type MergedLocRIB struct { + routes map[[sha1.Size]byte]*routeContainer + routesMu sync.RWMutex + locRIB *locRIB.LocRIB +} + +// New creates a new MergedLocRIB and starts it +func New(locRIB *locRIB.LocRIB) *MergedLocRIB { + return &MergedLocRIB{ + routes: make(map[[sha1.Size]byte]*routeContainer), + locRIB: locRIB, + } +} + +// DropAllBySrc drops all routes learned from a source +func (rtm *MergedLocRIB) DropAllBySrc(src interface{}) { + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + for h, rc := range rtm.routes { + rtm._delRoute(h, src, rc.route) + } +} + +// AddRoute adds a route +func (rtm *MergedLocRIB) AddRoute(cc interface{}, r *routeapi.Route) error { + h, err := hashRoute(r) + if err != nil { + return errors.Wrap(err, "Hashing failed") + } + + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + if _, exists := rtm.routes[h]; !exists { + s := route.RouteFromProtoRoute(r, true) + rtm.routes[h] = newRouteContainer(r, cc) + rtm.locRIB.AddPath(s.Prefix(), s.Paths()[0]) + return nil + } + + rtm.routes[h].addSource(cc) + return nil +} + +// RemoveRoute deletes a route +func (rtm *MergedLocRIB) RemoveRoute(cc interface{}, r *routeapi.Route) error { + h, err := hashRoute(r) + if err != nil { + return errors.Wrap(err, "Hashing failed") + } + + rtm.routesMu.Lock() + defer rtm.routesMu.Unlock() + + if _, exists := rtm.routes[h]; !exists { + return nil + } + + rtm._delRoute(h, cc, r) + return nil +} + +func (rtm *MergedLocRIB) _delRoute(h [sha1.Size]byte, src interface{}, r *routeapi.Route) { + rtm.routes[h].removeSource(src) + + if rtm.routes[h].srcCount() > 0 { + return + } + + s := route.RouteFromProtoRoute(r, true) + rtm.locRIB.RemovePath(s.Prefix(), s.Paths()[0]) + delete(rtm.routes, h) +} + +func hashRoute(route *routeapi.Route) ([sha1.Size]byte, error) { + m, err := proto.Marshal(route) + if err != nil { + return [sha1.Size]byte{}, errors.Wrap(err, "Proto marshal failed") + } + + h := sha1.New() + _, err = h.Write(m) + if err != nil { + return [sha1.Size]byte{}, errors.Wrap(err, "Write failed") + } + res := [sha1.Size]byte{} + x := h.Sum(nil) + copy(res[:], x) + + return res, nil +} + +// Metrics gets the metrics +func (rtm *MergedLocRIB) Metrics() *metrics.MergedLocRIBMetrics { + rtm.routesMu.RLock() + defer rtm.routesMu.RUnlock() + + return &metrics.MergedLocRIBMetrics{ + RIBName: rtm.locRIB.Name(), + UniqueRouteCount: uint64(len(rtm.routes)), + RoutesWithSingleSourceCount: rtm._getRoutesWithSingleSourceCount(), + } +} + +func (rtm *MergedLocRIB) _getRoutesWithSingleSourceCount() uint64 { + n := uint64(0) + + for _, r := range rtm.routes { + if len(r.sources) == 1 { + n++ + } + } + + return n +} diff --git a/routingtable/mergedlocrib/mergedlocrib_test.go b/routingtable/mergedlocrib/mergedlocrib_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c0e3adc3983669011c5dfab7a77fe6e3fcb903a4 --- /dev/null +++ b/routingtable/mergedlocrib/mergedlocrib_test.go @@ -0,0 +1,234 @@ +package mergedlocrib + +import ( + "testing" + + "github.com/bio-routing/bio-rd/route" + routeapi "github.com/bio-routing/bio-rd/route/api" + "github.com/bio-routing/bio-rd/routingtable/locRIB" + "github.com/stretchr/testify/assert" + + bnet "github.com/bio-routing/bio-rd/net" +) + +type srcRouteTuple struct { + src interface{} + route *routeapi.Route +} + +func TestMergedLocRIB(t *testing.T) { + tests := []struct { + name string + add []*srcRouteTuple + expectedAfterAdd []*route.Route + remove []*srcRouteTuple + expectedAfterRemove []*route.Route + }{ + { + name: "Test #1: Single source", + add: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterAdd: []*route.Route{ + route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ + Type: route.StaticPathType, + StaticPath: &route.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), + }, + }), + }, + remove: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterRemove: []*route.Route{}, + }, + { + name: "Test #2: Multiple source, single delete", + add: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + { + src: "b", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterAdd: []*route.Route{ + route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ + Type: route.StaticPathType, + StaticPath: &route.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), + }, + }), + }, + remove: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterRemove: []*route.Route{ + route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ + Type: route.StaticPathType, + StaticPath: &route.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), + }, + }), + }, + }, + { + name: "Test #3: Multiple source, double delete", + add: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + { + src: "b", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterAdd: []*route.Route{ + route.NewRoute(bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).Ptr(), &route.Path{ + Type: route.StaticPathType, + StaticPath: &route.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).Ptr(), + }, + }), + }, + remove: []*srcRouteTuple{ + { + src: "a", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + { + src: "b", + route: &routeapi.Route{ + Pfx: bnet.NewPfx(bnet.IPv4FromOctets(10, 0, 0, 0), 8).ToProto(), + Paths: []*routeapi.Path{ + { + Type: routeapi.Path_Static, + StaticPath: &routeapi.StaticPath{ + NextHop: bnet.IPv4FromOctets(1, 1, 1, 1).ToProto(), + }, + }, + }, + }, + }, + }, + expectedAfterRemove: []*route.Route{}, + }, + } + + for _, test := range tests { + lr := locRIB.New("test") + rtm := New(lr) + + for _, a := range test.add { + rtm.AddRoute(a.src, a.route) + } + + selectPaths(test.expectedAfterAdd) + assert.Equal(t, test.expectedAfterAdd, lr.Dump(), test.name) + + for _, r := range test.remove { + rtm.RemoveRoute(r.src, r.route) + } + + selectPaths(test.expectedAfterRemove) + assert.Equal(t, test.expectedAfterRemove, lr.Dump(), test.name) + } +} + +func selectPaths(routes []*route.Route) { + for _, r := range routes { + r.PathSelection() + } +} diff --git a/routingtable/mergedlocrib/metrics/mergedlocrib_metrics.go b/routingtable/mergedlocrib/metrics/mergedlocrib_metrics.go new file mode 100644 index 0000000000000000000000000000000000000000..bcafe41b7e692c28c7cf2dd71fc7d4844ff02d34 --- /dev/null +++ b/routingtable/mergedlocrib/metrics/mergedlocrib_metrics.go @@ -0,0 +1,8 @@ +package metrics + +// MergedLocRIBMetrics represents merged local rib metrics +type MergedLocRIBMetrics struct { + RIBName string + UniqueRouteCount uint64 + RoutesWithSingleSourceCount uint64 +} diff --git a/routingtable/mergedlocrib/routecontainer.go b/routingtable/mergedlocrib/routecontainer.go new file mode 100644 index 0000000000000000000000000000000000000000..51b42ac20c2b2530645ec21a39ad5f007ae3a88e --- /dev/null +++ b/routingtable/mergedlocrib/routecontainer.go @@ -0,0 +1,46 @@ +package mergedlocrib + +import ( + routeapi "github.com/bio-routing/bio-rd/route/api" +) + +// routeContainer groups a route with one ore multiple source the route was received from +type routeContainer struct { + route *routeapi.Route + sources []interface{} +} + +func newRouteContainer(route *routeapi.Route, source interface{}) *routeContainer { + return &routeContainer{ + route: route, + sources: []interface{}{source}, + } +} + +func (rc *routeContainer) addSource(src interface{}) { + rc.sources = append(rc.sources, src) +} + +func (rc *routeContainer) removeSource(src interface{}) { + i := rc.getSourceIndex(src) + if i < 0 { + return + } + + rc.sources[i] = rc.sources[len(rc.sources)-1] + rc.sources = rc.sources[:len(rc.sources)-1] +} + +func (rc *routeContainer) getSourceIndex(src interface{}) int { + for i := range rc.sources { + if rc.sources[i] == src { + return i + } + } + + return -1 +} + +func (rc *routeContainer) srcCount() int { + return len(rc.sources) +} diff --git a/routingtable/vrf/metrics.go b/routingtable/vrf/metrics.go index 5e4dd53e867982c39deb9f80583524b4b9314eb9..f28cb379b19d91daca70dfc6f106780fd24f0906 100644 --- a/routingtable/vrf/metrics.go +++ b/routingtable/vrf/metrics.go @@ -21,6 +21,7 @@ func Metrics(r *VRFRegistry) []*metrics.VRFMetrics { func MetricsForVRF(v *VRF) *metrics.VRFMetrics { m := &metrics.VRFMetrics{ Name: v.Name(), + RD: v.RD(), RIBs: make([]*metrics.RIBMetrics, 0), } diff --git a/routingtable/vrf/metrics/vrf_metrics.go b/routingtable/vrf/metrics/vrf_metrics.go index 3c5ba2c0e37c293bb2467c86594f1e6222df05e2..088a665b154c41efae1ced26a6c701542568cc1c 100644 --- a/routingtable/vrf/metrics/vrf_metrics.go +++ b/routingtable/vrf/metrics/vrf_metrics.go @@ -5,6 +5,9 @@ type VRFMetrics struct { // Name of the VRF Name string + // RD is the route distinguisher + RD uint64 + // RIBs returns the RIB specific metrics RIBs []*RIBMetrics } diff --git a/routingtable/vrf/metrics_test.go b/routingtable/vrf/metrics_test.go index 4c22b147eadbea2de0cc4abeabde144d615f76a3..2946f8185afc2c2f69bb498ff9869002be2baf5e 100644 --- a/routingtable/vrf/metrics_test.go +++ b/routingtable/vrf/metrics_test.go @@ -26,6 +26,7 @@ func TestMetrics(t *testing.T) { expected := []*metrics.VRFMetrics{ { Name: "green", + RD: 0, RIBs: []*metrics.RIBMetrics{ { Name: "inet.0", @@ -43,6 +44,7 @@ func TestMetrics(t *testing.T) { }, { Name: "red", + RD: 1, RIBs: []*metrics.RIBMetrics{ { Name: "inet.0", diff --git a/util/grpc/clientmanager/clientmanager.go b/util/grpc/clientmanager/clientmanager.go new file mode 100644 index 0000000000000000000000000000000000000000..ef01eb70575f4913e5fe2ee3174630f0b2381f25 --- /dev/null +++ b/util/grpc/clientmanager/clientmanager.go @@ -0,0 +1,69 @@ +package clientmanager + +import ( + "fmt" + "sync" + + "github.com/bio-routing/bio-rd/util/grpc/clientmanager/metrics" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +// ClientManager manages GRPC client connections +type ClientManager struct { + connections map[string]*grpc.ClientConn + connectionsMu sync.RWMutex +} + +// New creates a new ClientManager +func New() *ClientManager { + return &ClientManager{ + connections: make(map[string]*grpc.ClientConn), + } +} + +// Get gets a target connection +func (cm *ClientManager) Get(target string) *grpc.ClientConn { + cm.connectionsMu.RLock() + defer cm.connectionsMu.RUnlock() + + if _, exists := cm.connections[target]; !exists { + return nil + } + + return cm.connections[target] +} + +// Add adds a target +func (cm *ClientManager) Add(target string, opts ...grpc.DialOption) error { + cm.connectionsMu.Lock() + defer cm.connectionsMu.Unlock() + + if _, exists := cm.connections[target]; exists { + return fmt.Errorf("Target exists already") + } + + cc, err := grpc.Dial(target, opts...) + if err != nil { + return errors.Wrap(err, "grpc.Dial failed") + } + + cm.connections[target] = cc + return nil +} + +// Metrics gets ClientManager metrics +func (cm *ClientManager) Metrics() *metrics.ClientManagerMetrics { + ret := metrics.New() + cm.connectionsMu.RLock() + defer cm.connectionsMu.RUnlock() + + for t, c := range cm.connections { + ret.Connections = append(ret.Connections, &metrics.GRPCConnectionMetrics{ + Target: t, + State: int(c.GetState()), + }) + } + + return ret +} diff --git a/util/grpc/clientmanager/metrics/clientmanager_metrics.go b/util/grpc/clientmanager/metrics/clientmanager_metrics.go new file mode 100644 index 0000000000000000000000000000000000000000..01cf5b63cb054f601aefc03ed49b6990c8323385 --- /dev/null +++ b/util/grpc/clientmanager/metrics/clientmanager_metrics.go @@ -0,0 +1,19 @@ +package metrics + +// ClientManagerMetrics provides metrics for a single ClientManager instance +type ClientManagerMetrics struct { + Connections []*GRPCConnectionMetrics +} + +// New returns ClientManagerMetrics +func New() *ClientManagerMetrics { + return &ClientManagerMetrics{ + Connections: make([]*GRPCConnectionMetrics, 0), + } +} + +// GRPCConnectionMetrics represents metrics of an GRPC connection +type GRPCConnectionMetrics struct { + Target string + State int +}