Skip to content
Snippets Groups Projects
Commit fa9a4f14 authored by Malte Bauch's avatar Malte Bauch
Browse files

added context which allows to interrupt

This change is needed to prevent that the stateManager goroutine is
running forever. Now it will finish after the change has been confirmed.

This is probably also the reason for the data race within the tests,
because it seems to be a read/write race for t.done from within the test
package. This is because the goroutine keeps running and triggers
(because of the ticker) a t.Logf for a test which is already finished.

TODO: A method should be added which allows to Cancel() a Change.

Addresses: #152
parent dd4c1457
No related branches found
No related tags found
1 merge request!218Resolve "Internal Changes representation is not working as expected"
Pipeline #88275 passed
This commit is part of merge request !218. Comments created here will be created in the context of that merge request.
package nucleus
import (
"context"
"fmt"
"os"
"sync"
......@@ -43,10 +44,12 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
intendedState: change,
callback: callback,
}
stateIn, stateOut, errChan := stateManager(c, changeTimeout)
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(c, stateManagerCtx, changeTimeout)
c.stateIn = stateIn
c.stateOut = stateOut
c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
return c
}
......@@ -66,6 +69,7 @@ type Change struct {
errChan <-chan error
stateIn chan<- ppb.Change_State
stateOut <-chan ppb.Change_State
stateManagerCancel context.CancelFunc
}
// ID returns the Change's UUID
......@@ -77,6 +81,7 @@ func (c *Change) ID() uuid.UUID {
// and starts the timeout-timer for the Change. If the timer expires
// the change is rolled back.
func (c *Change) Commit() error {
//TODO: check if already commited
c.stateIn <- ppb.Change_COMMITTED
select {
case err := <-c.errChan:
......@@ -88,6 +93,7 @@ func (c *Change) Commit() error {
// Confirm confirms a committed Change and stops the rollback timer.
func (c *Change) Confirm() error {
//TODO: check if already confirmed
c.stateIn <- ppb.Change_CONFIRMED
select {
case err := <-c.errChan:
......@@ -110,7 +116,7 @@ func (c *Change) State() ppb.Change_State {
return state
}
func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error) {
func stateManager(ch *Change, ctx context.Context, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error) {
stateIn := make(chan ppb.Change_State)
stateOut := make(chan ppb.Change_State)
// A Goroutine, which is created while a new Change is initialized acts as
......@@ -122,11 +128,13 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
ticker.Stop()
go func() {
running:
for {
//TODO: priority select? should ticker have priority?
select {
case <-ticker.C:
if ch.state == ppb.Change_CONFIRMED {
state := ch.State()
if state == ppb.Change_CONFIRMED {
continue
}
err := ch.callback(ch.intendedState, ch.previousState)
......@@ -148,8 +156,9 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
case s := <-stateIn:
switch s {
case ppb.Change_COMMITTED:
if ch.state == ppb.Change_COMMITTED || ch.state == ppb.Change_CONFIRMED {
errChan <- fmt.Errorf("change %v already %s", ch.cuid, ch.state.String())
state := ch.State()
if state == ppb.Change_COMMITTED || state == ppb.Change_CONFIRMED {
errChan <- fmt.Errorf("change %v already %s", ch.cuid, state.String())
continue
}
// reset ticker to enable activate the change timeout
......@@ -165,22 +174,27 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
ch.stateMu.Lock()
ch.state = ppb.Change_COMMITTED
ch.stateMu.Unlock()
stateOut <- ch.state
stateOut <- state
case ppb.Change_CONFIRMED:
if ch.state != ppb.Change_COMMITTED {
state := ch.State()
if state != ppb.Change_COMMITTED {
errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
continue
}
// The change has been confirmed and the timer is stopped,
// since a rollback is not necessary anymore.
ticker.Stop()
ch.stateMu.Lock()
ch.state = ppb.Change_CONFIRMED
ch.stateMu.Unlock()
stateOut <- ch.state
stateOut <- state
ch.stateManagerCancel()
}
case <-ctx.Done():
ticker.Stop()
break running
}
}
log.Info("statemanager routine done for: ", ch.cuid)
}()
return stateIn, stateOut, errChan
}
package nucleus
import (
"context"
"errors"
"reflect"
"sync"
......@@ -50,10 +51,12 @@ func TestChange_CommitRollback(t *testing.T) {
return nil
},
}
stateIn, stateOut, errChan := stateManager(c, time.Millisecond*100)
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(c, stateManagerCtx, time.Millisecond*100)
c.stateIn = stateIn
c.stateOut = stateOut
c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
wg.Add(1)
go func() {
defer wg.Done()
......@@ -68,6 +71,7 @@ func TestChange_CommitRollback(t *testing.T) {
t.Errorf("Commit() = %v, want %v", got, want)
}
wg.Wait()
c.stateManagerCancel()
}
func TestChange_CommitRollbackError(t *testing.T) {
......@@ -92,10 +96,12 @@ func TestChange_CommitRollbackError(t *testing.T) {
return nil
},
}
stateIn, stateOut, errChan := stateManager(c, time.Millisecond*100)
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(c, stateManagerCtx, time.Millisecond*100)
c.stateIn = stateIn
c.stateOut = stateOut
c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
go func() {
defer wg.Done()
......@@ -110,6 +116,7 @@ func TestChange_CommitRollbackError(t *testing.T) {
t.Errorf("Commit() = %v, want %v", got, want)
}
wg.Wait()
c.stateManagerCancel()
}
func TestChange_CommitError(t *testing.T) {
......@@ -124,10 +131,12 @@ func TestChange_CommitError(t *testing.T) {
return errors.New("this is an expected error")
},
}
stateIn, stateOut, errChan := stateManager(c, time.Millisecond*100)
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(c, stateManagerCtx, time.Millisecond*100)
c.stateIn = stateIn
c.stateOut = stateOut
c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
time.Sleep(time.Millisecond * 10)
if err := c.Commit(); err == nil {
......@@ -137,6 +146,7 @@ func TestChange_CommitError(t *testing.T) {
if !reflect.DeepEqual(got, want) {
t.Errorf("Commit() = %v, want %v", got, want)
}
c.stateManagerCancel()
}
func TestChange_Commit(t *testing.T) {
......@@ -152,10 +162,12 @@ func TestChange_Commit(t *testing.T) {
return nil
},
}
stateIn, stateOut, errChan := stateManager(c, time.Millisecond*100)
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(c, stateManagerCtx, time.Millisecond*100)
c.stateIn = stateIn
c.stateOut = stateOut
c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
if err := c.Commit(); err != nil {
t.Errorf("Commit() error = %v", err)
......@@ -201,10 +213,12 @@ func TestChange_Confirm(t *testing.T) {
return nil
},
}
stateIn, stateOut, errChan := stateManager(c, time.Millisecond*100)
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(c, stateManagerCtx, time.Millisecond*100)
c.stateIn = stateIn
c.stateOut = stateOut
c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
if tt.name == "committed" {
if err := c.Commit(); err != nil {
......@@ -214,6 +228,7 @@ func TestChange_Confirm(t *testing.T) {
if err := c.Confirm(); (err != nil) != tt.wantErr {
t.Errorf("Confirm() error = %v, wantErr %v", err, tt.wantErr)
}
c.stateManagerCancel()
})
}
}
......@@ -284,6 +299,7 @@ func TestChange_State(t *testing.T) {
if got := c.State(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Change.State() = %v, want %v", got, tt.want)
}
c.stateManagerCancel()
})
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment