-
Malte Bauch authored
See merge request !173
Malte Bauch authoredSee merge request !173
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
kms.go 20.16 KiB
// This package kms implements a simplistic key management system (kms) for
// Quantum Key Distribution Networks (QKDN) which is a simple emulated KMS. x
// It relies on the emulated quantum link out of the quantumlayer package
package kms
import (
"context"
"encoding/base64"
"fmt"
"io"
"net"
"sync"
"time"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
pbIC "code.fbi.h-da.de/danet/quant/goKMS/api/gen/proto/go/kmsintercom"
"code.fbi.h-da.de/danet/quant/goKMS/config"
akmsClient "code.fbi.h-da.de/danet/quant/goKMS/kms/akms/client"
akmsServer "code.fbi.h-da.de/danet/quant/goKMS/kms/akms/server"
"code.fbi.h-da.de/danet/quant/goKMS/kms/crypto"
etsi14Server "code.fbi.h-da.de/danet/quant/goKMS/kms/etsi/etsi14/server"
"code.fbi.h-da.de/danet/quant/goKMS/kms/event"
"code.fbi.h-da.de/danet/quant/goKMS/kms/peers"
"code.fbi.h-da.de/danet/quant/goKMS/kms/receiver"
"code.fbi.h-da.de/danet/quant/goKMS/kms/store"
kmstls "code.fbi.h-da.de/danet/quant/goKMS/kms/tls"
"code.fbi.h-da.de/danet/quant/goKMS/kms/util"
pbQS "code.fbi.h-da.de/danet/quipsec/gen/go/quipsec"
"github.com/google/uuid"
)
type Route struct {
PathId uuid.UUID
Previous *peers.KmsPeer
Next *peers.KmsPeer
RemoteKMS *util.RemoteKMS
}
type BitKeyLength string
const (
BitKeyLen128 BitKeyLength = "128"
BitKeyLen256 BitKeyLength = "256"
BitKeyLen512 BitKeyLength = "512"
)
type PlatformKey struct {
Id uuid.UUID
Value []byte
ProcessId string
}
// The general emulated KMS.
type KMS struct {
kmsName string
kmsUUID uuid.UUID
interComAddr string
quantumAddress string
tlsConfig config.TLSConfig
// TODO create a mapping between ids and address
remoteKMSMapping map[string]*util.RemoteKMS
remoteKMSMappingMutex sync.RWMutex
quantumModules map[string]peers.QuantumModule
quantumModulesMutex sync.RWMutex
kmsPeersMutex sync.Mutex
// NOTE: There is probably a better way to handle this
PKStore map[string]map[uuid.UUID]*PlatformKey
PKStoreMutex sync.Mutex
// TODO(maba): find a better name for this
routingTable map[uuid.UUID]*Route
routingTableMutex sync.RWMutex
KmsPeers map[string]*peers.KmsPeer
pbIC.UnimplementedKmsTalkerServer
supportedKeyLengths map[BitKeyLength]bool
eventBus *event.EventBus
receiver *receiver.Receiver
// Akms things
ckmsAkmsClient *akmsClient.CkmsAkmsClient
ckmsAkmsServer *akmsServer.AKMSReceiverServer
// ETSI14 Server things
etsi14Server *etsi14Server.ETSI14RESTService
keyStoreChannel chan []crypto.KSAKey
}
// Will keep information about the quantum elements that this EKMS is talking to
// This actually constitutes a quantum element with only a single link
/*
type QuantumElementInterface interface {
GetQlID() qlElementId
}*/
func NewKMS(kmsUUID uuid.UUID, logOutput io.Writer, logLevel log.Level, logInJson bool, config *config.Config, receiver *receiver.Receiver) (newKMS *KMS) {
/*
* Setup logging
*/
//What level
log.SetLevel(logLevel)
// Where to send log out put
log.SetOutput(logOutput)
// and plain-text (standard) or json
if !logInJson {
log.SetFormatter(&log.TextFormatter{})
} else {
log.SetFormatter(&log.JSONFormatter{})
}
// print code function if level is set to Trace
if logLevel == log.TraceLevel {
log.SetReportCaller(true)
} else {
log.SetReportCaller(false)
}
var ckmsAkmsClient *akmsClient.CkmsAkmsClient
if config.AkmsURL != "" {
ckmsAkmsClient = akmsClient.NewCkmsAkmsClient(config.AkmsURL)
}
createdKMS := &KMS{
kmsName: config.Name,
kmsUUID: kmsUUID,
interComAddr: config.InterComAddr,
quantumAddress: config.QuantumAddr,
tlsConfig: config.KmsTLS,
remoteKMSMapping: make(map[string]*util.RemoteKMS),
quantumModules: make(map[string]peers.QuantumModule),
routingTable: make(map[uuid.UUID]*Route),
PKStore: make(map[string]map[uuid.UUID]*PlatformKey),
KmsPeers: make(map[string]*peers.KmsPeer),
supportedKeyLengths: make(map[BitKeyLength]bool),
eventBus: event.NewEventBus(),
ckmsAkmsClient: ckmsAkmsClient,
receiver: receiver,
}
createdKMS.supportedKeyLengths[BitKeyLen256] = true
// start the inter communication gRPC server
go createdKMS.startGRPC()
// initialize from config
err := createdKMS.initializePeers(config)
if err != nil {
log.Fatalf("Failed to initialize peers: %s", err)
}
// Start the akmsCkmsReceiverServer
if config.AkmsCkmsServerPort != "" {
createdKMS.ckmsAkmsServer = akmsServer.NewAKMSReceiver(config.AkmsCkmsServerPort, createdKMS.eventBus, receiver, createdKMS.GenerateAndSendKSAKey)
log.Infof("Starting AKMS receiver server on port: %s", config.AkmsCkmsServerPort)
go createdKMS.ckmsAkmsServer.Serve()
}
// Setup ETSI14 Server if info is in config
if config.ETSI14Server != nil {
remoteCKMSID, err := uuid.Parse(config.ETSI14Server.RemoteCKMSID)
if err != nil {
log.Error(err)
}
createdKMS.etsi14Server = etsi14Server.NewETSI14RESTService(config.ETSI14Server.Address, remoteCKMSID, createdKMS.exchangeKeyAfterETSI14GetKeyRequest)
go createdKMS.etsi14Server.Run()
createdKMS.keyStoreChannel = createdKMS.etsi14Server.KeyStoreChannel
} else {
log.Info("No ETSI14 server running.")
}
return createdKMS
}
func (kms *KMS) initializePeers(config *config.Config) error {
var qm peers.QuantumModule
var err error
for _, peer := range config.Peers {
err = resolveHostnameToIPForQuantumModule(&peer)
if err != nil {
log.Error(err)
continue
}
pqm := peer.QuantumModule
switch qmt := peer.QuantumModule.QmType; qmt {
case "emulated":
qm = peers.NewDanetQuantumModule(pqm.Address, config.Id)
case "etsi":
qm, err = peers.NewETSI014HTTPQuantumModule(pqm.Address, config.Id, pqm.LocalSAEID, pqm.TargetSAEID, config.QuantumModuleTLS, pqm.MasterMode)
if err != nil {
log.Fatalf("Failed to create ETSI QKD module: %s", err)
return nil
}
default:
log.Fatalf("Unknown type: %s for quantum module", qmt)
return nil
}
err := kms.AddQuantumElement(qm)
if err != nil {
log.Fatalf("Failed to add quantum element: %s", err)
return nil
}
gRPCTransportCreds, err := kmstls.GenerateGRPCClientTransportCredsBasedOnTLSFlag(kms.tlsConfig)
if err != nil {
return fmt.Errorf("unable to generate gRPC transport creds: %w", err)
}
newPeerConn, err := grpc.NewClient(peer.PeerInterComAddr, grpc.WithTransportCredentials(gRPCTransportCreds))
if err != nil {
return nil
}
client := &peers.GRPCClient{}
switch pt := peer.Type; pt {
case "danet":
client.KmsTalkerClient = pbIC.NewKmsTalkerClient(newPeerConn)
}
_, err = kms.AddPeer(peer.PeerId, peer.PeerInterComAddr, qm, client)
if err != nil {
log.Fatalf("Failed to create a peer: %s", err)
return nil
}
}
return nil
}
func (kms *KMS) startGRPC() {
interKMSLis, err := net.Listen("tcp", kms.interComAddr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
gRPCTransportCreds, err := kmstls.GenerateGRPCServerTransportCredsBasedOnTLSFlag(kms.tlsConfig)
if err != nil {
log.Fatalf("unable to generate gRPC server: %v", err)
}
interKMSServer := grpc.NewServer(grpc.Creds(gRPCTransportCreds))
healthCheck := health.NewServer()
healthpb.RegisterHealthServer(interKMSServer, healthCheck)
pbIC.RegisterKmsTalkerServer(interKMSServer, &kmsTalkerServer{
keyNegotiationMap: make(map[uuid.UUID]*store.KmsKSElement),
kms: kms,
})
if kms.quantumAddress != "" {
quantumLis, err := net.Listen("tcp", kms.quantumAddress)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
quantumServ := grpc.NewServer()
pbQS.RegisterKmsQkdmCommunicationServiceServer(quantumServ, &quipSecServer{
KMS: kms,
})
log.Infof("quantum server listening at %v", quantumLis.Addr())
go quantumServ.Serve(quantumLis) //nolint:errcheck
}
go func() {
// set status to serving
healthCheck.SetServingStatus(pbIC.KmsTalker_ServiceDesc.ServiceName, healthpb.HealthCheckResponse_SERVING)
// TODO: add logic for adjusting health status based on operating status of
// the services
// for{}
}()
log.Infof("inter KMS server listening at %v", interKMSLis.Addr())
if err := interKMSServer.Serve(interKMSLis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
func (kms *KMS) AddQuantumElement(qm peers.QuantumModule) error {
kms.quantumModulesMutex.Lock()
defer kms.quantumModulesMutex.Unlock()
log.Infof("quantum module address: %s ", qm.Address())
kms.quantumModules[qm.Address()] = qm
return nil
}
func (kms *KMS) AddPeer(peerKmsId string, kmsPeerSocket string, servingQLE peers.QuantumModule, client *peers.GRPCClient) (*peers.KmsPeer, error) {
// check if peer exists
if _, there := kms.KmsPeers[peerKmsId]; there {
log.Errorf("Trying to add existing peer %s, with KMS ID %s", kmsPeerSocket, peerKmsId)
return nil, fmt.Errorf("trying to add existing peer %s, with KMS ID %s", kmsPeerSocket, peerKmsId)
}
peer, err := peers.NewKmsPeer(peerKmsId, servingQLE, kmsPeerSocket, client, kms.eventBus)
if err != nil {
return nil, err
}
peer.TcpSocketStr = kmsPeerSocket
kms.kmsPeersMutex.Lock()
defer kms.kmsPeersMutex.Unlock()
kms.KmsPeers[peerKmsId] = peer
// go peer.PeerHandler(kms.kmsName)
return peer, nil
}
func (kms *KMS) AssignForwardingRoute(pId, pHop, nHop string, remoteKMS *util.RemoteKMS) error {
pathId, err := uuid.Parse(pId)
if err != nil {
return fmt.Errorf("the given path id %s is no uuid; err = %w", pathId, err)
}
var previousHop *peers.KmsPeer
var nextHop *peers.KmsPeer
var ok bool
if pHop != "" {
previousHop, ok = kms.KmsPeers[pHop]
if !ok {
return fmt.Errorf("no peer found for %s", pHop)
}
}
if nHop != "" {
nextHop, ok = kms.KmsPeers[nHop]
if !ok {
return fmt.Errorf("no peer found for %s", nHop)
}
}
tmpRoute := &Route{
PathId: pathId,
Previous: previousHop,
Next: nextHop,
RemoteKMS: remoteKMS,
}
kms.routingTableMutex.Lock()
// set the route within routing table
kms.routingTable[pathId] = tmpRoute
kms.routingTableMutex.Unlock()
if tmpRoute.RemoteKMS != nil {
kms.remoteKMSMappingMutex.Lock()
if _, ok := kms.remoteKMSMapping[tmpRoute.RemoteKMS.Id]; !ok {
kms.remoteKMSMapping[tmpRoute.RemoteKMS.Id] = tmpRoute.RemoteKMS
}
kms.remoteKMSMappingMutex.Unlock()
}
if tmpRoute.Previous == nil && tmpRoute.Next != nil && tmpRoute.RemoteKMS != nil {
// generate pk key
pk, err := crypto.Random256BitKey()
if err != nil {
return err
}
// generate process id
processId := uuid.New()
// update PKStore
kms.AddSpecificPlatformKey(tmpRoute.RemoteKMS.Id, pathId, processId, pk)
err = tmpRoute.Next.SendInitialPayloadBasedOnGRPCClient(pk, tmpRoute.PathId, processId, kms.kmsUUID.String(), remoteKMS)
if err != nil {
log.Error(err)
return err
}
}
return nil
}
func (kms *KMS) AddSpecificPlatformKey(remoteKMSId string, pathId uuid.UUID, processId uuid.UUID, key *crypto.Key) {
kms.PKStoreMutex.Lock()
defer kms.PKStoreMutex.Unlock()
keys, ok := kms.PKStore[remoteKMSId]
if !ok {
kms.PKStore[remoteKMSId] = map[uuid.UUID]*PlatformKey{
pathId: {
Id: key.ID,
Value: key.Key,
ProcessId: processId.String(),
},
}
} else {
keys[pathId] = &PlatformKey{
Id: key.ID,
Value: key.Key,
ProcessId: processId.String(),
}
}
log.Debug("Current store of platform keys: ", kms.PKStore)
}
func (kms *KMS) GetSpecificPlatformKey(remoteKMSId string, keyId uuid.UUID) (*PlatformKey, error) {
kms.PKStoreMutex.Lock()
defer kms.PKStoreMutex.Unlock()
keyIDs, ok := kms.PKStore[remoteKMSId]
if !ok {
return nil, fmt.Errorf("no entry for given KMS id: %s", remoteKMSId)
}
pk, ok := keyIDs[keyId]
if !ok {
return nil, fmt.Errorf("no key ready to use for given KMS id: %s", remoteKMSId)
}
delete(keyIDs, keyId)
return pk, nil
}
func (kms *KMS) GetRemoteKMS(remoteKMSId string) (*util.RemoteKMS, error) {
kms.remoteKMSMappingMutex.RLock()
defer kms.remoteKMSMappingMutex.RUnlock()
remoteKMS, ok := kms.remoteKMSMapping[remoteKMSId]
if !ok {
return nil, fmt.Errorf("address for remoteKMS with id %s not found", remoteKMSId)
}
return remoteKMS, nil
}
// NOTE: address/remoteid still have to decide.
func (kms *KMS) GenerateAndSendKSAKey(remoteKMSId string, pathId uuid.UUID, requestID string, number int) error {
if number < 1 {
log.Errorf("number must be positive and at least 1, provided: %d\n", number)
return fmt.Errorf("number must be positive and at least 1, provided: %d", number)
}
log.Infof("KMS store: %v", kms.remoteKMSMapping)
remoteKMS, err := kms.GetRemoteKMS(remoteKMSId)
if err != nil {
log.Error(err)
return err
}
platformKey, err := kms.GetSpecificPlatformKey(remoteKMSId, pathId)
if err != nil {
log.Error(err)
return err
}
ksaKeys := make([]*pbIC.Key, number)
akmsKSAKeys := make([]crypto.KSAKey, number)
cryptoAlgo := crypto.NewAES()
for i := 0; i < number; i++ {
ksaKey, akmsKSAKey, err := generateNewKSAKey(cryptoAlgo, platformKey.Value)
if err != nil {
log.Error(err)
return err
}
ksaKeys[i] = ksaKey
akmsKSAKeys[i] = *akmsKSAKey
}
remoteKMSAdrress := fmt.Sprintf("%s:%d", remoteKMS.Address, remoteKMS.Port)
err = kms.sendKSAKeysToPlatformKmsPeer(remoteKMSAdrress, platformKey.Id.String(), requestID, ksaKeys)
if err != nil {
log.Error(err)
return err
}
// Use the real processID when we know what it is
err = kms.ckmsAkmsClient.SendKSAKeysToRequestingInstances(requestID, platformKey.ProcessId, akmsKSAKeys)
if err != nil {
log.Error(err)
return err
}
return nil
}
func (kms *KMS) EventBus() *event.EventBus {
return kms.eventBus
}
// TODO/XXX error handling.
func (kms *KMS) RemovePeer(kmsPeerSocket string) {
if _, there := kms.KmsPeers[kmsPeerSocket]; there {
// peer.quit <- true
delete(kms.KmsPeers, kmsPeerSocket)
return
}
log.Errorf("%s: Can not find a peer with socket: %s", kms.kmsName, kmsPeerSocket)
}
func (kms *KMS) FindPeerUuid(lookup uuid.UUID) (peer *peers.KmsPeer) {
if kms.KmsPeers != nil {
for _, peer = range kms.KmsPeers {
if peer.GetKmsPeerId() == lookup {
return peer
}
}
}
return nil
}
func (kms *KMS) RoutingTableDeepCopy() map[uuid.UUID]*Route {
routingTableCopy := make(map[uuid.UUID]*Route, len(kms.KmsPeers))
kms.routingTableMutex.Lock()
defer kms.routingTableMutex.Unlock()
for k, v := range kms.routingTable {
routingTableCopy[k] = v
}
return routingTableCopy
}
func (kms *KMS) PeersDeepCopy() map[string]*peers.KmsPeer {
peersCopy := make(map[string]*peers.KmsPeer, len(kms.KmsPeers))
kms.kmsPeersMutex.Lock()
defer kms.kmsPeersMutex.Unlock()
for k, v := range kms.KmsPeers {
peersCopy[k] = v
}
return peersCopy
}
func resolveHostnameToIPForQuantumModule(peer *config.Peer) error {
const connectionRetries = 60
var ipAddr []net.IP
var err error
// If the address is not set, try to resolve the hostname.
if peer.QuantumModule.Address == "" && peer.QuantumModule.Hostname != "" {
log.Info("Trying to get IP from hostname for quantum module: ", peer.QuantumModule.Hostname)
for j := 0; j < connectionRetries; j++ {
ipAddr, err = net.LookupIP(peer.QuantumModule.Hostname)
if err == nil {
break
}
log.Errorf("Failed to get IP from hostname %s, retrying in 2 seconds (attempt %d/%d)", peer.QuantumModule.Hostname, j+1, connectionRetries)
time.Sleep(2 * time.Second)
}
if err != nil {
return fmt.Errorf("IP address not set and failed to resolve hostname for two minutes for quantum module: %s. Error: %s", peer.QuantumModule.Hostname, err.Error())
}
// Just use the first valid IP for now.
ipAdrrString := ipAddr[0].String()
log.Infof("Resolved hostname to IP address for quantum module. Hostname: %s, IP: %s", peer.QuantumModule.Hostname, ipAdrrString)
peer.QuantumModule.Address = ipAdrrString
} else if peer.QuantumModule.Address == "" && peer.QuantumModule.Hostname == "" {
return fmt.Errorf("IP address and hostname not set for quantum module. Erros may occur and the module might not work properly.")
}
return nil
}
func generateNewKSAKey(cryptoAlgo crypto.CryptoAlgorithm, platformKeyValue []byte) (*pbIC.Key, *crypto.KSAKey, error) {
// generate ksa key
ksaKeyId := uuid.New()
ksaKey, err := crypto.Random256BitKey()
if err != nil {
log.Error(err)
return nil, nil, err
}
// encrypt the key
nonce, encryptedKSAKey, err := cryptoAlgo.Encrypt(ksaKey.Key, platformKeyValue)
if err != nil {
log.Error(err)
return nil, nil, err
}
ksaKeyAsString := base64.StdEncoding.EncodeToString(ksaKey.Key)
encryptedKSAKeyAsString := base64.StdEncoding.EncodeToString(encryptedKSAKey)
nonceAsString := base64.StdEncoding.EncodeToString(nonce)
ksaKeyToSend := &pbIC.Key{
Id: ksaKeyId.String(),
Nonce: nonceAsString,
Key: encryptedKSAKeyAsString,
}
akmsKSAKey := &crypto.KSAKey{
KeyID: ksaKeyId.String(),
Key: ksaKeyAsString,
}
return ksaKeyToSend, akmsKSAKey, nil
}
func (kms *KMS) sendKSAKeysToPlatformKmsPeer(kmsPeerAddress, platformKeyID, requestID string, ksaKeys []*pbIC.Key) error {
gRPCTransportCreds, err := kmstls.GenerateGRPCClientTransportCredsBasedOnTLSFlag(kms.tlsConfig)
if err != nil {
return fmt.Errorf("unable to generate gRPC transport creds: %w", err)
}
remoteConn, err := grpc.NewClient(kmsPeerAddress, grpc.WithTransportCredentials(gRPCTransportCreds))
if err != nil {
log.Error(err)
return err
}
remoteClient := pbIC.NewKmsTalkerClient(remoteConn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// create a new context with some metadata
md := metadata.Pairs("hostname", kms.kmsName)
ctx = metadata.NewOutgoingContext(ctx, md)
defer cancel()
_, err = remoteClient.KeyDelivery(ctx, &pbIC.KeyDeliveryRequest{
KeyId: platformKeyID,
RequestId: requestID,
KmsId: kms.kmsUUID.String(),
Keys: ksaKeys,
})
if err != nil {
log.Error(err)
return err
}
return nil
}
func (kms *KMS) exchangeKeyAfterETSI14GetKeyRequest(receivingCKMSID uuid.UUID, number int64, requestID string) ([]crypto.KSAKey, error) {
// CreateRouteRequest
pathID := uuid.New()
log.Debugf("(ETSI14) created new path id: %s, for incoming ETSI14 key request", pathID)
receiverChan, err := kms.receiver.RequestReceiverChannel(pathID)
if err != nil {
log.Error(err)
return nil, err
}
err = kms.eventBus.Publish(event.NewCreateRouteEvent(pathID.String(), receivingCKMSID.String()))
if err != nil {
log.Errorf("(ETSI14) Failed sending a create route request: %s", err)
return nil, err
}
select {
case <-receiverChan:
case <-time.After(20 * time.Second):
if err := kms.receiver.RemoveReceiver(pathID); err != nil {
log.Errorf("Failed removing receiver for pathId: %s ; err: %v", pathID, err)
}
log.Errorf("(ETSI14) Platform Key exchange failed for PathID: %s", pathID)
return nil, fmt.Errorf("Platform Key exchange failed for PathID: %s", pathID)
}
// GenerateAndSend
return kms.generateAndReturnKsaKey(receivingCKMSID, pathID, number, requestID)
}
func (kms *KMS) generateAndReturnKsaKey(receivingCKMSID, pathID uuid.UUID, number int64, requestID string) ([]crypto.KSAKey, error) {
if number < 1 {
log.Errorf("number must be positive and at least 1, provided: %d\n", number)
return nil, fmt.Errorf("number must be positive and at least 1, provided: %d", number)
}
log.Infof("KMS store: %v", kms.remoteKMSMapping)
remoteKMS, err := kms.GetRemoteKMS(receivingCKMSID.String())
if err != nil {
log.Error(err)
return nil, err
}
platformKey, err := kms.GetSpecificPlatformKey(receivingCKMSID.String(), pathID)
if err != nil {
log.Error(err)
return nil, err
}
ksaKeysToSendToRemoteKMS := make([]*pbIC.Key, number)
ksaKeysToReturn := make([]crypto.KSAKey, number)
cryptoAlgo := crypto.NewAES()
for i := int64(0); i < number; i++ {
remoteKSAKey, localKSAKey, err := generateNewKSAKey(cryptoAlgo, platformKey.Value)
if err != nil {
log.Error(err)
return nil, err
}
ksaKeysToReturn[i] = *localKSAKey
ksaKeysToSendToRemoteKMS[i] = remoteKSAKey
}
remoteKMSAdrress := fmt.Sprintf("%s:%d", remoteKMS.Address, remoteKMS.Port)
err = kms.sendKSAKeysToPlatformKmsPeer(remoteKMSAdrress, platformKey.Id.String(), requestID, ksaKeysToSendToRemoteKMS)
if err != nil {
log.Error(err)
return nil, err
}
return ksaKeysToReturn, nil
}
func (kms *KMS) GetID() uuid.UUID {
return kms.kmsUUID
}