Newer
Older
"github.com/google/uuid"
"github.com/openconfig/ygot/ygot"
log "github.com/sirupsen/logrus"
)
var changeTimeout time.Duration
func init() {
var err error
e := os.Getenv("GOSDN_CHANGE_TIMEOUT")
if e != "" {
changeTimeout, err = time.ParseDuration(e)
if err != nil {
log.Fatal(err)
}
// NewChange takes a Device UUID, a pair GoStructs (current and intended state)
// a callback function and a channel for errors and returns a *Change
// The callback function is used by the Commit() and Confirm() functions. It
// must define how the change is carried out.
func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruct, callback func(ygot.GoStruct, ygot.GoStruct) error, errChan chan error) *Change {
return &Change{
cuid: uuid.New(),
duid: device,
timestamp: time.Now(),
previousState: currentState,
intendedState: change,
committed: false,
confirmed: false,
callback: callback,
// Change is an intended change to an OND. It is unique and immutable.
// It has a cuid, a timestamp, and holds both the previous and the new
// state. It keeps track if the state is committed and confirmed. A callback
// exists to acess the proper transport for the changed OND
type Change struct {
cuid uuid.UUID
duid uuid.UUID
timestamp time.Time
previousState ygot.GoStruct
intendedState ygot.GoStruct
committed bool
confirmed bool
callback func(ygot.GoStruct, ygot.GoStruct) error
out <-chan bool
commit chan<- *Change
confirm chan<- *Change
func (c *Change) ID() uuid.UUID {
return c.cuid
}
// Commit pushes the change to the OND using the callback() function
// and starts the timeout-timer for the Change. If the timer expires
// the change is rolled back.
if c.committed {
return fmt.Errorf("change %v already committed", c.cuid)
}
case err := <-c.errChan:
if err != nil {
return err
// Confirm confirms a committed Change and stops the rollback timer.
func (c *Change) Confirm() error {
if !c.committed {
return fmt.Errorf("cannot confirm uncommitted change %v", c.cuid)
}
c.confirm <- c
select {
case err := <-c.errChan:
if err != nil {
return err
}
case <-c.out:
return nil
// Age returns the passed time since the Change was created
func (c *Change) Age() time.Duration {
return time.Since(c.timestamp)
}
// State returns the changes's state.
func (c *Change) State() ppb.Change_State {
if !c.committed {
return ppb.Change_PENDING
} else if !c.confirmed {
return ppb.Change_COMMITTED
} else {
return ppb.Change_CONFIRMED
}
}
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
func stateManager(timeout time.Duration) (chan<- *Change, chan<- *Change, <-chan bool) {
commit := make(chan *Change)
confirm := make(chan *Change)
out := make(chan bool)
ticker := time.NewTicker(timeout)
go func() {
ch := <-commit
err := ch.callback(ch.previousState, ch.intendedState)
if err != nil {
ch.errChan <- err
}
ch.committed = true
out <- true
for {
select {
case <-ticker.C:
err := ch.callback(ch.intendedState, ch.previousState)
if err != nil {
ch.errChan <- err
}
ch.errChan <- fmt.Errorf("change %v timed out", ch.cuid)
break
case <-confirm:
ch.confirmed = true
out <- true
break
}
}
}()
return commit, confirm, out
}