Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
goSDN
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Terraform modules
Analyze
Contributor analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
danet
goSDN
Commits
95843561
Commit
95843561
authored
3 years ago
by
Malte Bauch
Browse files
Options
Downloads
Patches
Plain Diff
Resolve "Internal Changes representation is not working as expected"
parent
9b843f04
Branches
Branches containing commit
Tags
Tags containing commit
6 merge requests
!246
Develop
,
!245
Develop into Master
,
!244
Master into develop2 into master
,
!228
Merge develop into stmakurz_http_server
,
!218
Resolve "Internal Changes representation is not working as expected"
,
!138
Develop
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
nucleus/change.go
+74
-32
74 additions, 32 deletions
nucleus/change.go
nucleus/change_test.go
+26
-13
26 additions, 13 deletions
nucleus/change_test.go
with
100 additions
and
45 deletions
nucleus/change.go
+
74
−
32
View file @
95843561
package
nucleus
package
nucleus
import
(
import
(
"context"
"fmt"
"fmt"
"os"
"os"
"sync"
"time"
"time"
ppb
"code.fbi.h-da.de/danet/api/go/gosdn/pnd"
ppb
"code.fbi.h-da.de/danet/api/go/gosdn/pnd"
...
@@ -36,16 +38,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
...
@@ -36,16 +38,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
c
:=
&
Change
{
c
:=
&
Change
{
cuid
:
uuid
.
New
(),
cuid
:
uuid
.
New
(),
duid
:
device
,
duid
:
device
,
state
:
ppb
.
Change_PENDING
,
timestamp
:
time
.
Now
(),
timestamp
:
time
.
Now
(),
previousState
:
currentState
,
previousState
:
currentState
,
intendedState
:
change
,
intendedState
:
change
,
callback
:
callback
,
callback
:
callback
,
}
}
stateIn
,
stateOut
,
requestState
,
errChan
:=
stateManager
(
c
,
changeTimeout
)
stateManagerCtx
,
stateManagerCancel
:=
context
.
WithCancel
(
context
.
Background
())
stateIn
,
stateOut
,
errChan
:=
stateManager
(
stateManagerCtx
,
c
,
changeTimeout
)
c
.
stateIn
=
stateIn
c
.
stateIn
=
stateIn
c
.
stateOut
=
stateOut
c
.
stateOut
=
stateOut
c
.
requestState
=
requestState
c
.
errChan
=
errChan
c
.
errChan
=
errChan
c
.
stateManagerCancel
=
stateManagerCancel
return
c
return
c
}
}
...
@@ -54,16 +58,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
...
@@ -54,16 +58,18 @@ func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruc
// state. It keeps track if the state is committed and confirmed. A callback
// state. It keeps track if the state is committed and confirmed. A callback
// exists to acess the proper transport for the changed OND
// exists to acess the proper transport for the changed OND
type
Change
struct
{
type
Change
struct
{
cuid
uuid
.
UUID
cuid
uuid
.
UUID
duid
uuid
.
UUID
duid
uuid
.
UUID
timestamp
time
.
Time
state
ppb
.
Change_State
previousState
ygot
.
GoStruct
timestamp
time
.
Time
intendedState
ygot
.
GoStruct
previousState
ygot
.
GoStruct
callback
func
(
ygot
.
GoStruct
,
ygot
.
GoStruct
)
error
intendedState
ygot
.
GoStruct
errChan
<-
chan
error
callback
func
(
ygot
.
GoStruct
,
ygot
.
GoStruct
)
error
requestState
chan
<-
bool
stateMu
sync
.
RWMutex
stateIn
chan
<-
ppb
.
Change_State
errChan
<-
chan
error
stateOut
<-
chan
ppb
.
Change_State
stateIn
chan
<-
ppb
.
Change_State
stateOut
<-
chan
ppb
.
Change_State
stateManagerCancel
context
.
CancelFunc
}
}
// ID returns the Change's UUID
// ID returns the Change's UUID
...
@@ -75,9 +81,7 @@ func (c *Change) ID() uuid.UUID {
...
@@ -75,9 +81,7 @@ func (c *Change) ID() uuid.UUID {
// and starts the timeout-timer for the Change. If the timer expires
// and starts the timeout-timer for the Change. If the timer expires
// the change is rolled back.
// the change is rolled back.
func
(
c
*
Change
)
Commit
()
error
{
func
(
c
*
Change
)
Commit
()
error
{
if
c
.
State
()
==
ppb
.
Change_COMMITTED
{
//TODO: check if already commited
return
fmt
.
Errorf
(
"change %v already committed"
,
c
.
cuid
)
}
c
.
stateIn
<-
ppb
.
Change_COMMITTED
c
.
stateIn
<-
ppb
.
Change_COMMITTED
select
{
select
{
case
err
:=
<-
c
.
errChan
:
case
err
:=
<-
c
.
errChan
:
...
@@ -89,9 +93,7 @@ func (c *Change) Commit() error {
...
@@ -89,9 +93,7 @@ func (c *Change) Commit() error {
// Confirm confirms a committed Change and stops the rollback timer.
// Confirm confirms a committed Change and stops the rollback timer.
func
(
c
*
Change
)
Confirm
()
error
{
func
(
c
*
Change
)
Confirm
()
error
{
if
c
.
State
()
!=
ppb
.
Change_COMMITTED
{
//TODO: check if already confirmed
return
fmt
.
Errorf
(
"cannot confirm uncommitted change %v"
,
c
.
cuid
)
}
c
.
stateIn
<-
ppb
.
Change_CONFIRMED
c
.
stateIn
<-
ppb
.
Change_CONFIRMED
select
{
select
{
case
err
:=
<-
c
.
errChan
:
case
err
:=
<-
c
.
errChan
:
...
@@ -108,51 +110,91 @@ func (c *Change) Age() time.Duration {
...
@@ -108,51 +110,91 @@ func (c *Change) Age() time.Duration {
// State returns the changes's state.
// State returns the changes's state.
func
(
c
*
Change
)
State
()
ppb
.
Change_State
{
func
(
c
*
Change
)
State
()
ppb
.
Change_State
{
c
.
requestState
<-
true
c
.
stateMu
.
RLock
()
return
<-
c
.
stateOut
state
:=
c
.
state
c
.
stateMu
.
RUnlock
()
return
state
}
}
func
stateManager
(
ch
*
Change
,
timeout
time
.
Duration
)
(
chan
<-
ppb
.
Change_State
,
<-
chan
ppb
.
Change_State
,
chan
<-
bool
,
<-
chan
error
)
{
func
stateManager
(
ctx
context
.
Context
,
ch
*
Change
,
timeout
time
.
Duration
)
(
chan
<-
ppb
.
Change_State
,
<-
chan
ppb
.
Change_State
,
<-
chan
error
)
{
stateIn
:=
make
(
chan
ppb
.
Change_State
)
stateIn
:=
make
(
chan
ppb
.
Change_State
)
stateOut
:=
make
(
chan
ppb
.
Change_State
)
stateOut
:=
make
(
chan
ppb
.
Change_State
)
stateRequest
:=
make
(
chan
bool
)
// A Goroutine, which is created while a new Change is initialized acts as
// the reciever for errorChan
errChan
:=
make
(
chan
error
)
errChan
:=
make
(
chan
error
)
// create ticker and make it wait for 1 week
// create ticker and make it wait for 1 week
// workaround for delayed ticker start and ugly housekeeping
// workaround for delayed ticker start and ugly housekeeping
ticker
:=
time
.
NewTicker
(
time
.
Hour
*
7
*
24
)
ticker
:=
time
.
NewTicker
(
time
.
Hour
*
7
*
24
)
ticker
.
Stop
()
go
func
()
{
go
func
()
{
state
:=
ppb
.
Change_PENDING
running
:
for
{
for
{
//TODO: priority select? should ticker have priority?
select
{
select
{
case
<-
ticker
.
C
:
case
<-
ticker
.
C
:
state
:=
ch
.
State
()
if
state
==
ppb
.
Change_CONFIRMED
{
continue
}
err
:=
ch
.
callback
(
ch
.
intendedState
,
ch
.
previousState
)
err
:=
ch
.
callback
(
ch
.
intendedState
,
ch
.
previousState
)
if
err
!=
nil
{
if
err
!=
nil
{
state
=
ppb
.
Change_INCONSISTENT
ch
.
stateMu
.
Lock
()
errChan
<-
err
ch
.
state
=
ppb
.
Change_INCONSISTENT
ch
.
stateMu
.
Unlock
()
log
.
Errorf
(
"change %v timed out"
,
ch
.
cuid
)
log
.
Error
(
err
)
continue
}
}
errChan
<-
fmt
.
Errorf
(
"change %v timed out"
,
ch
.
cuid
)
// A rollback has been executed and the timer is stopped
ticker
.
Stop
()
// TODO: keep the Change as pending, or remove it?
ch
.
stateMu
.
Lock
()
ch
.
state
=
ppb
.
Change_PENDING
ch
.
stateMu
.
Unlock
()
log
.
Errorf
(
"change %v timed out"
,
ch
.
cuid
)
case
s
:=
<-
stateIn
:
case
s
:=
<-
stateIn
:
switch
s
{
switch
s
{
case
ppb
.
Change_COMMITTED
:
case
ppb
.
Change_COMMITTED
:
state
:=
ch
.
State
()
if
state
==
ppb
.
Change_COMMITTED
||
state
==
ppb
.
Change_CONFIRMED
{
errChan
<-
fmt
.
Errorf
(
"change %v already %s"
,
ch
.
cuid
,
state
.
String
())
continue
}
// reset ticker to enable activate the change timeout
// reset ticker to enable activate the change timeout
ticker
.
Reset
(
timeout
)
ticker
.
Reset
(
timeout
)
err
:=
ch
.
callback
(
ch
.
previousState
,
ch
.
intendedState
)
err
:=
ch
.
callback
(
ch
.
previousState
,
ch
.
intendedState
)
if
err
!=
nil
{
if
err
!=
nil
{
state
=
ppb
.
Change_INCONSISTENT
ch
.
stateMu
.
Lock
()
ch
.
state
=
ppb
.
Change_INCONSISTENT
ch
.
stateMu
.
Unlock
()
errChan
<-
err
errChan
<-
err
continue
continue
}
}
state
=
ppb
.
Change_COMMITTED
ch
.
stateMu
.
Lock
()
ch
.
state
=
ppb
.
Change_COMMITTED
ch
.
stateMu
.
Unlock
()
stateOut
<-
state
stateOut
<-
state
case
ppb
.
Change_CONFIRMED
:
case
ppb
.
Change_CONFIRMED
:
state
=
ppb
.
Change_CONFIRMED
state
:=
ch
.
State
()
if
state
!=
ppb
.
Change_COMMITTED
{
errChan
<-
fmt
.
Errorf
(
"cannot confirm uncommitted change %v"
,
ch
.
cuid
)
continue
}
// The change has been confirmed and the timer is stopped,
// since a rollback is not necessary anymore.
ch
.
stateMu
.
Lock
()
ch
.
state
=
ppb
.
Change_CONFIRMED
ch
.
stateMu
.
Unlock
()
stateOut
<-
state
stateOut
<-
state
ch
.
stateManagerCancel
()
}
}
case
<-
stateRequest
:
case
<-
ctx
.
Done
()
:
stateOut
<-
state
ticker
.
Stop
()
break
running
}
}
}
}
log
.
Info
(
"statemanager routine done for: "
,
ch
.
cuid
)
}()
}()
return
stateIn
,
stateOut
,
stateRequest
,
errChan
return
stateIn
,
stateOut
,
errChan
}
}
This diff is collapsed.
Click to expand it.
nucleus/change_test.go
+
26
−
13
View file @
95843561
package
nucleus
package
nucleus
import
(
import
(
"context"
"errors"
"errors"
"reflect"
"reflect"
"sync"
"sync"
...
@@ -50,11 +51,12 @@ func TestChange_CommitRollback(t *testing.T) {
...
@@ -50,11 +51,12 @@ func TestChange_CommitRollback(t *testing.T) {
return
nil
return
nil
},
},
}
}
stateIn
,
stateOut
,
requestState
,
errChan
:=
stateManager
(
c
,
time
.
Millisecond
*
100
)
stateManagerCtx
,
stateManagerCancel
:=
context
.
WithCancel
(
context
.
Background
())
stateIn
,
stateOut
,
errChan
:=
stateManager
(
stateManagerCtx
,
c
,
time
.
Millisecond
*
100
)
c
.
stateIn
=
stateIn
c
.
stateIn
=
stateIn
c
.
stateOut
=
stateOut
c
.
stateOut
=
stateOut
c
.
requestState
=
requestState
c
.
errChan
=
errChan
c
.
errChan
=
errChan
c
.
stateManagerCancel
=
stateManagerCancel
wg
.
Add
(
1
)
wg
.
Add
(
1
)
go
func
()
{
go
func
()
{
defer
wg
.
Done
()
defer
wg
.
Done
()
...
@@ -69,6 +71,7 @@ func TestChange_CommitRollback(t *testing.T) {
...
@@ -69,6 +71,7 @@ func TestChange_CommitRollback(t *testing.T) {
t
.
Errorf
(
"Commit() = %v, want %v"
,
got
,
want
)
t
.
Errorf
(
"Commit() = %v, want %v"
,
got
,
want
)
}
}
wg
.
Wait
()
wg
.
Wait
()
c
.
stateManagerCancel
()
}
}
func
TestChange_CommitRollbackError
(
t
*
testing
.
T
)
{
func
TestChange_CommitRollbackError
(
t
*
testing
.
T
)
{
...
@@ -76,6 +79,7 @@ func TestChange_CommitRollbackError(t *testing.T) {
...
@@ -76,6 +79,7 @@ func TestChange_CommitRollbackError(t *testing.T) {
wg
.
Add
(
1
)
wg
.
Add
(
1
)
wantErr
:=
false
wantErr
:=
false
want
:=
errors
.
New
(
"this is an expected error"
)
want
:=
errors
.
New
(
"this is an expected error"
)
rollbackErrChannel
:=
make
(
chan
error
)
c
:=
&
Change
{
c
:=
&
Change
{
cuid
:
cuid
,
cuid
:
cuid
,
duid
:
did
,
duid
:
did
,
...
@@ -87,16 +91,17 @@ func TestChange_CommitRollbackError(t *testing.T) {
...
@@ -87,16 +91,17 @@ func TestChange_CommitRollbackError(t *testing.T) {
t
.
Logf
(
"callback in test %v"
,
t
.
Name
())
t
.
Logf
(
"callback in test %v"
,
t
.
Name
())
switch
hostname
{
switch
hostname
{
case
rollbackHostname
:
case
rollbackHostname
:
r
eturn
errors
.
New
(
"this is an expected error"
)
r
ollbackErrChannel
<-
errors
.
New
(
"this is an expected error"
)
}
}
return
nil
return
nil
},
},
}
}
stateIn
,
stateOut
,
requestState
,
errChan
:=
stateManager
(
c
,
time
.
Millisecond
*
100
)
stateManagerCtx
,
stateManagerCancel
:=
context
.
WithCancel
(
context
.
Background
())
stateIn
,
stateOut
,
errChan
:=
stateManager
(
stateManagerCtx
,
c
,
time
.
Millisecond
*
100
)
c
.
stateIn
=
stateIn
c
.
stateIn
=
stateIn
c
.
stateOut
=
stateOut
c
.
stateOut
=
stateOut
c
.
requestState
=
requestState
c
.
errChan
=
errChan
c
.
errChan
=
errChan
c
.
stateManagerCancel
=
stateManagerCancel
go
func
()
{
go
func
()
{
defer
wg
.
Done
()
defer
wg
.
Done
()
...
@@ -106,11 +111,12 @@ func TestChange_CommitRollbackError(t *testing.T) {
...
@@ -106,11 +111,12 @@ func TestChange_CommitRollbackError(t *testing.T) {
}
}
time
.
Sleep
(
config
.
ChangeTimeout
)
time
.
Sleep
(
config
.
ChangeTimeout
)
}()
}()
got
:=
<-
c
.
e
rrChan
got
:=
<-
rollbackE
rrChan
nel
if
!
reflect
.
DeepEqual
(
got
,
want
)
{
if
!
reflect
.
DeepEqual
(
got
,
want
)
{
t
.
Errorf
(
"Commit() = %v, want %v"
,
got
,
want
)
t
.
Errorf
(
"Commit() = %v, want %v"
,
got
,
want
)
}
}
wg
.
Wait
()
wg
.
Wait
()
c
.
stateManagerCancel
()
}
}
func
TestChange_CommitError
(
t
*
testing
.
T
)
{
func
TestChange_CommitError
(
t
*
testing
.
T
)
{
...
@@ -125,11 +131,12 @@ func TestChange_CommitError(t *testing.T) {
...
@@ -125,11 +131,12 @@ func TestChange_CommitError(t *testing.T) {
return
errors
.
New
(
"this is an expected error"
)
return
errors
.
New
(
"this is an expected error"
)
},
},
}
}
stateIn
,
stateOut
,
requestState
,
errChan
:=
stateManager
(
c
,
time
.
Millisecond
*
100
)
stateManagerCtx
,
stateManagerCancel
:=
context
.
WithCancel
(
context
.
Background
())
stateIn
,
stateOut
,
errChan
:=
stateManager
(
stateManagerCtx
,
c
,
time
.
Millisecond
*
100
)
c
.
stateIn
=
stateIn
c
.
stateIn
=
stateIn
c
.
stateOut
=
stateOut
c
.
stateOut
=
stateOut
c
.
requestState
=
requestState
c
.
errChan
=
errChan
c
.
errChan
=
errChan
c
.
stateManagerCancel
=
stateManagerCancel
time
.
Sleep
(
time
.
Millisecond
*
10
)
time
.
Sleep
(
time
.
Millisecond
*
10
)
if
err
:=
c
.
Commit
();
err
==
nil
{
if
err
:=
c
.
Commit
();
err
==
nil
{
...
@@ -139,6 +146,7 @@ func TestChange_CommitError(t *testing.T) {
...
@@ -139,6 +146,7 @@ func TestChange_CommitError(t *testing.T) {
if
!
reflect
.
DeepEqual
(
got
,
want
)
{
if
!
reflect
.
DeepEqual
(
got
,
want
)
{
t
.
Errorf
(
"Commit() = %v, want %v"
,
got
,
want
)
t
.
Errorf
(
"Commit() = %v, want %v"
,
got
,
want
)
}
}
c
.
stateManagerCancel
()
}
}
func
TestChange_Commit
(
t
*
testing
.
T
)
{
func
TestChange_Commit
(
t
*
testing
.
T
)
{
...
@@ -154,11 +162,12 @@ func TestChange_Commit(t *testing.T) {
...
@@ -154,11 +162,12 @@ func TestChange_Commit(t *testing.T) {
return
nil
return
nil
},
},
}
}
stateIn
,
stateOut
,
requestState
,
errChan
:=
stateManager
(
c
,
time
.
Millisecond
*
100
)
stateManagerCtx
,
stateManagerCancel
:=
context
.
WithCancel
(
context
.
Background
())
stateIn
,
stateOut
,
errChan
:=
stateManager
(
stateManagerCtx
,
c
,
time
.
Millisecond
*
100
)
c
.
stateIn
=
stateIn
c
.
stateIn
=
stateIn
c
.
stateOut
=
stateOut
c
.
stateOut
=
stateOut
c
.
requestState
=
requestState
c
.
errChan
=
errChan
c
.
errChan
=
errChan
c
.
stateManagerCancel
=
stateManagerCancel
if
err
:=
c
.
Commit
();
err
!=
nil
{
if
err
:=
c
.
Commit
();
err
!=
nil
{
t
.
Errorf
(
"Commit() error = %v"
,
err
)
t
.
Errorf
(
"Commit() error = %v"
,
err
)
...
@@ -204,11 +213,12 @@ func TestChange_Confirm(t *testing.T) {
...
@@ -204,11 +213,12 @@ func TestChange_Confirm(t *testing.T) {
return
nil
return
nil
},
},
}
}
stateIn
,
stateOut
,
requestState
,
errChan
:=
stateManager
(
c
,
time
.
Millisecond
*
100
)
stateManagerCtx
,
stateManagerCancel
:=
context
.
WithCancel
(
context
.
Background
())
stateIn
,
stateOut
,
errChan
:=
stateManager
(
stateManagerCtx
,
c
,
time
.
Millisecond
*
100
)
c
.
stateIn
=
stateIn
c
.
stateIn
=
stateIn
c
.
stateOut
=
stateOut
c
.
stateOut
=
stateOut
c
.
requestState
=
requestState
c
.
errChan
=
errChan
c
.
errChan
=
errChan
c
.
stateManagerCancel
=
stateManagerCancel
if
tt
.
name
==
"committed"
{
if
tt
.
name
==
"committed"
{
if
err
:=
c
.
Commit
();
err
!=
nil
{
if
err
:=
c
.
Commit
();
err
!=
nil
{
...
@@ -218,6 +228,7 @@ func TestChange_Confirm(t *testing.T) {
...
@@ -218,6 +228,7 @@ func TestChange_Confirm(t *testing.T) {
if
err
:=
c
.
Confirm
();
(
err
!=
nil
)
!=
tt
.
wantErr
{
if
err
:=
c
.
Confirm
();
(
err
!=
nil
)
!=
tt
.
wantErr
{
t
.
Errorf
(
"Confirm() error = %v, wantErr %v"
,
err
,
tt
.
wantErr
)
t
.
Errorf
(
"Confirm() error = %v, wantErr %v"
,
err
,
tt
.
wantErr
)
}
}
c
.
stateManagerCancel
()
})
})
}
}
}
}
...
@@ -269,8 +280,9 @@ func TestChange_State(t *testing.T) {
...
@@ -269,8 +280,9 @@ func TestChange_State(t *testing.T) {
}
}
for
_
,
tt
:=
range
tests
{
for
_
,
tt
:=
range
tests
{
t
.
Run
(
tt
.
name
,
func
(
t
*
testing
.
T
)
{
t
.
Run
(
tt
.
name
,
func
(
t
*
testing
.
T
)
{
testName
:=
t
.
Name
()
callback
:=
func
(
first
ygot
.
GoStruct
,
second
ygot
.
GoStruct
)
error
{
callback
:=
func
(
first
ygot
.
GoStruct
,
second
ygot
.
GoStruct
)
error
{
t
.
Logf
(
"callback in test %v"
,
t
.
Name
()
)
t
.
Logf
(
"callback in test %v"
,
t
est
Name
)
return
nil
return
nil
}
}
c
:=
NewChange
(
did
,
rollbackDevice
,
commitDevice
,
callback
)
c
:=
NewChange
(
did
,
rollbackDevice
,
commitDevice
,
callback
)
...
@@ -287,6 +299,7 @@ func TestChange_State(t *testing.T) {
...
@@ -287,6 +299,7 @@ func TestChange_State(t *testing.T) {
if
got
:=
c
.
State
();
!
reflect
.
DeepEqual
(
got
,
tt
.
want
)
{
if
got
:=
c
.
State
();
!
reflect
.
DeepEqual
(
got
,
tt
.
want
)
{
t
.
Errorf
(
"Change.State() = %v, want %v"
,
got
,
tt
.
want
)
t
.
Errorf
(
"Change.State() = %v, want %v"
,
got
,
tt
.
want
)
}
}
c
.
stateManagerCancel
()
})
})
}
}
}
}
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment