diff --git a/nucleus/change.go b/nucleus/change.go index 5340cfca151c77830830df0d336dd05f6d334681..ca4a6bd4bfcf43993995bc41139a1ff83ee0a09b 100644 --- a/nucleus/change.go +++ b/nucleus/change.go @@ -1,8 +1,10 @@ package nucleus import ( + "context" "fmt" "os" + "sync" "time" ppb "code.fbi.h-da.de/danet/api/go/gosdn/pnd" @@ -36,16 +38,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc c := &Change{ cuid: uuid.New(), duid: device, + state: ppb.Change_PENDING, timestamp: time.Now(), previousState: currentState, intendedState: change, callback: callback, } - stateIn, stateOut, requestState, errChan := stateManager(c, changeTimeout) + stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background()) + stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, changeTimeout) c.stateIn = stateIn c.stateOut = stateOut - c.requestState = requestState c.errChan = errChan + c.stateManagerCancel = stateManagerCancel return c } @@ -54,16 +58,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc // state. It keeps track if the state is committed and confirmed. A callback // exists to acess the proper transport for the changed OND type Change struct { - cuid uuid.UUID - duid uuid.UUID - timestamp time.Time - previousState ygot.GoStruct - intendedState ygot.GoStruct - callback func(ygot.GoStruct, ygot.GoStruct) error - errChan <-chan error - requestState chan<- bool - stateIn chan<- ppb.Change_State - stateOut <-chan ppb.Change_State + cuid uuid.UUID + duid uuid.UUID + state ppb.Change_State + timestamp time.Time + previousState ygot.GoStruct + intendedState ygot.GoStruct + callback func(ygot.GoStruct, ygot.GoStruct) error + stateMu sync.RWMutex + errChan <-chan error + stateIn chan<- ppb.Change_State + stateOut <-chan ppb.Change_State + stateManagerCancel context.CancelFunc } // ID returns the Change's UUID @@ -75,9 +81,7 @@ func (c *Change) ID() uuid.UUID { // and starts the timeout-timer for the Change. If the timer expires // the change is rolled back. func (c *Change) Commit() error { - if c.State() == ppb.Change_COMMITTED { - return fmt.Errorf("change %v already committed", c.cuid) - } + //TODO: check if already commited c.stateIn <- ppb.Change_COMMITTED select { case err := <-c.errChan: @@ -89,9 +93,7 @@ func (c *Change) Commit() error { // Confirm confirms a committed Change and stops the rollback timer. func (c *Change) Confirm() error { - if c.State() != ppb.Change_COMMITTED { - return fmt.Errorf("cannot confirm uncommitted change %v", c.cuid) - } + //TODO: check if already confirmed c.stateIn <- ppb.Change_CONFIRMED select { case err := <-c.errChan: @@ -108,51 +110,91 @@ func (c *Change) Age() time.Duration { // State returns the changes's state. func (c *Change) State() ppb.Change_State { - c.requestState <- true - return <-c.stateOut + c.stateMu.RLock() + state := c.state + c.stateMu.RUnlock() + return state } -func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, chan<- bool, <-chan error) { +func stateManager(ctx context.Context, ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error) { stateIn := make(chan ppb.Change_State) stateOut := make(chan ppb.Change_State) - stateRequest := make(chan bool) + // A Goroutine, which is created while a new Change is initialized acts as + // the reciever for errorChan errChan := make(chan error) // create ticker and make it wait for 1 week // workaround for delayed ticker start and ugly housekeeping ticker := time.NewTicker(time.Hour * 7 * 24) + ticker.Stop() go func() { - state := ppb.Change_PENDING + running: for { + //TODO: priority select? should ticker have priority? select { case <-ticker.C: + state := ch.State() + if state == ppb.Change_CONFIRMED { + continue + } err := ch.callback(ch.intendedState, ch.previousState) if err != nil { - state = ppb.Change_INCONSISTENT - errChan <- err + ch.stateMu.Lock() + ch.state = ppb.Change_INCONSISTENT + ch.stateMu.Unlock() + log.Errorf("change %v timed out", ch.cuid) + log.Error(err) + continue } - errChan <- fmt.Errorf("change %v timed out", ch.cuid) + // A rollback has been executed and the timer is stopped + ticker.Stop() + // TODO: keep the Change as pending, or remove it? + ch.stateMu.Lock() + ch.state = ppb.Change_PENDING + ch.stateMu.Unlock() + log.Errorf("change %v timed out", ch.cuid) case s := <-stateIn: switch s { case ppb.Change_COMMITTED: + state := ch.State() + if state == ppb.Change_COMMITTED || state == ppb.Change_CONFIRMED { + errChan <- fmt.Errorf("change %v already %s", ch.cuid, state.String()) + continue + } // reset ticker to enable activate the change timeout ticker.Reset(timeout) err := ch.callback(ch.previousState, ch.intendedState) if err != nil { - state = ppb.Change_INCONSISTENT + ch.stateMu.Lock() + ch.state = ppb.Change_INCONSISTENT + ch.stateMu.Unlock() errChan <- err continue } - state = ppb.Change_COMMITTED + ch.stateMu.Lock() + ch.state = ppb.Change_COMMITTED + ch.stateMu.Unlock() stateOut <- state case ppb.Change_CONFIRMED: - state = ppb.Change_CONFIRMED + state := ch.State() + if state != ppb.Change_COMMITTED { + errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid) + continue + } + // The change has been confirmed and the timer is stopped, + // since a rollback is not necessary anymore. + ch.stateMu.Lock() + ch.state = ppb.Change_CONFIRMED + ch.stateMu.Unlock() stateOut <- state + ch.stateManagerCancel() } - case <-stateRequest: - stateOut <- state + case <-ctx.Done(): + ticker.Stop() + break running } } + log.Info("statemanager routine done for: ", ch.cuid) }() - return stateIn, stateOut, stateRequest, errChan + return stateIn, stateOut, errChan } diff --git a/nucleus/change_test.go b/nucleus/change_test.go index 6fdff64fc223abb8adcd43a0192ca9a74185f804..bb59c2f97706ad7522c40e51e601c0fed576c6fd 100644 --- a/nucleus/change_test.go +++ b/nucleus/change_test.go @@ -1,6 +1,7 @@ package nucleus import ( + "context" "errors" "reflect" "sync" @@ -50,11 +51,12 @@ func TestChange_CommitRollback(t *testing.T) { return nil }, } - stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) + stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background()) + stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100) c.stateIn = stateIn c.stateOut = stateOut - c.requestState = requestState c.errChan = errChan + c.stateManagerCancel = stateManagerCancel wg.Add(1) go func() { defer wg.Done() @@ -69,6 +71,7 @@ func TestChange_CommitRollback(t *testing.T) { t.Errorf("Commit() = %v, want %v", got, want) } wg.Wait() + c.stateManagerCancel() } func TestChange_CommitRollbackError(t *testing.T) { @@ -76,6 +79,7 @@ func TestChange_CommitRollbackError(t *testing.T) { wg.Add(1) wantErr := false want := errors.New("this is an expected error") + rollbackErrChannel := make(chan error) c := &Change{ cuid: cuid, duid: did, @@ -87,16 +91,17 @@ func TestChange_CommitRollbackError(t *testing.T) { t.Logf("callback in test %v", t.Name()) switch hostname { case rollbackHostname: - return errors.New("this is an expected error") + rollbackErrChannel <- errors.New("this is an expected error") } return nil }, } - stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) + stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background()) + stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100) c.stateIn = stateIn c.stateOut = stateOut - c.requestState = requestState c.errChan = errChan + c.stateManagerCancel = stateManagerCancel go func() { defer wg.Done() @@ -106,11 +111,12 @@ func TestChange_CommitRollbackError(t *testing.T) { } time.Sleep(config.ChangeTimeout) }() - got := <-c.errChan + got := <-rollbackErrChannel if !reflect.DeepEqual(got, want) { t.Errorf("Commit() = %v, want %v", got, want) } wg.Wait() + c.stateManagerCancel() } func TestChange_CommitError(t *testing.T) { @@ -125,11 +131,12 @@ func TestChange_CommitError(t *testing.T) { return errors.New("this is an expected error") }, } - stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) + stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background()) + stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100) c.stateIn = stateIn c.stateOut = stateOut - c.requestState = requestState c.errChan = errChan + c.stateManagerCancel = stateManagerCancel time.Sleep(time.Millisecond * 10) if err := c.Commit(); err == nil { @@ -139,6 +146,7 @@ func TestChange_CommitError(t *testing.T) { if !reflect.DeepEqual(got, want) { t.Errorf("Commit() = %v, want %v", got, want) } + c.stateManagerCancel() } func TestChange_Commit(t *testing.T) { @@ -154,11 +162,12 @@ func TestChange_Commit(t *testing.T) { return nil }, } - stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) + stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background()) + stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100) c.stateIn = stateIn c.stateOut = stateOut - c.requestState = requestState c.errChan = errChan + c.stateManagerCancel = stateManagerCancel if err := c.Commit(); err != nil { t.Errorf("Commit() error = %v", err) @@ -204,11 +213,12 @@ func TestChange_Confirm(t *testing.T) { return nil }, } - stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) + stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background()) + stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100) c.stateIn = stateIn c.stateOut = stateOut - c.requestState = requestState c.errChan = errChan + c.stateManagerCancel = stateManagerCancel if tt.name == "committed" { if err := c.Commit(); err != nil { @@ -218,6 +228,7 @@ func TestChange_Confirm(t *testing.T) { if err := c.Confirm(); (err != nil) != tt.wantErr { t.Errorf("Confirm() error = %v, wantErr %v", err, tt.wantErr) } + c.stateManagerCancel() }) } } @@ -269,8 +280,9 @@ func TestChange_State(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + testName := t.Name() callback := func(first ygot.GoStruct, second ygot.GoStruct) error { - t.Logf("callback in test %v", t.Name()) + t.Logf("callback in test %v", testName) return nil } c := NewChange(did, rollbackDevice, commitDevice, callback) @@ -287,6 +299,7 @@ func TestChange_State(t *testing.T) { if got := c.State(); !reflect.DeepEqual(got, tt.want) { t.Errorf("Change.State() = %v, want %v", got, tt.want) } + c.stateManagerCancel() }) } }