Skip to content
Snippets Groups Projects
Commit bdfd8419 authored by Malte Bauch's avatar Malte Bauch
Browse files

reverted some changes, removed testcase and added mutex

Some of the previous edits have been undone, since no individual
channels should be needed.
Since the state is now returned directly, it is protected by a mutex.
Testcase was not working and has been removed. Multiple simultaneous
requests are handled by channels therefore synchronized communication
should be ensured.

Addresses: #152
parent b8cc1c9f
No related branches found
No related tags found
1 merge request!218Resolve "Internal Changes representation is not working as expected"
Pipeline #87852 failed
......@@ -3,6 +3,7 @@ package nucleus
import (
"fmt"
"os"
"sync"
"time"
ppb "code.fbi.h-da.de/danet/api/go/gosdn/pnd"
......@@ -14,12 +15,6 @@ import (
var changeTimeout time.Duration
type stateInput struct {
state ppb.Change_State
stateChan chan ppb.Change_State
errChan chan error
}
func init() {
var err error
e := os.Getenv("GOSDN_CHANGE_TIMEOUT")
......@@ -48,15 +43,17 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
intendedState: change,
callback: callback,
}
stateIn, errChan := stateManager(c, changeTimeout)
stateIn, stateOut, internErrChan, errChan := stateManager(c, changeTimeout)
c.stateIn = stateIn
c.stateOut = stateOut
c.errChan = errChan
c.internErrChan = internErrChan
// recieve the errors from a Change and log them within the controller
go func() {
for {
errChan := <-c.errChan
log.Error(errChan)
err := <-c.internErrChan
log.Error(err)
}
}()
......@@ -75,9 +72,11 @@ type Change struct {
previousState ygot.GoStruct
intendedState ygot.GoStruct
callback func(ygot.GoStruct, ygot.GoStruct) error
lock sync.Mutex
errChan <-chan error
requestState chan<- chan ppb.Change_State
stateIn chan<- *stateInput
internErrChan <-chan error
stateIn chan<- ppb.Change_State
stateOut <-chan ppb.Change_State
}
// ID returns the Change's UUID
......@@ -89,33 +88,22 @@ func (c *Change) ID() uuid.UUID {
// and starts the timeout-timer for the Change. If the timer expires
// the change is rolled back.
func (c *Change) Commit() error {
si := &stateInput{
ppb.Change_COMMITTED,
make(chan ppb.Change_State),
make(chan error),
}
c.stateIn <- si
c.stateIn <- ppb.Change_COMMITTED
select {
case err := <-si.errChan:
case err := <-c.errChan:
return err
case <-si.stateChan:
case <-c.stateOut:
return nil
}
}
// Confirm confirms a committed Change and stops the rollback timer.
func (c *Change) Confirm() error {
si := &stateInput{
ppb.Change_CONFIRMED,
make(chan ppb.Change_State),
make(chan error),
}
c.stateIn <- si
c.stateIn <- ppb.Change_CONFIRMED
select {
case err := <-si.errChan:
case err := <-c.errChan:
return err
case <-si.stateChan:
case <-c.stateOut:
return nil
}
}
......@@ -127,14 +115,19 @@ func (c *Change) Age() time.Duration {
// State returns the changes's state.
func (c *Change) State() ppb.Change_State {
return c.state
c.lock.Lock()
state := c.state
c.lock.Unlock()
return state
}
func stateManager(ch *Change, timeout time.Duration) (chan<- *stateInput, <-chan error) {
stateIn := make(chan *stateInput)
func stateManager(ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error, <-chan error) {
stateIn := make(chan ppb.Change_State)
stateOut := make(chan ppb.Change_State)
// A Goroutine, which is created while a new Change is initialized acts as
// the reciever for errorChan
errChan := make(chan error)
internErrChan := make(chan error)
// create ticker and make it wait for 1 week
// workaround for delayed ticker start and ugly housekeeping
ticker := time.NewTicker(time.Hour * 7 * 24)
......@@ -150,46 +143,56 @@ func stateManager(ch *Change, timeout time.Duration) (chan<- *stateInput, <-chan
}
err := ch.callback(ch.intendedState, ch.previousState)
if err != nil {
ch.lock.Lock()
ch.state = ppb.Change_INCONSISTENT
ch.lock.Unlock()
log.Errorf("change %v timed out", ch.cuid)
errChan <- err
internErrChan <- err
continue
}
// A rollback has been executed and the timer is stopped
ticker.Stop()
// TODO: keep the Change as pending, or remove it?
ch.lock.Lock()
ch.state = ppb.Change_PENDING
errChan <- fmt.Errorf("change %v timed out", ch.cuid)
ch.lock.Unlock()
internErrChan <- fmt.Errorf("change %v timed out", ch.cuid)
case s := <-stateIn:
switch s.state {
switch s {
case ppb.Change_COMMITTED:
if ch.state == ppb.Change_COMMITTED {
s.errChan <- fmt.Errorf("change %v already committed", ch.cuid)
errChan <- fmt.Errorf("change %v already committed", ch.cuid)
continue
}
// reset ticker to enable activate the change timeout
ticker.Reset(timeout)
err := ch.callback(ch.previousState, ch.intendedState)
if err != nil {
ch.lock.Lock()
ch.state = ppb.Change_INCONSISTENT
s.errChan <- err
ch.lock.Unlock()
errChan <- err
continue
}
ch.lock.Lock()
ch.state = ppb.Change_COMMITTED
s.stateChan <- ch.state
ch.lock.Unlock()
stateOut <- ch.state
case ppb.Change_CONFIRMED:
if ch.state != ppb.Change_COMMITTED {
s.errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
continue
}
// The change has been confirmed and the timer is stopped,
// since a rollback is not necessary anymore.
ticker.Stop()
ch.lock.Lock()
ch.state = ppb.Change_CONFIRMED
s.stateChan <- ch.state
ch.lock.Unlock()
stateOut <- ch.state
}
}
}
}()
return stateIn, errChan
return stateIn, stateOut, internErrChan, errChan
}
......@@ -2,7 +2,6 @@ package nucleus
import (
"errors"
"fmt"
"reflect"
"sync"
"testing"
......@@ -51,10 +50,11 @@ func TestChange_CommitRollback(t *testing.T) {
return nil
},
}
stateIn, requestState, errChan := stateManager(c, time.Millisecond*100)
stateIn, stateOut, internErrChan, errChan := stateManager(c, time.Millisecond*100)
c.stateIn = stateIn
c.requestState = requestState
c.stateOut = stateOut
c.errChan = errChan
c.internErrChan = internErrChan
wg.Add(1)
go func() {
defer wg.Done()
......@@ -92,10 +92,11 @@ func TestChange_CommitRollbackError(t *testing.T) {
return nil
},
}
stateIn, requestState, errChan := stateManager(c, time.Millisecond*100)
stateIn, stateOut, internErrChan, errChan := stateManager(c, time.Millisecond*100)
c.stateIn = stateIn
c.requestState = requestState
c.stateOut = stateOut
c.errChan = errChan
c.internErrChan = internErrChan
go func() {
defer wg.Done()
......@@ -105,7 +106,7 @@ func TestChange_CommitRollbackError(t *testing.T) {
}
time.Sleep(config.ChangeTimeout)
}()
got := <-c.errChan
got := <-c.internErrChan
if !reflect.DeepEqual(got, want) {
t.Errorf("Commit() = %v, want %v", got, want)
}
......@@ -124,10 +125,11 @@ func TestChange_CommitError(t *testing.T) {
return errors.New("this is an expected error")
},
}
stateIn, requestState, errChan := stateManager(c, time.Millisecond*100)
stateIn, stateOut, internErrChan, errChan := stateManager(c, time.Millisecond*100)
c.stateIn = stateIn
c.requestState = requestState
c.stateOut = stateOut
c.errChan = errChan
c.internErrChan = internErrChan
time.Sleep(time.Millisecond * 10)
if err := c.Commit(); err == nil {
......@@ -152,10 +154,11 @@ func TestChange_Commit(t *testing.T) {
return nil
},
}
stateIn, requestState, errChan := stateManager(c, time.Millisecond*100)
stateIn, stateOut, internErrChan, errChan := stateManager(c, time.Millisecond*100)
c.stateIn = stateIn
c.requestState = requestState
c.stateOut = stateOut
c.errChan = errChan
c.internErrChan = internErrChan
if err := c.Commit(); err != nil {
t.Errorf("Commit() error = %v", err)
......@@ -201,10 +204,11 @@ func TestChange_Confirm(t *testing.T) {
return nil
},
}
stateIn, requestState, errChan := stateManager(c, time.Millisecond*100)
stateIn, stateOut, internErrChan, errChan := stateManager(c, time.Millisecond*100)
c.stateIn = stateIn
c.requestState = requestState
c.stateOut = stateOut
c.errChan = errChan
c.internErrChan = internErrChan
if tt.name == "committed" {
if err := c.Commit(); err != nil {
......@@ -286,51 +290,3 @@ func TestChange_State(t *testing.T) {
})
}
}
//TODO: cleanup
func TestChange_MultipleCommitsAtTheSameTime(t *testing.T) {
wg := sync.WaitGroup{}
testErrors := make(chan error)
wantErr := true
c := &Change{
cuid: cuid,
duid: did,
timestamp: time.Now(),
previousState: rollbackDevice,
intendedState: commitDevice,
callback: func(first ygot.GoStruct, second ygot.GoStruct) error {
wg.Done()
time.Sleep(time.Millisecond * 10)
t.Logf("callback in test %v", t.Name())
return nil
},
}
stateIn, requestState, errChan := stateManager(c, time.Millisecond*100)
c.stateIn = stateIn
c.requestState = requestState
c.errChan = errChan
want := fmt.Errorf("change %v already committed", c.cuid)
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
testErrors <- c.Commit()
}()
wg.Wait()
}
errFirstCommit := <-testErrors
errSecondCommit := <-testErrors
close(testErrors)
if errFirstCommit != nil {
t.Errorf("First Commit() error = %v", errFirstCommit)
}
if (errSecondCommit != nil) != wantErr {
t.Errorf("Second Commit() error = %v, wantErr %v", errSecondCommit, wantErr)
}
if errSecondCommit.Error() != want.Error() {
t.Errorf(" = %v, want %v", errSecondCommit, want)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment