Skip to content
Snippets Groups Projects
Commit aca2c0e8 authored by Malte Bauch's avatar Malte Bauch
Browse files

Merge branch '152-internal-changes-representation-is-not-working-as-expected' into 'develop'

Resolve "Internal Changes representation is not working as expected"

See merge request !218
parents 9b843f04 95843561
No related branches found
No related tags found
6 merge requests!246Develop,!245Develop into Master,!244Master into develop2 into master,!228Merge develop into stmakurz_http_server,!218Resolve "Internal Changes representation is not working as expected",!138Develop
Checking pipeline status
package nucleus package nucleus
import ( import (
"context"
"fmt" "fmt"
"os" "os"
"sync"
"time" "time"
ppb "code.fbi.h-da.de/danet/api/go/gosdn/pnd" ppb "code.fbi.h-da.de/danet/api/go/gosdn/pnd"
...@@ -36,16 +38,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc ...@@ -36,16 +38,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
c := &Change{ c := &Change{
cuid: uuid.New(), cuid: uuid.New(),
duid: device, duid: device,
state: ppb.Change_PENDING,
timestamp: time.Now(), timestamp: time.Now(),
previousState: currentState, previousState: currentState,
intendedState: change, intendedState: change,
callback: callback, callback: callback,
} }
stateIn, stateOut, requestState, errChan := stateManager(c, changeTimeout) stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, changeTimeout)
c.stateIn = stateIn c.stateIn = stateIn
c.stateOut = stateOut c.stateOut = stateOut
c.requestState = requestState
c.errChan = errChan c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
return c return c
} }
...@@ -54,16 +58,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc ...@@ -54,16 +58,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
// state. It keeps track if the state is committed and confirmed. A callback // state. It keeps track if the state is committed and confirmed. A callback
// exists to acess the proper transport for the changed OND // exists to acess the proper transport for the changed OND
type Change struct { type Change struct {
cuid uuid.UUID cuid uuid.UUID
duid uuid.UUID duid uuid.UUID
timestamp time.Time state ppb.Change_State
previousState ygot.GoStruct timestamp time.Time
intendedState ygot.GoStruct previousState ygot.GoStruct
callback func(ygot.GoStruct, ygot.GoStruct) error intendedState ygot.GoStruct
errChan <-chan error callback func(ygot.GoStruct, ygot.GoStruct) error
requestState chan<- bool stateMu sync.RWMutex
stateIn chan<- ppb.Change_State errChan <-chan error
stateOut <-chan ppb.Change_State stateIn chan<- ppb.Change_State
stateOut <-chan ppb.Change_State
stateManagerCancel context.CancelFunc
} }
// ID returns the Change's UUID // ID returns the Change's UUID
...@@ -75,9 +81,7 @@ func (c *Change) ID() uuid.UUID { ...@@ -75,9 +81,7 @@ func (c *Change) ID() uuid.UUID {
// and starts the timeout-timer for the Change. If the timer expires // and starts the timeout-timer for the Change. If the timer expires
// the change is rolled back. // the change is rolled back.
func (c *Change) Commit() error { func (c *Change) Commit() error {
if c.State() == ppb.Change_COMMITTED { //TODO: check if already commited
return fmt.Errorf("change %v already committed", c.cuid)
}
c.stateIn <- ppb.Change_COMMITTED c.stateIn <- ppb.Change_COMMITTED
select { select {
case err := <-c.errChan: case err := <-c.errChan:
...@@ -89,9 +93,7 @@ func (c *Change) Commit() error { ...@@ -89,9 +93,7 @@ func (c *Change) Commit() error {
// Confirm confirms a committed Change and stops the rollback timer. // Confirm confirms a committed Change and stops the rollback timer.
func (c *Change) Confirm() error { func (c *Change) Confirm() error {
if c.State() != ppb.Change_COMMITTED { //TODO: check if already confirmed
return fmt.Errorf("cannot confirm uncommitted change %v", c.cuid)
}
c.stateIn <- ppb.Change_CONFIRMED c.stateIn <- ppb.Change_CONFIRMED
select { select {
case err := <-c.errChan: case err := <-c.errChan:
...@@ -108,51 +110,91 @@ func (c *Change) Age() time.Duration { ...@@ -108,51 +110,91 @@ func (c *Change) Age() time.Duration {
// State returns the changes's state. // State returns the changes's state.
func (c *Change) State() ppb.Change_State { func (c *Change) State() ppb.Change_State {
c.requestState <- true c.stateMu.RLock()
return <-c.stateOut state := c.state
c.stateMu.RUnlock()
return state
} }
func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, chan<- bool, <-chan error) { func stateManager(ctx context.Context, ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error) {
stateIn := make(chan ppb.Change_State) stateIn := make(chan ppb.Change_State)
stateOut := make(chan ppb.Change_State) stateOut := make(chan ppb.Change_State)
stateRequest := make(chan bool) // A Goroutine, which is created while a new Change is initialized acts as
// the reciever for errorChan
errChan := make(chan error) errChan := make(chan error)
// create ticker and make it wait for 1 week // create ticker and make it wait for 1 week
// workaround for delayed ticker start and ugly housekeeping // workaround for delayed ticker start and ugly housekeeping
ticker := time.NewTicker(time.Hour * 7 * 24) ticker := time.NewTicker(time.Hour * 7 * 24)
ticker.Stop()
go func() { go func() {
state := ppb.Change_PENDING running:
for { for {
//TODO: priority select? should ticker have priority?
select { select {
case <-ticker.C: case <-ticker.C:
state := ch.State()
if state == ppb.Change_CONFIRMED {
continue
}
err := ch.callback(ch.intendedState, ch.previousState) err := ch.callback(ch.intendedState, ch.previousState)
if err != nil { if err != nil {
state = ppb.Change_INCONSISTENT ch.stateMu.Lock()
errChan <- err ch.state = ppb.Change_INCONSISTENT
ch.stateMu.Unlock()
log.Errorf("change %v timed out", ch.cuid)
log.Error(err)
continue
} }
errChan <- fmt.Errorf("change %v timed out", ch.cuid) // A rollback has been executed and the timer is stopped
ticker.Stop()
// TODO: keep the Change as pending, or remove it?
ch.stateMu.Lock()
ch.state = ppb.Change_PENDING
ch.stateMu.Unlock()
log.Errorf("change %v timed out", ch.cuid)
case s := <-stateIn: case s := <-stateIn:
switch s { switch s {
case ppb.Change_COMMITTED: case ppb.Change_COMMITTED:
state := ch.State()
if state == ppb.Change_COMMITTED || state == ppb.Change_CONFIRMED {
errChan <- fmt.Errorf("change %v already %s", ch.cuid, state.String())
continue
}
// reset ticker to enable activate the change timeout // reset ticker to enable activate the change timeout
ticker.Reset(timeout) ticker.Reset(timeout)
err := ch.callback(ch.previousState, ch.intendedState) err := ch.callback(ch.previousState, ch.intendedState)
if err != nil { if err != nil {
state = ppb.Change_INCONSISTENT ch.stateMu.Lock()
ch.state = ppb.Change_INCONSISTENT
ch.stateMu.Unlock()
errChan <- err errChan <- err
continue continue
} }
state = ppb.Change_COMMITTED ch.stateMu.Lock()
ch.state = ppb.Change_COMMITTED
ch.stateMu.Unlock()
stateOut <- state stateOut <- state
case ppb.Change_CONFIRMED: case ppb.Change_CONFIRMED:
state = ppb.Change_CONFIRMED state := ch.State()
if state != ppb.Change_COMMITTED {
errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
continue
}
// The change has been confirmed and the timer is stopped,
// since a rollback is not necessary anymore.
ch.stateMu.Lock()
ch.state = ppb.Change_CONFIRMED
ch.stateMu.Unlock()
stateOut <- state stateOut <- state
ch.stateManagerCancel()
} }
case <-stateRequest: case <-ctx.Done():
stateOut <- state ticker.Stop()
break running
} }
} }
log.Info("statemanager routine done for: ", ch.cuid)
}() }()
return stateIn, stateOut, stateRequest, errChan return stateIn, stateOut, errChan
} }
package nucleus package nucleus
import ( import (
"context"
"errors" "errors"
"reflect" "reflect"
"sync" "sync"
...@@ -50,11 +51,12 @@ func TestChange_CommitRollback(t *testing.T) { ...@@ -50,11 +51,12 @@ func TestChange_CommitRollback(t *testing.T) {
return nil return nil
}, },
} }
stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100)
c.stateIn = stateIn c.stateIn = stateIn
c.stateOut = stateOut c.stateOut = stateOut
c.requestState = requestState
c.errChan = errChan c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
...@@ -69,6 +71,7 @@ func TestChange_CommitRollback(t *testing.T) { ...@@ -69,6 +71,7 @@ func TestChange_CommitRollback(t *testing.T) {
t.Errorf("Commit() = %v, want %v", got, want) t.Errorf("Commit() = %v, want %v", got, want)
} }
wg.Wait() wg.Wait()
c.stateManagerCancel()
} }
func TestChange_CommitRollbackError(t *testing.T) { func TestChange_CommitRollbackError(t *testing.T) {
...@@ -76,6 +79,7 @@ func TestChange_CommitRollbackError(t *testing.T) { ...@@ -76,6 +79,7 @@ func TestChange_CommitRollbackError(t *testing.T) {
wg.Add(1) wg.Add(1)
wantErr := false wantErr := false
want := errors.New("this is an expected error") want := errors.New("this is an expected error")
rollbackErrChannel := make(chan error)
c := &Change{ c := &Change{
cuid: cuid, cuid: cuid,
duid: did, duid: did,
...@@ -87,16 +91,17 @@ func TestChange_CommitRollbackError(t *testing.T) { ...@@ -87,16 +91,17 @@ func TestChange_CommitRollbackError(t *testing.T) {
t.Logf("callback in test %v", t.Name()) t.Logf("callback in test %v", t.Name())
switch hostname { switch hostname {
case rollbackHostname: case rollbackHostname:
return errors.New("this is an expected error") rollbackErrChannel <- errors.New("this is an expected error")
} }
return nil return nil
}, },
} }
stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100)
c.stateIn = stateIn c.stateIn = stateIn
c.stateOut = stateOut c.stateOut = stateOut
c.requestState = requestState
c.errChan = errChan c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
go func() { go func() {
defer wg.Done() defer wg.Done()
...@@ -106,11 +111,12 @@ func TestChange_CommitRollbackError(t *testing.T) { ...@@ -106,11 +111,12 @@ func TestChange_CommitRollbackError(t *testing.T) {
} }
time.Sleep(config.ChangeTimeout) time.Sleep(config.ChangeTimeout)
}() }()
got := <-c.errChan got := <-rollbackErrChannel
if !reflect.DeepEqual(got, want) { if !reflect.DeepEqual(got, want) {
t.Errorf("Commit() = %v, want %v", got, want) t.Errorf("Commit() = %v, want %v", got, want)
} }
wg.Wait() wg.Wait()
c.stateManagerCancel()
} }
func TestChange_CommitError(t *testing.T) { func TestChange_CommitError(t *testing.T) {
...@@ -125,11 +131,12 @@ func TestChange_CommitError(t *testing.T) { ...@@ -125,11 +131,12 @@ func TestChange_CommitError(t *testing.T) {
return errors.New("this is an expected error") return errors.New("this is an expected error")
}, },
} }
stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100)
c.stateIn = stateIn c.stateIn = stateIn
c.stateOut = stateOut c.stateOut = stateOut
c.requestState = requestState
c.errChan = errChan c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
time.Sleep(time.Millisecond * 10) time.Sleep(time.Millisecond * 10)
if err := c.Commit(); err == nil { if err := c.Commit(); err == nil {
...@@ -139,6 +146,7 @@ func TestChange_CommitError(t *testing.T) { ...@@ -139,6 +146,7 @@ func TestChange_CommitError(t *testing.T) {
if !reflect.DeepEqual(got, want) { if !reflect.DeepEqual(got, want) {
t.Errorf("Commit() = %v, want %v", got, want) t.Errorf("Commit() = %v, want %v", got, want)
} }
c.stateManagerCancel()
} }
func TestChange_Commit(t *testing.T) { func TestChange_Commit(t *testing.T) {
...@@ -154,11 +162,12 @@ func TestChange_Commit(t *testing.T) { ...@@ -154,11 +162,12 @@ func TestChange_Commit(t *testing.T) {
return nil return nil
}, },
} }
stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100)
c.stateIn = stateIn c.stateIn = stateIn
c.stateOut = stateOut c.stateOut = stateOut
c.requestState = requestState
c.errChan = errChan c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
if err := c.Commit(); err != nil { if err := c.Commit(); err != nil {
t.Errorf("Commit() error = %v", err) t.Errorf("Commit() error = %v", err)
...@@ -204,11 +213,12 @@ func TestChange_Confirm(t *testing.T) { ...@@ -204,11 +213,12 @@ func TestChange_Confirm(t *testing.T) {
return nil return nil
}, },
} }
stateIn, stateOut, requestState, errChan := stateManager(c, time.Millisecond*100) stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, time.Millisecond*100)
c.stateIn = stateIn c.stateIn = stateIn
c.stateOut = stateOut c.stateOut = stateOut
c.requestState = requestState
c.errChan = errChan c.errChan = errChan
c.stateManagerCancel = stateManagerCancel
if tt.name == "committed" { if tt.name == "committed" {
if err := c.Commit(); err != nil { if err := c.Commit(); err != nil {
...@@ -218,6 +228,7 @@ func TestChange_Confirm(t *testing.T) { ...@@ -218,6 +228,7 @@ func TestChange_Confirm(t *testing.T) {
if err := c.Confirm(); (err != nil) != tt.wantErr { if err := c.Confirm(); (err != nil) != tt.wantErr {
t.Errorf("Confirm() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("Confirm() error = %v, wantErr %v", err, tt.wantErr)
} }
c.stateManagerCancel()
}) })
} }
} }
...@@ -269,8 +280,9 @@ func TestChange_State(t *testing.T) { ...@@ -269,8 +280,9 @@ func TestChange_State(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
testName := t.Name()
callback := func(first ygot.GoStruct, second ygot.GoStruct) error { callback := func(first ygot.GoStruct, second ygot.GoStruct) error {
t.Logf("callback in test %v", t.Name()) t.Logf("callback in test %v", testName)
return nil return nil
} }
c := NewChange(did, rollbackDevice, commitDevice, callback) c := NewChange(did, rollbackDevice, commitDevice, callback)
...@@ -287,6 +299,7 @@ func TestChange_State(t *testing.T) { ...@@ -287,6 +299,7 @@ func TestChange_State(t *testing.T) {
if got := c.State(); !reflect.DeepEqual(got, tt.want) { if got := c.State(); !reflect.DeepEqual(got, tt.want) {
t.Errorf("Change.State() = %v, want %v", got, tt.want) t.Errorf("Change.State() = %v, want %v", got, tt.want)
} }
c.stateManagerCancel()
}) })
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment