Newer
Older
package kms
import (
"context"
"errors"
"fmt"
"sync"
"time"
etsi14 "code.fbi.h-da.de/danet/ekms/api/go/rest/etsi/client"
pbIC "code.fbi.h-da.de/danet/ekms/internal/api/gen/proto/go/kmsintercom"
"code.fbi.h-da.de/danet/ekms/internal/kms/event"
"code.fbi.h-da.de/danet/quantumlayer"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
// QuantumModule is the set of methods this package requires from a quantum
// module: an identifier, an initialization hook, access to the module's key
// store, and a getter/setter for the peer the module is attached to.
type QuantumModule interface {
// ID returns the UUID of the module.
ID() uuid.UUID
// Initialize prepares the module and reports any failure as an error.
Initialize() error
// NOTE: Sync will be removed as soon as the emulated quantum module has been
// changed to push a constant byte stream.
// NOTE(review): no Sync method is declared in this interface, although the
// module types in this file each define one — confirm whether the
// declaration was dropped on purpose.
// KeyStore returns the key store of the module.
KeyStore() *kmsKeyStore
// Peer returns the peer currently set on the module.
Peer() *kmsPeer
// SetPeer sets the peer of the module.
SetPeer(*kmsPeer)
}
type EmulatedQuantumModule struct {
// QuantumElementLink *quantumlayer.QuantumlayerEmuPRNG // contains information about the quantum links
// key stores of unchopped bulk keys go here
rawBulkKeysMutex sync.Mutex
rawBulkKeys map[int64]*quantumlayer.QuantumLayerBulkKey
keyStore *kmsKeyStore // the keys used between two peers.
peer *kmsPeer
}
// NewEmulatedQuantumModule returns an emulated quantum module for the given
// address. The module starts with a fresh random ID, an empty set of raw bulk
// keys, a new key store created with NewKmsKeyStore(256) (presumably the
// single key size in bits — confirm against NewKmsKeyStore) and no peer; the
// peer has to be attached with SetPeer before Sync can be used.
func NewEmulatedQuantumModule(kmsUDPAddr string) *EmulatedQuantumModule {
	return &EmulatedQuantumModule{
		// Without an explicit ID every module would report the zero UUID and
		// modules could not be told apart by ID().
		QlID:        uuid.New(),
		addr:        kmsUDPAddr,
		rawBulkKeys: make(map[int64]*quantumlayer.QuantumLayerBulkKey),
		keyStore:    NewKmsKeyStore(256),
		peer:        nil,
	}
}
// ID returns the QlID of the module.
func (eqe *EmulatedQuantumModule) ID() uuid.UUID {
return eqe.QlID
}
// Initialize currently performs no work and always returns nil.
func (eqe *EmulatedQuantumModule) Initialize() error {
// TODO: error handling
return nil
}
// Address returns the address that was passed to NewEmulatedQuantumModule.
func (eqe *EmulatedQuantumModule) Address() string {
return eqe.addr
}
// Sync agrees on a bulk key with the peer and fills the key store from it.
//
// The steps are:
//  1. send the ids of all locally known raw bulk keys to the peer
//     (SyncQkdBulk) and take the bulk id from its response,
//  2. replace the key store and chop the selected bulk key into single keys
//     with freshly generated ids,
//  3. announce those key ids to the peer (SyncKeyIdsForBulk),
//  4. store the keys, drop the consumed bulk key, mark the peer as KmsPeerUp
//     and, if the peer has an event bus, publish a peer event.
//
// Both peer calls share one context with a one second timeout. An error is
// returned when no peer is set, when one of the peer calls fails, when the
// peer answers with a bulk id that is not known locally, or when chopping
// fails. A failure to publish the event is only logged.
func (eqe *EmulatedQuantumModule) Sync() error {
	// The constructor leaves the peer nil; fail cleanly instead of panicking
	// when Sync is called before SetPeer.
	peer := eqe.peer
	if peer == nil {
		return errors.New("Sync: no peer set")
	}

	// rawBulkKeys is guarded by rawBulkKeysMutex, so reads take the lock too.
	eqe.rawBulkKeysMutex.Lock()
	rawBulkKeyIds := keysOfMap[int64](eqe.rawBulkKeys)
	eqe.rawBulkKeysMutex.Unlock()
	logrus.Info("Found the following bulk key ids for usage: ", rawBulkKeyIds)

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	initialPeerSetupResponse, err := peer.peerClient.SyncQkdBulk(ctx, &pbIC.SyncQkdBulkRequest{
		Timestamp:    time.Now().Unix(),
		InterComAddr: peer.interComAddr,
		BulkId:       rawBulkKeyIds,
	})
	if err != nil {
		return err
	}
	bulkID := initialPeerSetupResponse.BulkId

	eqe.rawBulkKeysMutex.Lock()
	bulkKey, ok := eqe.rawBulkKeys[bulkID]
	eqe.rawBulkKeysMutex.Unlock()
	if !ok {
		// TODO: add proper error message
		return fmt.Errorf("Could not find raw bulk key with id: %d", bulkID)
	}

	// TODO: Initially the peer partners should discuss about the key length,
	// for now it is hardcoded.
	// The key store must be replaced before chopping: KeyChopper reads the
	// single key size from it.
	eqe.keyStore = NewKmsKeyStore(256)

	keyIds, keyData, err := eqe.KeyChopper(bulkKey, []string{})
	if err != nil {
		return err
	}

	_, err = peer.peerClient.SyncKeyIdsForBulk(ctx, &pbIC.SyncKeyIdsForBulkRequest{
		Timestamp:    time.Now().Unix(),
		InterComAddr: peer.interComAddr,
		BulkId:       bulkID,
		KeyId:        keyIds,
	})
	if err != nil {
		return err
	}

	// Only store the keys once the peer has accepted the key ids.
	for keyID, key := range keyData {
		eqe.keyStore.addKey(keyID, key)
	}

	eqe.rawBulkKeysMutex.Lock()
	delete(eqe.rawBulkKeys, bulkID)
	eqe.rawBulkKeysMutex.Unlock()

	// update the peer status to up
	peer.peerStatus = KmsPeerUp

	// Send notification about change
	if peer.eventBus != nil {
		if err := peer.eventBus.Publish(event.NewPeerEvent(peer.tcpSocketStr)); err != nil {
			log.Error(err)
		}
	}

	return nil
}
// KeyStore returns the module's current key store. Sync replaces the store, so
// a previously returned pointer may no longer be the active one afterwards.
func (eqe *EmulatedQuantumModule) KeyStore() *kmsKeyStore {
return eqe.keyStore
}
// Peer returns the peer set on the module, or nil if none has been set.
func (eqe *EmulatedQuantumModule) Peer() *kmsPeer {
return eqe.peer
}
// SetPeer sets the peer of the module, replacing any previously set peer.
func (eqe *EmulatedQuantumModule) SetPeer(peer *kmsPeer) {
eqe.peer = peer
}
//func (eqe *EmulatedQuantumModule) keyHandler() {
// // periodically walk through quantum element and retrieve the key bulk buffer
// for {
// log.Debugf("%s: KeyHandler reading...\n", eqe.ID())
// bulkKeys, err := eqe.QuantumElementLink.GetKeyBulkPeer()
// if err != nil {
// log.Errorf("%s: failed to retrieve bulkkeys with error %s", eqe.ID(), err)
// } else {
// // Add to the slice, but not process yet.
// log.Debugf("%s: produced %d bytes of key", eqe.ID(), bulkKeys.BulkKeyLength)
// eqe.rawBulkKeysMutex.Lock()
// eqe.rawBulkKeys[bulkKeys.BulkKeyId] = &bulkKeys
// eqe.rawBulkKeysMutex.Unlock()
// }
// // TODO: hardcoded
// time.Sleep(5 * time.Second)
// }
//}
// Takes a bulk of keys and chops them in chopFactor keys each
// Any remainder is discarded
// If keyIds is empty, uuids are generated
func (eqe *EmulatedQuantumModule) KeyChopper(bulkKey *quantumlayer.QuantumLayerBulkKey, keyIds []string) ([]string, map[uuid.UUID][]byte, error) {
if eqe.keyStore.keySingleSize == 0 {
return nil, nil, errors.New("KeyChopper: no keySingleSize set")
}
// TODO check if multiple of 8 (1 Byte)
if bulkKey.BulkKeyLength != len(*bulkKey.BulkKey) {
return nil, nil, errors.New("bulkKey length mismatch")
initialKeyIdsLen := len(keyIds)
// Let's chop!
keyData := make(map[uuid.UUID][]byte)
chopFactor := eqe.keyStore.keySingleSize >> 3
key := *bulkKey.BulkKey
for len(key) > int(chopFactor) {
var keyId uuid.UUID
var err error
if initialKeyIdsLen == 0 {
keyId = uuid.New()
keyIds = append(keyIds, keyId.String())
} else {
keyId, err = uuid.Parse(keyIds[counter])
if err != nil {
return nil, nil, fmt.Errorf("The provided ID: %s can not be parsed as UUID.", keyIds[counter])
}
counter++
}
tmpkey := key[:chopFactor]
keyData[keyId] = tmpkey
// shorten the key storage
key = key[chopFactor:]
}
return keyIds, keyData, nil
keyStore *kmsKeyStore
peer *kmsPeer
client *restclient.ClientImpl
slaveSAEID string
masterSAEID string
// NewETSI014HTTPQuantumModule returns a quantum module that obtains its keys
// from a QKD module through an ETSI 014 REST client.
//
// addr is the host[:port] of the QKD module; it is prefixed with "http://" to
// build the server URL. slaveSAEID is passed to the REST client, masterSAEID
// is only stored on the module. master selects whether Initialize starts the
// key polling loop.
//
// The module starts with a fresh random ID, a new key store created with
// NewKmsKeyStore(256) and no peer. An error is returned if the address cannot
// be parsed or the REST client cannot be created.
func NewETSI014HTTPQuantumModule(addr, slaveSAEID, masterSAEID string, master bool) (*ETSI014HTTPQuantumModule, error) {
	parsedUrl, err := url.Parse("http://" + addr)
	if err != nil {
		return nil, err
	}

	restClientConf := &etsi14.Configuration{
		Debug: true,
		Servers: etsi14.ServerConfigurations{
			{
				URL:         parsedUrl.String(),
				Description: "QKD Module with ETSI14 implemented as API.",
			},
		},
		Scheme: parsedUrl.Scheme,
		// HTTPClient: &http.Client{
		// 	Transport: &http.Transport{
		// 		TLSClientConfig: &tls.Config{},
		// 	},
		// },
	}

	// TODO: we might want to add mastersaeid here aswell
	client, err := restclient.NewClientImpl(slaveSAEID, restClientConf)
	if err != nil {
		return nil, err
	}

	return &ETSI014HTTPQuantumModule{
		id: uuid.New(),
		// addr and master have to be kept on the module: Address returns addr
		// and Initialize only starts polling when master is set.
		addr:        addr,
		keyStore:    NewKmsKeyStore(256),
		peer:        nil,
		client:      client,
		slaveSAEID:  slaveSAEID,
		masterSAEID: masterSAEID,
		master:      master,
	}, nil
}
// ID returns the UUID that was generated for the module on construction.
func (eqe *ETSI014HTTPQuantumModule) ID() uuid.UUID {
return eqe.id
}
func (eqe *ETSI014HTTPQuantumModule) Initialize() error {
// start polling
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
if eqe.master {
go func() {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
// TODO: add context/channel to stop
for {
select {
case <-ticker.C:
container, _, err := eqe.client.GetKey()
if err != nil {
log.Error(err)
break
}
keyIds := make([]string, len(container.GetKeys()))
for i, keyItem := range container.GetKeys() {
keyIds[i] = keyItem.GetKeyID()
}
_, err = eqe.peer.peerClient.KeyIdNotification(context.Background(),
&pbIC.KeyIdNotificationRequest{
Timestamp: time.Now().Unix(),
InterComAddr: eqe.peer.interComAddr,
KeyIds: keyIds,
})
if err != nil {
log.Error(err)
break
}
if err := addETSIKeysToKeystore(eqe.keyStore, container.GetKeys()); err != nil {
log.Error(err)
break
}
// Address returns the addr field of the module.
func (eqe *ETSI014HTTPQuantumModule) Address() string {
return eqe.addr
}
// KeyStore returns the key store of the module.
func (eqe *ETSI014HTTPQuantumModule) KeyStore() *kmsKeyStore {
return eqe.keyStore
}
// Peer returns the peer set on the module, or nil if none has been set.
func (eqe *ETSI014HTTPQuantumModule) Peer() *kmsPeer {
return eqe.peer
}
// SetPeer sets the peer of the module, replacing any previously set peer.
func (eqe *ETSI014HTTPQuantumModule) SetPeer(peer *kmsPeer) {
eqe.peer = peer
}
func (eqe *ETSI014HTTPQuantumModule) Sync() error {
return nil