Skip to content
Snippets Groups Projects
bmp_router.go 11.6 KiB
Newer Older
  • Learn to ignore specific revisions
  • package server
    
    import (
    	"bytes"
    	"fmt"
    	"net"
    	"sync"
    
    takt's avatar
    takt committed
    	"sync/atomic"
    
    takt's avatar
    takt committed
    	"github.com/pkg/errors"
    	log "github.com/sirupsen/logrus"
    
    
    	bnet "github.com/bio-routing/bio-rd/net"
    	"github.com/bio-routing/bio-rd/protocols/bgp/packet"
    	bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet"
    	"github.com/bio-routing/bio-rd/routingtable"
    	"github.com/bio-routing/bio-rd/routingtable/filter"
    
    takt's avatar
    takt committed
    	"github.com/bio-routing/bio-rd/routingtable/vrf"
    
    Julian Kornberger's avatar
    Julian Kornberger committed
    	"github.com/bio-routing/tflow2/convert"
    
    // Router represents a BMP enabled route in BMP context
    
    takt's avatar
    takt committed
    type Router struct {
    
    	name             string
    
    takt's avatar
    takt committed
    	nameMu           sync.RWMutex
    
    	address          net.IP
    	port             uint16
    	con              net.Conn
    
    	established      uint32
    
    	reconnectTimeMin int
    	reconnectTimeMax int
    	reconnectTime    int
    
    Oliver Herms's avatar
    Oliver Herms committed
    	dialTimeout      time.Duration
    
    	reconnectTimer   *time.Timer
    
    takt's avatar
    takt committed
    	vrfRegistry      *vrf.VRFRegistry
    	neighborManager  *neighborManager
    
    	logger           *log.Logger
    	runMu            sync.Mutex
    	stop             chan struct{}
    
    
    	ribClients   map[afiClient]struct{}
    	ribClientsMu sync.Mutex
    
    takt's avatar
    takt committed
    
    	counters routerCounters
    }
    
    type routerCounters struct {
    	routeMonitoringMessages      uint64
    	statisticsReportMessages     uint64
    	peerDownNotificationMessages uint64
    	peerUpNotificationMessages   uint64
    	initiationMessages           uint64
    	terminationMessages          uint64
    	routeMirroringMessages       uint64
    
    }
    
    type neighbor struct {
    
    takt's avatar
    takt committed
    	vrfID       uint64
    	peerAddress [16]byte
    
    Oliver Herms's avatar
    Oliver Herms committed
    	localAS     uint32
    	peerAS      uint32
    	routerID    uint32
    	fsm         *FSM
    	opt         *packet.DecodeOptions
    
    takt's avatar
    takt committed
    func newRouter(addr net.IP, port uint16) *Router {
    	return &Router{
    
    		address:          addr,
    		port:             port,
    		reconnectTimeMin: 30,  // Suggested by RFC 7854
    		reconnectTimeMax: 720, // Suggested by RFC 7854
    		reconnectTimer:   time.NewTimer(time.Duration(0)),
    
    Oliver Herms's avatar
    Oliver Herms committed
    		dialTimeout:      time.Second * 5,
    
    takt's avatar
    takt committed
    		vrfRegistry:      vrf.NewVRFRegistry(),
    		neighborManager:  newNeighborManager(),
    
    		logger:           log.New(),
    		stop:             make(chan struct{}),
    
    		ribClients:       make(map[afiClient]struct{}),
    	}
    }
    
    
    // GetVRF get's a VRF
    
    takt's avatar
    takt committed
    func (r *Router) GetVRF(rd uint64) *vrf.VRF {
    	return r.vrfRegistry.GetVRFByRD(rd)
    
    // GetVRFs gets all VRFs
    
    takt's avatar
    takt committed
    func (r *Router) GetVRFs() []*vrf.VRF {
    	return r.vrfRegistry.List()
    }
    
    // Name gets a routers name
    
    takt's avatar
    takt committed
    func (r *Router) Name() string {
    	r.nameMu.RLock()
    	defer r.nameMu.RUnlock()
    	return r.name
    
    // Address gets a routers address
    func (r *Router) Address() net.IP {
    	return r.address
    }
    
    
    func (r *Router) serve(con net.Conn) error {
    
    	r.con = con
    
    	r.runMu.Lock()
    	defer r.con.Close()
    	defer r.runMu.Unlock()
    
    
    		select {
    		case <-r.stop:
    
    			return nil
    
    		msg, err := recvBMPMsg(r.con)
    		if err != nil {
    
    			return errors.Wrap(err, "Unable to get message")
    
    takt's avatar
    takt committed
    		r.processMsg(msg)
    	}
    }
    
    takt's avatar
    takt committed
    func (r *Router) processMsg(msg []byte) {
    	bmpMsg, err := bmppkt.Decode(msg)
    	if err != nil {
    		r.logger.Errorf("Unable to decode BMP message: %v", err)
    		return
    	}
    
    	switch bmpMsg.MsgType() {
    	case bmppkt.PeerUpNotificationType:
    		err = r.processPeerUpNotification(bmpMsg.(*bmppkt.PeerUpNotification))
    		if err != nil {
    			r.logger.Errorf("Unable to process peer up notification: %v", err)
    
    takt's avatar
    takt committed
    	case bmppkt.PeerDownNotificationType:
    		r.processPeerDownNotification(bmpMsg.(*bmppkt.PeerDownNotification))
    	case bmppkt.InitiationMessageType:
    		r.processInitiationMsg(bmpMsg.(*bmppkt.InitiationMessage))
    	case bmppkt.TerminationMessageType:
    		r.processTerminationMsg(bmpMsg.(*bmppkt.TerminationMessage))
    		return
    	case bmppkt.RouteMonitoringType:
    		r.processRouteMonitoringMsg(bmpMsg.(*bmppkt.RouteMonitoringMsg))
    	case bmppkt.RouteMirroringMessageType:
    		atomic.AddUint64(&r.counters.routeMirroringMessages, 1)
    
    takt's avatar
    takt committed
    func (r *Router) processRouteMonitoringMsg(msg *bmppkt.RouteMonitoringMsg) {
    	atomic.AddUint64(&r.counters.routeMonitoringMessages, 1)
    
    takt's avatar
    takt committed
    	n := r.neighborManager.getNeighbor(msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress)
    	if n == nil {
    		r.logger.Errorf("Received route monitoring message for non-existent neighbor %d/%v on %s", msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress, r.address.String())
    
    		return
    	}
    
    	s := n.fsm.state.(*establishedState)
    
    Oliver Herms's avatar
    Oliver Herms committed
    	opt := s.fsm.decodeOptions()
    	opt.Use32BitASN = !msg.PerPeerHeader.GetAFlag()
    	s.msgReceived(msg.BGPUpdate, opt)
    
    takt's avatar
    takt committed
    func (r *Router) processInitiationMsg(msg *bmppkt.InitiationMessage) {
    	atomic.AddUint64(&r.counters.initiationMessages, 1)
    
    	r.nameMu.Lock()
    	defer r.nameMu.Unlock()
    
    
    	const (
    		stringType   = 0
    		sysDescrType = 1
    		sysNameType  = 2
    	)
    
    	logMsg := fmt.Sprintf("Received initiation message from %s:", r.address.String())
    
    	for _, tlv := range msg.TLVs {
    		switch tlv.InformationType {
    		case stringType:
    			logMsg += fmt.Sprintf(" Message: %q", string(tlv.Information))
    		case sysDescrType:
    			logMsg += fmt.Sprintf(" sysDescr.: %s", string(tlv.Information))
    		case sysNameType:
    
    			r.name = string(tlv.Information)
    
    			logMsg += fmt.Sprintf(" sysName.: %s", string(tlv.Information))
    		}
    	}
    
    
    	r.logger.Info(logMsg)
    
    takt's avatar
    takt committed
    func (r *Router) processTerminationMsg(msg *bmppkt.TerminationMessage) {
    
    	const (
    		stringType = 0
    		reasonType = 1
    
    		adminDown     = 0
    		unspecReason  = 1
    		outOfRes      = 2
    		redundantCon  = 3
    		permAdminDown = 4
    	)
    
    
    takt's avatar
    takt committed
    	atomic.AddUint64(&r.counters.terminationMessages, 1)
    
    	logMsg := fmt.Sprintf("Received termination message from %s: ", r.address.String())
    	for _, tlv := range msg.TLVs {
    		switch tlv.InformationType {
    		case stringType:
    			logMsg += fmt.Sprintf("Message: %q", string(tlv.Information))
    		case reasonType:
    
    takt's avatar
    takt committed
    			reason := convert.Uint16b(tlv.Information[:1])
    
    			switch reason {
    			case adminDown:
    				logMsg += "Session administratively down"
    			case unspecReason:
    
    takt's avatar
    takt committed
    				logMsg += "Unspecified reason"
    
    			case outOfRes:
    				logMsg += "Out of resources"
    			case redundantCon:
    				logMsg += "Redundant connection"
    			case permAdminDown:
    				logMsg += "Session permanently administratively closed"
    			}
    		}
    	}
    
    
    	r.logger.Warning(logMsg)
    
    
    	r.con.Close()
    
    takt's avatar
    takt committed
    	r.neighborManager.disposeAll()
    
    takt's avatar
    takt committed
    func (r *Router) processPeerDownNotification(msg *bmppkt.PeerDownNotification) {
    	r.logger.WithFields(log.Fields{
    		"address":            r.address.String(),
    		"router":             r.name,
    
    		"peer_distinguisher": vrf.RouteDistinguisherHumanReadable(msg.PerPeerHeader.PeerDistinguisher),
    
    takt's avatar
    takt committed
    		"peer_address":       addrToNetIP(msg.PerPeerHeader.PeerAddress).String(),
    	}).Infof("peer down notification received")
    	atomic.AddUint64(&r.counters.peerDownNotificationMessages, 1)
    
    takt's avatar
    takt committed
    	err := r.neighborManager.neighborDown(msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress)
    	if err != nil {
    		r.logger.Errorf("Failed to process peer down notification: %v", err)
    
    takt's avatar
    takt committed
    func (r *Router) processPeerUpNotification(msg *bmppkt.PeerUpNotification) error {
    	atomic.AddUint64(&r.counters.peerUpNotificationMessages, 1)
    	r.logger.WithFields(log.Fields{
    		"address":            r.address.String(),
    		"router":             r.name,
    
    		"peer_distinguisher": vrf.RouteDistinguisherHumanReadable(msg.PerPeerHeader.PeerDistinguisher),
    
    takt's avatar
    takt committed
    		"peer_address":       addrToNetIP(msg.PerPeerHeader.PeerAddress).String(),
    	}).Infof("peer up notification received")
    
    
    	if len(msg.SentOpenMsg) < packet.MinOpenLen {
    		return fmt.Errorf("Received peer up notification for %v: Invalid sent open message: %v", msg.PerPeerHeader.PeerAddress, msg.SentOpenMsg)
    	}
    
    
    Oliver Herms's avatar
    Oliver Herms committed
    	sentOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.SentOpenMsg[packet.HeaderLen:]))
    
    	if err != nil {
    
    Julian Kornberger's avatar
    Julian Kornberger committed
    		return errors.Wrapf(err, "Unable to decode sent open message sent from %v to %v", r.address.String(), msg.PerPeerHeader.PeerAddress)
    
    	}
    
    	if len(msg.ReceivedOpenMsg) < packet.MinOpenLen {
    		return fmt.Errorf("Received peer up notification for %v: Invalid received open message: %v", msg.PerPeerHeader.PeerAddress, msg.ReceivedOpenMsg)
    	}
    
    
    Oliver Herms's avatar
    Oliver Herms committed
    	recvOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.ReceivedOpenMsg[packet.HeaderLen:]))
    
    	if err != nil {
    
    Julian Kornberger's avatar
    Julian Kornberger committed
    		return errors.Wrapf(err, "Unable to decode received open message sent from %v to %v", msg.PerPeerHeader.PeerAddress, r.address.String())
    
    	}
    
    	addrLen := net.IPv4len
    
    	if msg.PerPeerHeader.GetIPVersion() == 6 {
    
    		addrLen = net.IPv6len
    	}
    
    	// bnet.IPFromBytes can only fail if length of argument is not 4 or 16. However, length is ensured here.
    	peerAddress, _ := bnet.IPFromBytes(msg.PerPeerHeader.PeerAddress[16-addrLen:])
    	localAddress, _ := bnet.IPFromBytes(msg.LocalAddress[16-addrLen:])
    
    	fsm := &FSM{
    
    Oliver Herms's avatar
    Oliver Herms committed
    		isBMP: true,
    
    		peer: &peer{
    
    			routerID:  sentOpen.BGPIdentifier,
    
    takt's avatar
    takt committed
    			addr:      peerAddress.Dedup(),
    			localAddr: localAddress.Dedup(),
    
    			peerASN:   msg.PerPeerHeader.PeerAS,
    			localASN:  uint32(sentOpen.ASN),
    			ipv4:      &peerAddressFamily{},
    			ipv6:      &peerAddressFamily{},
    
    takt's avatar
    takt committed
    			vrf:       r.vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", msg.PerPeerHeader.PeerDistinguisher), msg.PerPeerHeader.PeerDistinguisher),
    
    takt's avatar
    takt committed
    	fsm.peer.fsms = []*FSM{
    		fsm,
    	}
    
    
    	fsm.peer.configureBySentOpen(sentOpen)
    
    
    takt's avatar
    takt committed
    	rib4, found := fsm.peer.vrf.RIBByName("inet.0")
    	if !found {
    		return fmt.Errorf("Unable to get inet RIB")
    	}
    
    	fsm.ipv4Unicast = newFSMAddressFamily(packet.IPv4AFI, packet.UnicastSAFI, &peerAddressFamily{
    
    		rib:               rib4,
    		importFilterChain: filter.NewAcceptAllFilterChain(),
    
    	}, fsm)
    	fsm.ipv4Unicast.bmpInit()
    
    
    takt's avatar
    takt committed
    	rib6, found := fsm.peer.vrf.RIBByName("inet6.0")
    	if !found {
    		return fmt.Errorf("Unable to get inet6 RIB")
    	}
    
    
    Oliver Herms's avatar
    Oliver Herms committed
    	fsm.ipv6Unicast = newFSMAddressFamily(packet.IPv6AFI, packet.UnicastSAFI, &peerAddressFamily{
    
    		rib:               rib6,
    		importFilterChain: filter.NewAcceptAllFilterChain(),
    
    	}, fsm)
    
    Oliver Herms's avatar
    Oliver Herms committed
    	fsm.ipv6Unicast.bmpInit()
    
    
    	fsm.state = newOpenSentState(fsm)
    	openSent := fsm.state.(*openSentState)
    	openSent.openMsgReceived(recvOpen)
    
    	fsm.state = newEstablishedState(fsm)
    	n := &neighbor{
    
    takt's avatar
    takt committed
    		vrfID:       msg.PerPeerHeader.PeerDistinguisher,
    
    Oliver Herms's avatar
    Oliver Herms committed
    		localAS:     fsm.peer.localASN,
    		peerAS:      msg.PerPeerHeader.PeerAS,
    		peerAddress: msg.PerPeerHeader.PeerAddress,
    		routerID:    recvOpen.BGPIdentifier,
    		fsm:         fsm,
    		opt:         fsm.decodeOptions(),
    
    takt's avatar
    takt committed
    	err = r.neighborManager.addNeighbor(n)
    	if err != nil {
    		return errors.Wrap(err, "Unable to add neighbor")
    	}
    
    
    	r.ribClientsMu.Lock()
    	defer r.ribClientsMu.Unlock()
    	n.registerClients(r.ribClients)
    
    
    func (n *neighbor) registerClients(clients map[afiClient]struct{}) {
    	for ac := range clients {
    		if ac.afi == packet.IPv4AFI {
    
    Oliver Herms's avatar
    Oliver Herms committed
    			n.fsm.ipv4Unicast.adjRIBIn.Register(ac.client)
    
    		}
    		if ac.afi == packet.IPv6AFI {
    
    Oliver Herms's avatar
    Oliver Herms committed
    			n.fsm.ipv6Unicast.adjRIBIn.Register(ac.client)
    
    func (p *peer) configureBySentOpen(msg *packet.BGPOpen) {
    
    takt's avatar
    takt committed
    	capsList := getCaps(msg.OptParams)
    	for _, caps := range capsList {
    		for _, cap := range caps {
    			switch cap.Code {
    			case packet.AddPathCapabilityCode:
    				addPathCap := cap.Value.(packet.AddPathCapability)
    
    				for _, addPathCapTuple := range addPathCap {
    					peerFamily := p.addressFamily(addPathCapTuple.AFI, addPathCapTuple.SAFI)
    					if peerFamily == nil {
    						continue
    
    takt's avatar
    takt committed
    					}
    
    					switch addPathCapTuple.SendReceive {
    					case packet.AddPathSend:
    						peerFamily.addPathSend = routingtable.ClientOptions{
    							MaxPaths: 10,
    						}
    					case packet.AddPathReceive:
    						peerFamily.addPathReceive = true
    					case packet.AddPathSendReceive:
    						peerFamily.addPathReceive = true
    						peerFamily.addPathSend = routingtable.ClientOptions{
    							MaxPaths: 10,
    						}
    
    takt's avatar
    takt committed
    					}
    
    takt's avatar
    takt committed
    func getCaps(optParams []packet.OptParam) []packet.Capabilities {
    	res := make([]packet.Capabilities, 0)
    
    	for _, optParam := range optParams {
    		if optParam.Type != packet.CapabilitiesParamType {
    			continue
    		}
    
    
    takt's avatar
    takt committed
    		res = append(res, optParam.Value.(packet.Capabilities))
    
    takt's avatar
    takt committed
    
    	return res
    
    takt's avatar
    takt committed
    
    func addrToNetIP(a [16]byte) net.IP {
    	for i := 0; i < 12; i++ {
    		if a[i] != 0 {
    			return net.IP(a[:])
    		}
    	}
    
    	return net.IP(a[12:])
    }