Newer
Older
"sync"
"time"
"github.com/bio-routing/bio-rd/protocols/bgp/packet"
"github.com/bio-routing/bio-rd/route"
"github.com/bio-routing/bio-rd/routingtable"
bnet "github.com/bio-routing/bio-rd/net"
log "github.com/sirupsen/logrus"
fsm *FSM
iBGP bool
toSendMu sync.Mutex
toSend map[string]*pathPfxs
destroyCh chan struct{}
}
type pathPfxs struct {
path *route.Path
pfxs []bnet.Prefix
}
fsm: fsm,
iBGP: fsm.peer.localASN == fsm.peer.peerASN,
destroyCh: make(chan struct{}),
toSend: make(map[string]*pathPfxs),
// Start starts the update sender
func (u *UpdateSender) Start(aggrTime time.Duration) {
go u.sender(aggrTime)
}
// Destroy destroys everything (with greetings to Hatebreed)
func (u *UpdateSender) Destroy() {
u.destroyCh <- struct{}{}
}
// AddPath adds path p for pfx to toSend queue
func (u *UpdateSender) AddPath(pfx bnet.Prefix, p *route.Path) error {
u.toSendMu.Lock()
hash := p.BGPPath.ComputeHash()
if _, exists := u.toSend[hash]; exists {
u.toSend[hash].pfxs = append(u.toSend[hash].pfxs, pfx)
u.toSendMu.Unlock()
u.toSend[p.BGPPath.ComputeHash()] = &pathPfxs{
path: p,
pfxs: []bnet.Prefix{
pfx,
u.toSendMu.Unlock()
return nil
}
// sender serializes BGP update messages
func (u *UpdateSender) sender(aggrTime time.Duration) {
ticker := time.NewTicker(aggrTime)
var err error
var pathAttrs *packet.PathAttribute
var budget int
for {
select {
case <-u.destroyCh:
return
case <-ticker.C:
}
u.toSendMu.Lock()
for key, pathNLRIs := range u.toSend {
budget = packet.MaxLen - packet.HeaderLen - packet.MinUpdateLen - int(pathNLRIs.path.BGPPath.Length()) - overhead
pathAttrs, err = packet.PathAttributes(pathNLRIs.path, u.iBGP)
if err != nil {
log.Errorf("Unable to get path attributes: %v", err)
continue
}
updatesPrefixes := make([][]bnet.Prefix, 0, 1)
prefixes := make([]bnet.Prefix, 0, 1)
for _, pfx := range pathNLRIs.pfxs {
if budget < 0 {
updatesPrefixes = append(updatesPrefixes, prefixes)
budget = packet.MaxLen - int(pathNLRIs.path.BGPPath.Length()) - overhead
}
prefixes = append(prefixes, pfx)
}
if len(prefixes) > 0 {
updatesPrefixes = append(updatesPrefixes, prefixes)
}
delete(u.toSend, key)
u.toSendMu.Unlock()
u.sendUpdates(pathAttrs, updatesPrefixes, pathNLRIs.path.BGPPath.PathIdentifier)
u.toSendMu.Lock()
}
}
}
func (u *UpdateSender) updateOverhead() int {
// TODO: for multi RIB support we need the AFI/SAFI combination to determine overhead. For now: MultiProtocol = IPv6
if u.fsm.options.SupportsMultiProtocol {
// since we are replacing the next hop attribute IPv4Len has to be subtracted, we also add another byte for extended length
return packet.AFILen + packet.SAFILen + 1 + packet.IPv6Len - packet.IPv4Len + 1
}
return 0
}
func (u *UpdateSender) sendUpdates(pathAttrs *packet.PathAttribute, updatePrefixes [][]bnet.Prefix, pathID uint32) {
var err error
for _, prefixes := range updatePrefixes {
update := u.updateMessageForPrefixes(prefixes, pathAttrs, pathID)
err = serializeAndSendUpdate(u.fsm.con, update, u.fsm.options)
if err != nil {
log.Errorf("Failed to serialize and send: %v", err)
}
func (u *UpdateSender) updateMessageForPrefixes(pfxs []bnet.Prefix, pa *packet.PathAttribute, pathID uint32) *packet.BGPUpdate {
if u.fsm.options.SupportsMultiProtocol {
return u.bgpUpdateMultiProtocol(pfxs, pa, pathID)
}
return u.bgpUpdate(pfxs, pa, pathID)
}
func (u *UpdateSender) bgpUpdate(pfxs []bnet.Prefix, pa *packet.PathAttribute, pathID uint32) *packet.BGPUpdate {
update := &packet.BGPUpdate{
PathAttributes: pa,
}
var nlri *packet.NLRI
for _, pfx := range pfxs {
nlri = &packet.NLRI{
PathIdentifier: pathID,
IP: pfx.Addr().ToUint32(),
Pfxlen: pfx.Pfxlen(),
Next: update.NLRI,
}
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
return update
}
func (u *UpdateSender) bgpUpdateMultiProtocol(pfxs []bnet.Prefix, pa *packet.PathAttribute, pathID uint32) *packet.BGPUpdate {
pa, nextHop := u.copyAttributesWithoutNextHop(pa)
attrs := &packet.PathAttribute{
TypeCode: packet.MultiProtocolReachNLRICode,
Value: packet.MultiProtocolReachNLRI{
AFI: packet.IPv6AFI,
SAFI: packet.UnicastSAFI,
NextHop: nextHop,
Prefixes: pfxs,
},
}
attrs.Next = pa
return &packet.BGPUpdate{
PathAttributes: attrs,
}
}
func (u *UpdateSender) copyAttributesWithoutNextHop(pa *packet.PathAttribute) (attrs *packet.PathAttribute, nextHop bnet.IP) {
attrs = pa.Copy()
curCopy := attrs
cur := pa
if cur.TypeCode == packet.NextHopAttr {
nextHop = cur.Value.(bnet.IP)
cur = cur.Next
}
for cur != nil {
if cur.TypeCode == packet.NextHopAttr {
nextHop = cur.Value.(bnet.IP)
} else {
curCopy.Next = cur.Copy()
curCopy = curCopy.Next
}
}
func (u *UpdateSender) RemovePath(pfx bnet.Prefix, p *route.Path) bool {
err := withDrawPrefixesAddPath(u.fsm.con, u.fsm.options, pfx, p)
if err != nil {
log.Errorf("Unable to withdraw prefix: %v", err)
return false
}
return true
func (u *UpdateSender) UpdateNewClient(client routingtable.RouteTableClient) error {
log.Warningf("BGP Update Sender: UpdateNewClient not implemented")
// RouteCount returns the number of stored routes
func (u *UpdateSender) RouteCount() int64 {
log.Warningf("BGP Update Sender: RouteCount not implemented")
return 0
}