Newer
Older
"github.com/google/uuid"
"github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus"
)
// changeTimeout is the timeout passed to stateManager for every Change
// created by NewChange. init() sets it from the GOSDN_CHANGE_TIMEOUT
// environment variable when that variable is non-empty.
var changeTimeout time.Duration
// init configures changeTimeout from the GOSDN_CHANGE_TIMEOUT environment
// variable. A non-empty value must be a string accepted by
// time.ParseDuration (for example "30s" or "10m"); a value that cannot be
// parsed is reported through log.Fatal.
func init() {
var err error
e := os.Getenv("GOSDN_CHANGE_TIMEOUT")
if e != "" {
changeTimeout, err = time.ParseDuration(e)
if err != nil {
// A malformed timeout is a configuration error; do not continue with it.
log.Fatal(err)
}
// NewChange takes a Device UUID, a pair of GoStructs (current and intended
// state), a callback function, and returns a *Change.
//
// The new Change receives a freshly generated UUID and the current time as
// its timestamp. The callback must define how the change is carried out; on
// commit the state manager invokes it as
// callback(previousState, intendedState).
//
// NewChange also calls stateManager with the package-level changeTimeout and
// stores the returned channels on the Change.
func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruct, callback func(ygot.GoStruct, ygot.GoStruct) error) *Change {
c := &Change{
cuid: uuid.New(),
duid: device,
timestamp: time.Now(),
previousState: currentState,
intendedState: change,
callback: callback,
}
// The channels returned by stateManager are what the Change's methods use
// to query and update its state.
stateIn, stateOut, requestState, errChan := stateManager(c, changeTimeout)
c.stateIn = stateIn
c.stateOut = stateOut
c.requestState = requestState
c.errChan = errChan
return c
// Change is an intended change to an OND. It is unique and immutable.
// It has a cuid, a timestamp, and holds both the previous and the new
// state. It keeps track of whether the state is committed and confirmed.
// A callback exists to access the proper transport for the changed OND.
type Change struct {
// cuid is the Change's own UUID, generated in NewChange.
cuid uuid.UUID
// duid is the UUID of the device passed to NewChange.
duid uuid.UUID
// timestamp is the creation time; Age() is measured from it.
timestamp time.Time
// previousState is the state that was current when the Change was created.
previousState ygot.GoStruct
// intendedState is the state the Change is meant to establish.
intendedState ygot.GoStruct
// callback carries out a transition between two states.
callback func(ygot.GoStruct, ygot.GoStruct) error
// errChan delivers errors sent by the state manager.
errChan <-chan error
// requestState asks the state manager to send the current state on stateOut.
requestState chan<- bool
// stateIn submits a requested state (committed, confirmed) to the state manager.
stateIn chan<- ppb.Change_State
// stateOut delivers states sent by the state manager.
stateOut <-chan ppb.Change_State
// ID returns the UUID that identifies this Change.
func (c *Change) ID() uuid.UUID {
return c.cuid
}
// Commit pushes the change to the OND using the callback() function
// and starts the timeout timer for the Change. If the timer expires,
// the change is rolled back. Committing a Change whose state is already
// Change_COMMITTED returns an error.
if c.State() == ppb.Change_COMMITTED {
return fmt.Errorf("change %v already committed", c.cuid)
}
// Hand the request to the state manager, which performs the commit.
c.stateIn <- ppb.Change_COMMITTED
return err
case <-c.stateOut:
// Confirm confirms a committed Change and stops the rollback timer.
// It returns an error if the Change's state is not Change_COMMITTED.
if c.State() != ppb.Change_COMMITTED {
return fmt.Errorf("cannot confirm uncommitted change %v", c.cuid)
}
// Hand the request to the state manager, which records the confirmation.
c.stateIn <- ppb.Change_CONFIRMED
return err
case <-c.stateOut:
// Age returns the time elapsed since the Change was created.
func (c *Change) Age() time.Duration {
return time.Since(c.timestamp)
}
// State returns the Change's current state. It signals on requestState and
// then receives the state manager's reply from stateOut; both operations
// block until the state manager services them.
func (c *Change) State() ppb.Change_State {
c.requestState <- true
return <-c.stateOut
// stateManager creates the channels through which the state of ch is
// queried and updated, and returns them in the order stateIn, stateOut,
// stateRequest, errChan. All four channels are unbuffered, so each send
// blocks until the other side receives.
//
// Message handling:
//   - Change_COMMITTED on stateIn resets the ticker to timeout and calls
//     ch.callback(ch.previousState, ch.intendedState). If the callback fails,
//     the state becomes Change_INCONSISTENT and the error is sent on errChan;
//     otherwise the state becomes Change_COMMITTED and is sent on stateOut.
//   - Change_CONFIRMED on stateIn sets the state to Change_CONFIRMED and
//     sends it on stateOut.
//   - A receive on stateRequest sends the current state on stateOut.
//
// NOTE(review): the rollback path calls
// ch.callback(ch.intendedState, ch.previousState) and reports
// "change ... timed out" on errChan — presumably when the ticker fires;
// confirm. Because errChan is unbuffered, those sends block until a
// receiver is ready.
func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, chan<- bool, <-chan error) {
stateIn := make(chan ppb.Change_State)
stateOut := make(chan ppb.Change_State)
stateRequest := make(chan bool)
errChan := make(chan error)
// Create the ticker with a one-week period so that it stays quiet until a
// commit resets it. This is a workaround for the lack of a delayed ticker
// start and avoids extra housekeeping.
ticker := time.NewTicker(time.Hour * 7 * 24)
// Roll back: apply the transition in the reverse direction.
err := ch.callback(ch.intendedState, ch.previousState)
if err != nil {
state = ppb.Change_INCONSISTENT
errChan <- err
errChan <- fmt.Errorf("change %v timed out", ch.cuid)
break
case s := <-stateIn:
switch s {
case ppb.Change_COMMITTED:
// Reset the ticker to activate the change timeout.
// NOTE(review): Ticker.Reset panics for a non-positive duration —
// confirm that timeout is always greater than zero.
ticker.Reset(timeout)
err := ch.callback(ch.previousState, ch.intendedState)
if err != nil {
state = ppb.Change_INCONSISTENT
errChan <- err
continue
}
state = ppb.Change_COMMITTED
stateOut <- state
case ppb.Change_CONFIRMED:
state = ppb.Change_CONFIRMED
stateOut <- state
case <-stateRequest:
// Answer a State() query with the current state.
stateOut <- state
return stateIn, stateOut, stateRequest, errChan