Skip to content
Snippets Groups Projects

Resolve "Internal Changes representation is not working as expected"

2 files
+ 55
25
Compare changes
  • Side-by-side
  • Inline
Files
2
  • fa9a4f14
    added context which allows to interrupt · fa9a4f14
    Malte Bauch authored
    This change is needed to prevent that the stateManager goroutine is
    running forever. Now it will finish after the change has been confirmed.
    
    This is probably also the reason for the data race within the tests,
    because it seems to be a read/write race for t.done from within the test
    package. This is because the goroutine keeps running and triggers
    (because of the ticker) a t.Logf for a test which is already finished.
    
    TODO: A method should be added which allows to Cancel() a Change.
    
    Addresses: #152
+ 34
20
package nucleus
package nucleus
import (
import (
 
"context"
"fmt"
"fmt"
"os"
"os"
"sync"
"sync"
@@ -43,10 +44,12 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
@@ -43,10 +44,12 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
intendedState: change,
intendedState: change,
callback: callback,
callback: callback,
}
}
stateIn, stateOut, errChan := stateManager(c, changeTimeout)
stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
 
stateIn, stateOut, errChan := stateManager(c, stateManagerCtx, changeTimeout)
c.stateIn = stateIn
c.stateIn = stateIn
c.stateOut = stateOut
c.stateOut = stateOut
c.errChan = errChan
c.errChan = errChan
 
c.stateManagerCancel = stateManagerCancel
return c
return c
}
}
@@ -55,17 +58,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
@@ -55,17 +58,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
// state. It keeps track if the state is committed and confirmed. A callback
// state. It keeps track if the state is committed and confirmed. A callback
// exists to acess the proper transport for the changed OND
// exists to acess the proper transport for the changed OND
type Change struct {
type Change struct {
cuid uuid.UUID
cuid uuid.UUID
duid uuid.UUID
duid uuid.UUID
state ppb.Change_State
state ppb.Change_State
timestamp time.Time
timestamp time.Time
previousState ygot.GoStruct
previousState ygot.GoStruct
intendedState ygot.GoStruct
intendedState ygot.GoStruct
callback func(ygot.GoStruct, ygot.GoStruct) error
callback func(ygot.GoStruct, ygot.GoStruct) error
stateMu sync.RWMutex
stateMu sync.RWMutex
errChan <-chan error
errChan <-chan error
stateIn chan<- ppb.Change_State
stateIn chan<- ppb.Change_State
stateOut <-chan ppb.Change_State
stateOut <-chan ppb.Change_State
 
stateManagerCancel context.CancelFunc
}
}
// ID returns the Change's UUID
// ID returns the Change's UUID
@@ -77,6 +81,7 @@ func (c *Change) ID() uuid.UUID {
@@ -77,6 +81,7 @@ func (c *Change) ID() uuid.UUID {
// and starts the timeout-timer for the Change. If the timer expires
// and starts the timeout-timer for the Change. If the timer expires
// the change is rolled back.
// the change is rolled back.
func (c *Change) Commit() error {
func (c *Change) Commit() error {
 
//TODO: check if already commited
c.stateIn <- ppb.Change_COMMITTED
c.stateIn <- ppb.Change_COMMITTED
select {
select {
case err := <-c.errChan:
case err := <-c.errChan:
@@ -88,6 +93,7 @@ func (c *Change) Commit() error {
@@ -88,6 +93,7 @@ func (c *Change) Commit() error {
// Confirm confirms a committed Change and stops the rollback timer.
// Confirm confirms a committed Change and stops the rollback timer.
func (c *Change) Confirm() error {
func (c *Change) Confirm() error {
 
//TODO: check if already confirmed
c.stateIn <- ppb.Change_CONFIRMED
c.stateIn <- ppb.Change_CONFIRMED
select {
select {
case err := <-c.errChan:
case err := <-c.errChan:
@@ -110,7 +116,7 @@ func (c *Change) State() ppb.Change_State {
@@ -110,7 +116,7 @@ func (c *Change) State() ppb.Change_State {
return state
return state
}
}
func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error) {
func stateManager(ch *Change, ctx context.Context, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error) {
stateIn := make(chan ppb.Change_State)
stateIn := make(chan ppb.Change_State)
stateOut := make(chan ppb.Change_State)
stateOut := make(chan ppb.Change_State)
// A Goroutine, which is created while a new Change is initialized acts as
// A Goroutine, which is created while a new Change is initialized acts as
@@ -122,11 +128,13 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
@@ -122,11 +128,13 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
ticker.Stop()
ticker.Stop()
go func() {
go func() {
 
running:
for {
for {
//TODO: priority select? should ticker have priority?
//TODO: priority select? should ticker have priority?
select {
select {
case <-ticker.C:
case <-ticker.C:
if ch.state == ppb.Change_CONFIRMED {
state := ch.State()
 
if state == ppb.Change_CONFIRMED {
continue
continue
}
}
err := ch.callback(ch.intendedState, ch.previousState)
err := ch.callback(ch.intendedState, ch.previousState)
@@ -148,8 +156,9 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
@@ -148,8 +156,9 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
case s := <-stateIn:
case s := <-stateIn:
switch s {
switch s {
case ppb.Change_COMMITTED:
case ppb.Change_COMMITTED:
if ch.state == ppb.Change_COMMITTED || ch.state == ppb.Change_CONFIRMED {
state := ch.State()
errChan <- fmt.Errorf("change %v already %s", ch.cuid, ch.state.String())
if state == ppb.Change_COMMITTED || state == ppb.Change_CONFIRMED {
 
errChan <- fmt.Errorf("change %v already %s", ch.cuid, state.String())
continue
continue
}
}
// reset ticker to enable activate the change timeout
// reset ticker to enable activate the change timeout
@@ -165,22 +174,27 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
@@ -165,22 +174,27 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <
ch.stateMu.Lock()
ch.stateMu.Lock()
ch.state = ppb.Change_COMMITTED
ch.state = ppb.Change_COMMITTED
ch.stateMu.Unlock()
ch.stateMu.Unlock()
stateOut <- ch.state
stateOut <- state
case ppb.Change_CONFIRMED:
case ppb.Change_CONFIRMED:
if ch.state != ppb.Change_COMMITTED {
state := ch.State()
 
if state != ppb.Change_COMMITTED {
errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
continue
continue
}
}
// The change has been confirmed and the timer is stopped,
// The change has been confirmed and the timer is stopped,
// since a rollback is not necessary anymore.
// since a rollback is not necessary anymore.
ticker.Stop()
ch.stateMu.Lock()
ch.stateMu.Lock()
ch.state = ppb.Change_CONFIRMED
ch.state = ppb.Change_CONFIRMED
ch.stateMu.Unlock()
ch.stateMu.Unlock()
stateOut <- ch.state
stateOut <- state
 
ch.stateManagerCancel()
}
}
 
case <-ctx.Done():
 
ticker.Stop()
 
break running
}
}
}
}
 
log.Info("statemanager routine done for: ", ch.cuid)
}()
}()
return stateIn, stateOut, errChan
return stateIn, stateOut, errChan
}
}
Loading