Skip to content
Snippets Groups Projects
change.go 5.57 KiB
Newer Older
  • Learn to ignore specific revisions
  • package nucleus
    
    
    import (
    	"context"
    	"fmt"
    	"os"
    	"sync"
    	"time"

    	ppb "code.fbi.h-da.de/danet/api/go/gosdn/pnd"

    	"github.com/google/uuid"
    	"github.com/openconfig/ygot/ygot"
    	log "github.com/sirupsen/logrus"
    )
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    var changeTimeout time.Duration
    
    func init() {
    
    	var err error
    	e := os.Getenv("GOSDN_CHANGE_TIMEOUT")
    	if e != "" {
    		changeTimeout, err = time.ParseDuration(e)
    		if err != nil {
    			log.Fatal(err)
    		}
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		log.Debugf("change timeout set to %v", changeTimeout)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	} else {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		changeTimeout = time.Minute * 10
    
    // NewChange takes a Device UUID, a pair GoStructs (current and intended state),
    // a callback function, and returns a *Change.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // The callback function is used by the Commit() and Confirm() functions. It
    // must define how the change is carried out.
    
    func NewChange(device uuid.UUID, currentState ygot.GoStruct, change ygot.GoStruct, callback func(ygot.GoStruct, ygot.GoStruct) error) *Change {
    	c := &Change{
    
    		cuid:          uuid.New(),
    		duid:          device,
    
    		timestamp:     time.Now(),
    		previousState: currentState,
    		intendedState: change,
    		callback:      callback,
    	}
    
    	stateManagerCtx, stateManagerCancel := context.WithCancel(context.Background())
    	stateIn, stateOut, errChan := stateManager(stateManagerCtx, c, changeTimeout)
    
    	c.stateIn = stateIn
    	c.stateOut = stateOut
    	c.errChan = errChan
    
    	c.stateManagerCancel = stateManagerCancel
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // Change is an intended change to an OND. It is unique and immutable.
    
    // It has a cuid, a timestamp, and holds both the previous and the new
    // state. It keeps track if the state is committed and confirmed. A callback
    // exists to acess the proper transport for the changed OND
    type Change struct {
    
    	cuid               uuid.UUID
    	duid               uuid.UUID
    	state              ppb.Change_State
    	timestamp          time.Time
    	previousState      ygot.GoStruct
    	intendedState      ygot.GoStruct
    	callback           func(ygot.GoStruct, ygot.GoStruct) error
    	stateMu            sync.RWMutex
    	errChan            <-chan error
    	stateIn            chan<- ppb.Change_State
    	stateOut           <-chan ppb.Change_State
    	stateManagerCancel context.CancelFunc
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // ID reports the UUID that identifies this Change.
    func (c *Change) ID() uuid.UUID {
    	id := c.cuid
    	return id
    }
    
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // Commit pushes the change to the OND using the callback() function
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // and starts the timeout-timer for the Change. If the timer expires
    // the change is rolled back.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (c *Change) Commit() error {
    
    	//TODO: check if already commited
    
    	c.stateIn <- ppb.Change_COMMITTED
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	select {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	case err := <-c.errChan:
    
    		return err
    	case <-c.stateOut:
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		return nil
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    // Confirm confirms a committed Change and stops the rollback timer.
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    func (c *Change) Confirm() error {
    
    	//TODO: check if already confirmed
    
    	c.stateIn <- ppb.Change_CONFIRMED
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	select {
    	case err := <-c.errChan:
    
    		return err
    	case <-c.stateOut:
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		return nil
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    
    // Age reports how much time has elapsed since the Change was created.
    func (c *Change) Age() time.Duration {
    	elapsed := time.Since(c.timestamp)
    	return elapsed
    }
    
    // State returns the changes's state.
    func (c *Change) State() ppb.Change_State {
    
    	c.stateMu.RLock()
    	state := c.state
    	c.stateMu.RUnlock()
    	return state
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    }
    
    func stateManager(ctx context.Context, ch *Change, timeout time.Duration) (chan<- ppb.Change_State, <-chan ppb.Change_State, <-chan error) {
    
    	stateIn := make(chan ppb.Change_State)
    	stateOut := make(chan ppb.Change_State)
    
    	// A Goroutine, which is created while a new Change is initialized acts as
    	// the reciever for errorChan
    
    	errChan := make(chan error)
    
    	// create ticker and make it wait for 1 week
    	// workaround for delayed ticker start and ugly housekeeping
    	ticker := time.NewTicker(time.Hour * 7 * 24)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    
    	go func() {
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    		for {
    
    			//TODO: priority select? should ticker have priority?
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    			select {
    			case <-ticker.C:
    
    				state := ch.State()
    				if state == ppb.Change_CONFIRMED {
    					continue
    				}
    
    				err := ch.callback(ch.intendedState, ch.previousState)
    				if err != nil {
    
    					ch.stateMu.Lock()
    					ch.state = ppb.Change_INCONSISTENT
    					ch.stateMu.Unlock()
    					log.Errorf("change %v timed out", ch.cuid)
    					log.Error(err)
    					continue
    
    				// A rollback has been executed and the timer is stopped
    				ticker.Stop()
    				// TODO: keep the Change as pending, or remove it?
    				ch.stateMu.Lock()
    				ch.state = ppb.Change_PENDING
    				ch.stateMu.Unlock()
    				log.Errorf("change %v timed out", ch.cuid)
    
    			case s := <-stateIn:
    				switch s {
    				case ppb.Change_COMMITTED:
    
    					state := ch.State()
    					if state == ppb.Change_COMMITTED || state == ppb.Change_CONFIRMED {
    						errChan <- fmt.Errorf("change %v already %s", ch.cuid, state.String())
    						continue
    					}
    
    					// reset ticker to enable activate the change timeout
    					ticker.Reset(timeout)
    
    					err := ch.callback(ch.previousState, ch.intendedState)
    					if err != nil {
    
    						ch.stateMu.Lock()
    						ch.state = ppb.Change_INCONSISTENT
    						ch.stateMu.Unlock()
    
    						errChan <- err
    						continue
    					}
    
    					ch.stateMu.Lock()
    					ch.state = ppb.Change_COMMITTED
    					ch.stateMu.Unlock()
    
    					stateOut <- state
    				case ppb.Change_CONFIRMED:
    
    					state := ch.State()
    					if state != ppb.Change_COMMITTED {
    						errChan <- fmt.Errorf("cannot confirm uncommitted change %v", ch.cuid)
    						continue
    					}
    					// The change has been confirmed and the timer is stopped,
    					// since a rollback is not necessary anymore.
    					ch.stateMu.Lock()
    					ch.state = ppb.Change_CONFIRMED
    					ch.stateMu.Unlock()
    
    					stateOut <- state
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    				}
    
    			case <-ctx.Done():
    				ticker.Stop()
    				break running
    
    		log.Info("statemanager routine done for: ", ch.cuid)
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    	}()
    
    	return stateIn, stateOut, errChan
    
    Manuel Kieweg's avatar
    Manuel Kieweg committed
    }