Newer
Older
// This package kms implements a simplistic key management system (kms) for
// Quantum Key Distribution Networks (QKDN) which is a simple emulated KMS. x
// It relies on the emulated quantum link out of the quantumlayer package
package kms
import (
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
pbIC "code.fbi.h-da.de/danet/quant/goKMS/api/gen/proto/go/kmsintercom"
"code.fbi.h-da.de/danet/quant/goKMS/config"
"code.fbi.h-da.de/danet/quant/goKMS/kms/akms/client"
"code.fbi.h-da.de/danet/quant/goKMS/kms/akms/server"
"code.fbi.h-da.de/danet/quant/goKMS/kms/crypto"
"code.fbi.h-da.de/danet/quant/goKMS/kms/event"
"code.fbi.h-da.de/danet/quant/goKMS/kms/peers"
"code.fbi.h-da.de/danet/quant/goKMS/kms/store"
kmstls "code.fbi.h-da.de/danet/quant/goKMS/kms/tls"
"code.fbi.h-da.de/danet/quant/goKMS/kms/util"
pbQS "code.fbi.h-da.de/danet/quipsec/gen/go/quipsec"
"github.com/google/uuid"
)
PathId uuid.UUID
Previous *peers.Peer
Next *peers.Peer
RemoteKMS *RemoteKMS
}
type RemoteKMS struct {
Id string
Address string
type BitKeyLength string
const (
BitKeyLen128 BitKeyLength = "128"
BitKeyLen256 BitKeyLength = "256"
BitKeyLen512 BitKeyLength = "512"
)
type PlatformKey struct {
Id uuid.UUID
Value []byte
ProcessId string
}
kmsName string
kmsUUID uuid.UUID
interComAddr string
quantumAddress string
tlsConfig config.TLSConfig
// TODO create a mapping between ids and address
remoteKMSMapping map[string]*RemoteKMS
remoteKMSMappingMutex sync.RWMutex
quantumModules map[string]peers.QuantumModule
quantumModulesMutex sync.RWMutex
kmsPeersMutex sync.Mutex
// NOTE: There is probably a better way to handle this
PKStore map[string]map[uuid.UUID]*PlatformKey
// TODO(maba): find a better name for this
routingTable map[uuid.UUID]*Route
routingTableMutex sync.RWMutex
pbIC.UnimplementedKmsTalkerServer
supportedKeyLengths map[BitKeyLength]bool
eventBus *event.EventBus
CKMSAkmsClient client.CkmsAkmsClient
CKMSAkmsServer *server.AKMSReceiverServer
// Will keep information about the quantum elements that this EKMS is talking to
// This actually constitutes a quantum element with only a single link
/*
type QuantumElementInterface interface {
GetQlID() qlElementId
}*/
func NewKMS(kmsUUID uuid.UUID, logOutput io.Writer, logLevel log.Level, logInJson bool, config *config.Config) (newKMS *KMS) {
/*
* Setup logging
*/
//What level
log.SetLevel(logLevel)
// Where to send log out put
log.SetOutput(logOutput)
// and plain-text (standard) or json
if !logInJson {
log.SetFormatter(&log.TextFormatter{})
} else {
log.SetFormatter(&log.JSONFormatter{})
}
// print code function if level is set to Trace
if logLevel == log.TraceLevel {
log.SetReportCaller(true)
} else {
log.SetReportCaller(false)
}
ckmsAkmsClient := client.NewCkmsAkmsClient(config.AkmsURL)
kmsUUID: kmsUUID,
interComAddr: config.InterComAddr,
quantumAddress: config.QuantumAddr,
tlsConfig: config.KmsTLS,
remoteKMSMapping: make(map[string]*RemoteKMS),
quantumModules: make(map[string]peers.QuantumModule),
routingTable: make(map[uuid.UUID]*Route),
PKStore: make(map[string]map[uuid.UUID]*PlatformKey),
KmsPeers: make(map[string]*peers.Peer),
supportedKeyLengths: make(map[BitKeyLength]bool),
eventBus: event.NewEventBus(),
createdKMS.supportedKeyLengths[BitKeyLen256] = true
// start the inter communication gRPC server
go createdKMS.startGRPC()
err := createdKMS.initializePeers(config)
if err != nil {
log.Fatalf("Failed to initialize peers: %s", err)
}
// Start the akmsCkmsReceiverServer
if config.AkmsCkmsServerPort != "" {
createdKMS.CKMSAkmsServer = server.NewAKMSReceiver(config.AkmsCkmsServerPort, createdKMS.eventBus, createdKMS.GenerateAndSendKSAKey)
log.Infof("Starting AKMS receiver server on port: %s", config.AkmsCkmsServerPort)
go createdKMS.CKMSAkmsServer.Serve()
}
func (kms *KMS) initializePeers(config *config.Config) error {
var qm peers.QuantumModule
var err error
err = resolveHostnameToIPForQuantumModule(&peer)
if err != nil {
log.Error(err)
continue
}
pqm := peer.QuantumModule
switch qmt := peer.QuantumModule.QmType; qmt {
case "emulated":
qm = peers.NewEmulatedQuantumModule(pqm.Address, config.Id)
case "etsi":
qm, err = peers.NewETSI014HTTPQuantumModule(pqm.Address, config.Id, pqm.SlaveSAEID, pqm.MasterSAEID, config.QuantumModuleTLS, pqm.MasterMode)
if err != nil {
log.Fatalf("Failed to create ETSI QKD module: %s", err)
return nil
}
default:
log.Fatalf("Unknown type: %s for quantum module", qmt)
return nil
}
if err != nil {
log.Fatalf("Failed to add quantum element: %s", err)
return nil
}
gRPCTransportCreds, err := kmstls.GenerateGRPCClientTransportCredsBasedOnTLSFlag(kms.tlsConfig)
if err != nil {
return fmt.Errorf("unable to generate gRPC transport creds: %w", err)
}
newPeerConn, err := grpc.Dial(peer.PeerInterComAddr, grpc.WithTransportCredentials(gRPCTransportCreds))
if err != nil {
return nil
}
client := &peers.GRPCClient{}
switch pt := peer.Type; pt {
case "danet":
client.KmsTalkerClient = pbIC.NewKmsTalkerClient(newPeerConn)
}
p, err := kms.AddPeer(peer.PeerId, peer.PeerInterComAddr, qm, client)
if err != nil {
log.Fatalf("Failed to create a peer: %s", err)
return nil
}
// TODO: check again; we might want to use this based on the quantum
// module in use.
if peer.Sync {
go func() {
time.Sleep(time.Second * 32)
if err := p.SyncBulkKeys(); err != nil {
log.Info("SYNC ERROR: ", err)
} else {
log.Info("SYNCED successfully!")
}
}()
}
}
return nil
}
func (kms *KMS) startGRPC() {
interKMSLis, err := net.Listen("tcp", kms.interComAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
gRPCTransportCreds, err := kmstls.GenerateGRPCServerTransportCredsBasedOnTLSFlag(kms.tlsConfig)
if err != nil {
log.Fatalf("unable to generate gRPC server: %v", err)
}
interKMSServer := grpc.NewServer(grpc.Creds(gRPCTransportCreds))
healthCheck := health.NewServer()
healthpb.RegisterHealthServer(interKMSServer, healthCheck)
pbIC.RegisterKmsTalkerServer(interKMSServer, &kmsTalkerServer{
keyNegotiationMap: make(map[uuid.UUID]*store.KmsKSElement),
if kms.quantumAddress != "" {
quantumLis, err := net.Listen("tcp", kms.quantumAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
quantumServ := grpc.NewServer()
pbQS.RegisterKmsQkdmCommunicationServiceServer(quantumServ, &quipSecServer{
KMS: kms,
})
log.Infof("quantum server listening at %v", quantumLis.Addr())
go quantumServ.Serve(quantumLis) //nolint:errcheck
go func() {
// set status to serving
healthCheck.SetServingStatus(pbIC.KmsTalker_ServiceDesc.ServiceName, healthpb.HealthCheckResponse_SERVING)
// TODO: add logic for adjusting health status based on operating status of
// the services
// for{}
}()
log.Infof("inter KMS server listening at %v", interKMSLis.Addr())
if err := interKMSServer.Serve(interKMSLis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
func (kms *KMS) AddQuantumElement(qm peers.QuantumModule) error {
kms.quantumModulesMutex.Lock()
defer kms.quantumModulesMutex.Unlock()
log.Infof("quantum module address: %s ", qm.Address())
kms.quantumModules[qm.Address()] = qm
func (kms *KMS) AddPeer(peerKmsId string, kmsPeerSocket string, servingQLE peers.QuantumModule, client *peers.GRPCClient) (*peers.Peer, error) {
if _, there := kms.KmsPeers[peerKmsId]; there {
log.Errorf("Trying to add existing peer %s, with KMS ID %s", kmsPeerSocket, peerKmsId)
return nil, fmt.Errorf("trying to add existing peer %s, with KMS ID %s", kmsPeerSocket, peerKmsId)
peer, err := peers.NewKmsPeer(peerKmsId, servingQLE, kmsPeerSocket, kms.interComAddr, client, kms.eventBus)
kms.kmsPeersMutex.Lock()
defer kms.kmsPeersMutex.Unlock()
kms.KmsPeers[peerKmsId] = peer
func (kms *KMS) AssignForwardingRoute(pId, pHop, nHop string, remoteKMS *RemoteKMS) error {
pathId, err := uuid.Parse(pId)
if err != nil {
return fmt.Errorf("the given path id %s is no uuid; err = %w", pathId, err)
var previousHop *peers.Peer
var nextHop *peers.Peer
var ok bool
if pHop != "" {
previousHop, ok = kms.KmsPeers[pHop]
if !ok {
return fmt.Errorf("no peer found for %s", pHop)
}
}
if nHop != "" {
nextHop, ok = kms.KmsPeers[nHop]
return fmt.Errorf("no peer found for %s", nHop)
PathId: pathId,
Previous: previousHop,
Next: nextHop,
RemoteKMS: remoteKMS,
kms.routingTableMutex.Lock()
// set the route within routing table
kms.routingTableMutex.Unlock()
if tmpRoute.RemoteKMS != nil {
kms.remoteKMSMappingMutex.Lock()
if _, ok := kms.remoteKMSMapping[tmpRoute.RemoteKMS.Id]; !ok {
kms.remoteKMSMapping[tmpRoute.RemoteKMS.Id] = tmpRoute.RemoteKMS
}
kms.remoteKMSMappingMutex.Unlock()
}
if tmpRoute.Previous == nil && tmpRoute.Next != nil && tmpRoute.RemoteKMS != nil {
// generate pk key
pk, err := crypto.Random256BitKey()
if err != nil {
return err
}
// generate process id
processId := uuid.New()
// update PKStore
kms.PKStoreMutex.Lock()
keys, ok := kms.PKStore[tmpRoute.RemoteKMS.Id]
kms.PKStore[tmpRoute.RemoteKMS.Id] = map[uuid.UUID]*PlatformKey{
Id: pk.ID,
Value: pk.Key,
ProcessId: processId.String(),
},
keys[pathId] = &PlatformKey{
Id: pk.ID,
Value: pk.Key,
ProcessId: processId.String(),
}
log.Debug("Current PKSTORE: ", kms.PKStore)
err = tmpRoute.Next.SendInitialPayloadBasedOnGRPCClient(pk, tmpRoute.PathId, processId, kms.kmsUUID.String(), remoteKMS.Address)
if err != nil {
log.Error(err)
return err
}
}
func (kms *KMS) GetSpecificPlatformKey(remoteKMSId string, keyId uuid.UUID) (*PlatformKey, error) {
kms.PKStoreMutex.Lock()
defer kms.PKStoreMutex.Unlock()
keyIDs, ok := kms.PKStore[remoteKMSId]
if !ok {
return nil, fmt.Errorf("no entry for given KMS id: %s", remoteKMSId)
pk, ok := keyIDs[keyId]
if !ok {
return nil, fmt.Errorf("no key ready to use for given KMS id: %s", remoteKMSId)
}
delete(keyIDs, keyId)
return pk, nil
}
func (kms *KMS) GetRandomItemFromPKStore(remoteKMSId string) (uuid.UUID, *PlatformKey, error) {
defer kms.PKStoreMutex.Unlock()
keyIds, ok := kms.PKStore[remoteKMSId]
log.Errorf("error with he following pk store: %v", kms.PKStore)
return uuid.Nil, nil, fmt.Errorf("path not found for: %s", remoteKMSId)
return util.RandomItemFromMapAndRemove(keyIds)
}
func (kms *KMS) GetRemoteKMS(remoteKMSId string) (*RemoteKMS, error) {
kms.remoteKMSMappingMutex.RLock()
defer kms.remoteKMSMappingMutex.RUnlock()
remoteKMS, ok := kms.remoteKMSMapping[remoteKMSId]
if !ok {
return nil, fmt.Errorf("address for remoteKMS with id %s not found", remoteKMSId)
}
return remoteKMS, nil
}
// NOTE: address/remoteid still have to decide.
func (kms *KMS) GenerateAndSendKSAKey(remoteKMSId string, pathId uuid.UUID, requestID string, number int) error {
if number < 1 {
log.Errorf("number must be positive and at least 1, provided: %d\n", number)
return fmt.Errorf("number must be positive and at least 1, provided: %d", number)
}
log.Infof("KMS store: %v", kms.remoteKMSMapping)
remoteKMS, err := kms.GetRemoteKMS(remoteKMSId)
if err != nil {
log.Error(err)
return err
}
platformKey, err := kms.GetSpecificPlatformKey(remoteKMSId, pathId)
if err != nil {
log.Error(err)
return err
}
akmsKSAKeys := make([]client.KSAKey, number)
cryptoAlgo := crypto.NewAES()
for i := 0; i < number; i++ {
ksaKey, akmsKSAKey, err := generateNewKSAKey(cryptoAlgo, platformKey.Value)
if err != nil {
log.Error(err)
return err
}
ksaKeys[i] = ksaKey
akmsKSAKeys[i] = *akmsKSAKey
err = kms.sendKSAKeysToPlatformKmsPeer(remoteKMS.Address, platformKey.Id.String(), requestID, ksaKeys)
if err != nil {
log.Error(err)
return err
}
// Use the real processID when we know what it is
err = kms.CKMSAkmsClient.SendKSAKeysToRequestingInstances(requestID, platformKey.ProcessId, akmsKSAKeys)
if err != nil {
log.Error(err)
return err
}
func (kms *KMS) EventBus() *event.EventBus {
return kms.eventBus
}
func (kms *KMS) RemovePeer(kmsPeerSocket string) {
log.Errorf("%s: Can not find a peer with socket: %s", kms.kmsName, kmsPeerSocket)
func (kms *KMS) FindPeerUuid(lookup uuid.UUID) (peer *peers.Peer) {
if kms.KmsPeers != nil {
for _, peer = range kms.KmsPeers {
return peer
}
}
}
return nil
}
func (kms *KMS) RoutingTableDeepCopy() map[uuid.UUID]*Route {
routingTableCopy := make(map[uuid.UUID]*Route, len(kms.KmsPeers))
kms.routingTableMutex.Lock()
defer kms.routingTableMutex.Unlock()
for k, v := range kms.routingTable {
func (kms *KMS) PeersDeepCopy() map[string]*peers.Peer {
peersCopy := make(map[string]*peers.Peer, len(kms.KmsPeers))
kms.kmsPeersMutex.Lock()
defer kms.kmsPeersMutex.Unlock()
for k, v := range kms.KmsPeers {
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
func resolveHostnameToIPForQuantumModule(peer *config.Peer) error {
const connectionRetries = 60
var ipAddr []net.IP
var err error
// If the address is not set, try to resolve the hostname.
if peer.QuantumModule.Address == "" && peer.QuantumModule.Hostname != "" {
log.Info("Trying to get IP from hostname for quantum module: ", peer.QuantumModule.Hostname)
for j := 0; j < connectionRetries; j++ {
ipAddr, err = net.LookupIP(peer.QuantumModule.Hostname)
if err == nil {
break
}
log.Errorf("Failed to get IP from hostname %s, retrying in 2 seconds (attempt %d/%d)", peer.QuantumModule.Hostname, j+1, connectionRetries)
time.Sleep(2 * time.Second)
}
if err != nil {
return fmt.Errorf("IP address not set and failed to resolve hostname for two minutes for quantum module: %s. Error: %s", peer.QuantumModule.Hostname, err.Error())
}
// Just use the first valid IP for now.
ipAdrrString := ipAddr[0].String()
log.Infof("Resolved hostname to IP address for quantum module. Hostname: %s, IP: %s", peer.QuantumModule.Hostname, ipAdrrString)
peer.QuantumModule.Address = ipAdrrString
} else if peer.QuantumModule.Address == "" && peer.QuantumModule.Hostname == "" {
return fmt.Errorf("IP address and hostname not set for quantum module. Erros may occur and the module might not work properly.")
}
return nil
}
func generateNewKSAKey(cryptoAlgo crypto.CryptoAlgorithm, platformKeyValue []byte) (*pbIC.Key, *client.KSAKey, error) {
// generate ksa key
ksaKeyId := uuid.New()
ksaKey, err := crypto.Random256BitKey()
if err != nil {
log.Error(err)
return nil, nil, err
}
// encrypt the key
nonce, encryptedKSAKey, err := cryptoAlgo.Encrypt(ksaKey.Key, platformKeyValue)
if err != nil {
log.Error(err)
return nil, nil, err
}
ksaKeyAsString := base64.StdEncoding.EncodeToString(ksaKey.Key)
encryptedKSAKeyAsString := base64.StdEncoding.EncodeToString(encryptedKSAKey)
nonceAsString := base64.StdEncoding.EncodeToString(nonce)
ksaKeyToSend := &pbIC.Key{
Id: ksaKeyId.String(),
Nonce: nonceAsString,
Key: encryptedKSAKeyAsString,
}
akmsKSAKey := &client.KSAKey{
KeyID: ksaKeyId.String(),
Key: ksaKeyAsString,
}
return ksaKeyToSend, akmsKSAKey, nil
}
func (kms *KMS) sendKSAKeysToPlatformKmsPeer(kmsPeerAddress, platformKeyID, requestID string, ksaKeys []*pbIC.Key) error {
gRPCTransportCreds, err := kmstls.GenerateGRPCClientTransportCredsBasedOnTLSFlag(kms.tlsConfig)
if err != nil {
return fmt.Errorf("unable to generate gRPC transport creds: %w", err)
}
remoteConn, err := grpc.Dial(kmsPeerAddress, grpc.WithTransportCredentials(gRPCTransportCreds))
if err != nil {
log.Error(err)
return err
}
remoteClient := pbIC.NewKmsTalkerClient(remoteConn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// create a new context with some metadata
md := metadata.Pairs("hostname", kms.kmsName)
ctx = metadata.NewOutgoingContext(ctx, md)
defer cancel()
_, err = remoteClient.KeyDelivery(ctx, &pbIC.KeyDeliveryRequest{
KeyId: platformKeyID,
RequestId: requestID,
KmsId: kms.kmsUUID.String(),
Keys: ksaKeys,
})
if err != nil {
log.Error(err)
return err
}
return nil
}