Skip to content
Snippets Groups Projects
bmp_router.go 11.4 KiB
Newer Older
package server

import (
	"bytes"
	"fmt"
	"net"
	"sync"
takt's avatar
takt committed
	"sync/atomic"
takt's avatar
takt committed
	"github.com/pkg/errors"
	log "github.com/sirupsen/logrus"

	bnet "github.com/bio-routing/bio-rd/net"
	"github.com/bio-routing/bio-rd/protocols/bgp/packet"
	bmppkt "github.com/bio-routing/bio-rd/protocols/bmp/packet"
	"github.com/bio-routing/bio-rd/routingtable"
	"github.com/bio-routing/bio-rd/routingtable/filter"
takt's avatar
takt committed
	"github.com/bio-routing/bio-rd/routingtable/vrf"
Julian Kornberger's avatar
Julian Kornberger committed
	"github.com/bio-routing/tflow2/convert"
// Router represents a BMP enabled route in BMP context
takt's avatar
takt committed
type Router struct {
	name             string
takt's avatar
takt committed
	nameMu           sync.RWMutex
	address          net.IP
	port             uint16
	con              net.Conn
	established      uint32
	reconnectTimeMin int
	reconnectTimeMax int
	reconnectTime    int
Oliver Herms's avatar
Oliver Herms committed
	dialTimeout      time.Duration
	reconnectTimer   *time.Timer
takt's avatar
takt committed
	vrfRegistry      *vrf.VRFRegistry
	neighborManager  *neighborManager
	logger           *log.Logger
	runMu            sync.Mutex
	stop             chan struct{}

	ribClients   map[afiClient]struct{}
	ribClientsMu sync.Mutex
takt's avatar
takt committed

	counters routerCounters
}

type routerCounters struct {
	routeMonitoringMessages      uint64
	statisticsReportMessages     uint64
	peerDownNotificationMessages uint64
	peerUpNotificationMessages   uint64
	initiationMessages           uint64
	terminationMessages          uint64
	routeMirroringMessages       uint64
}

type neighbor struct {
takt's avatar
takt committed
	vrfID       uint64
	peerAddress [16]byte
Oliver Herms's avatar
Oliver Herms committed
	localAS     uint32
	peerAS      uint32
	routerID    uint32
	fsm         *FSM
	opt         *packet.DecodeOptions
takt's avatar
takt committed
func newRouter(addr net.IP, port uint16) *Router {
	return &Router{
		address:          addr,
		port:             port,
		reconnectTimeMin: 30,  // Suggested by RFC 7854
		reconnectTimeMax: 720, // Suggested by RFC 7854
		reconnectTimer:   time.NewTimer(time.Duration(0)),
Oliver Herms's avatar
Oliver Herms committed
		dialTimeout:      time.Second * 5,
takt's avatar
takt committed
		vrfRegistry:      vrf.NewVRFRegistry(),
		neighborManager:  newNeighborManager(),
		logger:           log.New(),
		stop:             make(chan struct{}),
		ribClients:       make(map[afiClient]struct{}),
	}
}

// GetVRF get's a VRF
takt's avatar
takt committed
func (r *Router) GetVRF(rd uint64) *vrf.VRF {
	return r.vrfRegistry.GetVRFByRD(rd)
// GetVRFs gets all VRFs
takt's avatar
takt committed
func (r *Router) GetVRFs() []*vrf.VRF {
	return r.vrfRegistry.List()
}
// Name gets a routers name
takt's avatar
takt committed
func (r *Router) Name() string {
	r.nameMu.RLock()
	defer r.nameMu.RUnlock()
	return r.name
takt's avatar
takt committed
func (r *Router) serve(con net.Conn) {
	r.con = con
	r.runMu.Lock()
	defer r.con.Close()
	defer r.runMu.Unlock()

		select {
		case <-r.stop:
			return
		default:
		}

		msg, err := recvBMPMsg(r.con)
		if err != nil {
			r.logger.Errorf("Unable to get message: %v", err)
takt's avatar
takt committed
		r.processMsg(msg)
	}
}
takt's avatar
takt committed
func (r *Router) processMsg(msg []byte) {
	bmpMsg, err := bmppkt.Decode(msg)
	if err != nil {
		r.logger.Errorf("Unable to decode BMP message: %v", err)
		return
	}

	switch bmpMsg.MsgType() {
	case bmppkt.PeerUpNotificationType:
		err = r.processPeerUpNotification(bmpMsg.(*bmppkt.PeerUpNotification))
		if err != nil {
			r.logger.Errorf("Unable to process peer up notification: %v", err)
takt's avatar
takt committed
	case bmppkt.PeerDownNotificationType:
		r.processPeerDownNotification(bmpMsg.(*bmppkt.PeerDownNotification))
	case bmppkt.InitiationMessageType:
		r.processInitiationMsg(bmpMsg.(*bmppkt.InitiationMessage))
	case bmppkt.TerminationMessageType:
		r.processTerminationMsg(bmpMsg.(*bmppkt.TerminationMessage))
		return
	case bmppkt.RouteMonitoringType:
		r.processRouteMonitoringMsg(bmpMsg.(*bmppkt.RouteMonitoringMsg))
	case bmppkt.RouteMirroringMessageType:
		atomic.AddUint64(&r.counters.routeMirroringMessages, 1)
takt's avatar
takt committed
func (r *Router) processRouteMonitoringMsg(msg *bmppkt.RouteMonitoringMsg) {
	atomic.AddUint64(&r.counters.routeMonitoringMessages, 1)
takt's avatar
takt committed
	n := r.neighborManager.getNeighbor(msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress)
	if n == nil {
		r.logger.Errorf("Received route monitoring message for non-existent neighbor %d/%v on %s", msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress, r.address.String())
		return
	}

	s := n.fsm.state.(*establishedState)
Oliver Herms's avatar
Oliver Herms committed
	s.msgReceived(msg.BGPUpdate, s.fsm.decodeOptions())
takt's avatar
takt committed
func (r *Router) processInitiationMsg(msg *bmppkt.InitiationMessage) {
	atomic.AddUint64(&r.counters.initiationMessages, 1)

	r.nameMu.Lock()
	defer r.nameMu.Unlock()

	const (
		stringType   = 0
		sysDescrType = 1
		sysNameType  = 2
	)

	logMsg := fmt.Sprintf("Received initiation message from %s:", r.address.String())

	for _, tlv := range msg.TLVs {
		switch tlv.InformationType {
		case stringType:
			logMsg += fmt.Sprintf(" Message: %q", string(tlv.Information))
		case sysDescrType:
			logMsg += fmt.Sprintf(" sysDescr.: %s", string(tlv.Information))
		case sysNameType:
			r.name = string(tlv.Information)
			logMsg += fmt.Sprintf(" sysName.: %s", string(tlv.Information))
		}
	}

	r.logger.Info(logMsg)
takt's avatar
takt committed
func (r *Router) processTerminationMsg(msg *bmppkt.TerminationMessage) {
	const (
		stringType = 0
		reasonType = 1

		adminDown     = 0
		unspecReason  = 1
		outOfRes      = 2
		redundantCon  = 3
		permAdminDown = 4
	)

takt's avatar
takt committed
	atomic.AddUint64(&r.counters.terminationMessages, 1)
	logMsg := fmt.Sprintf("Received termination message from %s: ", r.address.String())
	for _, tlv := range msg.TLVs {
		switch tlv.InformationType {
		case stringType:
			logMsg += fmt.Sprintf("Message: %q", string(tlv.Information))
		case reasonType:
takt's avatar
takt committed
			reason := convert.Uint16b(tlv.Information[:1])
			switch reason {
			case adminDown:
				logMsg += "Session administratively down"
			case unspecReason:
takt's avatar
takt committed
				logMsg += "Unspecified reason"
			case outOfRes:
				logMsg += "Out of resources"
			case redundantCon:
				logMsg += "Redundant connection"
			case permAdminDown:
				logMsg += "Session permanently administratively closed"
			}
		}
	}

	r.logger.Warning(logMsg)

	r.con.Close()
takt's avatar
takt committed
	r.neighborManager.disposeAll()
takt's avatar
takt committed
func (r *Router) processPeerDownNotification(msg *bmppkt.PeerDownNotification) {
	r.logger.WithFields(log.Fields{
		"address":            r.address.String(),
		"router":             r.name,
		"peer_distinguisher": msg.PerPeerHeader.PeerDistinguisher,
		"peer_address":       addrToNetIP(msg.PerPeerHeader.PeerAddress).String(),
	}).Infof("peer down notification received")
	atomic.AddUint64(&r.counters.peerDownNotificationMessages, 1)
takt's avatar
takt committed
	err := r.neighborManager.neighborDown(msg.PerPeerHeader.PeerDistinguisher, msg.PerPeerHeader.PeerAddress)
	if err != nil {
		r.logger.Errorf("Failed to process peer down notification: %v", err)
takt's avatar
takt committed
func (r *Router) processPeerUpNotification(msg *bmppkt.PeerUpNotification) error {
	atomic.AddUint64(&r.counters.peerUpNotificationMessages, 1)
	r.logger.WithFields(log.Fields{
		"address":            r.address.String(),
		"router":             r.name,
		"peer_distinguisher": msg.PerPeerHeader.PeerDistinguisher,
		"peer_address":       addrToNetIP(msg.PerPeerHeader.PeerAddress).String(),
	}).Infof("peer up notification received")

	if len(msg.SentOpenMsg) < packet.MinOpenLen {
		return fmt.Errorf("Received peer up notification for %v: Invalid sent open message: %v", msg.PerPeerHeader.PeerAddress, msg.SentOpenMsg)
	}

Oliver Herms's avatar
Oliver Herms committed
	sentOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.SentOpenMsg[packet.HeaderLen:]))
	if err != nil {
Julian Kornberger's avatar
Julian Kornberger committed
		return errors.Wrapf(err, "Unable to decode sent open message sent from %v to %v", r.address.String(), msg.PerPeerHeader.PeerAddress)
	}

	if len(msg.ReceivedOpenMsg) < packet.MinOpenLen {
		return fmt.Errorf("Received peer up notification for %v: Invalid received open message: %v", msg.PerPeerHeader.PeerAddress, msg.ReceivedOpenMsg)
	}

Oliver Herms's avatar
Oliver Herms committed
	recvOpen, err := packet.DecodeOpenMsg(bytes.NewBuffer(msg.ReceivedOpenMsg[packet.HeaderLen:]))
	if err != nil {
Julian Kornberger's avatar
Julian Kornberger committed
		return errors.Wrapf(err, "Unable to decode received open message sent from %v to %v", msg.PerPeerHeader.PeerAddress, r.address.String())
	}

	addrLen := net.IPv4len
	if msg.PerPeerHeader.GetIPVersion() == 6 {
		addrLen = net.IPv6len
	}

	// bnet.IPFromBytes can only fail if length of argument is not 4 or 16. However, length is ensured here.
	peerAddress, _ := bnet.IPFromBytes(msg.PerPeerHeader.PeerAddress[16-addrLen:])
	localAddress, _ := bnet.IPFromBytes(msg.LocalAddress[16-addrLen:])

	fsm := &FSM{
		isBMP: true,
		peer: &peer{
			routerID:  sentOpen.BGPIdentifier,
takt's avatar
takt committed
			addr:      peerAddress.Dedup(),
			localAddr: localAddress.Dedup(),
			peerASN:   msg.PerPeerHeader.PeerAS,
			localASN:  uint32(sentOpen.ASN),
			ipv4:      &peerAddressFamily{},
			ipv6:      &peerAddressFamily{},
takt's avatar
takt committed
			vrf:       r.vrfRegistry.CreateVRFIfNotExists(fmt.Sprintf("%d", msg.PerPeerHeader.PeerDistinguisher), msg.PerPeerHeader.PeerDistinguisher),
takt's avatar
takt committed
	fsm.peer.fsms = []*FSM{
		fsm,
	}

	fsm.peer.configureBySentOpen(sentOpen)

takt's avatar
takt committed
	rib4, found := fsm.peer.vrf.RIBByName("inet.0")
	if !found {
		return fmt.Errorf("Unable to get inet RIB")
	}
	fsm.ipv4Unicast = newFSMAddressFamily(packet.IPv4AFI, packet.UnicastSAFI, &peerAddressFamily{
		rib:               rib4,
		importFilterChain: filter.NewAcceptAllFilterChain(),
	}, fsm)
	fsm.ipv4Unicast.bmpInit()

takt's avatar
takt committed
	rib6, found := fsm.peer.vrf.RIBByName("inet6.0")
	if !found {
		return fmt.Errorf("Unable to get inet6 RIB")
	}

Oliver Herms's avatar
Oliver Herms committed
	fsm.ipv6Unicast = newFSMAddressFamily(packet.IPv6AFI, packet.UnicastSAFI, &peerAddressFamily{
		rib:               rib6,
		importFilterChain: filter.NewAcceptAllFilterChain(),
	}, fsm)
Oliver Herms's avatar
Oliver Herms committed
	fsm.ipv6Unicast.bmpInit()

	fsm.state = newOpenSentState(fsm)
	openSent := fsm.state.(*openSentState)
	openSent.openMsgReceived(recvOpen)

	fsm.state = newEstablishedState(fsm)
	n := &neighbor{
takt's avatar
takt committed
		vrfID:       msg.PerPeerHeader.PeerDistinguisher,
Oliver Herms's avatar
Oliver Herms committed
		localAS:     fsm.peer.localASN,
		peerAS:      msg.PerPeerHeader.PeerAS,
		peerAddress: msg.PerPeerHeader.PeerAddress,
		routerID:    recvOpen.BGPIdentifier,
		fsm:         fsm,
		opt:         fsm.decodeOptions(),
takt's avatar
takt committed
	err = r.neighborManager.addNeighbor(n)
	if err != nil {
		return errors.Wrap(err, "Unable to add neighbor")
	}

	r.ribClientsMu.Lock()
	defer r.ribClientsMu.Unlock()
	n.registerClients(r.ribClients)

func (n *neighbor) registerClients(clients map[afiClient]struct{}) {
	for ac := range clients {
		if ac.afi == packet.IPv4AFI {
Oliver Herms's avatar
Oliver Herms committed
			n.fsm.ipv4Unicast.adjRIBIn.Register(ac.client)
		}
		if ac.afi == packet.IPv6AFI {
Oliver Herms's avatar
Oliver Herms committed
			n.fsm.ipv6Unicast.adjRIBIn.Register(ac.client)
func (p *peer) configureBySentOpen(msg *packet.BGPOpen) {
takt's avatar
takt committed
	capsList := getCaps(msg.OptParams)
	for _, caps := range capsList {
		for _, cap := range caps {
			switch cap.Code {
			case packet.AddPathCapabilityCode:
				addPathCap := cap.Value.(packet.AddPathCapability)
				peerFamily := p.addressFamily(addPathCap.AFI, addPathCap.SAFI)
				if peerFamily == nil {
					continue
takt's avatar
takt committed
				switch addPathCap.SendReceive {
				case packet.AddPathSend:
					peerFamily.addPathSend = routingtable.ClientOptions{
						MaxPaths: 10,
					}
				case packet.AddPathReceive:
					peerFamily.addPathReceive = true
				case packet.AddPathSendReceive:
					peerFamily.addPathReceive = true
					peerFamily.addPathSend = routingtable.ClientOptions{
						MaxPaths: 10,
					}
takt's avatar
takt committed
			case packet.ASN4CapabilityCode:
				asn4Cap := cap.Value.(packet.ASN4Capability)
				p.localASN = asn4Cap.ASN4
takt's avatar
takt committed
func getCaps(optParams []packet.OptParam) []packet.Capabilities {
	res := make([]packet.Capabilities, 0)
	for _, optParam := range optParams {
		if optParam.Type != packet.CapabilitiesParamType {
			continue
		}

takt's avatar
takt committed
		res = append(res, optParam.Value.(packet.Capabilities))
takt's avatar
takt committed

	return res
takt's avatar
takt committed

func addrToNetIP(a [16]byte) net.IP {
	for i := 0; i < 12; i++ {
		if a[i] != 0 {
			return net.IP(a[:])
		}
	}

	return net.IP(a[12:])
}