Newer
Older
"context"
mnepb "code.fbi.h-da.de/danet/gosdn/api/go/gosdn/networkelement"
"github.com/openconfig/gnmi/proto/gnmi"
var changeTimeout time.Duration
func init() {
var err error
e := os.Getenv("GOSDN_CHANGE_TIMEOUT")
if e != "" {
changeTimeout, err = time.ParseDuration(e)
if err != nil {
log.Fatal(err)
}
// NewChange takes a Device UUID, a pair GoStructs (current and intended state),
// a callback function, and returns a *Change.
// The callback function is used by the Commit() and Confirm() functions. It
// must define how the change is carried out.
func NewChange(mne uuid.UUID, currentState, change []byte, diff *gnmi.Notification, callback func([]byte, []byte) error) *Change {
state: mnepb.ChangeState_CHANGE_STATE_PENDING,
timestamp: time.Now(),
previousState: currentState,
diff: diff,
intendedState: change,
callback: callback,
}
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, changeTimeout)
c.stateIn = stateIn
c.stateOut = stateOut
c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
// Change is an intended change to an MNE. It is unique and immutable.
// It has a cuid, a timestamp, and holds both the previous and the new
// state. It keeps track if the state is committed and confirmed. A callback
// exists to acess the proper transport for the changed MNE.
cuid uuid.UUID
duid uuid.UUID
state mnepb.ChangeState
timestamp time.Time
previousState []byte
intendedState []byte
diff *gnmi.Notification
callback func([]byte, []byte) error
stateMu sync.RWMutex
errChan <-chan error
stateIn chan<- mnepb.ChangeState
stateOut <-chan mnepb.ChangeState
stateManagerCancel context.CancelFunc
// ID returns the Change's UUID.
func (c *Change) ID() uuid.UUID {
return c.cuid
}
// AssociatedDeviceID returns the change's asssociated device UUID.
func (c *Change) AssociatedDeviceID() uuid.UUID {
return c.duid
}
// Commit pushes the change to the MNE using the callback() function
// and starts the timeout-timer for the Change. If the timer expires
// the change is rolled back.
//TODO: check if already committed
c.stateIn <- mnepb.ChangeState_CHANGE_STATE_COMMITTED
return err
case <-c.stateOut:
// Confirm confirms a committed Change and stops the rollback timer.
//TODO: check if already confirmed
c.stateIn <- mnepb.ChangeState_CHANGE_STATE_CONFIRMED
return err
case <-c.stateOut:
// Age returns the passed time since the Change was created.
func (c *Change) Age() time.Duration {
return time.Since(c.timestamp)
}
// State returns the changes's state.
func (c *Change) State() mnepb.ChangeState {
c.stateMu.RLock()
state := c.state
c.stateMu.RUnlock()
return state
// PreviousState returns the previous state of the devices config as
// ygot.GoStruct.
func (c *Change) PreviousState() []byte {
return c.previousState
}
// IntendedState returns the intended state of the devices config as
// ygot.GoStruct.
func (c *Change) IntendedState() []byte {
return c.intendedState
}
// Diff returns the differences between the previous and the intended state as
// gnmi.Notification.
func (c *Change) Diff() *gnmi.Notification {
return c.diff
}
func stateManager(ctx context.Context, ch *Change, timeout time.Duration) (chan<- mnepb.ChangeState, <-chan mnepb.ChangeState, <-chan error) {
stateIn := make(chan mnepb.ChangeState)
stateOut := make(chan mnepb.ChangeState)
// A Goroutine, which is created while a new Change is initialized acts as
// the receiver for errorChan
// create ticker and make it wait for 1 week
// workaround for delayed ticker start and ugly housekeeping
ticker := time.NewTicker(time.Hour * 7 * 24)
ticker.Stop()
running:
//TODO: priority select? should ticker have priority?
state := ch.State()
if state == mnepb.ChangeState_CHANGE_STATE_CONFIRMED {
continue
}
err := ch.callback(ch.intendedState, ch.previousState)
if err != nil {
ch.stateMu.Lock()
ch.state = mnepb.ChangeState_CHANGE_STATE_INCONSISTENT
ch.stateMu.Unlock()
log.Errorf("change %v timed out", ch.cuid)
log.Error(err)
continue
// A rollback has been executed and the timer is stopped
ticker.Stop()
// TODO: keep the Change as pending, or remove it?
ch.stateMu.Lock()
ch.state = mnepb.ChangeState_CHANGE_STATE_PENDING
ch.stateMu.Unlock()
log.Errorf("change %v timed out", ch.cuid)
case s := <-stateIn:
switch s {
case mnepb.ChangeState_CHANGE_STATE_COMMITTED:
state := ch.State()
if state == mnepb.ChangeState_CHANGE_STATE_COMMITTED || state == mnepb.ChangeState_CHANGE_STATE_CONFIRMED {
errChan <- fmt.Errorf("change %v already %s", ch.cuid, state.String())
continue
}
// reset ticker to enable activate the change timeout
ticker.Reset(timeout)
err := ch.callback(ch.previousState, ch.intendedState)
if err != nil {
ch.stateMu.Lock()
ch.state = mnepb.ChangeState_CHANGE_STATE_INCONSISTENT
ch.stateMu.Unlock()
errChan <- err
continue
}
ch.stateMu.Lock()
ch.state = mnepb.ChangeState_CHANGE_STATE_COMMITTED
ch.stateMu.Unlock()
case mnepb.ChangeState_CHANGE_STATE_CONFIRMED:
state := ch.State()
if state != mnepb.ChangeState_CHANGE_STATE_COMMITTED {
errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
continue
}
// The change has been confirmed and the timer is stopped,
// since a rollback is not necessary anymore.
ch.stateMu.Lock()
ch.state = mnepb.ChangeState_CHANGE_STATE_CONFIRMED
ch.stateMu.Unlock()
ch.stateManagerCancel()
case <-ctx.Done():
ticker.Stop()
break running
log.Info("statemanager routine done for: ", ch.cuid)
return stateIn, stateOut, errChan