Skip to content
Snippets Groups Projects

Resolve "Internal Changes representation is not working as expected"

Files
2
+ 74
32
package nucleus
import (
"context"
"fmt"
"os"
"sync"
"time"
ppb "code.fbi.h-da.de/danet/api/go/gosdn/pnd"
@@ -36,16 +38,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
c := &Change{
cuid: uuid.New(),
duid: device,
state: ppb.Change_PENDING,
timestamp: time.Now(),
previousState: currentState,
intendedState: change,
callback: callback,
}
stateIn, stateOut, requestState, errChan := stateManager(c, changeTimeout)
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, changeTimeout)
c.stateIn = stateIn
c.stateOut = stateOut
c.requestState = requestState
c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
return c
}
@@ -54,16 +58,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
// state. It keeps track if the state is committed and confirmed. A callback
// exists to acess the proper transport for the changed OND
type Change struct {
cuid uuid.UUID
duid uuid.UUID
timestamp time.Time
previousState ygot.GoStruct
intendedState ygot.GoStruct
callback func(ygot.GoStruct, ygot.GoStruct) error
errChan <-chan error
requestState chan<- bool
stateIn chan<- ppb.Change_State
stateOut <-chan ppb.Change_State
cuid uuid.UUID
duid uuid.UUID
state ppb.Change_State
timestamp time.Time
previousState ygot.GoStruct
intendedState ygot.GoStruct
callback func(ygot.GoStruct, ygot.GoStruct) error
stateMu sync.RWMutex
errChan <-chan error
stateIn chan<- ppb.Change_State
stateOut <-chan ppb.Change_State
stateManagerCancel context.CancelFunc
}
// ID returns the Change's UUID
@@ -75,9 +81,7 @@ func (c *Change) ID() uuid.UUID {
// and starts the timeout-timer for the Change. If the timer expires
// the change is rolled back.
func (c *Change) Commit() error {
if c.State() == ppb.Change_COMMITTED {
return fmt.Errorf("change %v already committed", c.cuid)
}
//TODO: check if already commited
c.stateIn <- ppb.Change_COMMITTED
select {
case err := <-c.errChan:
@@ -89,9 +93,7 @@ func (c *Change) Commit() error {
// Confirm confirms a committed Change and stops the rollback timer.
func (c *Change) Confirm() error {
if c.State() != ppb.Change_COMMITTED {
return fmt.Errorf("cannot confirm uncommitted change %v", c.cuid)
}
//TODO: check if already confirmed
c.stateIn <- ppb.Change_CONFIRMED
select {
case err := <-c.errChan:
@@ -108,51 +110,91 @@ func (c *Change) Age() time.Duration {
// State returns the changes's state.
func (c *Change) State() ppb.Change_State {
c.requestState <- true
return <-c.stateOut
c.stateMu.RLock()
state := c.state
c.stateMu.RUnlock()
return state
}
func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, chan<- bool, <-chan error) {
func stateManager(ctx context.Context, ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error) {
stateIn := make(chan ppb.Change_State)
stateOut := make(chan ppb.Change_State)
stateRequest := make(chan bool)
// A Goroutine, which is created while a new Change is initialized acts as
// the reciever for errorChan
errChan := make(chan error)
// create ticker and make it wait for 1 week
// workaround for delayed ticker start and ugly housekeeping
ticker := time.NewTicker(time.Hour * 7 * 24)
ticker.Stop()
go func() {
state := ppb.Change_PENDING
running:
for {
//TODO: priority select? should ticker have priority?
select {
case <-ticker.C:
state := ch.State()
if state == ppb.Change_CONFIRMED {
continue
}
err := ch.callback(ch.intendedState, ch.previousState)
if err != nil {
state = ppb.Change_INCONSISTENT
errChan <- err
ch.stateMu.Lock()
ch.state = ppb.Change_INCONSISTENT
ch.stateMu.Unlock()
log.Errorf("change %v timed out", ch.cuid)
log.Error(err)
continue
}
errChan <- fmt.Errorf("change %v timed out", ch.cuid)
// A rollback has been executed and the timer is stopped
ticker.Stop()
// TODO: keep the Change as pending, or remove it?
ch.stateMu.Lock()
ch.state = ppb.Change_PENDING
ch.stateMu.Unlock()
log.Errorf("change %v timed out", ch.cuid)
case s := <-stateIn:
switch s {
case ppb.Change_COMMITTED:
state := ch.State()
if state == ppb.Change_COMMITTED || state == ppb.Change_CONFIRMED {
errChan <- fmt.Errorf("change %v already %s", ch.cuid, state.String())
continue
}
// reset ticker to enable activate the change timeout
ticker.Reset(timeout)
err := ch.callback(ch.previousState, ch.intendedState)
if err != nil {
state = ppb.Change_INCONSISTENT
ch.stateMu.Lock()
ch.state = ppb.Change_INCONSISTENT
ch.stateMu.Unlock()
errChan <- err
continue
}
state = ppb.Change_COMMITTED
ch.stateMu.Lock()
ch.state = ppb.Change_COMMITTED
ch.stateMu.Unlock()
stateOut <- state
case ppb.Change_CONFIRMED:
state = ppb.Change_CONFIRMED
state := ch.State()
if state != ppb.Change_COMMITTED {
errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
continue
}
// The change has been confirmed and the timer is stopped,
// since a rollback is not necessary anymore.
ch.stateMu.Lock()
ch.state = ppb.Change_CONFIRMED
ch.stateMu.Unlock()
stateOut <- state
ch.stateManagerCancel()
}
case <-stateRequest:
stateOut <- state
case <-ctx.Done():
ticker.Stop()
break running
}
}
log.Info("statemanager routine done for: ", ch.cuid)
}()
return stateIn, stateOut, stateRequest, errChan
return stateIn, stateOut, errChan
}
Loading