Skip to content
Snippets Groups Projects
bmp_router.go 11.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • package server
    
    import (
    	"bytes"
    	"fmt"
    	"net"
    	"sync"
    
    takt's avatar
    takt committed
    	"sync/atomic"
    
    takt's avatar
    takt committed
    	"github.com/pkg/errors"
    	log "github.com/sirupsen/logrus"
    
    
    	bnet "github.com/bio-routing/bio-rd/net"
    	"github.com/bio-routing/bio-rd/protocols/bgp/packet"
    	bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet"
    	"github.com/bio-routing/bio-rd/routingtable"
    	"github.com/bio-routing/bio-rd/routingtable/filter"
    
    takt's avatar
    takt committed
    	"github.com/bio-routing/bio-rd/routingtable/vrf"
    
    Julian Kornberger's avatar
    Julian Kornberger committed
    	"github.com/bio-routing/tflow2/convert"
    
    takt's avatar
    takt committed
    type Router struct {
    
    	name             string
    
    takt's avatar
    takt committed
    	nameMu           sync.RWMutex
    
    	address          net.IP
    	port             uint16
    	con              net.Conn
    	reconnectTimeMin int
    	reconnectTimeMax int
    	reconnectTime    int
    
    Oliver Herms's avatar
    Oliver Herms committed
    	dialTimeout      time.Duration
    
    	reconnectTimer   *time.Timer
    
    takt's avatar
    takt committed
    	vrfRegistry      *vrf.VRFRegistry
    	neighborManager  *neighborManager
    
    	logger           *log.Logger
    	runMu            sync.Mutex
    	stop             chan struct{}
    
    
    	ribClients   map[afiClient]struct{}
    	ribClientsMu sync.Mutex
    
    takt's avatar
    takt committed
    
    	counters routerCounters
    }
    
    type routerCounters struct {
    	routeMonitoringMessages      uint64
    	statisticsReportMessages     uint64
    	peerDownNotificationMessages uint64
    	peerUpNotificationMessages   uint64
    	initiationMessages           uint64
    	terminationMessages          uint64
    	routeMirroringMessages       uint64
    
    }
    
    type neighbor struct {
    
    takt's avatar
    takt committed
    	vrfID       uint64
    	peerAddress [16]byte
    
    Oliver Herms's avatar
    Oliver Herms committed
    	localAS     uint32
    	peerAS      uint32
    	routerID    uint32
    	fsm         *FSM
    	opt         *packet.DecodeOptions
    
    takt's avatar
    takt committed
    func newRouter(addr net.IP, port uint16) *Router {
    	return &Router{
    
    		address:          addr,
    		port:             port,
    		reconnectTimeMin: 30,  // Suggested by RFC 7854
    		reconnectTimeMax: 720, // Suggested by RFC 7854
    		reconnectTimer:   time.NewTimer(time.Duration(0)),
    
    Oliver Herms's avatar
    Oliver Herms committed
    		dialTimeout:      time.Second * 5,
    
    takt's avatar
    takt committed
    		vrfRegistry:      vrf.NewVRFRegistry(),
    		neighborManager:  newNeighborManager(),
    
    		logger:           log.New(),
    		stop:             make(chan struct{}),
    
    		ribClients:       make(map[afiClient]struct{}),
    	}
    }
    
    
    takt's avatar
    takt committed
    func (r *Router) GetVRF(rd uint64) *vrf.VRF {
    	return r.vrfRegistry.GetVRFByRD(rd)
    
    takt's avatar
    takt committed
    func (r *Router) GetVRFs() []*vrf.VRF {
    	return r.vrfRegistry.List()
    }
    
    takt's avatar
    takt committed
    func (r *Router) Name() string {
    	r.nameMu.RLock()
    	defer r.nameMu.RUnlock()
    	return r.name
    
    takt's avatar
    takt committed
    func (r *Router) serve(con net.Conn) {
    
    	r.con = con
    
    	r.runMu.Lock()
    	defer r.con.Close()
    	defer r.runMu.Unlock()
    
    
    		select {
    		case <-r.stop:
    			return
    		default:
    		}
    
    
    		msg, err := recvBMPMsg(r.con)
    		if err != nil {
    
    			r.logger.Errorf("Unable to get message: %v", err)
    
    takt's avatar
    takt committed
    		r.processMsg(msg)
    	}
    }
    
    takt's avatar
    takt committed
    func (r *Router) processMsg(msg []byte) {
    	bmpMsg, err := bmppkt.Decode(msg)
    	if err != nil {
    		r.logger.Errorf("Unable to decode BMP message: %v", err)
    		return
    	}
    
    	switch bmpMsg.MsgType() {
    	case bmppkt.PeerUpNotificationType:
    		err = r.processPeerUpNotification(bmpMsg.(*bmppkt.PeerUpNotification))
    		if err != nil {
    			r.logger.Errorf("Unable to process peer up notification: %v", err)
    
    takt's avatar
    takt committed
    	case bmppkt.PeerDownNotificationType:
    		r.processPeerDownNotification(bmpMsg.(*bmppkt.PeerDownNotification))
    	case bmppkt.InitiationMessageType:
    		r.processInitiationMsg(bmpMsg.(*bmppkt.InitiationMessage))
    	case bmppkt.TerminationMessageType:
    		r.processTerminationMsg(bmpMsg.(*bmppkt.TerminationMessage))
    		return
    	case bmppkt.RouteMonitoringType:
    		r.processRouteMonitoringMsg(bmpMsg.(*bmppkt.RouteMonitoringMsg))
    	case bmppkt.RouteMirroringMessageType:
    		atomic.AddUint64(&r.counters.routeMirroringMessages, 1)
    
    takt's avatar
    takt committed
    func (r *Router) processRouteMonitoringMsg(msg *bmppkt.RouteMonitoringMsg) {
    	atomic.AddUint64(&r.counters.routeMonitoringMessages, 1)
    
    takt's avatar
    takt committed
    	n := r.neighborManager.getNeighbor(msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress)
    	if n == nil {
    		r.logger.Errorf("Received route monitoring message for non-existent neighbor %d/%v on %s", msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress, r.address.String())
    
    		return
    	}
    
    	s := n.fsm.state.(*establishedState)
    
    Oliver Herms's avatar
    Oliver Herms committed
    	s.msgReceived(msg.BGPUpdate, s.fsm.decodeOptions())
    
    takt's avatar
    takt committed
    func (r *Router) processInitiationMsg(msg *bmppkt.InitiationMessage) {
    	atomic.AddUint64(&r.counters.initiationMessages, 1)
    
    	r.nameMu.Lock()
    	defer r.nameMu.Unlock()
    
    
    	const (
    		stringType   = 0
    		sysDescrType = 1
    		sysNameType  = 2
    	)
    
    	logMsg := fmt.Sprintf("Received initiation message from %s:", r.address.String())
    
    	for _, tlv := range msg.TLVs {
    		switch tlv.InformationType {
    		case stringType:
    			logMsg += fmt.Sprintf(" Message: %q", string(tlv.Information))
    		case sysDescrType:
    			logMsg += fmt.Sprintf(" sysDescr.: %s", string(tlv.Information))
    		case sysNameType:
    
    			r.name = string(tlv.Information)
    
    			logMsg += fmt.Sprintf(" sysName.: %s", string(tlv.Information))
    		}
    	}
    
    
    	r.logger.Info(logMsg)
    
    takt's avatar
    takt committed
    func (r *Router) processTerminationMsg(msg *bmppkt.TerminationMessage) {
    
    	const (
    		stringType = 0
    		reasonType = 1
    
    		adminDown     = 0
    		unspecReason  = 1
    		outOfRes      = 2
    		redundantCon  = 3
    		permAdminDown = 4
    	)
    
    
    takt's avatar
    takt committed
    	atomic.AddUint64(&r.counters.terminationMessages, 1)
    
    	logMsg := fmt.Sprintf("Received termination message from %s: ", r.address.String())
    	for _, tlv := range msg.TLVs {
    		switch tlv.InformationType {
    		case stringType:
    			logMsg += fmt.Sprintf("Message: %q", string(tlv.Information))
    		case reasonType:
    
    takt's avatar
    takt committed
    			reason := convert.Uint16b(tlv.Information[:1])
    
    			switch reason {
    			case adminDown:
    				logMsg += "Session administratively down"
    			case unspecReason:
    
    takt's avatar
    takt committed
    				logMsg += "Unspecified reason"
    
    			case outOfRes:
    				logMsg += "Out of resources"
    			case redundantCon:
    				logMsg += "Redundant connection"
    			case permAdminDown:
    				logMsg += "Session permanently administratively closed"
    			}
    		}
    	}
    
    
    	r.logger.Warning(logMsg)
    
    
    	r.con.Close()
    
    takt's avatar
    takt committed
    	r.neighborManager.disposeAll()
    
    takt's avatar
    takt committed
    func (r *Router) processPeerDownNotification(msg *bmppkt.PeerDownNotification) {
    	r.logger.WithFields(log.Fields{
    		"address":            r.address.String(),
    		"router":             r.name,
    		"peer_distinguisher": msg.PerPeerHeader.PeerDistinguisher,
    		"peer_address":       addrToNetIP(msg.PerPeerHeader.PeerAddress).String(),
    	}).Infof("peer down notification received")
    	atomic.AddUint64(&r.counters.peerDownNotificationMessages, 1)
    
    takt's avatar
    takt committed
    	err := r.neighborManager.neighborDown(msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress)
    	if err != nil {
    		r.logger.Errorf("Failed to process peer down notification: %v", err)
    
    takt's avatar
    takt committed
    func (r *Router) processPeerUpNotification(msg *bmppkt.PeerUpNotification) error {
    	atomic.AddUint64(&r.counters.peerUpNotificationMessages, 1)
    	r.logger.WithFields(log.Fields{
    		"address":            r.address.String(),
    		"router":             r.name,
    		"peer_distinguisher": msg.PerPeerHeader.PeerDistinguisher,
    		"peer_address":       addrToNetIP(msg.PerPeerHeader.PeerAddress).String(),
    	}).Infof("peer up notification received")
    
    
    	if len(msg.SentOpenMsg) < packet.MinOpenLen {
    		return fmt.Errorf("Received peer up notification for %v: Invalid sent open message: %v", msg.PerPeerHeader.PeerAddress, msg.SentOpenMsg)
    	}
    
    
    Oliver Herms's avatar
    Oliver Herms committed
    	sentOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.SentOpenMsg[packet.HeaderLen:]))
    
    	if err != nil {
    
    Julian Kornberger's avatar
    Julian Kornberger committed
    		return errors.Wrapf(err, "Unable to decode sent open message sent from %v to %v", r.address.String(), msg.PerPeerHeader.PeerAddress)
    
    	}
    
    	if len(msg.ReceivedOpenMsg) < packet.MinOpenLen {
    		return fmt.Errorf("Received peer up notification for %v: Invalid received open message: %v", msg.PerPeerHeader.PeerAddress, msg.ReceivedOpenMsg)
    	}
    
    
    Oliver Herms's avatar
    Oliver Herms committed
    	recvOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.ReceivedOpenMsg[packet.HeaderLen:]))
    
    	if err != nil {
    
    Julian Kornberger's avatar
    Julian Kornberger committed
    		return errors.Wrapf(err, "Unable to decode received open message sent from %v to %v", msg.PerPeerHeader.PeerAddress, r.address.String())
    
    	}
    
    	addrLen := net.IPv4len
    
    	if msg.PerPeerHeader.GetIPVersion() == 6 {
    
    		addrLen = net.IPv6len
    	}
    
    	// bnet.IPFromBytes can only fail if length of argument is not 4 or 16. However, length is ensured here.
    	peerAddress, _ := bnet.IPFromBytes(msg.PerPeerHeader.PeerAddress[16-addrLen:])
    	localAddress, _ := bnet.IPFromBytes(msg.LocalAddress[16-addrLen:])
    
    	fsm := &FSM{
    		isBMP: true,
    		peer: &peer{
    
    			routerID:  sentOpen.BGPIdentifier,
    
    takt's avatar
    takt committed
    			addr:      peerAddress.Dedup(),
    			localAddr: localAddress.Dedup(),
    
    			peerASN:   msg.PerPeerHeader.PeerAS,
    			localASN:  uint32(sentOpen.ASN),
    			ipv4:      &peerAddressFamily{},
    			ipv6:      &peerAddressFamily{},
    
    takt's avatar
    takt committed
    			vrf:       r.vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", msg.PerPeerHeader.PeerDistinguisher), msg.PerPeerHeader.PeerDistinguisher),
    
    takt's avatar
    takt committed
    	fsm.peer.fsms = []*FSM{
    		fsm,
    	}
    
    
    	fsm.peer.configureBySentOpen(sentOpen)
    
    
    takt's avatar
    takt committed
    	rib4, found := fsm.peer.vrf.RIBByName("inet.0")
    	if !found {
    		return fmt.Errorf("Unable to get inet RIB")
    	}
    
    	fsm.ipv4Unicast = newFSMAddressFamily(packet.IPv4AFI, packet.UnicastSAFI, &peerAddressFamily{
    
    		rib:               rib4,
    		importFilterChain: filter.NewAcceptAllFilterChain(),
    
    	}, fsm)
    	fsm.ipv4Unicast.bmpInit()
    
    
    takt's avatar
    takt committed
    	rib6, found := fsm.peer.vrf.RIBByName("inet6.0")
    	if !found {
    		return fmt.Errorf("Unable to get inet6 RIB")
    	}
    
    
    Oliver Herms's avatar
    Oliver Herms committed
    	fsm.ipv6Unicast = newFSMAddressFamily(packet.IPv6AFI, packet.UnicastSAFI, &peerAddressFamily{
    
    		rib:               rib6,
    		importFilterChain: filter.NewAcceptAllFilterChain(),
    
    	}, fsm)
    
    Oliver Herms's avatar
    Oliver Herms committed
    	fsm.ipv6Unicast.bmpInit()
    
    
    	fsm.state = newOpenSentState(fsm)
    	openSent := fsm.state.(*openSentState)
    	openSent.openMsgReceived(recvOpen)
    
    	fsm.state = newEstablishedState(fsm)
    	n := &neighbor{
    
    takt's avatar
    takt committed
    		vrfID:       msg.PerPeerHeader.PeerDistinguisher,
    
    Oliver Herms's avatar
    Oliver Herms committed
    		localAS:     fsm.peer.localASN,
    		peerAS:      msg.PerPeerHeader.PeerAS,
    		peerAddress: msg.PerPeerHeader.PeerAddress,
    		routerID:    recvOpen.BGPIdentifier,
    		fsm:         fsm,
    		opt:         fsm.decodeOptions(),
    
    takt's avatar
    takt committed
    	err = r.neighborManager.addNeighbor(n)
    	if err != nil {
    		return errors.Wrap(err, "Unable to add neighbor")
    	}
    
    
    	r.ribClientsMu.Lock()
    	defer r.ribClientsMu.Unlock()
    	n.registerClients(r.ribClients)
    
    
    func (n *neighbor) registerClients(clients map[afiClient]struct{}) {
    	for ac := range clients {
    		if ac.afi == packet.IPv4AFI {
    
    Oliver Herms's avatar
    Oliver Herms committed
    			n.fsm.ipv4Unicast.adjRIBIn.Register(ac.client)
    
    		}
    		if ac.afi == packet.IPv6AFI {
    
    Oliver Herms's avatar
    Oliver Herms committed
    			n.fsm.ipv6Unicast.adjRIBIn.Register(ac.client)
    
    func (p *peer) configureBySentOpen(msg *packet.BGPOpen) {
    	caps := getCaps(msg.OptParams)
    	for _, cap := range caps {
    		switch cap.Code {
    		case packet.AddPathCapabilityCode:
    			addPathCap := cap.Value.(packet.AddPathCapability)
    			peerFamily := p.addressFamily(addPathCap.AFI, addPathCap.SAFI)
    			if peerFamily == nil {
    				continue
    			}
    			switch addPathCap.SendReceive {
    			case packet.AddPathSend:
    				peerFamily.addPathSend = routingtable.ClientOptions{
    					MaxPaths: 10,
    				}
    			case packet.AddPathReceive:
    				peerFamily.addPathReceive = true
    			case packet.AddPathSendReceive:
    				peerFamily.addPathReceive = true
    				peerFamily.addPathSend = routingtable.ClientOptions{
    					MaxPaths: 10,
    				}
    			}
    		case packet.ASN4CapabilityCode:
    			asn4Cap := cap.Value.(packet.ASN4Capability)
    			p.localASN = asn4Cap.ASN4
    		}
    	}
    }
    
    func getCaps(optParams []packet.OptParam) packet.Capabilities {
    	for _, optParam := range optParams {
    		if optParam.Type != packet.CapabilitiesParamType {
    			continue
    		}
    
    		return optParam.Value.(packet.Capabilities)
    	}
    	return nil
    }
    
    takt's avatar
    takt committed
    
    func addrToNetIP(a [16]byte) net.IP {
    	for i := 0; i < 12; i++ {
    		if a[i] != 0 {
    			return net.IP(a[:])
    		}
    	}
    
    	return net.IP(a[12:])
    }